Hot questions for Using Cassandra in persistence

Question:


Answer:

I can't comment on MongoDB, but I can speak to Cassandra. Cassandra does indeed have a TTL feature that lets you expire data after a certain time. You have to plan for it, though, because TTLs do add some overhead during a process Cassandra runs called 'compaction' - see: http://docs.datastax.com/en/cassandra/2.1/cassandra/dml/dml_write_path_c.html

and: http://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html
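
For illustration, a minimal sketch of writing a row with a TTL from Java via the DataStax driver; the contact point, keyspace, table, and column names are placeholders, not anything from the question:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class TtlWriteExample {
    public static void main(String[] args) {
        // Placeholder contact point and keyspace; adjust to your cluster.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("my_keyspace")) {
            // USING TTL 86400 makes Cassandra expire this row one day after the write;
            // the expired cells are physically removed later, during compaction.
            session.execute(
                "INSERT INTO events (event_id, payload) VALUES (?, ?) USING TTL 86400",
                java.util.UUID.randomUUID(), "example payload");
        }
    }
}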

As long as you size for that kind of workload, you should be OK. That being said, Cassandra really excels when you have event-driven data: things like time series, product catalogs, click-stream data, etc.

If you aren't familiar with Patrick McFadin, meet your new best friend: https://www.youtube.com/watch?v=tg6eIht-00M

And of course, there are plenty of free tutorials and training here: https://academy.datastax.com/

EDIT: one more idea for expiring data 'safely' and with the least overhead, from a sharp guy by the name of Ryan Svihla: https://lostechies.com/ryansvihla/2014/10/20/domain-modeling-around-deletes-or-using-cassandra-as-a-queue-even-when-you-know-better/

Question:

Using Java, I am trying to test Akka Cassandra persistence. Following the URL (http://doc.akka.io/docs/akka/2.4.0-RC3/java/persistence.html), I am trying to make the PersistentActorExample work with Cassandra, and I am running into the following problems.

I am using the application.conf shown below. Do you have any Java implementation sample that I could use to get started? The same code works fine with leveldb. Currently we are on DataStax 4.8. I am hoping it is an application.conf issue.


akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2550
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2556",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }

  persistence {
    journal {
      plugin = "cassandra-journal"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }

    snapshot-store {
      plugin = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }
  }

  akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  akka.actor.default-mailbox.stash-capacity=10000

}

In my project I am using the following Maven dependencies:

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-persistence_2.11</artifactId>
            <version>2.4.0-RC3</version>
        </dependency>
        <dependency>
            <groupId>com.github.krasserm</groupId>
            <artifactId>akka-persistence-cassandra_2.11</artifactId>
            <version>0.3.9</version>
        </dependency>

This is the error I am experiencing:

[INFO] [10/04/2015 16:52:40.906] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/04/2015 16:52:41.112] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2550]
[INFO] [10/04/2015 16:52:41.124] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Starting up...
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Started up successfully
[INFO] [10/04/2015 16:52:41.193] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/04/2015 16:52:41.196] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics collection has started successfully
[INFO] [10/04/2015 16:52:41.380] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2552]
Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[ClusterSystem]
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
[ERROR] [10/04/2015 16:52:41.950] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.actor.ActorSystemImpl(ClusterSystem)] Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
    at akka.persistence.journal.WriteJournalBase$class.$init$(WriteJournalBase.scala:15)
    at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:17)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at java.lang.Class.newInstance(Class.java:438)
    at akka.util.Reflect$.instantiate(Reflect.scala:44)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
    at akka.actor.Props.newActor(Props.scala:259)
    at akka.actor.ActorCell.newActor(ActorCell.scala:561)
    at akka.actor.ActorCell.create(ActorCell.scala:587)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Answer:

Please use the stable version of Akka, 2.4.0 (you're using a release candidate), and also bump the dependency for the Cassandra plugin to 0.4, which was released last week and supports Akka 2.4.x.

The reason you're getting the error is that you have pulled in conflicting versions of Akka (its Journal Plugin API) and of the Journal implementation. The Journal Plugin API was experimental in Akka 2.3 and was subject to change while being stabilized for 2.4.x. Since Akka 2.4.0 the Journal API is stable and will not change in breaking ways.
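
For reference, the question's dependency block would then look roughly like this (versions taken from the advice above; check the plugin's release notes for the exact artifact coordinates):

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-persistence_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.github.krasserm</groupId>
            <artifactId>akka-persistence-cassandra_2.11</artifactId>
            <version>0.4</version>
        </dependency>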

Question:

I have a table in Cassandra which has a column of map type (i.e. source_id_map map).

When I try to read a row from this table, I see that there is an issue while reading the map-typed column, as shown below:

Caused by: javax.persistence.PersistenceException: org.apache.cassandra.serializers.MarshalException: Unexpected extraneous bytes after map value
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setCollectionValue(CassandraDataHandlerBase.java:2526)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setFieldValueViaCQL(CassandraDataHandlerBase.java:1504)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateViaThrift(CassandraDataHandlerBase.java:1163)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.onColumn(CassandraDataHandlerBase.java:1054)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateEntity(CassandraDataHandlerBase.java:653)

Another exception that I notice is shown below.

Error while retrieving fieldUTF8Type value via CQL
Caused by: java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:267)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytes(ByteBufferUtil.java:543)
    at org.apache.cassandra.utils.ByteBufferUtil.readBytesWithShortLength(ByteBufferUtil.java:552)
    at org.apache.cassandra.serializers.CollectionSerializer.readValue(CollectionSerializer.java:128)
    at org.apache.cassandra.serializers.MapSerializer.deserializeForNativeProtocol(MapSerializer.java:104)
    at com.impetus.client.cassandra.schemamanager.CassandraDataTranslator$MapTypeBuilder.decompose(CassandraDataTranslator.java:1177)
    at com.impetus.client.cassandra.schemamanager.CassandraDataTranslator$MapTypeBuilder.access$4800(CassandraDataTranslator.java:1100)
    at com.impetus.client.cassandra.schemamanager.CassandraDataTranslator.decompose(CassandraDataTranslator.java:507)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setCollectionValue(CassandraDataHandlerBase.java:2518)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.setFieldValueViaCQL(CassandraDataHandlerBase.java:1504)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateViaThrift(CassandraDataHandlerBase.java:1163)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.onColumn(CassandraDataHandlerBase.java:1054)
    at com.impetus.client.cassandra.datahandler.CassandraDataHandlerBase.populateEntity(CassandraDataHandlerBase.java:653)
    at com.impetus.client.cassandra.CassandraClientBase$CQLClient.executeQuery(CassandraClientBase.java:2272)
    at com.impetus.client.cassandra.CassandraClientBase.executeSelectQuery(CassandraClientBase.java:926)
    at com.impetus.client.cassandra.thrift.ThriftClient.executeQuery(ThriftClient.java:1062)
    at com.impetus.client.cassandra.query.CassQuery.populateEntities(CassQuery.java:153)
    at com.impetus.kundera.query.QueryImpl.fetch(QueryImpl.java:1377)
    at com.impetus.kundera.query.QueryImpl.getResultList(QueryImpl.java:200)


Answer:

Please enable CQL3 both while inserting and while reading data:

// Enable CQL3 when creating the EntityManagerFactory; use this same factory for reads and writes.
Map<String, String> propertyMap = new HashMap<>();
propertyMap.put(CassandraConstants.CQL_VERSION, CassandraConstants.CQL_VERSION_3_0);
EntityManagerFactory emf = Persistence.createEntityManagerFactory("cassandra-pu", propertyMap);
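
A short sketch of the read path through that same CQL3-enabled factory; the entity class, getter, and row key below are hypothetical placeholders, not Kundera API:

EntityManager em = emf.createEntityManager();
// Reads now also go through CQL3, so the map column is deserialized the same way it was written.
SourceEntity entity = em.find(SourceEntity.class, "row-key-1");   // hypothetical entity class and key
Map<String, String> sourceIdMap = entity.getSourceIdMap();        // hypothetical getter for the map column
em.close();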

Question:

I am using Apache Ignite 2.3 with Cassandra 2.1.9 as my persistence layer. I am using a cacheStoreFactory class which saves and gets data from the DB. I am autowiring some dependencies in this class, but they come up as null.

Here is my sample Ignite configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       ">
    <description>Main Spring file for ignite configuration.</description>

    <bean id="cacheIgniteBean" class="org.apache.ignite.IgniteSpringBean">
        <property name="configuration">
            <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
                <property name="dataStorageConfiguration">
                    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                        <property name="dataRegionConfigurations">
                            <list>
                                <!--
                                    Defining a data region that will consume up to 2 GB of RAM.
                                -->
                                <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                                    <!-- Custom region name. -->
                                    <property name="name" value="2GB_Region"/>

                                    <!-- 500 MB initial size (RAM). -->
                                    <property name="initialSize" value="#{500L * 1024 * 1024}"/>

                                    <!-- 2 GB maximum size (RAM). -->
                                    <property name="maxSize" value="#{2L * 1024 * 1024 * 1024}"/>

                                    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                    <property name="pageEvictionMode" value="RANDOM_LRU"/>
                                </bean>
                            </list>
                        </property>
                    </bean>
                </property>
                <property name="cacheConfiguration">
                    <list>
                        <bean class="org.apache.ignite.configuration.CacheConfiguration">

                            <property name="name" value="item"/>
                            <property name="cacheMode" value="PARTITIONED"/>
                            <property name="atomicityMode" value="ATOMIC"/>
                            <property name="backups" value="0"/>
                            <property name="cacheStoreFactory">
                                <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                    <constructor-arg value="com.tgt.gom.cacheserver.store.ItemCacheStore"/>
                                </bean>

                            </property>

                            <property name="readThrough" value="${ignite.config.cache.item.readThrough}"/>
                            <property name="writeThrough" value="${ignite.config.cache.item.writeThrough}"/>
                            <property name="writeBehindEnabled" value="${ignite.config.cache.item.writeBehindEnabled}"/>
                            <property name="writeBehindFlushSize"
                                      value="${ignite.config.cache.item.writeBehindFlushSize}"/>
                            <property name="writeBehindFlushFrequency"
                                      value="${ignite.config.cache.item.writeBehindFlushFrequency}"/>
                            <property name="writeBehindFlushThreadCount"
                                      value="${ignite.config.cache.item.writeBehindFlushThreadCount}"/>
                            <property name="writeBehindBatchSize"
                                      value="${ignite.config.cache.item.writeBehindBatchSize}"/>
                        </bean>

                        <bean class="org.apache.ignite.configuration.CacheConfiguration">

                            <property name="name" value="location"/>
                            <property name="cacheMode" value="PARTITIONED"/>
                            <property name="atomicityMode" value="ATOMIC"/>
                            <property name="backups" value="0"/>
                            <property name="cacheStoreFactory">
                                <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                    <constructor-arg value="com.tgt.gom.cacheserver.store.LocationCacheStore"/>
                                </bean>
                            </property>
                            <property name="readThrough" value="${ignite.config.cache.item.readThrough}"/>
                            <property name="writeThrough" value="${ignite.config.cache.item.writeThrough}"/>
                            <property name="writeBehindEnabled" value="${ignite.config.cache.item.writeBehindEnabled}"/>
                            <property name="writeBehindFlushSize"
                                      value="${ignite.config.cache.item.writeBehindFlushSize}"/>
                            <property name="writeBehindFlushFrequency"
                                      value="${ignite.config.cache.item.writeBehindFlushFrequency}"/>
                            <property name="writeBehindFlushThreadCount"
                                      value="${ignite.config.cache.item.writeBehindFlushThreadCount}"/>
                            <property name="writeBehindBatchSize"
                                      value="${ignite.config.cache.item.writeBehindBatchSize}"/>
                        </bean>

                    </list>
                </property>

                <!--<property name="includeEventTypes">
                    <util:constant static-field="org.apache.ignite.events.EventType.EVTS_TASK_EXECUTION"/>
                </property>-->
                <property name="failureDetectionTimeout" value="5000"/>
                <property name="discoverySpi">
                    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                        <property name="ipFinder">
                            <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                                <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                                <property name="addresses">
                                    <list>
                                        <value>127.0.0.1:47500..47509</value>
                                    </list>
                                </property>
                            </bean>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Here is my ItemCacheStore class code:

@Slf4j
@Service
public class ItemCacheStore extends CacheStoreAdapter<String, ItemV1DTO> implements Serializable {
    private static final long serialVersionUID = 1L;


    @Autowired
    private ItemRepository itemRepository;

    @Autowired
    private ItemCacheStoreAsync itemCacheStoreAsync;

    private static final String LOG_OP_INFO = "Item_Cache_Store";


    @Override
    public ItemV1DTO load(String item_id) throws CacheLoaderException {
        ItemV1DTO itemV1DTO = null;
        System.out.println("in item cache store ");

        try {
            ItemEntity itemEntity = itemRepository.findOne(item_id);
            if (itemEntity != null) {
                itemV1DTO = mapToItemDTO(itemEntity);
            }
        } catch (Exception e) {
            throw new CacheLoaderException("failed to load item data from cassandra" + e.getMessage());
        }
        return itemV1DTO;
    }
}

In the ItemCacheStore class, when the load method is called, the itemRepository field is null. However, when I autowire the same ItemRepository bean in another controller class, it works fine.

One more thing I noticed: if I add a method with the @PostConstruct annotation to the ItemCacheStore class, I can see that the ItemRepository dependency gets injected at that point, but when the load method is called it is null again.


Answer:

The issue is the following configuration:

<property name="cacheStoreFactory">
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        <constructor-arg value="com.tgt.gom.cacheserver.store.ItemCacheStore"/>
    </bean>
</property>

You're creating a Spring bean of type FactoryBuilder and passing it the class name ItemCacheStore. What happens behind the scenes is that the FactoryBuilder will create a new instance of ItemCacheStore. This new instance won't be managed by Spring, so all autowired fields will be null.

So basically you'll end up with two instances of ItemCacheStore:

  • One created by the Spring container thanks to the @Service annotation. All autowired fields will work.
  • Another one created by the FactoryBuilder. It won't be managed by Spring and all autowired fields will be null.

To fix this, there are a few possibilities:

  1. Use a different factory, or write your own that returns a Spring-managed bean rather than creating a new one (see the sketch after this list). According to the documentation, there is already a CassandraCacheStoreFactory.
  2. Use an AutowireHelper as mentioned in this answer, or use Spring's SpringBeanAutowiringSupport to inject beans into a non-Spring-managed bean.
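
A minimal sketch of option 1, assuming a small hypothetical SpringContextHolder helper that captures the application context; the factory hands Ignite the Spring-managed ItemCacheStore instead of letting FactoryBuilder instantiate a fresh, unmanaged one (note that every node that runs the store needs a Spring context for this to work):

import javax.cache.configuration.Factory;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Hypothetical helper (own file): captures the Spring context at startup.
@Component
class SpringContextHolder implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    static <T> T getBean(Class<T> type) {
        return context.getBean(type);
    }
}

// Referenced from the XML instead of FactoryBuilder.factoryOf(...), e.g.:
//   <property name="cacheStoreFactory">
//       <bean class="com.tgt.gom.cacheserver.store.ItemCacheStoreFactory"/>
//   </property>
class ItemCacheStoreFactory implements Factory<ItemCacheStore> {
    private static final long serialVersionUID = 1L;

    @Override
    public ItemCacheStore create() {
        // Return the Spring-managed bean, so its @Autowired fields are already populated.
        return SpringContextHolder.getBean(ItemCacheStore.class);
    }
}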

Related: Why is my Spring @Autowired field null?

Question:

This question is strictly related to my previous one.

Quick summary: I am struggling to configure Cassandra as the persistence layer for my Ignite 2.0 cache. It fails during write-behind operations with:

java.lang.IllegalArgumentException: object is not an instance of declaring class

I tried playing with the schema on Cassandra, the field configuration in the persistence settings, the cache properties, etc., but I have not been able to overcome it. The following is the last configuration I tried:

Persistence settings:

<persistence keyspace="ignite" table="odbc_test" ttl="86400">
    <keyspaceOptions>
        REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}
        AND DURABLE_WRITES = true
    </keyspaceOptions>
    <tableOption>
        comment = 'Cache test'
        AND read_repair_chance = 0.2
    </tableOption>
    <keyPersistence class="java.lang.String" strategy="PRIMITIVE" column="key"/>
    <valuePersistence class="com.riccamini.ignite.ValueClass" strategy="POJO"/>
</persistence>

ValueClass:

public class ValueClass implements Serializable{
    @QuerySqlField
    private int numberOne;
    @QuerySqlField
    private int numberTwo;

    public int getNumberOne(){ return numberOne;}

    public int getNumberTwo(){ return numberTwo;}

    public void setNumberOne(int value){
        numberOne = value;
    }

    public void setNumberTwo(int value){
        numberTwo = value;
    }
}

Cassandra's table:

CREATE TABLE ignite.odbc_test (
    key text PRIMARY KEY,
    numberone int,
    numbertwo int);

Ignite's configuration:

boolean persistence = true;

IgniteConfiguration cfg = new IgniteConfiguration();
CacheConfiguration<String, ValueClass> configuration = new CacheConfiguration<String, ValueClass>();

configuration.setName("test-cache");
configuration.setIndexedTypes(String.class, ValueClass.class);

if(persistence){
    //  Configuring Cassandra's persistence
    DataSource dataSource = new DataSource();
    dataSource.setContactPoints("172.17.0.2");
    RoundRobinPolicy robinPolicy = new RoundRobinPolicy();
    dataSource.setLoadBalancingPolicy(robinPolicy);
    dataSource.setReadConsistency("ONE");
    dataSource.setWriteConsistency("ONE");
    String persistenceSettingsXml = FileUtils.readFileToString(new File(persistenceSettingsConfig), "utf-8");
    KeyValuePersistenceSettings persistenceSettings = new KeyValuePersistenceSettings(persistenceSettingsXml);
    CassandraCacheStoreFactory cacheStoreFactory = new CassandraCacheStoreFactory();
    cacheStoreFactory.setDataSource(dataSource);
    cacheStoreFactory.setPersistenceSettings(persistenceSettings);
    configuration.setCacheStoreFactory(cacheStoreFactory);
    configuration.setWriteThrough(true);
    configuration.setReadThrough(true);
    configuration.setWriteBehindEnabled(true);
    configuration.setStoreKeepBinary(true);
    }


//  Setting cache configuration
cfg.setCacheConfiguration(configuration);

//  Configuring ODBC
OdbcConfiguration odbcConfig = new OdbcConfiguration();
odbcConfig.setMaxOpenCursors(100);
cfg.setOdbcConfiguration(odbcConfig);

//  Starting Ignite
Ignite ignite = Ignition.start(cfg);

Complete stack trace:

SEVERE: Failed to process 1 of 1 elements, during BULK_WRITE operation with Cassandra
class org.apache.ignite.IgniteException: Failed to execute Cassandra BULK_WRITE operation
    at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:266)
    at org.apache.ignite.cache.store.cassandra.CassandraCacheStore.writeAll(CassandraCacheStore.java:333)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:804)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:720)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:75)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1135)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1006)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
    at java.lang.Thread.run(Thread.java:748)
Caused by: class org.apache.ignite.IgniteException: Failed to get value of the field 'numberOne' from the instance  of 'class org.apache.ignite.internal.binary.BinaryObjectImpl' class
    at org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:165)
    at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindValues(PersistenceController.java:450)
    at org.apache.ignite.cache.store.cassandra.persistence.PersistenceController.bindKeyValue(PersistenceController.java:203)
    at org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:347)
    at org.apache.ignite.cache.store.cassandra.CassandraCacheStore$4.bindStatement(CassandraCacheStore.java:333)
    at org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl.execute(CassandraSessionImpl.java:230)
    ... 8 more
Caused by: java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.ignite.cache.store.cassandra.persistence.PojoField.getValueFromObject(PojoField.java:147)
    ... 13 more

Thank you for all the help provided so far.


Answer:

This happens because you set this:

configuration.setStoreKeepBinary(true);

Set it to false (which is the default), deploy the POJO classes on the server nodes, and it will work. The current implementation can't work with binary objects directly; this will be improved in the scope of this ticket: https://issues.apache.org/jira/browse/IGNITE-5270
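
For reference, a sketch of the relevant lines from the question's configuration with the flag at its default:

configuration.setCacheStoreFactory(cacheStoreFactory);
configuration.setWriteThrough(true);
configuration.setReadThrough(true);
configuration.setWriteBehindEnabled(true);
// Keep values as POJOs (the default) so the Cassandra store can read ValueClass fields reflectively.
configuration.setStoreKeepBinary(false);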

Question:

I use the Achilles library for working with a Cassandra database. The problem is that when I call an entity method that affects fields, Achilles does not "see" those changes. See the example below.

import info.archinnov.achilles.persistence.PersistenceManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AhilesTest {
    private static final UUID ID = UUID.fromString("083099f6-e423-498d-b810-d6c564228724");

    //This is achilles persistence manager
    @Autowired
    private PersistenceManager persistenceManager;

    public void test () {
        //user creation and persistence
        User toInsert = new User();
        toInsert.setId(ID);
        toInsert.setName("name");
        toInsert.setVersion(0l);
        persistenceManager.insert(toInsert);

        //find user
        User user = persistenceManager.find(User.class, ID);
        user.changeName("newName");
        persistenceManager.update(user);

        User updatedUser = persistenceManager.find(User.class, ID);
        //here old "name" value is returned
        updatedUser.getName();
    }

    public class User {
        private UUID id; 
        private String name;
        private long version;

        public void changeName (String newName) {
            this.name = newName;
            this.version++;
        }

        //getters and setters are omited
    }
}

user.changeName("newName"); do not affect entity and "old" values are persisted. For my opinion (I have seen debug call stack) this happens because actual User entity is wrapper with Achilles proxy which react to gettter/setter calls. Also when I replace changeName: call to direct getter/setter invocation - user.setName("newName"); user.setVersion(user.getVersion()+1); updating became work.

So why does this happen, and is there a way to configure Achilles to react to non-getter/setter method calls?


Answer:

You have to use the setter methods explicitly.

According to the documentation, it intercepts the setter methods only.

"As a consequence of this design, internal calls inside an entity cannot be intercepted 
and will escape dirty check mechanism. It is thus recommended to change state of the 
entities using setters"
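
Concretely, the workaround the question itself found is to mutate the proxied entity through its setters so the proxy can record the dirty fields, for example:

// Load the managed (proxied) entity, change it via setters so Achilles intercepts
// the calls and marks the fields dirty, then update.
User user = persistenceManager.find(User.class, ID);
user.setName("newName");
user.setVersion(user.getVersion() + 1);
persistenceManager.update(user);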

It is probably a design choice from Achilles, and I suggest you raise it as an issue on the issues page, so it may receive some attention from the author.