Hot questions for using Amazon S3 with HDFS

Question:

I want to transfer a file from HDFS to S3 in Java. Some files may be huge, so I don't want to download my file locally before uploading it to S3. Is there any way to do that in Java?

Here's what I have right now (a piece of code that uploads a local file to S3). I can't really use this, because using a File object implies having the file on my local disk.

File f = new File("/home/myuser/test");

TransferManager transferManager = new TransferManager(credentials);
MultipleFileUpload upload = transferManager.uploadDirectory("mybucket", "test_folder", f, true);

Thanks


Answer:

I figured out the uploading part.

AWSCredentials credentials = new BasicAWSCredentials(
        "whatever",
        "whatever");

TransferManager transferManager = new TransferManager(credentials);

//+upload from HDFS to S3
Configuration conf = new Configuration();
// load the Hadoop config files
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));

Path path = new Path("hdfs://my_ip_address/user/ubuntu/test/test.txt");
FileSystem fs = path.getFileSystem(conf);
FSDataInputStream inputStream = fs.open(path);
ObjectMetadata objectMetadata = new ObjectMetadata();
// Without a content length the SDK buffers the whole stream in memory before
// uploading; set it from the HDFS file status so huge files can stream through.
objectMetadata.setContentLength(fs.getFileStatus(path).getLen());
Upload upload = transferManager.upload(
        "xpatterns-deployment-ubuntu", "test_cu_jmen3", inputStream, objectMetadata);
//-upload from HDFS to S3

try {
    upload.waitForCompletion();
} catch (InterruptedException e) {
    e.printStackTrace();
}

Any ideas about how to do something similar for downloading? I haven't found any download() method in TransferManager that can use a stream like in the above code.
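One way to handle the download direction (a sketch, not from the original thread; the bucket, key and target path are placeholders, and the transferManager and conf objects from the snippet above are reused) is to take the plain S3 client from the TransferManager and copy the object's stream straight into an FSDataOutputStream, so nothing touches the local disk:

    AmazonS3 s3 = transferManager.getAmazonS3Client();
    S3Object object = s3.getObject("xpatterns-deployment-ubuntu", "test_cu_jmen3");

    Path target = new Path("hdfs://my_ip_address/user/ubuntu/test/from_s3.txt");
    FileSystem fs = target.getFileSystem(conf);

    // Stream S3 -> HDFS; try-with-resources closes both ends.
    try (InputStream in = object.getObjectContent();
         FSDataOutputStream out = fs.create(target, true)) {
        IOUtils.copyBytes(in, out, 4096, false); // org.apache.hadoop.io.IOUtils
    }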

Question:

I have a class to copy directory content from one location to another using Apache FileUtil:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

class Folder {
    private final FileSystem fs;
    private final Path pth;

    // ... constructors and other methods

    /**
     * Copy contents (files and files in subfolders) to another folder.
     * Merges overlapping folders.
     * Overwrites already existing files.
     * @param destination Folder where content will be copied to
     * @throws IOException If fails
     */
    public void copyFilesTo(final Folder destination) throws IOException {
        final RemoteIterator<LocatedFileStatus> iter = this.fs.listFiles(
            this.pth,
            true
        );
        final URI root = this.pth.toUri();
        while (iter.hasNext()) {
            final Path source = iter.next().getPath();
            FileUtil.copy(
                this.fs,
                source,
                destination.fs,
                new Path(
                    destination.pth,
                    root.relativize(source.toUri()).toString()
                ),
                false,
                true,
                this.fs.getConf()
            );
        }
    }
}

This class works fine with local (file:///) directories in a unit test, but when I try to use it on a Hadoop cluster to copy files from HDFS (hdfs:///tmp/result) to Amazon S3 (s3a://mybucket/out), it doesn't copy anything and doesn't throw an error; it just silently skips the copy.

When I use the same class (with both the HDFS and S3A filesystems) for other purposes it works fine, so the configuration and the fs references should be OK here.

What am I doing wrong? How do I copy files from HDFS to S3A correctly?

I'm using Hadoop 2.7.3.


UPDATE: I've added more logging to the copyFilesTo method to print the root, source and target variables (and extracted a rebase() method without changing the logic):

    /**
     * Copy contents (files and files in subfolders) to another folder.
     * Merges overlapping folders.
     * Overwrites already existing files.
     * @param dst Folder where content will be copied to
     * @throws IOException If fails
     */
    public void copyFilesTo(final Folder dst) throws IOException {
        Logger.info(
            this, "copyFilesTo(%s): from %s fs=%s",
            dst, this, this.hdfs
        );
        final RemoteIterator<LocatedFileStatus> iter = this.hdfs.listFiles(
            this.pth,
            true
        );
        final URI root = this.pth.toUri();
        Logger.info(this, "copyFilesTo(%s): root=%s", dst, root);
        while (iter.hasNext()) {
            final Path source = iter.next().getPath();
            final Path target = Folder.rebase(dst.path(), this.path(), source);
            Logger.info(
                this, "copyFilesTo(%s): src=%s target=%s",
                dst, source, target
            );
            FileUtil.copy(
                this.hdfs,
                source,
                dst.hdfs,
                target,
                false,
                true,
                this.hdfs.getConf()
            );
        }
    }

    /**
     * Change the base of target URI to new base, using root
     * as common path.
     * @param base New base
     * @param root Common root
     * @param target Target to rebase
     * @return Path with new base
     */
    static Path rebase(final Path base, final Path root, final Path target) {
        return new Path(
            base, root.toUri().relativize(target.toUri()).toString()
        );
    }

After running it on the cluster I got these logs:

io.Folder: copyFilesTo(hdfs:///tmp/_dst): from hdfs:///tmp/_src fs=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_182008924_1, ugi=hadoop (auth:SIMPLE)]]
io.Folder: copyFilesTo(hdfs:///tmp/_dst): root=hdfs:///tmp/_src
INFO io.Folder: copyFilesTo(hdfs:///tmp/_dst): src=hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file target=hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file

I've localized the bug to the rebase() method: it doesn't work correctly when running on the EMR cluster, because RemoteIterator returns URIs in fully qualified form (hdfs://ip-172-31-2-12.us-east-2.compute.internal:8020/tmp/_src/one.file) while the method expects the form hdfs:///tmp/_src/one.file. That is also why it works locally with the file:/// filesystem.
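A possible fix (a sketch, not the solution from the thread) is to relativize only the path components, so a mismatched authority can't make URI.relativize return the target unchanged. It assumes the paths contain no characters that need URI escaping:

    static Path rebase(final Path base, final Path root, final Path target) {
        // Compare path components only, so hdfs://host:8020/tmp/_src/one.file
        // still relativizes against hdfs:///tmp/_src despite the authority mismatch.
        final String relative = URI.create(root.toUri().getPath())
            .relativize(URI.create(target.toUri().getPath()))
            .toString();
        return new Path(base, relative);
    }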


Answer:

I don't see anything obviously wrong.

  1. Does it work for hdfs-to-hdfs or s3a-to-s3a copies?
  2. Upgrade your Hadoop version; 2.7.x is woefully out of date, especially in the S3A code. That is unlikely to make this particular problem go away, but it will avoid other issues. Once you've upgraded, switch to the fast upload so that large files are uploaded incrementally; right now your code saves each file somewhere under /tmp and only uploads it in the close() call (see the configuration sketch after this list).
  3. Turn on logging for the org.apache.hadoop.fs.s3a module and see what it says.
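A configuration sketch for points 2 and 3 (illustrative, not from the original answer; the property names are for Hadoop 2.8+, and in 3.x incremental block upload is the default):

    Configuration conf = new Configuration();
    // S3A "fast upload": stream large files to S3 in blocks instead of
    // staging the whole file locally and uploading it on close().
    conf.setBoolean("fs.s3a.fast.upload", true);
    conf.set("fs.s3a.fast.upload.buffer", "disk"); // buffer blocks on local disk

    // For the S3A debug logging in point 3, add to log4j.properties:
    // log4j.logger.org.apache.hadoop.fs.s3a=DEBUG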

Question:

Background

The simple, age-old problem of uploading files to S3 using Java, with a slight twist.

S3 does not support streaming uploads (AFAIK), so data needs to be grouped into appropriately sized files before it is uploaded.

When creating these temporary files, there are a few options for where to put them:

  1. Locally, in some designated directories
  2. In a local HDFS (if something like that is possible; I don't even know the H of Hadoop)
  3. In HDFS on a Hadoop cluster

Question

  1. Which option is likely to be faster?
  2. Is there an advantage to using HDFS (either locally or on a cluster) over the local FS, given that HDFS is closer in nature to S3's storage model?

Tech and infrastructure

EC2, linux, java


Answer:

If you have enough disk space locally, just do it locally. Otherwise, you can merge the data into the files you need on HDFS and then upload them. However, HDFS should not be your first resort here.
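A sketch of the "stage locally, then upload" approach (illustrative only; it assumes the TransferManager setup from the first answer, and the bucket and key names are placeholders):

    static void stageAndUpload(final TransferManager transferManager, final byte[] batch)
        throws IOException, InterruptedException {
        // Write (or merge) records into a local temp file until it reaches a sensible size.
        final File staged = File.createTempFile("s3-batch-", ".dat");
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(staged))) {
            out.write(batch);
        }
        // TransferManager switches to multipart upload automatically for large files.
        final Upload upload = transferManager.upload(
            "mybucket", "batches/" + staged.getName(), staged);
        upload.waitForCompletion();
        staged.delete();
    }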