Hot questions on using Apache Parquet with Amazon S3

Question:

I need to read Parquet data from AWS S3. If I use the AWS SDK for this I can get an InputStream like this:

S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, bucketKey));
InputStream inputStream = object.getObjectContent();

But the Apache Parquet reader only takes a local file, like this:

ParquetReader<Group> reader =
                    ParquetReader.builder(new GroupReadSupport(), new Path(file.getAbsolutePath()))
                            .withConf(conf)
                            .build();
reader.read()

So I don't know how to parse an input stream for a Parquet file. For example, for CSV files there is CSVParser, which takes an input stream.

I know of a solution that uses Spark for this, like so:

SparkSession spark = SparkSession
                .builder()
                .getOrCreate();
Dataset<Row> ds = spark.read().parquet("s3a://bucketName/file.parquet");

But I cannot use spark.

Could anyone tell me a solution for reading Parquet data from S3?


Answer:

String SCHEMA_TEMPLATE = "{" +
                        "\"type\": \"record\",\n" +
                        "    \"name\": \"schema\",\n" +
                        "    \"fields\": [\n" +
                        "        {\"name\": \"timeStamp\", \"type\": \"string\"},\n" +
                        "        {\"name\": \"temperature\", \"type\": \"double\"},\n" +
                        "        {\"name\": \"pressure\", \"type\": \"double\"}\n" +
                        "    ]" +
                        "}";
String PATH_SCHEMA = "s3a";
Path internalPath = new Path(PATH_SCHEMA, bucketName, folderName);
Schema schema = new Schema.Parser().parse(SCHEMA_TEMPLATE);
Configuration configuration = new Configuration();
AvroReadSupport.setRequestedProjection(configuration, schema);
ParquetReader<GenericRecord> parquetReader = AvroParquetReader.<GenericRecord>builder(internalPath).withConf(configuration).build();
GenericRecord genericRecord = parquetReader.read();

while(genericRecord != null) {
        Map<String, String> valuesMap = new HashMap<>();
        genericRecord.getSchema().getFields().forEach(field -> valuesMap.put(field.name(), genericRecord.get(field.name()).toString()));

        genericRecord = parquetReader.read();
}

Gradle dependencies

    compile 'com.amazonaws:aws-java-sdk:1.11.213'
    compile 'org.apache.parquet:parquet-avro:1.9.0'
    compile 'org.apache.parquet:parquet-hadoop:1.9.0'
    compile 'org.apache.hadoop:hadoop-common:2.8.1'
    compile 'org.apache.hadoop:hadoop-aws:2.8.1'
    compile 'org.apache.hadoop:hadoop-client:2.8.1'
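
Note that the snippet above reads from an s3a path, so the Hadoop configuration also needs S3 credentials. One way (a sketch, assuming the standard hadoop-aws 2.8.x property names; the key values are placeholders) is to set them in core-site.xml:

```xml
<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

Alternatively, the same properties can be set programmatically on the Configuration object before building the reader.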

Question:

I recently had a requirement where I needed to generate Parquet files that could be read by Apache Spark using only Java (using no additional software installations such as Apache Drill, Hive, Spark, etc.). The files needed to be saved to S3, so I will be sharing details on how to do both.

There were no simple-to-follow guides on how to do this. I'm also not a Java programmer, so the concepts of Maven, Hadoop, etc. were all foreign to me, and it took me nearly two weeks to get this working. I'd like to share my personal guide below on how I achieved this.


Answer:

Disclaimer: The code samples below in no way represent best practices and are only presented as a rough how-to.

Dependencies:

I'll be using NetBeans as my IDE.

Some info regarding parquet in Java (For noobs such as me):

  • In order to serialize your data into parquet, you must choose one of the popular Java data serialization frameworks: Avro, Protocol Buffers or Thrift (I'll be using Avro (1.8.0), as can be seen from our parquet-avro dependency)
  • You will need to use an IDE that supports Maven. This is because the dependencies above have a lot of dependencies of their own. Maven will automatically download those for you (like NuGet for VisualStudio)

Pre-requisite:

You must have Hadoop binaries on the Windows machine that will be running the Java code. The good news is that you don't need to install the entire Hadoop distribution; you need only two files:

  • hadoop.dll
  • winutils.exe

These can be downloaded here. You will need version 2.8.1 for this example (due to parquet-avro 1.9.0).

  1. Copy these files to C:\hadoop-2.8.1\bin on the target machine.
  2. Add a new System Variable (not user variable) called: HADOOP_HOME with the value C:\hadoop-2.8.1

  3. Modify the System Path variable (not user variable) and add the following to the end: %HADOOP_HOME%\bin

  4. Restart the machine for the changes to take effect.

If this config was not done properly you will get the following error at run-time: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z

Getting Started with Coding:

  • First create a new empty Maven Project and add parquet-avro 1.9.0 and hadoop-aws 2.8.2 as dependencies:
  • Create your main class where you can write some code
  • The first thing you need is to generate a Schema. As far as I can tell, there is no way to build a schema up field-by-field at run time: the Schema.Parser class's parse() method only takes a file or a string literal as a parameter, and the schema can't be modified once it is created. To circumvent this I am generating my schema JSON at run time and parsing that. Below is an example schema:

    String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
        + "\"type\": \"record\"," //Must be set as record
        + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
        + "\"fields\": ["
        + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
        + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
        + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
        + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
        + " ]}";
    Schema.Parser parser = new Schema.Parser().setValidate(true);
    Schema avroSchema = parser.parse(schema);
    

    Details on Avro schema can be found here: https://avro.apache.org/docs/1.8.0/spec.html
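
    Since the schema is plain JSON, one way to assemble it at run time is simple string building. Below is a minimal sketch; the helper names (SchemaJsonBuilder, fieldJson, recordJson) are my own, not from the original answer. Note also that Avro ships an org.apache.avro.SchemaBuilder API for fully programmatic schema construction, which may be a cleaner option.

```java
// Hypothetical helper that assembles record-schema JSON at run time,
// so the field layout does not have to be hard-coded in one literal.
public class SchemaJsonBuilder {

    // One field entry, e.g. {"name": "myInteger", "type": "int"}
    public static String fieldJson(String name, String type) {
        return String.format("{\"name\": \"%s\", \"type\": \"%s\"}", name, type);
    }

    // A record schema wrapping the given field entries.
    public static String recordJson(String recordName, String... fields) {
        return "{\"type\": \"record\", \"name\": \"" + recordName + "\", \"fields\": ["
                + String.join(", ", fields) + "]}";
    }

    public static void main(String[] args) {
        String schema = recordJson("myrecordname",
                fieldJson("myInteger", "int"),
                fieldJson("myString", "string"));
        System.out.println(schema);
    }
}
```

    The resulting string can then be handed to Schema.Parser.parse() exactly like the literal above.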

  • Next we can start generating records (Avro primitive types are simple):

    GenericData.Record record = new GenericData.Record(avroSchema);
    record.put("myInteger", 1);
    record.put("myString", "string value 1");
    
    • In order to generate a decimal logical type, a fixed or bytes primitive type must be used as the underlying storage type. The current Parquet format only supports fixed-length byte arrays (aka fixed_len_byte_array), so we have to use fixed in our case as well (as can be seen in the schema). In Java we must use BigDecimal to handle decimals properly, and I've verified that a Decimal(32,4) will not take more than 16 bytes no matter the value. So we will use a standard byte array size of 16 in our serialization below (and in the schema above):
    BigDecimal myDecimalValue = new BigDecimal("99.9999");
    
    //First we need to make sure the BigDecimal matches our schema scale:
    myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
    
    //Next we get the decimal value as one BigInteger (like there was no decimal point)
    BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
    
    //Finally we serialize the integer
    byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
    
    //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
    GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
    
    byte[] myDecimalBuffer = new byte[16];
    if (myDecimalBuffer.length >= decimalBytes.length) {            
        //Because we set our fixed byte array size as 16 bytes, we need to
        //pad-left our original value's bytes with zeros
        int myDecimalBufferIndex = myDecimalBuffer.length - 1;
        for(int i = decimalBytes.length - 1; i >= 0; i--){
            myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
            myDecimalBufferIndex--;
        }
        //Save result
        fixed.bytes(myDecimalBuffer);
    } else {
        throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
    }
    
    //We can finally write our decimal to our record
    record.put("myDecimal", fixed);
    
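    One caveat with the zero-padding above: it is only correct for non-negative values. BigInteger.toByteArray() returns two's-complement bytes, so a negative unscaled value must be left-padded with 0xFF (sign extension) rather than 0x00, or the stored decimal will decode to the wrong number. A sketch of sign-aware padding (the helper name toFixedBytes is mine, not part of Parquet or Avro):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

public class DecimalPad {
    // Left-pads the two's-complement bytes of the unscaled decimal to a fixed
    // size, extending the sign byte (0x00 for non-negative, 0xFF for negative)
    // so negative values survive the round trip.
    public static byte[] toFixedBytes(BigDecimal value, int scale, int size) {
        BigInteger unscaled = value.setScale(scale, RoundingMode.HALF_UP).unscaledValue();
        byte[] src = unscaled.toByteArray();
        if (src.length > size) {
            throw new IllegalArgumentException(String.format(
                    "Decimal size: %d was greater than the allowed max: %d", src.length, size));
        }
        byte pad = (byte) (unscaled.signum() < 0 ? 0xFF : 0x00);
        byte[] dst = new byte[size];
        Arrays.fill(dst, 0, size - src.length, pad);
        System.arraycopy(src, 0, dst, size - src.length, src.length);
        return dst;
    }

    public static void main(String[] args) {
        byte[] bytes = toFixedBytes(new BigDecimal("-1.5"), 4, 16);
        // Round-trips back to the unscaled value -15000
        System.out.println(new BigInteger(bytes));
    }
}
```

    The resulting array can be passed to fixed.bytes(...) exactly as in the snippet above.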
  • For Date values, Avro specifies that we need to save the number of days since EPOCH as an integer. (If you need the time component as well, such as an actual DateTime type, you need to use the Timestamp Avro type, which I will not cover). The easiest way I found to get the number of days since epoch is using the joda-time library. If you added the hadoop-aws dependency to your project you should already have this library. If not you will need to add it yourself:

    //Get epoch value
    MutableDateTime epoch = new MutableDateTime(0L, DateTimeZone.UTC);
    
    DateTime currentDate = new DateTime(); //Can take Java Date in constructor
    Days days = Days.daysBetween(epoch, currentDate);
    
    //We can write number of days since epoch into the record
    record.put("myDate", days.getDays());
    
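    If you are on Java 8 or newer, java.time can do this without joda-time: LocalDate.toEpochDay() gives exactly the days-since-epoch value Avro expects. A small sketch (the class name below is mine):

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

public class EpochDays {
    // Days since 1970-01-01 for the given date; equivalent to the
    // joda-time Days.daysBetween(epoch, date) computation above.
    public static int daysSinceEpoch(LocalDate date) {
        return (int) date.toEpochDay();
    }

    public static void main(String[] args) {
        System.out.println(daysSinceEpoch(LocalDate.now(ZoneOffset.UTC)));
    }
}
```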
  • We finally can start writing our parquet file as such

    try {
       Configuration conf = new Configuration();
       conf.set("fs.s3a.access.key", "ACCESSKEY");
       conf.set("fs.s3a.secret.key", "SECRETKEY");
       //Below are some other helpful settings
       //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
       //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
       //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
       //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors
    
       Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    
       //Use path below to save to local file system instead
       //Path path = new Path("data.parquet");
    
    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
               .withSchema(avroSchema)
               .withCompressionCodec(CompressionCodecName.GZIP)
               .withConf(conf)
           .withPageSize(4 * 1024 * 1024) //Page size (the unit of compression)
           .withRowGroupSize(16 * 1024 * 1024) //Row group size (write buffering)
               .build()) {
           //We only have one record to write in our example
           writer.write(record);
       }
    } catch (Exception ex) { ex.printStackTrace(System.out); }
  • Here is the data loaded into Apache Spark (2.2.0):

And for your convenience, the entire source code:

package com.mycompany.stackoverflow;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.MutableDateTime;

public class Main {
    public static void main(String[] args) {
        System.out.println("Start");

        String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
                + "\"type\": \"record\"," //Must be set as record
                + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
                + "\"fields\": ["
                + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
                + " {\"name\": \"myString\",  \"type\": [\"string\", \"null\"]},"
                + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
                + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
                + " ]}";

        Schema.Parser parser = new Schema.Parser().setValidate(true);
        Schema avroSchema = parser.parse(schema);

        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("myInteger", 1);
        record.put("myString", "string value 1");

        BigDecimal myDecimalValue = new BigDecimal("99.9999");

        //First we need to make sure the huge decimal matches our schema scale:
        myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);

        //Next we get the decimal value as one BigInteger (like there was no decimal point)
        BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();

        //Finally we serialize the integer
        byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();

        //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
        GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));

        byte[] myDecimalBuffer = new byte[16];
        if (myDecimalBuffer.length >= decimalBytes.length) {            
            //Because we set our fixed byte array size as 16 bytes, we need to
            //pad-left our original value's bytes with zeros
            int myDecimalBufferIndex = myDecimalBuffer.length - 1;
            for(int i = decimalBytes.length - 1; i >= 0; i--){
                myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                myDecimalBufferIndex--;
            }

            //Save result
            fixed.bytes(myDecimalBuffer);
        } else {
            throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
        }

        //We can finally write our decimal to our record
        record.put("myDecimal", fixed);

        //Get epoch value
        MutableDateTime epoch = new MutableDateTime(0L, DateTimeZone.UTC);

        DateTime currentDate = new DateTime(); //Can take Java Date in constructor
        Days days = Days.daysBetween(epoch, currentDate);

        //We can write number of days since epoch into the record
        record.put("myDate", days.getDays());

        try {
           Configuration conf = new Configuration();
           conf.set("fs.s3a.access.key", "ACCESSKEY");
           conf.set("fs.s3a.secret.key", "SECRETKEY");
           //Below are some other helpful settings
           //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
           //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
           //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
           //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.

           Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");

           //Use path below to save to local file system instead
           //Path path = new Path("data.parquet");

           try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                   .withSchema(avroSchema)
                   .withCompressionCodec(CompressionCodecName.GZIP)
                   .withConf(conf)
                   .withPageSize(4 * 1024 * 1024) //Page size (the unit of compression)
                   .withRowGroupSize(16 * 1024 * 1024) //Row group size (write buffering)
                   .build()) {

               //We only have one record to write in our example
               writer.write(record);
           }
        } catch (Exception ex) { 
            ex.printStackTrace(System.out);
        }
    }
}

Question:

I am currently using the code below to write parquet via Avro. This code writes it to a file system but I want to write to S3.

try {
    StopWatch sw = StopWatch.createStarted();
    Schema avroSchema = AvroSchemaBuilder.build("pojo", message.getTransformedMessage().get(0));
    final String parquetFile = "parquet/data.parquet";
    final Path path = new Path(parquetFile);

    ParquetWriter writer = AvroParquetWriter.<GenericData.Record>builder(path)
        .withSchema(avroSchema)
        .withConf(new org.apache.hadoop.conf.Configuration())
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withWriteMode(Mode.OVERWRITE)//probably not good for prod. (overwrites files).
        .build();

    for (Map<String, Object> row : message.getTransformedMessage()) {
      StopWatch stopWatch = StopWatch.createStarted();
      final GenericRecord record = new GenericData.Record(avroSchema);
      row.forEach((k, v) -> {
        record.put(k, v);
      });
      writer.write(record);
    }
    //todo:  Write to S3.  We should probably write via the AWS objects.  This does not show that.
    //https://stackoverflow.com/questions/47355038/how-to-generate-parquet-file-using-pure-java-including-date-decimal-types-an
    writer.close();
    System.out.println("Total Time: " + sw);

  } catch (Exception e) {
    //do something here. Retryable? Non-retryable? Wrap this exception in one of these?
    transformedParquetMessage.getOriginalMessage().getMetaData().addException(e);
  }

This writes to a file fine, but how do I get it to stream into the Amazon S3 API? I have found some code on the web that uses the hadoop-aws jar, but that requires some Windows exe files to work and, of course, we want to avoid that. Currently I am using only:

 <dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.8.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
</dependency>

So the question is, is there a way to intercept the output stream on the AvroParquetWriter so I can stream it to S3? The main reason I want to do this is for retries. S3 automagically retries up to 3 times. This would help us out a lot.


Answer:

This does depend on the hadoop-aws jar, so if you're not willing to use that, I'm not sure I can help you. I am, however, running on a Mac and do not have any Windows exe files, so I'm not sure where those are coming from. The AvroParquetWriter already depends on Hadoop, so even if this extra dependency is unacceptable to you, it may not be a big deal to others:

You can use an AvroParquetWriter to stream directly to S3 by passing it a Hadoop Path that is created with a URI parameter and setting the proper configs.

val uri = new URI("s3a://<bucket>/<key>")
val path = new Path(uri)

val config = new Configuration()
config.set("fs.s3a.access.key", key)
config.set("fs.s3a.secret.key", secret)
config.set("fs.s3a.session.token", sessionToken)
config.set("fs.s3a.aws.credentials.provider", credentialsProvider)

val writer = AvroParquetWriter.builder[GenericRecord](path).withConf(config).withSchema(schema).build()

I used the following dependencies (sbt format):

"org.apache.avro" % "avro" % "1.8.1"
"org.apache.hadoop" % "hadoop-common" % "2.9.0"
"org.apache.hadoop" % "hadoop-aws" % "2.9.0"
"org.apache.parquet" % "parquet-avro" % "1.8.1"

Question:

Currently, I am using the Apache ParquetReader for reading local parquet files, which looks something like this:

ParquetReader<GenericData.Record> reader = null;
Path path = new Path("userdata1.parquet");
try {
    reader = AvroParquetReader.<GenericData.Record>builder(path).withConf(new Configuration()).build();
    GenericData.Record record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
} catch (IOException e) {
    e.printStackTrace();
}

However, I am trying to access a parquet file through S3 without downloading it. Is there a way to parse Inputstream directly with parquet reader?


Answer:

Yes, recent versions of Hadoop include support for the S3 filesystem. Use the s3a client from the hadoop-aws library to access the S3 filesystem directly.

The HadoopInputFile Path should be constructed as s3a://bucket-name/prefix/key, with the authentication credentials access_key and secret_key configured using the properties:

  • fs.s3a.access.key
  • fs.s3a.secret.key

Additionally, you will need these dependent libraries:

  • hadoop-common JAR
  • aws-java-sdk-bundle JAR

Read more: Relevant configuration properties
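
For reference, the equivalent Maven coordinates might look like the following (versions are illustrative; the aws-java-sdk-bundle version must match what your hadoop-aws release was built against):

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-bundle</artifactId>
  <version>1.11.271</version>
</dependency>
```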

Question:

I am trying to get and read a parquet file on S3 with the Apache Parquet Reader, and my code looks something like this:

ParquetReader<GenericData.Record> reader = null;
Path internalPath = new Path("s3://S3AccessID:S3SecretKey@bucketName/tmp0.parquet");
try {
    InputFile inputFile = HadoopInputFile.fromPath(internalPath, new Configuration());
    reader = AvroParquetReader.<GenericData.Record>builder(inputFile).build();
    GenericData.Record record;
    while ((record = reader.read()) != null) {
        System.out.println(record);
    }
} catch (IOException e) {
    e.printStackTrace();
}

However, when I build and run the program, I get the following error:

        at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:156)
        at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:195)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
        at com.sun.proxy.$Proxy12.retrieveINode(Unknown Source)
        at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:332)
        at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)
        at read.read.readParquetFile(read.java:153)
        at read.read.main(read.java:80)
Caused by: org.jets3t.service.S3ServiceException: S3 GET failed for '/%2Ftmp0.parquet' XML Error Message: <?xml version="1.0" encoding="UTF-8"?><Error><Code>InvalidRequest</Code><Message>The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.</Message><RequestId>1A66095653EBAD50</RequestId><HostId>jNzbaMmKmszHiLvzA4NsqILRxF+qJFxJLTWvKVwqHoggB0MnYy1ESoajHaa/Ufs5RE8ghs31Jaw=</HostId>

Does anyone have any idea how to address this?


Answer:

From the error message, it looks like your S3 bucket region uses Signature Version 4 (v4) signing protocol and does not support the older version (v2).

The authorization mechanism you have provided is not supported. Please use AWS4-HMAC-SHA256.

In that case, you must set the property fs.s3a.endpoint either in core-site.xml or in the Job configuration. The value for this property can be found here under Amazon S3 Endpoints.

Additionally,

  1. Use hadoop's s3a client instead of s3.

  2. Rather than embedding the access_key and secret_access_key in the s3a URL, use these properties fs.s3a.access.key and fs.s3a.secret.key. The entire list of properties that can be used for S3 authentication can be found here.
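
For example, for a bucket in a v4-only region such as eu-central-1, a core-site.xml fragment might look like this (the endpoint value is illustrative; pick the one for your bucket's region, and the keys are placeholders):

```xml
<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```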

Question:

I struggled with this for a while and wanted to share my solution. AvroParquetReader is a fine tool for reading Parquet, but its default S3 credential handling is limited:

java.io.InterruptedIOException: doesBucketExist on MY_BUCKET: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.AmazonClientException: Unable to load credentials from service endpoint

I want to use a credentials provider akin to com.amazonaws.auth.profile.ProfileCredentialsProvider, which works for accessing my S3 bucket, but it is not clear from AvroParquetReader's class definition or documentation how I would achieve this.


Answer:

This code worked for me. It allowed AvroParquetReader to access S3 using ProfileCredentialsProvider.

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.hadoop.fs.Path;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;

...

final String path = "s3a://"+bucketName+"/"+pathName;
final Configuration configuration = new Configuration();
configuration.setClass("fs.s3a.aws.credentials.provider", ProfileCredentialsProvider.class,
        AWSCredentialsProvider.class);
ParquetReader<GenericRecord> parquetReader =
        AvroParquetReader.<GenericRecord>builder(new Path(path)).withConf(configuration).build();