Hot questions for Using Cassandra in cql3

Question:

I am working with dates.

Datastax's CQL cassandra API Row.getDate() returns a com.datastax.driver.core.LocalDate.

I want to convert the com.datastax.driver.core.LocalDate object returned by the API to java.util.Date. How can I do that?


Answer:

The LocalDate.getMillisSinceEpoch() Javadoc says it "Returns the number of milliseconds since January 1st, 1970 GMT", and the Date(long) constructor Javadoc says it "Allocates a Date object and initializes it to represent the specified number of milliseconds since the standard base time known as 'the epoch', namely January 1, 1970, 00:00:00 GMT". So, given a LocalDate ld, you should be able to do something like

Date d = new Date(ld.getMillisSinceEpoch());
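The conversion itself is plain epoch-millis arithmetic, so it can be checked with the JDK alone; the millis value below stands in for whatever ld.getMillisSinceEpoch() would return:

```java
import java.util.Date;

public class EpochToDate {
    // Date(long) interprets its argument as milliseconds since
    // 1970-01-01 00:00:00 GMT, the same epoch getMillisSinceEpoch() uses.
    public static Date fromEpochMillis(long millis) {
        return new Date(millis);
    }

    public static void main(String[] args) {
        long millis = 86_400_000L; // one day after the epoch
        Date d = fromEpochMillis(millis);
        System.out.println(d.getTime()); // prints 86400000
    }
}
```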

Question:

I need some help. I have a query that should return the top 5 records, grouped by date (not date + time), with the sum of the amount for each date.

I wrote the following, but it returns all the records, not just the top 5:

CREATE OR REPLACE FUNCTION state_groupbyandsum( state map<text, double>, datetime text, amount text )
CALLED ON NULL INPUT
RETURNS map<text, double>
LANGUAGE java 
AS 'String date = datetime.substring(0,10); Double count = (Double) state.get(date);  if (count == null) count = Double.parseDouble(amount); else count = count +  Double.parseDouble(amount); state.put(date, count); return state;' ;


CREATE OR REPLACE AGGREGATE groupbyandsum(text, text) 
SFUNC state_groupbyandsum
STYPE map<text, double>
INITCOND {};

select groupbyandsum(datetime, amount) from warehouse;

Could you please help me get just the top 5 records?


Answer:

Here's one way to do that. Your group by state function could be like this:

CREATE FUNCTION state_group_and_total( state map<text, double>, type text, amount double )
CALLED ON NULL INPUT
RETURNS map<text, double>
LANGUAGE java AS '
     Double count = (Double) state.get(type);
     if (count == null)
         count = amount;
     else
         count = count + amount;
     state.put(type, count);
     return state;
';

That will build up a map of all the amount rows selected by your query's WHERE clause. Now the tricky part is how to keep just the top N. One way is to use a FINALFUNC, which is executed after all the rows have been added to the map. The function below uses a loop to find the maximum value in the map and move it to a result map, iterating over the map N times to find the top N (there are more efficient algorithms, but this is a quick and dirty example).

So here's an example to find the top two:

CREATE FUNCTION topFinal (state map<text, double>)
CALLED ON NULL INPUT
RETURNS map<text, double>
LANGUAGE java AS '
    java.util.Map<String, Double> inMap = new java.util.HashMap<String, Double>(),
                                  outMap = new java.util.HashMap<String, Double>();

    inMap.putAll(state);

    int topN = 2;
    for (int i = 1; i <= topN; i++) {
        double maxVal = Double.NEGATIVE_INFINITY; // -1 would skip negative amounts
        String moveKey = null;
        for (java.util.Map.Entry<String, Double> entry : inMap.entrySet()) {

            if (entry.getValue() > maxVal) {
                maxVal = entry.getValue();
                moveKey = entry.getKey();
            }
        }
        if (moveKey != null) {
            outMap.put(moveKey, maxVal);
            inMap.remove(moveKey);
        }
    }

    return outMap;
';
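Since the body of topFinal is plain Java, the selection loop can be tested outside Cassandra. This is a standalone sketch of the same top-N pass; the class and method names are mine, not part of any API:

```java
import java.util.HashMap;
import java.util.Map;

public class TopN {
    // Repeatedly move the largest remaining entry from a copy of the
    // input map into the result, N times (quadratic, fine for small N).
    public static Map<String, Double> topN(Map<String, Double> state, int n) {
        Map<String, Double> in = new HashMap<>(state);
        Map<String, Double> out = new HashMap<>();
        for (int i = 0; i < n; i++) {
            double maxVal = Double.NEGATIVE_INFINITY;
            String moveKey = null;
            for (Map.Entry<String, Double> e : in.entrySet()) {
                if (e.getValue() > maxVal) {
                    maxVal = e.getValue();
                    moveKey = e.getKey();
                }
            }
            if (moveKey != null) {
                out.put(moveKey, maxVal);
                in.remove(moveKey);
            }
        }
        return out;
    }
}
```

Starting maxVal at Double.NEGATIVE_INFINITY rather than a sentinel like -1 keeps the loop correct even when amounts can be negative.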

Lastly, you need to define the AGGREGATE to tie together the two functions you defined:

CREATE OR REPLACE AGGREGATE group_and_total(text, double) 
     SFUNC state_group_and_total 
     STYPE map<text, double> 
     FINALFUNC topFinal
     INITCOND {};

So let's see if that works.

CREATE table test (partition int, clustering text, amount double, PRIMARY KEY (partition, clustering));
INSERT INTO test (partition , clustering, amount) VALUES ( 1, '2015', 99.1);
INSERT INTO test (partition , clustering, amount) VALUES ( 1, '2016', 18.12);
INSERT INTO test (partition , clustering, amount) VALUES ( 1, '2017', 44.889);
SELECT * from test;

 partition | clustering | amount
-----------+------------+--------
         1 |       2015 |   99.1
         1 |       2016 |  18.12
         1 |       2017 | 44.889

Now, drum roll...

SELECT group_and_total(clustering, amount) from test where partition=1;

 agg.group_and_total(clustering, amount)
-------------------------------------------
            {'2015': 99.1, '2017': 44.889}

So you see it kept the top 2 rows based on the amount.

Note that the keys won't be in sorted order since it's a map, and I don't think we can control the key order in the map, so sorting in the FINALFUNC would be a waste of resources. If you need the map sorted then you could do that in the client.

I think you could do more work in the state_group_and_total function to drop items from the map as you go along, which would keep the map from getting too big.

Question:

I am using Cassandra 2.0.8 and I have got a cql3 table defined like this:

CREATE TABLE search_scf_tdr (
  fieldname text,
  fieldvalue text,
  scalability int,
  timestamptdr bigint,
  tdrkeys set<blob>,
  PRIMARY KEY ((fieldname, fieldvalue, scalability), timestamptdr)
)

I use a replication factor of 2 per DC for this keyspace. I insert into this table by adding items to the tdrkeys collection one by one, using an update like this:

UPDATE search_scf_tdr SET tdrkeys = tdrkeys + "new value" WHERE "all primary key fields";

Each element in tdrkeys is 84 bytes (fixed size).

When querying this table I retrieve about 160 rows at once (using ranges on timestamptdr and scalability, and fixed values for fieldname and fieldvalue). Rows contain a few thousand elements in the tdrkeys collection.

I have a cluster of 42 nodes split in two data centers. I have separate servers using datastax java driver 2.0.9.2 running a total of 24 threads in each data center calling this query (doing many other things with the result between each query) with consistency level ONE:

SELECT tdrkeys FROM search_scf_tdr WHERE fieldname='timestamp' and fieldvalue='' and scalability IN (0,1,2,3,4,5,6,7,8,9,10) and timestamptdr >= begin and timestamptdr < end;

Each Cassandra node has 8 GB of Java heap and 16 GB of physical memory. We have tuned the cassandra.yaml file and JVM parameters as much as we can, but we are still getting out-of-memory problems.

The heap dumps we get on out-of-memory errors show more than 6 GB of the heap held by threads (between 200 and 300 of them) holding many instances of org.apache.cassandra.io.sstable.IndexHelper$IndexInfo, each containing 2 HeapByteBuffers with 84 bytes of data.

Cassandra system.log shows errors like this:

ERROR [Thread-388] 2015-05-18 12:11:10,147 CassandraDaemon.java (line 199) Exception in thread Thread[Thread-388,5,main]
java.lang.OutOfMemoryError: Java heap space
ERROR [ReadStage:321] 2015-05-18 12:11:10,147 CassandraDaemon.java (line 199) Exception in thread Thread[ReadStage:321,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
    at org.apache.cassandra.io.util.MappedFileDataInput.readBytes(MappedFileDataInput.java:146)
    at org.apache.cassandra.utils.ByteBufferUtil.read(ByteBufferUtil.java:392)
    at org.apache.cassandra.utils.ByteBufferUtil.readWithShortLength(ByteBufferUtil.java:371)
    at org.apache.cassandra.io.sstable.IndexHelper$IndexInfo.deserialize(IndexHelper.java:187)
    at org.apache.cassandra.db.RowIndexEntry$Serializer.deserialize(RowIndexEntry.java:122)
    at org.apache.cassandra.io.sstable.SSTableReader.getPosition(SSTableReader.java:970)
    at org.apache.cassandra.io.sstable.SSTableReader.getPosition(SSTableReader.java:871)
    at org.apache.cassandra.db.columniterator.SSTableSliceIterator.<init>(SSTableSliceIterator.java:41)
    at org.apache.cassandra.db.filter.SliceQueryFilter.getSSTableColumnIterator(SliceQueryFilter.java:167)
    at org.apache.cassandra.db.filter.QueryFilter.getSSTableColumnIterator(QueryFilter.java:62)
    at org.apache.cassandra.db.CollationController.collectAllData(CollationController.java:250)
    at org.apache.cassandra.db.CollationController.getTopLevelColumns(CollationController.java:53)
    at org.apache.cassandra.db.ColumnFamilyStore.getTopLevelColumns(ColumnFamilyStore.java:1547)
    at org.apache.cassandra.db.ColumnFamilyStore.getColumnFamily(ColumnFamilyStore.java:1376)
    at org.apache.cassandra.db.Keyspace.getRow(Keyspace.java:327)
    at org.apache.cassandra.db.SliceFromReadCommand.getRow(SliceFromReadCommand.java:65)
    at org.apache.cassandra.db.ReadVerbHandler.doVerb(ReadVerbHandler.java:47)
    at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:60)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

Answer:

You are using an "IN" query across multiple partitions, since scalability is part of the partition key. This forces the coordinator to fan the query out across multiple nodes.

The solution would be to run a separate query for every value of scalability and merge the results manually, or, if possible, not to make it part of the partition key, i.e. PRIMARY KEY ((fieldname, fieldvalue), scalability, timestamptdr).
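For example, instead of the single IN query, you could issue one query per scalability value (concurrently, via executeAsync) and merge the rows client-side; the bind markers stand for your begin and end timestamps:

```sql
-- One query per partition; run these concurrently and merge the results.
SELECT tdrkeys FROM search_scf_tdr
 WHERE fieldname = 'timestamp' AND fieldvalue = '' AND scalability = 0
   AND timestamptdr >= ? AND timestamptdr < ?;

SELECT tdrkeys FROM search_scf_tdr
 WHERE fieldname = 'timestamp' AND fieldvalue = '' AND scalability = 1
   AND timestamptdr >= ? AND timestamptdr < ?;

-- ...and so on for scalability values 2 through 10.
```

Each statement now targets a single partition, so the read load is spread across replicas instead of being concentrated on one coordinator.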

Question:

My use case is like this: I am inserting 10 million rows into a table described as follows:

keyval bigint, rangef bigint, arrayval blob, PRIMARY KEY (rangef, keyval)

and the input data is as follows -

keyval - some timestamp
rangef - a random number
arrayval - a byte array

I made my primary key a composite key because, after inserting the 10 million rows, I want to perform a range scan on keyval. Since keyval contains a timestamp, my queries will be of the form "give me all the rows between this time and that time". That is why my primary key is a composite key.

Ingestion performance was very good and satisfactory. But when I ran the query described above, performance was very poor: when I asked for all rows between t1 and t1 + 3 minutes, almost 500k records were returned in 160 seconds.

My query is like this

Statement s = QueryBuilder.select().all().from(keySpace, tableName)
    .allowFiltering()
    .where(QueryBuilder.gte("keyval", 1411516800))
    .and(QueryBuilder.lte("keyval", 1411516980));
s.setFetchSize(10000);
ResultSet rs = sess.execute(s);
for (Row row : rs)
{
    count++;
}
System.out.println("Batch2 count = " + count);

I am using the default partitioner, i.e. the Murmur3 partitioner.

My cluster configuration is -

No. of nodes - 4
No. of seed nodes - 1
No. of disks - 6
MAX_HEAP_SIZE for each node = 8G

The rest of the configuration is default.

How can I improve my range scan performance?


Answer:

You are actually performing a full table scan, not a range scan. This is one of the slowest queries possible in Cassandra and is usually reserved for analytics workloads. If any of your queries require ALLOW FILTERING in an OLTP workload, something is most likely wrong. Cassandra was designed with the knowledge that queries accessing the entire dataset will not scale, so a great deal of effort has gone into making it simple to partition data and to access it quickly within a partition.

To fix this, you need to rethink your data model and consider how to restrict each query to a single partition.
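For example, assuming keyval is an epoch timestamp in seconds (as your sample values suggest), a common pattern is to bucket the timestamp into a coarse partition key; the table and column names below are illustrative, not from your schema:

```sql
CREATE TABLE events_by_day (
    day_bucket bigint,   -- keyval / 86400: one partition per day
    keyval bigint,       -- the timestamp, now a clustering column
    rangef bigint,
    arrayval blob,
    PRIMARY KEY (day_bucket, keyval)
);

-- A time-range query now touches only the partitions for the days involved
-- (1411516800 / 86400 = 16337), with no ALLOW FILTERING needed:
SELECT * FROM events_by_day
 WHERE day_bucket = 16337
   AND keyval >= 1411516800 AND keyval < 1411516980;
```

A range spanning several days issues one such SELECT per bucket and merges the results client-side.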

Question:

I have a schema as follows

CREATE TYPE coords(
  longitude double,
  latitude double
);

CREATE TABLE location_by_name(
  id uuid,
  coordinates frozen<coords>,
  name text,
  primary key (name)
);

I am able to insert data using a prepared statement, but I couldn't work it out using the QueryBuilder API. It would be great if anyone could point me in the right direction.

Thanks!


Answer:

This should do the trick:

UserType coordsType = cluster.getMetadata()
     .getKeyspace("ks")
     .getUserType("coords");
UDTValue coordinates = coordsType.newValue()
    .setDouble("longitude", 2.35)
    .setDouble("latitude", 48.853);
Statement insert = QueryBuilder.insertInto("location_by_name")
    .value("id", id)
    .value("coordinates", coordinates)
    .value("name", name);
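For reference, the statement the builder produces is roughly equivalent to this CQL (bind markers stand in for the id and name values; the braces are the standard CQL UDT literal syntax):

```sql
INSERT INTO location_by_name (id, coordinates, name)
VALUES (?, {longitude: 2.35, latitude: 48.853}, ?);
```

Remember to pass the built statement to session.execute(insert) to actually run it.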

Question:

I am trying to use a datastax accessor (vs using a mapper).

My accessor is defined as follows:

@Accessor
public interface TableAccessor {
    @Query(
        "INSERT INTO tableName " +
        "(id, data)" +
        "VALUES (:beanId, :beanData)")
    public ResultSet insertProblem(@Param("bean") Bean bean);
}

And my bean is defined as:

@Table(name="tableName")
public class Bean {
    @PartitionKey
    @Column(name = "id")
    private int beanId;

    @PartitionKey
    @Column(name = "data")
    private Date beanData;

   // Setters and getters omitted 
}

My issue is that when I try something like:

insertProblem(@Param("bean") Bean bean)

I keep getting errors telling me my parameter numbers don't match.


Answer:

I am not sure that is even possible: sending an object and having its properties mapped automatically (maybe some dot notation works, but I haven't checked the code or the documentation). Your notation might work if Bean were mapped as a UDT in the database.

What works for sure is:

@Accessor
public interface TableAccessor {
    @Query(
        "INSERT INTO tableName " +
        "(id, data)" +
        "VALUES (:beanId, :beanData)")
    public ResultSet insertProblem(@Param("beanId") int beanId, @Param("beanData") Date beanData);
}

And then you call the method with insertProblem(bean.getBeanId(), bean.getBeanData())

Question:

I've set up a Cassandra cluster and work with the spring-cassandra framework 1.5.3. (http://docs.spring.io/spring-data/cassandra/docs/1.5.3.RELEASE/reference/html/)

I want to write millions of datasets into my Cassandra cluster. The solution with executeAsync works well, but the "ingest" method from the Spring framework sounds interesting as well.

The ingest method takes advantage of static PreparedStatements that are only prepared once for performance. Each record in your data set is bound to the same PreparedStatement, then executed asynchronously for high performance.

My code:

List<List<?>> session_time_ingest = new ArrayList<List<?>>();
for (Long tokenid : listTokenID) {
    List<Session_Time_Table> tempListSessionTimeTable =
        repo_session_time.listFetchAggregationResultMinMaxTime(tokenid);
    session_time_ingest.add(tempListSessionTimeTable);
}

cassandraTemplate.ingest("INSERT into session_time (sessionid, username, eserviceid, contextroot," +
                " application_type, min_processingtime, max_processingtime, min_requesttime, max_requesttime)" +
                " VALUES(?,?,?,?,?,?,?,?,?)", session_time_ingest);

Throws exception:

Exception in thread "main" com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> ...tracking.Tables.Session_Time_Table]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:540)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:520)
at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:470)
at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:77)
at com.datastax.driver.core.BoundStatement.bind(BoundStatement.java:201)
at com.datastax.driver.core.DefaultPreparedStatement.bind(DefaultPreparedStatement.java:126)
at org.springframework.cassandra.core.CqlTemplate.ingest(CqlTemplate.java:1057)
at org.springframework.cassandra.core.CqlTemplate.ingest(CqlTemplate.java:1077)
at org.springframework.cassandra.core.CqlTemplate.ingest(CqlTemplate.java:1068)
at ...tracking.SessionAggregationApplication.main(SessionAggregationApplication.java:68)

I coded it exactly as in the spring-cassandra documentation. I have no idea how to map the values of my object to the values Cassandra expects.


Answer:

Your Session_Time_Table class is probably a mapped POJO, but the ingest methods do not use POJO mapping.

Instead you need to provide a matrix where each row contains as many arguments as there are variables to bind in your prepared statement, something along the lines of:

List<List<?>> rows = new ArrayList<List<?>>();

for (Long tokenid: listTokenID) {
    Session_Time_Table obj = ... // obtain a Session_Time_Table instance
    List<Object> row = new ArrayList<Object>();
    row.add(obj.sessionid);
    row.add(obj.username);
    row.add(obj.eserviceid);
    // etc. for all bound variables
    rows.add(row);
}

cassandraTemplate.ingest(
    "INSERT into session_time (sessionid, username, eserviceid, " +
    "contextroot, application_type, min_processingtime, " +
    "max_processingtime, min_requesttime, max_requesttime) " +
    "VALUES(?,?,?,?,?,?,?,?,?)", rows);
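The flattening step itself needs neither Spring nor the driver, so it can be checked in isolation. This sketch uses a cut-down, hypothetical stand-in for Session_Time_Table covering only the first three of the nine bound columns:

```java
import java.util.ArrayList;
import java.util.List;

public class IngestRows {
    // Hypothetical cut-down stand-in for the mapped POJO; only the
    // first three of the nine inserted columns are represented.
    public static class SessionTime {
        final String sessionid;
        final String username;
        final long eserviceid;
        public SessionTime(String sessionid, String username, long eserviceid) {
            this.sessionid = sessionid;
            this.username = username;
            this.eserviceid = eserviceid;
        }
    }

    // Build one inner list per row, one element per bind variable,
    // in the same order as the placeholders in the INSERT statement.
    public static List<List<?>> toRows(List<SessionTime> objs) {
        List<List<?>> rows = new ArrayList<>();
        for (SessionTime o : objs) {
            List<Object> row = new ArrayList<>();
            row.add(o.sessionid);
            row.add(o.username);
            row.add(o.eserviceid);
            rows.add(row);
        }
        return rows;
    }
}
```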

Question:

This is a bit of a contrived example to illustrate my question, but let's say I have a Car entity which contains Lightbulb entities. A car has several lightbulbs, each of which could be "on", "off" or "broken".

Each type of lightbulb has a unique id. (left headlight = 100, right headlight = 101... that sort of thing)

The status of a lightbulb needs to be constantly updated.

What I'd like to do is query for a specific car for a set of lightbulbs with a specific status.

something like: "give me all the lightbulbs with status "on" for car "chevy" model "nova" vin "xyz-123"".

create table lightbulbstatus (
   bulbid uuid,
   carmake text,
   carmodel text,
   carvin uuid,
   lastupdate timestamp,
   status int,  
                   /* row key */              /* col keys */
   PRIMARY KEY( (carmake, carmodel, carvin), ?,   ?,   ?,   ? )
);

I believe the row key should have the car coordinates in it, but beyond that I'm a bit lost. I assume each time there is a status change to a bulb, we add a column, but I'm not sure what the column keys should be to make the query work.

I think in RDBMS-land you could do a subselect or nested query to find bulbs with status = on.

select * from lightbulbstatus where status = 1 and lastupdate > (select lastupdate from lightbulbstatus where status != 1);

No idea how you would do this in CQL3. Obviously sub-selects are not allowed.


Answer:

Since you do not have to maintain status history, I would suggest having a single row for each bulb, using the following primary key:

PRIMARY KEY( (carmake, carmodel, carvin), bulbid)

In order to query lightbulbs by status you need to create a secondary index:

CREATE INDEX lightbulb_by_status ON lightbulbstatus (status);

SELECT * FROM lightbulbstatus 
  WHERE status = 1 
    AND carmake = 'chevy' 
    AND carmodel = 'nova'
    AND carvin = cfe638e9-5cd9-43c2-b5f4-4cc9a0e6b0ff;

Although the cardinality of status is low, your query includes the partition key and is highly efficient. If the number of rows to be filtered is very small (like the number of lightbulbs in a car), you may consider filtering lightbulbs by status in the application (and skipping the secondary index).
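If you go the application-side route, the filter is a one-liner; the Bulb record below is a hypothetical stand-in for a row of lightbulbstatus, not part of any API:

```java
import java.util.List;
import java.util.stream.Collectors;

public class BulbFilter {
    // Hypothetical stand-in for a row of the lightbulbstatus table.
    public record Bulb(String bulbid, int status) {}

    // Keep only the bulbs whose status matches (e.g. 1 == "on").
    public static List<Bulb> withStatus(List<Bulb> bulbs, int status) {
        return bulbs.stream()
                    .filter(b -> b.status() == status)
                    .collect(Collectors.toList());
    }
}
```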

If you need to handle the case where an obsolete lightbulb status update might overwrite a more recent one (as your RDBMS query suggests), consider using lightweight transactions:

UPDATE lightbulbstatus set status = 0, lastupdate = '2014-11-08 23:50:30+0019'
  WHERE carmake = 'chevy' 
    AND carmodel = 'nova' 
    AND carvin = cfe638e9-5cd9-43c2-b5f4-4cc9a0e6b0ff 
    AND bulbid = 9124f318-8253-4d94-b865-3be07899c8ff 
  IF status = 1 AND lastupdate < '2014-11-08 23:50:30+0019';

Hope it helps.