Hot questions for using Amazon S3 with Apache Camel


Question:

I am using the AWS-S3 consumer to poll files at a certain location on S3 at regular intervals. After polling a certain number of times, it starts failing with the exception given below:

Will try again at next poll. Caused by:[com.amazonaws.AmazonClientException - Unable to execute HTTP request:
Timeout waiting for connection from pool]
com.amazonaws.AmazonClientException: Unable to execute HTTP request:Timeout waiting for connection from pool
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:376) ~[aws-java-sdk-1.5.5.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:202) ~[aws-java-sdk-1.5.5.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3037) ~[aws-java-sdk-1.5.5.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3008) ~[aws-java-sdk-1.5.5.jar:na]
at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:531) ~[aws-java-sdk-1.5.5.jar:na]
at org.apache.camel.component.aws.s3.S3Consumer.poll(S3Consumer.java:69) ~[camel-aws-2.12.0.jar:2.12.0]
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187) [camel-core-2.12.0.jar:2.12.0]
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:114) [camel-core-2.12.0.jar:2.12.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_60]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_60]

From what I understand, the reason must be that the consumer exhausts the available connections in the pool, since it uses a new connection on every poll. What I need to know is how to release the resources after every poll, and why the component itself doesn't do it.

Camel Version: 2.12

Edit: I modified the consumer to use a custom S3 client with a specific connection timeout, max connections, max error retry, and socket timeout, but to no avail. The result is the same.

S3 Client configuration:

ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setMaxConnections(50);
clientConfiguration.setConnectionTimeout(6000);
clientConfiguration.setMaxErrorRetry(3);
clientConfiguration.setSocketTimeout(30000);
main.bind("s3Client", new AmazonS3Client(awsCredentials, clientConfiguration));

The AmazonS3Client object named "s3Client" is bound to the Camel context and supplied to the Camel AWS-S3 component route. From then on, Camel manages this resource on its own.

Required solution: I am expecting a solution specific to the Camel AWS-S3 consumer, not a generic Java solution; I am aware that a connection must be closed once its task is done so it can be released and reused. What I am confused about is why Camel does not do this automatically when provided with the connection pool, or whether I am missing some specific configuration.


Answer:

The Camel consumer class opens a connection for each key and creates an exchange from it. This exchange is forwarded on to the route for processing but is never closed automatically, even on calling "stop". As a result, the connection pool runs out of free connections. What needs to be done is to extract the S3ObjectInputStream from the exchange and close it:

S3ObjectInputStream s3InputStream = exchange.getIn().getBody(S3ObjectInputStream.class);
s3InputStream.close();
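
For illustration, a minimal sketch of where that cleanup could live in a route (the endpoint URI and processor placement are examples, not the only option; close the stream only after the body has been fully consumed):

from("aws-s3://my-bucket?amazonS3Client=#s3Client")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // ... work with the body first ...
            S3ObjectInputStream s3InputStream =
                    exchange.getIn().getBody(S3ObjectInputStream.class);
            if (s3InputStream != null) {
                s3InputStream.close(); // frees the pooled HTTP connection
            }
        }
    });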

The answer is pretty close to what others suggest, namely closing the connection. But as explained, a Camel-specific answer was expected, along with an explanation of why Camel doesn't handle this on its own.

Question:

I have a requirement to move files from one S3 bucket to another S3 bucket; during this process I need to split the file (which is an image file).

So, this is how I am doing it:

camelContext.addRoutes(new RouteBuilder() {

    @Override
    public void configure() throws Exception {
        from(sourcePoint.getS3SourcePoint())
            .split().method(new SheetImageSplitterImpl(), "split")
            .to(destinationPoint.getS3DestinationPoint());
    }
});

camelContext.start();

Inside SheetImageSplitterImpl's split() method, I am trying to implement the logic to split the image file. The exchange body is of type S3ObjectInputStream, and I cannot find any guidance on converting an S3ObjectInputStream to image files:

public List<Message> split(Exchange exchange) {
    System.out.println(exchange.getIn().getBody());
    return null; // splitting logic still to be implemented
}

Is there a way I can process the image files?

Note: I am using the Java DSL.


Answer:

You can read that S3ObjectInputStream as an InputStream, copy it to a file, and put the split parts back into new messages:

public List<Message> split(Exchange exchange) throws IOException {
    InputStream iStream = exchange.getIn().getBody(InputStream.class);
    File original = new File("tmp/filename.jpg");
    FileUtils.copyInputStreamToFile(iStream, original);

    List<File> files = <your splitting logic method>;

    List<Message> messageList = new ArrayList<Message>();
    for (File part : files) {
        DefaultMessage message = new DefaultMessage();
        InputStream ip = new FileInputStream(part); // open the part itself, not just its name
        message.setBody(ip);
        messageList.add(message);
    }
    return messageList;
}

Question:

I am sending a file to an S3 bucket using Camel. I want to verify the file integrity using MD5; I am using org.apache.commons.codec.digest.DigestUtils.

from(ftp_endpoint)
    .idempotentConsumer(simple("${in.header.CamelFileName}"), redisIdempotentRepository)
    .setHeader(S3Constants.KEY, simple("${file:name}"))
    .setHeader(S3Constants.CONTENT_MD5, simple(DigestUtils.md5(body().toString()).toString()))
    .to(s3_endpoint)

I am getting the following exception

com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you specified was invalid. 
(Service: Amazon S3; Status Code: 400; Error Code: InvalidDigest; Request ID: 8462458C6250091C)

How do I calculate the MD5 correctly so that the upload to S3 succeeds?


Answer:

I can spot a couple of issues in your setHeader.

.setHeader(S3Constants.CONTENT_MD5, simple(DigestUtils.md5(body().toString()).toString()))

First, you are NOT calculating the MD5 of your body (which I assume is a byte[], since you're reading a file), because you are calling toString() on it. Second, the docs for DigestUtils.md5 state that the return type is byte[], on which you are once again calling toString().

Calling toString() on a byte array returns a String containing something like

[B@106d69c

See for example this other question on SO "UTF-8 byte[] to String".
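
To make the distinction concrete, a quick illustration (the [B@... value is a JVM identity string and varies per run):

byte[] digest = DigestUtils.md5("hello");
System.out.println(digest.toString());           // [B@106d69c -- array identity, not the hash
System.out.println(DigestUtils.md5Hex("hello")); // 5d41402abc4b2a76b9719d911017c592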

You can try this solution using DigestUtils.md5Hex, which returns the hash as a String:

.setHeader(S3Constants.CONTENT_MD5, simple(DigestUtils.md5Hex(body())))
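
Two caveats are worth noting. The md5Hex(...) call in this form is evaluated when the route is constructed rather than per message, so a processor is the safer place to compute the digest; also, Amazon S3's Content-MD5 header formally expects the base64-encoded raw digest rather than a hex string. A per-message sketch along those lines (endpoint names as in the question; this assumes Java 8 and a body that fits in memory as a byte[]):

from(ftp_endpoint)
    .idempotentConsumer(simple("${in.header.CamelFileName}"), redisIdempotentRepository)
    .setHeader(S3Constants.KEY, simple("${file:name}"))
    .convertBodyTo(byte[].class) // make the payload re-readable and hashable
    .process(exchange -> {
        byte[] body = exchange.getIn().getBody(byte[].class);
        // Content-MD5 is the base64 encoding of the raw 128-bit MD5 digest
        String md5 = Base64.getEncoder().encodeToString(DigestUtils.md5(body));
        exchange.getIn().setHeader(S3Constants.CONTENT_MD5, md5);
    })
    .to(s3_endpoint);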

Question:

Apache Camel interfaces well with AWS S3, but I have found a scenario for which it is not built correctly. Going over all of the Camel examples I have seen online, I have never seen anyone use the recommended, industry-standard AWS temporary credentials in non-local environments. Using static credentials that live for ~6 months is a security issue as well as a manual burden (they must be refreshed), and realistically they shouldn't be used anywhere except local environments.

Given a custom S3 client setup, Camel can take temporary credentials; however, a Camel route pointed at AWS S3 will hit a credential expiration at some point. Camel is not smart enough to know this and will indefinitely continue trying to poll the S3 bucket without throwing any exceptions or timeout errors.

I have tried to add a timeout configuration to my endpoint like so:

aws-s3://" + incomingAWSBucket + "?" + "amazonS3Client=#amazonS3Client&timeout=4000

Can anyone explain how to interface Camel with AWS temporary credentials, or how to throw an exception if the AWS credentials expire (given the aforementioned setup)?

Thanks for the help!


UPDATE:

I pushed a feature to Apache Camel to handle the issue above: https://github.com/apache/camel/blob/master/components/camel-aws-s3/src/main/docs/aws-s3-component.adoc#use-useiamcredentials-with-the-s3-component


Answer:

The answer to this question is dense enough for a tutorial if others want it. For now, I will copy and paste it to the correct forums and threads to get the word out:

Without complaining too much, I'd just like to say that for how powerful Camel is, its documentation and example base are really lacking for production scenarios in the AWS world... Sigh... That's a mouthful, and probably a stretch for any open source lib.

I figured out how to solve the credential problem by referencing the official camel-s3 documentation to first see how to create an advanced S3 configuration (relying on the AWS SDK itself -- you can see a bare-bones example there -- it builds the S3 client manually). After I figured this out, I went to the AWS SDK documentation on IAM credentials to figure out how this could work on an EC2 instance, since I am able to build the client itself. In the aforementioned docs, there are a few bare-bones examples as well. Upon testing with the examples listed, I found that the credential refresh (the sole purpose of this question) was not working: it could get credentials at first, but it was not refreshing them during my tests after they were manually expired. Lastly, I figured out that you can specify a provider chain that can handle refreshing the credentials on its own. The AWS documentation that explains this is here.
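
For reference, a bare-bones sketch of a chain-based client (my assumption of the simplest form; the SDK's default chain also resolves instance-profile credentials and refreshes them on its own):

AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
        .withRegion(Regions.US_WEST_2)
        .build();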

In the end, I still need static credentials for my local Camel setups that poll AWS S3 buckets; however, my remote environments that live on EC2s can access them with temporary credentials that refresh themselves flawlessly. WOWSA! :)

To do this, I simply made a factory that uses a local Camel setup for my local development and a remote Camel setup that relies on the temporary IAM credentials. This saves me the security concern and the work of manually refreshing credentials for all remote environments!

I will not explain how to create the factory or how my local & remote configurations are set up in their entirety, but I will include my code sample of the AmazonS3ClientBuilder that creates an S3 client for remote setups.

AmazonS3ClientBuilder.standard()
   .withCredentials(new InstanceProfileCredentialsProvider(false))
   .withRegion(Regions.US_WEST_2)
   .build();

If there is interest in how I got this to work, I can provide an example project that shows the entire process.

By request, here are my local and remote implementations of the S3 client manager.

Local:

public class LocalAWSS3ClientManagerImpl implements AWSS3ClientManager {

    private static Logger logger = LoggerFactory.getLogger(LocalAWSS3ClientManagerImpl.class);
    private PriorityCodeSourcesRoutesProperties priorityCodeSourcesRoutesProperties;
    private SimpleRegistry registry = new SimpleRegistry();
    private CamelContext camelContext;

    public LocalAWSS3ClientManagerImpl(PriorityCodeSourcesRoutesProperties priorityCodeSourcesRoutesProperties) {
        this.priorityCodeSourcesRoutesProperties = priorityCodeSourcesRoutesProperties;
        registry.put("amazonS3Client", getS3Client());
        camelContext = new DefaultCamelContext(registry);
        logger.info("Creating an AWS S3 manager for a local instance (you should not see this on AWS EC2s).");
    }

    private AmazonS3 getS3Client() {
        try {
            String awsBucketAccessKey = priorityCodeSourcesRoutesProperties.getAwsBucketAccessKey();
            String awsBucketSecretKey = priorityCodeSourcesRoutesProperties.getAwsBucketSecretKey();
            AWSCredentials awsCredentials = new BasicAWSCredentials(awsBucketAccessKey, awsBucketSecretKey);
            return AmazonS3ClientBuilder.standard().withCredentials(
                    new AWSStaticCredentialsProvider(awsCredentials)).build();
        } catch (RuntimeException ex) {
            logger.error("Could not create AWS S3 client with the given credentials from the local config.");
        }
        return null;
    }

    public Endpoint getIncomingAWSEndpoint(final String incomingAWSBucket, final String region,
            final String fileNameToSaveAndDownload) {
        return camelContext.getEndpoint(
                "aws-s3://" + incomingAWSBucket + "?" + "amazonS3Client=#amazonS3Client"
                + "&region=" + region + "&deleteAfterRead=false" + "&prefix=" + fileNameToSaveAndDownload);
    }

    public Endpoint getOutgoingLocalEndpoint(final String outgoingEndpointDirectory,
            final String fileNameToSaveAndDownload) {
        return camelContext.getEndpoint(
                "file://" + outgoingEndpointDirectory + "?" + "fileName="
                + fileNameToSaveAndDownload + "&readLock=markerFile");
    }
}

Remote:

public class RemoteAWSS3ClientManagerImpl implements AWSS3ClientManager {

    private static Logger logger = LoggerFactory.getLogger(RemoteAWSS3ClientManagerImpl.class);
    private PriorityCodeSourcesRoutesProperties priorityCodeSourcesRoutesProperties;
    private SimpleRegistry registry = new SimpleRegistry();
    private CamelContext camelContext;

    public RemoteAWSS3ClientManagerImpl(PriorityCodeSourcesRoutesProperties priorityCodeSourcesRoutesProperties) {
        this.priorityCodeSourcesRoutesProperties = priorityCodeSourcesRoutesProperties;
        registry.put("amazonS3Client", getS3Client());
        camelContext = new DefaultCamelContext(registry);
        logger.info("Creating an AWS S3 client for a remote instance (normal for ec2s).");
    }

    private AmazonS3 getS3Client() {
        try {
            logger.info("Attempting to create an AWS S3 client with IAM role's temporary credentials.");
            return AmazonS3ClientBuilder.standard()
                    .withCredentials(new InstanceProfileCredentialsProvider(false))
                    .withRegion(Regions.US_WEST_2)
                    .build();
        } catch (RuntimeException ex) {
            logger.error("Could not create AWS S3 client with the given credentials from the instance. "
                    + "The default credential chain was used to create the AWS S3 client. "
                    + ex.toString());
        }
        return null;
    }

    public Endpoint getIncomingAWSEndpoint(final String incomingAWSBucket, final String region,
            final String fileNameToSaveAndDownload) {
        return camelContext.getEndpoint(
                "aws-s3://" + incomingAWSBucket + "?" + "amazonS3Client=#amazonS3Client"
                + "&region=" + region + "&deleteAfterRead=false" + "&prefix=" + fileNameToSaveAndDownload);
    }

    public Endpoint getOutgoingLocalEndpoint(final String outgoingEndpointDirectory,
            final String fileNameToSaveAndDownload) {
        return camelContext.getEndpoint(
                "file://" + outgoingEndpointDirectory + "?" + "fileName="
                + fileNameToSaveAndDownload + "&readLock=markerFile");
    }
}

Question:

I bring a file from Amazon S3 and process it with Spring Batch, all integrated with Apache Camel.

After the batch, the message body is no longer the file content; it's the Job. But I want to move the file to another S3 bucket. How can I get the file content back? (I do have the headers.)

(Using S3 is circumstantial)

        from("aws-s3://my-bucket...")
            .idempotentConsumer(header("CamelAwsS3Key"),
                  MemoryIdempotentRepository.memoryIdempotentRepository(200))
            .setHeader("CamelFileName", simple("${in.header.CamelAwsS3Key}"))
            .to("file:my-files/")
            .log("Copied from S3.")
            .to("spring-batch:historicoJob")
            .choice()
            .when(simple("${in.body.status} == 'COMPLETED'"))
            .to("direct:moveIt")
            .endChoice();

    from("direct:moveIt")
             ...
            .to("aws-s3://done-bucket...");

Answer:

You can get access to the original message (there is an API for this on the Camel exchange's unit of work), or you can copy the message body to an exchange property before the Spring Batch step and then restore it again afterwards.
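
For example, a minimal sketch of the exchange-property approach (endpoints abbreviated as in the question; exchangeProperty() is available in newer Camel releases, older ones use property()):

from("aws-s3://my-bucket...")
    .convertBodyTo(byte[].class)                 // make the streamed body re-readable
    .setProperty("originalBody", body())         // stash the file content
    .to("spring-batch:historicoJob")             // body becomes the job result here
    .setBody(exchangeProperty("originalBody"))   // restore the original content
    .to("aws-s3://done-bucket...");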

Question:

Is there a way to consume all the files in an S3 bucket without removing them from S3? (The bucket contains about 15,000 files.)

With the lack of a noop parameter in aws-s3, the following configuration has a problem: it constantly retrieves the same 5 files over and over again.

    <endpoint id="fbPage" uri="aws-s3://bucket?amazonS3Client=#aws-credential&amp;deleteAfterRead=false&amp;maxMessagesPerPoll=5&amp;prefix=dev/facebook/page"/>

    <route id="consumeS3FbPage">
        <from uri="ref:fbPage"/>
        <choice>
            <when>                  
                <simple>${header.CamelAwsS3ContentLength}  &gt; 0</simple> 
                <log message="Page File detected: ${header.CamelAwsS3Key}"/>
                <bean ref="dfaReportingRePull" method="s3toElasticFormat"/>

                <setHeader headerName="CamelHttpMethod">
                    <constant>POST</constant>
                </setHeader>
                <to uri="http://localhost:9200/fb_camel/page/_bulk"/>
                <log message="Success"/>
            </when>
            <when>
                <simple>${header.CamelAwsS3ContentLength} == 0</simple>
                <log message="Empty content, Probably the s3 key Folder itself: ${header.CamelAwsS3Key}"/>
            </when>
        </choice>               
    </route>

The following log shows the same files being retrieved over and over again:

[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:46,904 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/05/31/9c9537e6-12a3-415e-aa3d-a450011008be.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:46,993 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:46,994 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/01/97d85443-74af-4d64-9808-a4500110117a.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,002 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,002 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/02/223410b2-b4ce-4b7f-8e47-a45001101254.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,010 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,011 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/03/e5c21710-d764-453d-9736-a4500110132e.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,019 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,019 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/04/851d3759-0c35-4679-838c-a4500110140b.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:47,027 INFO  consumeS3FbPage - Success


[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,375 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/05/31/9c9537e6-12a3-415e-aa3d-a450011008be.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,396 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,397 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/01/97d85443-74af-4d64-9808-a4500110117a.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,409 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,410 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/02/223410b2-b4ce-4b7f-8e47-a45001101254.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,419 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,420 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/03/e5c21710-d764-453d-9736-a4500110132e.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,429 INFO  consumeS3FbPage - Success
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,430 INFO  consumeS3FbPage - Page File detected: dev/facebook/page/166866083481130/2014/06/04/851d3759-0c35-4679-838c-a4500110140b.json
[Camel (camel-1) thread #0 - aws-s3://bucket] 21:26:51,439 INFO  consumeS3FbPage - Success

Even if I use an idempotent consumer, it simply detects that all 5 files are duplicates and hence ignores them.

I wondered whether using deleteAfterRead and then putting the file back would work. No: looking at the code discussed in http://camel.465427.n5.nabble.com/camel-aws-s3-get-only-files-I-need-td5714095.html , it seems the consumer only loops through the current list returned from AWS S3.

Looking at the code of ListObjectsRequest.java, I see there is a way to define a marker, which indicates the last processed S3 key. Is there a way to set this marker via the Camel Spring DSL? [Update] No.

After drilling down into the code, I found the root cause. It is trackable via this JIRA ticket: https://issues.apache.org/jira/browse/CAMEL-8431

Note: the Camel version is 2.14.0.


Answer:

According to Apache committer Willem Jiang, the fix will be part of release 2.14.3 (CAMEL-8431).