Hot questions for Using Cassandra in apache flink

Top Java Programmings / Cassandra / apache flink


Im doing some proof-of-concept work with Flink and have gotten to the point where I want to try my various jobs (topologies?) on a 4 node cluster.


  • 32 core
  • 96g Gb RAM

The topologies vary from 3 to 6 'tasks' (workers? bolt-analogs?). I'm hoping that someone here can offer some suggested settings. Specifically:

  • taskmanager.numberOfTaskSlots: set this to # of cores?
  • taskmanager.heap.mb: "This value should be as large as possible." 96Gb? Really?
  • parallelism.default: tried setting this to 30. Got this error1.
  • I tried upping this value but it didn't seem to have any effect. Tasks always show '1' for parallelism.
  • any other settings that people have found useful / interesting?

One task in particular: reading from Kafka where topic in question has 6 partitions. From each of these partitions I want to read, aggregate and write to Cassandra. When I implemented this job in Storm it had 6 bolts to read the data and several times as many to write. (IE 6 read, 18 write)

If Flink gets adopted by my company each machine will run numerous, simultaneous jobs. How will the config params alter under such a circumstance?

FWIW: cluster is v1.0-SNAPSHOT.

EDIT: This seems useful.

1 "Insufficient number of network buffers: required 30, but only 8 available. The total number of network buffers is currently set to 2048." Does this mean that 2000 buffers are being used when nothing is happening?


There are multiple sources that will help you to set the correct setting (see below)

