Hot questions for Using Cassandra in kundera

Question:

I want to use the transaction management from Kundera (V3.2) for my Cassandra database. Referring to https://github.com/impetus-opensource/Kundera/wiki/Transaction-Management it should be possible to use this functionality. I wrote following code:

public void update(Account entity){
    EntityManager manager = this.entityManagerFactory.createEntityManager(getProperties());
    manager.setFlushMode(FlushModeType.COMMIT);
    manager.getTransaction().begin();
    try{


        String queryStringNative = "UPDATE account SET value = 20 WHERE id = 'xxx' IF value = 10";
        Query query = manager.createNativeQuery(queryStringNative);
        query.executeUpdate();

        String queryStringNative1 = "UPDATE account SET value = 30 WHERE id = 'yyy' IF value = 40";
        Query query1 = manager.createNativeQuery(queryStringNative1);
        query1.executeUpdate();

        //commit
        manager.getTransaction().commit();

    } catch(Exception e){
        manager.getTransaction().rollback();
    }
    manager.clear();
    manager.close();
}

But when i simulate an error in the second query the rollback doesn't work and the account with id 'xxx' is updated.

So my question, is it generally possible to use the transaction implementation from Kundera for kundera-cassandra in my particular way?

EDIT:

I found out that Kundera uses a EventLogQueue to perform a rollback or commit. It reads the Events which are in the queue and rollbacks these events. The problem is, it seams that the EventQueue receives only events which are sent by method call of EntityManager.persist(), EntityManager.remove() or EntityManager.merge(). So there is no entry when executing a native query.


Answer:

Transactions on native queries are not supported by Kundera.

Reason:

Kundera tracks the state of the entity objects to ensure transactions. In native queries the query could be anything.. update, delete, create, metadata queries, aggregation queries, etc where transactions are not usually used (atleast in NoSql world).

Workaround:

You can have client side transactions by explicitly checking for errors and have another query for undoing the previous query.

Question:

I use Kundera-Cassandra 3.2 and want to use the transaction management from Kundera.

My handling looks like this:

EntityManager manager = repo.getEntityManagerFactory().createEntityManager(CassandraRepository.getProperties());

try{
    manager.getTransaction().begin();

    this.repo.update(account1, manager); //calls the merge method of the Entitymanager
    this.repo.save(account2, manager); //calls the persist method of the Entitymanager

    manager.getTransaction().commit();

} catch(Exception e){
    if(manager.getTransaction().isActive()){
        manager.getTransaction().rollback();
    }
} finally {
    manager.clear();
    manager.close();
}   

When an error in the this.repo.save(account2, manager); occurs, the manager rollbacks the transaction, but does not do a update statement, he makes a delete statement for the merge method. The reason for this is, when calling the merge methode, kundera creates an insert statement and not an update. But how to say Kundera to make an update to rollback the transaction also with an update.

Logs:

12:42:41.185 [http-bio-8080-exec-3] INFO com.impetus.client.cassandra.CassandraClientBase - Returning delete query DELETE FROM "account" WHERE "id" = 'MCSP-000000000004'.
12:42:41.211 [http-bio-8080-exec-3] INFO com.impetus.client.cassandra.CassandraClientBase - Returning delete query DELETE FROM "account" WHERE "id" = 'MCSP-000000000005'.

EDIT (my repository):

public class CassandraRepository<T> {

    @PersistenceUnit
    private EntityManagerFactory entityManagerFactory;

    public static Map<String, String> getProperties() {
        final Map<String, String> properties = new HashMap<String, String>();
        properties.put(CassandraConstants.CQL_VERSION, CassandraConstants.CQL_VERSION_3_0);
        return properties;
    }



