Hot questions for Using Cassandra in resultset

Question:

How should I check for an empty result set using the DataStax Java driver for Cassandra?

Suppose I'm executing the following query: "SELECT * FROM my_table WHERE mykey=something"

There is a good chance that the query will not match any rows. The following code does not work:

if (rs != null) 
   rs.one().getString("some_column");

Answer:

You were pretty close; the correct solution is:

Row r = rs.one();
if (r != null)
    r.getString("some_column");

The driver always returns a result set, whether or not any rows matched. The documentation for one() states that if no rows were returned, rs.one() returns null.

You can also use getAvailableWithoutFetching(), which returns the number of rows in the result set that are available without fetching more. Since the page size has to be >= 1, you can be sure that if there is at least 1 row, this will return a value greater than 0.

Question:

I'm trying to get the number of key-value pairs in a Cassandra column family. The following is the code I used:

PreparedStatement statement = client.session
            .prepare("select count(*) from corpus.word_usage");
ResultSet results = client.session.execute(statement.bind());   
Row row = results.one();
System.out.println(row.getVarint(0));

But when I run this code, I get the following exception:

Exception in thread "main" com.datastax.driver.core.exceptions.InvalidTypeException: Column count is of type bigint
   at com.datastax.driver.core.ColumnDefinitions.checkType(ColumnDefinitions.java:291)
   at com.datastax.driver.core.ArrayBackedRow.getVarint(ArrayBackedRow.java:185)
   at SimpleClient.main(SimpleClient.java:57)

According to the DataStax documentation (http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/Row.html), getVarint should return a BigInteger. So why am I getting an exception here? What am I doing wrong?


Answer:

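As the exception says, the count column is of type bigint, which the driver maps to a Java long (getVarint is meant for the varint type, which maps to BigInteger). So get the value as a long instead.

I couldn't test it, but could you try this: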

PreparedStatement statement = client.session.prepare("select count(*) from corpus.word_usage");
ResultSet results = client.session.execute(statement.bind());   
Row row = results.one();
long expected = row.getLong("count");

Question:

I'm doing a bunch of bulk deletes in my application. I want to submit the queries using Session.executeAsync() and forget the returned ResultSetFuture, without explicitly calling its get() method to retrieve the result. Will this cause a memory leak? I have seen the same question asked about Java's ExecutorService, where it seems to have been solved in Java 7 and 8. Does the same rule apply to the Cassandra driver? I'm using Java 8 with Cassandra driver 2.1.3.


Answer:

You will not hit any memory leaks doing that. That said, since you're calling something asynchronously, if you call it a lot (a lot) with no throttling, you could very well run out of memory by producing requests faster than they can be processed.
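
One common way to add that throttling is to cap the number of in-flight requests with a java.util.concurrent.Semaphore. The sketch below is a minimal illustration under stated assumptions: CompletableFuture.runAsync with a short sleep is a hypothetical stand-in for session.executeAsync(), and MAX_IN_FLIGHT is an arbitrary example value. With the real driver you would release the permit from a callback registered on the returned ResultSetFuture (for example with Guava's Futures.addCallback), in both the success and the failure path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledFireAndForget {

    // Hypothetical cap on concurrent requests; tune it for your cluster
    static final int MAX_IN_FLIGHT = 4;

    // Submits `count` fake deletes without ever having more than MAX_IN_FLIGHT
    // outstanding, and returns the highest concurrency actually observed.
    static int runDeletes(int count) {
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(16);
        try {
            for (int i = 0; i < count; i++) {
                permits.acquireUninterruptibly(); // producer blocks once the cap is reached
                CompletableFuture.runAsync(() -> {
                    int now = inFlight.incrementAndGet();
                    maxSeen.accumulateAndGet(now, Math::max);
                    sleepQuietly(5);              // stands in for the round trip to Cassandra
                    inFlight.decrementAndGet();
                }, pool).whenComplete((ok, err) -> permits.release()); // release on success or failure
            }
            permits.acquireUninterruptibly(MAX_IN_FLIGHT); // wait until every request has finished
        } finally {
            pool.shutdown();
        }
        return maxSeen.get();
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + runDeletes(50));
    }
}
```

The producer loop itself slows down when the cap is reached, so memory use stays bounded no matter how many deletes are queued up.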

Question:

I have a Cassandra table like this:

create table Engine (
    primayval text,
    Dataval  map<text, frozen<map<text, double>>>,
    PRIMARY KEY (primayval)
);

How can I retrieve this into a nested Java map, Map<String,Map<String,Double>>, using Row.getMap() without JSON conversion?


Answer:

It turned out to be quite straightforward:

    // TypeToken is Guava's com.google.common.reflect.TypeToken
    Row rw = resultSet.one();
    Map<String, Map<String, Double>> dataVal = rw.getMap("DataVal",
            TypeToken.of(String.class),
            new TypeToken<Map<String, Double>>() {});

Question:

How can I skip the first few rows of a result set in cqlsh? I know I can iterate over the result set and ignore the first few rows, but I am looking to do it in the query itself.

The following queries work in SQL, but what is their equivalent in cqlsh?

      SELECT * FROM foo LIMIT 10, 50
      SELECT column FROM table LIMIT 10 OFFSET 10

I looked at QueryBuilder (and related classes in the DataStax driver) and did not find anything there. Thanks


Answer:

As far as I know, CQL does not currently include support for a starting offset. The LIMIT clause only controls an upper bound and not a starting offset.

See the CQL documentation for SELECT.

Probably your best bet would be to run cqlsh from bash and pipe its output through tail, like this:

cqlsh -e "SELECT ... LIMIT 10;" | tail -n+9

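The value 9 makes tail start printing at line 9, so it drops the first 8 lines of output: the header lines cqlsh prints before the data (column names and separator, assumed here to take 3 lines), which you need to skip as well, plus the first 5 rows.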

If you are writing in Java, then of course you'd have other programmatic options.
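
For instance, you can skip rows while iterating on the client side. The sketch below works over any Iterable<T>; a driver ResultSet is an Iterable<Row>, but here plain strings stand in for rows, and skipAndLimit is a hypothetical helper name. Note that the skipped rows are still read from Cassandra and transferred to the client; this only hides them from the caller, it does not make the query cheaper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipRows {

    // Returns up to `limit` elements after skipping the first `offset` ones,
    // i.e. a client-side equivalent of SQL "LIMIT limit OFFSET offset".
    static <T> List<T> skipAndLimit(Iterable<T> rows, int offset, int limit) {
        List<T> page = new ArrayList<>();
        Iterator<T> it = rows.iterator();
        for (int skipped = 0; skipped < offset && it.hasNext(); skipped++) {
            it.next(); // the row is fetched but discarded
        }
        while (page.size() < limit && it.hasNext()) {
            page.add(it.next());
        }
        return page;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5", "r6", "r7");
        System.out.println(skipAndLimit(rows, 2, 3)); // [r3, r4, r5]
    }
}
```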

Question:

I am using Spring Boot connected to Hibernate and a Cassandra database. I wrote a couple of methods using ResultSet, and everything has worked perfectly so far. Then I created another method that builds a query and then a ResultSet:

String queryString = query.toString().replace("?", dayList.toString());
ResultSet rS = dataSource.executeQuery(queryString);

It throws:

com.datastax.driver.core.exceptions.InvalidQueryException: No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename

The query is correct: when I execute it directly in the database, it returns the proper data.

It is weird, because I use the same implementation in a previous method and it works.

Here is my query:

SELECT * FROM object_action_statistics WHERE day IN ('2018-04-29','2018-04-30') AND action_id=14 AND timestamp_from>=1525099500073 AND timestamp_from<1525120897000 ALLOW FILTERING

Answer:

The correct query should look like this:

SELECT * FROM KEYSPACE_NAME.object_action_statistics WHERE day IN ('2018-04-29','2018-04-30') AND action_id=14 AND timestamp_from>=1525099500073 AND timestamp_from<1525120897000 ALLOW FILTERING

I guess you forgot to put the keyspace name in front of the table name.

Question:

I'm using the DataStax Java driver 4.2 and Cassandra 3.11.4. I have 3 nodes deployed and am trying to execute a query, but the ResultSet doesn't return the value. However, with driver version 3.7.2 it does return the value. I think they have changed the output format, but I can't figure it out.

CqlSession sessionOne = CqlSession.builder().addContactPoint(addrSocOne).withLocalDatacenter("us-east-2").withKeyspace("test").build();
String query = "select id FROM samplequeue";
ResultSet rs = sessionOne.execute(query);
System.out.println(rs);
sessionOne.close();

The output is: com.datastax.oss.driver.internal.core.cql.SinglePageResultSet

It does not contain the value of id, whereas in the previous version it did. Can anyone help me get the output? I need the value of id, and the query works fine from the backend. I have already tried looping over the result set row by row, but the output is the same.


Answer:

Can you try something like this (assuming id is an INT; otherwise use the getter for your data type)?

for (Row row : rs) {
    System.out.println("id: " + row.getInt("id"));
}

Question:

I am using the DataStax API to read records from a Cassandra database. Sometimes the execute method of Session shows strange behavior: sometimes it returns an empty result set, and sometimes it returns the correct result set.

Here is my code:

    // Create the session instance, using the singleton pattern
    public static synchronized Session getSession() {
        if (session == null) {
            // Not passing all the cluster nodes as contact points
            Cluster cluster = Cluster.builder()
                    .withPort(myPort)
                    .addContactPoints(clusters)
                    .withCredentials("username", "password")
                    .withSocketOptions(new SocketOptions()
                            .setReadTimeoutMillis(30000)
                            .setConnectTimeoutMillis(30000))
                    .build();
            session = cluster.connect("database");
        }
        return session;
    }

    // Get the session, execute the query and inspect the result set
    public void executeQuery() {

        Session session = getSession();
        BoundStatement boundStatement = ..
        ResultSet result = session.execute(boundStatement);
        System.out.println(result.isExhausted());    // true
        System.out.println(result.isFullyFetched()); // true

        System.out.println(result.all().size());     // 0 sometimes, correct count other times

    }

For various reasons, I am not passing all the nodes as contact points. Could that cause a problem? I do get data sometimes.


Answer:

This may depend on your replication factor (RF) and the state of your cluster. If you have RF > 1, the default TokenAware/DCAware policy will try to fetch data from any of the nodes that hold a replica. By default, the query is executed with consistency level LOCAL_ONE, which means that an answer from one replica is enough, so it is possible that the replica contacted doesn't have the data you need.

This can happen if one of the replicas was offline for longer than the hint window (3 hours by default) and no repair was performed afterwards. To mitigate the problem, run nodetool repair (the precise recommendation depends on your version of DSE, if you're using it).
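
A rough way to see why a LOCAL_ONE read can miss data: a read is only guaranteed to reach at least one replica that acknowledged the write when the number of replicas read (R) plus the number of write acknowledgements (W) exceeds the replication factor, R + W > RF. The helper below only does that arithmetic; it is a simplification that ignores hints, read repair and multi-datacenter details, and the method name is hypothetical.

```java
public class ReplicaOverlap {

    // True if any R replicas read must include at least one of the W replicas
    // that acknowledged the write, given RF replicas in total (R + W > RF).
    static boolean readIsGuaranteedToSeeWrite(int rf, int writeAcks, int readReplicas) {
        return readReplicas + writeAcks > rf;
    }

    public static void main(String[] args) {
        // RF = 3, write and read with one replica each (ONE / LOCAL_ONE): 1 + 1 > 3 is false
        System.out.println(readIsGuaranteedToSeeWrite(3, 1, 1)); // false
        // RF = 3, write and read with a quorum (2 of 3 replicas): 2 + 2 > 3 is true
        System.out.println(readIsGuaranteedToSeeWrite(3, 2, 2)); // true
    }
}
```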

Question:

I have a question regarding the heap size needed while executing a select query that returns billions of rows.

I use JDBC with a prepared statement and a fetch size of 1000 rows.

The code below illustrates my question:

ResultSet rs = ...
for (Row r : rs) {
    // If the result is not fully fetched
    if (rs.getAvailableWithoutFetching() == FETCH_SIZE && !rs.isFullyFetched()) {
        LOGGER.info("Load " + FETCH_SIZE + " more rows");
        rs.fetchMoreResults(); 
    }

    ...
}

Does Java load all of the billions of rows at once, or FETCH_SIZE rows at a time?


Answer:

Assuming you are using the DataStax driver, from the documentation for setFetchSize:

The fetch size controls how much resulting rows will be retrieved simultaneously (the goal being to avoid loading too much results in memory for queries yielding large results). Please note that while value as low as 1 can be used, it is highly discouraged to use such a low value in practice as it will yield very poor performance. If in doubt, leaving the default is probably a good idea.

Only SELECT queries only ever make use of that setting.

Note: Paging is not supported with the native protocol version 1. If you call this method with fetchSize > 0 and fetchSize != Integer.MAX_VALUE and the protocol version is in use (i.e. if you've force version 1 through Cluster.Builder.withProtocolVersion(int) or you use Cassandra 1.2), you will get UnsupportedProtocolVersionException when submitting this statement for execution

So the driver fetches rows page by page and does not keep previously fetched pages in memory, but you have to be careful not to keep references to those rows in your code, so that they can be garbage collected. Also read the documentation for fetchMoreResults: it might not work the way you are expecting (iterating over the ResultSet already fetches the next page on its own when the current one runs out).
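
To illustrate what page-by-page fetching means for memory, the sketch below models a paged iterator that holds only the current page and asks for the next one when that page runs out. PageSource and the fake source are hypothetical stand-ins for the server, not driver API, and they use a numeric offset for simplicity (the real native protocol uses an opaque paging state). The peak number of rows held stays at the fetch size regardless of the total.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class PagedIteration {

    // Hypothetical stand-in for the server: returns the page starting at `offset`
    interface PageSource {
        List<Long> fetchPage(long offset, int fetchSize);
    }

    // Iterates over all rows while keeping at most one page in memory
    static final class PagedIterator implements Iterator<Long> {
        private final PageSource source;
        private final int fetchSize;
        private List<Long> page = new ArrayList<>();
        private int pos = 0;
        private long nextOffset = 0;
        private boolean lastPageSeen = false;
        int peakRowsHeld = 0;

        PagedIterator(PageSource source, int fetchSize) {
            this.source = source;
            this.fetchSize = fetchSize;
        }

        @Override
        public boolean hasNext() {
            if (pos < page.size()) {
                return true;
            }
            if (lastPageSeen) {
                return false;
            }
            page = source.fetchPage(nextOffset, fetchSize); // old page becomes garbage
            pos = 0;
            nextOffset += page.size();
            lastPageSeen = page.size() < fetchSize;         // a short page is the last one
            peakRowsHeld = Math.max(peakRowsHeld, page.size());
            return !page.isEmpty();
        }

        @Override
        public Long next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return page.get(pos++);
        }
    }

    // Fake server holding rows 0 .. total-1
    static PageSource fakeSource(long total) {
        return (offset, size) -> {
            List<Long> rows = new ArrayList<>();
            for (long i = offset; i < Math.min(offset + size, total); i++) {
                rows.add(i);
            }
            return rows;
        };
    }

    public static void main(String[] args) {
        PagedIterator it = new PagedIterator(fakeSource(10_500), 1000);
        long count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        System.out.println(count + " rows, peak held: " + it.peakRowsHeld); // 10500 rows, peak held: 1000
    }
}
```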

Question:

I tried implementing async queries following java-driver-async-queries. I am modifying a List within the FutureCallback, but it seems it's not working:

List<Product> products = new ArrayList<Product>();

for (// iterating over a Map) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    ResultSetFuture future = session.executeAsync(query, key);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                Row row = result.one();
                if (row != null) {
                    Product product = new Product();
                    product.setId(row.getString("id"));
                    product.setDesc(row.getString("desc"));
                    product.setCategory(row.getString("category"));

                    products.add(product);
                }
            }

            @Override public void onFailure(Throwable t) {
                // log error
            }
        },
        MoreExecutors.sameThreadExecutor()
    );
}

System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank

Is there any other way?

Based on Mikhail Baksheev's answer, I implemented it and now get the proper result. There is just a twist: there is some extra logic I need to implement. I am wondering if I can use List<MyClass> instead of List<ResultSetFuture>, with MyClass as:

public class MyClass {

    private Integer         productCount;
    private Integer         stockCount;
    private ResultSetFuture result;
}

Then, while iterating, populate the futures list as:

ResultSetFuture result = session.executeAsync(query, key.get());
MyClass allResult = new MyClass();
allResult.setInCount(inCount);
allResult.setResult(result);
allResult.setSohCount(values.size() - inCount);

futuresList.add(allResult);

Answer:

As @RussS mentioned, the code does not wait until all futures have completed.

There are many ways to synchronize async code. For example, using CountDownLatch:

EDIT: Also, please use a separate thread for callbacks and a concurrent collection for products.

ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>();
final Executor callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/);
for (// iterating over a Map) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    ResultSetFuture future = session.executeAsync(query, key);
    Futures.addCallback(future,
        new FutureCallback<ResultSet>() {
            @Override public void onSuccess(ResultSet result) {
                Row row = result.one();
                if (row != null) {
                    Product product = new Product();
                    product.setId(row.getString("id"));
                    product.setDesc(row.getString("desc"));
                    product.setCategory(row.getString("category"));

                    products.add(product);
                }
                doneSignal.countDown();

            }

            @Override public void onFailure(Throwable t) {
                // log error
                doneSignal.countDown();
            }
        },
        callbackExecutor
    );
}

doneSignal.await();           // wait for all async requests to finish
System.out.println("Product List : " + products); 

Another way is to collect all futures in a list and wait for all results as a single future with Guava's Futures.allAsList, e.g.:

List<ResultSetFuture> futuresList = new ArrayList<>( /*Map size*/);
for (/* iterating over a Map*/) {
    key = entry.getKey();
    String query = "SELECT id,desc,category FROM products where id=?";
    futuresList.add( session.executeAsync( query, key ) );
}

ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList( futuresList );
List<Product> products = new ArrayList<>();
try {
    final List<ResultSet> resultSets = allFuturesResult.get();
    for ( ResultSet rs : resultSets ) {
        if ( null != rs ) {
            Row row = rs.one();
            if (row != null) {
                Product product = new Product();
                product.setId(row.getString("id"));
                product.setDesc(row.getString("desc"));
                product.setCategory(row.getString("category"));

                products.add(product);
            }
        }
    }
} catch ( InterruptedException | ExecutionException e ) {
    System.out.println(e);
}
System.out.println("Product List : " + products);

EDIT 2

I am wondering if I can use List<MyClass> instead of List<ResultSetFuture> and MyClass as

Technically yes, but in that case you can't pass the List<MyClass> to Futures.allAsList, unless MyClass implements the ListenableFuture interface.
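
A minimal sketch of the wrapper approach without making MyClass a future: keep the List<MyClass>, extract the futures into a separate array for the combined wait, then read each result back through its wrapper. It uses Java 8's CompletableFuture as a stand-in for ResultSetFuture and CompletableFuture.allOf in place of Guava's Futures.allAsList; the fake lookup, the String result type and the way the counts are filled in are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WrappedFutures {

    // Hypothetical wrapper: extra per-query data kept next to its future
    static final class MyClass {
        final int productCount;
        final int stockCount;
        final CompletableFuture<String> result; // stands in for ResultSetFuture

        MyClass(int productCount, int stockCount, CompletableFuture<String> result) {
            this.productCount = productCount;
            this.stockCount = stockCount;
            this.result = result;
        }
    }

    static List<String> collect(List<String> keys) {
        List<MyClass> futuresList = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            // stands in for session.executeAsync(query, key)
            CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "row-" + key);
            futuresList.add(new MyClass(i, keys.size() - i, f));
        }

        // allOf needs the futures themselves, so map the wrappers to them first
        CompletableFuture<?>[] futures = futuresList.stream()
                .map(m -> m.result)
                .toArray(CompletableFuture<?>[]::new);
        CompletableFuture.allOf(futures).join(); // wait for every query

        // every future has completed, so join() returns immediately
        List<String> out = new ArrayList<>();
        for (MyClass m : futuresList) {
            out.add(m.result.join() + ":" + m.productCount + ":" + m.stockCount);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList("a", "b", "c"))); // [row-a:0:3, row-b:1:2, row-c:2:1]
    }
}
```

Results are read in the order of the wrapper list, so each result stays paired with its own counts even though the queries complete in any order.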