Hot questions for Using Cassandra in multithreading


Question:

I am not using Elasticsearch. I am trying to perform some database operations in Cassandra using CQL. I am using threads. While running the code, after a while I always get this exception in a thread: com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query.

I have tested with even a single thread; the error is still there. Here is my code:

InetAddress addrOne = InetAddress.getByName("52.15.195.41");
InetSocketAddress addrSocOne = new InetSocketAddress(addrOne, 9042);
CqlSession sessionOne = CqlSession.builder()
        .addContactPoint(addrSocOne)
        .withLocalDatacenter("us-east-2")
        .withKeyspace("test")
        .build();

int counter = 1;
while (counter <= 100)
{
    String query = "select max(id) FROM samplequeue";
    ResultSet rs = sessionOne.execute(query); // was "session" -- use the session built above
    for (Row row : rs)
    {
        int exS = row.getInt("system.max(id)");
    }
    counter++;
    Thread.sleep(50);
}

This is a very simple, modified example just to demonstrate the problem. I am unable to resolve it. All the threads exit with the same exception. I am running Cassandra 3.11.4 on AWS. All my nodes are up and running, and I can perform operations fine from the backend.


Answer:

Change .withLocalDatacenter("us-east-2") to .withLocalDatacenter("datacenter1") and retry. The name passed to withLocalDatacenter must match the datacenter name your nodes actually report (the driver only routes requests to nodes in that datacenter); unless you configured a snitch that says otherwise, Cassandra's default datacenter name is datacenter1, not the AWS region name.
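If you are unsure what name the cluster reports, you can ask a node directly. A quick check, run in cqlsh against one of your contact points:

```
-- The local node reports its own datacenter name here;
-- use exactly this value in withLocalDatacenter(...).
SELECT data_center FROM system.local;
```

The same name also appears in the `nodetool status` output under "Datacenter".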

Question:

I am using Cassandra as the backend in a web application (in Java). Should I create a unique session for each user, or use a single one, as described at the link below?

I read about sessions at this link: https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html

It says Session instances are thread-safe. Does that mean only one thread at a time can use the instance?

It is also written that each session maintains multiple connections to the cluster nodes.

My questions are:

  1. If only one thread at a time can use a session instance, what is the use of maintaining multiple connections per session to the cluster nodes?
  2. Doesn't that slow down multithreaded operations on the session?

Answer:

You got it backwards; it is safe to share an instance across threads. From the documentation you linked:

A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes...

Session instances are thread-safe and usually a single instance is enough per application.

Thread-safe means it is safe to share the instance with other threads; it does not mean you can only use it from a single one.
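The recommended usage can be sketched as follows: build one session per application and share it across all threads. FakeSession below is a hypothetical stand-in for the driver's Session, so the sketch runs without a cluster; the sharing pattern is the point.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedSessionDemo {
    // Hypothetical stand-in for the driver's Session, which is thread-safe.
    static class FakeSession {
        String execute(String query) {
            return "result of " + query;
        }
    }

    // One shared instance for the whole application.
    static final FakeSession SESSION = new FakeSession();

    public static void main(String[] args) throws InterruptedException {
        // Several threads use the same instance concurrently; no per-thread
        // or per-user session is created.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> SESSION.execute("select now()"));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Creating a session per user would waste the connection pools each session already maintains internally.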

Question:

I have created a class which wraps a Graph. For example:

public class GraphManager {
    private final Graph graph;

    public GraphManager(Graph graph) {
        this.graph = graph;
    }

    public void commitGraph() {
        graph.commit();
    }
}

This GraphManager lets me interact with the graph in predefined ways. I construct it using a factory:

public class GraphManagerFactory {
    public static GraphManager getGraphManager() {
        return new GraphManager(TitanFactory.open("conf/titan-cassandra.properties"));
    }
}

That is the base framework. Now, onto the problem: a REST controller receives a JSON file, which results in instantiating a GraphManager that translates the file into a graph and then commits it. The basic paradigm is as follows:

public class Controller {
    public List<String> handleRequest() {
        GraphManager manager = GraphManagerFactory.getGraphManager();
        // Do some work with the graph manager
        synchronized (Controller.class) { // was "synchronised", a typo
            manager.commitGraph();
        }
        return Collections.emptyList(); // placeholder so the method compiles
    }
}

With the code above I ensure that only one thread can commit to the graph at any time. Despite that, I still get a PermanentLockingException:

com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException: Local lock contention
at com.thinkaurelius.titan.diskstorage.locking.AbstractLocker.writeLock(AbstractLocker.java:313) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingStore.acquireLock(ExpectedValueCheckingStore.java:89) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVSProxy.acquireLock(KCVSProxy.java:40) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.diskstorage.BackendTransaction.acquireIndexLock(BackendTransaction.java:240) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.graphdb.database.StandardTitanGraph.prepareCommit(StandardTitanGraph.java:554) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.graphdb.database.StandardTitanGraph.commit(StandardTitanGraph.java:683) ~[titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx.commit(StandardTitanTx.java:1352) [titan-core-1.0.0.jar:na]
at com.thinkaurelius.titan.graphdb.tinkerpop.TitanBlueprintsGraph$GraphTransaction.doCommit(TitanBlueprintsGraph.java:263) [titan-core-1.0.0.jar:na]
at org.apache.tinkerpop.gremlin.structure.util.AbstractTransaction.commit(AbstractTransaction.java:94) [gremlin-core-3.0.2-incubating.jar:3.0.2-incubating]
at io.mindmaps.core.accessmanager.GraphAccessManagerImpl.commit(GraphAccessManagerImpl.java:811) [mindmaps-core-0.0.5-SNAPSHOT.jar:na]
at io.mindmaps.graphmanager.listener.TransactionController.commitGraph(TransactionController.java:98) [classes/:na]
at io.mindmaps.graphmanager.listener.TransactionController.validateAndCommit(TransactionController.java:84) [classes/:na]
at io.mindmaps.graphmanager.listener.TransactionController.loadData(TransactionController.java:66) [classes/:na]
at io.mindmaps.graphmanager.listener.TransactionController.lambda$postTransaction$0(TransactionController.java:43) [classes/:na]
at io.mindmaps.graphmanager.loader.QueueManager.handleJob(QueueManager.java:76) ~[classes/:na]
at io.mindmaps.graphmanager.loader.QueueManager.lambda$addJob$3(QueueManager.java:24) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_66]

How can this occur when only one commit is allowed at a time?


Answer:

While the accepted answer is 100% correct, I want to highlight more clearly what I did to escape lock contention (much of this is based on, and thanks to, the accepted answer):

Step 1: As recommended, rather than wrap an instance of the graph, I wrapped a new transaction in each GraphManager, i.e. I changed the factory as follows:

public class GraphManagerFactory {
    private static TitanGraph instance;

    public static GraphManager getGraphManager() {
        if (instance == null) { // was "instance = null", an assignment
            instance = TitanFactory.open("conf/titan-cassandra.properties");
        }
        return new GraphManager(instance.newTransaction());
    }
}

This step led to a lot of improvement. I still had lock contention, but it was resolved more quickly.
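As a side note (my addition, not part of the original steps): lazy initialization like the factory above is itself racy if getGraphManager() is called from several threads at once. An initialization-on-demand holder gives safe one-time setup without locking. A minimal sketch, with FakeGraph as a hypothetical stand-in for TitanGraph:

```java
public class GraphHolder {
    // Hypothetical stand-in for TitanGraph, so the sketch runs standalone.
    static class FakeGraph {
        String newTransaction() {
            return "tx";
        }
    }

    // The JVM guarantees Holder.INSTANCE is created exactly once, on first
    // use, with safe publication to all threads -- no synchronized needed.
    private static class Holder {
        static final FakeGraph INSTANCE = new FakeGraph();
    }

    public static String getTransaction() {
        return Holder.INSTANCE.newTransaction();
    }
}
```

The same shape applies directly to the factory: keep the graph in a private holder class and hand out fresh transactions from getGraphManager().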

Step 2: When building the graph for the first time, I also provided the schema in advance. Specifically, rather than let Titan build vertex properties and edge labels implicitly, I created them explicitly before even adding the first vertex. This is a simple matter of using management.makeEdgeLabel(label).make(); for edge labels and management.makePropertyKey(label).dataType(String.class).make(); for vertex properties. An added benefit is that I can perform batch loading more easily. This means expanding the factory again:

public class GraphManagerFactory {
    private static TitanGraph instance;

    public static GraphManager getGraphManager() {
        if (instance == null) { // was "instance = null", an assignment
            instance = TitanFactory.open("conf/titan-cassandra.properties");
            TitanManagement management = instance.openManagement();
            // Check whether the labels exist before creating them explicitly.
            // If they don't exist, do the following:
            management.makeEdgeLabel("EdgeLabel").make();
            management.makePropertyKey("property").dataType(String.class).make();
            management.commit();
        }
        return new GraphManager(instance.newTransaction());
    }
}

Step 3: The final step, which removed contention almost entirely, was to increase the id block size with graph.configuration().setProperty("ids.block-size", 100000);. This final step may only apply to me, though, as I am performing large loading operations simultaneously.
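The same setting can also live in the properties file the factory opens, which keeps it out of application code. A sketch (the path and value are the ones used above):

```
# conf/titan-cassandra.properties (excerpt)
# Larger id blocks mean fewer id-allocation round trips against the
# storage backend, and therefore less contention, during parallel loading.
ids.block-size=100000
```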

Question:

I have two timers that schedule two tasks:

public ReWrite() throws InterruptedException {
    timer = new Timer();
    TimerTask csvTask = new TimerTask() {
        @Override
        public void run() {
            readFromCSV();
        }
    };
    TimerTask mailTask = new TimerTask() {
        @Override
        public void run() {
            checkStatus();
        }
    };
    timer.schedule(csvTask, 0L, 1000 * 60 * 60);      // hourly
    timer.schedule(mailTask, 60000L, 1000 * 60 * 10); // every 10 minutes
}

readFromCSV():

  1. Finds all CSV files in the folder except the last one
  2. Reads all data from them and writes it to Cassandra (or, if it can't, to the last file in the folder)
  3. Deletes all CSV files (except the last one)

checkStatus():

  1. Finds all CSV files in the folder and takes the file size of the last one
  2. If the file size > 1 MB, sends an email

It worked at first, while the data was only a few rows, but when it grew to thousands I got this error message:

Exception in thread "Timer-0" java.lang.ArrayIndexOutOfBoundsException: 1
at handler.emergency.ReWrite.WriteToCassandra(ReWrite.java:197)
at handler.emergency.ReWrite.lambda$ReadFromCSV$1(ReWrite.java:137)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at handler.emergency.ReWrite.ReadFromCSV(ReWrite.java:129)
at handler.emergency.ReWrite.access$000(ReWrite.java:21)
at handler.emergency.ReWrite$1.run(ReWrite.java:56)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

Maybe I got this error by accessing the file from two tasks at the same time?

Possibly I could just write two different classes with their own timers and tasks: one with the read/write functions and a second for the read/email, but I am interested in solving the problem this way, with two timers/tasks.

What do I need for this? Another async thread?

Sorry for all the mistakes, this is my first post.

UPDATE: Sorry for my inattention. The error came from bad CSV data.


Answer:

It seems that your WriteToCassandra (a method?) is causing the problem; without the full code we cannot help you. Locking the file while reading/writing to it should also be considered, but to be honest, I haven't done a lot of file I/O, so others might give you better advice.
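Given the asker's update that the error came from bad CSV data, the fix is usually to validate each row's column count before indexing into it. A minimal sketch (the expected column count of 3 is hypothetical; adjust to your schema):

```java
import java.util.Optional;

public class CsvGuard {
    // Hypothetical: the number of columns WriteToCassandra expects per row.
    static final int EXPECTED_COLUMNS = 3;

    // Returns the parsed columns, or empty if the row is malformed.
    static Optional<String[]> parseRow(String line) {
        // limit -1 keeps trailing empty fields, so "a,b," yields 3 columns
        String[] cols = line.split(",", -1);
        if (cols.length < EXPECTED_COLUMNS) {
            // Skip (or log) malformed rows instead of letting an
            // ArrayIndexOutOfBoundsException kill the timer thread.
            return Optional.empty();
        }
        return Optional.of(cols);
    }
}
```

Note also that an uncaught exception inside a TimerTask cancels the whole Timer, so guarding the parse (or wrapping the task body in try/catch) keeps both scheduled tasks alive.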