Hot questions for using prepared statements in Cassandra


Question:

I need to query one of the tables in Cassandra using the DataStax Java driver. Below is my code, which works fine:

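import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.commons.lang3.exception.ExceptionUtils;

public class TestCassandra {

    private Session session = null;
    private Cluster cluster = null;

    private static class ConnectionHolder {
        static final TestCassandra connection = new TestCassandra();
    }

    public static TestCassandra getInstance() {
        return ConnectionHolder.connection;
    }

    private TestCassandra() {
        Builder builder = Cluster.builder();
        builder.addContactPoints("127.0.0.1");

        PoolingOptions opts = new PoolingOptions();
        opts.setCoreConnectionsPerHost(HostDistance.LOCAL, opts.getCoreConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE).withPoolingOptions(opts)
                .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();
        session = cluster.connect();
    }

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();

        for (int table = 0; table < 14; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";

            try {
                SimpleStatement query = new SimpleStatement(sql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);

                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();

                    String user_id = r.getString("user_id");
                    userList.add(user_id);
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }

        return userList;
    }
}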

I am using the above class like this in my main application:

TestCassandra.getInstance().getRandomUsers();

Is there any way I can use PreparedStatement in getRandomUsers efficiently? I guess I need to make sure that I am creating PreparedStatement only once instead of creating it multiple times. What is the best design for that in my current architecture and how can I use it?


Answer:

You can create a cache (this is a fairly basic example to give you an idea) of the statements you need. Let's start by creating the class that will be used as a cache.

private class StatementCache {
    Map<String, PreparedStatement> statementCache = new HashMap<>();
    public BoundStatement getStatement(String cql) {
        PreparedStatement ps = statementCache.get(cql);
        // no statement cached, create one and cache it now.
        if (ps == null) {
            ps = session.prepare(cql);
            statementCache.put(cql, ps);
        }
        return ps.bind();
    }
}

Then add an instance to your singleton:

public class TestCassandra {
    private Session session = null;
    private Cluster cluster = null;
    private StatementCache psCache = new StatementCache();
    // rest of class...

And finally use the cache from your function:

private Set<String> getRandomUsers() {
// lots of code.
        try {
            // abstract the handling of the cache to its own class.
            // this will need some work to make sure it's thread safe
            // as currently it's not.
            BoundStatement query = psCache.getStatement(sql);
            // set the consistency level on the statement that is actually executed
            query.setConsistencyLevel(ConsistencyLevel.QUORUM);
            ResultSet res = session.execute(query);
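As the comments say, the HashMap-based cache is not thread-safe. One common fix is ConcurrentHashMap.computeIfAbsent, which applies the loading function at most once per key even under concurrent access. The sketch below is driver-agnostic so it runs on the JDK alone: `PreparedCache` is a hypothetical name, and the `Function<String, P>` preparer is an assumed stand-in for `session::prepare`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, driver-agnostic cache; P stands in for the driver's PreparedStatement.
final class PreparedCache<P> {
    private final ConcurrentMap<String, P> cache = new ConcurrentHashMap<>();
    private final Function<String, P> preparer; // with the real driver: session::prepare

    PreparedCache(Function<String, P> preparer) {
        this.preparer = preparer;
    }

    // computeIfAbsent runs the preparer at most once per distinct CQL string,
    // even when several threads request the same statement concurrently.
    P get(String cql) {
        return cache.computeIfAbsent(cql, preparer);
    }
}

final class PreparedCacheDemo {
    // 8 threads each request the 14 per-table queries;
    // returns how many times the fake preparer actually ran.
    static int prepareCalls() {
        AtomicInteger calls = new AtomicInteger();
        PreparedCache<String> cache = new PreparedCache<>(cql -> {
            calls.incrementAndGet();
            return "prepared:" + cql; // fake PreparedStatement
        });
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int t = 0; t < 8; t++) {
            pool.submit(() -> {
                for (int table = 0; table < 14; table++) {
                    cache.get("select * from testkeyspace.test_table_" + table + ";");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(prepareCalls()); // 14
    }
}
```

With the real driver this would be constructed as `new PreparedCache<>(session::prepare)`. Note that computeIfAbsent holds a lock on that map entry while the preparer runs, so concurrent callers asking for the same statement wait for the single prepare round trip instead of preparing it twice.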

Question:

I have a table that is roughly like

create table mytable (
   id uuid,
   something text,
   primary key (id)
);

I'm trying to create a prepared statement that has a bound in-clause:

PreparedStatement ps = session.prepare("select * from mytable where id IN (?)");
...
UUID[] ids = { uuid1, uuid2, uuid3} ;

No matter how I express the ids to bind, the java driver rejects them.

ps.bind( /*as array*/): driver complains statement has only one value, 2 supplied

ps.bind( /*as comma separated string list of uuids*/): driver complains it wants UUID.class objects, not strings

ps.bind( /*as list object*/): driver complains it wants UUID.class objs, not a List.class object

I really hope the driver isn't expecting as many ? placeholders as there are values in the in-list, because that means you'd have to re-prepare the statement every time you wanted to execute it, which the DataStax docs say not to do!

I looked at the com.datastax.driver.core.BoundStatement.bind() method and there is no indication that anything else would work - no magic wrappers or anything.

Is there a way to do this?


Answer:

The correct syntax is SELECT * FROM mytable WHERE id IN ? (without parens around the ?). This will allow you to pass a list of UUIDs to bind to that single query parameter.
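The "statement has only one value, 2 supplied" error from the question is a Java varargs effect rather than a Cassandra one: `bind(Object... values)` receives a `UUID[]` as the varargs array itself, so every UUID becomes a separate bind value. A `List` is a single object, so it fills the single `IN ?` marker. The JDK-only sketch below uses a hypothetical `countBoundValues` method with the same `Object...` parameter shape as `bind`; it only reports how many values a call supplies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

final class VarargsDemo {
    // Hypothetical stand-in with the same parameter shape as bind(Object... values):
    // it reports how many bind values the call would supply.
    static int countBoundValues(Object... values) {
        return values.length;
    }

    public static void main(String[] args) {
        UUID[] ids = {UUID.randomUUID(), UUID.randomUUID()};

        // A UUID[] is an Object[], so it is used as the varargs array: 2 values for 1 marker.
        System.out.println(countBoundValues(ids)); // 2

        // A List is one object: 1 value, matching the single marker in "IN ?".
        List<UUID> idList = Arrays.asList(ids);
        System.out.println(countBoundValues(idList)); // 1
    }
}
```

With the driver, the corresponding call against `SELECT * FROM mytable WHERE id IN ?` would be `ps.bind(Arrays.asList(uuid1, uuid2, uuid3))`.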

Question:

Background: we are using Cassandra to store some time series data and we are using prepared statements to access data.

We are partitioning data in tables by:

  • time period (like one week or one month) and
  • retention policy (like 1 year, 5 or 10 years)

Because we have different tables, we need to prepare (only when first used) a different statement for every combination of query, time period and retention policy, so the number of prepared statements will explode. Some math:

timePeriods = 12..52 * yearsOfData
maxNumOfPrepStatements = timePeriods * policies * numOfQueries

ourCase => (20 * 10 y) * 10 p * 10 q = 20,000 prep statements
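The arithmetic can be checked by enumerating the distinct CQL strings directly: every (table, query) pair is a separate string, and therefore a separate prepared statement. The JDK-only sketch below does that; the table naming scheme (`ts.data_r<policy>_p<period>`) and the column names are hypothetical, chosen only to make the count concrete.

```java
import java.util.HashSet;
import java.util.Set;

final class StatementCount {
    // Builds one CQL string per (time-period table, retention policy, query) combination
    // and counts the distinct ones. Table and column names are hypothetical.
    static int distinctStatements(int periodsPerYear, int years, int policies, int queries) {
        Set<String> cql = new HashSet<>();
        for (int period = 0; period < periodsPerYear * years; period++) {
            for (int policy = 0; policy < policies; policy++) {
                String table = "ts.data_r" + policy + "_p" + period;
                for (int q = 0; q < queries; q++) {
                    cql.add("select value" + q + " from " + table + " where series_id = ?");
                }
            }
        }
        return cql.size();
    }

    public static void main(String[] args) {
        // (20 periods/year * 10 years) * 10 policies * 10 queries
        System.out.println(distinctStatements(20, 10, 10, 10)); // 20000
    }
}
```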

On the client side I can keep only the most used PS in a cache, but I could not find a way to remove the unused ones from the server, so I am worried that having about 20,000 prepared statements could be a big cost for every node.

Problem: will this number of PS cause any problem on the server?

This breaks into smaller questions:

  • How much will be the server side cost of those prepared statements?
  • Will the server keep all the PS or will it remove the less used ones?
  • Is there a better solution than restarting Cassandra nodes to clean the PS cache?
  • using the Java client, will closing the Session / Cluster object alleviate this (server side)?

Answer:

How much will be the server side cost of those prepared statements?

Each prepared statement will be parsed and then stored in a cache, using its MD5 digest as the key. Re-preparing an identical statement makes the server compute the MD5 digest and match it against the statements already cached, which is wasted work and should therefore be avoided. Executing an already registered statement has the client send the MD5 digest along with the query arguments; the server retrieves the cached statement by that digest, which is faster than parsing a regular CQL statement. Each cached statement also consumes part of the Java heap, corresponding to the total size of the MD5 key and the representation of the statement object.
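Keying by digest is why identical query text always lands on the same entry while every distinct table name adds a new one. As a simplified, JDK-only sketch, the code below hashes CQL text with MD5 via `java.security.MessageDigest`. It hashes only the query string; the ID Cassandra actually computes may include more (for example the keyspace), depending on the version.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class StatementIds {
    // Simplified statement ID: MD5 of the query text only (Cassandra may hash more than this).
    static String md5Hex(String cql) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(cql.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        String a = "select * from ks.data_2017_w01 where id = ?";
        String b = "select * from ks.data_2017_w02 where id = ?";
        // Same text, same ID: re-preparing just finds the existing entry.
        System.out.println(md5Hex(a).equals(md5Hex(a))); // true
        // Different table, different ID: a new cache entry.
        System.out.println(md5Hex(a).equals(md5Hex(b))); // false
    }
}
```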

Will the server keep all the PS or will it remove the less used ones?

Prepared statements are managed by the server in a cache based on ConcurrentLinkedHashMap. The cache's capacity depends on the available memory: Runtime.getRuntime().maxMemory() / 256. Entries are weighted by their memory usage, so large statements take up more of that capacity, and once it is reached the least recently used entries are evicted. You can monitor this behavior using the org.apache.cassandra.metrics.CQL.PreparedStatementsEvicted JMX metric.
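To get a feel for the numbers, the capacity formula can be evaluated for a given heap size. The JDK-only sketch below applies `maxMemory() / 256` exactly as stated in the answer; the sizing rule may differ between Cassandra versions, so treat the result as an estimate.

```java
final class PreparedCacheCapacity {
    // Capacity formula as quoted in the answer: 1/256 of the maximum JVM heap, in bytes.
    static long capacityBytes(long maxHeapBytes) {
        return maxHeapBytes / 256;
    }

    public static void main(String[] args) {
        long maxHeap = Runtime.getRuntime().maxMemory();
        System.out.printf("max heap = %d MiB, prepared statement cache ~ %d KiB%n",
                maxHeap / (1024 * 1024), capacityBytes(maxHeap) / 1024);
        // An 8 GiB heap gives 32 MiB of cache capacity.
        System.out.println(capacityBytes(8L * 1024 * 1024 * 1024) / (1024 * 1024)); // 32
    }
}
```

With an 8 GiB heap that is 33,554,432 bytes, or about 1,677 bytes per statement if all 20,000 statements from the question were cached at once.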

Is there a better solution than restarting Cassandra nodes to clean the PS cache?

Not that I'm aware of. I'm also not really sure why you'd want to do that, as identical MD5 digests will be created for identical queries. Please also note that the Java client will automatically re-register prepared statements that cannot be found on the server, e.g. when they have been evicted from the cache (see also this answer).

using the Java client, will closing the Session / Cluster object alleviate this (server side)?

I don't think so. The server would have to keep track of which statements have been registered by the hundreds of potential clients in order to clean them up safely.

Question:

This doc explains how to use Cassandra prepared and bound statements.

It says:

You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). ... BoundStatement is not thread-safe. You can reuse an instance multiple times with different parameters, but only from a single thread and only if you use synchronous calls:

BoundStatement bound = ps1.bind();

// This is safe:
bound.setString("sku", "324378");
session.execute(bound);

bound.setString("sku", "324379");
session.execute(bound);

// This is NOT SAFE. executeAsync runs concurrently with your code, so the first execution might actually read the
// values after the second setString call, and you would insert 324381 twice:
bound.setString("sku", "324380");
session.executeAsync(bound);

bound.setString("sku", "324381");
session.executeAsync(bound);

It's clear that the above is not thread-safe, but what if we change the code this way:

BoundStatement bound1 = ps1.bind();
BoundStatement bound2 = ps1.bind();

bound1.setString("sku", "324380");
session.executeAsync(bound1);

bound2.setString("sku", "324381");
session.executeAsync(bound2);

That is: Use common PreparedStatement for several threads and every thread uses its own BoundStatement.

1) Is this thread safe?

2) Is this otherwise recommended way to go for parallel execution with prepared statements? Or are BoundStatements expensive / slow to create / consume lots of memory etc reasons to keep the number of them low?


Answer:

Short answer: if you use the same PreparedStatement object multiple times, binding different parameters each time through a separate BoundStatement object, then it is thread-safe. PreparedStatement is thread-safe, so you can reuse it from multiple threads, and although BoundStatement is not thread-safe, each thread has its own instance.

Just to be clear: thread 1 creates the prepared statement using ps1 = session.prepare("insert into product (sku, description) values (?, ?)"); and all other threads use this ps1 object to create their own BoundStatement objects, because each has its own values to pass, for example:

Thread 1 will bind and execute like this (notice that it uses the same ps1 object):

BoundStatement bound = ps1.bind().setString("sku", "001").setString("description", "LCD screen");
session.execute(bound);

Thread 2 will bind and execute like this (again using the same ps1 object):

BoundStatement bound = ps1.bind().setString("sku", "002").setString("description", "TFT screen");
session.execute(bound);

Thread 3 will bind and execute like this (again using the same ps1 object):

BoundStatement bound = ps1.bind().setString("sku", "003").setString("description", "LED screen");
session.execute(bound);

In a nutshell: the major performance cost is incurred when creating the PreparedStatement object, because it takes a round trip to the database server, so you create it once and reuse it (it is thread-safe). A BoundStatement, on the other hand, is created separately each time because it is not thread-safe; it is also a lightweight object to create and does not require a round trip to the server.
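The same division of labour (one immutable object created once and shared; one cheap, mutable object per execution) can be shown without a cluster. In the JDK-only sketch below, the hypothetical `Template` plays the role of PreparedStatement and `Binding` the role of BoundStatement. It is an analogy for the ownership pattern, not the driver's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SharedTemplateDemo {
    // Created once and never modified: safe to share across threads (role of PreparedStatement).
    static final class Template {
        final String cql;
        Template(String cql) { this.cql = cql; }
        Binding bind() { return new Binding(this); }
    }

    // Mutable and unsynchronized: each task creates its own (role of BoundStatement).
    static final class Binding {
        private final Template template;
        private final Map<String, String> values = new HashMap<>();
        Binding(Template template) { this.template = template; }
        Binding setString(String name, String value) {
            values.put(name, value);
            return this;
        }
        String render() { return template.cql + " " + values; }
    }

    // One task per sku; every task binds its own values against the single shared template.
    static List<String> run(List<String> skus) {
        Template ps1 = new Template("insert into product (sku, description) values (?, ?)");
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String sku : skus) {
                futures.add(pool.submit(() -> ps1.bind().setString("sku", sku).render()));
            }
            List<String> rendered = new ArrayList<>();
            for (Future<String> f : futures) {
                rendered.add(f.get());
            }
            return rendered;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because no task ever writes to the shared `Template`, and no `Binding` is visible to more than one task, no locking is needed.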

Question:

I have a Java list of 100,000 names that I'd like to ingest into a 3 node Cassandra cluster that is running Datastax Enterprise 5.1 with Cassandra 3.10.0

My code ingests, but it takes a looooong time. I ran a stress test on the cluster and was able to do over 25,000 writes per second. With my ingest code I am getting terrible performance of around 200 writes per second.

My Java List has 100,000 names in it and is called myList. I use the following prepared statement and session execution to ingest the data.

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

int id = 0;

for (int i = 0; i < myList.size(); i++) {
    id += 1;
    session.execute(prepared.bind(id, myList.get(i)));
}

I added a cluster monitor to my code to see what was going on. Here is my monitoring code.

/// Monitoring status of the cluster
final LoadBalancingPolicy loadBalancingPolicy =
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
final PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
scheduled.scheduleAtFixedRate(() -> {
    Session.State state = session.getState();
    state.getConnectedHosts().forEach((host) -> {
        HostDistance distance = loadBalancingPolicy.distance(host);
        int connections = state.getOpenConnections(host);
        int inFlightQueries = state.getInFlightQueries(host);
        System.out.printf("%s connections=%d, current load=%d, maxload=%d%n",
                host, connections, inFlightQueries,
                connections * poolingOptions.getMaxRequestsPerConnection(distance));
    });
}, 5, 5, TimeUnit.SECONDS);

The monitoring output, printed every 5 seconds, shows the following for 3 iterations:

/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=0, maxload=32768
/192.168.20.26:9042 connections=1, current load=1, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768

It doesn't appear that I am very effectively utilizing my cluster. I'm not sure what I am doing wrong and would greatly appreciate any tips.

Thank you!


Answer:

Use executeAsync.

Executes the provided query asynchronously. This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the ResultSetFuture.

You are inserting a huge amount of data. If you use executeAsync and your cluster cannot handle that much data, it can throw exceptions. You can limit the number of concurrent executeAsync calls with a Semaphore.

Example:

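import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Semaphore;

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

int numberOfConcurrentQueries = 100;
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);

int id = 0;

for (int i = 0; i < myList.size(); i++) {
    id += 1;
    try {
        // blocks while numberOfConcurrentQueries queries are already in flight
        semaphore.acquire();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break; // no permit was acquired, so none must be released
    }
    try {
        ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i)));
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
                semaphore.release();
            }
        }, MoreExecutors.directExecutor());
    } catch (Exception e) {
        // executeAsync itself failed, so no callback will release the permit
        semaphore.release();
        e.printStackTrace();
    }
}

// wait until the last in-flight queries have completed
semaphore.acquireUninterruptibly(numberOfConcurrentQueries);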
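The throttling pattern itself can be exercised without a cluster. In the JDK-only sketch below, a CompletableFuture that sleeps for 1 ms on a thread pool is an assumed stand-in for `session.executeAsync`, and the method records the peak number of in-flight requests so you can see the semaphore cap it. It also waits for the final requests by reacquiring every permit before returning.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class ThrottledWrites {
    // Issues one async "write" per name, never more than maxInFlight at a time.
    // Returns {completedWrites, peakInFlight}.
    static int[] ingest(List<String> names, int maxInFlight) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();
        Semaphore semaphore = new Semaphore(maxInFlight);
        ExecutorService io = Executors.newFixedThreadPool(16);
        try {
            for (String name : names) {
                semaphore.acquireUninterruptibly(); // blocks while maxInFlight writes are pending
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                // Hypothetical stand-in for session.executeAsync(prepared.bind(id, name)).
                CompletableFuture.runAsync(() -> {
                    sleepQuietly(1); // pretend network and server time
                    inFlight.decrementAndGet();
                }, io).whenComplete((ignored, error) -> {
                    if (error == null) {
                        completed.incrementAndGet();
                    }
                    semaphore.release(); // free the slot on success and on failure
                });
            }
            // Reacquiring every permit waits until the last writes have completed.
            semaphore.acquireUninterruptibly(maxInFlight);
            return new int[] {completed.get(), peak.get()};
        } finally {
            io.shutdown();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Releasing the permit in a completion callback, rather than after a blocking call, is what lets up to `maxInFlight` writes overlap while still bounding the load on the cluster.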

Sources: https://stackoverflow.com/a/30526719/2320144 and https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement-

Question:

I need to use a prepared statement instead of QueryBuilder, using the CassandraOperations interface and a session. Is there any example or recent documentation for Cassandra using Java?


Answer:

See this to check how to use prepared statements with the DataStax Java driver.

However, I would recommend storing all prepared statements in a cache (for example a map) when the application initializes, and reusing them whenever required by creating BoundStatements from them. For example:

// Key is the query string
private static final Map<String, PreparedStatement> psMap = new ConcurrentHashMap<String, PreparedStatement>();

// Invoked once at initialization
public void init(Session session) {
    this.session = session;
    for (QuerySetEnum cql : QuerySetEnum.values()) {
        psMap.put(cql.getStatement(), session.prepare(cql.getStatement()));
    }
}

// In the DAO implementation class:
// get a bound statement and execute it by passing the values
@Override
public void decreaseStats(long size, long count, String mapname,
        int bucketId) {
    BoundStatement boundStatement = getBoundStatement(QuerySetEnum.DECREASE_STATS);
    metaTemplate.execute(boundStatement.bind(size, count, mapname,
            bucketId));
}

// How to get a BoundStatement out of the prepared statement cache
private BoundStatement getBoundStatement(QuerySetEnum query) {
    PreparedStatement preparedStatement = psMap.get(query.getStatement());
    return new BoundStatement(preparedStatement);
}
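Because the set of queries is fixed by an enum, the cache can also be keyed by the enum constant itself and filled eagerly. The JDK-only sketch below shows that variant with an EnumMap. `QuerySet`, its CQL text, and the `Function` preparer (an assumed stand-in for `session::prepare`) are all hypothetical. Preparing at startup means a malformed query fails when the application starts rather than on first use.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical query catalogue; the CQL text is illustrative only.
enum QuerySet {
    DECREASE_STATS("update stats set size = size - ?, count = count - ? where mapname = ? and bucket_id = ?"),
    READ_STATS("select size, count from stats where mapname = ? and bucket_id = ?");

    final String cql;

    QuerySet(String cql) {
        this.cql = cql;
    }
}

final class EagerStatements<P> {
    private final Map<QuerySet, P> prepared;

    // Prepares every query once at construction; the map is read-only afterwards,
    // and publishing it through a final field makes it safe to share between threads.
    EagerStatements(Function<String, P> preparer) {
        Map<QuerySet, P> map = new EnumMap<>(QuerySet.class);
        for (QuerySet q : QuerySet.values()) {
            map.put(q, preparer.apply(q.cql));
        }
        this.prepared = Collections.unmodifiableMap(map);
    }

    P get(QuerySet query) {
        return prepared.get(query);
    }
}
```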

Question:

I am working with Datastax Java driver to read and write data into Cassandra. I am using datastax java driver 3.1.0 and my cassandra cluster version is 2.0.10.

I have created below two methods to execute my cql query.

I call the first method when I don't need to set any values in my CQL query, so it works for CQL strings like the ones below:

select * from testkeyspace.testtable where row_id=1 // cql-1
select * from testkeyspace.meta where row_id=1 // cql-2
select * from testkeyspace.proc where row_id=1 // cql-3

I need to call the second method whenever I have queries like these:

select * from testkeyspace.testtabletwo where row_id=1 and active=? // cql-4
select * from testkeyspace.storage where topic=? and parts=? // cql-5, here parts is an Integer and topic is a String.

So my question is: how can I make my second method generic enough that it works when I need to set n values in my CQL query using BoundStatement? Right now I am not sure what to do for cql-4 and cql-5 when calling my second method with the values passed to it. I know I have to set those values using BoundStatement, but if I need to set two, three or four values, how can I make the method generic so that I don't need to hardcode anything? Some values can be Integers and some can be Strings.

First Method:-

  public ResultSet executeQuery(final String cql) {
    return executeWithSession(new SessionCallable<ResultSet>() {
      @Override
      public ResultSet executeWithSession(Session session) {
        BoundStatement bs = getStatement(cql);
        bs.setConsistencyLevel(consistencyLevel);
        return session.execute(bs);
      }
    });
  }

Second Method:-

  public ResultSet executeQuery(final String cql, final Object... values) {
    return executeWithSession(new SessionCallable<ResultSet>() {
      @Override
      public ResultSet executeWithSession(Session session) {
        BoundStatement bs = getStatement(cql);
        bs.setConsistencyLevel(consistencyLevel);
        // how to set these **values** into my cql query using BoundStatement in a generic way 
        // so that I don't need to hardcode anything for cql-4 and cql-5
        return session.execute(cql, values);
      }
    });
  }

  // cache the prepared statement
  private BoundStatement getStatement(final String cql) {
    Session session = getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
      ps = session.prepare(cql);
      PreparedStatement old = cache.putIfAbsent(cql, ps);
      if (old != null)
        ps = old;
    }
    return ps.bind();
  }

Do I have to use BoundStatement here at all? I guess I have to because that's how I can set consistency level?


Answer:

How about that:

public ResultSet executeQuery(final String cql, final Object... values) {
    return executeWithSession(new SessionCallable<ResultSet>() {
        @Override
        public ResultSet executeWithSession(Session session) {
            BoundStatement bs = getStatement(cql, values);
            bs.setConsistencyLevel(consistencyLevel);
            return session.execute(bs); // execute the bound statement, not the raw CQL string
        }
    });
}

// cache the prepared statement
private BoundStatement getStatement(final String cql, Object... values) {
    Session session = getSession();
    PreparedStatement ps = cache.get(cql);
    // no statement cached, create one and cache it now.
    if (ps == null) {
        ps = session.prepare(cql);
        PreparedStatement old = cache.putIfAbsent(cql, ps);
        if (old != null)
            ps = old;
    }
    return ps.bind(values);
}

Question:

I am using PreparedStatement and BoundStatement to execute some Cassandra queries. I am trying to execute a range query. This is the query I want to create:

getAllRecordsOnIndexRange = getSession.prepare(QueryBuilder.select(documentId, documentName, documentIndex)
          .from(tableName)               
          .where(QueryBuilder.eq(documentId,QueryBuilder.bindMarker()))
          .and(QueryBuilder.gte(documentIndex, QueryBuilder.bindMarker()))
          .and(QueryBuilder.lte(documentIndex, QueryBuilder.bindMarker())));

'documentId' is partition key and 'documentIndex' is clustering key. I want to have range query on column documentIndex like "get me all records with given documentId and documentIndex >= 3 and documentIndex <= 10"

When I want to run the query, I call

public Statement buildGetAllRecordsOnIndexRange(int documentId, String documentName, int startDocumentIndex, int endDocumentIndex)
{
    BoundStatement boundStatement = getAllRecordsOnIndexRange
            .bind()
            .setInt(DOCUMENT_ID, documentId);
            //how to set start and end documentIndex
    databaseManager.applyReadStatementsConfiguration(boundStatement);
    return boundStatement;
}

How can I set the startDocumentIndex and endDocumentIndex for the above query?


Answer:

I would recommend using named bind markers instead of unnamed ones, because code that uses them is much easier to read. In your case the code will look like this:

PreparedStatement pStatement = session.prepare(QueryBuilder.select(documentId, documentName, documentIndex)
      .from(tableName)               
      .where(QueryBuilder.eq(documentId,QueryBuilder.bindMarker("documentId")))
      .and(QueryBuilder.gte(documentIndex, QueryBuilder.bindMarker("documentIndexStart")))
      .and(QueryBuilder.lte(documentIndex, QueryBuilder.bindMarker("documentIndexEnd"))));

And then you can bind the values by name:

BoundStatement stmt = pStatement.bind()
       .setInt("documentId", documentId)
       .setInt("documentIndexStart", startDocumentIndex)
       .setInt("documentIndexEnd", endDocumentIndex);