Hot questions for Using Cassandra in performance

Question:

I have to store around 250 numerical values per second, per client, which is around 900k numbers per hour. It probably will not be a full-day recording (probably between 5 and 10 hours a day), but I will partition my data based on the client id and the day the reading is made. The maximum row length comes out at about 22-23M columns, which is still manageable. Nevertheless, my schema looks like this:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);

The keyspace has a replication factor of 2, just for testing; the snitch is GossipingPropertyFileSnitch and the replication strategy is NetworkTopologyStrategy. I know that a replication factor of 3 is more the production standard.

Next up, I created a small cluster on the company's servers: three bare-metal virtualized machines with 2 CPUs x 2 cores and 16GB of RAM and a lot of space. I'm on a gigabit LAN with them. The cluster is operational, according to nodetool.

Here is the code I'm using to test my setup:

    Cluster cluster = Cluster.builder()
            .addContactPoint("192.168.1.100")
            .addContactPoint("192.168.1.102")
            .build();
    Session session = cluster.connect();
    DateTime time = DateTime.now();
    BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue<>(50, true);

    try {

        ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

        String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement = session.prepare(insertQuery);
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

        //generating the entries
        for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
            time = time.plus(4); //4ms between each entry
            BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
            batch.add(bound);

            //The batch statement must have 65535 statements at most
            if (batch.size() >= 65534) {
                queryQueue.put(batch);
                batch = new BatchStatement();
            }
        }
        queryQueue.put(batch); //the last batch, perhaps shorter than 65535

        //storing the data
        System.out.println("Starting storing");
        while (!queryQueue.isEmpty()) {
            pool.execute(() -> {
                try {

                    long threadId = Thread.currentThread().getId();
                    System.out.println("Started: " + threadId);
                    BatchStatement statement = queryQueue.take();
                    long start2 = System.currentTimeMillis();
                    session.execute(statement);
                    System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
                } catch (Exception ex) {
                    System.out.println(ex.toString());
                }
            });

        }
        pool.shutdown();
        pool.awaitTermination(120,TimeUnit.SECONDS);


    } catch (Exception ex) {
        System.out.println(ex.toString());
    } finally {
        session.close();
        cluster.close();
    }

I came up with the code by reading posts here and on other blogs and websites. As I understand it, it is important for the client to use multiple threads, which is why I have done this. I also tried using async operations.

The bottom-line result is this: no matter which approach I use, one batch executes in 5-6 seconds, although it might take up to 10. It takes about the same time whether I enter just one batch (so, only ~65k columns) or use a dumb single-threaded application. Honestly, I expected a bit more, especially since I get more or less similar performance on my laptop with a local instance.

The second, maybe more important, issue is the exceptions I am facing in an unpredictable manner. These two:

com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)

and

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.102:9042 (com.datastax.driver.core.TransportException: [/192.168.1.102:9042] Connection has been closed), /192.168.1.100:9042 (com.datastax.driver.core.TransportException: [/192.168.1.100:9042] Connection has been closed), /192.168.1.101:9042 (com.datastax.driver.core.TransportException: [/192.168.1.101:9042] Connection has been closed))

The bottom line: am I doing something wrong? Should I reorganize the way I load data, or change the schema? I tried reducing the row length (so I have 12-hour rows) but that didn't make a big difference.

============================== Update:

I was rude and forgot to paste an example of the code I used after the question was answered. It works reasonably well, however I'm continuing my research with KairosDB and binary transfer with Astyanax. It looks like I can get much better performance with them over CQL, although KairosDB can have some issues when it is overloaded (but I'm working on it) and Astyanax is a bit verbose to use for my taste. Nevertheless, here is the code; maybe I'm mistaken somewhere.

The number of semaphore slots has no effect on performance when going above 5000; it's almost constant.

String insertQuery = "insert into keyspace.measurement     (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement =     session.prepare(insertQuery);
        Semaphore semaphore = new Semaphore(15000);

    System.out.println("Starting " + Thread.currentThread().getId());
    DateTime time = DateTime.parse("2015-01-05T12:00:00");
    //generating the entries
    long start = System.currentTimeMillis();

    for (int i = 0; i < 900000; i++) { 

        BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
        semaphore.acquire();
        ResultSetFuture resultSetFuture = session.executeAsync(statement);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {

                semaphore.release();
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error: " + throwable.toString());
                semaphore.release();
            }
        });
        time = time.plus(4); //4ms between each entry
    }

Answer:

What are your results using unlogged batching? Are you sure you want to use batch statements at all? https://medium.com/@foundev/cassandra-batch-loading-without-the-batch-keyword-40f00e35e23e

Question:

My Cassandra table has the following schema:

CREATE TABLE cachetable1 (
id text,
lsn text,
lst timestamp,
PRIMARY KEY ((id))
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='{"keys":"ALL", "rows_per_partition":"ALL"}' AND
comment='' AND
dclocal_read_repair_chance=0.100000 AND
gc_grace_seconds=864000 AND
read_repair_chance=0.000000 AND
default_time_to_live=0 AND
speculative_retry='99.0PERCENTILE' AND
memtable_flush_period_in_ms=0 AND
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};

The above table contains 221 million rows (approx. 16 GB of data). The CassandraDaemon is running with 4GB of heap space and I have configured 4 GB of memory for the row cache. I am trying to run select queries from my Java code like this:

List<String> ls = new ArrayList<>();
Random random = new Random();
long count = 0;

for (int i = 0; i < 1000; i++)
{
    int id = random.nextInt(20000000);
    for (int j = id; j <= id + 100; j++)
    {
        ls.add(j + "");
    }

    Statement s = QueryBuilder.select("lst", "lsn").from("ks1", "cachetable1").where(QueryBuilder.in("id", ls.toArray()));
    s.setFetchSize(100);

    ResultSet rs = sess.execute(s);
    List<Row> lsr = rs.all();
    for (Row rw : lsr)
    {
        //System.out.println(rw.toString());
        count++;
    }

    ls.clear();
}

In the above code, I am trying to fetch 0.1 million records. But the read/get performance is very bad: it takes 400-500 seconds to fetch 0.1 million rows. Is there a better way to read/get records from Cassandra through Java? Is some tuning required other than the row cache size and Cassandra heap size?


Answer:

You appear to want to retrieve your data in 100 row chunks. This sounds like a good candidate for a clustering column.

Change your schema to use an id as the partition key and a chunk index as a clustering column, i.e. PRIMARY KEY ( (id), chunk_idx ). When you insert the data, you will have to figure out how to map your single indexes into an id and chunk_idx (e.g. perhaps do a modulo 100 on one of your values to generate a chunk_idx).

Now when you query for an id and don't specify a chunk_idx, Cassandra can efficiently return all 100 rows with one disk read on the partition. And you can still do range queries and retrievals of single rows within the partition by specifying the chunk_idx if you don't always want to read a whole chunk of rows.
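
A rough sketch of that layout with the DataStax driver (the table name, the divide-by-100 mapping, and the example values are hypothetical illustrations, not your exact model; it assumes a driver version that supports simple statements with bind values):

// Proposed layout: 100 logical rows share one partition, addressed by chunk_idx.
session.execute("CREATE TABLE ks1.cachetable2 ("
        + " id text, chunk_idx int, lsn text, lst timestamp,"
        + " PRIMARY KEY ((id), chunk_idx))");

// Map a single sequential index into a partition id and a chunk index.
long index = 12345678L;                     // the original row number
String id = Long.toString(index / 100);     // 100 consecutive rows share one partition
int chunkIdx = (int) (index % 100);         // position inside that partition

PreparedStatement insert = session.prepare(
        "INSERT INTO ks1.cachetable2 (id, chunk_idx, lsn, lst) VALUES (?, ?, ?, ?)");
session.execute(insert.bind(id, chunkIdx, "some-lsn", new Date()));

// Reading a whole 100-row chunk is now a single-partition query.
ResultSet chunk = session.execute(
        "SELECT lst, lsn FROM ks1.cachetable2 WHERE id = ?", id);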

So your mistake is you are generating 100 random partition reads with each query, and this will hit all the nodes and require a separate disk read for each one. Remember that just because you are querying for sequential index numbers doesn't mean the data is stored close together, and with Cassandra it is exactly the opposite, where sequential partition keys are likely stored on different nodes.

The second mistake you are making is you are executing the query synchronously (i.e. you are issuing the query and waiting for the request to finish before you issue any more queries). What you want to do is use a thread pool so that you can have many queries running in parallel, or else use the executeAsync method in a single thread. Since your query is not efficient, waiting for the 100 random partition reads to complete is going to be a long wait, and a lot of the highly pipelined Cassandra capacity is going to be sitting there twiddling its thumbs waiting for something to do. If you are trying to maximize performance, you want to keep all the nodes as busy as possible.
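
For that second point, here is a minimal sketch of firing the per-partition reads asynchronously and only then collecting the results (it reuses the ls, sess and count variables from the question and is meant as an illustration, not a drop-in replacement):

List<ResultSetFuture> futures = new ArrayList<>();
for (String key : ls) {
    Statement stmt = QueryBuilder.select("lst", "lsn")
            .from("ks1", "cachetable1")
            .where(QueryBuilder.eq("id", key));
    futures.add(sess.executeAsync(stmt));        // all reads are in flight at once
}
for (ResultSetFuture future : futures) {
    for (Row rw : future.getUninterruptibly()) { // blocks only until that read finishes
        count++;
    }
}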

Another thing to look into is using the TokenAwarePolicy when connecting to your cluster. This allows each query to go directly to a node that has a replica of the partition rather than to a random node that might have to act as a coordinator and get the data via an extra hop. And of course using consistency level ONE on reads is faster than higher consistency levels.
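
As a sketch, with the 2.x driver the policy is set when building the Cluster (newer driver versions construct the child policy via a builder instead):

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy()))
        .build();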

The row cache size and heap size are not the source of your problem, so that's the wrong path to go down.

Question:

I want to know, which is faster using apache cassandra in combination with java. I have the following options to get my result:

Statement s = QueryBuilder.select().from("table").where(QueryBuilder.eq("source",source);
ResultSet resultSet = session.execute(s);
if (resultSet.all().size() == 0) {
  //Do Something
}

The second option to achieve my count is:

ResultSet rs = session.execute("SELECT COUNT(*) as coun FROM table WHERE source = '"+source+"'");
Row r = rs.one();
if (r.getLong("count") == 0) {
  //Do Something
}

In every query, the maximum count is 1. Now my question is: which would be faster in general?


Answer:

I tested several queries on multiple tables; the version with count(*) is much faster than using resultSet.all().size() == 0. I used CQLSH to try which is faster, with the following queries, which should be equivalent to the Java ones:

SELECT COUNT(*) as coun FROM table WHERE source = '...';

And the slower one:

SELECT * FROM table WHERE source = '...';

Question:

Low TTL with Leveled Compaction, should I reduce gc_grace_seconds to improve read performance?

Scenario:

  • A Cassandra table caching values from an external DB - read performance needs to be good (less than 100 ms)
  • TTL = 4 hrs at row level
  • Functional full-table refresh (delete and then lazy load) every 6 hrs

If I keep gc_grace_seconds at the default value of 10 days, I can potentially have 60 rows with tombstones for every live row. This will affect read performance. Or not?

Is reducing gc_grace_seconds to, say, 1 day a safe enough value to allow delete replication across nodes, given that even if a node is out of service for some issue, it should be brought back in less than a day? Will this improve read performance?


Answer:

I can potentially have 60 rows with tombstones for every live row. This will affect read performance. Or not?

Yes, it will definitely affect your read performance. At a data/tombstone ratio of 1:60, you'll be asking Cassandra to weed through 60 deletes for every good row. If you have a lot of records, it will probably perform pretty terribly.

Is reducing gc_grace_seconds to say 1 day a safe enough value to allow delete replication across nodes? Given that even if a node is out of tier for some issue, it should be brought back in less than a day. Will this improve read performance?

This should improve your read performance significantly. But the drawback is that if you have a node drop out of the cluster, you then only have 1 day to get it back in before it knows nothing about the delete. Even if you miss that window, you should be able to get your node back to a consistent state by running a nodetool repair. Otherwise you run the risk of deleted data re-appearing.
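
If you decide to go that route, lowering it is a one-line schema change; a sketch with a hypothetical keyspace/table name:

// 86400 seconds = 1 day
session.execute("ALTER TABLE mykeyspace.cache_table WITH gc_grace_seconds = 86400");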

Question:

I have built an importer for MongoDB and Cassandra. Basically all operations of the importer are the same, except for the last part where data gets formed to match the needed cassandra table schema and wanted mongodb document structure. The write performance of Cassandra is really bad compared to MongoDB and I think I'm doing something wrong.

Basically, my abstract importer class loads the data, reads out all data and passes it to the extending MongoDBImporter or CassandraImporter class to send data to the databases. One database is targeted at a time - no "dual" inserts to both C* and MongoDB at the same time. The importer is run on the same machine against the same number of nodes (6).


The Problem:

MongoDB import finished after 57 minutes. I ingested 10,000,000 documents and I expect about the same number of rows for Cassandra. My Cassandra importer has now been running for 2.5 hours and is only at 5,000,000 inserted rows. I will wait for the importer to finish and edit the actual finish time in here.


How I import with Cassandra:

I prepare two statements once before ingesting data. Both statements are UPDATE queries because sometimes I have to append data to an existing list. My table is cleared completely before starting the import. The prepared statements get used over and over again.

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);

For every row, I create a BoundStatement and pass that statement to my "custom" batching method:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);

With MongoDB, I can insert 1000 documents (that's the maximum) at a time without problems. For Cassandra, the importer crashes with com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large for just 10 of my statements at some point. I'm using this code to build the batches. Btw, I began with batch sizes of 1000, 500, 300, 200, 100, 50, 20 before, but obviously they do not work either. I then set it down to 10 and it threw the exception again. Now I'm out of ideas why it's breaking.

private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == null) {
        currentBatch = new BatchStatement(Type.UNLOGGED);
    }

    currentBatch.add(statement);
    if (currentBatch.size() == MAX_BATCH_SIZE) {
        ResultSet result = session.execute(currentBatch);
        currentBatch = new BatchStatement(Type.UNLOGGED);
        return result;
    }

    return null;
}

My C* schema looks like this

CREATE TYPE stream.event (
    data_dbl frozen<map<text, double>>,
    data_str frozen<map<text, text>>,
    data_bool frozen<map<text, boolean>>
);

CREATE TABLE stream.data (
    log_creator text,
    date text, //date of the timestamp
    ts timestamp,
    log_id text, //some id
    hour int, //just the hour of the timestamp
    x double,
    y double,
    events list<frozen<event>>,
    PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

I sometimes need to add further new events to an existing row. That's why I need a list of UDTs. My UDT contains three maps because the event creators produce different data (key/value pairs of type string/double/boolean). I am aware of the fact that the UDTs are frozen and I cannot touch the maps of already ingested events. That's fine for me; I just need to add new events that sometimes have the same timestamp. I partition on the creator of the logs (some sensor name) as well as the date of the record (i.e. "22-09-2016") and the hour of the timestamp (to distribute data more while keeping related data close together in a partition).


I'm using Cassandra 3.0.8 with the DataStax Java driver, version 3.1.0, in my pom. According to What is the batch limit in Cassandra?, I should not increase the batch size by adjusting batch_size_fail_threshold_in_kb in my cassandra.yaml. So... what should I do, or what's wrong with my import?


UPDATE So I have adjusted my code to run async queries and store the currently running inserts in a list. Whenever an async insert finishes, it is removed from the list. When the list size exceeds a threshold and an error occurred in an earlier insert, the method waits 500 ms until the inserts are below the threshold. My code now automatically increases the threshold when no insert has failed.

But after streaming 3,300,000 rows, there were 280,000 inserts being processed and no error had happened. That number of currently processed inserts looks too high. The 6 Cassandra nodes are running on commodity hardware, which is 2 years old.

Is this high number (280,000 for 6 nodes) of concurrent inserts a problem? Should I add a variable like MAX_CONCURRENT_INSERT_LIMIT?

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    //Sleep while the currently processing number of inserts is too high
    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        concurrentInsertLimit += 2000;
        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
    }

    return;
}

Answer:

After using C* for a bit, I'm convinced you should really use batches only for keeping multiple tables in sync. If you don't need that feature, then don't use batches at all, because you will incur performance penalties.

The correct way to load data into C* is with async writes, with optional backpressure if your cluster can't keep up with the ingestion rate. You should replace your "custom" batching method with something that:

  • performs async writes
  • keeps the number of in-flight writes under control
  • performs some retry when a write times out.

To perform async writes, use the .executeAsync method, which will return you a ResultSetFuture object.

To keep the number of in-flight queries under control, just collect the ResultSetFuture objects retrieved from the .executeAsync method in a list, and if the list gets to (ballpark value here) say 1k elements, then wait for all of them to finish before issuing more writes. Or you can wait for the first one to finish before issuing one more write, just to keep the list full.

And finally, you can check for write failures when you're waiting on an operation to complete. In that case, you could:

  1. write again with the same timeout value
  2. write again with an increased timeout value
  3. wait some amount of time, and then write again with the same timeout value
  4. wait some amount of time, and then write again with an increased timeout value

From 1 to 4 you have increasing backpressure strength. Pick the one that best fits your case.
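
As an illustration only, here is a minimal sketch of option 4 (wait, then retry with a longer timeout) using the per-statement client timeout available in the 3.x driver; the retry limits are arbitrary and this is not your exact code:

void writeWithRetry(Session session, Statement statement) throws InterruptedException {
    int readTimeoutMs = 5000;       // client-side timeout for this statement
    long backoffMs = 100;           // initial wait before a retry
    for (int attempt = 0; attempt < 5; attempt++) {
        try {
            statement.setReadTimeoutMillis(readTimeoutMs);
            session.execute(statement);
            return;                                   // success
        } catch (WriteTimeoutException e) {
            Thread.sleep(backoffMs);                  // back off...
            backoffMs *= 2;
            readTimeoutMs += 2000;                    // ...and retry with an increased timeout
        }
    }
    throw new RuntimeException("Write failed after 5 attempts");
}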


EDIT after question update

Your insert logic seems a bit broken to me:

  1. I don't see any retry logic
  2. You don't remove the item from the list if it fails
  3. Your while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) is wrong, because you will sleep only when the number of issued queries is > concurrentInsertLimit, and because of 2. your thread will just park there.
  4. You never set concurrentInsertErrorOccured back to false

I usually keep a list of (failed) queries for the purpose of retrying them at a later time. That gives me powerful control over the queries, and when the failed queries start to accumulate I sleep for a few moments, and then keep on retrying them (up to X times, then hard fail...).

This list should be very dynamic, e.g. you add items to it when queries fail and remove items when you perform a retry. Now you can understand the limits of your cluster, and tune your concurrentInsertLimit based on, e.g., the average number of failed queries in the last second, or stick with the simpler approach "pause if we have an item in the retry list", etc...


EDIT 2 after comments

Since you don't want any retry logic, I would change your code this way:

private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            runningInsertList.remove(future);
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    //Sleep while the currently processing number of inserts is too high
    while (runningInsertList.size() >= concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    if (!concurrentInsertErrorOccured) {
        // Increase your ingestion rate if no query failed so far
        concurrentInsertLimit += 10;
    } else {
        // Decrease your ingestion rate because at least one query failed
        concurrentInsertErrorOccured = false;
        concurrentInsertLimit = Math.max(1, concurrentInsertLimit - 50);
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }
    }

    return;
}

You could also optimize the procedure a bit by replacing your List<ResultSetFuture> with a counter.

Hope that helps.

Question:

Currently we're challenging our architecture while using Apache Spark against our Cassandra DB, because we're experiencing really bad read performance.

The hardware where Spark & Cassandra run is a cloud server with 16GB of memory and 8 cores, using an SSD for the OS.

Cassandra's 'data_file_directories' is set to another HDD, whose test results with hdparm -tT were:

Timing cached reads:   13140 MB in  1.99 seconds = 6604.42 MB/sec
Timing buffered disk reads: 428 MB in  3.00 seconds = 142.65 MB/sec

The target cf in cassandra:

CREATE TABLE test.stats (
day timestamp,
received timestamp,
target inet,
via inet,
prefix blob,
rtt decimal,
PRIMARY KEY (day, received, target, via)
) WITH CLUSTERING ORDER BY (received ASC, target ASC, via ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"NONE", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

We are currently using Cassandra 2.1.6 with the java datastax driver (2.1.6) and the spark java connector (spark-cassandra-connector_2.10, version 1.4.0-M2).

The Spark process currently has one worker with conf.set("spark.executor.memory", "2G"); set.

Starting a simple Spark job to read all rows of one explicit partition key (which has roughly 83,520,000 rows) via a submit/or serialized driver took 17 minutes.

The job simply writes all rows to a file, which has a final size of 1.2 GB.

Here is the driver code:

CassandraTableScanJavaRDD<EchoRepliesBean> cassandraTable2 = null;
    switch (timespanMode)
    {
        case SIX_HOURS:
            Calendar calendarDay = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendarDay.setTimeInMillis(now);
            calendarDay.set(Calendar.HOUR_OF_DAY, 0);
            calendarDay.set(Calendar.MINUTE, 0);
            calendarDay.set(Calendar.SECOND, 0);
            calendarDay.set(Calendar.MILLISECOND, 0);
            Timestamp tsEnd = new Timestamp(calendarDay.getTimeInMillis());
            calendarDay.add(Calendar.DAY_OF_MONTH, -1);
            Timestamp tsStart = new Timestamp(calendarDay.getTimeInMillis());
            System.out.println(tsStart);
            cassandraTable2 = javaFunctions(sc).cassandraTable("test", "stats", mapRowTo(EchoRepliesBean.class))
                                               .where("day = ?", tsStart);
            break;
        default:
            /* make compiler happy */
            // cassandraTable = null;
    } 
    cassandraTable2.saveAsTextFile("/opt/out_TEST_" + System.currentTimeMillis());

    sc.stop();

This is strange; any help or ideas for further debugging would be really appreciated.


Answer:

Some possible ways to improve performance:

  1. Increase the parallelism by partitioning the data on multiple nodes. Since you are partitioning by day, you have a large number of rows in one partition on one node. This is forcing the reads and writes to be a serial operation. If you partitioned by hour, then your data could be spread across multiple nodes and multiple spark workers.

  2. I suspect your day partition is too big to fit into the single spark worker's memory, which may be causing some swapping of data to disk. Using smaller partitions, giving the spark worker more memory, or using more spark workers would avoid that.

  3. Make sure your spark workers are running on the Cassandra nodes and not on separate machines. If the workers are on separate machines then there will be a lot of network overhead to shuffle the data from the nodes to the workers.

  4. Make sure your cloud server is using local storage for Cassandra and not network storage.

To debug I would try running your test on a partition with only one row in it. If that performs badly then there is something wrong with your machine setup. If that performs well, then increase the number of rows in the partition until you see a sharp drop off in performance.
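
For suggestion 1, the schema change could look roughly like this (a sketch with a hypothetical table name; the Spark-side .where() call would then filter on both day and hour):

session.execute("CREATE TABLE test.stats_by_hour ("
        + " day timestamp, hour int, received timestamp, target inet, via inet,"
        + " prefix blob, rtt decimal,"
        + " PRIMARY KEY ((day, hour), received, target, via))"
        + " WITH CLUSTERING ORDER BY (received ASC, target ASC, via ASC)");

// On the Spark side, the scan is then restricted per hourly partition, e.g.:
// javaFunctions(sc).cassandraTable("test", "stats_by_hour", mapRowTo(EchoRepliesBean.class))
//                  .where("day = ? and hour = ?", tsStart, hour);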

Question:

We are using NoSQL (Cassandra) in our project. We have a Table A (5000 records), which is a master table. We have another Table B (2000 records). Table B has 4 columns and Table A has 25 columns. We exposed a REST service to get all records from B, like /service/getB. This service will return 6 columns in the response, as:

{
    "result": [
        {
            "col1FromB": "1B",
            "col2FromB": "2B",
            "col3FromB": "3B",
            "col4FromB": "4B",
            "col1FromA": "1A",
            "col2FromA": "2A"
        },
        {
            "col1FromB": "11B",
            "col2FromB": "12B",
            "col3FromB": "13B",
            "col4FromB": "14B",
            "col1FromA": "11A",
            "col2FromA": "12A"
        }
    ]
}

So, there is a lookup query to Table A for each item in Table B. This is how I am doing it:

//Get all from Table B (took 90 ms in Local and 30 ms in Test)
Select select = QueryBuilder.select().from("B");
List<B> bList = cassandraOperations.select(select, B.class);

//Loop through bList and do a lookup using id in Table A (took 46000 ms (46 sec) in Local (horrible) and 6000 ms (6 sec) in Test)
for (B b : bList) {
    Select selectA = QueryBuilder.select("col1FromA", "col2FromA").from("A");
    selectA.where(QueryBuilder.eq("id", b.getId()));
    A a = cassandraOperations.selectOne(selectA, A.class);

    ----
    ----
    //Prepare final Pojo with a and b objects, add it into a List<finalPojo>, and return
}

So, the lookup time is very high in the Local environment and also not great in the Test environment. I am only using Java collections.

Is there any way to make this better so that we get the records in less time?


Answer:

for (B b : bList) {
    Select selectA = QueryBuilder.select("col1FromA", "col2FromA").from("A");
    selectA.where(QueryBuilder.eq("id", b.getId()));
    A a = cassandraOperations.selectOne(selectA, A.class);

This code performs a blocking request, cassandraOperations.selectOne, in each iteration, which means that each iteration has to wait for the previous one. All 2000 requests will be executed one by one, and that takes a long time.

To avoid this, use an asynchronous way to get the records in the loop (as I see, you are using Spring, and selectOne can be replaced by selectOneAsynchronously, which returns a ResultSetFuture; save these futures in a list and use them to retrieve the records once all requests have been sent).
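
A rough sketch of the same idea using the plain DataStax driver directly (it assumes you can get hold of the driver Session that cassandraOperations wraps; the Spring Data async methods follow the same pattern):

List<ResultSetFuture> futures = new ArrayList<>();
for (B b : bList) {
    Select selectA = QueryBuilder.select("col1FromA", "col2FromA").from("A");
    selectA.where(QueryBuilder.eq("id", b.getId()));
    futures.add(session.executeAsync(selectA));          // fire all 2000 lookups without waiting
}
for (int i = 0; i < bList.size(); i++) {
    Row row = futures.get(i).getUninterruptibly().one(); // collect results; the reads ran in parallel
    // build the final POJO from bList.get(i) and row ...
}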

Question:

I am using the DataStax driver (Java) with Cassandra 2.1, and I was wondering if there are benefits, in terms of performance, to using the Accessor-annotated interfaces rather than prepared statements.

Basically, I wonder what is the best method between:

Method 1 :

PreparedStatement statement = session.prepare("SELECT * FROM USER WHERE name = ?");
ResultSet resultSet = session.execute(statement.bind("someone"));

User user = userMapper.map(resultSet).one();

AND

Method 2 :

@Accessor
public interface UserAccessor {
    @Query("SELECT * FROM USERS WHERE name = :name")
    User getUser(@Param("name") String name);
}

...

MappingManager manager = new MappingManager(getSession());
UserAccessor userAccessor = manager.createAccessor(UserAccessor.class);
User user = userAccessor.getUser("someone");

Moreover, should I use this mapper if I want performance, or are simple bound statements with POJOs better?

Example from Datastax Documentation

PreparedStatement statement = session.prepare("SELECT * FROM USER WHERE name = ?");
BoundStatement bind= statement.bind("someone");

ResultSet results = session.execute(bind);
for (Row row : results) {
    User user = new User (row.getString("name"));
}

Answer:

  1. I cannot find a good reference to link to, but afaik @Accessor queries are prepared too.

  2. The choice you are making depends mostly on the approach you'll feel more comfortable with. Depending on your app, even if there might be some overhead in using the mapper, it might not be significant.

Question:

On May 3rd we deployed. Our 3-node Cassandra cluster became extremely slow and many web requests were timing out. By EOD May 3rd we had added another m1.large machine to our cluster, which resolved the timeouts. That being said, the cluster was still extremely slow; on May 4th we launched five i3.xlarge nodes. This helped our application response time considerably, and on May 5th we removed the old m1.large boxes from the cluster. As of EOD May 5th everything was quick and responsive. This morning, the application began timing out again.

We have noticed some odd CPU utilization behaviour - the CPU usage fluctuates between 100% and 200% regardless of load (they are four-core machines). We have extremely light weekends with absolutely no load and relatively heavy Monday loads, but we are seeing absolutely no change in CPU usage.

As you can see in the 2-week graph below, our database CPU usage was once bound to application usage. You can see the large spike on the 3rd, the introduction of the new machines on the 4th, and the stable high CPU usage starting on the 6th.

We have spent a good amount of time trying to identify the cause for the CPU usage and were able to identify (and subsequently rule out) three main reasons:

  1. High khugepaged CPU usage.
  2. Poorly tuned garbage collection
  3. Poorly tuned compactions

We have ruled out all three of these things.

  1. Our servers have 0.0% khugepaged CPU usage.
  2. Our GC throughput is about 96%. We have also tweaked the heap and new heap sizes as well as switching over to G1 GC. Our logs were once showing warnings related to long GC pauses but no longer do. Also, the GC threads only account for a small amount of CPU usage.
  3. nodetool compactionstats returns 0 pending tasks. We have switched over to LeveledCompactionStrategy and set the GC_GRACE_SECONDS to 1 day. Our logs were once showing warnings related to large numbers of tombstones but no longer do. nodetool compactionhistory shows about one compaction per hour and according to the logs they occur extremely quickly (< 1 second).

It appears that Cassandra's SharedPoolWorker threads have very high usage. Here's one node's CPU usage by type of thread (they all look pretty similar):

84.6 SharedPoolWorker
22.1 Thrift
13.5 CCompilerThread
11.9 MessagingServiceOutgoing
9.4  MessagingServiceIncoming
3.6  GangworkerParallelGCThreads
1.6  DestroyJavaVM
.3   VMThread
.1   Thread
.1   ScheduledTasks
.1   OptionalTasks
0    ...

Checking out the state of the SharedPool-Worker threads shows that the vast majority are in WAITING with the following stack trace:

java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:85)
    at java.lang.Thread.run(Unknown Source)

I think this is the problem but I am not sure why this may be as very little CPU time is spent waiting (consistently 0% as per dstat).

Now, interestingly enough, running nodetool tpstats on any given node shows a small number of ReadStage threads active, and occasionally one or two pending. There are none blocked, all-time blocked, or dropped.

Here's the output to nodetool cfstats and here's nodetool netstats:

Mode: NORMAL
Not sending any streams.
Read Repair Statistics:
Attempted: 12229
Mismatch (Blocking): 2
Mismatch (Background): 0
Pool Name                    Active   Pending      Completed   Dropped
Commands                        n/a         0         707576         0
Responses                       n/a         0         859216       n/a

Does anyone have any ideas about why this may be occurring? Any potential things that we can look into?


Answer:

It can be related to a high number of tombstones or a high number of SSTables scanned for a single read - that creates a constantly high CPU load and slow responses, due to the high number of reads Cassandra needs to do for every request.

These symptoms can show up, for example, when using STCS with constantly and frequently updated data (updating existing rows, not adding new ones).

Can you add nodetool tablestats/cfstats of your main tables to the question?

Question:

Environment: Cassandra 2.1, DataStax Driver 2.1.9, single node cluster with DSE 4.8

I created a table:

create table calc_data_test2(
    data_set_id uuid,svod_type text,section text,index_code text,value_type text,data_hash text,c1 text,c2 text,c3 text,c4 text,c5 text,c6 text,c7 text,c8 text,c9 text,c10 text,c11 text,c12 text,c13 text,c14 text,c15 text,c16 text,c17 text,c18 text,c19 text,c20 text,c21 text,c22 text,c23 text,c24 text,c25 text,c26 text,c27 text,c28 text,c29 text,c30 text,c31 text,c32 text,c33 text,c34 text,c35 text,c36 text,c37 text,c38 text,c39 text,c40 text,c41 text,c42 text,c43 text,c44 text,c45 text,c46 text,c47 text,c48 text,c49 text,c50 text,c51 text,c52 text,c53 text,c54 text,c55 text,c56 text,c57 text,c58 text,c59 text,c60 text,c61 text,c62 text,c63 text,c64 text,c65 text,c66 text,c67 text,c68 text,c69 text,c70 text,c71 text,c72 text,c73 text,c74 text,c75 text,c76 text,c77 text,c78 text,c79 text,c80 text,c81 text,c82 text,c83 text,c84 text,c85 text,c86 text,c87 text,c88 text,c89 text,c90 text,c91 text,c92 text,c93 text,c94 text,c95 text,c96 text,c97 text,c98 text,c99 text,c100 text,se1 text,se2 text,data_value double,
    primary key ((data_set_id))
);

Then I made a few experiments with async inserts into the table. There were 1,000,000 inserts into the same table with 50 parallel requests for each case. The difference is in the number of affected columns. Here are the results:

  • 85 columns - 143860 ms
  • 65 columns - 108564 ms
  • 45 columns - 78213 ms
  • 25 columns - 68447 ms
  • 5 columns - 49812 ms

The details are below.


Insert of 85 columns:

>java -jar store-utils-cli.jar -pt "insert into csod.calc_data_test2(data_set_id, svod_type,section,index_code,value_type,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,c27,c28,c29,c30,c31,c32,c33,c34,c35,c36,c37,c38,c39,c40,c41,c42,c43,c44,c45,c46,c47,c48,c49,c50,c51,c52,c53,c54,c55,c56,c57,c58,c59,c60,c61,c62,c63,c64,c65,c66,c67,c68,c69,c70,c71,c72,c73,c74,c75,c76,c77,c78,c79,c80) VALUES(now(), '58','9281','7611','367','7371','8353','4269','134','5884','6794','3147','7639','7798','7890','8547','4212','8630','5962','8686','4482','372','7218','6070','5525','1381','9816','5721','3632','5364','3980','6635','9641','518','6394','2560','1202','5595','7466','1507','7783','9586','6724','9169','9673','7867','8509','6889','3540','5994','4290','1925','8924','4704','4987','803','4291','4987','1111','4934','9885','6441','8212','9349','6852','6628','42','6713','3696','3316','8122','3288','3845','6063','5430','2052','5121','3343','6362','8724','2184','1380','5828','3723','8185');" 1000000 --cassandra.connection.requests.max.local=50
22:56:40,398  INFO ru.croc.rosstat.csod.store.cassandra.connection.CassandraCluster:-1 - Connection to CassandraSettings$Connection(nodes:[csodx01.lab.croc.ru], port:9042, keyspace:csod, requests:CassandraSettings$Connection$Requests(fetchSize:1000, batchSize:2000, consistencyLevel:LOCAL_QUORUM, max:CassandraSettings$Connection$Requests$Max(local:50, remote:20, retry:CassandraSettings$Connection$Requests$Max$Retry(enabled:true, read:10, write:10, unavailable:5)))) established

Entering: Overall process
Entering: Prebuilding of statements
Leaving [1086 ms]: Prebuilding of statements
Entering: Executing statements async
Leaving [143860 ms][6951.202558042542 ops/s]: Executing statements async
Leaving [144954 ms]: Overall process

Insert of 65 columns:

>java -jar store-utils-cli.jar -pt "insert into csod.calc_data_test2(data_set_id, svod_type,section,index_code,value_type,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,c27,c28,c29,c30,c31,c32,c33,c34,c35,c36,c37,c38,c39,c40,c41,c42,c43,c44,c45,c46,c47,c48,c49,c50,c51,c52,c53,c54,c55,c56,c57,c58,c59,c60) VALUES(now(), '58','9281','7611','367','7371','8353','4269','134','5884','6794','3147','7639','7798','7890','8547','4212','8630','5962','8686','4482','372','7218','6070','5525','1381','9816','5721','3632','5364','3980','6635','9641','518','6394','2560','1202','5595','7466','1507','7783','9586','6724','9169','9673','7867','8509','6889','3540','5994','4290','1925','8924','4704','4987','803','4291','4987','1111','4934','9885','6441','8212','9349','6852');" 1000000 --cassandra.connection.requests.max.local=50
00:28:27,393  INFO ru.croc.rosstat.csod.store.cassandra.connection.CassandraCluster:-1 - Connection to CassandraSettings$Connection(nodes:[csodx01.lab.croc.ru], port:9042, keyspace:csod, requests:CassandraSettings$Connection$Requests(fetchSize:1000, batchSize:2000, consistencyLevel:LOCAL_QUORUM, max:CassandraSettings$Connection$Requests$Max(local:50, remote:20, retry:CassandraSettings$Connection$Requests$Max$Retry(enabled:true, read:10, write:10, unavailable:5)))) established

Entering: Overall process
Entering: Prebuilding of statements
Leaving [847 ms]: Prebuilding of statements
Entering: Executing statements async
Leaving [108564 ms][9211.15655281677 ops/s]: Executing statements async
Leaving [109413 ms]: Overall process

Insert of 45 columns:

>java -jar store-utils-cli.jar -pt "insert into csod.calc_data_test2(data_set_id, svod_type,section,index_code,value_type,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,c27,c28,c29,c30,c31,c32,c33,c34,c35,c36,c37,c38,c39,c40) VALUES(now(), '58','9281','7611','367','7371','8353','4269','134','5884','6794','3147','7639','7798','7890','8547','4212','8630','5962','8686','4482','372','7218','6070','5525','1381','9816','5721','3632','5364','3980','6635','9641','518','6394','2560','1202','5595','7466','1507','7783','9586','6724','9169','9673');" 1000000 --cassandra.connection.requests.max.local=50
00:33:19,972  INFO ru.croc.rosstat.csod.store.cassandra.connection.CassandraCluster:-1 - Connection to CassandraSettings$Connection(nodes:[csodx01.lab.croc.ru], port:9042, keyspace:csod, requests:CassandraSettings$Connection$Requests(fetchSize:1000, batchSize:2000, consistencyLevel:LOCAL_QUORUM, max:CassandraSettings$Connection$Requests$Max(local:50, remote:20, retry:CassandraSettings$Connection$Requests$Max$Retry(enabled:true, read:10, write:10, unavailable:5)))) established

Entering: Overall process
Entering: Prebuilding of statements
Leaving [845 ms]: Prebuilding of statements
Entering: Executing statements async
Leaving [78213 ms][12785.598302072545 ops/s]: Executing statements async
Leaving [79060 ms]: Overall process

Insert of 25 columns:

>java -jar store-utils-cli-1.2.0-SNAPSHOT.jar -pt "insert into csod.calc_data_test2(data_set_id, svod_type,section,index_code,value_type,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20) VALUES(now(), '58','9281','7611','367','7371','8353','4269','134','5884','6794','3147','7639','7798','7890','8547','4212','8630','5962','8686','4482','372','7218','6070','5525');" 1000000 --cassandra.connection.requests.max.local=50
00:39:29,337  INFO ru.croc.rosstat.csod.store.cassandra.connection.CassandraCluster:-1 - Connection to CassandraSettings$Connection(nodes:[csodx01.lab.croc.ru], port:9042, keyspace:csod, requests:CassandraSettings$Connection$Requests(fetchSize:1000, batchSize:2000, consistencyLevel:LOCAL_QUORUM, max:CassandraSettings$Connection$Requests$Max(local:50, remote:20, retry:CassandraSettings$Connection$Requests$Max$Retry(enabled:true, read:10, write:10, unavailable:5)))) established

Entering: Overall process
Entering: Prebuilding of statements
Leaving [885 ms]: Prebuilding of statements
Entering: Executing statements async
Leaving [68447 ms][14609.844112963314 ops/s]: Executing statements async
Leaving [69339 ms]: Overall process

And insert of 5 columns:

>java -jar store-utils-cli-1.2.0-SNAPSHOT.jar -pt "insert into csod.calc_data_test2(data_set_id, svod_type,section,index_code,value_type) VALUES(now(), '58','9281','7611','367');" 1000000 --cassandra.connection.requests.max.local=50
00:43:35,293  INFO ru.croc.rosstat.csod.store.cassandra.connection.CassandraCluster:-1 - Connection to CassandraSettings$Connection(nodes:[csodx01.lab.croc.ru], port:9042, keyspace:csod, requests:CassandraSettings$Connection$Requests(fetchSize:1000, batchSize:2000, consistencyLevel:LOCAL_QUORUM, max:CassandraSettings$Connection$Requests$Max(local:50, remote:20, retry:CassandraSettings$Connection$Requests$Max$Retry(enabled:true, read:10, write:10, unavailable:5)))) established

Entering: Overall process
Entering: Prebuilding of statements
Leaving [968 ms]: Prebuilding of statements
Entering: Executing statements async
Leaving [49812 ms][20075.483819160043 ops/s]: Executing statements async
Leaving [50782 ms]: Overall process

Is it really true that the number of affected columns in an insert has such a big impact on performance? I haven't found any info about such a dependency yet. Maybe I'm doing something wrong?

All meaningful code for the inserts is here:

override fun run(args: Array<String?>) {
    if (args.isEmpty() || args.size < 2){
        System.err.println("You should specify a query and a number of iterations: ${args.toList()}")
        return
    }

    val query: String? = args[0]
    val iterationCount: Long = args[1]!!.toLong()

    // get the session
    val session: Session = cassandraCluster.connection().driverSession
    // prepare the query
    val preparedQuery: PreparedStatement = session.prepare(query)

    MeasureTime("Overall process").use {
        // create bound statements 
        val statements = MeasureTime("Prebuild statements").use {
            (1..iterationCount).map { BoundStatement(preparedQuery) }
        }

        // execute async
        MeasureTime("Execute statements async", iterationCount).use {
            val phaser = Phaser(1)
            statements.map { statement ->
                phaser.register()
                session.executeAsync(statement).withCallback({
                    phaser.arriveAndDeregister()
                }, { err ->
                    System.err.println(err)
                    phaser.arriveAndDeregister()
                })
            }
            // block until all tasks are done
            phaser.arriveAndAwaitAdvance()
        }
    }
}

// extension method for convenience
private fun <T> ListenableFuture<T>.withCallback(onSuccessCallback: (T?) -> Unit, onFailureCallback: (Throwable?) -> Unit): ListenableFuture<T> {
    Futures.addCallback(this, object: FutureCallback<T> {
        override fun onSuccess(p0: T?) {
            onSuccessCallback(p0)
        }

        override fun onFailure(p0: Throwable?) {
            onFailureCallback(p0)
        }
    })
    return this
}

class MeasureTime(val message: String, val operationCount: Long? = null): Closeable {
    private val startTime: Long

    init {
        startTime = System.nanoTime()
        System.out.println("Entering: $message")
    }

    override fun close() {
        val endTime = System.nanoTime()
        val elapsed = (endTime - startTime)/1000000
        val opStats = if (operationCount != null) {
            val f = operationCount/elapsed.toDouble()*1000
            "[$f ops/s]"
        } else ""
        val message = "Leaving [$elapsed ms]$opStats: $message"
        System.out.println(message)
    }
}

I believe it's not a problem for a Java developer to understand what's going on in the Kotlin code.


Answer:

If you're inserting more data in the second table than in the first one (c1 to c100 plus a couple of other columns), then it's normal that the insert is slower.

Now, even if you insert the same amount of data (in terms of byte count) in both tables, the insert in the 2nd table will still be a little bit slower because of:

  1. the overhead of metadata. You have more columns, so there are more objects to create in memory to store them

  2. the CPU consumption to serialize a lot of columns instead of a few of them

  3. and maybe other parameters I'm forgetting

Question:

I have a Java application that writes bulk records into a Cassandra cluster. My application is running on two WAS nodes, both connected to the same cluster. The bulk insert (asynchronous) process runs simultaneously on both WAS nodes. The 1st WAS node inserts the first 50% of the entire set of records to be inserted, and the 2nd WAS node inserts the remaining 50%.

We observed that one of these two server nodes is taking almost double the time to complete its insertion process. Both WAS instances have the same configuration and are connected to the same cluster.

Please suggest what the possible reasons could be.


Answer:

One possibility is that the two WAS nodes are not identical, although you intended them to be. Basic platform performance information is the starting point for diagnosing any performance problem.

I would start diagnosing this problem by comparing performance monitoring information for the two WAS nodes, using tooling like NMON if you are on Linux.

http://nmon.sourceforge.net/pmwiki.php

A command like this will cause nmon data to be written to a file every 10 seconds for 1800 samples

nmon -f -F <filename.nmon> -s 10 -c 1800 -t

and then with a tool like NMON visualizer you can compare the data for the two nodes in graphical form

https://nmonvisualizer.github.io/nmonvisualizer/

Similar tooling is available for other platforms, e.g. perfmon for Windows.

In this case I would be looking initially for a difference in CPU used between the two WAS nodes. If CPU is high in the node that takes longer to insert the records, maybe that node is configured with fewer cores (VM), or maybe the Java heap in that node is smaller so it is spending lots of time doing garbage collection, or maybe that node was configured with SSL and the other node was not, etc.

If CPU is low in the node that takes longer to insert the records, then there must be some external bottleneck that is limiting the work done by the node - maybe the network port on the node is misconfigured so that traffic between that node and the Cassandra cluster is limited, or maybe the configuration of the Cassandra interface for that node is incorrect, or maybe the node has a slow or failing hard drive so reading the bulk data for insert is slow, etc.

Performance problem diagnosis requires gathering performance data and then following the clues.

Question:

I'm using org.apache.spark.sql.SparkSession to read a Cassandra table into a Spark Dataset<Row>. The dataset has the whole table's information, and if I add a new row to Cassandra it seems to work asynchronously in the background and updates the dataset with the row, without reading the table again.

  1. Is there any way to limit or is there built in limit for the data read in from the table?

  2. What's the size of a Dataset<Row> that Spark starts to find difficult to process?

  3. What are the requirements for Spark to handle calculations if Cassandra table is half a terabyte?

  4. If Spark wants to write a large new table of information into Cassandra, does it cause more problems for Spark to write it in Cassandra or for Cassandra to read it? I just wonder which product would cause data loss or break down first.

If someone could tell me how SparkSession.read() exactly works in the background, or what a Dataset<Row> requires to perform well, that would be really useful. Thank you.


Answer:

SparkSession.read() invokes the underlying datasource's scan method. For Cassandra that is the Spark Cassandra Connector.

The Spark Cassandra Connector breaks up the C* token ring into chunks; each chunk more or less becomes a Spark partition. Single Spark partitions are then read by each executor core.

A video explaining this is available at DataStax Academy.

The actual size of the row is pretty unrelated to stability; the data is broken up by token range, so you should only end up with difficulties if the underlying Cassandra data has very large hot spots. That would lead to very large Spark partitions, which could lead to memory issues. In general, a well-distributed C* database should have no problems at any size.

Question:

I am using the Titan database (version 1.0.0) with Cassandra backend storage. My database is very big (millions of vertices and edges). I am using Elasticsearch for indexing. It does a very good job and I relatively easily and quickly receive thousands (~40,000) of vertices as the answer to my queries. But I have a performance issue when I try to iterate over those vertices and retrieve basic data saved in vertex properties. It takes me almost 1 minute!

Usage of Java 8 parallel streams significantly increases the performance, but not enough (10 sec instead of 1 min).

Consider that I have thousands of vertices with a location property and a timestamp. I want to retrieve only the vertices with a location (Geoshape) within the queried area and collect the distinct timestamps.

This is part of my java code using Java 8 parallel streams:

TitanTransaction tt = titanWraper.getNewTransaction();
PropertyKey timestampKey = tt.getPropertyKey(TIME_STAMP);
TitanGraphQuery graphQuery = tt.query().has(LOCATION, Geo.WITHIN, cLocation);
Spliterator<TitanVertex> locationsSpl = graphQuery.vertices().spliterator();

Set<String> locationTimestamps = StreamSupport.stream(locationsSpl, true)
        .map(locVertex -> {//map location vertices to timestamp String
            String timestamp = locVertex.valueOrNull(timestampKey);

            //this iteration takes about 10 sec to iterate over 40000 vertices
            return timestamp;
         })
         .distinct()
         .collect(Collectors.toSet());

Same code using standard java iteration:

TitanTransaction tt = titanWraper.getNewTransaction();
PropertyKey timestampKey = tt.getPropertyKey(TIME_STAMP);
TitanGraphQuery graphQuery = tt.query().has(LOCATION, Geo.WITHIN, cLocation);
Set<String> locationTimestamps = new HashSet<>();
for(TitanVertex locVertex : (Iterable<TitanVertex>) graphQuery.vertices()) {
    String timestamp = locVertex.valueOrNull(timestampKey);
    locationTimestamps.add(timestamp);        
    //this iteration takes about 45 sec to iterate over 40000 vertices            
}

This performance really disappoints me. It is even worse if the result is around 1 million vertices. I am trying to understand what the reason for this issue is. I expect that it should take less than 1 second to iterate over those vertices.


Answer:

The same query, but using a Gremlin traversal instead of a graph query, has much better performance and much shorter code:

TitanTransaction tt = graph.newTransaction();
Set<String> locationTimestamps = tt.traversal().V().has(LOCATION, P.within(cLocation))
    .dedup(TIME_STAMP)
    .values(TIME_STAMP)
    .toSet();

Question:

In our web application, we are using Cassandra 1.2 and the Astyanax Java library to communicate with the database. We are using a replication factor of 3 in Cassandra. For a specific use case we are writing a JSON string to a column whose payload looks like this:

{
  "tierAggData": {
    "tierRrcMap": {
      "Tier1": {
        "min": 0.08066999,
        "max": 0.13567,
        "t0": 1419235200,
        "t1": 1421334000,
        "type": null,
        "cons": 0,
        "tierCost": 37.692207887768745,
        "tierCons": 326758,
        "name": "Tier1"
      },
      "Tier2": {
        "min": 0.11252999,
        "max": 0.16752002,
        "t0": 1421337600,
        "t1": 1421625600,
        "type": null,
        "cons": 0,
        "tierCost": 14.50184826925397,
        "tierCons": 96910,
        "name": "Tier2"
      },
      "Tier3": {
        "min": 0.10361999,
        "max": 0.25401002,
        "t0": 1421629200,
        "t1": 1421910000,
        "type": null,
        "cons": 0,
        "tierCost": 17.739905051887035,
        "tierCons": 78776,
        "name": "Tier3"
      },
      "Tier4": {
        "min": 3.4028235e+38,
        "max": -3.4028235e+38,
        "t0": 2147483647,
        "t1": -2147483648,
        "type": null,
        "cons": 0,
        "tierCost": 0,
        "tierCons": 0,
        "name": "Tier4"
      }
    }
  }
}

I am writing this data on an hourly basis and I might have to write 3 years of data in one go. So the total number of columns to be written is 3*24*365 = 26,280 columns. Since the JSON payload is also big, I am confused between two approaches for this: 1) Using a mutation batch to get the row, writing all the data in one go, and doing an execute. 2) Using a mutation batch to get the row, using a counter, and writing only 1000 columns at a time and executing.

Please suggest which approach is better, and whether any more details are required for the answer.


Answer:

This is not a transactional database where you start and then commit, so your two options are a little confusing.

You should probably avoid batching; it can be faster, but it isn't really there as a throughput optimization. That said, it can help if everything is in one partition, by reducing network latencies here and there. In some cases it's most efficient to just do individual mutations, each by themselves, to parallelize work and distribute coordinator work across all nodes. It's also easier than trying to tune batch sizes and group them correctly. Writes are really fast, so the time it will take you to get this as fast as possible will be longer than the time it takes to just load everything.

What you probably need to worry about is your schema, since you have large columns. Remember that this is not a relational database where you just put your data into it and query it any way you like; plan out how you want to read the data, and organize the schema so that the read will be a simple lookup. Maybe check out the free online resources (like https://academy.datastax.com/) to make sure the data modeling is good.

Finally, 1.2 is very old; consider using a newer version with CQL (Thrift is deprecated). If you do upgrade to a newer version and use CQL, use https://github.com/datastax/java-driver instead of Astyanax, which isn't really maintained anymore.
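
For what it's worth, with the newer DataStax Java driver the hourly columns would typically become rows written as individual asynchronous inserts, roughly like this (a sketch only; the table, column and helper names such as HourlyPayload are hypothetical):

PreparedStatement ps = session.prepare(
        "INSERT INTO mykeyspace.tier_agg_data (meter_id, hour, payload) VALUES (?, ?, ?)");

List<ResultSetFuture> inFlight = new ArrayList<>();
for (HourlyPayload p : payloads) {                    // ~26,280 hourly JSON blobs
    inFlight.add(session.executeAsync(ps.bind(p.getMeterId(), p.getHour(), p.getJson())));
    if (inFlight.size() >= 1000) {                    // simple cap on in-flight writes
        for (ResultSetFuture f : inFlight) {
            f.getUninterruptibly();
        }
        inFlight.clear();
    }
}
for (ResultSetFuture f : inFlight) {                  // drain the remainder
    f.getUninterruptibly();
}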