To address your questions right away:

  • taskmanager.numberOfTaskSlots: set this to # of cores? yes
  • taskmanager.heap.mb: "This value should be as large as possible." 96Gb? Really? bacially yes, but it depends If you deploy Flink manually in a cluster, it is assumed that Flink is the only system running. It would be a waste if you do not make this parameter as large as possible. For a shared setup with other systems, you might want to consider to use YARN. On the ohter hand running in standalone cluster mode, you might want to leave some memory for the OS and if you know that there a other components running, you might not assign all memory to Flink. However, as Flink manages the memory internally, you should avoided to have unused memory.

  • parallelism.default: tried setting this to 30. Got this error. look at the following configuration paramters: and (also have a look here, here and here

  • I tried upping this value but it didn't seem to have any effect. Tasks always show '1' for parallelism. If I am not wrong, this parameter got replaced by parallelism.default

check out this links for more background info:

And general FAQ, the Flink Blog, and Flink Forward talks:


I am trying to pull in data from a cassandra table to use as a dataset, but have two issues that I have come across.

The first is that cassandraInputFormat only returns a tuple, and I'd prefer to not have a tuple12 and just use a pojo to define what it will expect back. So I don't know if this is just something I will have to accept, if there is a way to use a pojo instead like with the cassandraConnector (, or if using the cassandraInputFormat is not the best way to go about getting the data.

The other issue is even with the data I pull I get from the cassandraInputFormat (be it tuple or not) I don't know the way to set it as the data source. For files, csv and HDFS there is plenty of methods ( but none that are explicitly for cassandra. So my guess is that I would need to pull the data using the cassandraInputFormat and use something like .fromElements() or .fromCollecton() and what the proper way to do that is.

Thanks for any help in advance!


This "works" (and thanks to Chesnay Schepler for the help):

DataSet<Tuple2<String, String>> testSet = 
exEnv.createInput(cassandraInputFormat, TypeInformation.of(newTypeHint<Tuple2<String, String>>(){})); 

But this error is occurring now...

Exception in thread "main" org.apache.flink.optimizer.CompilerException: 
Error translating node 'Data Source "at execute( 
(org.apache.flink.batch.connectors.cassandra.CassandraInputFormat)" : NONE 
[[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]':
Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : flink.streaming.code.CodeBatchProcessorImpl

And further down it includes:

Caused by:


Had to set environment to transient. Fixed now!


You can use the CassandraInputFormat, and all InputFormats for that matter, by calling ExecutionEnvironment#createInput(InputFormat).

There is currently no option to directly read elements as POJOs. The easiest workaround is to add a MapFunction after the sink that converts the Tuples into your desired POJO.


I have a Cassandra database that have to receive its data in my Flink program from socket like steam for Streamprocessing. So, I wrote a simple client program that read data from Cassandra and sent the data to the socket;also,I wrote the Flink program in server base.In fact, my client program is simple and does not use any Flink instructions;it just send a Cassandra row in string format to socket and Server must receive the row. First, I run the Flink program to listen to the client and then run the client program. The client received this stream from server (because server send datastream data and client cannot receive it correctly):

Hi Client org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

After that both programs stay running without sending and receiving any data and there is no error.

The Flink program is in following: public class WordCount_in_cassandra {

 private static int myport=9999;
 private static String hostname="localhost";
 //static ServerSocket variable
 private static ServerSocket server;
 private static int count_row=0;

 public static void main(String[] args) throws Exception {
 // Checking input parameters
 final ParameterTool params = ParameterTool.fromArgs(args);
 // set up the execution environment
 final StreamExecutionEnvironment env = 

 //create the socket server object
    server = new ServerSocket(myport);
 // make parameters available in the web interface

    while (true){
        System.out.println("Waiting for client request");
        //creating socket and waiting for client connection
        Socket socket = server.accept();
        DataStream<String> stream = env.socketTextStream(hostname, 


        //write object to Socket
        oos.writeObject("Hi Client " + stream.toString());

        // parse the data, group it, window it, and aggregate the 
    DataStream<Tuple2<String, Long>> counts = stream
                .flatMap(new FlatMapFunction<String, Tuple2<String, 
    Long>>() {
            public void flatMap(String value, 
     Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
           String[] words = value.toLowerCase().split("\\W+");

                        // emit the pairs
             for (String word : words) {

                if (!word.isEmpty()) {
                   out.collect(new Tuple2<String, Long>(word, 1L));

        // emit result
        if (params.has("output")) {
        } else {
            System.out.println("Printing result to stdout. Use -- 
            output to specify output path.");


        //terminate the server if client sends exit request
        if (stream.equals("exit")){
            System.out.println("row_count : "+count_row);

        // execute program
        env.execute("Streaming WordCount");
    }//while true
    System.out.println("Shutting down Socket server!!");

The client program is like this:

public class client_code {
private static Cluster cluster = 
private static Session session = cluster.connect("mar1");

 public static void main(String[] args) throws UnknownHostException, 
   IOException, ClassNotFoundException, InterruptedException {
    String serverIP = "localhost";
    int port=9999;
    Socket socket = null;
    ObjectOutputStream oos = null;
    ObjectInputStream ois = null;

    ResultSet result = session.execute("select * from tlbtest15");
    for (Row row : result) {
        //establish socket connection to server
        socket = new Socket(serverIP, port);
        //write to socket using ObjectOutputStream
        oos = new ObjectOutputStream(socket.getOutputStream());
        System.out.println("Sending request to Socket Server");

        if (row==result) oos.writeObject("exit");
        else oos.writeObject(""+row+"");
        //read the server response message
        ois = new ObjectInputStream(socket.getInputStream());
        String message = (String) ois.readObject();
        System.out.println("Message: " + message);
        //close resources


Would you please tell me how I can solve my problem?

Any help would be appreciated.


There are several problems with the way you've tried to construct the Flink application. A few comments:

  • The Flink DataStream API is used to describe a dataflow graph that is sent to a cluster for execution when env.execute() is called. It doesn't make sense to wrap this in a while(true) loop.
  • socketTextStream sets up a client connection. Your server doesn't appear to do anything useful.
  • stream.equals("exit") -- stream is a DataStream, not a String. If you want to do something special when a stream element has a specific value, that needs to be done differently, by using one of the stream operations that does event-at-a-time processing. As for shutting down the Flink job, streaming jobs are normally designed to either run indefinitely, or to run until a finite input source reaches its end, at which point they shutdown on their own.

You can simplify things considerably. I would start over, and begin by replacing your client with a command line like this:

cqlsh -e "SELECT * from tlbtest15;" | nc -lk 9999

nc (netcat) will act as a server in this case, allowing Flink to be a client. This will make things easier, as that's how env.socketTextTream is meant to be used.

Then you'll be able to process the results with a normal Flink application. The socketTextStream will produce a stream containing the query's results as lines of text, one for each row.


I create a program to count words in Wikipedia. It works without any errors. Then I created the Cassandra table with two columns "word(text) and count(bigint)". The problem is when I wanted to enter words and counts to Cassandra table.My program is in following:

    public class WordCount_in_cassandra {

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface

     DataStream<String> text=env.addSource(new WikipediaEditsSource()).map(WikipediaEditEvent::getTitle);

       DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"

        // emit result
        if (params.has("output")) {
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");


                    .setQuery("INSERT INTO mar1.examplewordcount(word, count) values values (?, ?);")

        // execute program
        env.execute("Streaming WordCount");

  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));


After running this code I got this error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractCassandraTupleSink is not serializable. The object probably contains or references non serializable fields.

I searched a lot but I could not find any solutions for it.Would you please tell me how I can solve the issue?

Thank you in advance.


I tried to replicate your problem, but I didn't get the serialization issue. Though because I don't have a Cassandra cluster running, it fails in the open() call. But this happens after serialization, as it's called when the operator being started by the TaskManager. So it feels like you have something maybe wrong with your dependencies, such that it's somehow using the wrong class for the actual Cassandra sink.

BTW, it's always helpful to include context for your error - e.g. what version of Flink, are you running this from an IDE or on a cluster, etc.

Just FYI, here are the Flink jars on my classpath...



I’m using datastax driver to use Cassandra as sink for some data streams with Apache Flink: I have a problem executing my application raising an error at runtime about the queue which become full after some seconds. I discovered that the default value is 256, that is probably too low for my load, so I have raised it using poolingOptions setting maxRequestsPerConnection as suggested here:

Unfortunately with the following code I obtain the following error when I launch it:

The implementation of the ClusterBuilder is not serializable. 
The object probably contains or references non serializable fields.

My code:

PoolingOptions poolingOptions = new PoolingOptions();
      .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
      .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

ClusterBuilder cassandraBuilder = new ClusterBuilder() {

    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint(CASSANDRA_ADDRESS)

    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")

How can I deal with it?


You have to define the PoolingOptions within ClusterBuilder#buildCluster.


My flink program should do a Cassandra look up for each input record and based on the results, should do some further processing.

But I'm currently stuck at reading data from Cassandra. This is the code snippet I've come up with so far.

ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))

    for (int i=1; i<5; i++) {
        CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
                new CassandraInputFormat<>("select * from test where id=hello" + i, secureCassandraSinkClusterBuilder);
        Tuple2<String, String> out = new Tuple8<>();

But the issue with this is, it takes nearly 10 seconds for each look up, in other words, this for loop takes 50 seconds to execute.

How do I speed up this operation? Alternatively, is there any other way of looking up Cassandra in Flink?


I came up with a solution that is fairly fast at querying Cassandra with streaming data. Would be of use to someone with the same issue.

Firstly, Cassandra can be queried with as little code as,

Session session = secureCassandraSinkClusterBuilder.getCluster().connect();
ResultSet resultSet = session.execute("SELECT * FROM TABLE");

But the problem with this is, creating Session is a very time-expensive operation and something that should be done once per key space. You create Session once and reuse it for all read queries.

Now, since Session is not Java Serializable, it cannot be passed as an argument to Flink operators like Map or ProcessFunction. There are a few ways of solving this, you can use a RichFunction and initialize it in its Open method, or use a Singleton. I will use the second solution.

Make a Singleton Class as follows where we create the Session.

public class CassandraSessionSingleton {
    private static CassandraSessionSingleton cassandraSessionSingleton = null;

    public Session session;

    private CassandraSessionSingleton(ClusterBuilder clusterBuilder) {
        Cluster cluster = clusterBuilder.getCluster();
        session = cluster.connect();

    public static CassandraSessionSingleton getInstance(ClusterBuilder clusterBuilder) {
        if (cassandraSessionSingleton == null)
            cassandraSessionSingleton = new CassandraSessionSingleton(clusterBuilder);
        return cassandraSessionSingleton;


You can then make use of this session for all future queries. Here I'm using the ProcessFunction to make queries as an example.

public class SomeProcessFunction implements ProcessFunction <Object, ResultSet> {
    ClusterBuilder secureCassandraSinkClusterBuilder;

    // Constructor
    public SomeProcessFunction (ClusterBuilder secureCassandraSinkClusterBuilder) {
        this.secureCassandraSinkClusterBuilder = secureCassandraSinkClusterBuilder;

    public void  ProcessElement (Object obj) throws Exception {
        ResultSet resultSet = CassandraLookUp.cassandraLookUp("SELECT * FROM TEST", secureCassandraSinkClusterBuilder);
        return resultSet;

Note that you can pass ClusterBuilder to ProcessFunction as it is Serializable. Now for the cassandraLookUp method where we execute the query.

public class CassandraLookUp {
    public static ResultSet cassandraLookUp(String query, ClusterBuilder clusterBuilder) {
        CassandraSessionSingleton cassandraSessionSingleton = CassandraSessionSingleton.getInstance(clusterBuilder);
        Session session = cassandraSessionSingleton.session;
        ResultSet resultSet = session.execute(query);
        return resultSet;

The singleton object is created only the first time the query is run, after that, the same object is reused, so there is no delay in look up.


I have a flink project that will be inserting data in a cassandra table as a batch job. I already have a flink stream project where it is writing a pojo to the same cassandra table, but cassandraOutputFormat needs the data as a Tuple (hope that is changed to accept pojos like CassandraSink does at some point). So here is the pojo that I have that:

@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {

    @Column(name = "jobId")
    private String jobId;
    @Column(name = "datalist")
    private List<CustomDataObj> dataobjs;
    @Column(name = "userid")
    private String userid;

    //Getters and Setters

And the dataset of tuple I am making from this pojo:

DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = AlphaGroupingObjectToTuple3Mapper());

And here is the line that triggers the output as well:

outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt));

Now the issue that I have is when I try to run this, I get this error when it tries to output it to the cassandra table:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: 
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <->]

So I know when it was a pojo, I just had to add the @Frozen annotation to the field, but I don't know how to do that for a tuple. What is the best/proper way to fix this? Or am I doing something unnecessary because there is actually a way to send pojos through the cassandraOutputFormat I just haven't found?

Thanks for any and all help in advance!


Here is the code for the CustomDataObj class too:

@UDT(name="dataobj", keyspace = "mykeyspace")
public class CustomDataObj implements Serializable {

    @Field(name = "userid")
    private String userId;

    @Field(name = "groupid")
    private String groupId;

    @Field(name = "valuetext")
    private String valueText;

    @Field(name = "comments")
    private String comments;

    //Getters and setters


Including the table schema in cassandra that the CustomDataObj is tied to and the mytablename schema.

CREATE TYPE mykeyspace.dataobj (
    userid text,
    groupid text,
    valuetext text,
    comments text

CREATE TABLE mykeyspace.mytablename (
    jobid text,
    datalist list<frozen<dataobj>>,
    userid text,
    PRIMARY KEY (jobid, userid)


Add UDT Annotation on CustomDataObj class

@UDT(name = "dataobj")
public class CustomDataObj { 


Change jobid Annotation to @Column(name = "jobid") and dataobjs Frozen Annotation to @Frozen

@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {

    @Column(name = "jobid")
    private String jobId;

    @Column(name = "datalist")
    private List<CustomDataObj> dataobjs;
    @Column(name = "userid")
    private String userid;

    //Getters and Setters


While experimenting with Flink streaming together with Cassandra I ran into an interesting problem when trying to generate INSERT statements in a MapFunction. If I used a DataStream<Insert> I would get a confusing RuntimeException thrown at me. However, by using DataStream<Statement> instead, everything worked as I expected it to, even though I still use an Insert instance in the code that executes.

I found a solution (using DataStream<Statement>) by trial and error, but am still confused about what is causing this. Is it intentional or a bug? I have been unable to find any explanations by googling, so might as well ask here if anyone knows what is going on.

Expected output (using DataStream<Statement>):

log4j:WARN No appenders could be found for logger (
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-638132790]
01/17/2017 15:57:42 Job execution switched to status RUNNING.
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING 
INSERT INTO tablename (name,age) VALUES ('Test Nameson',27);
01/17/2017 15:57:42 Source: Custom Source -> Map -> Sink: Unnamed(1/1) switched to FINISHED 
01/17/2017 15:57:42 Job execution switched to status FINISHED.

Error output (using DataStream<Insert>):

Exception in thread "main" java.lang.RuntimeException: The field private java.util.List com.datastax.driver.core.querybuilder.BuiltStatement.values is already contained in the hierarchy of the class com.datastax.driver.core.querybuilder.BuiltStatement.Please use unique field names through your classes hierarchy
    at se.hiq.bjornper.testenv.cassandra.SOCassandraQueryTest.main(

Code example (switch the commented code for the two different cases):

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;

public class SOCassandraQueryTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Map<String, Object>> myDataStream = env.addSource(new RichSourceFunction<Map<String, Object>>() {

            public void run(SourceContext<Map<String, Object>> ctx) throws Exception {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("name", "Test Nameson");
                map.put("age", 27);

            public void cancel() {

        /* Works just fine */
        DataStream<Statement> debugDatastream = MapFunction<Map<String, Object>, Statement>() {

            public Statement map(Map<String, Object> datarow) throws Exception {
                Insert insert = QueryBuilder.insertInto("tablename");

                for (Entry<String, Object> e : datarow.entrySet()) {
                    insert.value(e.getKey(), e.getValue());
                return insert;

        /* Throws RuntimeException if using "Insert" instead of "Statement" */
//        DataStream<Insert> debugDatastream = MapFunction<Map<String, Object>, Insert>() {
//            @Override
//            public Insert map(Map<String, Object> datarow) throws Exception {
//                Insert insert = QueryBuilder.insertInto("tablename");
//                for (Entry<String, Object> e : datarow.entrySet()) {
//                    insert.value(e.getKey(), e.getValue());
//                }
//                return insert;
//            }
//        });




  • Java 8
  • Flink 1.1.3 (Cassabdra driver from this maven package)
  • Eclipse IDE


Flink is analyzing the types you are sending over the wire to generate fast serializers, and to allow access to your keys when building windows or when shuffling data over the network.

The problem here is probably the following: - When using Insert as the user type, Flink tries to generate a PojoSerializer for the type, but fails with that RuntimeException. I think the behavior is not correct. I've filed a bug report in the Flink project for the issue. - For the Statement, Flink sees that it can not serialize the type as a POJO, so its falling back to a generic serializer (in Flink's case Kryo).

I think this documentation page is the closest we have that describes how Flink's serialization stack works: