Hot questions for using Cassandra with REST

Question:

I am using Spark to consume data from Kafka and save it in Cassandra. My program is written in Java, and I am using the spark-streaming-kafka_2.10:1.6.2 library to accomplish this. My code is:

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String,String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);
JavaPairReceiverInputDStream<String, EventLog> messages =
  KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
    @Override
    public EventLog call(Tuple2<String, EventLog> tuple2) {
        return tuple2._2();
    }
});
lines.foreachRDD(rdd -> {
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
});
jssc.start();

In my Cassandra table event_log, there is a column named offsetid that stores the offset ID of the stream. How do I get the offset up to which this stream has read from Kafka and store it in Cassandra?

After saving it in Cassandra, I want the latest offset ID to be used when Spark is started again. How do I do that?


Answer:

Below is the code for reference; you may need to change things as per your requirements. The approach is to maintain the per-partition Kafka offsets for each topic in Cassandra (this could also be done in ZooKeeper using its Java API). The latest offset range for the topic is stored or updated in the event_log table with each batch of messages received. On startup, the saved offsets are read back from the table: if they are present, a direct stream is created from those offsets, otherwise a fresh direct stream is created.
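
For reference, here is a minimal sketch of how the offset-tracking columns used below could be created. The column names mirror the select() call in the code; the types, key layout, and the use of CassandraConnector are assumptions, and your real event_log table will also hold the message fields:

// Hypothetical DDL for the offset-tracking columns; adjust to your actual schema.
// Uses the same SparkConf that is built in main() below.
CassandraConnector connector = CassandraConnector.apply(sparkConf);
Session session = connector.openSession();
session.execute("CREATE TABLE IF NOT EXISTS test.event_log ("
        + " \"topicName\" text,"
        + " \"partition\" text,"
        + " \"fromOffset\" text,"
        + " \"untilOffset\" text,"
        + " PRIMARY KEY (\"topicName\", \"partition\"))");
session.close();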

package com.spark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

public class KafkaChannelFetchOffset {
    public static void main(String[] args) {
        String topicName = "topicName";
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName));
        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "127.0.0.1");
        kafkaParams.put("group.id", "GROUP");
        kafkaParams.put("metadata.broker.list", "127.0.0.1");
        List<EventLog> eventLogList = javaFunctions(jssc).cassandraTable("test", "event_log", mapRowTo(EventLog.class))
                .select("topicName", "partition", "fromOffset", "untilOffset").where("topicName=?", topicName).collect();
        JavaDStream<String> kafkaOutStream = null;
        if (eventLogList == null || eventLogList.isEmpty()) {
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                    topicsSet).transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception {
                    JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() {
                        @Override
                        public String call(Tuple2<String, String> arg0) throws Exception {
                            return arg0._2;
                        }
                    });
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                }
            });
        } else {
            for (EventLog eventLog : eventLogList) {
                kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())),
                        Long.parseLong(eventLog.getUntilOffset()));
            }
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class,
                    kafkaParams, kafkaTopicPartition, new Function<MessageAndMetadata<String, String>, String>() {
                        @Override
                        public String call(MessageAndMetadata<String, String> arg0) throws Exception {
                            return arg0.message();
                        }
                    }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {

                @Override
                public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                    writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                    return rdd;
                }
            });
        }
        // Use kafkaOutStream for further processing.
        jssc.start();
        jssc.awaitTermination();
    }

    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) {
        // Build one EventLog row per partition offset range and save them all to Cassandra.
        List<EventLog> eventLogs = new ArrayList<>();
        for (OffsetRange offsetRange : offsets) {
            EventLog eventLog = new EventLog();
            eventLog.setTopicName(String.valueOf(offsetRange.topic()));
            eventLog.setPartition(String.valueOf(offsetRange.partition()));
            eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset()));
            eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset()));
            eventLogs.add(eventLog);
        }
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
        javaFunctions(jsc.parallelize(eventLogs)).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
    }
}

Hope this helps and resolves your problem.

Question:

Let's take a Java EE application in production which uses prepared statements. Let's assume that it prepared the required statements on initialization of its session beans.

In Cassandra, each prepared statement has an ID. Now, if Cassandra is restarted, does the Java EE application also need to be restarted?


Answer:

Almost all Cassandra clients should support this situation without having to restart (all DataStax drivers do, to my knowledge). For example, in the java-driver, when a node goes down and comes back up, one of the first things the driver does when marking the node as available is to re-prepare all the PreparedStatements your application has registered.

In addition, if you make a query using a PreparedStatement in your application and a Cassandra node responds that it does not know about that statement, the client should prepare that query and retry the request.
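
As a rough illustration (a minimal sketch; the bean, keyspace, and table names are hypothetical), a session bean can prepare its statements once at initialization and keep using the same PreparedStatement objects across a Cassandra restart:

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

@Singleton
@Startup
public class UserDao {
    private Cluster cluster;
    private Session session;
    private PreparedStatement findUser; // prepared once at startup, reused for the life of the app

    @PostConstruct
    public void init() {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("my_keyspace");
        // Prepared only once here. If Cassandra is restarted, the driver re-prepares the
        // statement on the server when the node comes back, so no application restart is needed.
        findUser = session.prepare("SELECT * FROM users WHERE id = ?");
    }

    public Row find(UUID id) {
        return session.execute(findUser.bind(id)).one();
    }

    @PreDestroy
    public void close() {
        cluster.close();
    }
}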

Question:

I am running a Cassandra database in the cloud. I have created a REST web service to query the Cassandra DB. I want to take input from the user who will be using the UI of my web service.

Following is the code that I run:

public String cql2(int psa)
    {
        Cluster.Builder clusterBuilder = Cluster.builder()
                .addContactPoints("52.36.24.246").withPort(9042)
                .withQueryOptions(new QueryOptions().setFetchSize(2000))
                .withCredentials("username", "password");
        Session session = clusterBuilder.build().connect();
        String cqlStatement = "SELECT * FROM godfather.crime WHERE psa='"+psa+"' ALLOW FILTERING";
        for (Row row : session.execute(cqlStatement))
        {
            cql=(row.toString());
        }
        return cql;
    }

Answer:

In general when a query is meant to be executed many times through the life of the application (with or without different parameters), prepared statements are the optimal solution.

Basically your code will become:

public class MyService {
    private Cluster cluster;
    private Session session;
    private PreparedStatement query;

     // exception handling not included to keep the code short
    public void init() {
        cluster = Cluster.builder()
            .addContactPoints("52.36.24.246").withPort(9042)
            .withQueryOptions(new QueryOptions().setFetchSize(2000))
            .withCredentials("username", "password")
            .build();
        session = cluster.connect();
        query = session.prepare("SELECT * FROM godfather.crime WHERE psa= ?");
    }

    public String execQuery(int psa) {
        String result = null;
        for (Row r : session.execute(query.bind(psa))) {
            // your processing here
            result = r.toString();
        }
        return result;
    }

}
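
A minimal usage sketch (assuming your REST layer creates the service once and reuses it; the psa values are made up):

MyService service = new MyService();
service.init();                        // build Cluster/Session and prepare the statement once
String first = service.execQuery(15);  // each request reuses the same session and prepared statement
String second = service.execQuery(42);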

A couple of additional details:

  1. it is recommended to initialize the Cluster and Session only once per application. These are long-lived objects that should be kept around.
  2. it is recommended to prepare statements only once and reuse them.
  3. things like exception handling, initialization, etc. are not included in the code above to keep it simple.

If you want to learn more about prepared statements:

  1. Cassandra, like other databases, supports prepared statements. You can read more about the advantages of prepared statements in this Wikipedia article
  2. Prepared statements in Cassandra are very well documented in Prepared statements in the Java driver for Cassandra

Question:

I've created a simple Jersey REST API to query Cassandra. Ajax long polling is also used. But with each request, memory, port usage, and thread count increase, and they do not decrease even after the request completes. I closed the Cassandra session as well, but no luck. Below is the REST code:

@Path("/pull")
@GET
@Produces(MediaType.APPLICATION_JSON)
public JSONArray get(@QueryParam("offset") String offset, @QueryParam("currentUser") String currentUser,
        @QueryParam("longPolling") boolean longPolling) {
    JSONArray messages = new JSONArray();
    JSONObject message = null;
    Session session = Connector.getSession();
    String query = "SELECT * FROM messages";
    ResultSet resultSet = null;
    PreparedStatement ps = null;
    BoundStatement stmt = null;
    int timeOut = 0;
    try {
        while (messages.length() == 0 && timeOut++ < 30) {
            if (longPolling && !offset.equals("0")) {
                query = "SELECT * FROM messages WHERE id > ? ALLOW FILTERING";
                ps = session.prepare(query);
                stmt = ps.bind(UUID.fromString(offset));
                resultSet = session.execute(stmt);
            } else {
                resultSet = session.execute(query);
            }
            List<Row> ls = resultSet.all();
            ls.sort(new Sort<Row>());
            for (Row row : ls) {
                if ((!longPolling || !row.getString("sentby").equals(currentUser))) {
                    message = new JSONObject();
                    message.put("id", row.getUUID("id"));
                    message.put("sentby", row.getString("sentby"));
                    message.put("message", row.getString("message"));
                    messages.put(message);
                }
            }
            if (messages.length() == 0 && longPolling) {
                Thread.sleep(1000);
            }
        }
    } catch (JSONException | InterruptedException e) {
        e.printStackTrace();
    } finally {
        message = null;
        resultSet = null;
        session.close();
    }
    return messages;
}

Answer:

I think the main problem is that you are initialising a session and tearing it down on every request. Session is a relatively heavy object, and only one should be used per application. Move the session into some other bean and use dependency injection in your code here to get it. Don't open and close a session per request; this will definitely not be performant.

Also, don't prepare the statements every time. In this bean of yours, create the prepared statement only once and reuse it across requests.
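
A minimal sketch of what that bean could look like (the Connector name comes from the question; the CDI wiring, keyspace name, and getter names are assumptions based on the code above):

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

@ApplicationScoped
public class Connector {
    private Cluster cluster;
    private Session session;
    private PreparedStatement messagesSince; // prepared once, shared by all requests

    @PostConstruct
    public void init() {
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("chat"); // keyspace name is an assumption
        messagesSince = session.prepare("SELECT * FROM messages WHERE id > ? ALLOW FILTERING");
    }

    public Session getSession() {
        return session; // never closed per request
    }

    public PreparedStatement getMessagesSince() {
        return messagesSince;
    }

    @PreDestroy
    public void shutdown() {
        cluster.close(); // closed once, when the application stops
    }
}

The Jersey resource would then have this bean injected (for example @Inject private Connector connector;) and would use connector.getSession() and connector.getMessagesSince(), without ever calling session.close() in its finally block.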

Question:

I'm trying to fetch records from Cassandra using BoundStatement's bind method and then calling session.execute on that. Suppose the JSON looks like the one shown below:

{
id:1,
name:abc,
.
.
.
}

Here, if I want to query using multiple columns (id=1 and name=abc), I'm able to fetch the records successfully.

However, my json looks something like this:

{
id:1
name:["abc","xyz"]
.
.
.
}

In the database, it gets saved in two different rows (as there is a list of values associated with the name attribute).

The problem is how can I fetch the records from the database?

P.S.: id and name are a compound partition key.


Answer:

Since id and name are part of a compound partition key, you will always need to search by both of them. So you can take your JSON and fan out the name list into a few SELECT queries, or use an IN clause.

Recommended Solution

Run the two SELECT queries (one per name) at the same time and then aggregate the results; this allows each SELECT to hit the best node, assuming you are using the TokenAware load balancing policy.

I use RxJava to fan out into many requests and then merge them back into a stream of rows.

public class CassandraService {
  //...

  public void run() {

    String id = "1";
    List<String> names = Arrays.asList("abc", "xyz");


    PreparedStatement selectStmt = session.prepare("SELECT id, name, value FROM table WHERE id=? AND name=?;");
    List<Statement> statements = new ArrayList<>();

    for(String name : names) {
      statements.add(selectStmt.bind(id,name));
    }

    Observable<Row> rows = execute(statements);

    //Do work with rows.
  }


  public Observable<ResultSet> executeAndReturnResultSet(Statement statement) {
    return Observable.from(session.executeAsync(statement));
  }


  public Observable<Row> execute(List<Statement> statements) {
    List<Observable<ResultSet>> resultSets = Lists.transform(statements, this::executeAndReturnResultSet);
    return Observable.merge(resultSets).flatMap(Observable::from);
  }
}

Alternative

Otherwise you could use IN like the following CQL:

SELECT * FROM multi_partition_key WHERE id='1' AND name IN ('abc','xyz');

For binding to a prepared statement in the Java driver:

String id = "1";
List<String> names = Arrays.asList("abc", "xyz");

PreparedStatement sel = session.prepare("SELECT id, name, value FROM table WHERE id=? AND name IN ?;");
session.execute(sel.bind(id, names));
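
Note that the two approaches trade off differently: the fan-out keeps each query routed directly to the replicas of a single partition, while IN on a partition key column makes one coordinator gather all the partitions, which can add load on that node for larger lists. For two or three names, either is fine.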