Hot questions about Cassandra connections

Question:

I want to use Docker to start my application and Cassandra database, and I would like to use Docker Compose for that. Unfortunately, Cassandra starts much slower than my application, and since my application eagerly initializes the Cluster object, I get the following exception:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra/172.18.0.2:9042 (com.datastax.driver.core.exceptions.TransportException: [cassandra/172.18.0.2:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1454)
    at com.datastax.driver.core.Cluster.init(Cluster.java:163)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:334)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:309)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:251)

According to the stack trace and a little debugging, it seems that the Cassandra Java driver does not apply retry policies to the initial startup. This seems kind of weird to me. Is there a way for me to configure the driver so it will continue its attempts to connect to the server until it succeeds?


Answer:

You should be able to write some try/catch logic around the NoHostAvailableException to retry the connection after a 5-10 second wait. I would recommend only retrying a few times, and then rethrowing the exception once enough time has passed that you know Cassandra should have started.

Example pseudocode (doConnectionStuff() stands in for your actual connection logic):

Session makeCassandraConnection(int retryCount) throws Exception {
    Exception lastException = new IllegalStateException("no connection attempt made");
    while (retryCount > 0) {
        try {
            return doConnectionStuff();
        } catch (NoHostAvailableException e) {
            lastException = e;
            retryCount--;
            // wait before retrying; InterruptedException propagates via the throws clause
            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
        }
    }
    throw lastException;
}
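
Called at startup, this might look like the following (the retry count is arbitrary; 12 attempts at 5 seconds apart cover roughly a minute of Cassandra startup time):

Session session = makeCassandraConnection(12);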

Question:

My team is moving from the Astyanax driver (which will be deprecated soon, if it isn't already) to the DataStax 3.0 driver.

Our code implements Astyanax's ConnectionPoolMonitor class and we capture about 22 different metrics on our connection pool usage.

I am trying to find an equivalent way to do this with the DataStax driver, but all I could find is this: https://datastax.github.io/java-driver/manual/pooling/#monitoring-and-tuning-the-pool

Basically, the example at that link shows how you can run a background thread that continuously polls Session.State. This seems rather awkward; Astyanax makes callbacks to the classes that implement ConnectionPoolMonitor.

And the amount of info exposed in Session.State is rather limited: connected hosts, in-flight queries, open connections, and trashed connections.

Is there a better option out there that I haven't found somehow? How can I capture metrics such as these:

  • count of when the pool is exhausted, got a connection timeout, socket timeout, or no hosts
  • count of connection created, closed, borrowed, returned, creation error
  • count of host added, removed, down, reactivated/reconnected
  • count of exception unknown error, bad request, interrupted, transport error

Answer:

Try cluster.getMetrics() and read this Javadoc: http://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/Metrics.html
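
The Metrics object wraps a Dropwizard MetricRegistry, so counters like connection errors can be read directly or wired into any Dropwizard reporter. A minimal sketch, using getter names from the linked Javadoc:

Metrics metrics = cluster.getMetrics();
// driver-maintained gauges and counters
int openConnections = metrics.getOpenConnections().getValue();
long connectionErrors = metrics.getErrorMetrics().getConnectionErrors().getCount();
// the underlying registry can be handed to any Dropwizard reporter for periodic publishing
MetricRegistry registry = metrics.getRegistry();

This covers many of the counters listed above (connection errors, timeouts, retries); host added/removed/up/down events are separately available by registering a Host.StateListener on the Cluster.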

Question:

I am trying to bulk load a 4-node Cassandra 3.0.10 cluster with some data. I've successfully generated the SSTables following the documentation; however, it seems I cannot get sstableloader to load them.

I get the following java.net.ConnectException: Connection refused:

bin/sstableloader -v -d localhost test-data/output/si_test/messages/
Established connection to initial hosts
Opening sstables and calculating sections to stream
Streaming relevant part of /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-1-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-10-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-11-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-12-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-13-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-14-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-15-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-16-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-17-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-18-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-19-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-2-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-20-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-21-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-22-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-23-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-24-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-25-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-26-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-27-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-28-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-29-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-3-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-30-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-31-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-32-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-33-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-34-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-35-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-36-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-37-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-38-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-39-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-4-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-40-big-Data.db 
/home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-41-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-42-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-43-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-44-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-45-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-46-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-47-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-48-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-49-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-5-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-50-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-51-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-52-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-53-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-54-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-55-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-56-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-57-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-58-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-59-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-6-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-60-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-61-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-7-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-8-big-Data.db /home/ubuntu/cassandra/apache-cassandra-3.0.10/test-data/output/si_test/messages/mc-9-big-Data.db to [localhost/127.0.0.1]
ERROR 14:46:24 [Stream #3d0c24e0-cc43-11e6-8c9f-615437259231] Streaming error occurred
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method) ~[na:1.8.0_111]
    at sun.nio.ch.Net.connect(Net.java:454) ~[na:1.8.0_111]
    at sun.nio.ch.Net.connect(Net.java:446) ~[na:1.8.0_111]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[na:1.8.0_111]
    at java.nio.channels.SocketChannel.open(SocketChannel.java:189) ~[na:1.8.0_111]
    at org.apache.cassandra.tools.BulkLoadConnectionFactory.createConnection(BulkLoadConnectionFactory.java:60) ~[apache-cassandra-3.0.10.jar:3.0.10]
    at org.apache.cassandra.streaming.StreamSession.createConnection(StreamSession.java:255) ~[apache-cassandra-3.0.10.jar:3.0.10]
    at org.apache.cassandra.streaming.ConnectionHandler.initiate(ConnectionHandler.java:84) ~[apache-cassandra-3.0.10.jar:3.0.10]
    at org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:242) ~[apache-cassandra-3.0.10.jar:3.0.10]
    at org.apache.cassandra.streaming.StreamCoordinator$StreamSessionConnector.run(StreamCoordinator.java:212) [apache-cassandra-3.0.10.jar:3.0.10]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
progress: total: 100% 0  MB/s(avg: 0 MB/s)WARN  14:46:24 [Stream #3d0c24e0-cc43-11e6-8c9f-615437259231] Stream failed
Streaming to the following hosts failed:
[localhost/127.0.0.1]
java.util.concurrent.ExecutionException: org.apache.cassandra.streaming.StreamException: Stream failed
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:120)
Caused by: org.apache.cassandra.streaming.StreamException: Stream failed
    at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1310)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
    at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:211)
    at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:187)
    at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:440)
    at org.apache.cassandra.streaming.StreamSession.onError(StreamSession.java:540)
    at org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:248)
    at org.apache.cassandra.streaming.StreamCoordinator$StreamSessionConnector.run(StreamCoordinator.java:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The utility seems to connect to the cluster (...Established connection to initial hosts...), however it does not stream the data.

What I've tried so far to debug the issue:

  • Dropped the target keyspace (and got a different error), created again via cqlsh

  • I can telnet to each node of the cluster through ports 9042 and 7000

  • Enabled thrift using nodetool enablethrift

EDIT

Here's the output of netstat -an | grep 7000. The nodes have only one network interface, and Cassandra is listening on it. It has also established connections with all the other nodes on port 7000.

tcp        0      0 172.31.3.88:7000        0.0.0.0:*               LISTEN     
tcp        0      0 172.31.3.88:7000        172.31.3.86:54348       ESTABLISHED
tcp        0      0 172.31.3.88:7000        172.31.3.87:60661       ESTABLISHED
tcp        0      0 172.31.3.88:53061       172.31.3.87:7000        ESTABLISHED
tcp        0      0 172.31.3.88:7000        172.31.11.43:36984      ESTABLISHED
tcp        0      0 172.31.3.88:51412       172.31.11.43:7000       ESTABLISHED
tcp        0      0 172.31.3.88:54018       172.31.3.87:7000        ESTABLISHED
tcp        0      0 172.31.3.88:7000        172.31.3.87:40667       ESTABLISHED
tcp        0      0 172.31.3.88:34469       172.31.3.86:7000        ESTABLISHED
tcp        0      0 172.31.3.88:43658       172.31.3.86:7000        ESTABLISHED
tcp        0      0 172.31.3.88:7000        172.31.11.43:49487      ESTABLISHED
tcp        0      0 172.31.3.88:40798       172.31.11.43:7000       ESTABLISHED
tcp        0      0 172.31.3.88:7000        172.31.3.86:51537       ESTABLISHED

EDIT 2

Changing the initial host from 127.0.0.1 to the actual address of the node in the network results in a com.datastax.driver.core.exceptions.TransportException:

bin/sstableloader -v -d 172.31.3.88 test-data/output/si_test/messages/
All host(s) tried for query failed (tried: /172.31.3.88:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.31.3.88] Cannot connect))
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /172.31.3.88:9042 (com.datastax.driver.core.exceptions.TransportException: [/172.31.3.88] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1424)
    at com.datastax.driver.core.Cluster.init(Cluster.java:163)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:334)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:309)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:251)
    at org.apache.cassandra.utils.NativeSSTableLoaderClient.init(NativeSSTableLoaderClient.java:70)
    at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:159)
    at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:104)

Any suggestion is appreciated.

Thanks


Answer:

That's sstableloader trying to connect to the storage port (7000). Cassandra is most likely binding to a different interface than 127.0.0.1. You can check what it's binding to with netstat -an | grep 7000. You may also want to double-check any firewall or iptables settings.

UPDATE: it's not bound to 127.0.0.1 (the default) but to 172.31.3.88. So call sstableloader -v -d 172.31.3.88 test-data/output/si_test/messages/

Also, if you have SSL enabled (server_encryption_options in cassandra.yaml), you need to use port 7001 and configure sstableloader to match. If you can telnet to 7000, it's most likely not that, though.

Worth noting that enabling Thrift is not necessary in 3.0.10; sstableloader no longer uses it (in older versions it was used to read the schema).

Question:

I have the following java code:

String serverIP = "127.0.0.1";  
String keyspace = "testkeyspace"; 
Cluster cluster = Cluster.builder().addContactPoints(serverIP).build();  
Session session = cluster.connect(keyspace);

which I am trying to use to check my connection to Cassandra.

I have Cassandra installed locally on my own machine, and connecting there is successful. My Maven project is using the following:

spring-data-cassandra 2.0.6.RELEASE

However, when I try to connect my Java application to Cassandra run by Docker (with my local Cassandra stopped), I encounter the following error:

All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1:9042] Cannot connect))

I do not know if my IP is wrong, but the following message is written whenever I open cqlsh in my Docker container:

Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4] Use HELP for help.

Please help, I must have missed something here. Thank you!


Answer:

Adding my answer here, since it was accepted as a comment, in order to help others with the same issue.

You should expose your Cassandra Docker container port to localhost with:

docker run -p 9042:9042 cassandra:latest

Cassandra should now be accessible on localhost:9042

Question:

When using the Cassandra driver within a Java project, what is the best practice for managing connections? Specifically, is it better practice to allow multiple threads to share a single Cluster instance, or to allocate a separate Cluster instance for each thread that needs to talk to Cassandra?

I followed the example code and am setting up my Cluster instance like:

Cluster.builder().addContactPoint(HOST).withPort(PORT)
    .withCredentials(USER, PASS).build();

So what I'm asking is: would the preferred approach be to do something like this (a single shared Cluster instance):

private static Cluster _cluster = null;

public static Cluster connect() {
    if (_cluster != null && ! _cluster.isClosed()) {
        //return the cached instance
        return _cluster;
    }

    //create a new instance
    _cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT)
                .withCredentials(USER, PASS).build();
    return _cluster;
}

...or is it best practice to return multiple Cluster instances? Like so:

public static Cluster connect() {
    //every caller gets their own Cluster instance
    return Cluster.builder().addContactPoint(HOST).withPort(PORT)
                .withCredentials(USER, PASS).build();
}

I guess the points at the core of this question are:

  • Is building a new Cluster instance an expensive operation?
  • Will the Cluster object internally manage/pool connections to the backing datastore, or does it function more like an abstraction of a single connection?
  • Is the Cluster object thread-safe?

Answer:

Is building a new Cluster instance an expensive operation?

Calling build to construct a Cluster instance does no network I/O, so it is an inexpensive operation.

Will the Cluster object internally manage/pool connections to the backing datastore, or does it function more like an abstraction of a single connection?

What is expensive is calling cluster.init() which creates a single connection (control connection) to one of your contact points. cluster.connect() is even more expensive since it inits the Cluster (if it hasn't been already) and creates a Session which manages a connection pool (with pool size based on your PoolingOptions) to each discovered host. So yes, a Cluster has a 'control connection' to manage the state of hosts and each Session created via Cluster.connect() will have a connection pool to each host.

Is the Cluster object thread-safe?

Put simply, yes :)

4 simple rules when using the DataStax drivers for Cassandra provides further guidance on this topic.
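
Putting those answers together, the first approach (a single shared Cluster) is the way to go. A minimal thread-safe sketch, reusing HOST, PORT, USER, and PASS from the question, and sharing the Session as well since it is also thread-safe:

public final class CassandraConnection {

    // Cluster and Session are both thread-safe and designed to be shared
    private static final Cluster CLUSTER = Cluster.builder()
            .addContactPoint(HOST).withPort(PORT)
            .withCredentials(USER, PASS)
            .build();

    // connect() is the expensive call, so do it exactly once
    private static final Session SESSION = CLUSTER.connect();

    private CassandraConnection() {}

    public static Session session() {
        return SESSION;
    }
}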

Question:

As far as I understand from the documentation and what I've read, a prepared statement is bound to the session. So the question goes: if I close the connection and recreate it, or if the session gets killed (how is that even possible?), one has to recreate all prepared statements, right?

What would be the Exception one gets if a PreparedStatement is not managed by the underlying session instance (anymore)?

[Update]

I have created my own simple statement class to represent a prepared statement, since I create the statement at creation time, just before a session exists. Additionally, I use a map to map both on demand, and I am ready to go.


Answer:

For Cassandra Drivers, the standard way is to prepare a statement once and bind variables multiple times before executing.

You can refer to this post: http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra

I am sharing a code snippet that you can use. Validating the session and preparing statements is something you need to do at the beginning of your application's execution, before starting mutations.

    if (session == null) {
        session = CassandraUtils.getInstance().getSession();
        psUsers = session.prepare("INSERT INTO users(xx, yy, zz, tt) VALUES (?, ?, ?, ?)");
        psProducts = session.prepare ("INSERT INTO products(aa, bb, cc, dd) VALUES (?, ?, ?, ?)");
    }
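
For completeness, executing then means binding values to the prepared statement at runtime (the values here are placeholders):

    BoundStatement bound = psUsers.bind("xx-value", "yy-value", "zz-value", "tt-value");
    session.execute(bound);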

From the shared post above:

Use at most one Session per keyspace, or use a single Session and explicitly specify the keyspace in your queries

For the loss-of-connection part, the driver handles it for you, and of course you can use different kinds of reconnection policies: http://www.datastax.com/drivers/java/2.0/com/datastax/driver/core/policies/Policies.html

Question:

I have my Cassandra sink configured as shown below:

    ClusterBuilder secureCassandraSinkClusterBuilder = new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoints(props.getCassandraClusterUrlAll().split(","))
                    .withPort(props.getCassandraPort())
                    .withAuthProvider(new DseGSSAPIAuthProvider("HTTP"))
                    .build();
        }
    };

    CassandraSink
            .addSink(cassandraObjectStream)
            .setClusterBuilder(secureCassandraSinkClusterBuilder)
            .build()
            .name("Cassandra-Sink");

Now, when the connection to Cassandra is not configured properly, I get a NoHostAvailableException; when the connection unexpectedly drops, I get a ConnectionTimeOutException, or sometimes a WriteTimeoutException. This ultimately triggers a JobExecutionException, and the whole Flink job terminates.

Where do I catch these Cassandra exceptions? Where are they thrown? I tried putting a try-catch block around the CassandraSink, but that doesn't do it. I want to catch these exceptions and retry connecting to Cassandra in case of a connection timeout, or retry writing to Cassandra in case of a write timeout.


Answer:

AFAIK, you cannot catch these exceptions around CassandraSink.

One way to catch exceptions like TimeoutException is to implement your own sink for Cassandra, but that may take a lot of time...

Another way: since you run a streaming job, you could set the task retries to more than 1 through StreamExecutionEnvironment.setRestartStrategy, and enable checkpointing so that the streaming job can continue working from the last checkpoint. CassandraSink supports WAL, so EXACTLY_ONCE can be achieved with checkpointing enabled.
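
A minimal sketch of that configuration (RestartStrategies and Time are the org.apache.flink.api.common classes; the checkpoint interval and retry values are arbitrary):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                               // restart attempts before the job finally fails
        Time.of(10, TimeUnit.SECONDS))); // delay between attempts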

Question:

I have a 4GB RAM Ubuntu server on DigitalOcean, and I am using Cassandra 3.9.

After going through the setup process detailed here

cqlsh and nodetool status both return this message:

nodetool: Failed to connect to '127.0.0.1:7199' - ConnectException: 'Connection refused (Connection refused)'.

I have read several similar issues, and they all suggested a 4GB minimum RAM size. I have that, but I still get the same error (nodetool status connection refused).

Some suggest setting listen_address and rpc_address in cassandra.yaml to the DigitalOcean-assigned IP; I tried that too, but the problem persists.

Some suggest looking at the debug and system logs. There are a lot of [INFO] and [DEBUG] lines, plus some [WARN] lines that don't terminate execution, and it finally terminates at an [ERROR] line.

Warnings

...
WARN  [main] 2018-03-13 12:06:52,359 DatabaseDescriptor.java:563 - Small commitlog volume detected at /var/lib/cassandra/commitlog; setting commitlog_total_space_in_mb to 6158.  You can override this in cassandra.yaml
WARN  [main] 2018-03-13 12:06:52,361 DatabaseDescriptor.java:590 - Small cdc volume detected at /var/lib/cassandra/cdc_raw; setting cdc_total_space_in_mb to 3079. You can override this in cassandra.yaml
WARN  [main] 2018-03-13 12:06:52,365 DatabaseDescriptor.java:643 - Only 22.102GiB free across all data volumes. Consider adding more capacity to your cluster or removing obsolete snapshots
...

WARN  [main] 2018-03-13 12:06:52,530 StartupChecks.java:123 - jemalloc shared library could not be preloaded to speed up memory allocations
WARN  [main] 2018-03-13 12:06:52,530 StartupChecks.java:156 - JMX is not enabled to receive remote connections. Please see cassandra-env.sh for more info.
INFO  [main] 2018-03-13 12:06:52,533 SigarLibrary.java:44 - Initializing SIGAR library
WARN  [main] 2018-03-13 12:06:52,554 SigarLibrary.java:174 - Cassandra server running in degraded mode. Is swap disabled? : true,  Address space adequate? : true,  nofile limit adequate? : true, nproc limit adequate? : false

Error

...
ERROR [main] 2018-03-13 12:06:55,808 CassandraDaemon.java:747 - Exception encountered during startup
java.lang.AbstractMethodError: org.apache.cassandra.utils.JMXServerUtils$Exporter.exportObject(Ljava/rmi/Remote;ILjava/rmi/server/RMIClientSocketFactory;Ljava/rmi/ser$
        at javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:150) ~[na:1.8.0_161]
        at javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:135) ~[na:1.8.0_161]
        at javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:405) ~[na:1.8.0_161]
        at org.apache.cassandra.utils.JMXServerUtils.createJMXServer(JMXServerUtils.java:106) ~[apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.service.CassandraDaemon.maybeInitJmx(CassandraDaemon.java:145) [apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:219) [apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:601) [apache-cassandra-3.9.jar:3.9]
        at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:730) [apache-cassandra-3.9.jar:3.9]

Not sure what to do with that error message.

I suspect many of you have had this issue, and some of you have solved it. Please clearly detail for the rest of us (and future folks) how you sorted it.

I am considering trying an earlier version of Cassandra; maybe this problem is specific to version 3.9 and not earlier ones.


Answer:

This is a known issue (CASSANDRA-14173). Either downgrade Java to Java 8 build 152, or upgrade Cassandra.

Question:

I'm trying out Embedded Cassandra using cassandra-unit and ran into the following exception:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1 (com.datastax.driver.core.ConnectionException: [/127.0.0.1] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1] Unexpected exception triggered (java.io.IOException: Connection reset by peer))))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:195)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1029)
at com.datastax.driver.core.Cluster.init(Cluster.java:120)
at com.datastax.driver.core.SessionManager.init(SessionManager.java:61)
at com.datastax.driver.core.SessionManager.execute(SessionManager.java:416)
at com.datastax.driver.core.SessionManager.executeQuery(SessionManager.java:453)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:103)
at com.datastax.driver.core.SessionManager.execute(SessionManager.java:91)
at com.datastax.driver.core.SessionManager.execute(SessionManager.java:83)
at com.me.cc.bma.cassandra.test.main(test.java:30)    

for the following code,

import com.datastax.driver.core.Cluster;
import org.cassandraunit.CQLDataLoader;
import org.cassandraunit.dataset.CQLDataSet;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.apache.cassandra.config.DatabaseDescriptor;
import com.datastax.driver.core.ResultSet;
import static com.datastax.driver.core.Cluster.builder;
import com.datastax.driver.core.Session;


public class Test{

    static Session session;

    public static void main(String args[]) throws Exception {

          EmbeddedCassandraServerHelper.startEmbeddedCassandra(500000L);
          Cluster.Builder clusterBuilder = builder().addContactPoint("127.0.0.1").withClusterName(EmbeddedCassandraServerHelper.getClusterName()).withPort(7010);
          Cluster cluster = clusterBuilder.build();
          session = cluster.newSession();
          try {
               session.execute("CREATE KEYSPACE ci WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
               session = cluster.connect("ci");
               session.execute("CREATE TABLE login (l_id int)");
               session.execute("insert into login(l_id) values(1)");
               ResultSet res = session.execute("select * from login");
               System.out.println(res.all());
          }
          catch (Exception ex) {
               ex.printStackTrace();
          }         
     }

}

Versions:

  • cassandra-driver-core: 3.5.0
  • Cassandra-unit: 3.5.0.1

Any fix would be appreciated!


Answer:

I faced the same issue some time back and found the artifact versions were incorrect and incompatible, due to a conflict in Maven.

I am using the versions below, and they work fine for me:

  • cassandra-all: 2.1.14
  • cassandra-unit: 2.1.9.2
  • cassandra-driver-core: 3.3.2

Also, there is a cool way to start embedded Cassandra using the @EmbeddedCassandra annotation, which does the base work for you.

Hope this works for you too.

Question:

I am trying to connect Spark and Cassandra using the spark-cassandra-connector. The connection gets established, but when I try to perform operations on the JavaRDD I get the following error:

   java.io.IOException: Failed to open native connection to Cassandra at {10.0.21.92}:9042

Here is the configuration and code that I am trying to implement:

    SparkConf sparkConf = new SparkConf().setAppName("Data Transformation").set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]");
    sparkConf.set("spark.cassandra.connection.host", server ip);
    sparkConf.set("spark.cassandra.connection.port", "9042");
    sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
    sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
    sparkConf.set("spark.cassandra.auth.username", user_name);
    sparkConf.set("spark.cassandra.auth.password", password);

    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

And below is the code where I am performing operations on the JavaRDD:

    CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext)
            .cassandraTable(keySpaceName, tableName);

    JavaRDD<GenericTriggerEntity> rdd = cassandraRDD.map(new Function<CassandraRow, GenericTriggerEntity>() {

        private static final long serialVersionUID = -165799649937652815L;

        @Override
        public GenericTriggerEntity call(CassandraRow row) throws Exception {
            GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
            if (row.getString("end") != null)
                genericTriggerEntity.setEnd(row.getString("end"));
            if (row.getString("key") != null)
                genericTriggerEntity.setKey(row.getString("key"));
            genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
            genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
            genericTriggerEntity.setRowdeleted(row.getString("rowDeleted"));
            genericTriggerEntity.setRows(row.getString("rows"));
            genericTriggerEntity.setStart(row.getString("start"));
            genericTriggerEntity.setTablename("tablename");
            genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
            genericTriggerEntity.setTriggertime(row.getString("triggertime"));
            genericTriggerEntity.setUuid(row.getUUID("uuid"));
            return genericTriggerEntity;
        }
    });

Here is the JavaRDD operation I am performing:

    JavaRDD<String> jsonDataRDDwords = rdd.flatMap(s -> Arrays.asList(SPACE.split((CharSequence) s)));
    JavaPairRDD<String, Integer> jsonDataRDDones = jsonDataRDDwords.mapToPair(s -> new Tuple2<>(s, 1));
    JavaPairRDD<String, Integer> jsonDataRDDcounts = jsonDataRDDones.reduceByKey((i1, i2) -> i1 + i2);
    List<Tuple2<String, Integer>> jsonDatRDDoutput = jsonDataRDDcounts.collect();

I even tried telnet to the Cassandra server; the port is open.

I am able to establish the connection, but then I get the above exception while performing reduceByKey.

I am not able to figure out what the issue is. Is something wrong in the JavaRDD operations? Any help would be appreciated. Thanks in advance.


Answer:

The above error was due to a dependency issue with cassandra-driver-core. I solved it by adding the metrics dependency to my pom.xml:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>3.2.2</version>
</dependency>

Question:

I'm working on a Java project in which Cassandra is included in the repository itself. I'm having trouble getting it to run, however, receiving the following error:

/Users/xxx/dev/xxxx/build/cassandra/bin/cassandra-cli -h localhost -p 9052 -f /Users/xxx/dev/xxxx/schema.txt
return code: 0
stderr: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
    at org.apache.thrift.transport.TSocket.open(TSocket.java:183)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.apache.cassandra.cli.CliMain.connect(CliMain.java:73)
    at org.apache.cassandra.cli.CliMain.main(CliMain.java:249)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:178)
    ... 3 more
Exception connecting to localhost/9052. Reason: Connection refused.

stdout: Not connected to a cassandra instance.
Not connected to a cassandra instance.

I've tried altering the port, and the localhost hostname to 127.0.0.1 and 0.0.0.0, but this makes no real difference.

I'm using java version "1.7.0_71"

Any ideas would be appreciated, thanks


Answer:

The issue was Cassandra trying to start on a hostname that was not in my /etc/hosts file. I found out which hostname by running /Users/xxx/dev/xxxx/build/cassandra/bin/cassandra -f, which gave better output about what the issue was.

Question:

I'm using the Cassandra driver for Java from DataStax. I know that I have 20 million rows in one table. When I use

Select * from table

The process stops after around 800,000 rows have been fetched.

In my Java code

futureResults = session.executeAsync(statement);
ResultSet results = futureResults.getUninterruptibly();
for (Row row : results) {

}

Maybe I did something wrong?


Answer:

What you are doing there is a fairly common anti-pattern with Cassandra. Since each partition of data lives in different parts of your cluster, that query will create a massive scatter/gather, centered around one coordinator. Eventually things start timing out and the coordinator will throw an error. A quick look in the logs should find it.

Almost always, a select query should include a partition key for locality. If that's not possible, switching to something batch-oriented that will efficiently scan each node is best. The Spark connector for Cassandra is perfect for an access pattern like this.
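
If you genuinely need to iterate a large table from the driver, setting an explicit fetch size at least keeps client memory bounded while the driver pages through results (a sketch; it does not remove the coordinator-side load described above):

Statement statement = new SimpleStatement("SELECT * FROM table");
statement.setFetchSize(1000); // rows per page
ResultSet results = session.execute(statement);
for (Row row : results) {
    // the driver transparently fetches the next page as iteration advances
}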

Question:

Fairly new to Spring Boot here. I've been working on a microservice that connects to two databases, one Cassandra and one Oracle. They contain the same information, because the Cassandra database is replacing the Oracle database as the primary database for this project; however, we are keeping the Oracle database as a fallback in case the Cassandra database goes down.

I'm unsure how to tell my application to "do something" when I encounter a loss of connection. My console outputs the following when it loses connection to Cassandra:

2019-09-06 14:23:15.127 ERROR 2836 --- [-reconnection-0] c.d.driver.core.ControlConnection        : [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
2019-09-06 14:23:15.147 ERROR 2836 --- [-reconnection-0] c.d.driver.core.ControlConnection        : [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
2019-09-06 14:23:16.134 ERROR 2836 --- [-reconnection-0] c.d.driver.core.ControlConnection        : [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
2019-09-06 14:23:16.149 ERROR 2836 --- [-reconnection-0] c.d.driver.core.ControlConnection        : [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
2019-09-06 14:23:23.137 ERROR 2836 --- [-reconnection-1] c.d.driver.core.ControlConnection        : [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds

And it keeps going like that, doubling the timeout, until it can reconnect.

Is it possible to fire a method when this happens? If I could just change a variable's value in my controller when the connection is lost, everything would work fine, but I can't seem to find any information on the topic. Is there a different way to handle this that I'm not even thinking about, like maybe a boolean that changes based on the connection? Sorry if that sounds misguided; I'm just really grasping at straws here.

Additionally, is it possible to fire an event when the connection is reestablished?

Thanks for your help. I really appreciate any advice.

As a PS- I'm not sure how helpful this code will be, but I feel like I should at least give some context to my situation.

Here's my Cassandra config:

@ConfigurationProperties(prefix = "cassandra")
@Getter
@Setter
@Component
public class CassandraConfiguration {

    private String clusterName;
    private String hostNames;
    private String username;
    private String password;
    private String keyspace;
    private boolean sslEnabled;
    private int port;
    private int fetchSize;
    private String localDC;

    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraConfiguration.class);

    @Bean
    public Session cassandraSession() {
        String[] hostname = hostNames.split(",");
        Cluster cluster = null;

        LoadBalancingPolicy loadBalancingPolicy = new TokenAwarePolicy(
                DCAwareRoundRobinPolicy.builder()
                        .withLocalDc(localDC)
                        .build()
        );

        PoolingOptions poolingOptions = new PoolingOptions();
        poolingOptions
                .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
                .setConnectionsPerHost(HostDistance.REMOTE, 2, 4)
                .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
                .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);

        if (isSslEnabled()) {

            cluster = Cluster.builder().addContactPoints(hostname)
                    .withClusterName(clusterName)
                    .withCredentials(username, password)
                    .withPort(port)
                    .withoutJMXReporting()
                    .withSSL()
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .withLoadBalancingPolicy(loadBalancingPolicy)
                    .withPoolingOptions(poolingOptions)
                    .build();

        } else {
            cluster = Cluster.builder().addContactPoints(hostname)
                    .withClusterName(clusterName)
                    .withCredentials(username, password)
                    .withPort(port)
                    .withoutJMXReporting()
                    .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM))
                    .withLoadBalancingPolicy(loadBalancingPolicy)
                    .withPoolingOptions(poolingOptions)
                    .build();
        }

        cluster.getConfiguration().getCodecRegistry()
                .register(LocalDateCodec.instance)
                .register(InstantCodec.instance);
        Session session = cluster.connect(keyspace);

        return session;
    }

    @Bean
    public MappingManager mappingManager() {
        return new MappingManager(cassandraSession());
    }

}

Here's my JDBC config for Oracle:

@ConfigurationProperties("jdbc")
@Configuration
@Getter
@Setter
@ConditionalOnProperty(name = "jdbc.accessService", havingValue = "enabled")
public class JDBCConfig {
    @Autowired
    SshTunnel sshTunnel;

    private String driver;
    private String url;
    private String user;
    private String password;

    @PostConstruct
    private void init() {
        try {
            Class.forName(driver);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public Connection jdbcConnection() {
        Connection jdbcConnection = null;

        try {
            jdbcConnection = DriverManager.getConnection (url, user, password);
            System.out.println("Connection Established");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return jdbcConnection;
    }
}

Answer:

It sounds like you need a circuit breaker, which is a software component that watches for failures of some wrapped service, tries to detect when it's available again, and can redirect to some other implementation or let exceptions escape in a failure condition.

The current apparent default (and my recommendation) is resilience4j.
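
A rough sketch of what that could look like with resilience4j (readFromCassandra and readFromOracle are hypothetical stand-ins for your two repositories; the thresholds are illustrative):

CircuitBreakerConfig config = CircuitBreakerConfig.custom()
        .failureRateThreshold(50)                        // open the breaker at 50% failures
        .waitDurationInOpenState(Duration.ofSeconds(30)) // probe Cassandra again after 30s
        .build();
CircuitBreaker breaker = CircuitBreaker.of("cassandra", config);

Supplier<String> guarded = CircuitBreaker.decorateSupplier(breaker, () -> readFromCassandra());
String result;
try {
    result = guarded.get();
} catch (Exception e) {
    // breaker open, or the Cassandra call failed: fall back to Oracle
    result = readFromOracle();
}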

Question:

TL;DR - So I'm having a connection issue from the DataStax Java Cassandra driver to a DataStax Cassandra cluster. It initially connects and performs well, then suddenly at some point it loses connection and does not reconnect; at this point all the queries fail.

More info -

So I'm running a DataStax Cassandra 2.1 cluster of 3 nodes on CentOS, and I'm using DataStax Cassandra driver 3.0.0. Everything worked great in the past few months; recently I've deployed some code changes that included schema changes (namely, adding columns to an existing table) and an increase in the number of queries made. The disconnections started at this point.

So when my app goes up, it connects to the cluster and holds a single Cluster (and Session) object, as shown in the code snippet below; at this point everything goes well. After a few hours I start receiving NoHostAvailableException for every query performed. At this point I have other servers performing well with the same Cassandra cluster, so I know there's nothing wrong with the cluster itself. When I restart my server, everything works well again.

After investigating a little more, when the issue starts occurring, I see that there's no active connection to either node. I've set up the driver to log at DEBUG level into a dedicated log file and waited for the issue to reoccur. A few hours later the issue occurred again; at some point the log file shows this message:

Connection[/10.4.116.91:9042-1, inFlight=2, closed=false] connection error
io.netty.handler.codec.DecoderException: com.datastax.driver.core.exceptions.DriverInternalError: Adjusted frame length exceeds 268435456: 326843398 - discarded
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:245)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.DriverInternalError: Adjusted frame length exceeds 268435456: 326843398 - discarded
        at com.datastax.driver.core.Frame$Decoder$DecoderForStreamIdSize.decode(Frame.java:239)
        at com.datastax.driver.core.Frame$Decoder.decode(Frame.java:205)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:387)
        ... 11 common frames omitted

And right after that you see this:

Connection[/10.4.116.91:9042-1, inFlight=2, closed=false] connection error
io.netty.handler.codec.DecoderException: com.datastax.driver.core.exceptions.DriverInternalError: Adjusted frame length exceeds 268435456: 326843398 - discarded
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:245)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.DriverInternalError: Adjusted frame length exceeds 268435456: 326843398 - discarded
        at com.datastax.driver.core.Frame$Decoder$DecoderForStreamIdSize.decode(Frame.java:239)
        at com.datastax.driver.core.Frame$Decoder.decode(Frame.java:205)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:387)
        ... 11 common frames omitted

From this point on you see only timeouts and retries but the connection doesn't get reestablished.

// CREATION OF CASSANDRA SESSION
// (builder is a Cluster.Builder configured elsewhere with contact points etc.)
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setPoolTimeoutMillis(0)
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
cluster = builder.withPoolingOptions(poolingOptions).build();
cluster.getConfiguration().getCodecRegistry().register(new EnumNameCodec<>(OnBoardingSlide.Type.class));
session = cluster.connect(Global.getServerConfig().CASSANDRA_KEYSPACE_NAME);

Answer:

This might be a bug in the Java driver.

If a Cassandra node is configured with native_transport_max_frame_size_in_mb > 256 and the driver reads a frame larger than 256 MB, it throws an exception. This breaks the driver's ability to read subsequent packets, since the Decoder for parsing frames is static.

This has been fixed in 3.0.4. Here is the link for the details:

https://datastax-oss.atlassian.net/browse/JAVA-1292

Can you try upgrading your driver?

Question:

Working with Spark 1.6.0 and Cassandra 3.1.1, I tried to connect to the Cassandra database using Java Spark. There is no error while building, but I get the following error when I run the application:

Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.Logging$class.log(Logging.scala:51)
at com.datastax.spark.connector.cql.CassandraConnector$.log(CassandraConnector.scala:144)
at org.apache.spark.Logging$class.logDebug(Logging.scala:62)
at com.datastax.spark.connector.cql.CassandraConnector$.logDebug(CassandraConnector.scala:144)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:154)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:72)
at com.test.cassandra.spark.Main.generateData(Main.java:30)
at com.test.cassandra.spark.Main.run(Main.java:21)
at com.test.cassandra.spark.Main.main(Main.java:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

my code

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;

public class Main implements Serializable {
    private transient SparkConf sconf;
    private static final String keySpaceName = "java_api";
    private static final String primaryTableName = "test_cassandra";

    private Main(SparkConf conf) {
        this.sconf = conf;
    }
    private void run() {
      JavaSparkContext sc = new JavaSparkContext(sconf);
    generateData(sc); 
    sc.stop();
    }
    private void generateData(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        try (Session session = connector.openSession()) {
            System.out.println("connected to cassandra");
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");            
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        System.out.println("connected");
        }
    }
public static void main(String[] args) {
        if (args.length != 2) {
            System.err
                    .println("Syntax: com.datastax.spark.demo.Main <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf()
                .set("spark.cassandra.connection.host", "localhost")
                .set("spark.cassandra.connection.native.port", "9042");
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        //conf.set("spark.cassandra.connection.host", "127.0.0.1");
        Main app = new Main(conf);
        app.run();
}
}

my pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>cassandra-spark</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!--Spark Cassandra Connector -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.5.0-M3</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.5.0-M3</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.0.0-rc1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>
</project>

Answer:

This may come from the fact that

some class has incompatibly changed since the currently executing method was last compiled.

This may come from the Java version, for example.

See the response to this question: Spark streaming StreamingContext.start() - Error starting receiver 0

Question:

I'm working with a huge query set for C*. I already throttle the async query executor with

Semaphore maxInFlight = new Semaphore(MAX_REQUEST_PER_CON_REMOTE);
...
maxInFlight.acquireUninterruptibly();
ResultSetFuture future = executeAsync(...);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet rs) {
        ...
        maxInFlight.release();
    }

    @Override
    public void onFailure(Throwable t) {
        ...
        maxInFlight.release();
    }
}, executor);

where MAX_REQUEST_PER_CON_REMOTE is the MaxRequestsPerConnection parameter. MaxConnectionsPerHost equals 1 by default and I didn't change it.

As the number of nodes or the replication factor grows, I believe I could increase MaxConnectionsPerHost or MaxRequestsPerConnection to improve throughput. Are there any advantages to increasing one over the other?

I would also like to make the number of Semaphore permits adaptive, but I don't know how. The idea is to add a listener and update maxInFlight when a host is added or removed.


Answer:

If you're not using a whitelist or blacklist policy, the driver will connect to every host separately (depending, of course, on configuration, e.g. if you're using the DC-aware policy). So when you add a new node, the driver will open a new connection to that node, and load will be redistributed (as long as your queries don't have "hot" partitions). Increasing the replication factor also doesn't matter much, because requests to replicas are sent not by the driver but by the "coordinator" node...

So when you have N nodes in the local DC, you can theoretically have up to N * MaxRequestsPerConnection requests in flight (though this depends on how the data in your requests is distributed).

Also, I see that you're using MAX_REQUEST_PER_CON_REMOTE - it's better to send requests only to nodes in the local DC. You can configure up to 32k requests per connection, and in most cases this is more than enough for a single client.

You can find more information in Driver's documentation.

P.S. I have a separate class that uses the same approach with a Semaphore but adds the listener automatically - feel free to grab it.
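
For reference, here is a minimal sketch of that listener idea, assuming the 3.x driver (the class name, PERMITS_PER_HOST, and the per-host budget are my own choices, not the driver's):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;

public class AdaptiveThrottle implements Host.StateListener {

    // Hypothetical per-host budget; align it with MaxRequestsPerConnection.
    private static final int PERMITS_PER_HOST = 1024;

    // Semaphore.reducePermits() is protected, so expose it via a subclass.
    static final class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) { super(permits); }
        void reduce(int permits) { super.reducePermits(permits); }
    }

    private final ResizableSemaphore maxInFlight = new ResizableSemaphore(0);
    private final Set<Host> liveHosts = ConcurrentHashMap.newKeySet();

    public Semaphore semaphore() { return maxInFlight; }

    private void hostUsable(Host host) {
        // grow the permit pool once per newly usable host
        if (liveHosts.add(host)) maxInFlight.release(PERMITS_PER_HOST);
    }

    private void hostGone(Host host) {
        // shrink the pool once per host that became unusable
        if (liveHosts.remove(host)) maxInFlight.reduce(PERMITS_PER_HOST);
    }

    @Override public void onAdd(Host host)    { hostUsable(host); }
    @Override public void onUp(Host host)     { hostUsable(host); }
    @Override public void onDown(Host host)   { hostGone(host); }
    @Override public void onRemove(Host host) { hostGone(host); }
    @Override public void onRegister(Cluster cluster)   { }
    @Override public void onUnregister(Cluster cluster) { }
}

Register it with cluster.register(throttle) and acquire from throttle.semaphore() before each executeAsync, exactly as in the snippet above.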

Question:

I am trying to write a simple Java program which generates some data (just a POJO) that gets published to a Kafka topic. From this topic, a subscriber fetches the data and should write it into a Cassandra DB.

The producing and fetching work fine, but when it comes to writing the data into the Cassandra DB, something bothers me.

Whenever I write data, I have to open a new connection to the DB. This looks very unpleasant.

 @Override
  public void run() {

    setRunning(true);


    try {
      konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));

      while (running) {

        ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
        sensorDaten.forEach(
                datum -> {
                  CassandraConnector cassandraConnector = new CassandraConnector();

                  cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
                  System.out.printf(
                          "Consumer Record:(%d, %s, %d, %d)\n",
                          datum.key(), datum.value(), datum.partition(), datum.offset());
                });
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      konsument.close();
    }
  }

The code snippet above works, but as I mentioned, I have to create a new connection for every write.

When I initialize the cassandraConnector outside the loop, I get one successful write and then "No hosts available" exceptions.

The CassandraConnector class:

 public class CassandraConnector {

  private final String KEYSPACE = "ba2";
  private final String SERVER_IP = "127.0.0.1";
  private Cluster cluster;
  private Session session;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {

    try {

      session.execute(
          "INSERT INTO.....

Answer:

No - you need to re-use the cluster/session instances; they are quite heavyweight to initialize...

It's also better to use prepared statements for data insertion - after you create a session, do something like:

PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");

and then in the loop

session.execute(pStmt.bind(datum.key(), datum.value()));

Regarding the error, please check the logs on the Cassandra side.
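
To make this concrete, here is a minimal sketch of the connector reusing one cluster/session and a prepared statement (the table, columns, and the getWert() accessor are placeholders, since the original INSERT is truncated):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class CassandraConnector implements AutoCloseable {

  private static final String KEYSPACE = "ba2";
  private static final String SERVER_IP = "127.0.0.1";

  private final Cluster cluster;
  private final Session session;
  private final PreparedStatement insert;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
    // Prepare once, bind per record.
    insert = session.prepare("INSERT INTO sensor_daten (id, wert) VALUES (?, ?)");
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {
    // Map the POJO's fields to the bound columns as your schema requires.
    session.execute(insert.bind(key, datum.getWert()));
  }

  @Override
  public void close() {
    cluster.close(); // closing the cluster also closes its sessions
  }
}

Create one instance before the poll loop, reuse it for every record, and close it when the consumer shuts down.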

Question:

I'm researching the best way to make a connection from Java to Cassandra and have found a number of examples of how to do that. I'm building a kind of chat application on my localhost (it will allow creating new messages, updating them, or deleting them), but I would also like to learn best practice. Can anybody help me choose the best solution?

The first example is Spark 1.6:

public static JavaSparkContext getCassandraConnector(){
         SparkConf conf = new SparkConf();
         conf.setAppName("Chat");
         conf.set("spark.driver.allowMultipleContexts", "true");
         conf.set("spark.cassandra.connection.host", "127.0.0.1");
         conf.set("spark.rpc.netty.dispatcher.numThreads","2");
         conf.setMaster("local[2]");

         JavaSparkContext sc = new JavaSparkContext(conf);
         return sc;
    }

I also have an example for Spark 2.x, where the builder automatically reuses an existing SparkContext if one exists and creates one if it does not. Configuration options set in the builder are automatically propagated over to Spark and Hadoop during I/O.

public static SparkSession getSparkSession(){
    SparkSession sparkSession = SparkSession
        .builder()
        .appName("Chat")
        .config("spark.driver.allowMultipleContexts","true")
        .config("spark.sql.warehouse.dir", "/file:C:/temp")
        .config("spark.cassandra.connection.host", "127.0.0.1")
        .config("spark.cassandra.connection.port", "9042")
        .master("local[2]")
        .getOrCreate();
    return sparkSession;
}

I also researched pooling options, but only found examples for Session, like:

public static Session getPoolSession(){
    PoolingOptions poolingOptions = new PoolingOptions();
    poolingOptions
    .setCoreConnectionsPerHost(HostDistance.LOCAL,  4)
    .setMaxConnectionsPerHost( HostDistance.LOCAL, 10)
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000)
    .setHeartbeatIntervalSeconds(120);

    Cluster cluster = Cluster.builder()
        .addContactPoints("127.0.0.1")
        .withPoolingOptions(poolingOptions)
        .build();

    Session session = cluster.connect("chat");
    return session;
    }

So I wonder: what is the most efficient way to make a connection (I'm going to execute single statements as well as PreparedStatements)? I think the Spark 1.6 way is not as good as 2.x, but what about the pooling options for 2.x (I'm not 100% sure whether these are already included in SparkSession)? I found related questions, but there is not enough information in them for me: https://stackoverflow.com/questions/42148056/cassandra-datastax-optimal-poolingoption


Answer:

Are you trying to connect to Cassandra using Spark, or just make a plain Java-to-Cassandra connection?

Spark is used most of the time for analytical workflows, and single insert/update statements are not a use case for it.

I suggest using regular datastax java driver:

http://docs.datastax.com/en/developer/java-driver-dse/1.1/

As answered in the pooling options question, you don't need to touch these parameters; the default values should do the job. Here is an explanation of how it works:

http://docs.datastax.com/en/drivers/java/2.2/com/datastax/driver/core/PoolingOptions.html
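
To make that concrete, here is a minimal sketch with the plain driver and default pooling (the chat table and column names are invented for illustration):

import java.util.UUID;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public final class ChatDao {

    private final Cluster cluster;
    private final Session session;
    private final PreparedStatement insertMessage;

    public ChatDao() {
        // Defaults deliberately left alone: no PoolingOptions tuning is
        // needed for single statements against a localhost node.
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect("chat");
        insertMessage = session.prepare(
                "INSERT INTO messages (id, author, body) VALUES (?, ?, ?)");
    }

    public void saveMessage(UUID id, String author, String body) {
        session.execute(insertMessage.bind(id, author, body));
    }

    public void close() {
        cluster.close();
    }
}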

Question:

I recently started learning Cassandra and am going through online tutorials for Cassandra with the DataStax Java driver. I am simply trying to connect to a localhost node on my laptop.

Setup Details -

OS - Windows 7

Cassandra Version - Cassandra version: 2.1-SNAPSHOT

DataStax java driver version - 3.1.0

I was able to connect to the local node using the CQLSH and cassandra-cli clients. I can also see the default keyspaces system and system_traces. Below is the Cassandra server log.

INFO  12:12:51 Node localhost/127.0.0.1 state jump to normal
INFO  12:12:51 Startup completed! Now serving reads.
INFO  12:12:51 Starting listening for CQL clients on /0.0.0.0:9042...
INFO  12:12:51 Binding thrift service to /0.0.0.0:9160
INFO  12:12:51 Using TFramedTransport with a max frame size of 15728640 bytes.
INFO  12:12:51 Using synchronous/threadpool thrift server on 0.0.0.0 : 9160
INFO  12:12:51 Listening for thrift clients...

I am trying below simple code -

        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Metadata metadata = cluster.getMetadata();

This code throws the exception below - part of the trace:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured columnfamily schema_usertypes))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)

I have been through all the previously asked questions. Most of the answers suggest changing the configuration in the cassandra.yaml file.

My cassandra.yaml configuration is -

start_native_transport: true
native_transport_port: 9042
listen_address: localhost
rpc_address: 0.0.0.0
rpc_port: 9160

Most of the answers suggest using the actual IP address of the machine as rpc_address, which I tried, but it did not work.

Here are the questions I have been through - Question One, Question two, Question three, Topic, Connection requirement, Question four.

This page lists the compatibility of DataStax Java drivers with Cassandra versions, so I changed the driver version to 2.1.1 (as I am using Cassandra 2.1), but it did not work.

Please suggest what could be wrong.


Answer:

The error with schema_usertypes looks like the driver is querying a table that isn't there, possibly related to this Jira.

You say you are running a 2.1-SNAPSHOT of Cassandra? Try Cassandra 2.1.15. Something seems off on your Cassandra node; the driver is able to talk to your cluster, since it tries to look up data inside schema_usertypes.

Question:

I have a problem connecting to my Cassandra db using jdbc.

I get the error message SQLNonTransientConnectionException: Keyspace names must be composed of alphanumerics and underscores

This is my jdbc url: jdbc:cassandra://test/test@localhost:7199/tutorialspoint

Stacktrace:

java.sql.SQLNonTransientConnectionException: Keyspace names must be composed of alphanumerics and underscores (parsed: 'test@localhost:7199/tutorialspoint')
    at org.apache.cassandra.cql.jdbc.Utils.parseURL(Utils.java:183)
    at org.apache.cassandra.cql.jdbc.CassandraDriver.connect(CassandraDriver.java:85)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:270)

Thanks for any help!

Edit: This is my method to connect to the db.

@Override
public boolean connect() {
    StringBuilder sb_url = new StringBuilder("jdbc:cassandra://")
            .append(this.username)
            .append("/")
            .append(this.password)
            .append("@")
            .append(this.url)
            .append(":")
            .append(this.port)
            .append("/")
            .append(this.database);



    try {
        log.debug("Trying to connect to: {}", sb_url.toString());
        this.conn = DriverManager.getConnection(sb_url.toString());
    } catch (SQLException ex) {
        log.debug("Connection to database could not been established");
        log.debug(ex.toString());
        ex.printStackTrace();
        return false;
    }
    log.debug("Connection to database has been established!");
    return true;
}

Answer:

I solved my problem by changing the connection URL. I replaced

 StringBuilder sb_url = new StringBuilder("jdbc:cassandra://")
    .append(this.username)
    .append("/")
    .append(this.password)
    .append("@")
    .append(this.url)
    .append(":")
    .append(this.port)
    .append("/")
    .append(this.database);

with:

StringBuilder sb_url = new StringBuilder("jdbc:cassandra://")
        .append(this.url)
        .append(":")
        .append(this.port)
        .append("/")
        .append(this.database);

Properties props = new Properties();
props.setProperty("user", this.username);
props.setProperty("password", this.password);

// credentials are passed separately instead of being embedded in the URL
this.conn = DriverManager.getConnection(sb_url.toString(), props);

Apparently parsing of the URL

jdbc:cassandra://test/test@localhost:7199/tutorialspoint

caused an error because of the '@' in the URL.

I found the solution by inspecting the Cassandra JDBC source code on Github https://github.com/slowenthal/cassandra-jdbc/tree/master/src/main/java/org/apache/cassandra/cql/jdbc

I hope this helps other people as well. Thanks for your help.

Question:

I am trying to use Cassandra as the database for an app I am working on. The app is a NetBeans platform app. In order to start the Cassandra server on my localhost I issue Runtime.getRuntime().exec(command), where command is the string that starts the Cassandra server, and then I connect to the Cassandra server with the DataStax driver. However I get the error:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query   failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.TransportException: [/127.0.0.1:9042]  Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:199)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1154)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:318)
at org.dhviz.boot.DatabaseClient.connect(DatabaseClient.java:43)
at org.dhviz.boot.Installer.restored(Installer.java:67)
....

I figured out that the server requires some time to start, so I added the line Thread.sleep(MAX_DELAY_SERVER), which seems to resolve the problem.

Is there a more elegant way to sort out this issue? Thanks.

Code is below.

public class Installer extends ModuleInstall {

private final int MAX_DELAY_SERVER = 12000;

//private static final String pathSrc = "/org/dhviz/resources";
@Override
public void restored() {

    /*
     -*-*-*-*-*DESCRIPTION*-*-*-*-*-*
     IMPLEMENT THE CASSANDRA DATABASE
     *********************************
     */
    DatabaseClient d = new DatabaseClient();
    // launch an instance of the cassandra server 
    d.loadDatabaseServer();


    /*wait for MAX_DELAY_SERVER milliseconds before launching the other instructions. 
    */
    try {
        Thread.sleep(MAX_DELAY_SERVER);
        Logger.getLogger(Installer.class.getName()).log(Level.INFO, "wait for MAX_DELAY_SERVER milliseconds before the connect database");
    } catch (InterruptedException ex) {
        Exceptions.printStackTrace(ex);
        Logger.getLogger(Installer.class.getName()).log(Level.INFO, "exeption in thread sleep");
    }

    d.connect("127.0.0.1");

}
}



public class DatabaseClient {

private Cluster cluster;
private Session session;
private ShellCommand shellCommand;
private final String defaultKeyspace = "dhviz";

final private String LOAD_CASSANDRA = "launchctl load    /usr/local/Cellar/cassandra/2.1.2/homebrew.mxcl.cassandra.plist";

final private String UNLOAD_CASSANDRA = "launchctl unload /usr/local/Cellar/cassandra/2.1.2/homebrew.mxcl.cassandra.plist";

public DatabaseClient() {
    shellCommand = new ShellCommand();

}

public void connect(String node) {
//this connect to the cassandra database

    cluster = Cluster.builder()
            .addContactPoint(node).build();
//  cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(12000);
    Metadata metadata = cluster.getMetadata();
    System.out.printf("Connected to cluster: %s\n",
            metadata.getClusterName());
    for (Host host
            : metadata.getAllHosts()) {
        System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n",
                host.getDatacenter(), host.getAddress(), host.getRack());

    }

        session = cluster.connect();


    Logger.getLogger(DatabaseClient.class.getName()).log(Level.INFO, "connected to server");
}

public void loadDatabaseServer() {
    if (shellCommand == null) {

        shellCommand = new ShellCommand();

    }
    shellCommand.executeCommand(LOAD_CASSANDRA);
    Logger.getLogger(DatabaseClient.class.getName()).log(Level.INFO, "database cassandra loaded");
}

public void unloadDatabaseServer() {
    if (shellCommand == null) {

        shellCommand = new ShellCommand();

    }

    shellCommand.executeCommand(UNLOAD_CASSANDRA);

    Logger.getLogger(DatabaseClient.class.getName()).log(Level.INFO, "database cassandra unloaded");
}

}

Answer:

If you are calling cassandra without any parameters in Runtime.getRuntime().exec(command), it's likely that this spawns Cassandra as a background process and returns before the Cassandra node has fully started and is listening.

I'm not sure why you are attempting to embed Cassandra in your app, but you may find cassandra-unit useful for that. It's primarily used for running tests that require a Cassandra instance, but it may also meet your use case.

The wiki provides a helpful example on how to start an embedded cassandra instance using cassandra-unit:

EmbeddedCassandraServerHelper.startEmbeddedCassandra();

In my experience cassandra-unit will wait until the server is up and listening before returning. You could also write a method that waits until the socket is in use, using logic opposite to this answer; a sketch of that approach follows.
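
A minimal sketch of that socket-polling idea (the method name and timings are my own):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Poll the CQL port until something accepts connections, or give up
// after a deadline, instead of sleeping for a fixed MAX_DELAY_SERVER.
public static boolean waitForPort(String host, int port, long timeoutMillis)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 1000);
            return true; // something is listening on the port
        } catch (IOException notListeningYet) {
            Thread.sleep(500); // retry shortly
        }
    }
    return false;
}

You would call waitForPort("127.0.0.1", 9042, MAX_DELAY_SERVER) after loadDatabaseServer() and only then connect(). Note that an open port does not strictly guarantee the CQL layer is fully initialized, so a short retry around connect() is still prudent.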

Question:

I don't quite get the HostDistance concept: it takes a LOCAL or REMOTE value, and it needs to be set while creating connections, i.e. for the core/max values in pooling options, using the Java API.

My questions are:

  1. Do we need to set HostDistance values for each node in the cluster? If yes, how?

  2. Or does it depend on the load balancing policy? If yes, how, since (as per my understanding) we don't specify it in the load balancing policy? If no, how is HostDistance decided?

I read these links

https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/policies/LoadBalancingPolicy.html#distance-com.datastax.driver.core.Host-

https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/HostDistance.html

Please correct my understanding. Thanks!


Answer:

HostDistance is used by your client so it can size its connection pools and avoid opening too many connections to a remote data center. The distance is calculated by the LoadBalancingPolicy implementation, which uses an implementation-specific algorithm to mark each Cassandra node as LOCAL or REMOTE - you never set it per node yourself.

See the Connection Pooling & Load Balancing sections in the Java driver documentation for additional explanation.
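
Here is a sketch of how the pieces fit together, assuming a 2.1+/3.x driver (the data center name "DC1" is hypothetical): pool sizes are configured per distance, and the load balancing policy decides which distance applies to each host.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

PoolingOptions pooling = new PoolingOptions()
        // sizes for hosts the policy marks LOCAL
        .setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 8)
        // sizes for hosts the policy marks REMOTE
        .setCoreConnectionsPerHost(HostDistance.REMOTE, 1)
        .setMaxConnectionsPerHost(HostDistance.REMOTE, 2);

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        // DCAwareRoundRobinPolicy marks hosts in the local DC as LOCAL and,
        // by default, ignores hosts in other DCs.
        .withLoadBalancingPolicy(
                DCAwareRoundRobinPolicy.builder().withLocalDc("DC1").build())
        .withPoolingOptions(pooling)
        .build();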

Question:

Components: Apache Ignite + Apache Cassandra, using the default DataStax driver. After doing several operations (about 3-5 billion entities put to the cache) we get into a situation where the DataStax driver constantly reconnects to Cassandra from Ignite.

2017-02-16 13:29:21.287  INFO 160487 --- [ sys-#441%null%] m.r.t.d.c.m.p.c.c.p.RetryPolicyImpl      :  init cluster
2017-02-16 13:29:21.288  INFO 160487 --- [ sys-#441%null%] com.datastax.driver.core.Cluster         : New Cassandra host <our host> added
2017-02-16 13:29:21.307  INFO 160487 --- [ sys-#441%null%] m.r.t.d.c.m.p.c.c.p.RetryPolicyImpl      :  close cluster
2017-02-16 13:29:23.516  INFO 160487 --- [ sys-#441%null%] com.datastax.driver.core.ClockFactory    : Using native clock to generate timestamps.
2017-02-16 13:29:23.537  INFO 160487 --- [ sys-#441%null%] c.d.d.c.p.DCAwareRoundRobinPolicy        : Using data-center name 'datacenter1' for DCAw

This process is endless and can only be interrupted by a server restart.

Infrastructure: 1 Ignite server (~Xmx30g, 8 cores), 25 Ignite clients (~Xmx1g, 8 cores), 1 Cassandra node. The batch size (entities that will be put to the cache and then to Cassandra) is about 1-2M.

Config datasource =>

 public DataSource dataSource() {
        DataSource dataSource = new DataSource();
        dataSource.setUser(login);
        dataSource.setPassword(pass);
        dataSource.setPort(port);
        dataSource.setContactPoints(host);
        dataSource.setRetryPolicy(retryPolicy);
        dataSource.setFetchSize(10_000);
        dataSource.setReconnectionPolicy(new ConstantReconnectionPolicy(1000));
        dataSource.setLoadBalancingPolicy(DCAwareRoundRobinPolicy.builder().withUsedHostsPerRemoteDc(0).build());
        dataSource.setSocketOptions(new SocketOptions().setReadTimeoutMillis(100_000).setConnectTimeoutMillis(100_00));
        return dataSource;
    }

config cache =>

 CacheConfiguration<KeyIgnite, Long> cfg = new CacheConfiguration<>();
        cfg
                .setName(area)
                .setRebalanceMode(CacheRebalanceMode.SYNC)
                .setStartSize(1_000_000)
                .setAtomicityMode(CacheAtomicityMode.ATOMIC)
                .setIndexedTypes(KeyIgnite.class, Long.class)
                .setCacheMode(CacheMode.PARTITIONED)
                .setBackups(0);

        if (!clientMode) {

            CassandraCacheStoreFactory<KeyIgnite, Long> csFactory = new CassandraCacheStoreFactory<>();
            csFactory.setDataSource(ds);
            csFactory.setPersistenceSettings(kv);

//            CassandraCacheStoreFactoryDwh<KeyIgnite, Long> csFactory = new CassandraCacheStoreFactoryDwh<>(ds, kv,params);

            cfg
                    .setCacheStoreFactory(csFactory)
                    .setReadThrough(true)
                    .setWriteThrough(true);
        }

        cfg.setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new Duration(TimeUnit.DAYS, 5)));
        return cfg;

ignite config =>:

   TcpDiscoveryMulticastIpFinder finder = new TcpDiscoveryMulticastIpFinder();
        finder.setAddresses(adresses);

        return Ignition.start(
                new IgniteConfiguration()
                        .setClientMode(clientMode)
                        .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(finder).setNetworkTimeout(10000))
                        .setFailureDetectionTimeout(50000)
                        .setPeerClassLoadingEnabled(false)
                        .setLoadBalancingSpi(new RoundRobinLoadBalancingSpi())

        );

After several iterations of putting items into the cache we hit this case.

After enabling debug-level logging I got this record:

2017-02-17 17:48:41.570 DEBUG 24816 --- [ sys-#184%null%] com.datastax.driver.core.RequestHandler  : [1071994376-1] Error querying ds-inmemorydb-02t/10.216.28.34:9042 : com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)

Answer:

Increasing the timeout on the driver helped.
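
For reference, here is a sketch of the knobs involved, assuming 3.x driver naming (the values are illustrative, not the ones actually used above):

import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SocketOptions;

PoolingOptions pooling = new PoolingOptions()
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 4)
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 4096)
        // how long a request waits to acquire a connection before the
        // "Timeout while trying to acquire available connection" error
        .setPoolTimeoutMillis(30_000);

SocketOptions sockets = new SocketOptions()
        .setReadTimeoutMillis(120_000)
        .setConnectTimeoutMillis(10_000);

If your DataSource wrapper does not expose pooling options directly, apply them on whatever code path builds the underlying Cluster.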

Question:

Initializer myInitializer = ... // your implementation
Cluster cluster = Cluster.buildFrom(myInitializer);

I'm trying to connect to a Cassandra cluster with several node details mentioned in addContactPoints("192.1.1.1", "192.2.2.2").build().

Now I want to connect to the Cassandra cluster without mentioning them in that method. I want to put my node details in a separate properties file and connect to my cluster using that file. I found a method in the Java driver called getContactPoints().

I don't understand how to use and implement that. Please help me improve my code.


Answer:

Put all the node IPs in a properties file, like nodes=192.1.1.1,192.2.2.2

In Java you can load the properties file with ResourceBundle, e.g. ResourceBundle resource = ResourceBundle.getBundle("cassandra")

Then read the property with resource.getString("nodes") and split it by comma, which gives you an array of strings containing all the IPs.

Finally, pass that array to addContactPoints(nodes). A sketch follows.
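
A minimal sketch of the whole flow, assuming a cassandra.properties file on the classpath containing the line nodes=192.1.1.1,192.2.2.2:

import java.util.ResourceBundle;

import com.datastax.driver.core.Cluster;

public class ClusterFactory {

    public static Cluster fromProperties() {
        // Loads cassandra.properties from the classpath.
        ResourceBundle resource = ResourceBundle.getBundle("cassandra");
        // "nodes" holds a comma-separated list of contact points.
        String[] nodes = resource.getString("nodes").split(",");
        return Cluster.builder()
                .addContactPoints(nodes) // the String... varargs accepts the array
                .build();
    }
}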

Question:

Can somebody help me and explain why I cannot connect and get an error in NetBeans? I get the following error message:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory at com.datastax.driver.core.Cluster.(Cluster.java:63) at testcassandra.TestCassandra.main(TestCassandra.java:30) Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:423) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:356) ... 2 more

(With DataStax DevCenter I can connect.)

The error occurs at cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TestCassandra {

/**
 * @param args the command line arguments
 */
public static void main(String[] args) {
    // TODO code application logic here
    Cluster cluster;
    Session session;
    // Connect to the cluster and keyspace "demo"
    cluster = Cluster.builder().addContactPoint("localhost").withPort(9042).build();
    session = cluster.connect("demo");
    // Insert one record into the users table
    session.execute("INSERT INTO users (lastname, age, city, email, firstname) VALUES ('Jones', 35, 'Austin', 'bob@example.com', 'Bob')");
    // Use select to get the user we just entered
    ResultSet results = session.execute("SELECT * FROM users WHERE lastname='Jones'");
    for (Row row : results) {
        System.out.format("%s %d\n", row.getString("firstname"), row.getInt("age"));
    }
    cluster.close();
}

}


Answer:

It appears that you may be missing the proper package: the SLF4J JAR is not on your classpath. You can download the JAR file here. That site also has full documentation on how to set it up.

If you are still having issues after installing it, verify that you are not bridging BOTH SLF4J and Log4j 2 to each other, as that will cause events to be endlessly routed between them.

Please also make sure you have the correct import statement

Example:

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

OR

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Question:

I am having the following persistent error in Cassandra:

Cassandra- All host(s) tried for query failed (tried: xxxxxx (com.datastax.driver.core.exceptions.DriverException: Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections))))

The problem occurs while I am trying to load a large database into a single cluster. So far I have tried all the recommendations I found regarding this issue:

  • I have only one cluster and one session
  • I am using a prepared statement for insertions
  • I have generously increased the timeouts on both sides

I am pasting below the function I am using for loading, following the recommendations in this blog post; maybe someone can spot something. Changing BATCH_SIZE was the only factor that somewhat improved the situation: if I set it to 1_000_000 it fails almost immediately, while at 100_000 it runs for quite some time. In the code below, pstatement is a PreparedStatement and futures is a List<ResultSetFuture>.

public boolean addPair(byte[] key, byte[] value) throws IOException {
    if (futures.size() >= BATCH_SIZE) {
        flush();
    }
    BoundStatement boundStatementInsert = new BoundStatement(pstatement);
    futures.add(session.executeAsync(boundStatementInsert
            .bind(ByteBuffer.wrap(key), ByteBuffer.wrap(value))
            .setConsistencyLevel(ConsistencyLevel.ALL)));
    return true;
}

private void flush() {
    for (ResultSetFuture rsf : futures) {
        rsf.getUninterruptibly();
    }
    futures.clear();
}

Thanks in advance,

Altober


Answer:

The driver won't handle more than a maximum number of requests to a given host simultaneously. This number depends on your pooling configuration; see details here (use the combo in the top-left corner to match your driver version).

If you try to send more requests, they will queue up. The message Timeout while trying to acquire available connection indicates that queued-up requests are timing out; in other words, you're sending more than the driver can handle.

With the driver defaults the maximum should be 1024. That's very conservative; given the setup you describe, I think you can go higher. Try adding more connections and/or raising the number of requests per connection, adjusting BATCH_SIZE accordingly, as in the sketch below.
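
As a starting point, here is a sketch of raising those limits (3.x driver naming; the numbers are illustrative and should be tuned against your cluster):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;

PoolingOptions pooling = new PoolingOptions()
        .setCoreConnectionsPerHost(HostDistance.LOCAL, 2)
        .setMaxConnectionsPerHost(HostDistance.LOCAL, 8)
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 8192);

Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1") // your contact point here
        .withPoolingOptions(pooling)
        .build();

Keep BATCH_SIZE below hosts x connections x requests-per-connection so the number of in-flight futures never exceeds what the pools can absorb.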