    public void update(T entity, EntityManager manager) throws Exception{
        try {
            manager.merge(entity);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

    public void save(T entity, EntityManager manager) throws Exception{
        try {
            manager.persist(entity);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
}

Answer:

According to JPA, to update an entity you have to first bring it into managed state (by fetching it)

Example:-

        PersonCassandra p = entityManager.find(PersonCassandra.class, "2");

        entityManager.getTransaction().begin();
        p.setMonth(Month.JAN);
        entityManager.merge(p);
        entityManager.persist(p3);
        entityManager.getTransaction().commit();

Issue is not with INSERT and UPDATE statements since both are similar for Cassandra, under the hood.

Question:

I am using kundera-cassandra (V3.2) and want to limit a select query. This is working with following code:

TypedQuery<T> query = manager.createQuery(criteriaQuery);
query.setMaxResults(limit);

My problem is to set a start parameter. I found the method

query.setFirstResult(start);

but it does not work. Is there any solution available to tell the select query from where to start? I cannot do that on application level because the query is used by a REST service method.


Answer:

You are looking for limit, offset queries and they are not supported by Kundera-Cassandra as there is no support on Cassandra natively.

Question:

I want to use the a sequence generator in an kundera-cassandra (V3.2) entity. Referring to this https://github.com/impetus-opensource/Kundera/issues/777 I have to set the CQL Version to Version 3 when creating the EntityManagerFactory, and not when creating the EntityManager. My Problem is that I use Spring and I do not know how to set the property when autowiring the EntityManagerFactory.

@Id
@TableGenerator(name = "id_gen", allocationSize = 30, initialValue = 100)
@GeneratedValue(generator = "id_gen", strategy = GenerationType.TABLE)
private String id;

In my Repository i define the EntityManagerFactory like this:

@PersistenceUnit
private EntityManagerFactory entityManagerFactory;

And in my application-context.xml I define the Bean like this:

<bean id="entityManagerFactory"
    class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="cassandra_pu" />
</bean> 

<bean id="pum" class="org.springframework.orm.jpa.persistenceunit.DefaultPersistenceUnitManager">
    <property name="persistenceXmlLocations">
        <list>
            <value>classpath:persistence.xml</value>
        </list>
    </property>
</bean>

So can anybody tell my where to set the version-property?


Answer:

You can do the following:

  • Create an xml file say cass-props.xml and add the following:

<?xml version="1.0" encoding="UTF-8"?> <clientProperties> <datastores> <dataStore> <name>cassandra</name> <connection> <properties> <property name="cql.version" value="3.0.0" /> </properties> <servers> <server> <host> localhost </host> <port> 9160 </port> </server> </servers> </connection> </dataStore> </datastores> </clientProperties>

  • Then add the following property in your persistence.xml

    <property name="kundera.client.property" value="cass-props.xml" />

Question:

I have a table in cassandra which has a column of type List, When I try to read a row from this table, I see that there is some issue while reading column of list type as below:

27296 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Returning cql query  INSERT INTO "sensors"("pressure","pieces","temperature","idsensor","date","event_time") VALUES(10.0,[{"idpiece":'1',"width":10.0,"height":11.0,"depth":12.0},{"idpiece":'2',"width":10.0,"height":11.0,"depth":12.0},{"idpiece":'3',"width":10.0,"height":11.0,"depth":12.0}],10.0,'1',33544,0) .
27319 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  d.d.SensorDAOImpl - select p from sensors p
27498 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Executing cql query SELECT * FROM "sensors" LIMIT 100.
27498 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Executing query SELECT * FROM "sensors" LIMIT 100.
27745 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR c.i.c.c.d.CassandraDataHandlerBase - Eror while retrieving data, Caused by: .
org.apache.cassandra.serializers.MarshalException: Unexpected extraneous bytes after list value
        at org.apache.cassandra.serializers.ListSerializer.deserializeForNativeProtocol(ListSerializer.java:104) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setElementCollectionList(CassandraDataHandlerBase.java:1727) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setElementCollection(CassandraDataHandlerBase.java:1568) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateViaThrift(CassandraDataHandlerBase.java:1154) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.onColumn(CassandraDataHandlerBase.java:1054) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateEntity(CassandraDataHandlerBase.java:653) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.CassandraClientBase$CQLClient.executeQuery(CassandraClientBase.java:2301) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.CassandraClientBase.executeSelectQuery(CassandraClientBase.java:926) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.thrift.ThriftClient.executeQuery(ThriftClient.java:1062) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.query.CassQuery.populateEntities(CassQuery.java:146) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.kundera.query.QueryImpl.fetch(QueryImpl.java:1377) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.kundera.query.QueryImpl.getResultList(QueryImpl.java:200) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.dao.SensorDAOImpl.findByQuery(SensorDAOImpl.java:84) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.controller.DatabaseController.saveSensorEntitie(DatabaseController.java:47) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at connector.bolt.PrinterBolt.execute(PrinterBolt.java:66) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at org.apache.storm.daemon.executor$fn__8226$tuple_action_fn__8228.invoke(executor.clj:731) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__8147.invoke(executor.clj:463) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$clojure_handler$reify__7663.onEvent(disruptor.clj:40) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:435) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:414) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__8226$fn__8239$fn__8292.invoke(executor.clj:851) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_99]
27753 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR c.i.c.c.CassandraClientBase - Error while executing native CQL query Caused by {}.
javax.persistence.PersistenceException: org.apache.cassandra.serializers.MarshalException: Unexpected extraneous bytes after list value
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateEntity(CassandraDataHandlerBase.java:833) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.CassandraClientBase$CQLClient.executeQuery(CassandraClientBase.java:2301) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.CassandraClientBase.executeSelectQuery(CassandraClientBase.java:926) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.thrift.ThriftClient.executeQuery(ThriftClient.java:1062) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.query.CassQuery.populateEntities(CassQuery.java:146) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.kundera.query.QueryImpl.fetch(QueryImpl.java:1377) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.kundera.query.QueryImpl.getResultList(QueryImpl.java:200) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.dao.SensorDAOImpl.findByQuery(SensorDAOImpl.java:84) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.controller.DatabaseController.saveSensorEntitie(DatabaseController.java:47) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at connector.bolt.PrinterBolt.execute(PrinterBolt.java:66) [Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at org.apache.storm.daemon.executor$fn__8226$tuple_action_fn__8228.invoke(executor.clj:731) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__8147.invoke(executor.clj:463) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$clojure_handler$reify__7663.onEvent(disruptor.clj:40) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:435) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:414) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__8226$fn__8239$fn__8292.invoke(executor.clj:851) [storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_99]
Caused by: org.apache.cassandra.serializers.MarshalException: Unexpected extraneous bytes after list value
        at org.apache.cassandra.serializers.ListSerializer.deserializeForNativeProtocol(ListSerializer.java:104) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setElementCollectionList(CassandraDataHandlerBase.java:1727) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setElementCollection(CassandraDataHandlerBase.java:1568) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateViaThrift(CassandraDataHandlerBase.java:1154) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.onColumn(CassandraDataHandlerBase.java:1054) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateEntity(CassandraDataHandlerBase.java:653) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        ... 19 more

but saving data works without any problem.

Here's how the entity manager is created:

Map propertyMap = new HashMap();
 propertyMap.put(CassandraConstants.CQL_VERSION, CassandraConstants.CQL_VERSION_3_0);
 EntityManagerFactory emf = Persistence.createEntityManagerFactory("cassandra_pu", propertyMap);
EntityManager em = emf.createEntityManager();

And here's information about cassandra :

[cqlsh 5.0.1 | Cassandra 2.1.12 | CQL spec 3.2.1 | Native protocol v3]

<dependency>
            <groupId>com.impetus.kundera.client</groupId>
            <artifactId>kundera-cassandra</artifactId>
            <version>3.4</version>
        </dependency>

Answer:

I resolved the problem by installing cassandra 3.4, and using auto schema generation.

I added this line in my resources/META-INF/persistence.xml file

<property name="kundera.ddl.auto.prepare" value="create" />