Hot questions for using Cassandra in Apache Beam

Question:

You can read about my setup here. I solved the problems described there, but now I have a new one.

I am reading data from 3 tables and have a problem with one of them (the largest). At first the table was read at a great rate, ~300,000 rows/s, but after ~10 hours (and after the reads from the other two tables had finished) it dropped to ~20,000 rows/s, and after 24 hours the read still hasn't finished.

There are a lot of suspicious lines in the log:

I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-17_22_54_11-12138573770170126316;3251780906818434621 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source.
UPD

The job ended with exception:

(f000632be487340d): Workflow failed. Causes: (844d65bb40eb132b): S14:Read from Cassa table/Read(CassandraSource)+Transform to KV by id+CoGroupByKey id/MakeUnionTable0+CoGroupByKey id/GroupByKey/Reify+CoGroupByKey id/GroupByKey/Write failed., (c07ceebe5d95f668): A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
  starterpipeline-sosenko19-01172254-4260-harness-wrdk,
  starterpipeline-sosenko19-01172254-4260-harness-xrkd,
  starterpipeline-sosenko19-01172254-4260-harness-hvfd,
  starterpipeline-sosenko19-01172254-4260-harness-0pf5
About CoGroupByKey

There are two tables. One has ~2 billion rows, each with a unique key (1 row per key). The second has ~20 billion rows, with at most 10 rows per key.

Graph of the pipeline

Here is what is inside the CoGroupByKey match_id block:

Code of the pipeline
// Create pipeline
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read data from Cassandra table opendota_player_match_by_account_id2
PCollection<OpendotaPlayerMatch> player_matches = p.apply("Read from Cassa table opendota_player_match_by_account_id2", CassandraIO.<OpendotaPlayerMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_PLAYER_MATCH_BY_ACCOUNT_ID_TABLE_NAME)
        .withEntity(OpendotaPlayerMatch.class).withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform player_matches to KV by match_id
PCollection<KV<Long, OpendotaPlayerMatch>> opendota_player_matches_by_match_id = player_matches
        .apply("Transform player_matches to KV by match_id", ParDo.of(new DoFn<OpendotaPlayerMatch, KV<Long, OpendotaPlayerMatch>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().match_id.toString());
                c.output(KV.of(c.element().match_id, c.element()));
            }
        }));

// Read data from Cassandra table opendota_match
PCollection<OpendotaMatch> opendota_matches = p.apply("Read from Cassa table opendota_match", CassandraIO.<OpendotaMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_MATCH_TABLE_NAME).withEntity(OpendotaMatch.class)
        .withCoder(SerializableCoder.of(OpendotaMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Read data from Cassandra table match
PCollection<OpendotaMatch> matches = p.apply("Read from Cassa table match", CassandraIO.<Match>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_MATCH_TABLE_NAME).withEntity(Match.class)
        .withCoder(SerializableCoder.of(Match.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL))
        .apply("Adopt match for uniform structure", ParDo.of(new DoFn<Match, OpendotaMatch>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().match_id.toString());
                OpendotaMatch m = new OpendotaMatch();

                // The opendota_match and match tables have slightly different schemas.
                // The field-by-field conversion is cut out here because it's large and trivial.

                c.output(m);
            }
        }));


// Union match and opendota_match
PCollectionList<OpendotaMatch> matches_collections = PCollectionList.of(opendota_matches).and(matches);
PCollection<OpendotaMatch> all_matches = matches_collections.apply("Union match and opendota_match", Flatten.<OpendotaMatch>pCollections());

// Transform matches to KV by match_id
PCollection<KV<Long, OpendotaMatch>> matches_by_match_id = all_matches
        .apply("Transform matches to KV by match_id", ParDo.of(new DoFn<OpendotaMatch, KV<Long, OpendotaMatch>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // LOG.info(c.element().players.toString());
                c.output(KV.of(c.element().match_id, c.element()));
            }
        }));

// CoGroupByKey match_id
// Replicate data
final TupleTag<OpendotaPlayerMatch> player_match_tag = new TupleTag<OpendotaPlayerMatch>();
final TupleTag<OpendotaMatch> match_tag = new TupleTag<OpendotaMatch>();
PCollection<KV<Long, PMandM>> joined_matches = KeyedPCollectionTuple
        .of(player_match_tag, opendota_player_matches_by_match_id).and(match_tag, matches_by_match_id)
        .apply("CoGroupByKey match_id", CoGroupByKey.<Long>create())
        .apply("Replicate data", ParDo.of(new DoFn<KV<Long, CoGbkResult>, KV<Long, PMandM>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                try {
                    OpendotaMatch m = c.element().getValue().getAll(match_tag).iterator().next();
                    Iterable<OpendotaPlayerMatch> pms = c.element().getValue().getAll(player_match_tag);
                    for (OpendotaPlayerMatch pm : pms) {
                        if (0 <= pm.account_id && pm.account_id < MAX_UINT) {
                            for (OpendotaPlayerMatch pm2 : pms) {                                   
                                c.output(KV.of(pm.account_id, new PMandM(pm2, m)));
                            }
                        }
                    }
                } catch (NoSuchElementException e) {
                    LOG.error(c.element().getValue().getAll(player_match_tag).iterator().next().match_id.toString() + " " + e.toString());
                }
            }
        }));        


// Transform to byte array
// Write to BQ
joined_matches
        .apply("Transform to byte array, Write to BQ", BigQueryIO.<KV<Long, PMandM>>write().to(new DynamicDestinations<KV<Long, PMandM>, String>() {
            public String getDestination(ValueInSingleWindow<KV<Long, PMandM>> element) {
                return element.getValue().getKey().toString();
            }

            public TableDestination getTable(String account_id_str) {
                return new TableDestination("cybrmt:" + BQ_DATASET_NAME + ".player_match_" + account_id_str,
                        "Table for user " + account_id_str);
            }

            public TableSchema getSchema(String account_id_str) {
                List<TableFieldSchema> fields = new ArrayList<>();
                fields.add(new TableFieldSchema().setName("value").setType("BYTES"));
                return new TableSchema().setFields(fields);
            }
        }).withFormatFunction(new SerializableFunction<KV<Long, PMandM>, TableRow>() {
            public TableRow apply(KV<Long, PMandM> element) {
                OpendotaPlayerMatch pm = element.getValue().pm;                     
                OpendotaMatch m = element.getValue().m;
                TableRow tr = new TableRow();
                ByteBuffer bb = ByteBuffer.allocate(114);

                // The transform into the byte buffer is cut out here because it's large and trivial.

                tr.set("value", bb.array());
                return tr;                      
            }
        }));

p.run();
UPD2. I've tried to read the problem table alone

I've tried to read the problem table from above on its own. The pipeline contains a CassandraIO.Read transform and a dummy ParDo transform with some logging output, and it behaves like the full pipeline: there is one split (I believe the last one) that cannot be done:

I  Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 

Here is the graph of the pipeline:

And here is the code:

// Create pipeline
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read data from Cassandra table opendota_player_match_by_account_id2
PCollection<OpendotaPlayerMatch> player_matches = p.apply("Read from Cassa table opendota_player_match_by_account_id2", CassandraIO.<OpendotaPlayerMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable(CASSA_OPENDOTA_PLAYER_MATCH_BY_ACCOUNT_ID_TABLE_NAME)
        .withEntity(OpendotaPlayerMatch.class).withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Print my matches
player_matches.apply("Print my matches", ParDo.of(new DoFn<OpendotaPlayerMatch, Long>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                if (c.element().account_id == 114688838) {
                    LOG.info(c.element().match_id.toString());
                    c.output(c.element().match_id);
                }
            }
        }));

p.run();
UPD3

The small pipeline (CassandraIO.Read and ParDo) successfully finished in 23 hours. For the first 4 hours it ran with the maximum number of workers (40) and a great read speed (~300,000 rows/s). After that the number of workers autoscaled down to 1 and the read speed dropped to ~15,000 rows/s. Here is the graph:

And here is the end of the log:

I  Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Proposing dynamic split of work unit cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 at {"fractionConsumed":0.5} 
I  Rejecting split request because custom reader returned null residual source. 
I  Success processing work item cybrmt;2018-01-20_21_28_01-3451798636786921663;1617811313034836533 
I  Finished processing stage s01 with 0 errors in 75268.681 seconds 

Answer:

I finally followed @jkff's advice and read the data from a table with a different, more evenly distributed partition key (my schema actually contains two tables with the same data but different partition keys).
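
For reference, a minimal sketch of that change, assuming the second copy of the data lives in a hypothetical table named opendota_player_match_by_match_id. Only the withTable value differs from the original read; the rest of the pipeline is unchanged:

// Read the same data from the copy that is partitioned by the more evenly distributed key.
// The table name below is hypothetical; everything else mirrors the original read.
PCollection<OpendotaPlayerMatch> player_matches = p.apply("Read from Cassa table opendota_player_match_by_match_id", CassandraIO.<OpendotaPlayerMatch>read()
        .withHosts(Arrays.asList("10.132.9.101", "10.132.9.102", "10.132.9.103", "10.132.9.104")).withPort(9042)
        .withKeyspace("cybermates").withTable("opendota_player_match_by_match_id")
        .withEntity(OpendotaPlayerMatch.class).withCoder(SerializableCoder.of(OpendotaPlayerMatch.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));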

Question:

We have an Apache Beam Dataflow job that reads data from BigQuery and converts it into POJOs before writing to Cassandra using the Datastax driver. I recently added a new blob column to the table and a ByteBuffer field to the POJO.

Here is how I am creating the ByteBuffer:

String str = objectMapper.writeValueAsString(installSkuAttributes);
byte[] bytes = str.getBytes( StandardCharsets.UTF_8 );
pojo.setInstallAttributes(ByteBuffer.wrap(bytes));

Here is the pipeline snippet:

public void executePipeline() throws Exception {

    Pipeline pipeline =
            Pipeline.create(jobMetaDataBean.getDataflowPipelineOptions());

    ....

    writeDataToCassandra(installSkuData);

    pipeline.run();
}

After making the necessary changes in the DAO writer, I get the following exception when I run the job. I am using the Datastax driver.

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.executePipeline(InstallSkusFullFeed.java:216)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.main(InstallSkusFullFeed.java:221)
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:88)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
Caused by: java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:166)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:52)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Answer:

The real problem is here:

java.io.NotSerializableException: java.nio.HeapByteBuffer

It's caused by the fact that ByteBuffer isn't serializable, so it can't be used for distributed work. You can avoid this by using byte[] directly as your attribute, or by implementing a serializable wrapper around ByteBuffer.
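
A minimal sketch of the first option, with a hypothetical POJO class and field names: keep a serializable byte[] on the POJO and wrap it into a ByteBuffer only at the point where the DAO writer binds the Cassandra statement.

import java.io.Serializable;
import java.nio.ByteBuffer;

public class InstallSkuPojo implements Serializable {
    // byte[] is serializable, so SerializableCoder can encode the POJO.
    private byte[] installAttributes;

    public void setInstallAttributes(byte[] installAttributes) {
        this.installAttributes = installAttributes;
    }

    public byte[] getInstallAttributes() {
        return installAttributes;
    }

    // Wrap into a ByteBuffer only when binding the Cassandra statement in the DAO writer.
    public ByteBuffer installAttributesAsBuffer() {
        return installAttributes == null ? null : ByteBuffer.wrap(installAttributes);
    }
}

With this, the bytes from str.getBytes(StandardCharsets.UTF_8) in the question can be set on the field directly, and the pipeline no longer hits the NotSerializableException when cloning elements.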

Question:

I'm trying to compile and use the snapshot of the Apache Beam Cassandra JAR. It seems the build does not pack the vendored Guava dependencies into the JAR, which causes a failure when the JAR is used by other code - see the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/beam/vendor/guava/v20_0/com/google/common/base/Preconditions
    at org.apache.beam.sdk.io.cassandra.CassandraIO$Read.withHosts(CassandraIO.java:180)
    at org.apache.beam.examples.JoinFromCassandraToCassandra.main(JoinFromCassandraToCassandra.java:26)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 2 more

I couldn't find any way to make the Gradle build package the required dependencies into the JAR.

I'm building with the following command:

gradle -p ./sdks/java/io/cassandra shadowJar

Does anyone know how this can be done?


Answer:

So it appears you need to add the following dependencies to make it work with the latest version. Hope this helps someone.

compile group: 'org.apache.beam', name: 'beam-vendor-guava-20_0', version: '0.1'
compile group: 'org.apache.beam', name: 'beam-vendor-grpc-1_13_1', version: '0.2'
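
(The first coordinate provides the org.apache.beam.vendor.guava.v20_0 package reported missing in the stack trace above.)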