Hot questions about using Cassandra

Question:

Consider the following SELECT in CQL:

SELECT * FROM tickets WHERE ID IN (1,2,3,4)

Given that ID is the partition key, is using the IN relation better than issuing multiple queries, or is there no difference?


Answer:

I remember seeing someone answer this question on the Cassandra user mailing list a short while back, but I cannot find the exact message right now. Coincidentally, Cassandra evangelist Rebecca Mills just posted an article that addresses this issue (Things you should be doing when using Cassandra drivers...points #13 and #22). But the answer is that yes, in some cases multiple parallel queries will be faster than using IN. The underlying reason can be found in the DataStax SELECT documentation:

When not to use IN

...Using IN can degrade performance because usually many nodes must be queried. For example, in a single, local data center cluster with 30 nodes, a replication factor of 3, and a consistency level of LOCAL_QUORUM, a single key query goes out to two nodes, but if the query uses the IN condition, the number of nodes being queried are most likely even higher, up to 20 nodes depending on where the keys fall in the token range.

So based on that, it would seem that this becomes more of a problem as your cluster gets larger.

Therefore, the best way to solve this problem (and not have to use IN at all) would be to rethink your data model for this query. Without knowing too much about your schema, perhaps there are attributes (column values) shared by ticket IDs 1, 2, 3, and 4. Maybe you could partition on something like level or group (if tickets are for a particular venue), or even an event id, instead.

Basically, while using a unique, high-cardinality identifier to partition your data sounds like a good idea, it actually makes it harder to query your data (in Cassandra) later on. If you could come up with a different column to partition your data on, that would certainly help you in this case. Regardless, creating a new, specific column family (table) to handle queries for those rows is going to be a better approach than using IN or multiple queries.
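
If you do need to fetch several individual partitions at once, here is a minimal sketch of the parallel-query approach using the DataStax Java driver (assuming an already-connected session and the tickets table from the question):

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import java.util.ArrayList;
import java.util.List;

// Fire one single-partition query per key; with a token-aware policy each
// request goes straight to a replica for that partition.
PreparedStatement ps = session.prepare("SELECT * FROM tickets WHERE ID = ?");
List<ResultSetFuture> futures = new ArrayList<>();
for (int id : new int[] {1, 2, 3, 4}) {
    futures.add(session.executeAsync(ps.bind(id)));
}

// Collect the results as they complete.
for (ResultSetFuture future : futures) {
    for (Row row : future.getUninterruptibly()) {
        System.out.println(row);
    }
}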

Question:

I'm trying to store objects in Cassandra with a user-defined type as the partition key. I am using the DataStax Java driver for object mapping, and while I am able to insert into the database, I cannot retrieve the object. If I change the partition key to use a non-UDT type (such as text), I am able to save and retrieve, even if there are other UDTs of the same type on the object. From reading the documentation, it appears that UDTs are allowed as keys. I've also not been able to find any indication that the Java driver doesn't support UDTs as keys. It looks like it is failing during the mapping of the object, but only when the UDT is the partition key.

Is this an unsupported feature? Do I need to only use the default types as keys when using the object mapper? Or am I just doing something wrong?

Here are the CQL commands I used to set up the database:

create keyspace example_keyspace with replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
use example_keyspace;
create type my_data_type (value text);
create table my_classes (key frozen<my_data_type>, value frozen<my_data_type>, primary key (key));

Here is the Java code I used to try inserting and retrieving:

package me.example;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.annotations.Frozen;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.UDT;

public class Main {

    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1")
                .build()) {
            Mapper<MyClass> mapper = new MappingManager(cluster.newSession())
                    .mapper(MyClass.class);

            MyDataType value = new MyDataType();
            value.setValue("theValue");

            MyDataType key = new MyDataType();
            key.setValue("theKey");

            MyClass myClass = new MyClass();
            myClass.setKey(key);
            myClass.setValue(value);

            mapper.save(myClass);

            MyClass toret = mapper.get(key);
            System.out.println(toret.getKey());
            System.out.println(toret.getValue().getValue());
        }
    }

    @Table(keyspace = "example_keyspace", name = "my_classes")
    public static class MyClass {
        @PartitionKey
        @Frozen
        private MyDataType key;

        @Frozen
        private MyDataType value;

        public MyDataType getKey() {
            return key;
        }

        public void setKey(MyDataType key) {
            this.key = key;
        }

        public MyDataType getValue() {
            return value;
        }

        public void setValue(MyDataType value) {
            this.value = value;
        }

    }

    @UDT(keyspace = "example_keyspace", name = "my_data_type")
    public static class MyDataType {

        private String value;

        public String getValue() {
            return value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((value == null) ? 0 : value.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof MyDataType)) {
                return false;
            }
            MyDataType other = (MyDataType) obj;
            if (value == null) {
                if (other.value != null) {
                    return false;
                }
            } else if (!value.equals(other.value)) {
                return false;
            }
            return true;
        }
    }

}

A select shows the object was inserted successfully:

select * from my_classes;

yields:

 key               | value
-------------------+---------------------
 {value: 'theKey'} | {value: 'theValue'}

But my Eclipse console is showing an error:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidTypeException: Invalid value for CQL type frozen<example_keyspace.my_data_type>, expecting class com.datastax.driver.core.UDTValue but class me.example.Main$MyDataType provided
    at com.datastax.driver.core.DataType.serialize(DataType.java:619)
    at com.datastax.driver.mapping.Mapper.getQuery(Mapper.java:320)
    at com.datastax.driver.mapping.Mapper.get(Mapper.java:342)
    at me.example.Main.main(Main.java:31)

I've also tried using a UDTValue object as the key to retrieve the object, but I still get the same error.

I am running Cassandra 2.1.7 on OS X 10.10.4. Java version is 1.8.0_45. DataStax Java driver core and mapper are version 2.1.6.

Thanks!


Answer:

This is a bug in the driver. I've created JAVA-831 in our issue tracker.

In the meantime, you can use the following workaround:

MappingManager manager = new MappingManager(cluster.newSession());
Mapper<MyClass> mapper = manager.mapper(MyClass.class);
UDTMapper<MyDataType> myDataTypeMapper = manager.udtMapper(MyDataType.class);

// Convert the key to a UDTValue manually, then query with that:
UDTValue keyAsUDTValue = myDataTypeMapper.toUDT(key);
MyClass toret = mapper.get(keyAsUDTValue);

Question:

I just started working with Spark and Cassandra in Java, and I am already stuck saving data to my Cassandra database. Here is the Java bean class I have:

public class User implements Serializable {
    public User(){}

    public User(String username, String password){
        this.username = username;
        setPassword(password);
    }

    public User(String username, String password, boolean admin){
        this.username = username;
        this.admin = admin;
        setPassword(password);
    }

    private String username;
    public String getUserName(){ return username; }
    public void setUsername(String username){ this.username = username; }

    private String password;
    public String getPassword(){ return password; }
    public void setPassword(String password){ this.password = password; }

    private Boolean admin = false;
    public boolean isAdmin(){ return admin; }
    public void setAdmin(boolean admin){ this.admin = admin; }

    private Calendar dateRegistered = Calendar.getInstance();
    public Calendar getDateRegistered(){ return dateRegistered; }
}

I have a connection to my Cassandra database and try to save data the following way:

JavaRDD<User> usersRDD = sparkContext.parallelize(users);
javaFunctions(usersRDD).writerBuilder("database", "users", mapToRow(User.class)).saveToCassandra();

where users is a list of instantiated users. When I execute this, I get the following error:

java.lang.IllegalArgumentException: requirement failed: Columns not found in class com.app.models.User: [username]
at scala.Predef$.require(Predef.scala:233)
at com.datastax.spark.connector.mapper.ReflectionColumnMapper.columnMapForWriting(ReflectionColumnMapper.scala:91)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:27)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:18)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:269)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:59)
at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:443)
at com.autobot.context.SparkContext.createUsers(SparkContext.java:56)
at com.autobot.context.SparkContext.createUser(SparkContext.java:51)
at com.autobot.user.UserTest.saveUser(UserTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Answer:

Maybe this exception comes from the inconsistent casing of username in its getter and setter? The accessor names need to match the username column, so it should be:

private String username;
public String getUsername(){ return username; }
public void setUsername(String username){ this.username = username; }

Question:

I changed the authenticator value of my Cassandra database to PasswordAuthenticator in the cassandra.yaml file. Previously, I used the following code to connect to the database from Java.

public void connect(String node) {
      cluster = Cluster.builder()
            .addContactPoint(node).build();
      Metadata metadata = cluster.getMetadata();
      System.out.printf("Connected to cluster: %s\n", 
            metadata.getClusterName());
      for ( Host host : metadata.getAllHosts() ) {
         System.out.printf("Datatacenter: %s; Host: %s; Rack: %s\n",
               host.getDatacenter(), host.getAddress(), host.getRack());
      }
      session = cluster.connect();
   }

Now this code gives me an error saying:

Exception in thread "main" com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /127.0.0.1: Host /127.0.0.1 requires authentication, but no authenticator found in Cluster configuration

I understand that I need to connect to the database with my superuser username and password. How can I give those details when connecting to the database from Java?


Answer:

You can do that by adding the .withCredentials method to your cluster builder, like this:

  cluster = Cluster.builder()
        .addContactPoint(node)
        .withCredentials("yourusername", "yourpassword")
        .build();
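
Equivalently (assuming driver 2.x/3.x), you can supply an AuthProvider explicitly; a brief sketch:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PlainTextAuthProvider;

// Same effect as withCredentials: pass a PlainTextAuthProvider up front.
Cluster cluster = Cluster.builder()
        .addContactPoint(node)
        .withAuthProvider(new PlainTextAuthProvider("yourusername", "yourpassword"))
        .build();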

Question:

I am trying to insert in batches: objects are stored in an ArrayList, and as soon as the count is divisible by 10000, I insert all of those objects into my table. But it takes more than 4 minutes to do so. Is there any approach which is faster?

arr.add(new Car(name, count, type));
if (count % 10000 == 0) {
    repository.saveAll(arr);
    arr.clear();
}

Answer:

So here is what is happening. I am most curious to see the table definition inside Cassandra. But given your Car constructor,

new Car(name, count, type)

Given those column names, I'm guessing that name is the partition key.

The reason that is significant is that the hash of the partition key column is what Cassandra uses to figure out which node (token range) the data should be written to.

When you saveAll 10000 Cars at once, there is no way to guarantee that all 10000 of them are going to the same node. To deal with this, Spring Data Cassandra must be using a BATCH (or something like it) behind the scenes. If it is a BATCH, that essentially forces one Cassandra node (designated as the "coordinator") to route writes to all of the required nodes. Due to Cassandra's distributed nature, that is never going to be fast.

If you really need to store 10000 of them, the best way is to send one write at a time asynchronously. Of course, you won't want 10000 threads all writing concurrently, so you'll want to throttle down (limit) the number of in-flight requests in your code. DataStax's Ryan Svihla has written a couple of articles detailing how to do this; I recommend this one: Cassandra: Batch Loading Without the Batch - The Nuanced Edition.
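
As a rough illustration of that throttling idea, here is a minimal sketch using the plain DataStax Java driver directly rather than Spring Data (the cars table, the Car getters, and the cap of 128 in-flight writes are all assumptions):

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Semaphore;

// Cap the number of concurrent async writes; tune the limit for your cluster.
final Semaphore inFlight = new Semaphore(128);
PreparedStatement ps = session.prepare(
        "INSERT INTO cars (name, count, type) VALUES (?, ?, ?)");

for (Car car : cars) {
    inFlight.acquireUninterruptibly();              // blocks once 128 writes are pending
    ResultSetFuture future = session.executeAsync(
            ps.bind(car.getName(), car.getCount(), car.getType()));
    future.addListener(inFlight::release, MoreExecutors.directExecutor());
}

inFlight.acquireUninterruptibly(128);               // drain: wait for the last writes to finish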

tl;dr;

Spring Data Cassandra's saveAll really shouldn't be used to persist several thousand writes. If I were using Spring Data Cassandra, I wouldn't even go beyond double-digits with saveAll, TBH.

Edit

Check out this answer for details on how to use Spring Boot/Data with Cassandra asynchronously: AsyncCassandraOperations examples

Question:

I'm running Cassandra 2.1.0 as my client (because 2.0.9 does not support concurrent writers on the same table), with 2.0.9 on the cluster.

I can use concurrent CQLSSTableWriter objects for a single CF in one JVM instance. However, when I try to use two CQLSSTableWriter objects, one for each CF, in one JVM instance, I receive the error:

Exception in thread "Thread-2" java.lang.IllegalArgumentException: unconfigured columnfamily <the second column family>
at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.getStatement(CQLSSTableWriter.java:460)
at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.using(CQLSSTableWriter.java:391)
at CsvLoader.generateSSTables(CsvLoader.java:60)
at MultiThreadedCsvLoader$LoaderThread.run(MultiThreadedCsvLoader.java:93)
Caused by: org.apache.cassandra.exceptions.InvalidRequestException: unconfigured columnfamily avping_v2_file_sha2_id_idx
at org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily(ThriftValidation.java:115)
at org.apache.cassandra.cql3.statements.ModificationStatement$Parsed.prepare(ModificationStatement.java:730)
at org.apache.cassandra.cql3.statements.ModificationStatement$Parsed.prepare(ModificationStatement.java:724)
at org.apache.cassandra.cql3.QueryProcessor.getStatement(QueryProcessor.java:437)
at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.getStatement(CQLSSTableWriter.java:449)
... 3 more

In short, the code I am running is:

CQLSSTableWriter writer1 = CQLSSTableWriter.builder()
        .inDirectory("keyspace/cf_1")
        .forTable(<cf_1 create statement>)
        .using(<cf_1 insert statement>)
        .build();
CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
        .inDirectory("keyspace/cf_2")
        .forTable(<cf_2 create statement>)
        .using(<cf_2 insert statement>)
        .build();

The error occurs during the second call to using(). The program has multiple threads, but I restricted it to one thread for debugging.

Are multiple CQLSSTableWriters for multiple CFs in one JVM instance currently supported? Am I using the API correctly?

The reason I am writing to multiple CFs is that I need to build the main table and also one or more indices. sstableloader seems to be the recommended method for bulk loading. Are there any other decent methods to approach this problem if CQLSSTableWriter doesn't support my use case, such as loading the main table first and then using a CQL client to iterate over the rows in the main CF and insert into the index? Or should I just switch completely to CQL BATCH?

The first test data set is tens of TB. The data is either in gzipped text files or a Postgres database.


Answer:

Between building writer1 and writer2, you can insert this:

import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.Schema;
...
CQLSSTableWriter writer1 = CQLSSTableWriter.builder()
        .inDirectory("keyspace/cf_1")
        .forTable(<cf_1 create statement>)
        .using(<cf_1 insert statement>)
        .build();
// ... do your work with writer1 ...

// Remove the cached keyspace definition before building the second writer:
KSMetaData ksm = Schema.instance.getKSMetaData("keyspace");
Schema.instance.clearKeyspaceDefinition(ksm);

CQLSSTableWriter writer2 = CQLSSTableWriter.builder()
        .inDirectory("keyspace/cf_2")
        .forTable(<cf_2 create statement>)
        .using(<cf_2 insert statement>)
        .build();
// ... do your work with writer2 ...

It worked for me; hope it helps.

Question:

I would like to generate a unique id for an entity and store the entity in a Cassandra database, but only if an entity with the generated id does not already exist.

After id generation, I check in the db whether there is any entity with the same id. If not, the entity is saved. Sample code from the MyService class:

synchronized (MyService.class) {
    do {
        id = generateId();
    } while (myDao.find(id) != null);
    sampleObject.setId(id);
    myDao.create(sampleObject);
}

In MyDao, to save the entity I'm using:

cassandraOperations.insert(sampleObject);

What is the best practice to ensure that a generated id does not already exist in the database? I feel that this synchronized block is not the most efficient solution. Or maybe there is another way to ensure that the entity is inserted only if there is no entity with the same id in the database?


Answer:

Type 1 uuids (timeuuid) guarantee no collisions provided you create fewer than 10k uuids per millisecond (per host), so this is the easiest solution, with no impact on throughput or latency. If you use a type 4 random uuid (the uuid type), the chance of a collision is smaller than that of a supervolcano erupting under your datacenter, but it doesn't provide the guarantee that timeuuid does.
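
With the DataStax Java driver, generating a timeuuid client-side is a one-liner; a small sketch reusing sampleObject and myDao from the question (this assumes the id column is a timeuuid and that setId accepts a UUID):

import com.datastax.driver.core.utils.UUIDs;
import java.util.UUID;

// Type 1 (time-based) uuid: no read-before-write or synchronization needed.
UUID id = UUIDs.timeBased();
sampleObject.setId(id);
myDao.create(sampleObject);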

If you want, you can also use lightweight transactions with the IF NOT EXISTS clause on your query.

INSERT INTO keyspace_name.table_name
  ( identifier, column_name...)
  VALUES ( value, value ... ) IF NOT EXISTS

This will only apply the mutation if the row does not already exist. The query returns an [applied] field that tells you whether it succeeded. If two clients inserted the same row concurrently, only one insert would be applied.

https://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html#reference_ds_gp2_1jp_xj__if-not-exists

This will be slower, since it uses Paxos, which requires multiple round trips around your cluster to complete.
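
From the Java driver, you can check the outcome of the lightweight transaction with ResultSet.wasApplied(); a small sketch (the keyspace, table, and columns are placeholders):

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;

PreparedStatement ps = session.prepare(
        "INSERT INTO my_keyspace.entities (id, name) VALUES (?, ?) IF NOT EXISTS");
ResultSet rs = session.execute(ps.bind(id, "some name"));

if (!rs.wasApplied()) {
    // Another writer got there first: a row with this id already existed.
}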

Question:

Could someone please respond to my questions below?

1) I have 4 nodes: 172.30.56.60, 172.30.56.61, 172.30.56.62, and 172.30.56.63, and I have configured the seeds as '172.30.56.60, 172.30.56.61' in cassandra.yaml on all four nodes. NOTE: I didn't give any information about 172.30.56.62 or 172.30.56.63 in the cassandra.yaml file. But when I start Cassandra on all four nodes, how is Cassandra able to identify .62 and .63?

2) How exactly does the gossip protocol work, and how exactly does Cassandra bootstrapping work?

Thanks, Harry


Answer:

(Disclaimer: I'm a Scylla employee)

When you start Cassandra / Scylla on your nodes, they contact the seed node(s) (which you defined in the yaml file on all 4 nodes) to get information about the ring, the token ranges, and the other members of the ring (other nodes).

Bootstrap controls whether data in the cluster is automatically redistributed when a new node is inserted. The new node joining the cluster starts as an empty node, without system tables or data. It will:

  • Contact the seed nodes to learn about gossip state.
  • Transition to Up and Joining state (to indicate it is joining the cluster; represented by UJ in the nodetool status).
  • Contact the seed nodes to ensure schema agreement.
  • Calculate the tokens that it will become responsible for.
  • Stream replica data associated with the tokens it is responsible for from the former owners.
  • Transition to Up and Normal state once streaming is complete (to indicate it is now part of the cluster; represented by UN in the nodetool status).

You can read more about bootstrapping here: http://thelastpickle.com/blog/2017/05/23/auto-bootstrapping-part1.html

The gossip protocol makes sure every node in the system eventually knows important information about every other node's state, including nodes that are unreachable or not yet in the cluster when any given state change occurs. The gossip timer task runs every second, and during each run the node initiates a gossip exchange according to the following rules:

  1. Gossip to random live endpoint (if any)
  2. Gossip to random unreachable endpoint with certain probability depending on number of unreachable and live nodes
  3. If the node gossiped to at (1) was not a seed, or the number of live nodes is less than the number of seeds, gossip to a random seed with a certain probability depending on the number of unreachable, seed, and live nodes.

These rules ensure that if the network is up, all nodes will eventually know about all other nodes. (Clearly, if each node only contacts one seed and then gossips only to random nodes it knows about, you can have partitions when there are multiple seeds -- each seed will only know about a subset of the nodes in the cluster. Step 3 avoids this and more subtle problems.)

This way, a node initiates a gossip exchange with one to three nodes every round (or zero if it is alone in the cluster), as the sketch below illustrates.
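
Here is one such round as a toy Java sketch (an illustration only, not Cassandra's actual implementation; the probability formulas are simplified assumptions):

import java.util.ArrayList;
import java.util.Random;
import java.util.Set;

// Toy model of one gossip round following the three rules above.
public class GossipRound {

    private final Random random = new Random();

    void doRound(Set<String> live, Set<String> unreachable, Set<String> seeds) {
        // Rule 1: gossip to a random live endpoint (if any).
        String liveTarget = randomFrom(live);
        if (liveTarget != null) {
            sendGossipTo(liveTarget);
        }

        // Rule 2: gossip to a random unreachable endpoint, with a probability
        // that grows with the fraction of unreachable nodes.
        double pUnreachable = unreachable.size() / (live.size() + 1.0);
        if (!unreachable.isEmpty() && random.nextDouble() < pUnreachable) {
            sendGossipTo(randomFrom(unreachable));
        }

        // Rule 3: if rule 1 didn't hit a seed (or there are few live nodes),
        // gossip to a random seed with some probability.
        if (liveTarget == null || !seeds.contains(liveTarget) || live.size() < seeds.size()) {
            double pSeed = seeds.size() / (live.size() + unreachable.size() + 1.0);
            String seedTarget = randomFrom(seeds);
            if (seedTarget != null && random.nextDouble() < pSeed) {
                sendGossipTo(seedTarget);
            }
        }
    }

    private String randomFrom(Set<String> endpoints) {
        if (endpoints.isEmpty()) return null;
        return new ArrayList<>(endpoints).get(random.nextInt(endpoints.size()));
    }

    private void sendGossipTo(String endpoint) {
        System.out.println("gossip exchange with " + endpoint);
    }
}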

You can read more about gossip high-level architecture here: https://wiki.apache.org/cassandra/ArchitectureGossip

You can read more about Scylla gossip implementation here: https://github.com/scylladb/scylla/wiki/Gossip

Question:

I have a bunch of XML files here which I would like to store in a Cassandra database. Is there any possibility of managing that, or do I have to parse and transform the XML files?


Answer:

You can certainly store them as a blob or text, but you will not be able to query the individual fields within the XML files. One other thing you'd want to be cautious of is payload size and partition size. Cassandra in general isn't really designed as an object store, but depending on payload size and desired query functionality, you may either have to parse/chunk the files out or look for an alternative solution.
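
If you do go the text/blob route, one possible (purely hypothetical) schema is to split each document across a clustering column, so one large file doesn't land in a single huge cell:

CREATE TABLE xml_files (
    file_id  uuid,
    chunk_no int,
    data     blob,
    PRIMARY KEY (file_id, chunk_no)
);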

Question:

I have a 'user' table like the one below:

 user_id | user_name | user_phone
---------+-----------+-------------
      23 |     user, | 12345678910
       5 |     user^ | 12345678910
      10 |     user- | 12345678910
      16 |     user{ | 12345678910
      13 |     user= | 12345678910
      11 |     user_ | 12345678910
       1 |     user@ | 12345678910
      19 |     user" | 12345678910
       8 |     user( | 12345678910
       0 |     user! | 12345678910
       2 |     user# | 12345678910
       4 |     user% | 12345678910
      18 |     user[ | 12345678910
      15 |     user} | 12345678910
      22 |     user< | 12345678910
      27 |     user/ | 12345678910
      20 |     user: | 12345678910
       7 |     user* | 12345678910
       6 |     user& | 12345678910
       9 |     user) | 12345678910
      14 |     user| | 12345678910
      26 |     user? | 12345678910
      21 |     user; | 12345678910
      17 |     user] | 12345678910
      24 |     user> | 12345678910
      25 |     user. | 12345678910
      12 |     user+ | 12345678910
       3 |     user$ | 12345678910

I created an index on the 'user_name' field:

CREATE CUSTOM INDEX user_name_idx ON user ("user_name") USING 'org.apache.cassandra.index.sasi.SASIIndex' WITH OPTIONS = {'mode': 'CONTAINS', 'analyzer_class': 'org.apache.cassandra.index.sasi.analyzer.StandardAnalyzer', 'case_sensitive': 'false'};

But when I search like below, the first query works while the second does not:

select * from user where "user_name" LIKE '%u%' -> This works

select * from user where "user_name" LIKE '%,%' -> This is not working

None of the special characters work in Cassandra. What am I doing wrong here? How do I support special-character search in Cassandra? Is it anything to do with the indexing?


Answer:

You are using StandardAnalyzer, which removes special characters.

If you want to keep special characters, use NonTokenizingAnalyzer instead.

Example :

CREATE CUSTOM INDEX user_name_idx 
   ON user ("user_name") USING 'org.apache.cassandra.index.sasi.SASIIndex' 
   WITH OPTIONS = {'mode': 'CONTAINS', 'analyzer_class': 'org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer', 'case_sensitive': 'false'};

Sample Output :

cqlsh:test> SELECT * FROM user WHERE user_name LIKE '%,%';

 user_id | user_name | user_phone
---------+-----------+------------
      23 |     user, | 12345678910

Question:

I have two Cassandra databases on two different hosts. Both have tables with the exact same definition. How do I migrate data from one to the other?

SparkConf sparkConf = new SparkConf().setAppName(App.APP_NAME)
    .set("spark.cassandra.connection.host", App.CASSANDRA_HOST)
    .set("spark.cassandra.auth.username", "user")            
    .set("spark.cassandra.auth.password", "pass")
    .set("spark.cleaner.ttl", "3600");
SparkContext sparkContext = new SparkContext(sparkConf);

JavaRDD<EventLog> logs = javaFunctions(sparkContext)
    .cassandraTable("xyz", "event_log", mapRowTo(EventLog.class));
javaFunctions(logs).writerBuilder("xyz", "event_log", mapToRow(EventLog.class)).saveToCassandra();
sparkContext.stop();

In my Java code, I read from the original DB and want to use the writerBuilder to write into the other DB. Where do I put the second DB's config, and how do I write there?


Answer:

From the docs, there is a method

def withConnector(connector: CassandraConnector): WriterBuilder

which you can call to pass in a different CassandraConnector object carrying the destination cluster's information. This returns a new WriterBuilder, on which you would then call saveToCassandra.
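
For example, a sketch along these lines (the destination host and credentials are placeholders; CassandraConnector.apply builds a connector from a SparkConf):

import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;

// Clone the source conf and point it at the destination cluster:
SparkConf confB = sparkContext.getConf().clone()
        .set("spark.cassandra.connection.host", "10.0.0.2")
        .set("spark.cassandra.auth.username", "user")
        .set("spark.cassandra.auth.password", "pass");
CassandraConnector connectorB = CassandraConnector.apply(confB);

// Read with the default (source) connector, write through the destination connector:
javaFunctions(logs)
        .writerBuilder("xyz", "event_log", mapToRow(EventLog.class))
        .withConnector(connectorB)
        .saveToCassandra();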

Question:

If I have a Cassandra database on server A and a database on server B, which I would like to update with data from my Cassandra database once a day, what techniques should I use to accomplish this?

Any discussions I have overheard revolve around picking either HTTP streaming or pagination.

Why do people suggest HTTP streaming or pagination as approaches to do this? Are there any drawbacks or benefits to picking either?


Answer:

This is my professional opinion, but to recap the comments:

cassandra database on server A and a database on server B

Whatever you write must be able to actually communicate with server B, so you're limited in what tools and methods you can use.

which I would like to update with data from my cassandra database once a day

Long-running batch jobs are not ideal because they can fail at any time and need to be restarted. Therefore, I recommend a one-off operation run on a scheduler like cron, Oozie, Airflow, etc.

http streaming or pagination

Regarding pagination, that's a reasonable solution, but make sure your client supports it, and that you can set up multithreading if it's slow.

I personally have no experience streaming out of Cassandra, only into it, and that was using Spark Streaming out of Kafka. I am fairly sure the native protocol is binary, not HTTP. I can't find anything on REST streaming from Cassandra, and it doesn't sound like a good idea to sit your own REST endpoint on top of Cassandra unless you are really skilled at optimizing concurrent operations.

Back to the tools and methods: I've mentioned Spark, and I know Cassandra has a nice Spark connector, so that's my recommendation. If your database B supports JDBC, then you can load data straight from Spark in probably fewer than 10 lines.
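
For instance, a rough sketch using Spark SQL, assuming database B speaks JDBC (Postgres as an example), the spark-cassandra-connector and a JDBC driver are on the classpath, and the keyspace, table, and host names are placeholders:

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("daily-sync")
        .config("spark.cassandra.connection.host", "serverA")
        .getOrCreate();

// Read the source table through the Cassandra connector...
Dataset<Row> source = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "ks")
        .option("table", "my_table")
        .load();

// ...and append it to database B over JDBC, kicked off once a day by your scheduler.
Properties props = new Properties();
props.setProperty("user", "user");
props.setProperty("password", "pass");
source.write()
        .mode(SaveMode.Append)
        .jdbc("jdbc:postgresql://serverB:5432/mydb", "my_table", props);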