Hot questions for using Amazon S3 in Apache Spark


Question:

I saw a few discussions on this but couldn't quite understand the right solution: I want to load a couple hundred files from S3 into an RDD. Here is how I'm doing it now:

ObjectListing objectListing = s3.listObjects(new ListObjectsRequest().
                withBucketName(...).
                withPrefix(...));
List<String> keys = new LinkedList<>();
objectListing.getObjectSummaries().forEach(summary -> keys.add(summary.getKey())); // repeat while objectListing.isTruncated()

JavaRDD<String> events = sc.parallelize(keys).flatMap(new ReadFromS3Function(clusterProps));

The ReadFromS3Function does the actual reading using the AmazonS3 client:

    public Iterator<String> call(String s) throws Exception {
        AmazonS3 s3Client = getAmazonS3Client(properties);
        S3Object object = s3Client.getObject(new GetObjectRequest(...));
        InputStream is = object.getObjectContent();
        List<String> lines = new LinkedList<>();
        String str;
        try {
            if (is != null) {
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                while ((str = reader.readLine()) != null) {
                    lines.add(str);
                }
            } else {
                ...
            }
        } finally {
            ...
        }
        return lines.iterator();
    }

I kind of "translated" this from answers I saw for the same question in Scala. I think it's also possible to pass the entire list of paths to sc.textFile(...), but I'm not sure which is the best-practice way.


Answer:

The underlying problem is that listing objects in S3 is really slow, and the way it is made to look like a directory tree kills performance whenever something does a tree walk (as wildcard pattern matching of paths does).

The code in the post is doing the all-children listing, which delivers much better performance; it's essentially what ships with Hadoop 2.8 as s3a's listFiles(path, recursive). See HADOOP-13208.

After getting that listing, you've got strings that are object paths, which you can then map to s3a/s3n paths for Spark to handle as text-file inputs, and to which you can then apply work:

val files = keys.map(key => s"s3a://$bucket/$key").mkString(",")
sc.textFile(files).map(...)

And, as requested, here's the Java code used.

String prefix = "s3a://" + properties.get("s3.source.bucket") + "/";
objectListing.getObjectSummaries().forEach(summary -> keys.add(prefix+summary.getKey())); 
// repeat while objectListing truncated 
JavaRDD<String> events = sc.textFile(String.join(",", keys));

Note that I switched s3n to s3a, because, provided you have the hadoop-aws and amazon-sdk JARs on your classpath, the s3a connector is the one you should be using. It's better, and it's the one which gets maintained and tested against Spark workloads by people (me). See The history of Hadoop's S3 connectors.
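For completeness, here is a minimal sketch of the "repeat while truncated" loop referenced in the comment above, using the AWS SDK for Java v1 (s3, sc, bucket and sourcePrefix are assumed to exist, as in the code earlier in the thread):

String prefix = "s3a://" + bucket + "/";
List<String> keys = new LinkedList<>();
ObjectListing listing = s3.listObjects(new ListObjectsRequest()
        .withBucketName(bucket)
        .withPrefix(sourcePrefix));
while (true) {
    // collect every key in the current page as an s3a path
    listing.getObjectSummaries().forEach(summary -> keys.add(prefix + summary.getKey()));
    if (!listing.isTruncated()) {
        break;
    }
    listing = s3.listNextBatchOfObjects(listing); // fetch the next page of keys
}
JavaRDD<String> events = sc.textFile(String.join(",", keys));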

Question:

I have upgraded to Apache Spark 1.5.1 but I'm not sure if that has caused it. I have my access keys in spark-submit which has always worked.

Exception in thread "main" java.lang.NoSuchMethodError: org.jets3t.service.impl.rest.httpclient.RestS3Service.<init>(Lorg/jets3t/service/security/AWSCredentials;)V

    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .load("s3n://ossem-replication/gdelt_data/event_data/" + args[0]);

    df.write()
        .format("com.databricks.spark.csv")
        .save("/user/spark/ossem_data/gdelt/" + args[0]);

More of the error is below. There is a class that does not contain the method, so that means the dependencies are mismatched. It seems like jets3t does not contain the method RestS3Service.<init>(Lorg/jets3t/service/security/AWSCredentials;)V. Can someone explain this to me?

Exception in thread "main" java.lang.NoSuchMethodError: org.jets3t.service.impl.rest.httpclient.RestS3Service.<init>(Lorg/jets3t/service/security/AWSCredentials;)V
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at org.apache.hadoop.fs.s3native.$Proxy24.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1277)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1312)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1311)
    at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:101)
    at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:99)
    at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:82)
    at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:42)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:74)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:39)
    at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:27)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
    at com.bah.ossem.spark.GdeltSpark.main(GdeltSpark.java:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)

Answer:

I had the same problem, but with Spark 1.6, and I am using Scala instead of Java. The reason for this error is that the Spark core dependency pulls in Hadoop client 2.2, while the cluster I was using ran Hadoop 2.6. I had to make the following changes to get it to work.

  1. Change the hadoop-client dependency to 2.6 (the version of Hadoop I was using):

    "org.apache.hadoop" % "hadoop-client" % "2.6.0",
    
  2. Include the hadoop-aws library in my Spark fat JAR, as this dependency is no longer included in the Hadoop libraries in 1.6:

    "org.apache.hadoop" % "hadoop-aws" % "2.6.0",
    
  3. Export the AWS key and secret as environment variables.

  4. Specify the following Hadoop configuration via the SparkContext:

    val sparkContext = new SparkContext(sparkConf)
    val hadoopConf = sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConf.set("fs.s3.awsAccessKeyId", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
    hadoopConf.set("fs.s3.awsSecretAccessKey", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
    

Question:

I'm trying to load some data from an Amazon S3 bucket by:

SparkConf sparkConf = new SparkConf().setAppName("Importer");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
HiveContext sqlContext = new HiveContext(ctx.sc());

DataFrame magento = sqlContext.read().json("https://s3.eu-central-1.amazonaws.com/*/*.json");

This last line however throws an error:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: https

The same line has been working in another project. What am I missing? I'm running Spark on a Hortonworks CentOS VM.


Answer:

By default Spark supports HDFS, S3 and local files. S3 can be accessed via the s3a:// or s3n:// protocols (difference between s3a, s3n and s3 protocols).

So to access a file, the best approach is to use the following form:

s3a://bucket-name/key

Depending on your Spark version and included libraries, you may need to add external JARs:

Spark read file from S3 using sc.textFile ("s3n://...)

(Are you sure you were using S3 with the https protocol in previous projects? Maybe you had custom code or JARs included to support the https protocol?)
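
As an illustration, switching the question's read to an S3A URI might look like this (a sketch; the bucket name, key pattern and the accessKey/secretKey variables are placeholders, and the s3a credential properties are only one of several ways to supply credentials):

ctx.hadoopConfiguration().set("fs.s3a.access.key", accessKey);
ctx.hadoopConfiguration().set("fs.s3a.secret.key", secretKey);

DataFrame magento = sqlContext.read().json("s3a://my-bucket/some-prefix/*.json");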

Question:

I'm trying to create a JavaRDD from an S3 file, but I am not able to create the RDD. Can someone help me solve this problem?

Code :

SparkConf conf = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);

javaSparkContext.hadoopConfiguration().set("fs.s3.awsAccessKeyId",
        accessKey);
javaSparkContext.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
        secretKey);
javaSparkContext.hadoopConfiguration().set("fs.s3.impl",
        "org.apache.hadoop.fs.s3native.NativeS3FileSystem");

JavaRDD<String> rawData = javaSparkContext
        .textFile("s3://mybucket/sample.txt");

This code throws the following exception:

2015-05-06 18:58:57 WARN  LoadSnappy:46 - Snappy native library not loaded
java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
    at org.apache.hadoop.fs.Path.initialize(Path.java:148)
    at org.apache.hadoop.fs.Path.<init>(Path.java:126)
    at org.apache.hadoop.fs.Path.<init>(Path.java:50)
    at org.apache.hadoop.fs.FileSystem.globPathsLevel(FileSystem.java:1084)
    at org.apache.hadoop.fs.FileSystem.globPathsLevel(FileSystem.java:1087)
    at org.apache.hadoop.fs.FileSystem.globPathsLevel(FileSystem.java:1087)
    at org.apache.hadoop.fs.FileSystem.globPathsLevel(FileSystem.java:1087)
    at org.apache.hadoop.fs.FileSystem.globPathsLevel(FileSystem.java:1087)
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1023)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1189)
    at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:477)
    at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
    at com.cignifi.DataExplorationValidation.processFile(DataExplorationValidation.java:148)
    at com.cignifi.DataExplorationValidation.main(DataExplorationValidation.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.URISyntaxException: Expected scheme-specific part at index 3: s3:
    at java.net.URI$Parser.fail(URI.java:2829)
    at java.net.URI$Parser.failExpecting(URI.java:2835)
    at java.net.URI$Parser.parse(URI.java:3038)
    at java.net.URI.<init>(URI.java:753)
    at org.apache.hadoop.fs.Path.initialize(Path.java:145)
    ... 36 more

Some more details

Spark version 1.3.0.

Running in local mode using spark-submit.

I tried this on a local machine and on an EC2 instance; in both cases I'm getting the same error.


Answer:

It should be s3n:// instead of s3://

See External Datasets in the Spark Programming Guide.
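
A minimal sketch of the corrected call, reusing the question's variables (with the s3n scheme, the matching credential properties are the fs.s3n.* ones):

javaSparkContext.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", accessKey);
javaSparkContext.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secretKey);

JavaRDD<String> rawData = javaSparkContext.textFile("s3n://mybucket/sample.txt");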

Question:

This code works and passes:

public class Test {
    public static void main(String[] args) throws IOException {
        AWSCredentials h = new AWSCredentials();
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Test")
                .set("fs.s3a.access.key", h.access_key_id)
                .set("fs.s3a.secret.key", h.secret_access_key);
        if (h.session_token != null) {
            conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            conf.set("fs.s3a.session.token", h.session_token);
        }
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
        System.out.println("count from scala spark is: " + count);
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
        System.out.println("count is: " + maxwellRdd.count());

        sc.stop();
    }
}

This code fails with the AWS credentials provider exception below:

public class Test {
    public static void main(String[] args) throws IOException {
        AWSCredentials h = new AWSCredentials();
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("Test")
                .set("fs.s3a.access.key", h.access_key_id)
                .set("fs.s3a.secret.key", h.secret_access_key);
        if (h.session_token != null) {
            conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            conf.set("fs.s3a.session.token", h.session_token);
        }
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        //long count = spark.read().text("s3a://mybucket/path-to-files/file+9+0000000223.bin").javaRDD().count();
        //System.out.println("count from scala spark is: " + count);
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<String> maxwellRdd = sc.textFile("s3a://mybucket/path-to-files/*");
        System.out.println("count is: " + maxwellRdd.count());

        sc.stop();
    }
}

Exception in thread "main" java.io.InterruptedIOException: doesBucketExist on mybucket: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.AmazonClientException: Unable to load credentials from Amazon EC2 metadata service

This seems pretty weird to me:

  1. I would've expected the JavaSparkContext and the SparkSession to use the same authentication methods and providers.
  2. If the SparkSession does use a different authentication method, I'm surprised that it apparently does so in some sort of side-effectful way that sets up a connection for the JavaSparkContext to use.

dependencies {
    compile group: 'org.ini4j', name: 'ini4j', version: '0.5.4'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.2.1'
    compile group: 'org.apache.hadoop', name: 'hadoop-aws', version: '2.8.3'
    //compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.313'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Answer:

I'm not convinced your first one worked; more specifically, if it does work, it's because something is picking up your credentials from environment variables or EC2 IAM settings.

If you are trying to set s3a options in the Spark conf, you need to prefix every option with "spark.hadoop.".

Simple test: after creating the Spark context, call sc.hadoopConfiguration and look for the options there (they are all defined in org.apache.hadoop.fs.s3a.Constants if you want to be 100% sure you've not got any typos).
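
Applied to the question's code, the spark.hadoop. prefix would look roughly like this (a sketch reusing the question's AWSCredentials helper h):

SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("Test")
        .set("spark.hadoop.fs.s3a.access.key", h.access_key_id)
        .set("spark.hadoop.fs.s3a.secret.key", h.secret_access_key);
if (h.session_token != null) {
    conf.set("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
    conf.set("spark.hadoop.fs.s3a.session.token", h.session_token);
}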

Question:

I am using Apache Spark and I have to parse files from Amazon S3. How would I know the file extension while fetching files from an Amazon S3 path?


Answer:

I suggest following the Cloudera tutorial Accessing Data Stored in Amazon S3 through Spark.

To access data stored in Amazon S3 from Spark applications, you could use Hadoop file APIs (SparkContext.hadoopFile, JavaHadoopRDD.saveAsHadoopFile, SparkContext.newAPIHadoopRDD, and JavaHadoopRDD.saveAsNewAPIHadoopFile) for reading and writing RDDs, providing URLs of the form s3a://bucket_name/path/to/file.txt.

You can read and write Spark SQL DataFrames using the Data Source API.

Regarding the file extension, there are a few solutions. You could simply take the extension from the filename (e.g. file.txt).

If the files stored in your S3 buckets have had their extensions removed, you can still determine the content type by looking at the metadata attached to each S3 object:

http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
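
A minimal sketch of both approaches with the AWS SDK for Java v1 (the bucket and key names are placeholders):

AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

// 1. Take the extension from the key itself
String key = "path/to/file.txt";
String extension = key.contains(".") ? key.substring(key.lastIndexOf('.') + 1) : "";

// 2. Fall back on the Content-Type stored in the object's metadata (HEAD request)
ObjectMetadata metadata = s3.getObjectMetadata("my-bucket", key);
String contentType = metadata.getContentType();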

Question:

I am trying to read an .xls file from AWS S3 but am getting a java.io.FileNotFoundException.

I tried the two approaches below: one giving the path in option() with the location key, and another passing the same path to load() as well.

Dataset<Row> segmentConfigData = spark.read()
                .format("com.crealytics.spark.excel")
                .option("sheetName", "sheet1")
                .option("header","true")
                .option("location","s3a://input/552SegmentConfig.xls")
                .option("useHeader", "true")
                .option("treatEmptyValuesAsNulls", "true")
                .option("inferSchema", "true")
                .option("addColorColumns", "False")
                .load();

Dataset<Row> segmentConfigData = spark.read()
                .format("com.crealytics.spark.excel")
                .option("sheetName", "sheet1")
                .option("header","true")
                .option("location","s3a://input/552SegmentConfig.xls")
                .option("useHeader", "true")
                .option("treatEmptyValuesAsNulls", "true")
                .option("inferSchema", "true")
                .option("addColorColumns", "False")
                .load("s3a://input/552SegmentConfig.xls");

I get a file-not-found exception. By contrast, when I read a .csv file I am able to read it fine.

Edit: I have solved this issue. I was using an older version of "com.crealytics.spark.excel". I was able to read the file once I upgraded the jar.

But now I am facing another issue: I am unable to read any sheet other than the first sheet. Any help?


Answer:

I have solved this issue. I was using an older version of "com.crealytics.spark.excel". I was able to read the file once I upgraded the jar.

Further, at first I was only able to read the first sheet of the .xls file; selecting a sheet by name via the sheetName and dataAddress options addresses that. Below is the code snippet:

spark.read()
    .format("com.crealytics.spark.excel")
    .option("location",path)
    .option("sheetName", sheetName)
    .option("dataAddress", "'"+sheetName+"'!A1")
    .option("header","true")
    .option("useHeader", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .option("inferSchema", "true")
    .option("addColorColumns", "False")
    .load(path);

Question:

I have partitioned parquet files stored on two locations on S3 in the same bucket:

path1: s3n://bucket/a/
path2: s3n://bucket/b/

The data has the same structure. I want to read the files from both locations, merge them, and write the result back to the first location using Spark SQL. Here is the code snippet:

val df1 = sql.read.parquet(path1)
val df2 = sql.read.parquet(path2)

val df = df1.unionAll(df2)

df.write.mode(SaveMode.Overwrite).parquet(path1)

When I run this piece of code I get the following exception:

java.io.FileNotFoundException: No such file or directory 
s3n://a/part-r-00001-file.gz.parquet

I'm using Spark 1.6.1 and Scala 2.11.


Answer:

I didn't find a direct solution to this problem (Spark reads the parquet files lazily, so overwriting a path that the same job is still reading from removes source files before they have been consumed), so I used a workaround:

val df2 = sql.read.parquet(path2)
df2.write.mode(SaveMode.Append).parquet(path1)

val df1 = sql.read.parquet(path1)
df1.write.mode(SaveMode.Overwrite).parquet(path1)

Question:

I am trying to read data from AWS S3 into a Dataset/RDD in Java but am getting Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities. I am running the Spark code in Java in IntelliJ, so I added the Hadoop dependencies in pom.xml as well.

Below is my code and pom.xml file.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkJava {

    public static void main(String[] args){

        SparkSession spark  = SparkSession
                .builder()
                .master("local")
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")                  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
                .config("fs.s3n.awsAccessKeyId", AWS_KEY)
                .config("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)
                .getOrCreate();

        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        String input_path = "s3a://bucket/2018/07/28/zqa.parquet";
        Dataset<Row> dF = spark.read().load(input_path); // THIS LINE CAUSES ERROR

    }
}

Here are the dependencies from pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>3.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.1</version>
    </dependency>
</dependencies>

Any help will be really appreciated. Thanks in advance!


Answer:

Solved this by adding the following dependency; the missing class org.apache.hadoop.fs.StreamCapabilities lives in hadoop-common, and its version needs to match the hadoop-aws version:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.1</version>
</dependency>

Question:

Hi, I have an issue reading files from S3 with DataFrameReader. I am able to read the exact same file when it is local, but when it is stored on S3 I'm not able to read it. Here's a snippet of my code:

private SparkSession getSparkSession() {
    return sparkSessionService.getTransformedCsvSparkSession();
}

public void processFile(StructType structType, String srcFile, String targetFile) {
    //"s3n://yourAccessKey:yourSecretKey@/path/
    String spark = new String("s3n://TTTTTTTT:YYYYYYY@/bucket-qa1/test/123456785_Ads_mockup_.csv");
    Dataset<Row> dfSpark = getSparkSession().read().format("csv").schema(structType).option("header", srcIncludeHeader).load(spark);
    dfSpark.show();
}

But I got this error:

java.lang.IllegalArgumentException: Invalid hostname in URI s3n://.....

I am using Databricks.

How do i "tell" the DataFrameReader to read from AWS S3 ?


Answer:

There shouldn't be a slash before the bucket name; it's OK to have slashes in the file path.
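
For illustration, the corrected URI from the question drops the slash between the credentials and the bucket name (a sketch, keeping the question's placeholder credentials and variables):

String path = "s3n://TTTTTTTT:YYYYYYY@bucket-qa1/test/123456785_Ads_mockup_.csv";
Dataset<Row> dfSpark = getSparkSession().read()
        .format("csv")
        .schema(structType)
        .option("header", srcIncludeHeader)
        .load(path);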

Question:

I am considering using AWS EMR Spark to run a Spark application against very large Parquet files stored on S3. The overall flow here is that a Java process would upload these large files to S3, and I'd like to automatically trigger the running of a Spark job (injected with the S3 keyname(s) of the files uploaded) on those files.

Ideally, there would be some kind of S3-based EMR trigger available to wire up; that is, I configure EMR/Spark to "listen" to an S3 bucket and to kick off a Spark job when an upsert is made to that bucket.

If no such trigger exists, I could probably kludge something together, such as kick off a Lambda from the S3 event, and have the Lambda somehow trigger the EMR Spark job.

However my understanding (please correct me if I'm wrong) is that the only way to kick off a Spark job is to:

  1. Package the job up as an executable JAR file; and
  2. Submit it to the cluster (EMR or otherwise) via the spark-submit shell script

So if I have to do the Lambda-based kludge, I'm not exactly sure what the best way to trigger the EMR/Spark job is, seeing that Lambdas don't natively carry spark-submit in their runtimes. And even if I configured my own Lambda runtime (which I believe is now possible to do), this solution already feels really wonky and fault-intolerant.

Anybody ever trigger an EMR/Spark job from an S3 trigger or any AWS trigger before?


Answer:

An EMR Spark job can be executed as a step, as described in Adding a Spark Step. Steps are not limited to EMR cluster creation time after bootstrap; they can be added to a running cluster:

aws emr add-steps --cluster-id j-2AXXXXXXGAPLF --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10]

As this is the AWS CLI, you can invoke it from a Lambda function, in which you can also upload the JAR file to HDFS or S3, then point to it using s3:// or hdfs://.

The document also has a Java example.

AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);

StepFactory stepFactory = new StepFactory();
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
req.withJobFlowId("j-1K48XXXXXXHCB");

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("spark-submit","--executor-memory","1g","--class","org.apache.spark.examples.SparkPi","/usr/lib/spark/examples/jars/spark-examples.jar","10");            

StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);

stepConfigs.add(sparkStep);
req.withSteps(stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);

Question:

I am developing an application with Java Spark. I generated the .jar and successfully loaded it onto the EMR cluster. There is one line of code that reads:

JsonReader jsonReader = new JsonReader(new FileReader("s3://naturgy-sabt-dev/QUERY/input.json"));

I am 100% sure of:

  • Such file does exist.
  • When executing aws s3 cp s3://naturgy-sabt-dev/QUERY/input.json . I receive the .json file correctly.
  • IAM policies are set so that the tied EMR role has permissions to read, write and list.
  • This post about how to read from S3 in EMR does not help.

When submitting the Spark JAR, I am getting the following error (note the print of the path that is about to be read, right before the Java statement above is called):

...
...
...
19/12/11 15:55:46 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.31.36.11, 35744, None)
19/12/11 15:55:46 INFO BlockManager: external shuffle service port = 7337
19/12/11 15:55:46 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.31.36.11, 35744, None)
19/12/11 15:55:48 INFO EventLoggingListener: Logging events to hdfs:///var/log/spark/apps/local-1576079746613
19/12/11 15:55:48 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
#########################################
I am going to read from s3://naturgy-sabt-dev/QUERY/input.json
#########################################
java.io.FileNotFoundException: s3:/naturgy-sabt-dev/QUERY/input.json (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at java.io.FileInputStream.<init>(FileInputStream.java:93)
        at java.io.FileReader.<init>(FileReader.java:58)
...
...
...

Does anyone know what's going on?

Thanks for any help you can provide.


Answer:

Java's default FileReader cannot load files from AWS S3. Such objects can only be read with third-party libraries. A bare S3 reader ships with the AWS SDK for Java; Hadoop also has libraries to read from S3. The Hadoop JARs are preinstalled on AWS EMR Spark clusters (actually on almost all Spark installs).

Spark supports loading data from the S3 filesystem into a Spark DataFrame directly, without any manual steps. All readers can read either one file or multiple files with the same structure via a glob pattern. The JSON DataFrame reader expects newline-delimited JSON by default; this can be configured.

Various usage examples:

# read a single newline-delimited json file, each line is a record
spark.read.json("s3://path/input.json")

# read a single serialized json object or array, spanning multiple lines
spark.read.option("multiLine", True).json("s3://path/input.json")

# read multiple json files
spark.read.json("s3://folder/*.json")

Question:

I am using Apache Spark for parsing files. I have the Amazon S3 path s3n://my-bucket/amazone-folder/. How can I list all files and subfolders under this path?


Answer:

The AWS Java SDK documentation has API details you could use for this purpose:

Provides an easy way to iterate Amazon S3 objects in a "foreach" statement. For example:

for ( S3ObjectSummary summary : S3Objects.withPrefix(s3, "my-bucket", "photos/") ) {
    System.out.printf("Object with key '%s'\n", summary.getKey());
}

The list of S3ObjectSummarys will be fetched lazily, a page at a time, as they are needed. The size of the page can be controlled with the withBatchSize(int) method.

And here is another tutorial that explains how to work with the AWS Java SDK.
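
Since the path in the question is already a Hadoop-style s3n:// URI, another option is to list it through Hadoop's FileSystem API. A minimal sketch, assuming the S3 connector and its credentials are already configured in the Hadoop configuration used by Spark:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Path root = new Path("s3n://my-bucket/amazone-folder/");
FileSystem fs = root.getFileSystem(new Configuration());
// listStatus returns the direct children; each entry says whether it is a file or a directory
for (FileStatus status : fs.listStatus(root)) {
    System.out.println(status.getPath() + (status.isDirectory() ? "/" : ""));
}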