Hot questions for using Amazon S3 in Hadoop

Question:

I implemented a Spark application and created the Spark context:

    private JavaSparkContext createJavaSparkContext() {
        SparkConf conf = new SparkConf();
        conf.setAppName("test");
        if (conf.get("spark.master", null) == null) {
            conf.setMaster("local[4]");
        }
        conf.set("fs.s3a.awsAccessKeyId", getCredentialConfig().getS3Key());
        conf.set("fs.s3a.awsSecretAccessKey", getCredentialConfig().getS3Secret());
        conf.set("fs.s3a.endpoint", getCredentialConfig().getS3Endpoint());

        return new JavaSparkContext(conf);
    }

And I try to read data from S3 via the Spark Dataset API (Spark SQL):

     String s = "s3a://" + getCredentialConfig().getS3Bucket();
     Dataset<Row> csv = getSparkSession()
                        .read()
                        .option("header", "true")
                        .csv(s + "/dataset.csv");

     System.out.println("Read size :" + csv.count());

There is an error:

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 1A3E8CBD4959289D, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Q1Fv8sNvcSOWGbhJSu2d3Nfgow00388IpXiiHNKHz8vI/zysC8V8/YyQ1ILVsM2gWQIyTy1miJc=

Hadoop version: 2.7

AWS endpoint: s3.eu-central-1.amazonaws.com

(On Hadoop 2.8 everything works fine.)


Answer:

The problem is that Frankfurt doesn't support s3n; you need to use s3a. On top of that, this region supports only the V4 signature version: http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region

EU (Frankfurt) eu-central-1 Version 4 only

That means V4 signing needs to be enabled on the AWS client, which is done with a system property:

com.amazonaws.services.s3.enableV4 -> true

conf.set("com.amazonaws.services.s3.enableV4", "true");//doesn't work for me

On my local machine I used:

System.setProperty("com.amazonaws.services.s3.enableV4", "true");

For running on AWS EMR, you need to add these parameters to spark-submit:

spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true
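
Putting the answer together, a minimal sketch of the driver-side setup; the endpoint and property names come from the question and answer above, while routing the fs.s3a.* settings through the JavaSparkContext's Hadoop configuration is an assumption of this sketch, not the original code:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class S3AV4Setup {
        public static void main(String[] args) {
            // Enable V4 request signing before the AWS client makes any S3 calls.
            System.setProperty("com.amazonaws.services.s3.enableV4", "true");

            SparkConf conf = new SparkConf().setAppName("test").setMaster("local[4]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // V4-only regions such as eu-central-1 also need the regional endpoint.
            sc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com");

            // ... read s3a:// paths as in the question ...
            sc.stop();
        }
    }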

Question:

I get strange errors such as "can't get aws credentials" or "Unable to load credentials from ...".

Is there any way to set the s3a credentials explicitly in the Hadoop configuration?


Answer:

As s3a is a relatively new implementation (and works correctly from Hadoop 2.7), you need to set two sets of properties in the Hadoop configuration:

    conf.set("fs.s3a.access.key", access_key);
    conf.set("fs.s3a.secret.key", secret_key);
    conf.set("fs.s3a.awsAccessKeyId", access_key);
    conf.set("fs.s3a.awsSecretAccessKey", secret_key);

(conf is the Hadoop configuration)

The reason is that the naming convention changed between versions, so to be on the safe side, set both.
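
For context, a minimal sketch of where such a `conf` can come from and how it is exercised; the bucket and object names are placeholders, and going through FileSystem directly is just one way to check that the credentials are picked up:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3ACredentialsExample {
        public static void main(String[] args) throws Exception {
            // args[0] = access key, args[1] = secret key (placeholders for illustration)
            Configuration conf = new Configuration();
            // Set both the new and the old property names, as described above.
            conf.set("fs.s3a.access.key", args[0]);
            conf.set("fs.s3a.secret.key", args[1]);
            conf.set("fs.s3a.awsAccessKeyId", args[0]);
            conf.set("fs.s3a.awsSecretAccessKey", args[1]);

            // Placeholder bucket/object names; any s3a:// path works the same way.
            FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
            System.out.println(fs.exists(new Path("s3a://my-bucket/dataset.csv")));
        }
    }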

Question:

I am trying to write a word count program using Hadoop MapReduce. What I need to do is develop an indexed word count application that counts the number of occurrences of each word in each file of a given input file set. This file set is present in an Amazon S3 bucket. It will also count the total occurrences of each word. I have attached the code that counts the occurrences of the words in the given file set. After this I need to print which word occurs in which file, along with the number of occurrences of the word in that particular file.

I know it's a bit complex, but any help would be appreciated.

Map.java

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private String pattern= "^[a-z][a-z0-9]*$";

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        InputSplit inputSplit = context.getInputSplit();
        String fileName = ((FileSplit) inputSplit).getPath().getName();
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            String stringWord = word.toString().toLowerCase();
            if (stringWord.matches(pattern)){
                context.write(new Text(stringWord), one);
            }

        }
    }
}

Reduce.java

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}   

WordCount.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = new Job(conf, "WordCount");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(3);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Answer:

In the mapper, create a custom Writable text pair that holds the filename and the word, and emit it as the output key with a value of 1.

Mapper Output:

<K, V> ==> <MytextpairWritable, new IntWritable(1)>

You can get the filename in the mapper with the snippet below:

FileSplit fileSplit = (FileSplit)context.getInputSplit();
String filename = fileSplit.getPath().getName();

Then pass these as constructor arguments to the custom Writable class in the context.write, something like this:

context.write(new MytextpairWritable(filename,word),new IntWritable(1));
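
The answer assumes a custom Writable named MytextpairWritable but never shows it; a minimal sketch of what such a class could look like follows (the field names and the compareTo/toString choices are illustrative assumptions, not from the original):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class MytextpairWritable implements WritableComparable<MytextpairWritable> {
        private Text fileName = new Text();
        private Text word = new Text();

        public MytextpairWritable() {}                    // needed by Hadoop for deserialization

        public MytextpairWritable(String fileName, String word) {
            this.fileName.set(fileName);
            this.word.set(word);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            fileName.write(out);
            word.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            fileName.readFields(in);
            word.readFields(in);
        }

        @Override
        public int compareTo(MytextpairWritable other) {  // sort by file name, then word
            int cmp = fileName.compareTo(other.fileName);
            return cmp != 0 ? cmp : word.compareTo(other.word);
        }

        @Override
        public int hashCode() {                           // keeps HashPartitioner consistent with multiple reducers
            return fileName.hashCode() * 163 + word.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof MytextpairWritable)) return false;
            MytextpairWritable other = (MytextpairWritable) o;
            return fileName.equals(other.fileName) && word.equals(other.word);
        }

        @Override
        public String toString() {                        // controls how TextOutputFormat prints the key
            return fileName + "," + word;
        }
    }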

On the reducer side, just sum up the values, so that for each file you get how many occurrences there are of a particular word. The reducer code would be something like this:

public class Reduce extends Reducer<MytextpairWritable, IntWritable, MytextpairWritable, IntWritable> {

    public void reduce(MytextpairWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Your output will be something like this.

File1,hello,2
File2,hello,3
File3,hello,1

Question:

I am having a problem saving text files to S3 using PySpark. I am able to save to S3, but it first uploads to a _temporary path on S3 and then proceeds to copy to the intended location. This increases the job's run time significantly. I have attempted to compile a DirectFileOutputCommitter, which should write directly to the intended S3 URL, but I cannot get Spark to utilize this class.

Example:

someRDD.saveAsTextFile("s3a://somebucket/savefolder")

This creates a

s3a://somebucket/savefolder/_temporary/

directory, which is written to first; an S3 copy operation then moves the files back to

s3a://somebucket/savefolder

My question is: does anyone have a working jar of the DirectFileOutputCommitter, or does anyone have experience working around this issue?

Relevant Links:

  1. https://issues.apache.org/jira/browse/HADOOP-10400
  2. https://gist.github.com/aarondav/c513916e72101bbe14ec
  3. https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3C029201d06334$a0871180$e1953480$@gmail.com%3E
  4. http://tech.grammarly.com/blog/posts/Petabyte-Scale-Text-Processing-with-Spark.html

Answer:

I was able to fix this issue by patching Hadoop 2.7.2 with a DirectOutputCommitter from Databricks and deploying the patched jar to my Spark instances. Linked below is a Git repo with the patched jar.

Github Link
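
For reference, once such a direct committer class is on the classpath, the old mapred API (which saveAsTextFile goes through) can be pointed at it via configuration. A rough sketch; the committer's fully qualified class name below is a placeholder that depends on the jar you actually deploy:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DirectCommitterExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("direct-committer-example");
            // mapred.output.committer.class selects the committer for the old mapred API.
            // Replace the value with the class provided by your patched/deployed jar.
            conf.set("spark.hadoop.mapred.output.committer.class",
                     "com.example.DirectOutputCommitter");   // placeholder class name
            JavaSparkContext sc = new JavaSparkContext(conf);

            sc.textFile("s3a://somebucket/input")
              .saveAsTextFile("s3a://somebucket/savefolder");
            sc.stop();
        }
    }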

Question:

I am writing a program to upload data to some s3a:// link. The program is compiled through mvn install. Running the program locally (as in using java -jar jarfile.jar) returned no error. However, when I use spark-submit (as in using spark-submit jarfile.jar), it returned this error:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.reloadExistingConfigurations()V at org.apache.hadoop.fs.s3a.S3AFileSystem.addDeprecatedKeys(S3AFileSystem.java:181) at org.apache.hadoop.fs.s3a.S3AFileSystem.(S3AFileSystem.java:185) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) ...

The error log traced to this portion of my source code:

sparkDataset
        .write()
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save("some s3a:// link");

where sparkDataset is an instance of org.apache.spark.sql.Dataset.

Trying How to access s3a:// files from Apache Spark? was unsuccessful and returned another error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/GlobalStorageStatistics$StorageStatisticsProvider

A problem from java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.reloadExistingConfigurations()V is also unlikely, because I can run locally, where compatibility is not a problem.

In addition, these are the version of related libraries that I used:

  • aws-java-sdk-bundle:1.11.199
  • hadoop-aws:3.0.0

I am expecting files to be written through the s3a:// links. I think dependencies are not the issue, because I can run locally. I only face this problem when using spark-submit to run this program. Does anyone have any ideas on how to resolve this?

Edit: In addition, I have checked that the Spark version used by spark-submit is said to be built for Hadoop 2.7 and above. I am strictly using Hadoop 3.0.0. Could this be a clue as to why such an error happened in my program?


Answer:

The answer to Run spark-submit with my own build of hadoop guided me to my own solution.

Based on my understanding, for some unknown reason, the spark-submit provided by the distribution 'spark-2.4.0-bin-hadoop2.7.tgz' excludes any Hadoop packages that are compiled into your application.

The NoSuchMethodError was raised because the method reloadExistingConfigurations does not exist until Hadoop 2.8.x. It seems that writing a Parquet file somehow invokes this particular method along the way.

My solution is to use the separate distribution 'spark-2.4.0-without-hadoop.tgz' and connect it to Hadoop 3.0.0, so that the correct version of Hadoop is used even though spark-submit excludes the packages bundled in your application during execution.

In addition, since the packages would be excluded by spark-submit anyway, I would not create a fat jar during compilation through Maven. Instead, I would use the --packages flag during execution to specify the dependencies that are required to run my application.
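
A sketch of what that wiring can look like (the exact coordinates and paths here are assumptions based on the description above). In conf/spark-env.sh of the 'without-hadoop' distribution, point Spark at the installed Hadoop 3.0.0 classpath:

export SPARK_DIST_CLASSPATH=$(hadoop classpath)

Then pass the S3A connector at submit time instead of bundling it into a fat jar:

spark-submit --packages org.apache.hadoop:hadoop-aws:3.0.0,com.amazonaws:aws-java-sdk-bundle:1.11.199 jarfile.jar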

Question:

Currently we are running a MapReduce job in Hadoop whose output is compressed with Snappy compression. We then move the output file to S3. Now I want to read the compressed file from S3 through Java.


Answer:

I found the answer for reading a Snappy-compressed file from S3. First you get the object content from S3, and then you decompress the stream:

    S3Object s3object = s3Client.getObject(new GetObjectRequest(bucketName, path));
    InputStream inContent = s3object.getObjectContent();
    // SnappyCodec comes from hadoop-common and may require the native Snappy library to be available.
    CompressionCodec codec = ReflectionUtils.newInstance(SnappyCodec.class, new Configuration());
    InputStream inStream = codec.createInputStream(new BufferedInputStream(inContent));
    InputStreamReader inRead = new InputStreamReader(inStream);
    BufferedReader br = new BufferedReader(inRead);
    String line;
    while ((line = br.readLine()) != null) {
        System.out.println(line);
    }
    br.close();

Question:


Answer:

Solved this by adding the following dependency to pom.xml, besides the ones above:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.1</version>
</dependency>