Hot questions about using Cassandra with the Spark connector

Question:

Here are my specs:

  • Cassandra version: 3.0.0
  • Operating System: Mac OSX Yosemite 10.10.5
  • Spark Version: 1.4.1

Context:

I have created a keyspace "movies" and a table "movieinfo" in Cassandra. I have installed and assembled a jar file following the guidance from this post. I have written a small script (below) to test my connection:

scala> sc.stop

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> val conf = new SparkConf()
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2ae92511

scala> conf.set("cassandra.connection.host", "127.0.0.1")
res1: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2ae92511

scala> val sc = new SparkContext("local[*]", "Cassandra Test", conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@59b5251d

scala> val table = sc.cassandraTable("movies", "movieinfo")
table: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> table.count

However, I receive the following trace log.

15/11/24 09:21:30 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {10.223.134.106}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC$$iwC.<init>(<console>:55)
    at $iwC$$iwC.<init>(<console>:57)
    at $iwC.<init>(<console>:59)
    at <init>(<console>:61)
    at .<init>(<console>:65)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.223.134.106:9042 (com.datastax.driver.core.TransportException: [/10.223.134.106:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:220)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
    ... 70 more

I believe that I may need to modify the settings and dependencies in the pom.xml file (citing this post). However, as I am a novice in both Spark and Java, I would appreciate any guidance or feedback on how best to proceed. Thank you for your support.


Answer:

This warning comes from the Java driver. It tells you that Netty's native epoll transport was found on your classpath, but this feature is only available on Linux, while you are running on Mac OS X.

If you are using Maven, check your dependencies to see if you are manually (or transitively) including this dependency:

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-transport-native-epoll</artifactId>
      <version>...</version>
    </dependency>

If so, simply remove it. Otherwise, it is safe to disregard this warning.
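If the dependency arrives transitively instead, one option is to exclude it from whichever artifact pulls it in. This is only a sketch; the outer coordinates below are placeholders, not something taken from your build:

    <dependency>
      <groupId>some.group</groupId>                          <!-- placeholder -->
      <artifactId>artifact-that-pulls-in-netty-epoll</artifactId>  <!-- placeholder -->
      <version>...</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-transport-native-epoll</artifactId>
        </exclusion>
      </exclusions>
    </dependency>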

Question:

I am trying to use Apache Spark to process my large (~230k entries) Cassandra dataset, but I constantly run into different kinds of errors. However, I can successfully run applications on a dataset of ~200 entries. I have a Spark setup of 3 nodes with 1 master and 2 workers, and the 2 workers also host a Cassandra cluster with the data indexed with a replication factor of 2. My 2 Spark workers show 2.4 and 2.8 GB memory on the web interface, and I set spark.executor.memory to 2409 when running an application, to get a combined memory of 4.7 GB. Here is my WebUI Homepage

The environment page of one of the tasks

At this stage, I am simply trying to process data stored in cassandra using spark. Here is the basic code I am using to do this in Java

SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);

CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

System.out.println(rdd.count());

For a successful run, on a small dataset (200 entries), the events interface looks something like this

But when I run the same thing on the large dataset (i.e. I change only the CASSANDRA_COLUMN_FAMILY), the job never terminates in the terminal; the log looks like this

and after ~2 minutes, the stderr for the executors looks like this

and after ~7 minutes, I get

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

in my terminal, and I have to manually kill the SparkSubmit process. However, the large dataset was indexed from a binary file that occupied only 22 MB, and doing nodetool status, I can see that only ~115 MB of data is stored in both of my Cassandra nodes. I have also tried to use Spark SQL on my dataset, but got similar results with that too. Where am I going wrong with my setup, and what should I do to successfully process my dataset, for both a Transformation-Action program and a program that uses Spark SQL?

I have already tried the following methods

  • Using -Xms1G -Xmx1G to increase memory, but the program fails with an exception saying that I should instead set spark.executor.memory, which I have.

  • Using spark.cassandra.input.split.size, which fails saying it isn't a valid option, and a similar option is spark.cassandra.input.split.size_in_mb, which I set to 1, with no effect.

EDIT

Based on this answer, I have also tried the following methods:

  • set spark.storage.memoryFraction to 0

  • not set spark.storage.memoryFraction to zero and use persist with MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK and MEMORY_AND_DISK_SER.

Versions:

  • Spark: 1.4.0

  • Cassandra: 2.1.6

  • spark-cassandra-connector: 1.4.0-M1


Answer:

I think there is an issue in the latest spark-cassandra-connector. The parameter spark.cassandra.input.split.size_in_mb is supposed to have a default value of 64 MB, but it is being interpreted as 64 bytes in the code. This causes far too many partitions to be created, which can't be scheduled by Spark. Try setting the conf value to

spark.cassandra.input.split.size_in_mb=67108864
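For example, applied to the Java setup shown in the question, the workaround could be set directly on the SparkConf. This is just a sketch reusing the question's own CASSANDRA_HOST and jars variables:

SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        // work around the split size being read as bytes instead of MB
        .set("spark.cassandra.input.split.size_in_mb", "67108864")
        .setJars(jars);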

Question:

I'm following a tutorial from this GitHub repository to run Spark on Cassandra using a Java Maven project: https://github.com/datastax/spark-cassandra-connector.

I've figured how to use direct CQL statements, as I have previously asked a question about that here: Querying Data in Cassandra via Spark in a Java Maven Project

However, now I'm trying to use the DataStax Java API out of concern that the code from my original question will not work with the DataStax version of Spark and Cassandra. For some weird reason, it won't let me use .where even though the documentation says I can use that exact statement. Here is my code:

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
       }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

Here is the error:

14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
    ... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I know that my error is specifically at this section:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

When I remove the .where(), it works. But the GitHub documentation specifically says that you should be able to chain the .where and .map functions. Does anyone have an explanation for this, or a solution? Thanks.

Edit: the error goes away when I use this statement instead:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("id=?", "1").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

I have no idea why this option works but not the rest of my variations. Here are the statements I ran in CQL so that you know what my keyspace looks like:

    session.execute("DROP KEYSPACE IF EXISTS tester");
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0001," +
                  "'Angel'," +
                  "'Pay'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
    session.execute(
          "INSERT INTO tester.empByRole (id, fname, lname, role) " +
          "VALUES (" +
              "0001," +
              "'Angel'," +
              "'Pay'," +
              "'IT Engineer'" +
              ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
        session.execute(
              "INSERT INTO tester.dept (id, dname) " +
              "VALUES (" +
                  "1553," +
                  "'Commerce'" +
                  ");");

Answer:

The where method adds ALLOW FILTERING to your query under the covers. This is not a magic bullet, as it still doesn't support arbitrary fields as query predicates. In general, the field must either be indexed or a clustering column. If this isn't practical for your data model, you can simply use the filter method on the RDD. The downside is that the filter takes place in Spark and not in Cassandra.

So the id field works because it's supported in a CQL WHERE clause, whereas I'm assuming role is just a regular field. Please note that I am NOT suggesting that you index your field or change it to a clustering column, as I don't know your data model.
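As a sketch of the filter alternative mentioned above, reusing the question's Person bean, imports, and anonymous-class style, the predicate can be evaluated in Spark instead of being pushed down to Cassandra. The whole table is still read from Cassandra; only the predicate evaluation moves to Spark:

JavaRDD<Person> engineers = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        .filter(new Function<Person, Boolean>() {
            @Override
            public Boolean call(Person person) throws Exception {
                // runs in Spark, so no CQL restriction applies here
                return "IT Engineer".equals(person.getrole());
            }
        });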

Question:

I am using the Spark Cassandra connector. It takes 5-6 minutes to fetch data from a Cassandra table. In the Spark log I see many tasks and executors. The reason might be that Spark divided the process into many tasks!

Below is my code example:

public static void main(String[] args) {

    SparkConf conf = new SparkConf(true).setMaster("local[4]")
            .setAppName("App_Name")
            .set("spark.cassandra.connection.host", "127.0.0.1");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Demo_Bean> empRDD = javaFunctions(sc).cassandraTable("dev",
            "demo");
    System.out.println("Row Count"+empRDD.count());
}

Answer:

After searching on Google, I found the issue in the latest spark-cassandra-connector. The parameter spark.cassandra.input.split.size_in_mb has a default value of 64 MB, which is being interpreted as 64 bytes in the code. So try with spark.cassandra.input.split.size_in_mb = 64 * 1024 * 1024 = 67108864.

Here is an example:

public static void main(String[] args) {

    SparkConf conf = new SparkConf(true).setMaster("local[4]")
            .setAppName("App_Name")
            .set("spark.cassandra.connection.host", "127.0.0.1")
            .set("spark.cassandra.input.split.size_in_mb","67108864");


    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Demo_Bean> empRDD = javaFunctions(sc).cassandraTable("dev",
            "demo");
    System.out.println("Row Count"+empRDD.count());
}

Question:

I am facing some issues with Spark Cassandra connector filtering in Java. Cassandra allows filtering by the last column of the partition key with an IN clause, e.g.

create table cf_text
(a varchar,b varchar,c varchar, primary key((a,b),c))

Query : select * from cf_text where a ='asdf' and b in ('af','sd');

sc.cassandraTable("test", "cf_text").where("a = ?", "af").toArray.foreach(println)

How can I specify the IN clause used in the CQL query in Spark? How can range queries be specified as well?


Answer:

Just wondering, but does your Spark code above work? I thought that Spark won't allow a WHERE on partition keys (a and b in your case), since it uses them under the hood (see last answer to this question): Spark Datastax Java API Select statements

In any case, with the Cassandra Spark connector, you are allowed to stack your WHERE clauses, and an IN can be specified with a List<String>.

List<String> valuesList = new ArrayList<String>();
valuesList.add("value2");
valuesList.add("value3");

sc.cassandraTable("test", "cf")
    .where("column1 = ?", "value1")
    .where("column2 IN ?", valuesList)
    .keyBy(new Function<MyCFClass, String>() {
                public String call(MyCFClass _myCF) throws Exception {
                    return _myCF.getId();
                }
            });

Note that the normal rules of using IN with Cassandra/CQL still apply here.

Range queries function in a similar manner:

sc.cassandraTable("test", "person")
    .where("age > ?", "15")
    .where("age < ?", "20")
    .keyBy(new Function<Person, String>() {
                public String call(Person _person) throws Exception {
                    return _person.getPersonid();
                }
            });

Question:

So I previously had some questions on how to query Cassandra using Spark in a Java Maven project here: Querying Data in Cassandra via Spark in a Java Maven Project

Well, my question was answered and it worked; however, I've run into an issue (possibly an issue). I'm now trying to use the DataStax Java API. Here is my code:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
               }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

Here is my error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Now I KNOW exactly where my error is. It is System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray())); because I need to convert rdd to an array. However, the API documentation SAID I should be able to do this... this is code copied and pasted from the documentation. Why can I not serialize the RDD to an array?

I've already inserted dummy data into my cassandra using the insertions in my post that I included in the link above.

Also, I solved a previous error by changing all of my getters and setters to lowercase. When I used capitals in them, it produced an error. Why can't I use capitals in my getters and setters here?

Thanks, Angel


Answer:

Changing public class App to public class App implements Serializable should fix the error. Because a Java inner class keeps a reference to the outer class, your Function object will have a reference to App. As Spark needs to serialize your Function object, it requires that App also be serializable.
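A minimal sketch of the change, plus an alternative that avoids capturing App at all (the PersonToString name below is hypothetical, not from the question's code):

// Fix suggested above: make the enclosing class serializable so the implicit
// reference captured by the anonymous Function can be serialized with it.
public class App implements Serializable
{
    // ... rest of the class unchanged ...
}

// Alternative: a static nested class holds no reference to App, so App itself
// would not need to be Serializable for this map() call.
public static class PersonToString implements Function<Person, String> {
    @Override
    public String call(Person person) throws Exception {
        return person.toString();
    }
}
// used as: .map(new PersonToString())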

Question:

I am using a username and password to connect to Cassandra, not the superuser. Using the same user with Spark gives an authorization exception; Spark tries to connect to system.size_estimates. Which other system tables does the Spark Cassandra connector use? I need this information to get access from the DBA. I would also like to know whether READ permission is enough or WRITE is also required.


Answer:

The Cassandra tables that spark-cassandra-connector 1.6 uses are:

  • system.size_estimates
  • system.peers
  • system.local

Read access should be enough.

Question:

I need to write the data of my filtered stream to Cassandra using Java and the DataStax Spark Cassandra Connector.

I followed the DataStax Java documentation.

The documentation explains how to write an RDD to Cassandra, but not how to write a DStream.

I need to be able to save a PairDStream, and I don't know how to do it because all the examples are written in Scala.

I need to turn the following Scala code into Java:

val wc = stream.flatMap(_.split("\\s+"))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .saveToCassandra("streaming_test", "words", SomeColumns("word", "count")) 

Basically I have to save a JavaPairDStream<String, Integer>


Answer:

Solution, in case anyone is looking for the answer.

To write a DStream or JavaDStream to Cassandra, you need to import:

import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;

and use javaFunctions(DStream<T> arg0) or javaFunctions(JavaDStream<T> arg0).
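In case a fuller example helps, here is a sketch (not verified against a cluster) of saving the question's JavaPairDStream<String, Integer> to a streaming_test.words(word, count) table with those helpers. mapTupleToRow maps the tuple elements to the selected columns in order:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapTupleToRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.someColumns;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class StreamSaveSketch {
    static void saveWordCounts(JavaPairDStream<String, Integer> wordCounts) {
        // convert the pair DStream to a DStream of tuples, then write each batch
        JavaDStream<Tuple2<String, Integer>> tuples = wordCounts.toJavaDStream();
        javaFunctions(tuples)
                .writerBuilder("streaming_test", "words",
                        mapTupleToRow(String.class, Integer.class))
                .withColumnSelector(someColumns("word", "count"))
                .saveToCassandra();
    }
}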

Question:

I have used Spark SQL to retrieve data from a Cassandra database:

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer " +
                "WHERE CAST(store_id as string) = '" + storeId + "'");

After that I did some filtration and I want to save this data into another Cassandra table that looks like this:

store_id uuid,
report_name text,
report_time timestamp,
sharder int,
customer_email text,
count int static,
firts_name text,
last_name text,
PRIMARY KEY ((store_id, report_name, report_time, sharder), customer_email)

How can I add these additional properties when I save the DataFrame into the new table? Also, what is the best practice for sharding the Cassandra long row in this example? I expect to have 4k-6k records in the DataFrame, so sharding the long row is a must, but I am not sure whether counting the records and then changing the sharder after a certain number of items is the best practice in Spark or Cassandra.


Answer:

After you have the DataFrame, you can define a case class that has the structure of the new schema with the added properties.

You can create the case class like this: case class DataFrameRecord(property1: String, property2: Long, property3: String, property4: Double)

Then you can use map to convert into the new structure using the case class: df.rdd.map(p => DataFrameRecord(prop1, prop2, prop3, prop4)).toDF()

Question:

I'm new to Apache Spark and I want to connect Spark to a Cassandra database.

  • Spark version: 2.2.0
  • Cassandra version: 2.1.14

The error happens at the line below:

(long count = javaFunctions(sc).cassandraTable("test", "table1").count();)

My Java code is below:

public SparkTestPanel(String id, User user) {
    super(id);
    form = new Form("form");
    form.setOutputMarkupId(true);
    this.add(form);
    SparkConf conf = new SparkConf(true);
    conf.setAppName("Spark Test");
    conf.setMaster("local[*]");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");
    conf.set("spark.cassandra.auth.username", "username");
    conf.set("spark.cassandra.auth.password", "password");
    JavaSparkContext sc=null;
    try {
        sc = new JavaSparkContext(conf);
        long count = javaFunctions(sc).cassandraTable("test", "table1").count();

    } finally {
        sc.stop();
    }
}

And my pom.xml is this:

 <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector-java_2.10 -->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.3</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.2.0</version>
        <type>jar</type>
    </dependency>

And I get this error:

[Stage 0:=======================>                                  (4 + 4) / 10]
[Stage 0:>                                                         (0 + 4) / 10]
[Stage 0:=====>                                                    (1 + 4) / 10]
[Stage 0:=================>                                        (3 + 4) / 10]
2017-08-16 12:22:48,857 ERROR Executor:91 - Exception in task 6.0 in stage 0.0 (TID 6)
java.lang.AbstractMethodError: com.datastax.spark.connector.japi.GenericJavaRowReaderFactory$JavaRowReader.read(Lcom/datastax/driver/core/Row;Lcom/datastax/spark/connector/CassandraRowMetadata;)Ljava/lang/Object;
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$15.apply(CassandraTableScanRDD.scala:345)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$15.apply(CassandraTableScanRDD.scala:345)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:397)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1801)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
2017-08-16 12:22:48,861 ERROR TaskSetManager:70 - Task 6 in stage 0.0 failed 1 times; aborting job

Can anyone help?


Answer:

Remove the spark-cassandra-connector-java_2.11 and cassandra-driver-core dependencies. The old connector-java artifact (1.5.2) brings classes that no longer match the 2.0.3 connector's interfaces, which is what produces the AbstractMethodError, and the connector already pulls in a compatible driver. Your pom.xml should look like the one below.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.0.3</version>
</dependency>

Question:

I have a problem retrieving data from a Cassandra table using the Spark Cassandra connector. I created a keyspace called "ks" and a table "student" in Cassandra. The table looks like this:

 id | name
----+-----------
 10 | Catherine

I started Spark locally by running start-all.sh.

Then I created the class "SparkCassandraConnector", which connects Spark and Cassandra. What I am trying to do is retrieve the data from the student table and print it on the screen.

The error I get is:

java.lang.ClassNotFoundException: SparkCassandraConnector$Student
    java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    java.security.AccessController.doPrivileged(Native Method)
    java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    java.lang.Class.forName0(Native Method)
    java.lang.Class.forName(Class.java:340)

This is my program:

import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;

public class SparkCassandraConnector  implements Serializable {
public static void main(String[] args) {

    SparkConf conf = new SparkConf().setAppName("Simple Application");

    conf.setMaster("spark://127.0.0.1:7077");
    conf.set("spark.cassandra.connection.host", "127.0.0.1");
    String[] jars = new String[10];
    jars[0] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar";
    jars[1] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
    jars[3] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar";
    jars[4] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
    jars[5] = "~/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar";
    jars[6] = "~/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar";
    conf = conf.setJars(jars);
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class)
            .map(new org.apache.spark.api.java.function.Function<Student, String>() {
                @Override
                public String call(Student person) throws Exception {
                    return person.toString();
                }
            });
    System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.collect(), "\n"));
}
public static class Student implements  Serializable{

    private Integer id;
    private String name;

    public Student(){

    }
    public Student(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

}

This is my POM file:

<dependencies>


    <!--Spark-->

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

Answer:

In the provided jars, the jar containing the job, and hence Student.class, is missing. A quick fix is to add the jar that's in your project's ./target folder.

An alternative is to package your job and all its dependencies in an 'uber jar' and use that uber jar as the only declared jar. Look into the Maven Shade plugin.
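For reference, a minimal maven-shade-plugin declaration is sketched below (the version number is only an example); running mvn package then produces the uber jar in ./target:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <!-- build the shaded (uber) jar during the package phase -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>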

Jars can also be provided from the command line using the spark-submit --jars option.

Question:

I'm trying to write some simple code where I create a schema, insert data into some tables, and then pull some information and print it out. However, I'm getting an error. I'm using the DataStax Cassandra Spark connector. I have been using these two examples to help me try to accomplish this:

https://gist.github.com/jacek-lewandowski/278bfc936ca990bee35a

http://www.datastax.com/documentation/developer/java-driver/1.0/java-driver/quick_start/qsSimpleClientAddSession_t.html

However, the second example doesn't use the Cassandra Spark connector, or Spark in general.

Here is my code:

package com.angel.testspark.test;


import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS tester");
            session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
            session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
            session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0001," +
                          "'Angel'," +
                          "'Pay'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0002," +
                          "'John'," +
                          "'Doe'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0003," +
                          "'Jane'," +
                          "'Doe'," +
                          "'IT Analyst'" +
                          ");");
                session.execute(
                      "INSERT INTO tester.dept (id, dname) " +
                      "VALUES (" +
                          "1553," +
                          "'Commerce'" +
                          ");");

                ResultSet results = session.execute("SELECT * FROM tester.emp " +
                        "WHERE role = 'IT Engineer';");
            for (Row row : results) {
                System.out.print(row.getString("fname"));
                System.out.print(" ");
                System.out.print(row.getString("lname"));
                System.out.println(); 
            }
                System.out.println();
            }

        }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

Here is my error:

14/09/18 11:22:18 WARN util.Utils: Your hostname, APAY-M-R03K resolves to a loopback address: 127.0.0.1; using 10.150.79.164 instead (on interface en0)
14/09/18 11:22:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/09/18 11:22:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/18 11:22:18 INFO Remoting: Starting remoting
14/09/18 11:22:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/18 11:22:18 INFO storage.DiskBlockManager: Created local directory at /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-local-20140918112218-2c8d
14/09/18 11:22:18 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
14/09/18 11:22:18 INFO network.ConnectionManager: Bound socket to port 50507 with id = ConnectionManagerId(10.150.79.164,50507)
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/18 11:22:18 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 10.150.79.164:50507 with 2.1 GB RAM
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/18 11:22:18 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:18 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50508
14/09/18 11:22:18 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.150.79.164:50508
14/09/18 11:22:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/18 11:22:19 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-a0dc4491-1901-4a7a-86f4-4adc181fe45c
14/09/18 11:22:19 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50509
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/09/18 11:22:19 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/18 11:22:19 INFO ui.SparkUI: Started Spark Web UI at http://10.150.79.164:4040
14/09/18 11:22:19 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/09/18 11:22:19 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/09/18 11:22:19 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy6.execute(Unknown Source)
    at com.angel.testspark.test.App.createSchema(App.java:85)
    at com.angel.testspark.test.App.run(App.java:38)
    at com.angel.testspark.test.App.main(App.java:109)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:584)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/18 11:22:20 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I believe it may just be a syntax error; I'm just unsure of where and what it is.

Any help would be great, thanks. I've scoured the internet and haven't found a simple example of just inserting and pulling data in Java with Cassandra and Spark.

******Edit: @BryceAtNetwork23 and @mikea were correct about my syntax error, so I've edited the question and fixed it. I am getting a new error though, so I've pasted in the new error and updated the code.


Answer:

Try running your CQL via cqlsh and you should get the same/similar error:

aploetz@cqlsh:stackoverflow> CREATE TABLE dept (id INT PRIMARY KEY, dname TEXT);
aploetz@cqlsh:stackoverflow> INSERT INTO dept (id, dname) VALUES (1553,Commerce);
<ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:50 no viable alternative at
input ')' (... dname) VALUES (1553,Commerce[)]...)">

Put single quotes around "Commerce" and it should work:

session.execute(
                  "INSERT INTO tester.dept (id, dname) " +
                  "VALUES (" +
                      "1553," +
                      "'Commerce'" +
                      ");");

I'm getting a new error though now...

Also try running that from cqlsh.

aploetz@cqlsh:stackoverflow> SELECT * FROM emp WHERE role = 'IT Engineer';
code=2200 [Invalid query] message="No indexed columns present in by-columns clause with Equal operator"

This is happening because role is not part of your primary key. Cassandra doesn't allow you to query by arbitrary column values. The best way to solve this is to create an additional query table called empByRole, with role as the partition key. Like this:

CREATE TABLE empByRole 
    (id INT, fname TEXT, lname TEXT, role TEXT,
    PRIMARY KEY (role,id)
);

aploetz@cqlsh:stackoverflow> INSERT INTO empByRole (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');
aploetz@cqlsh:stackoverflow> SELECT * FROM empByRole WHERE role = 'IT Engineer';

 role        | id | fname | lname
-------------+----+-------+-------
 IT Engineer |  1 | Angel |   Pay

(1 rows)
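
As a small follow-up sketch, the SELECT in the original Java app would then target the new table instead of tester.emp (assuming empByRole is created in the tester keyspace, mirroring the CQL above):

ResultSet results = session.execute(
        "SELECT * FROM tester.empByRole WHERE role = 'IT Engineer';");
for (Row row : results) {
    // Same columns as the original emp table
    System.out.println(row.getString("fname") + " " + row.getString("lname"));
}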

Question:

I looked at the GitHub repo of spark-cassandra-connector and did not find a ReaderBuilder implemented there, only a WriterBuilder. Can anyone help me with that, as I want to read data from a Cassandra DB using a CassandraConnector reference?

I want to connect to two Cassandra clusters in the same SparkContext and read data from both of them, so I need a ReaderBuilder for reading data from my second Cassandra cluster. I am working with Java here.

Github repo Link: https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java

CassandraConnector eventsConnector = CassandraConnector.apply(sc.getConf().set("spark.cassandra.connection.host", "192.168.36.234"));

Answer:

My first suggestion would be not to use RDDs in Java. RDDs in Java are much more difficult to work with than in Scala, and they are also the older API. I would suggest using DataFrames instead. These provide a much cleaner interface between different data sources as well as automatic optimizations and other benefits.
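
For illustration, here is a rough sketch of reading that second cluster through the DataFrame API. It assumes Spark 2.x, the connector's org.apache.spark.sql.cassandra source, and the cluster-level settings convention from the connector's DataFrame documentation; the cluster name, keyspace, table, and host below are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Assumed convention from the connector docs: settings for a named cluster
// are registered as "<clusterName>/<property>".
spark.conf().set("Cluster2/spark.cassandra.connection.host", "192.168.36.234");

// Read a table from that cluster via the DataFrame API.
Dataset<Row> df = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("cluster", "Cluster2")
        .option("keyspace", "ks")
        .option("table", "test_table")
        .load();

long rowCount = df.count();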

Now if you cannot use DataFrames, you would instead just make the CassandraJavaRDD and then use "withConnector" or "withReadConf" to change the read configuration.

https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L123-L129

Something like

CassandraConnector cluster2 = CassandraConnector.apply(
    sc.getConf()
      .set("spark.cassandra.connection.host", "192.168.36.234"));

javaFunctions(sc).cassandraTable(ks, "test_table")
    .withConnector(cluster2)
    .collect();

There is no need for a builder here because the RDD itself has a fluent API. Writing, on the other hand, happens immediately when the call completes, which is why it needed a builder.
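
For instance, a read can be assembled entirely through chained calls on the RDD (a sketch; the column names are placeholders, and cluster2 is the connector built above):

// Sketch only: the read configuration is built up fluently on the RDD itself.
List<CassandraRow> rows = javaFunctions(sc)
        .cassandraTable(ks, "test_table")
        .select("id", "value")        // hypothetical column names
        .withConnector(cluster2)
        .collect();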

Question:

I'm new to Spark. When I use Spark's filter in the Java API, I get this error (if I collect() the whole table it works correctly and I can see all of the data fetched from Cassandra). I checked that the master and worker versions are the same, and when the application starts I can see it in the Spark web UI, but:

[Stage 0:>                                                          (0 + 0) / 6]
[Stage 0:>                                                          (0 + 2) / 6]
[Stage 0:>                                                          (0 + 4) / 6]

2017-08-28 16:37:16,239 ERROR TaskSetManager:70 - Task 1 in stage 0.0 failed 4 times; aborting job 2017-08-28 16:37:21,351 ERROR DefaultExceptionMapper:170 - Unexpected error occurred org.apache.wicket.WicketRuntimeException: Method onRequest of interface org.apache.wicket.behavior.IBehaviorListener targeted at org.apache.wicket.extensions.ajax.markup.html.AjaxLazyLoadPanel$1@e7e7465 on component [AjaxLazyLoadPanel [Component id = panel]] threw an exception at org.apache.wicket.RequestListenerInterface.internalInvoke(RequestListenerInterface.java:268) at org.apache.wicket.RequestListenerInterface.invoke(RequestListenerInterface.java:241) at org.apache.wicket.core.request.handler.ListenerInterfaceRequestHandler.invokeListener(ListenerInterfaceRequestHandler.java:248) at org.apache.wicket.core.request.handler.ListenerInterfaceRequestHandler.respond(ListenerInterfaceRequestHandler.java:234) at org.apache.wicket.request.cycle.RequestCycle$HandlerExecutor.respond(RequestCycle.java:895) at org.apache.wicket.request.RequestHandlerStack.execute(RequestHandlerStack.java:64) at org.apache.wicket.request.cycle.RequestCycle.execute(RequestCycle.java:265) at org.apache.wicket.request.cycle.RequestCycle.processRequest(RequestCycle.java:222) at org.apache.wicket.request.cycle.RequestCycle.processRequestAndDetach(RequestCycle.java:293) at org.apache.wicket.protocol.http.WicketFilter.processRequestCycle(WicketFilter.java:261) at org.apache.wicket.protocol.http.WicketFilter.processRequest(WicketFilter.java:203) at org.apache.wicket.protocol.http.WicketFilter.doFilter(WicketFilter.java:284) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:217) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:142) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:518) at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1091) at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:673) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.wicket.RequestListenerInterface.internalInvoke(RequestListenerInterface.java:258) ... 29 more

Caused by: java.lang.RuntimeException: Panel me.SparkTestPanel could not be constructed. at ...

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 10, 21.1.0.41, executor 1): java.lang.ClassNotFoundException: me.SparkTestPanel$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) at org.apache.spark.rdd.RDD.count(RDD.scala:1158) at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455) at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45) at me.SparkTestPanel.(SparkTestPanel.java:77) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ... 39 more

Caused by: java.lang.ClassNotFoundException: me.SparkTestPanel$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

And my code is :

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapColumnTo;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;

import java.util.List;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.wicket.markup.html.form.Form;
import org.apache.wicket.markup.html.panel.Panel;

/**
 *
 * @author mohamadreza
 */
public class SparkTestPanel extends Panel {

    private Form form;

    public SparkTestPanel(String id) {
        super(id);
        form = new Form("form");
        form.setOutputMarkupId(true);
        this.add(form);             
        SparkConf conf = new SparkConf(true);
        conf.setAppName("Spark Test");
        conf.setMaster("spark://192.16.11.18:7049");
        conf.set("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer");
        conf.set("spark.serializer","org.apache.spark.serializer.JavaSerializer");

        conf.set("spark.cassandra.connection.host", "192.16.11.18");
        conf.set("spark.cassandra.connection.port", "7005");
        conf.set("spark.cassandra.auth.username", "user");
        conf.set("spark.cassandra.auth.password", "password");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(conf);
            JavaRDD<CassandraRow> cache = javaFunctions(sc).cassandraTable("keyspace", "test").cache();
            Long count = cache.filter(new Function<CassandraRow, Boolean>() {
                @Override
                public Boolean call(CassandraRow t1) throws Exception {
                    return t1.getString("value").contains("test");
                }
            }).count();
            String a = count.toString();
        } finally {
            sc.stop();
        }
    }
}

Spark version 2.1.1, Scala version 2.11, Java 8, and my pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.5</version>
    </dependency>

I use Docker for the Cassandra and Spark nodes (Cassandra version 3.0). Can anyone help me?


Answer:

Problem solved :)

When you want to use the Java API of Apache Spark, you must copy your project's .jar (located in the target directory in the root of your project) to $SPARK_PATH/jars/ on each Spark node (master and workers). If your .jar is very large, you can split the UI and the Spark code, copy only the .jar of the Spark-code project, and use that Spark code from your UI project.
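
As an aside, a rough sketch of another way to ship the application jar to the executors is to register it on the SparkConf; Spark then distributes it itself. The jar path below is a placeholder for wherever your build puts the artifact.

// Sketch only: register the application jar so executors can deserialize
// classes such as the anonymous filter Function (SparkTestPanel$1).
SparkConf conf = new SparkConf(true)
        .setAppName("Spark Test")
        .setMaster("spark://192.16.11.18:7049")
        .setJars(new String[]{"/path/to/your-application.jar"});  // hypothetical path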

Question:

I get some entries from the stream in a Linux terminal, assign them as lines, and break them into words. But instead of printing them out I want to save them to Cassandra. I have a keyspace named ks with a table inside it named record. I know that some code like CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); has to do the job, but I guess I am doing something wrong. Can someone help?

Here is my Cassandra ks.record schema (I added this data through cqlsh):

id | birth_date                       | name
----+---------------------------------+-----------
10 | 1987-12-01 23:00:00.000000+0000  | Catherine
11 | 2004-09-07 22:00:00.000000+0000  |   Isadora
1  | 2016-05-10 13:00:04.452000+0000  |      John
2  | 2016-05-10 13:00:04.452000+0000  |      Troy
12 | 1970-10-01 23:00:00.000000+0000  |      Anna
3  | 2016-05-10 13:00:04.452000+0000  |    Andrew

Here is my Java code :

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;


public class CassandraStreaming2 {
    public static void main(String[] args) {

        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming");
        JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" "))
        );

        words.print();
        //CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();

        sc.start();              // Start the computation
        sc.awaitTermination();   // Wait for the computation to terminate

    }
}

Answer:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md#saving-data-to-cassandra

As per the docs, you need to also pass a RowWriter factory. The most common way to do this is with the mapToRow(Class) API; that is the missing parameter described there.

But you have an additional problem: your code doesn't yet specify the data in a way that can be written to C*. You have a JavaDStream of only Strings, and a single String cannot be made into a Cassandra row for your given schema.

Basically you are telling the connector

Write "hello" to CassandraTable (id, birthday, value)

Without telling it where the hello goes (what should the id be? what should the birthday be?)
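
As an illustration only, one way to make the stream writable is to map each word into a small bean whose properties line up with the ks.record columns, and pass mapToRow for that class. The WordRecord class and the id/birth_date values below are hypothetical.

// Hypothetical bean matching the ks.record schema (id, birth_date, name).
public class WordRecord implements java.io.Serializable {
    private Integer id;
    private java.util.Date birthDate;   // maps to the birth_date column
    private String name;

    public WordRecord(Integer id, java.util.Date birthDate, String name) {
        this.id = id;
        this.birthDate = birthDate;
        this.name = name;
    }
    public Integer getId() { return id; }
    public java.util.Date getBirthDate() { return birthDate; }
    public String getName() { return name; }
}

// In the streaming job: turn each word into a row-shaped object, then save.
// The id and birth_date values here are placeholders purely for illustration.
JavaDStream<WordRecord> records = words.map(
        word -> new WordRecord(word.hashCode(), new java.util.Date(), word));

CassandraStreamingJavaUtil.javaFunctions(records)
        .writerBuilder("ks", "record", mapToRow(WordRecord.class))
        .saveToCassandra();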

Question:

I have a problem running the spark-cassandra-connector from spark-shell.

In general, I follow this tutorial on 'Installing the Cassandra/Spark OSS Stack' by Amy Tobey for the "using spark-cassandra-connector" part. I see that:

  • I manage to connect to Cassandra cluster

    INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
    
  • but I am not able to run the count method on the table object of the CassandraTableScanRDD class

I have no idea how to interpret the console error output (googling it has not brought up anything useful), and I kindly ask for any hint about what I am doing wrong.

CONSOLE OUTPUT:

1. running Spark with spark-cassandra-connector jar

$ /usr/local/src/spark/spark-1.1.0/bin/spark-shell --jars /usr/local/src/spark/spark-1.1.0/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar
    Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/30 01:17:40 INFO SecurityManager: Changing view acls to: martakarass,
15/03/30 01:17:40 INFO SecurityManager: Changing modify acls to: martakarass,
15/03/30 01:17:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(martakarass, ); users with modify permissions: Set(martakarass, )
15/03/30 01:17:40 INFO HttpServer: Starting HTTP Server
15/03/30 01:17:40 INFO Utils: Successfully started service 'HTTP class server' on port 38860.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.
15/03/30 01:17:42 INFO SecurityManager: Changing view acls to: martakarass,
15/03/30 01:17:42 INFO SecurityManager: Changing modify acls to: martakarass,
15/03/30 01:17:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(martakarass, ); users with modify permissions: Set(martakarass, )
15/03/30 01:17:43 INFO Slf4jLogger: Slf4jLogger started
15/03/30 01:17:43 INFO Remoting: Starting remoting
15/03/30 01:17:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@marta-komputer.home:48238]
15/03/30 01:17:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@marta-komputer.home:48238]
15/03/30 01:17:43 INFO Utils: Successfully started service 'sparkDriver' on port 48238.
15/03/30 01:17:43 INFO SparkEnv: Registering MapOutputTracker
15/03/30 01:17:43 INFO SparkEnv: Registering BlockManagerMaster
15/03/30 01:17:43 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150330011743-7904
15/03/30 01:17:43 INFO Utils: Successfully started service 'Connection manager for block manager' on port 55197.
15/03/30 01:17:43 INFO ConnectionManager: Bound socket to port 55197 with id = ConnectionManagerId(marta-komputer.home,55197)
15/03/30 01:17:43 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/30 01:17:43 INFO BlockManagerMaster: Trying to register BlockManager
15/03/30 01:17:43 INFO BlockManagerMasterActor: Registering block manager marta-komputer.home:55197 with 265.1 MB RAM
15/03/30 01:17:43 INFO BlockManagerMaster: Registered BlockManager
15/03/30 01:17:43 INFO HttpFileServer: HTTP File server directory is /tmp/spark-f69a93d0-da4f-4c85-9b46-8ad33169763a
15/03/30 01:17:43 INFO HttpServer: Starting HTTP Server
15/03/30 01:17:43 INFO Utils: Successfully started service 'HTTP file server' on port 38225.
15/03/30 01:17:43 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/30 01:17:43 INFO SparkUI: Started SparkUI at http://marta-komputer.home:4040
15/03/30 01:17:43 INFO SparkContext: Added JAR file:/usr/local/src/spark/spark-1.1.0/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar at http://192.168.1.10:38225/jars/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar with timestamp 1427671063959
15/03/30 01:17:44 INFO Executor: Using REPL class URI: http://192.168.1.10:38860
15/03/30 01:17:44 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@marta-komputer.home:48238/user/HeartbeatReceiver
15/03/30 01:17:44 INFO SparkILoop: Created spark context..
Spark context available as sc.

2. Performing imports

scala> 

scala> sc.stop
15/03/30 01:17:51 INFO SparkUI: Stopped Spark web UI at http://marta-komputer.home:4040
15/03/30 01:17:51 INFO DAGScheduler: Stopping DAGScheduler
15/03/30 01:17:52 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/30 01:17:52 INFO ConnectionManager: Selector thread was interrupted!
15/03/30 01:17:52 INFO ConnectionManager: ConnectionManager stopped
15/03/30 01:17:52 INFO MemoryStore: MemoryStore cleared
15/03/30 01:17:52 INFO BlockManager: BlockManager stopped
15/03/30 01:17:52 INFO BlockManagerMaster: BlockManagerMaster stopped
15/03/30 01:17:52 INFO SparkContext: Successfully stopped SparkContext

scala> im15/03/30 01:17:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
port com.15/03/30 01:17:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
datastax.spark.connector._
15/03/30 01:17:52 INFO Remoting: Remoting shut down
15/03/30 01:17:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
import com.datastax.spark.connector._

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

3. Defining spark.cassandra.connection.host, defining SparkContext

scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@e6e5da4

scala> val sc = new SparkContext("local[*]", "test", conf)
15/03/30 01:17:54 INFO SecurityManager: Changing view acls to: martakarass,
15/03/30 01:17:54 INFO SecurityManager: Changing modify acls to: martakarass,
15/03/30 01:17:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(martakarass, ); users with modify permissions: Set(martakarass, )
15/03/30 01:17:54 INFO Slf4jLogger: Slf4jLogger started
15/03/30 01:17:54 INFO Remoting: Starting remoting
15/03/30 01:17:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:35080]
15/03/30 01:17:54 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@localhost:35080]
15/03/30 01:17:54 INFO Utils: Successfully started service 'sparkDriver' on port 35080.
15/03/30 01:17:54 INFO SparkEnv: Registering MapOutputTracker
15/03/30 01:17:54 INFO SparkEnv: Registering BlockManagerMaster
15/03/30 01:17:54 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150330011754-63ea
15/03/30 01:17:54 INFO Utils: Successfully started service 'Connection manager for block manager' on port 32973.
15/03/30 01:17:54 INFO ConnectionManager: Bound socket to port 32973 with id = ConnectionManagerId(localhost,32973)
15/03/30 01:17:54 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/30 01:17:54 INFO BlockManagerMaster: Trying to register BlockManager
15/03/30 01:17:54 INFO BlockManagerMasterActor: Registering block manager localhost:32973 with 265.1 MB RAM
15/03/30 01:17:54 INFO BlockManagerMaster: Registered BlockManager
15/03/30 01:17:54 INFO HttpFileServer: HTTP File server directory is /tmp/spark-630cc34e-cc29-4815-b51f-8345250cb030
15/03/30 01:17:54 INFO HttpServer: Starting HTTP Server
15/03/30 01:17:54 INFO Utils: Successfully started service 'HTTP file server' on port 43669.
15/03/30 01:17:54 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/30 01:17:54 INFO SparkUI: Started SparkUI at http://localhost:4040
15/03/30 01:17:54 INFO SparkContext: Added JAR file:/usr/local/src/spark/spark-1.1.0/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar at http://192.168.1.10:43669/jars/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar with timestamp 1427671074181
15/03/30 01:17:54 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@localhost:35080/user/HeartbeatReceiver
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@118a4d5

4. Using cassandraTable function to build object of CassandraTableScanRDD class

scala> val table = sc.cassandraTable("twissandra", "invoices")
table: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

5. Calling count function on the object of CassandraTableScanRDD class

scala> table.count
15/03/30 01:39:43 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
15/03/30 01:39:43 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
15/03/30 01:39:43 INFO SparkContext: Starting job: reduce at CassandraTableScanRDD.scala:243
15/03/30 01:39:43 INFO DAGScheduler: Got job 0 (reduce at CassandraTableScanRDD.scala:243) with 1 output partitions (allowLocal=false)
15/03/30 01:39:43 INFO DAGScheduler: Final stage: Stage 0(reduce at CassandraTableScanRDD.scala:243)
15/03/30 01:39:43 INFO DAGScheduler: Parents of final stage: List()
15/03/30 01:39:43 INFO DAGScheduler: Missing parents: List()
15/03/30 01:39:43 INFO DAGScheduler: Submitting Stage 0 (CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15), which has no missing parents
15/03/30 01:39:43 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
15/03/30 01:39:43 INFO MemoryStore: ensureFreeSpace(5320) called with curMem=0, maxMem=278019440
15/03/30 01:39:43 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.2 KB, free 265.1 MB)
15/03/30 01:39:43 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (CassandraTableScanRDD[1] at RDD at CassandraRDD.scala:15)
15/03/30 01:39:43 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/30 01:39:43 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, NODE_LOCAL, 26342 bytes)
15/03/30 01:39:43 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/03/30 01:39:43 INFO Executor: Fetching http://192.168.1.10:41700/jars/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar with timestamp 1427672382104
15/03/30 01:39:43 INFO Utils: Fetching http://192.168.1.10:41700/jars/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar to /tmp/fetchFileTemp97270090697167118.tmp
15/03/30 01:39:44 INFO Executor: Adding file:/tmp/spark-0a658f91-717f-4c30-8fe2-979c8c1399a7/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar to class loader
15/03/30 01:39:44 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
15/03/30 01:39:44 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
15/03/30 01:39:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoSuchMethodError: org.apache.spark.SparkEnv.isStopped()Z
    at org.apache.spark.metrics.CassandraConnectorSource$.instance(CassandraConnectorSource.scala:53)
    at com.datastax.spark.connector.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:53)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:194)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 01:39:44 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoSuchMethodError: org.apache.spark.SparkEnv.isStopped()Z
    at org.apache.spark.metrics.CassandraConnectorSource$.instance(CassandraConnectorSource.scala:53)
    at com.datastax.spark.connector.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:53)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:194)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 01:39:44 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoSuchMethodError: org.apache.spark.SparkEnv.isStopped()Z
        org.apache.spark.metrics.CassandraConnectorSource$.instance(CassandraConnectorSource.scala:53)
        com.datastax.spark.connector.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:53)
        com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:194)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
15/03/30 01:39:44 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
15/03/30 01:39:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/30 01:39:44 INFO TaskSchedulerImpl: Cancelling stage 0

Answer:

java.lang.NoSuchMethodError is a common indication of a version mismatch: One of your dependencies was compiled against a more recent version of another dependency and at runtime is provided with an earlier version that does not have that new method.

In this case, you are trying to run Spark-Cassandra Connector 1.3.0-SNAPSHOT against Spark 1.1.0. Try to align those versions: either use a 1.3.0 release of Spark or a 1.1.0-compatible version of the spark-cassandra-connector.

Question:

The Cassandra DataStax driver is unable to connect to one of the nodes in the data center, but I am still able to read and write to the database. The datacenter contains two nodes in one rack. I specified one of them as the seed node.

Connected to cluster: Test Cluster
Datacenter: datacenter1; Host: /PVT IP1; Rack: rack1
Datacenter: datacenter1; Host: /PUBLIC IP2; Rack: rack1
2017-11-04 02:19:50 WARN  com.datastax.driver.core.HostConnectionPool:184 - Error creating connection to /PVT IP1:9042
com.datastax.driver.core.exceptions.TransportException: [/PVT IP1:9042] Cannot connect
    at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:165)
    at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:148)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:220)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: /PVT IP1:9042
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:218)
    ... 7 more
2017-11-04 02:19:50 WARN  com.datastax.driver.core.Session:378 - Error creating pool to /PVT IP1:9042
com.datastax.driver.core.exceptions.ConnectionException: [/PVT IP1:9042] Pool was closed during initialization
    at com.datastax.driver.core.HostConnectionPool$2.onSuccess(HostConnectionPool.java:148)
    at com.datastax.driver.core.HostConnectionPool$2.onSuccess(HostConnectionPool.java:134)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1773)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
    at com.google.common.util.concurrent.CollectionFuture$CollectionFutureRunningState.handleAllCompleted(CollectionFuture.java:76)
    at com.google.common.util.concurrent.AggregateFuture$RunningState.processCompleted(AggregateFuture.java:255)
    at com.google.common.util.concurrent.AggregateFuture$RunningState.decrementCountAndMaybeComplete(AggregateFuture.java:242)
    at com.google.common.util.concurrent.AggregateFuture$RunningState.access$300(AggregateFuture.java:91)
    at com.google.common.util.concurrent.AggregateFuture$RunningState$1.run(AggregateFuture.java:146)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.completeWithFuture(AbstractFuture.java:730)
    at com.google.common.util.concurrent.AbstractFuture.setFuture(AbstractFuture.java:666)
    at com.google.common.util.concurrent.Futures$AsyncCatchingFuture.doFallback(Futures.java:826)
    at com.google.common.util.concurrent.Futures$AsyncCatchingFuture.doFallback(Futures.java:813)
    at com.google.common.util.concurrent.Futures$AbstractCatchingFuture.run(Futures.java:789)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.completeWithFuture(AbstractFuture.java:730)
    at com.google.common.util.concurrent.AbstractFuture.setFuture(AbstractFuture.java:666)
    at com.google.common.util.concurrent.Futures$AsyncCatchingFuture.doFallback(Futures.java:826)
    at com.google.common.util.concurrent.Futures$AsyncCatchingFuture.doFallback(Futures.java:813)
    at com.google.common.util.concurrent.Futures$AbstractCatchingFuture.run(Futures.java:789)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
    at com.google.common.util.concurrent.Futures$AbstractChainingFuture.run(Futures.java:1405)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
    at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:634)
    at com.google.common.util.concurrent.SettableFuture.setException(SettableFuture.java:53)
    at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:165)
    at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:148)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:500)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:479)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:220)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:745)

docker-compose.yaml file for each node looks like this:

Yaml file for node 1:

version: "3"
services:

  cassandra:
    image: cassandra:3.11
    container_name: cassandra_node
    ports:
      - 7000:7000
      - 9042:9042
    volumes:
      - /home/******/docker/cassandra/cassandra-data:/var/lib/cassandra
    environment:
      CASSANDRA_BROADCAST_ADDRESS: PVT IP1
      CASSANDRA_SEEDS: PVT IP1

For node2:

Yaml file for node 2:

version: "3"
services:

  cassandra:
    image: cassandra:3.11
    container_name: cassandra_node
    ports:
      - 7000:7000
      - 9042:9042
    volumes:
      - /home/******/docker/cassandra/cassandra-data:/var/lib/cassandra
    environment:
      CASSANDRA_BROADCAST_ADDRESS: PVT IP2
      CASSANDRA_SEEDS: PVT IP1

Could someone help me in figuring out the issue here. Thanks!!


Answer:

I think the issue here is with the values passed for CASSANDRA_BROADCAST_ADDRESS and CASSANDRA_SEEDS. There is a space character in the values, so the Cassandra driver might be unable to parse them.

I have been using the docker-compose.yml below for creating a Cassandra cluster. Please give it a try.

version: '3'

networks:
    cassandra-cluster:
        driver: bridge

volumes:
  data-volume-dc1-n1:
  data-volume-dc1-n2:
  data-volume-dc2-n1:
  data-volume-dc2-n2:

services:
    ###############################################################################################
    # DC1 node 1
    ###############################################################################################
    DC1_N1:
        container_name: DC1_N1
        image: cassandra:latest
        command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 0; fi && /docker-entrypoint.sh cassandra -f'

        volumes:
            - data-volume-dc1-n1:/var/lib/cassandra

        environment:
            - CASSANDRA_CLUSTER_NAME=cassandra_cluster
            - CASSANDRA_SEEDS=DC1_N1,DC1_N2,DC2_N1,DC2_N2
            - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
            - CASSANDRA_DC=DC1
            - CASSANDRA_RACK=rack1

        # Expose ports for cassandra cluster
        expose:
            - 7000
            - 7001
            - 7199
            - 9042
            - 9160

        ports:
            - "9042:9042"

        # Cassandra ulimt recommended settings
        ulimits:
            memlock: -1
            nproc: 32768
            nofile: 100000

        networks:
            - cassandra-cluster

        restart: unless-stopped

    ###############################################################################################
    # DC1 node 2
    ###############################################################################################
    DC1_N2:
        container_name: DC1_N2
        image: cassandra:latest
        command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 0; fi && /docker-entrypoint.sh cassandra -f'

        volumes:
            - data-volume-dc2-n2:/var/lib/cassandra

        environment:
            - CASSANDRA_CLUSTER_NAME=cassandra_cluster
            - CASSANDRA_SEEDS=DC1_N1,DC1_N2,DC2_N1,DC2_N2
            - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
            - CASSANDRA_DC=DC1
            - CASSANDRA_RACK=rack2

        # Expose ports for cassandra cluster
        expose:
            - 7000
            - 7001
            - 7199
            - 9042
            - 9160

        # Cassandra ulimt recommended settings
        ulimits:
            memlock: -1
            nproc: 32768
            nofile: 100000

        networks:
            - cassandra-cluster

        restart: unless-stopped

    ###############################################################################################
    # DC2 node 1
    ###############################################################################################
    DC2_N1:
        container_name: DC2_N1
        image: cassandra:latest
        command: bash -c 'if [ -z "$$(ls -A /var/lib/cassandra/)" ] ; then sleep 0; fi && /docker-entrypoint.sh cassandra -f'

        volumes:
            - data-volume-dc2-n1:/var/lib/cassandra

        environment:
            - CASSANDRA_CLUSTER_NAME=cassandra_cluster
            - CASSANDRA_SEEDS=DC1_N1,DC1_N2,DC2_N1,DC2_N2
            - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
            - CASSANDRA_DC=DC2
            - CASSANDRA_RACK=rack1

        # Expose ports for cassandra cluster
        expose:
            - 7000
            - 7001
            - 7199
            - 9042
            - 9160

        # Cassandra ulimt recommended settings
        ulimits:
            memlock: -1
            nproc: 32768
            nofile: 100000

        networks:
            - cassandra-cluster

        restart: unless-stopped

Question:

Write path for Cassandra:

  1. Data is written to the commitlog immediately.
  2. After a threshold is met, the commitlog is flushed into the Memtable.
  3. Once the size threshold is met in the Memtable, data is flushed to disk as an SSTable.

In the above process, if data is updated at Step 1 itself, does it require any special handling?

For example, we have an employee column family. A program starts writing data into the column family; after inserting 10 rows, an update is issued for the 3rd row while the data has still not been flushed into the Memtable.

Will Cassandra handle this scenario as crash recovery?

Please share your views on this.


Answer:

Here is the write path for Cassandra:

  1. Data is appended to the commitlog, if durable_writes is set to true while creating the keyspace.
  2. Data is also written to the Memtable immediately.
  3. Once thresholds are met for the Memtable, data is flushed to disk as an SSTable.

If an update occurs at any time before the flush to disk, the Memtable captures that updated record, and the commitlog is appended with that update as well. If a crash were to happen, all of these records are replayed from the commitlog during the restart of Cassandra, so the durability of writes/updates is guaranteed.
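
For reference, durable_writes is chosen when the keyspace is created. A minimal sketch with the Java driver (the keyspace name and replication settings are example values, and session is an open com.datastax.driver.core.Session):

// Sketch only: keyspaces have durable_writes = true by default; setting it
// explicitly makes the commitlog behaviour described above visible.
session.execute(
    "CREATE KEYSPACE IF NOT EXISTS demo " +
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} " +
    "AND durable_writes = true;");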

For additional reference

Question:

I am using the datastax spark cassandra connector with Spark 1.6.3.

Is it possible to use the Spark cassandra connector Java API with Spark 2.0+?

I see that the latest version of spark-cassandra-connector-java_2.11 is 1.6.0-M1.

Does someone know about the future of the connector's Java API?

Thanks,

Shai


Answer:

If you're running Spark 2.0-2.2, you can use the Cassandra connector from DataStax: https://spark-packages.org/package/datastax/spark-cassandra-connector (it is written in Scala, but you can simply consume the jar from Java).

Check out the compatibility table in the official repo: https://github.com/datastax/spark-cassandra-connector#version-compatibility
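
To make that concrete, here is a hedged sketch (not from the original answer) of calling the connector's Java API against Spark 2.x. It assumes spark-cassandra-connector_2.11 2.0.x is on the classpath; in the 2.x line the japi classes ship inside that main artifact rather than in a separate -java module, and the keyspace/table names below are placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class JavaApiOnSpark2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaApiOnSpark2")
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same japi entry point as in the old spark-cassandra-connector-java module
        CassandraTableScanJavaRDD<CassandraRow> rdd =
                javaFunctions(sc).cassandraTable("my_keyspace", "my_table");
        System.out.println("rows: " + rdd.count());

        sc.stop();
    }
}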

Question:

I'm trying to run a simple count from the Apache Spark shell on a data set that was previously loaded into my Cassandra cluster. To do this I've created a simple Maven project that builds a fat jar; these are my dependencies:

        <!-- https://mvnrepository.com/artifact/com.cloudera.sparkts/sparkts -->
    <dependency>
        <groupId>com.cloudera.sparkts</groupId>
        <artifactId>sparkts</artifactId>
        <version>0.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10 -->
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>2.0.0-M3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core -->
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.0.0</version>
    </dependency>

I'm running this jar with spark-shell using this command:

spark-shell --jars Sensors-1.0-SNAPSHOT-jar-with-dependencies.jar --executor-memory 512M

After loading the required dependencies I'm trying to run the following operation on my Spark instance:

import pl.agh.edu.kis.sensors._
import com.datastax.spark.connector._
val test = new TestConnector(sc)
test.count()

This is the error I'm receiving:

    17/01/21 04:42:37 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Exception during preparation of SELECT "coil_id", "event_time", "car_count", "insert_time" FROM "public"."traffic" WHERE token("coil_id") > ? AND token("coil_id") <= ?   ALLOW FILTERING: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;
    at com.datastax.spark.connector.types.TypeConverter$.<init>(TypeConverter.scala:116)
    at com.datastax.spark.connector.types.TypeConverter$.<clinit>(TypeConverter.scala)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:282)
    ... 17 more
17/01/21 04:42:41 INFO CoarseGrainedExecutorBackend: Got assigned task 2
17/01/21 04:42:41 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
17/01/21 04:42:41 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.io.IOException: Exception during preparation of SELECT "coil_id", "event_time", "car_count", "insert_time" FROM "public"."traffic" WHERE token("coil_id") > ? AND token("coil_id") <= ?   ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:282)
    ... 17 more
17/01/21 04:42:45 INFO CoarseGrainedExecutorBackend: Got assigned task 3
17/01/21 04:42:45 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
17/01/21 04:42:45 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
java.io.IOException: Exception during preparation of SELECT "coil_id", "event_time", "car_count", "insert_time" FROM "public"."traffic" WHERE token("coil_id") > ? AND token("coil_id") <= ?   ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:307)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$19.apply(CassandraTableScanRDD.scala:335)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
    at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
    at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:229)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$13.apply(CassandraTableScanRDD.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:282)
    ... 17 more

And here's my code:

import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

/**
 * Created by Daniel on 20.01.2017.
 */
public class TestConnector {

    SparkContext sc;

    public TestConnector(SparkContext context){
        sc = context;
        sc.conf().set("spark.cassandra.connection.host","10.156.207.84")
                .set("spark.cores.max","1");
    }

    public TestConnector(){
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host","10.156.207.84")
                .set("spark.cores.max","1");
        sc = new SparkContext(conf);
    }

    public void count(){
        CassandraTableScanJavaRDD rdd  = javaFunctions(sc).cassandraTable("public","traffic");
        System.out.println("Total readings: " + rdd.count());
    }

}

Scala version: 2.11.8, Spark version: 2.0.2, Cassandra version: 3.9, Java version: 1.8.0_111


Answer:

The first problem you are running into looks like a Scala version mismatch. The default installation of Spark 2.0 uses Scala 2.11, but you have specified 2.10 for all of your dependencies. Change all of the _2.10 artifact suffixes to _2.11.

Next you will run into a Guava mismatch problem because you are including the Cassandra driver when you should not be, so remove the dependency on the Java driver (cassandra-driver-core).

See https://github.com/datastax/spark-cassandra-connector/blob/master/doc/FAQ.md#how-do-i-fix-guava-classpath-errors

Question:

I am trying to save just one row to a Cassandra table using Spark in Java (the row is the result of long processing in Spark). I am using the new way of connecting to Cassandra through a SparkSession, as follows:

     SparkSession spark = SparkSession
          .builder()
          .appName("App")
          .config("spark.cassandra.connection.host", "cassandra1.example.com")
          .config("spark.cassandra.connection.port", "9042")
          .master("spark://cassandra.example.com:7077")
          .getOrCreate();

The connection is successful and works well, as I have Spark installed on the same nodes as Cassandra. After reading some RDDs from Cassandra I want to save to another table in Cassandra, so I am following the documentation here, namely the part about saving to Cassandra:

List<Person> people = Arrays.asList(
    new Person(1, "John", new Date()),
    new Person(2, "Troy", new Date()),
    new Person(3, "Andrew", new Date())
);
JavaRDD<Person> rdd = spark.sparkContext().parallelize(people);
javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();

The problem I am facing is that the parallelize method is not accepted, and only a Scala version appears to be available; the error is:

The method parallelize(Seq<T>, int, ClassTag<T>) in the type 
SparkContext is not applicable for the arguments (List<Person>) 

How can I use that in Java to save to cassandra table?


Answer:

To parallelize a java.util.List you can use JavaSparkContext (not SparkContext), something like this:

import org.apache.spark.api.java.JavaSparkContext;

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.parallelize(people);
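
Putting the two pieces together, a hedged end-to-end sketch (keyspace "ks", table "people", the Person bean, and the SparkSession variable spark are taken from the question):

import java.util.Arrays;
import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// Wrap the Scala SparkContext held by the SparkSession in a JavaSparkContext,
// parallelize the Java list, then save with the connector's Java API.
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

List<Person> people = Arrays.asList(
        new Person(1, "John", new Date()),
        new Person(2, "Troy", new Date()),
        new Person(3, "Andrew", new Date()));

JavaRDD<Person> rdd = jsc.parallelize(people);
javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();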

Question:

I'm inserting into cassandra using Spark.

CassandraJavaUtil.javaFunctions(newRDD)
            .writerBuilder("dmp", "dmp_user_user_profile_spark1", mapToRow(UserSetGet.class)).saveToCassandra();
            logger.info("DataSaved");

My question: suppose the RDD has 5k rows, and while inserting into Cassandra the job fails for some reason.

Will there be a rollback for the rows out of the 5k that were already inserted?

And if not, how will I know how many rows were actually inserted, so that I can start my job again from the failed row?


Answer:

Simple answer, No, there will not be automatic rollback.

Whatever data spark was able to save into cassandra, will be persisted into cassandra.

And no, there is no simple way to know up to which point the Spark job was able to save successfully. In fact, the only way I can think of is to read the data back from Cassandra and join/filter it out of your result set based on the key.

To be honest, that is quite an overhead if the data is huge, since it means a humongous join. In most cases, you can simply re-run the job in Spark and let it save to the Cassandra table again. Since updates and inserts in Cassandra work the same way (both are upserts), it won't be a problem.

The only place this can be problematic is if you are dealing with counter tables.

Update: For this specific scenario, you can split your RDD into batches of whatever size suits you, and then try to save them one by one. That way, if saving one batch fails, you will know which batch failed; if that batch did not save, you can pick up again from that batch.
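
A hedged sketch of that batching idea, reusing the names from the question (newRDD, UserSetGet, logger); the randomSplit weights and seed are illustrative only:

import org.apache.spark.api.java.JavaRDD;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// Split the RDD into five roughly equal batches and save them one at a time,
// so a failure tells you which batch to retry.
double[] weights = {0.2, 0.2, 0.2, 0.2, 0.2};
JavaRDD<UserSetGet>[] batches = newRDD.randomSplit(weights, 42L);

for (int i = 0; i < batches.length; i++) {
    try {
        javaFunctions(batches[i])
                .writerBuilder("dmp", "dmp_user_user_profile_spark1", mapToRow(UserSetGet.class))
                .saveToCassandra();
        logger.info("Batch " + i + " saved");
    } catch (Exception e) {
        // Writes are upserts, so the failed batch (and any after it) can simply be retried.
        logger.error("Batch " + i + " failed; resume from this batch", e);
        throw e;
    }
}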

Question:

I have an application with Spark (version 1.4.0) and the Spark-Cassandra connector (version 1.3.0-M1). In it, I am trying to store a DataFrame into a Cassandra table that has two columns (total, message). I have already created the table in Cassandra with these two columns.

Here is my Code,

scoredTweet.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
            @Override
            public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
                SQLContext sqlContext = SparkConnection.getSqlContext();
                DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
                df.registerTempTable("messages");
                DataFrame aggregatedMessages = sqlContext.sql("select count(*) as total,message from messages group by message");
                aggregatedMessages.show();
                aggregatedMessages.printSchema();
                aggregatedMessages.write().mode(SaveMode.Append)
                .option("keyspace", Properties.getString("spark.cassandra.keyspace"))
                .option("c_table", Properties.getString("spark.cassandra.aggrtable"))
                .format("org.apache.spark.sql.cassandra").save();

But I got this exception,

[Stage 20:===========================>                          (103 + 2) / 199]
[Stage 20:====================================>                 (134 + 2) / 199]
[Stage 20:============================================>         (164 + 2) / 199]
[Stage 20:====================================================> (193 + 2) / 199]
+-----+--------------------+
|total|             message|
+-----+--------------------+
|    1|there is deep pol...|
|    1|RT @SwarupPhD: Ag...|
|    1|#3Novices : #Desp...|
|    1|RT @Babu_Bhaiyaa:...|
|    1|https://t.co/BMPX...|
+-----+--------------------+

root
 |-- total: long (nullable = false)
 |-- message: string (nullable = true)


15/06/12 21:24:40 INFO Cluster: New Cassandra host /192.168.1.17:9042 added
15/06/12 21:24:40 INFO Cluster: New Cassandra host /192.168.1.19:9042 added
15/06/12 21:24:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.1.19 (datacenter1)
15/06/12 21:24:40 INFO Cluster: New Cassandra host /192.168.1.21:9042 added
15/06/12 21:24:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 192.168.1.21 (datacenter1)
15/06/12 21:24:40 INFO CassandraConnector: Connected to Cassandra cluster: BDI Cassandra
15/06/12 21:24:41 INFO CassandraConnector: Disconnected from Cassandra cluster: BDI Cassandra
15/06/12 21:26:14 ERROR JobScheduler: Error running job streaming job 1434124380000 ms.1
java.util.NoSuchElementException: key not found: frozen<tuple<int, text, text, text, list<text>>>
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at com.datastax.spark.connector.types.ColumnType$.fromDriverType(ColumnType.scala:73)
    at com.datastax.spark.connector.types.ColumnType$$anonfun$1.apply(ColumnType.scala:67)
    at com.datastax.spark.connector.types.ColumnType$$anonfun$1.apply(ColumnType.scala:67)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.datastax.spark.connector.types.ColumnType$.fromDriverType(ColumnType.scala:67)
    at com.datastax.spark.connector.cql.ColumnDef$.apply(Schema.scala:110)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchRegularColumns$1.apply(Schema.scala:210)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchRegularColumns$1.apply(Schema.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchRegularColumns(Schema.scala:206)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:235)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:232)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:94)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:232)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:241)
    at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:240)
    at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
    at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
    at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
    at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
    at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
    at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:240)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:246)
    at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:243)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:116)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:115)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:105)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:156)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation.<init>(CassandraSourceRelation.scala:39)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:168)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:84)
    at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:305)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)

Answer:

Connector version 1.3.x does not support Spark 1.4.x. We are working on a 1.4.x release of the connector at this moment; expect it very soon.

Question:

I have a CassandraRow object that contains the values of a row. I read it from one table, and I want to write that same object to another table. But then I get this error:

requirement failed: Columns not found in class com.datastax.spark.connector.japi.CassandraRow: [myColumn1, myColumns2, ...]

I tried to pass my own mapping by creating a Map and passing it in the function. This is my code:

CassandraRow row = fetch();

Map<String, String> mapping = Map.of("myColumn1", "myColumn1", "myColumns2", "myColumns2"....);

JavaSparkContext ctx = new JavaSparkContext(conf);

JavaRDD<CassandraRow> insightRDD = ctx.parallelize(List.of(row));

CassandraJavaUtil.javaFunctions(insightRDD).writerBuilder("mykeyspace", "mytable",
            CassandraJavaUtil.mapToRow(CassandraRow.class, mapping)).saveToCassandra(); //I also tried without mapping

Any help is appreciated. I have tried the POJO approach and it works, but I don't want to be restricted to creating POJOs. I want a generic approach that would work with any table and any row.


Answer:

I could not find a way to generalize my solution using Apache Spark, so I used the DataStax Java Driver for Apache Cassandra and wrote CQL queries directly. That was generic enough for me.
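
A minimal sketch of that driver-based approach, assuming the DataStax Java driver 3.x, a node at 127.0.0.1, and that the source and target tables share the same column names (keyspace and table names are placeholders):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;

try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
     Session session = cluster.connect()) {

    // Read one row from the source table.
    Row row = session.execute("SELECT * FROM mykeyspace.source_table LIMIT 1").one();

    // Copy every column of that row into the target table, whatever the schema is.
    Insert insert = QueryBuilder.insertInto("mykeyspace", "mytable");
    for (ColumnDefinitions.Definition col : row.getColumnDefinitions()) {
        insert.value(col.getName(), row.getObject(col.getName()));
    }
    session.execute(insert);
}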

Question:

I have the following table:

CREATE TABLE attribute (
    pid text,
    partner_id int,
    key int,
    value int,
    PRIMARY KEY (pid, partner_id, key)
)

I am trying to key my RDD by 'pid' which is the partition key. According to the documentation I can do that by calling the keyBy method like this:

JavaPairRDD<String, Attribute> attrRdd =
    javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
    .select(column("pid"), column("partner_id"), column("key"), column("value"))
    .keyBy(new Function<Attribute, String>() {
        @Override
        public String call(Attribute attr) throws Exception {
            return attr.getPid();
        }
    });

However, this doesn't create the cassandra partitioner. The only time I get the cassandra partitioner is when I key by all primary key columns like so:

JavaPairRDD<AttributePK, Attribute> attrRdd =
    javaFunctions(context).cassandraTable(ks, cf, mapRowTo(Attribute.class))
    .select(column("pid"), column("partner_id"), column("key"), column("value"))
    .keyBy(JavaApiHelper.getClassTag(AttributePK.class),
        mapRowTo(AttributePK.class), mapToRow(AttributePK.class), column("pid"));

AttributePK has all of the primary key columns as members:

public class AttributePK {
    protected String pid;
    protected int partner_id;
    protected int key;
    ...
}

This is not good for me because eventually I want to group all entries by 'pid' without shuffling.

Does someone know why I fail to key by only the partition key column as described in the documentation?

Thanks,

Shai


Answer:

You are using two different APIs here. The first example passes a function and the second passes columns. When you pass a function there is no way for the API to know what the columns in the key will be, so it can't build the partitioner. Try using the keyBy overload that takes columns, but pass only the partition key column.

Additionally, you probably don't need to do this at all, as the spanBy API should let you do groupByKey/reduceByKey-style operations for free on the partition key.

Question:

Below is my code.

directKafkaStream.foreachRDD(rdd -> 
     {
        rdd.foreach(record -> 
             {
                messages1.add(record._2);
             });
                JavaRDD<String> lines = sc.parallelize(messages1);
                JavaPairRDD<Integer, String> data = lines.mapToPair(new PairFunction<String, Integer, String>()
                {
                    @Override
                    public Tuple2<Integer, String> call(String a)
                    {
                        String[] tokens = StringUtil.split(a, '%');
                        return new Tuple2<Integer, String>(Integer.getInteger(tokens[3]),tokens[2]);
                    }
                }); // map to get year and name of the movie
                Function2<String, String, String> reduceSumFunc = (accum, n) -> (accum.concat(n)); // function for reduce
                JavaPairRDD<Integer, String> yearCount = data.reduceByKey(reduceSumFunc); // reduceByKey to count
                javaFunctions(yearCount).writerBuilder("movie_keyspace", "movie_count", mapTupleToRow(Integer.class, String.class)).withColumnSelector(someColumns("year","list_of_movies")).saveToCassandra(); // this is the error line
            });

Here is the error I am getting.

com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column year
    at com.datastax.spark.connector.writer.RoutingKeyGenerator$$anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.fillRoutingKey(RoutingKeyGenerator.scala:47)
    at com.datastax.spark.connector.writer.RoutingKeyGenerator.apply(RoutingKeyGenerator.scala:56)
    at com.datastax.spark.connector.writer.TableWriter.batchRoutingKey(TableWriter.scala:126)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$19.apply(TableWriter.scala:151)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:107)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)        

Description:

1) Trying to connect Kafka and Cassandra using Spark.

2) Able to store a JavaRDD into Cassandra, but not able to store a JavaPairRDD.

3) I have added a comment on the line in the code above where the error occurs.


Answer:

One of your values for year is null, which is not allowed for a key column. Check your data and look for what is generating a null integer.
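
One likely source of that null in the code above (an observation, not part of the original answer): Integer.getInteger(String) looks up a system property named by its argument and returns null when no such property exists; it does not parse the string. Parsing the token instead avoids the null key column (StringUtil.split is kept as in the question):

@Override
public Tuple2<Integer, String> call(String a) {
    String[] tokens = StringUtil.split(a, '%');
    // Integer.valueOf parses the text; Integer.getInteger would read a system property.
    return new Tuple2<>(Integer.valueOf(tokens[3].trim()), tokens[2]);
}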

Question:

I am using the Spark Kafka connector to fetch data from a Kafka cluster. From it, I am getting the data as a JavaDStream<String>. How do I get the data as a JavaDStream<EventLog>, where EventLog is a Java bean?

public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    jssc.start();
    jssc.awaitTermination();
    return lines;
}

My goal is to save this data into Cassandra, into a table with the same specification as EventLog. The Spark Cassandra connector accepts a JavaRDD<EventLog> in the save call, like this: javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();. I want to get such a JavaRDD<EventLog> from Kafka.


Answer:

Use the overloaded createStream method where you can pass the key/value type and decoder classes.

Example:

createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
        kafkaParams, topicsMap, StorageLevel.MEMORY_AND_DISK_SER_2());

The above should give you a JavaPairDStream<String, EventLog>, which you can then map down to the values:

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
  @Override
  public EventLog call(Tuple2<String, EventLog> tuple2) {
    return tuple2._2();
  }
});

EventLogDecoder should implement kafka.serializer.Decoder. Below is an example of a JSON decoder.

public class EventLogDecoder implements Decoder<EventLog> {

 public EventLogDecoder(VerifiableProperties verifiableProperties) {
 }

 @Override
 public EventLog fromBytes(byte[] bytes) {
   ObjectMapper objectMapper = new ObjectMapper();
   try {
     return objectMapper.readValue(bytes, EventLog.class);
   } catch (IOException e) {
     //do something
   }
   return null;
 }
}
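
From there, a hedged sketch of the remaining step toward the stated goal (not part of the original answer): save each micro-batch of the decoded stream with the Cassandra connector. It assumes a Spark version whose foreachRDD accepts a VoidFunction, and keyspace "ks" / table "event" as in the question; the output operation must be wired up before jssc.start() is called.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

// For every micro-batch, write the decoded EventLog records to Cassandra.
lines.foreachRDD(rdd ->
        javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra()
);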

Question:

Using Spark 1.5.1 with spark-cassandra-connector-java and the Spark SQL DataFrame APIs, what is the best way to filter rows where a String column's length is less than or greater than a given value?

I am trying to do something like this

DataFrame df = context.sql("select key from mytable where key is not null")
DataFrame fdf = df.filter(functions.length(df.col("key").gt(10))))

How does the functions.length(Column) API work? It takes a Column in and returns a Column, but what happens with the length?


Answer:

1) A column is what you need to apply the predicate to, so change the parentheses:

DataFrame fdf = df
 .filter(
   functions.length(df.col("key"))
   .gt(10)
)

What this does is apply a predicate based on the column key. First we turn the column key into a column of length(key), basically applying the function to all values in the column:

[ "bird", "cat", "mouse" ] -> [ 4, 3, 5 ]

Then, since we now have a numeric column, we apply a greater-than predicate to that column:

[ 4 > 10, 3 > 10, 5 > 10 ] -> [ False, False, False ]

The boolean is used to determine whether the predicate passes or fails.

2) Why not just do the check in SQL:

sqlContext.sql("SELECT * FROM test.common WHERE LENGTH(key) > 10")

This gets all the rows where the length of key is greater than 10.