Hot questions for using Amazon S3 in Spring Integration


Question:

I am working on a project where I am required to poll an S3 bucket for files and upload them to a different S3 bucket. As a first step towards implementing it, I am trying to poll the S3 bucket for newly created files and copy them into my local directory using Spring Integration. To achieve that, I have created a simple Spring Boot application with Maven, with the object polling configuration below, which handles the file-reading IntegrationFlow:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class ObjectPollerConfiguration {
    @Value("${amazonProperties.bucketName}")
    private String bucketName;
    public static final String OUTPUT_DIR2 = "target2";
    @Autowired
    private AmazonClient amazonClient;
    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonClient.getS3Client());
        synchronizer.setDeleteRemoteFiles(true);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setRemoteDirectory(bucketName);            
        return synchronizer;
    }
    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new File("."));
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    }
    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }
    @Bean
    IntegrationFlow fileReadingFlow() {
        return IntegrationFlows
                .from(s3InboundFileSynchronizingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
                .handle(fileProcessor())
                .get();
    }
    @Bean
    public MessageHandler fileProcessor() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR2));
        handler.setExpectReply(false); // end of pipeline, reply not needed
        return handler;
    }
}

But when I start my application as a Java application and upload files to S3, I don't see the target2 directory with the files, nor do I get any logs corresponding to the polling execution. Can someone help me get this working?


Answer:

I think the problem is that you don't use your OUTPUT_DIR2 property for the local directory to push files into.

Your code for the local directory is currently:

messageSource.setLocalDirectory(new File("."));

This is not what you are looking for. Try changing it to:

messageSource.setLocalDirectory(new File(OUTPUT_DIR2));
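
For clarity, the corrected message source bean would look like this (a sketch based on the configuration in the question; everything else stays as it was):

@Bean
@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "30"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
    S3InboundFileSynchronizingMessageSource messageSource =
            new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
    messageSource.setAutoCreateLocalDirectory(true);
    // write synchronized files into target2 instead of the working directory
    messageSource.setLocalDirectory(new File(OUTPUT_DIR2));
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return messageSource;
}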

Question:

I'm using an AmazonS3InboundSynchronizationMessageSource to read in what may amount to millions of files strewn across an S3 bucket's sub-directories, organized by type >> year >> month >> day >> hour >> {filename}-{uniqueid}.gz. Ideally, I'd like to poll and write, and have the Synchronizer remember the last place it read from so that subsequent polls retrieve the next batch. That is NOT how the above MessageSource is designed, however.

Anyhow, I can get around that problem by picking a range and reading in the contents.

Besides that, if I take a simple approach and read in files from one directory on the first poll, I want to shut down (System.exit) after that (actually after some processing, as noted in the comments below).

So, similar to what was asked here:

Spring Integration Inbound-channel-adapter: make one poll and exit

I want to poll only once and exit after the first poll. (Maybe there's a different way to go about it? I'm open to suggestions).

app bootstrap

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationApp extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(DataMigrationApp.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(DataMigrationApp.class, args);
    }

}

UPDATED (2015-09-06)

code sample

@Configuration
public class DataMigrationModule {

private final Logger log = LoggerFactory.getLogger(getClass());

@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;

@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;

@Value("${cloud.aws.s3.bucket}")
private String bucket;

@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;

@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;

@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;

@Value("${cloud.aws.s3.local-directory:target/s3-dump}")
private String localDirectory;

@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;

@Value("${app.persistent-type:}")
private String persistentType;

@Value("${app.repository-type:}")
private String repositoryType;

@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;

@Autowired
private ListableBeanFactory beanFactory;

private final AtomicBoolean invoked = new AtomicBoolean();

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

private FileToInputStreamTransformer unzipTransformer() {
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    return transformer;
}

private Class<?> repositoryType() {
    try {
        return Class.forName(repositoryType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unknown repository implementation!)", cnfe);
        System.exit(0);
    }
    return null;
}

private Class<?> persistentType() {
    try {
        return Class.forName(persistentType);
    } catch (ClassNotFoundException cnfe) {
        log.error("DataMigrationModule.failure -- (Unsupported type!)", cnfe);
        System.exit(0);
    }
    return null;
}

@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
    AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
    AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
    messageSource.setCredentials(credentials);
    messageSource.setBucket(bucket);
    messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
    messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
    messageSource.setRemoteDirectory(remoteDirectory);
    if (!fileNameWildcard.isEmpty()) {
        messageSource.setFileNameWildcard(fileNameWildcard);
    }
    String directory = System.getProperty("java.io.tmpdir");
    if (!localDirectory.startsWith("/")) {
        localDirectory = "/" + localDirectory;
    }
    if (!localDirectory.endsWith("/")) {
        localDirectory = localDirectory + "/";
    }
    directory = directory + localDirectory;
    FileUtils.mkdir(directory);
    messageSource.setDirectory(new LiteralExpression(directory));
    return messageSource;
}

@Bean
DirectChannel inputChannel() {
    return new DirectChannel();
}

@Bean 
JdbcRepositoryHandler jdbcRepositoryHandler() {
    return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}

@Bean
public IntegrationFlow flow() {
    // formatter:off
    return IntegrationFlows
            .from(
                    this.amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            // TODO add advised PollableChannel to deal with possible decompression issues

            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()))
            // TODO add advised PollableChannel to deal with possible transform issues

            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // formatter:on
}

public class JdbcRepositoryHandler extends AbstractReplyProducingMessageHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @SuppressWarnings("rawtypes")
    private Insertable repository;

    public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
        repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
    }

    @Override
    protected Object handleRequestMessage(Message<?> message) {
        List<?> result = null;
        try {
            result = repository.insert((List<?>) message.getPayload());
        } catch (TransactionSystemException | DataAccessException e) {
            // TODO Quite a bit more work to add retry capability for records that didn't cause failure
            log.error("DataMigrationModule.failure -- (Could not persist batch!)", ExceptionUtils.getStackTrace(e));
        }
        return result;
    }

}

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload));
    }
}

}

Answer:

Actually, I'm not sure what your question is.

You are going the right way, by the way.

For the OnlyOnceTrigger you can use something like this from my test-cases:

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }

...

    e -> e.poller(p -> p.trigger(this::nextExecutionTime))

For unzipping your file you should do something like this:

.<File, InputStream>transform(p -> new GZIPInputStream(new FileInputStream(p)))

And you should do it this way because there is an out-of-the-box FileSplitter component that reads a file line by line and emits a message for each line. It supports InputStream as a payload, which lets you avoid loading the entire file into memory.

So, the next EIP method in your IntegrationFlow looks like this:

.split(new FileSplitter())
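
Putting these pieces together, the front of the flow() bean from the question could read roughly like this (a sketch reusing the amazonS3InboundSynchronizationMessageSource(), nextExecutionTime() and unzipTransformer() methods shown above; unzipTransformer() also takes care of the checked IOException that a bare lambda could not; the downstream steps are elided):

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows
            .from(amazonS3InboundSynchronizationMessageSource(),
                    // the trigger fires exactly once, so the source is polled a single time
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
            // File -> GZIPInputStream, so the file is streamed rather than loaded whole
            .transform(unzipTransformer())
            // one message per line, read directly from the InputStream
            .split(new FileSplitter())
            // ... convert each line to a domain object and persist it
            .get();
}

Note that the splitter is passed directly as an instance here, as shown in the snippet above.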

After that, I'm not sure you need to aggregate each domain object into a list for a subsequent batch insert, because you can process them one by one, distributing them via an ExecutorChannel.
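
If you do drop the aggregation and insert items one by one, the hand-off could look roughly like this (a sketch based on the channel and transformer already in the question; it assumes jdbcRepositoryHandler() is adapted to insert a single domain object rather than a batch):

// fan work out to a thread pool; each message is handled on its own thread
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()))
.handle(jdbcRepositoryHandler()) // assumed here to insert one record at a time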

As you see, there is no need for a separate step to delete the unpacked file.

The same goes for the final step that deletes all the *.gz files, because you can rely on the AcceptOnceFileListFilter to avoid re-reading the same file on the next poll.
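
For reference, that is the same local filter the first question already wires in; a minimal sketch, assuming the S3InboundFileSynchronizingMessageSource from that configuration:

// AcceptOnceFileListFilter remembers files it has already passed (in memory)
// and rejects them on later polls, so the same file is not processed twice
messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());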

Let me know if I have missed anything.