Hot questions for using Amazon S3 with Apache Kafka


Question:

I want to store data from Kafka into an S3 bucket using Kafka Connect. I already have a Kafka topic running and an S3 bucket created. My topic contains Protobuf data. I tried https://github.com/qubole/streamx and got the following error:

 [2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
 [2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
 [2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
 [2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
 [2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
 java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-10-04 13:35:47,194] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2018-10-04 13:35:51,235] INFO Reflections took 6844 ms to scan 259 urls, producing 13517 keys and 95788 values (org.reflections.Reflections:229)

These are the steps I followed:

  1. I cloned the repository.
  2. mvn -DskipTests package
  3. nano config/connect-standalone.properties

    bootstrap.servers=ip-myip.ec2.internal:9092
    key.converter=com.qubole.streamx.ByteArrayConverter
    value.converter=com.qubole.streamx.ByteArrayConverter
    
  4. nano config/quickstart-s3.properties

    name=s3-sink 
    connector.class=com.qubole.streamx.s3.S3SinkConnector
    format.class=com.qubole.streamx.SourceFormat
    tasks.max=1
    topics=ssp.impressions
    flush.size=3
    s3.url=s3://myaccess_key:mysecret_key@mybucket/demo
    
  5. connect-standalone /etc/kafka/connect-standalone.properties quickstart-s3.properties

I would like to know whether what I did is correct, or whether there is a better way to get data from Kafka into S3.


Answer:

You can use Kafka Connect to do this integration, with the Kafka Connect S3 connector.

Kafka Connect is part of Apache Kafka, and the S3 connector is an open-source connector available either standalone or as part of Confluent Platform.
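For illustration, a minimal standalone configuration for the Confluent S3 sink connector might look like the sketch below. The bucket name, region, and topic are placeholders, and credentials are assumed to come from the standard AWS credential chain (environment variables or an instance role) rather than being embedded in a URL:

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=ssp.impressions
    s3.bucket.name=mybucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    # topic carries Protobuf bytes, so write them out as-is
    format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
    flush.size=3

Since the topic holds Protobuf, one option is to set the worker's value.converter to org.apache.kafka.connect.converters.ByteArrayConverter and write the raw bytes, decoding them downstream.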

For general information and examples of Kafka Connect, this series of blog articles might help.

Disclaimer: I work for Confluent, and wrote the above blog articles.


April 2020: I have recorded a video showing how to use the S3 sink: https://rmoff.dev/kafka-s3-video

Question:

It's not clear to me whether there is some kind of plugin to consume data from Kafka topics and insert it into S3.

I already found this topic, but I could not solve the issue yet. There is this project, but honestly it is hard to use because the last commit is from two years ago.

My main goal was to consume directly from Kafka into Spark jobs, but I think that can get complicated, so populating S3 with slices of events from Kafka would be enough for me.

Also, is there any consumer example in Scala? It is kind of funny that Kafka is written in Scala but the documentation code is Java. =p

I appreciate any help.

Update:

Camus may be an option too.


Answer:

This tool from Pinterest was the perfect answer for me: Secor (https://github.com/pinterest/secor).
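
As for a consumer example in Scala: the official kafka-clients Java API can be used directly from Scala. Below is a minimal, illustrative sketch (assuming Scala 2.13 and a recent kafka-clients jar; the broker address, group id, and topic name are placeholders) that subscribes to a topic and prints each record:

    import java.time.Duration
    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.jdk.CollectionConverters._

    object SimpleConsumer extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
      props.put("group.id", "scala-example")             // placeholder consumer group
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(java.util.Collections.singletonList("my-topic")) // placeholder topic

      try {
        while (true) {
          // poll the broker and print whatever records came back
          val records = consumer.poll(Duration.ofMillis(500))
          for (record <- records.asScala)
            println(s"partition=${record.partition} offset=${record.offset} value=${record.value}")
        }
      } finally {
        consumer.close()
      }
    }

If the eventual goal is Spark, note that Spark Structured Streaming can also read from Kafka directly through its built-in kafka source, which may make the S3 hop unnecessary.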

Question:

com.amazonaws.AmazonClientException: com.amazonaws.AmazonServiceException: Roles may not be assumed by root accounts. (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied;

I created a role and its trust relationship is:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<awsID>:root",
                "Service": "ec2.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

I even tried creating a policy and assigned it to my role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::secorbackup"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::secorbackup/*"
            ]
        }
    ]
}

Nothing seems to work; I'm getting the same error. I am using pinterest/secor for log persistence from Kafka to S3. Any suggestions?


Answer:

Roles may not be assumed by root accounts.

This error means exactly what it says.

You cannot assume a role while using the root account, under any circumstances. You have to use an IAM user.

There is no other workaround for this. The behavior is by design.
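
As a quick check, you can try assuming the role with an IAM user's credentials via the AWS CLI (the role name below is a placeholder); the same call made with root credentials will keep returning this error:

    # run with an IAM user's access keys configured, not the root account's
    aws sts assume-role \
        --role-arn arn:aws:iam::<awsID>:role/secor-role \
        --role-session-name secor-test

The IAM user also needs a policy that allows sts:AssumeRole on that role's ARN, in addition to the trust relationship shown above.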