Hot questions for using Amazon S3 in Scala

Question:

I am using Play Framework (Scala version) with the Amazon AWS Java SDK to integrate Amazon S3 into an application.

The AWS SDK has a TransferManager class that provides an abstraction for managing a thread pool that handles downloads/uploads to S3.

I am trying to determine if it is possible to integrate the core support Play has for custom ExecutionContexts into this object provided by the SDK. In particular, when instantiating the TransferManager provided by the AWS SDK, you can supply a custom ExecutorService as an optional parameter.

Scala's ExecutionContextExecutorService trait mixes in ExecutorService via the "with" keyword in its declaration, so I am wondering if there is some mechanism to get an ExecutorService object from the ExecutionContext, i.e. something that transforms ExecutionContext => ExecutorService.

If not, is there any other approach? At the moment I am just instantiating a custom ExecutorService directly in a class outside of Play's standard approach which is outlined here:

https://www.playframework.com/documentation/2.3.x/ThreadPools

This feels messy and against the conventions provided by the framework.

Thanks for your time.


Answer:

If you create your context like this (do not copy-paste this blindly - it's configured for blocking operations):

import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

val blockingContext: ExecutionContextExecutorService = {
    val executor = new ThreadPoolExecutor(100, 100, 1, TimeUnit.MINUTES, new LinkedBlockingQueue[Runnable](1000))
    executor.allowCoreThreadTimeOut(true)
    ExecutionContext.fromExecutorService(executor) // main part
}

Then you already have an ExecutorService: ExecutionContext.fromExecutorService returns an ExecutionContextExecutorService, which extends both ExecutionContext and java.util.concurrent.ExecutorService, so no cast is needed:

val executor: ExecutorService = blockingContext
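
For completeness, a minimal sketch of wiring this into the SDK (assuming the TransferManager constructor that takes an AmazonS3 client and an ExecutorService, and a client built with default credential/region resolution; adjust both to your setup):

import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.transfer.TransferManager

// Assumes default credential/region resolution; replace with your own client.
val s3Client: AmazonS3 = AmazonS3ClientBuilder.defaultClient()

// blockingContext is an ExecutionContextExecutorService, so it can be passed
// wherever a java.util.concurrent.ExecutorService is expected.
val transferManager = new TransferManager(s3Client, blockingContext)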

Question:

When I run these three commands in a Unix shell/terminal, they all work fine, returning exit status 0:

unix_shell> ls -la
unix_shell> hadoop fs -ls /user/hadoop/temp
unix_shell> s3-dist-cp --src ./abc.txt --dest s3://bucket/folder/

Now I am trying to run these same commands through the Scala process API as external processes; the sample code is below:

import scala.sys.process._

val cmd_1 = "ls -la"
val cmd_2 = "hadoop fs -ls /user/hadoop/temp/"
val cmd_3 = "/usr/bin/s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"
val cmd_4 = "s3-dist-cp --src /tmp/sample.txt --dest s3://bucket/folder/"

val exitCode_1 = (stringToProcess(cmd_1)).! // works fine and produces result
val exitCode_2 = (stringToProcess(cmd_2)).! // works fine and produces result
val exitCode_3 = (stringToProcess(cmd_3)).! // it just hangs, yielding nothing
val exitCode_4 = (stringToProcess(cmd_4)).! // it just hangs, yielding nothing

The only difference between cmd_3 and cmd_4 above is the absolute path. I am passing the relevant dependency explicitly in the spark-submit script, like below:

--jars hdfs:///user/hadoop/s3-dist-cp.jar

Your inputs/suggestions would be helpful. Thanks!


Answer:

It seems like what you have done is right. See here: https://github.com/gorros/spark-scala-tips/blob/master/README.md

import scala.sys.process._

def s3distCp(src: String, dest: String): Unit = {
    s"s3-dist-cp --src $src --dest $dest".!
}

Please check the note below; I wonder whether this is the case for you.

Regarding your --jars /usr/lib/hadoop/client/*.jar:

You can append the jars related to s3-dist-cp using the tr command, like this:

--jars $(echo /dir_of_jars/*.jar | tr ' ' ',')

Note: To be able to use this method, you need the Hadoop application to be added to your cluster, and you need to run Spark in client or local mode, since s3-dist-cp is not available on the slave nodes. If you want to run in cluster mode, copy the s3-dist-cp command to the slaves during bootstrap.
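
If the command still appears to hang, a minimal sketch like the following (standard-library scala.sys.process only; the helper name is made up) can capture stdout/stderr so that any error from s3-dist-cp becomes visible instead of being silently swallowed:

import scala.sys.process._

// Run s3-dist-cp and echo its output and errors to the console.
def s3distCpVerbose(src: String, dest: String): Int = {
  val logger = ProcessLogger(
    out => println(s"[s3-dist-cp] $out"),
    err => Console.err.println(s"[s3-dist-cp] $err"))
  Process(Seq("s3-dist-cp", "--src", src, "--dest", dest)).!(logger)
}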

Question:

I'm trying to list all so-called folders and sub-folders in an S3 bucket. Since I am listing all the folders in a path recursively, I am not using the withDelimiter() function. All the so-called folder names should end with /, and this is my logic for listing all the folders and sub-folders.

Here's the Scala code (intentionally not pasting the catch block here):

import scala.collection.JavaConverters._

val awsCredentials = new BasicAWSCredentials(awsKey, awsSecretKey)
val client = new AmazonS3Client(awsCredentials)

def listFoldersRecursively(bucketName: String, fullPath: String): List[String] = {
  try {
    val objects = client.listObjects(bucketName).getObjectSummaries // unused
    val listObjectsRequest = new ListObjectsRequest()
      .withPrefix(fullPath)
      .withBucketName(bucketName)
    val folderPaths = client
      .listObjects(listObjectsRequest)
      .getObjectSummaries
      .asScala
      .map(_.getKey)
    folderPaths.filter(_.endsWith("/")).toList
  }
}

Here's the structure of my bucket as seen through an S3 client.

Here's the list I am getting using this Scala code.

Without any apparent pattern, many folders are missing from the list of retrieved folders. I did not use

client.listObjects(listObjectsRequest).getCommonPrefixes.toList

because it was returning an empty list for some reason.

P.S: Couldn't add photos in post directly because of being a new user.


Answer:

Without any apparent pattern, many folders are missing from the list of retrieved folders.

Here's your problem: you are assuming there should always be objects with keys ending in / to symbolize folders.

This is an incorrect assumption. They will only be there if you created them, either via the S3 console or the API. There's no reason to expect them, as S3 doesn't actually need them or use them for anything, and the S3 service does not create them spontaneously, itself.

If you use the API to upload an object with key foo/bar.txt, this does not create the foo/ folder as a distinct object. It will appear as a folder in the console for convenience, but it isn't there unless at some point you deliberately created it.

Of course, the only way to upload such an object via the console is to "create" the folder, unless it already appears there; but appearing in the console does not necessarily mean it exists as a distinct object.

Filtering on endsWith("/") is invalid logic.

This is why the underlying API includes CommonPrefixes with each ListObjects response when a delimiter is specified. This is a list of the next level of "folders", which you have to recursively drill down into in order to find the next level.

If you specify a prefix, all keys that contain the same string between the prefix and the first occurrence of the delimiter after the prefix are grouped under a single result element called CommonPrefixes. If you don't specify the prefix parameter, the substring starts at the beginning of the key. The keys that are grouped under the CommonPrefixes result element are not returned elsewhere in the response.

https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html

You need to access this functionality with whatever library you are using, or you need to iterate the entire list of keys and discover the actual common prefixes on / boundaries using string splitting.
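
For illustration, a minimal sketch of the CommonPrefixes approach with the AWS Java SDK v1 (a hypothetical listPrefixes helper, not the asker's code; it ignores paginated/truncated listings for brevity):

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsRequest
import scala.collection.JavaConverters._

// Collect "folder" prefixes by asking S3 for CommonPrefixes at each level,
// instead of relying on zero-byte keys that end in "/".
// Note: listObjects returns at most one page; handle isTruncated in real code.
def listPrefixes(client: AmazonS3, bucket: String, prefix: String): List[String] = {
  val request = new ListObjectsRequest()
    .withBucketName(bucket)
    .withPrefix(prefix)
    .withDelimiter("/")
  val next = client.listObjects(request).getCommonPrefixes.asScala.toList
  next ++ next.flatMap(p => listPrefixes(client, bucket, p))
}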

Question:

We have some Scala code running in Elastic Beanstalk (using Tomcat) that accesses S3 using the Java AWS SDK. It was working perfectly for months. Then, a few days ago, we started seeing some strange errors. It can read and write to S3 about a third of the time. The other two thirds of the time, it gets an access denied error when reading from S3.

The exceptions look like this: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 6CAC5AB616FC6F23)

All S3 operations use the same bucket. The IAM role has full access to S3 (allowed to do any operation using any bucket).

We contacted Amazon support and they can't help us unless we provide a host ID and request ID that they can research. But the exception only has a request ID.

I'm looking for one of two things: either a solution to the access denied errors, or a way to get a host ID we can give to Amazon support. I already tried calling s3Client.getCachedResponseMetadata(getObjectRequest), but it always returns null after the getObject call fails.


Answer:

I was able to get the Host ID by calling AmazonS3Exception.getErrorResponseXml(). We're still working with Amazon to determine the root cause.
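
For anyone hitting the same thing, a minimal sketch of pulling that information out of the exception (AWS Java SDK v1; the bucket and key are placeholders):

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.AmazonS3Exception

val s3 = AmazonS3ClientBuilder.defaultClient()

try {
  s3.getObject("my-bucket", "some/key.txt") // placeholder bucket/key
} catch {
  case e: AmazonS3Exception =>
    // The raw error XML includes both RequestId and HostId,
    // which is what AWS support asks for.
    println("Request ID: " + e.getRequestId)
    println("Error XML:  " + e.getErrorResponseXml)
}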

Question:

What is a good way, from a Scala or Java program, to check if an S3 bucket has objects matching a certain key pattern? That is, if I have a bucket named "CsvBucket", how can I check if it contains an object whose key matches the pattern "processed/files/2015/8/*.csv"?

Thanks


Answer:

Since S3 object keys are just Strings, you can iterate over them and test each one using a regular expression. Perhaps something like this (using the JetS3t library):

Pattern pattern = Pattern.compile(".*\\.csv");
// 'service' is an instance of S3Service
S3Bucket bucket = service.getBucket(bucketName);
S3Object[] files = service.listObjects(bucket, "processed/files/2015/8", null);
for (int i = 0; i < files.length; i++)
{
    if (pattern.matcher(files[i].getKey()).matches())
    {
        // ... work with the file ...
    }
}
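
For reference, a rough Scala equivalent using the AWS Java SDK v1 instead of JetS3t (a sketch only; the bucket name comes from the question, the client construction is an assumption, and it only checks the first page of results):

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.collection.JavaConverters._

val s3 = AmazonS3ClientBuilder.defaultClient()

// List keys under the prefix and keep the ones matching the pattern.
// Note: listObjects returns at most one page; follow the markers for large buckets.
val matchingKeys = s3
  .listObjects("CsvBucket", "processed/files/2015/8/")
  .getObjectSummaries
  .asScala
  .map(_.getKey)
  .filter(_.matches(""".*\.csv"""))

val hasMatch = matchingKeys.nonEmpty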

Question:

When I try to upload an image file to my S3 bucket via the command line, aws s3 cp /Users/shravan40/Downloads/scan1.jpg s3://s3_bucket_name/access_key_id, it uploads successfully. But when I try to upload the image through the Scala API, it shows a gateway timeout. I have set the timeout to 2 minutes.

public void connect(){
    conn = new AmazonS3Client(credentials);        
}


public AmazonS3Client(AWSCredentials awsCredentials) {
    this(awsCredentials, new ClientConfiguration());
} 

public void setBucket(String bucketName){
    this.bucketName = bucketName;
    this.baseUrl = "https://s3.us-west-2.amazonaws.com/"+bucketName;
}

Here is the API code,

post("/api/:version/user/:id/image/upload") {
  CGMetrics.apiProfiler.profile("api.uploadImage") {
    val memberId: Long = params("id").toLong
    if (!AccountService.validateMemberId(memberId))
      APIResponseError(APIResponseCode.NOT_FOUND, "Invalid Member Id")
    else {
      val filePath: String = getUploadedFilePath("file_path", "file")
      val fileName: String = getParam("filename").get.toString

      if (filePath != null) {
        val (uploadImageUrl, resizedImageHeight, resizedImageWidth) = MediaService.uploadImage(memberId, 0, fileName, filePath)
        APIResponseOK(Map("imgUrl" -> uploadImageUrl, "imgHeight" -> resizedImageHeight, "imgWidth" -> resizedImageWidth))
      } else {
        APIResponseError(APIResponseCode.UNPROCESSABLE_ENTITY, "No File Found")
      }
    }
  } 
}

And the getUploadedFilePath method:

private def getUploadedFilePath(filePathKey: String = "file_path", fieldName: String = "file")(implicit request: HttpServletRequest): String = {
  val filePath: String = getParam(filePathKey) match {
    case Some(x) => x
    case None => fileParams.get(fieldName) match {
      case Some(file) =>
        val fileName = file.getName
        val ext = Helper.getFileExtension("/tmp/" + fileName)

        Log.logger(Log.FILE.DEFAULT).debug("file name = " + fileName)
        Log.logger(Log.FILE.DEFAULT).debug("file ext = " + ext)

        val tempFile = File.createTempFile(fileName, "." + ext)
        Helper.writeToFile(tempFile, file.get())
        Log.logger(Log.FILE.DEFAULT).info("Created Temporary Asset : " + fileName + " at  " + tempFile.getAbsolutePath)

        tempFile.getAbsolutePath
      case None =>
        null
    }
  }

  filePath
}

Update: Files are now being uploaded to the S3 bucket, but I get no confirmation of the upload.

Any suggestions?


Answer:

There was a problem with my uploadImage method, which was unable to create the thumbnail; that's why I was getting the timeout.

Also, I was running the Redis server on port 4381 but had configured the default port number, i.e. 6379.

Question:

I am using the AWS TransferManager to upload objects to an S3 bucket from Scala.

My code is something like:

val xferMgr:TransferManager = TransferManagerBuilder.standard().withS3Client(awsClient).build();

val putObjectRequest:PutObjectRequest = new PutObjectRequest(bucketName, key + fileToUpload.getName, fileToUpload);

val upload:Upload = xferMgr.upload(putObjectRequest);

val uploadResult:UploadResult = upload.waitForUploadResult();

if (uploadResult != null) {
    val uploadedS3Filename:String = uploadResult.getKey();
    println("File uploaded with key: " + uploadedS3Filename);
}

This code does upload objects to the S3 bucket; however, my problem is that threads keep running in the background, and the program doesn't end upon completion.

How do I make my program exit successfully once the upload is done?


Answer:

The AWS TransferManager uses multiple threads to make uploads faster by uploading multiple parts of a single file at the same time. To release the resources the manager is using, call:

xferMgr.shutdownNow()

after your upload is finished.
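
A minimal sketch of how this fits into the code above (reusing the xferMgr and upload values from the question; note that the no-argument shutdownNow() also shuts down the wrapped S3 client by default, while shutdownNow(false) leaves it open):

val uploadResult: UploadResult = upload.waitForUploadResult()
println("File uploaded with key: " + uploadResult.getKey)

// Release the TransferManager's thread pool so the JVM can exit.
// Pass false to keep using the wrapped AmazonS3 client afterwards.
xferMgr.shutdownNow(false)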

Question:

I am trying to list the folders under an S3 bucket. The problem is that using the S3 browser tool I can see four folders, but when I use Java/Scala code to get the list of folders under the bucket, it returns only one folder. I have used the following code with the plain AWS Java SDK:

import scala.collection.JavaConverters._

val awsCreds: BasicAWSCredentials = new BasicAWSCredentials(accessKey, accessSecret)
val s3: AmazonS3 = new AmazonS3Client(awsCreds)
val listObjectsRequest = new ListObjectsRequest()
        .withBucketName(bucketName).withPrefix(prefix)
        .withDelimiter(delimiter)
val objectListing = s3.listObjects(listObjectsRequest)
val directories = objectListing.getCommonPrefixes.asScala
println(directories.mkString(","))

It prints only one folder: /staging

I have also tried the AWScala library and used the following code:

val bucket = s3.bucket("prod-tapp").get
val summaries = s3.ls(bucket, "/")
summaries.foreach(println(_))

But same result.

I can see the correct folders using the S3 Browser desktop application on Windows. Here is a screenshot of the result.

Any suggestion?

Thanks


Answer:

I'm guessing that you're not including the delimiter (/) in the prefix.

If I run the following code (Java, but doesn't really matter):

import java.util.Arrays;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;

public class S3Prefix {

    private static final AmazonS3Client s3 = new AmazonS3Client();

    public static void main(String[] args) {
        Arrays.asList(null, "test1", "test1/").forEach(S3Prefix::listPrefix);
    }

    public static void listPrefix(String prefix) {
        System.out.println("Listing prefix '" + prefix + "'");
        final ListObjectsV2Result result = s3.listObjectsV2(new ListObjectsV2Request()
                .withPrefix(prefix)
                .withBucketName("raniz-prefix-test")
                .withDelimiter("/"));

        System.out.println("\tCommon prefixes");
        result.getCommonPrefixes().forEach(p -> System.out.println("\t\t" + p));

        System.out.println("\tKeys");
        result.getObjectSummaries().forEach(s -> System.out.println("\t\t" + s.getKey()));
    }
}

I get the following output:

Listing prefix 'null'
    Common prefixes
        test1/
        test2/
        test3/
    Keys
Listing prefix 'test1'
    Common prefixes
        test1/
    Keys
Listing prefix 'test1/'
    Common prefixes
    Keys
        test1/
        test1/bar.txt
        test1/foo.txt

As you can see, it's important to include the delimiter in the prefix if you want to list the contents of that "folder".
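
Applied to the Scala code in the question, a sketch of the same idea could look like this (a hypothetical listSubFolders helper; it simply normalizes the prefix so it ends with the delimiter before listing):

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsRequest
import scala.collection.JavaConverters._

// List the immediate "sub-folders" of a prefix, making sure the prefix itself
// ends with the delimiter so S3 groups the next level under CommonPrefixes.
def listSubFolders(s3: AmazonS3, bucketName: String, prefix: String): List[String] = {
  val delimiter = "/"
  val normalizedPrefix =
    if (prefix.isEmpty || prefix.endsWith(delimiter)) prefix else prefix + delimiter
  val request = new ListObjectsRequest()
    .withBucketName(bucketName)
    .withPrefix(normalizedPrefix)
    .withDelimiter(delimiter)
  s3.listObjects(request).getCommonPrefixes.asScala.toList
}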

Question:

I am trying to store accented characters such as ò in the metadata of an S3 object. I am using the REST API which according to this page only accepts US-ASCII: http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html

Is there a way to convert Strings in Scala or Java from Bòrd to B\u00F2rd?

I have tried using Normalizer.normalize(str, Normalizer.Form.NFD), but the character still causes an error when submitted to S3 because it still appears as ò. When I print out the returned String, it also shows ò.


Answer:

A normalized Unicode string is just normalized in terms of combining characters, not necessarily to ASCII. Using NFKC would be more likely to convert characters to ASCII forms, but it certainly would not do so reliably.

It sounds like what you want is to escape non-ASCII characters. You could use, e.g., UnicodeEscaper from commons-lang, and UnicodeUnescaper to translate back.
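
A minimal sketch of that round trip in Scala (assuming commons-lang3 on the classpath; UnicodeEscaper.above(127) escapes everything outside the ASCII range):

import org.apache.commons.lang3.text.translate.{UnicodeEscaper, UnicodeUnescaper}

val original = "Bòrd"

// Escape every code point above 127 to its Java-style unicode escape,
// so the metadata value contains only US-ASCII characters.
val escaped = UnicodeEscaper.above(127).translate(original)

// Translate back to "Bòrd" when reading the metadata.
val restored = new UnicodeUnescaper().translate(escaped)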