Hot questions for using Cassandra with big data

Question:

I have a three-node Cassandra cluster and have created a table with more than 2,000,000 rows.

When I execute this query (select count(*) from userdetails) in cqlsh, I get this error:

OperationTimedOut: errors={}, last_host=192.168.1.2

When I run the count for fewer rows, or with a limit of 50,000, it works fine.


Answer:

count(*) actually pages through all the data. So a select count(*) from userdetails without a limit would be expected to time out with that many rows. Some details here: http://planetcassandra.org/blog/counting-key-in-cassandra/

You may want to consider maintaining the count yourself, using Spark, or, if you just want a ballpark number, grabbing it from JMX.
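
If you go the Spark route, the connector reads the table token-range by token-range across the cluster, so no single coordinator has to page through all 2,000,000 rows. A minimal Java sketch; the contact point and the keyspace name mykeyspace are placeholders for your own values:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class CountRows {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("count-userdetails")
                .set("spark.cassandra.connection.host", "192.168.1.2"); // any node of the cluster
        JavaSparkContext sc = new JavaSparkContext(conf);

        // The count is computed partition-range by partition-range on the executors
        long rows = javaFunctions(sc)
                .cassandraTable("mykeyspace", "userdetails") // placeholder keyspace
                .count();

        System.out.println("userdetails rows: " + rows);
        sc.stop();
    }
}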

Grabbing it from JMX can be a little tricky depending on your data model. To get the number of partitions, read the org.apache.cassandra.metrics:type=ColumnFamily,keyspace={{Keyspace}},scope={{Table}},name=EstimatedColumnCountHistogram mbean and sum up all the 90 values (this is what nodetool cfstats outputs). That will only give you the number that exists in sstables, so to make it more accurate you can do a flush, or try to estimate the number in memtables from the MemtableColumnsCount mbean.
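
A rough sketch of reading that mbean programmatically, assuming the default Cassandra JMX port 7199 and that the gauge exposes its histogram buckets as a long[] through the standard Value attribute (the keyspace and table names are placeholders):

import java.util.Arrays;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class EstimateFromJmx {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://192.168.1.2:7199/jmxrmi"); // default JMX port
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                    "org.apache.cassandra.metrics:type=ColumnFamily,"
                    + "keyspace=mykeyspace,scope=userdetails," // placeholders
                    + "name=EstimatedColumnCountHistogram");
            // Assumption: the gauge's buckets are readable as a long[] via the "Value" attribute
            long[] buckets = (long[]) mbs.getAttribute(name, "Value");
            System.out.println("Estimate for this node: " + Arrays.stream(buckets).sum());
        } finally {
            connector.close();
        }
    }
}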

For a very basic ballpark number you can grab the estimated partition counts from system.size_estimates across all the ranges listed (note that this is only the number on one node). Multiply that by the number of nodes, then divide by the replication factor (RF).
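
As a worked example of that arithmetic with the Java driver, assuming a keyspace named mykeyspace, the three-node cluster from the question, and RF = 1 (adjust the constants to your own setup):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class SizeEstimates {
    public static void main(String[] args) {
        int nodes = 3;             // cluster size from the question
        int replicationFactor = 1; // placeholder: use your keyspace's RF
        try (Cluster cluster = Cluster.builder().addContactPoint("192.168.1.2").build();
             Session session = cluster.connect()) {
            // system.size_estimates only holds per-token-range estimates for the local node
            ResultSet rs = session.execute(
                    "SELECT partitions_count FROM system.size_estimates"
                    + " WHERE keyspace_name = ? AND table_name = ?",
                    "mykeyspace", "userdetails"); // placeholder keyspace
            long localEstimate = 0;
            for (Row range : rs) {
                localEstimate += range.getLong("partitions_count");
            }
            System.out.println("Estimated partitions: " + localEstimate * nodes / replicationFactor);
        }
    }
}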

Question:

I have a Spark dataset Dataset<T> loaded from a Cassandra table, and I want to apply a list of operations (a chain or pipeline) to this dataset.

For example:

Dataset<T> dataset = sparkSession.createDataset(javaFunctions(spark.sparkContext())
                    .cassandraTable(...));

Dataset<Row> result = dataset.apply(func1()).apply(func2()).apply(func3());

func1() will replace null values with the most frequent ones.

func2() will add new columns with new values.

func3() ....etc.

What is the best way to apply this pipeline of functions?


Answer:

If your functions accept Datasets and return Datasets, i.e. have the signature:

public Dataset<U> myMethod(Dataset<T> ds) {
  ...
}

Then you can use the transform method defined on a Dataset to neatly apply your functions.

ds.transform(myMethod)
  .transform(myMethod1)
  .transform(myMethod2)
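
In Java, transform takes a scala.Function1, which a method reference can satisfy on Spark builds for Scala 2.12 or later. A minimal sketch where fillNulls and addDerivedColumn stand in for func1() and func2(), and the column names are made up:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.upper;

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class Pipeline {
    // func1(): replace nulls in "city" with a (hypothetical) most frequent value
    static Dataset<Row> fillNulls(Dataset<Row> ds) {
        return ds.na().fill(Collections.<String, Object>singletonMap("city", "London"));
    }

    // func2(): add a derived column
    static Dataset<Row> addDerivedColumn(Dataset<Row> ds) {
        return ds.withColumn("name_upper", upper(col("name")));
    }

    static Dataset<Row> run(Dataset<Row> dataset) {
        return dataset
                .transform(Pipeline::fillNulls)
                .transform(Pipeline::addDerivedColumn);
    }
}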

If the functions operate on standard Java objects, e.g.

public U myMethod(T row) {
  ...
}

Then you want the map method defined on a Dataset.

ds.map(myMethod)
  .map(myMethod1)
  .map(myMethod2)
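
One Java-specific detail: Dataset.map is overloaded, and the Java-friendly overload takes a MapFunction plus an Encoder for the output type. A minimal sketch on a Dataset<String>, where normalize stands in for one of your per-row functions:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

public class RowPipeline {
    // a per-row function on a plain Java object
    static String normalize(String value) {
        return value == null ? "" : value.trim().toUpperCase();
    }

    static Dataset<String> run(Dataset<String> ds) {
        // the cast picks the MapFunction overload; each step supplies an Encoder for its output
        return ds.map((MapFunction<String, String>) RowPipeline::normalize, Encoders.STRING())
                 .map((MapFunction<String, String>) s -> s + "!", Encoders.STRING());
    }
}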

Full API docs: https://spark.apache.org/docs/2.3.0/api/java/index.html?org/apache/spark/sql/Dataset.html

Question:

I am using Apache Spark to analyse data from Cassandra, and I will insert the results back into Cassandra by designing new tables as per our queries. I want to know whether it is possible for Spark to analyse in real time. If yes, then how? I have read many tutorials on this, but found nothing.

I want to perform the analysis and insert into Cassandra as soon as data arrives in my table.


Answer:

This is possible with Spark Streaming; you should take a look at the demos and documentation that come packaged with the Spark Cassandra Connector.

https://github.com/datastax/spark-cassandra-connector

This includes support for streaming, as well as support for creating new tables on the fly.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md

Spark Streaming extends the core API to allow high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc. Results can be stored in Cassandra.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#saving-rdds-as-new-tables

Use the saveAsCassandraTable method to automatically create a new table with the given name and save the RDD into it. The keyspace you're saving to must exist. The following code will create a new table words_new in the keyspace test with columns word and count, where word becomes the primary key:

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveAsCassandraTable("test", "words_new", SomeColumns("word", "count"))
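
Here is a rough Java sketch of the streaming write path described above. It assumes a plain TCP-socket source (Kafka, Akka, etc. plug in the same way) and a pre-existing test.words table with columns word text PRIMARY KEY and count bigint; every name in it is a placeholder:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamToCassandra {

    // Bean matching the assumed test.words table (word text PRIMARY KEY, count bigint)
    public static class WordCount implements Serializable {
        private String word;
        private long count;
        public WordCount() { }
        public WordCount(String word, long count) { this.word = word; this.count = count; }
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("stream-to-cassandra")
                .set("spark.cassandra.connection.host", "192.168.1.2"); // placeholder contact point
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Placeholder source: one word per line arriving on a socket
        JavaDStream<WordCount> words = ssc.socketTextStream("localhost", 9999)
                .map(line -> new WordCount(line.trim(), 1L));

        // Each micro-batch is written back to Cassandra as soon as it arrives
        words.foreachRDD(rdd ->
                javaFunctions(rdd)
                        .writerBuilder("test", "words", mapToRow(WordCount.class))
                        .saveToCassandra());

        ssc.start();
        ssc.awaitTermination();
    }
}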

Question:

I am learning Cassandra and have written a REST client that reads a file from the DB.

Below is my code to retrieve the data from the DB.

        ResultSet rs = getFile(fileName);
        Row row = rs.one();

        ByteBuffer filecontent = row.getBytes("file_content");
        byte[] data = new byte[filecontent.remaining()];
        ByteBuffer bb = filecontent.get(data);

        String filelocation = row.getString("file_location");
        String filename = row.getString("filename");

        ByteArrayInputStream filecontentfromDB = new ByteArrayInputStream(bb.array());
        File file = writeToFile(filecontentfromDB, fileName);

        if (rs == null) {
            return null;
        }
        return file;

When I look at the returned file, I find some junk characters at the beginning, followed by my file contents.

Please help me remove the junk data.


Answer:

Yippee, found a solution. The lines below should be replaced:

String filelocation = row.getString("file_location");
String filename = row.getString("filename");

ByteArrayInputStream filecontentfromDB = new ByteArrayInputStream(bb.array());

with

String filelocation = row.getString("file_location");
String filename = row.getString("filename");
String bb = new String(filecontent.array(),
        filecontent.arrayOffset() + filecontent.position(),
        filecontent.remaining());
ByteArrayInputStream filecontentfromDB = new ByteArrayInputStream(bb.getBytes());

It works.
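
The String round-trip works for text, but it pushes the bytes through a character set, which can corrupt binary files. A byte-level sketch of the same read that copies only the valid region of the buffer (getFile and writeToFile are the helpers from the question):

ResultSet rs = getFile(fileName);
Row row = rs.one();

ByteBuffer filecontent = row.getBytes("file_content");
byte[] data = new byte[filecontent.remaining()];
filecontent.duplicate().get(data); // copy only position..limit; duplicate() leaves the original buffer untouched

String filelocation = row.getString("file_location");
String filename = row.getString("filename");

ByteArrayInputStream filecontentfromDB = new ByteArrayInputStream(data);
File file = writeToFile(filecontentfromDB, fileName);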