Hot questions for Using Amazon S3 in streaming

Question:

I'm trying to implement multipart upload in Java, following this sample: https://docs.aws.amazon.com/AmazonS3/latest/dev/llJavaUploadFile.html

But my actual task is a bit more complicated: I need to support resuming in case the application is shut down during the upload. Also, I can't use TransferManager - I need to use the low-level API for a particular reason.

The code there is pretty straightforward, but the problem comes with the List<PartETag> partETags part. When finalizing a resumed upload, I need to have this collection, which was previously filled during the upload process. And obviously, if I'm trying to finalize the upload after an application restart, I don't have this collection anymore.

So the question is: how do I finalize a resumed upload? Is it possible to obtain the List<PartETag> partETags from the server using some API? All I have is a MultipartUpload object.


Answer:

Get the list of multipart uploads in progress:

MultipartUploadListing multipartUploadListing =
    s3Client.listMultipartUploads(new ListMultipartUploadsRequest(bucketName));

For each uploadId and key in that listing, get the list of parts:

PartListing partsListing =
    s3Client.listParts(new ListPartsRequest(bucketName, key, uploadId));

Get the list of part summaries:

List<PartSummary> parts = partsListing.getParts();

From each PartSummary, read getETag() and getPartNumber():

for(PartSummary part: parts)
{
  part.getETag();
  part.getPartNumber();
}
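
Putting the pieces together, a minimal sketch of finalizing a resumed upload (assuming bucketName, key and uploadId were recovered from the listing above; note that listParts is paginated, so very large uploads may need to follow getNextPartNumberMarker()):

PartListing partsListing =
    s3Client.listParts(new ListPartsRequest(bucketName, key, uploadId));

// Rebuild the collection that completeMultipartUpload needs from the part summaries.
List<PartETag> partETags = new ArrayList<>();
for (PartSummary part : partsListing.getParts()) {
    partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
}

// Upload any parts that are still missing (adding their ETags to partETags), then finalize:
s3Client.completeMultipartUpload(
    new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags));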

Question:

I have tried the suggestions given in Apache Spark (Structured Streaming): S3 Checkpoint support.

I am still facing this issue. Below is the error I get:

17/07/06 17:04:56 WARN FileSystem: "s3n" is a deprecated filesystem 
name. Use "hdfs://s3n/" instead.
Exception in thread "main" java.lang.IllegalArgumentException: 
java.net.UnknownHostException: s3n

I have something like this as part of my code

SparkSession spark = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS","s3")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    .config("spark.hadoop.fs.s3n.awsAccessKeyId","<my-key>")
    .config("spark.hadoop.fs.s3n.awsSecretAccessKey","<my-secret-key>")
    .appName("My Spark App")
    .getOrCreate();

and then the checkpoint directory is used like this:

StreamingQuery line = topicValue.writeStream()
   .option("checkpointLocation","s3n://<my-bucket>/checkpointLocation/")

Any help is appreciated. Thanks in advance!


Answer:

For S3 checkpointing support in Structured Streaming, you can try the following (note the differences from your configuration: fs.s3n.impl is set for the s3n:// scheme that checkpointLocation actually uses, rather than fs.s3.impl, and fs.defaultFS is not overridden):

SparkSession spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("My Spark App")
    .getOrCreate();

spark.sparkContext().hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
spark.sparkContext().hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<my-key>");
spark.sparkContext().hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<my-secret-key>");

and then the checkpoint directory can be set like this:

StreamingQuery line = topicValue.writeStream()
   .option("checkpointLocation","s3n://<my-bucket>/checkpointLocation/")

I hope this helps!

Question:

I have created a simple ingestion service that picks up on-premise files and ingests them into S3 using StreamingFileSink.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

I have set up everything as per the documentation, but it is not working. I tested with the sink location pointed at another local on-prem path and the files do get there (but hidden as .part files).

Does this mean the part files are also sent to S3 but not visible?

...

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            String path = "/tmp/component_test";

            MyFileInputFormat myFileInputFormat = new MyFileInputFormat(new Path(path));
            myFileInputFormat.setNumSplits(1);

            ContinuousFileMonitoringFunction<String> monitoringFunction =
                    new ContinuousFileMonitoringFunction<>(myFileInputFormat,
                            FileProcessingMode.PROCESS_CONTINUOUSLY,
                            env.getParallelism(), 1000);


            // the monitor has always DOP 1
            DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);

            ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(myFileInputFormat);
            TypeInformation<String> typeInfo = new SimpleStringSchema().getProducedType();

            // the readers can be multiple
            DataStream<String> content = splits.transform("FileSplitReader", typeInfo, reader);

            SingleOutputStreamOperator<Tuple2<String, String>> ds = content.flatMap(
                    new XMLSplitter());


            //new Path("s3://<bucket_name>/raw/")
            //new Path("file:///tmp/raw/")
            StreamingFileSink<Tuple2<String, String>> sink = StreamingFileSink
                    .forRowFormat(new Path("s3a://<bucket-name>/raw/"),
                            (Tuple2<String, String> element, OutputStream stream) -> {
                                PrintStream out = new PrintStream(stream);
                                out.println(element.f1);
                            })
                    // Determine component type for each record
                    .withBucketAssigner(new ComponentBucketAssigner())
                    .withRollingPolicy(DefaultRollingPolicy.create().withMaxPartSize(100).withRolloverInterval(1000).build())
                    .withBucketCheckInterval(100)
                    .build();
            ds.addSink(sink);       
            FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));
            env.execute();
...

Should I be looking for the part files in S3, or do I need to make changes to the StreamingFileSink (e.g. roll the part files at a minimum size) for them to show up?

...

09:37:39,387 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 34d46d2671c996d6150d88a2f74b4218 (7558 bytes in 38 ms).
09:37:39,388 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=1.
09:37:39,389 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 1 received completion notification for checkpoint with id=1.
09:37:39,390 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 2 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 3 received completion notification for checkpoint with id=1.
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER1>/part-1-0 with MPU ID CEYMmUslgCnA2KcD5pslz.7dpaQuCAqmTJo6oDPv7P.Rj45O4tHrVTfDQMABxrRvdWSTwO2RoIR.r9VP2s4IMxlPtHz9r6CP_iQ7.DcP9yGDLjIN1gaLPTunAhVGuGen
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER2>/part-0-0 with MPU ID ExM_.cfOZVvXHHGNakUeshSQrkLFtm3HytooPAxDet1MoXBEJYhxlEJBYyXFmeSpk7b.ElmoydrMgotnpZAgmsh6lGhQgMYoS2hFJtOZLtPCOLyJvOt3TKRecc8YqSAJ
09:37:39,391 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER3>/part-2-0 with MPU ID 64._ocicEwPAwrMrI_LXcKyEfqYtISKsLsheAjgXwGdpf3qTH0qvOM2C3k8s2L6UDJ8yZfm9YEJhopgQIrL0hmFokCyMa49bzUbhgm3KQmiCVe9CoNiTEb4ETnEJCZFA
09:37:39,393 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 4 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER4>/part-3-0 with MPU ID yuFGGVfh9YOL36mUUTIAyyLehCMyQGrYoabdv0BBe.e3uCIkLYLI6S4RfnCGtFsT2pjiEJq97bfftMycp4wGW5KKX4jsrmZAfK.kqiYnMUeWWcolXKmWOktVvwHvmSpB
09:37:39,394 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 5 received completion notification for checkpoint with id=1.
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 6 received completion notification for checkpoint with id=1.
09:37:39,394 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER5>/part-4-0 with MPU ID Ab7sTpLJp3fNCCYVXe2nUO5qWmYxMeYQlOssRpeawoY2LDV.a58eShdp.Anfe6YxTnVIewCmReKiYSguJS2SlBxwNRPh2ax50nCXuSdfkyVazgiNMZYMUQJjbzTxgdYW
09:37:39,395 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER6>/part-5-0 with MPU ID xDbouvLhpX7q9rFrs9y93lc7wWO20L5mxKTCWFBAmAVkTWzEiGEu2bU5H2nnCrZWbcPDMePSdpOBK64lVoS8txuhLFtq_nkBfXIs2K6OY6NuTtiSDGWi4SrWwnedC6RM
09:37:39,395 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 7 received completion notification for checkpoint with id=1.
09:37:39,397 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing <BUCKET NAME>/<FOLDER7>/part-6-0 with MPU ID 0uZ35XrL2ShWxZL5nlY3Z1KHTSHBsQhiaJ6HZ9CbzfgxFIf7bwRNjdGHQHWPs9N0WfcpQXBM12XbNENjfILXQ6CLCx0XZrgvGHakUgeWhfeBiOURrO8xUVMT1ot7gxIY

...


Answer:

The StreamingFileSink only works if you have checkpointing enabled. The part files are finalized as part of the checkpointing process.

The documentation has recently been updated to explain this, but for now this is only available in the nightly builds of the docs: https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html
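
If checkpointing isn't already enabled in the job, a minimal sketch (the 10-second interval is just an example value, not a recommendation):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Pending part files are committed to finished files only when a checkpoint completes.
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);
// ... build the source, flatMap and StreamingFileSink as in the question ...
env.execute();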

Question:

For various historical reasons, I'm presented with a series of files on S3 that have times in their names represented as 'HH:MM:SS' (along with other characters). The file names are coming to me via SQS along with other files that don't have these characters.

I can read the files that don't contain the ':' character just fine, but I'm not having success with those that do. I've tried various UUEncode/Decode processes, but everything gives me this error:

Caught an AmazonServiceException, which means your request made it to Amazon S3, 
but was rejected with an error response for some reason.

Error Message: The specified key does not exist. 
(Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey;

I'm wondering if there is some way to encode the key such that the file(s) can be accessed.


Answer:

The documentation indicates that the colon character is among those that may need special handling (see the docs):

The following characters in a key name may require additional code handling and will likely need to be URL encoded or referenced as HEX.

According to this list, the URL-encoded (hex) form of the colon character is %3A.

In other words, replace your ":" characters with "%3A" when you specify the key and it ought to just work.
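
If your access path does turn out to need the pre-encoded form (recent AWS SDKs percent-encode key names for you, so it's worth trying the raw key first), the substitution is a one-liner; the bucket and key below are hypothetical:

String rawKey = "audio/2017-07-06 17:04:56.mp3";   // hypothetical key containing colons
String encodedKey = rawKey.replace(":", "%3A");
S3Object object = s3Client.getObject(new GetObjectRequest(bucketName, encodedKey));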

Question:

I need to download (or possibly stream) a song from an Amazon S3 bucket. The problem is that if the song is, say, 270 seconds long, I want to start the download/stream 150 seconds into the song and play it from there.

How do I accomplish this using Amazon S3 web services and Java EE servlet technology?


Answer:

Using the AWS Java SDK, you should use AmazonS3Client, and call getObject on it with an instance of GetObjectRequest. When creating your request, call setRange with the byte range to get. This will let you get a portion of the file without downloading the entire thing. getObject() returns an S3Object which you can use to stream the file contents to a music player or local file or whatever you want.

There are two tricky parts:

One tricky part is that you probably need the song's header bytes as well. E.g. if the file is in MP3 format, you need the MP3 header. You can use the same method above to fetch the byte range of the header.

The other tricky part is determining the byte offset in the file from a number of seconds into the song. See this question regarding that. You could roughly estimate the byte offset by getting the entire file size in bytes (see getObjectMetadata), dividing by the total length of the song in seconds, and multiplying by the number of seconds to offset the download.
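
Putting that together, a minimal sketch with the v1 AWS SDK for Java (the bucket and key are placeholders, and the offset is a rough constant-bitrate estimate, so treat it as a starting point rather than an exact seek):

AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
String bucket = "my-bucket";            // placeholder
String key = "songs/track.mp3";         // placeholder

long totalBytes = s3.getObjectMetadata(bucket, key).getContentLength();
long totalSeconds = 270;
long startSeconds = 150;
long startByte = totalBytes * startSeconds / totalSeconds;   // rough constant-bitrate estimate

S3Object object = s3.getObject(new GetObjectRequest(bucket, key)
        .withRange(startByte, totalBytes - 1));
InputStream in = object.getObjectContent();   // stream this to the servlet response, then close it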

Question:

I'm currently working with Apache POI to create an Excel file. I want to send this file to AWS S3 via multipart upload. I'm using the SXSSFWorkbook combined with the substitution techniques used by the BigGridDemo in order to create the document itself and send the sheet data. This is where it gets a little tricky. I have something mostly working, but I am generating an invalid Excel file due to NULs being written into the XML file that composes the sheet data.

In trying to track down why this happens I've stumbled onto this:

import java.io._
import java.util.zip._
val bo = new ByteArrayOutputStream()
val zo = new ZipOutputStream(bo)
zo.putNextEntry(new ZipEntry("1"))
zo.write("hello".getBytes())
zo.write("\nhello".getBytes())
val bytes1 = bo.toByteArray()
// bytes1: Array[Byte] = Array(80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 107, -121, -9, 76, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 49)

bo.reset()
zo.write("hello".getBytes())
val bytes2 = bo.toByteArray() // bytes2: Array[Byte] = Array()
zo.flush()
val bytes2 = bo.toByteArray() // bytes2: Array[Byte] = Array()
bo.size //res11: Int = 0
zo.putNextEntry(new ZipEntry("2")) // If I make a new entry it works but I can't do this in real code...
bo.size // res17: Int = 66

It seems that when I reset the underlying byte output stream, the ZipOutputStream stops writing anything. This surprised me, so I went looking into the underlying source code of ZipOutputStream. I noticed the default method is DEFLATED, which just calls DeflaterOutputStream#write. I then looked into the deflater code itself, thinking that maybe there's something deeper in the compression algorithm that I don't understand that requires the stream not to be reset, or that is somehow affected by it. I found a reference to FULL_FLUSH and noted:

The compression state is reset so that the inflater that works on the compressed output data can restart from this point if previous compressed data has been damaged or if random access is desired.

Which sounded good to me since I could imagine that a reset byte stream could be seen as damaged data perhaps. So I repeated my minimal experiment:

import java.io._
import java.util.zip._
val bo = new ByteArrayOutputStream()
val zo = new ZipOutputStream(bo)
zo.setLevel(Deflater.FULL_FLUSH)
zo.putNextEntry(new ZipEntry("1"))
zo.write("hello".getBytes())

val bytes1 = bo.toByteArray()
// bytes1: Array[Byte] = Array(80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 84, 75, -8, 76, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 49)

zo.flush()
bo.reset()
zo.write("\nhello".getBytes())
zo.flush()
val bytes2 = bo.toByteArray() // bytes2: Array[Byte] = Array()

So no dice. My goal here was to keep everything in memory (hence the byte arrays) and keep the memory pressure low by removing the bytes I had already written to the UploadPartRequest, but this really throws a wrench into things, since I'm under the impression that the XML file must be compressed because the Excel file format is effectively a zip file. My full code is obviously a bit more complicated and uses the Play framework and Scala 2.12.6; I've put it on GitHub here and added some additional comments if you'd like to look at it or run it.

I know I could upload this file to S3 in parts by writing the Excel file out to disk first and then uploading it, but for my purposes I'm hoping for an all-in-memory solution so I don't have to deal with disk space problems on web servers when large temp files are generated. By uploading the rows as they're generated, I was thinking the memory pressure should stay fairly constant per upload. Here's what the current code generates in the XML sheet data:

...

This implies to me that, despite my experiment showing no bytes, more bytes do get written to the file at some point, since the NULs end eventually.

So... why does this happen? Why does ByteArrayOutputStream.reset() cause a problem for writing on the ZipOutputStream? If I don't call .reset(), it seems that the ByteArrayOutputStream will expand until it's huge and causes out-of-memory errors. Or should I not worry, since the data is getting compressed anyway?


Answer:

I don't think it's the fault of ByteArrayOutputStream.reset().

Similar to CipherStreams and other filter streams, DeflaterOutputStream (and thus ZipOutputStream) does not actually write to the underlying stream (your ByteArrayOutputStream) until it can or needs to (sometimes not even when you flush).

I believe that in the case of ZipOutputStream it might only write to the underlying stream at certain block sizes or upon closing the ZipEntry; I'm not exactly sure, but that's my guess.

Example:

val bo = new ByteArrayOutputStream()
val zo = new ZipOutputStream(bo)
zo.putNextEntry(new ZipEntry("example entry"))

// v prints the entry header bytes v
println(bo.toString())

zo.write("hello".getBytes())
zo.flush();

// v still only the entry header bytes v
println(bo.toString())

One thing I noticed in ExcelStreamingToS3Service - line 155: you might want to change it to zos.write(byteBuffer, offset, offset + bytesRead), or something similar. Writing the full buffer could certainly be what is causing all those NUL characters, since your buffer may not have been filled during the read and could still have many empty indices. After all, it looks like the XML continues where it left off before the NULs, like here: <c r="C1 ... 940" t="inlineStr">, so it does seem like you're writing all the data, just interspersing it with NULs.
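
For reference, a minimal sketch of that kind of read/write loop (the stream names in and zos are placeholders, not the ones from the linked project): only the bytes actually read are written, otherwise the untouched tail of the buffer goes out as NUL bytes.

byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
    zos.write(buffer, 0, bytesRead);   // length = bytesRead, not buffer.length
}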

Question:

I am working on a backend service which polls an S3 bucket periodically using Spring Integration AWS and processes the polled objects from S3. Below is the implementation:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);   
        return messageSource;
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileReadingFlow() throws IOException {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
                .get();
    }
}

I am getting the messages from S3 on object upload, and I am able to process them using the input stream received as part of the message payload. But the problem I face here is that I get a 'Timeout waiting for connection from pool' exception after receiving a few messages:

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

I know that the issue is related to not closing the opened S3Object, as stated here: https://github.com/aws/aws-sdk-java/issues/1405, so I have implemented closing the input stream of the S3Object received as part of the message payload. But that does not solve the issue and I keep getting the exceptions. Can someone help me fix this issue?


Answer:

Your problem is that you still mix messaging annotation declarations with the Java DSL in your configuration.

It looks like in the fileReadingFlow you close those InputStreams in your processS3Object() method, but you do nothing with the InputStreams produced by the @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")). Why do you have it in the first place? What makes you keep that code if you don't use it?

This S3StreamingMessageSource is therefore polled twice all the time: once by the @InboundChannelAdapter and once by IntegrationFlows.from().

You just have to remove that @InboundChannelAdapter annotation from the S3StreamingMessageSource bean definition, and that's all.
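
A minimal sketch of what that bean looks like after the fix (otherwise unchanged from your configuration):

@Bean
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
    S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
    messageSource.setRemoteDirectory(bucketName);
    return messageSource;
}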

Please read the Reference Manual for more about the purpose of that annotation and why you don't need it when you use the Java DSL:

https://docs.spring.io/spring-integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters