Hot questions for using Cassandra in Spark Streaming


Question:

I am getting an error when I try to run a Spark application with Cassandra.

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). 

I am using Spark version 1.2.0, and it is clear that I am only using one SparkContext in my application. But whenever I try to add the following code for streaming purposes I get this error.

JavaStreamingContext activitySummaryScheduler = new JavaStreamingContext(
            sparkConf, new Duration(1000));

Answer:

You can only have one SparkContext at a time, and since a StreamingContext contains a SparkContext, you can't have a separate StreamingContext and SparkContext in the same code. What you can do is build a StreamingContext on top of your SparkContext, so you have access to both if you really need that.

Use this constructor: JavaStreamingContext(sparkContext: JavaSparkContext, batchDuration: Duration)
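
For example, a minimal sketch (assuming sparkConf is the same configuration you were already using):

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
// Build the streaming context on top of the existing Spark context so
// both share the single SparkContext allowed per JVM.
JavaStreamingContext activitySummaryScheduler = new JavaStreamingContext(
        sparkContext, new Duration(1000));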

Question:

In my current architecture I have a module which is responsible for writing/reading data to and from Cassandra, and a module responsible for downloading data. Recently I started using DataStax and Spark. I want to do some transformations on newly acquired data. What's the right take on this problem? Do I use my module for storing data and do the Spark calculations separately, or do I send the downloaded data directly to Spark using Spark Streaming and, in a job, save both the original data and the transformed data to Cassandra? I'm operating on stock quotes, so there is a lot of data downloaded continuously and a lot of transformations.


Answer:

In my opinion, it's better to keep them separated.

First store the raw data, then process it. It's easier to scale and maintain each component later.

For example: if you want to change something in your downloading module, like adding a new download source or fixing a bug, it won't affect the data processing done in Spark, and changing something in the code running on Spark won't have any effect on (or introduce a bug into) the raw data you downloaded.
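
As a rough sketch of what the processing side could look like (the keyspace, table and class names here are made up for illustration), a separate Spark job could read the raw quotes the download module stored and write the transformed results to another table:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;

public class QuoteProcessingJob {

    // Minimal result bean; the connector maps its properties to table columns.
    public static class QuoteSummary implements java.io.Serializable {
        private String symbol;
        private double avgPrice;
        public QuoteSummary() {}
        public QuoteSummary(String symbol, double avgPrice) {
            this.symbol = symbol;
            this.avgPrice = avgPrice;
        }
        public String getSymbol() { return symbol; }
        public void setSymbol(String symbol) { this.symbol = symbol; }
        public double getAvgPrice() { return avgPrice; }
        public void setAvgPrice(double avgPrice) { this.avgPrice = avgPrice; }
    }

    public static void process(JavaSparkContext sc) {
        // Read the raw quotes the download module already stored in Cassandra.
        JavaRDD<CassandraRow> raw = javaFunctions(sc)
                .cassandraTable("quotes", "raw_quotes");

        // Transform each raw row; a real job would aggregate, clean, etc.
        JavaRDD<QuoteSummary> summaries = raw.map(row ->
                new QuoteSummary(row.getString("symbol"), row.getDouble("price")));

        // Write the transformed data to a separate table.
        javaFunctions(summaries)
                .writerBuilder("quotes", "quote_summaries", mapToRow(QuoteSummary.class))
                .saveToCassandra();
    }
}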

Question:

I am using Spark to get data from Kafka and insert it into Cassandra. My program is:

public static void fetchAndValidateData() {
    SparkConf sparkConf = new SparkConf().setAppName("name")
            .set("spark.cassandra.connection.host", "127.0.0.1")
            .set("spark.cleaner.ttl", "3600");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    Map<String,String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "127.0.0.1");
    kafkaParams.put("group.id", App.GROUP);
    JavaPairReceiverInputDStream<String, EventLog> messages =
            KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                    kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
        @Override
        public EventLog call(Tuple2<String, EventLog> tuple2) {
            return tuple2._2();
        }
    });
    lines.foreachRDD(rdd -> { javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra(); });
    jssc.start();
    try {
        jssc.awaitTermination();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    jssc.stop();
    jssc.close();
}

My spark-submit command is:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT.jar

My POM file is

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jtv</groupId>
  <artifactId>spark.atnt</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark.atnt</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <build>
    <plugins>  
       <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>single</goal>
                  </goals>
              </execution>
          </executions>
          <configuration>
              <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
          </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.json</groupId>
        <artifactId>javax.json-api</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>    
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

I am getting java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil error.

How do I solve it?


Edit:

I figured out what is causing the problem: it is org.apache.kafka:kafka_2.10:0.8.0. When I add provided to it, I get the following error on my mvn package command:

Failed to execute goal org.apache.maven.plugins:maven-assembly-plugin:2.2-beta-5:single (default) on project spark.atnt: Failed to create assembly: Failed to resolve dependencies for project: com.jtv:spark.atnt:jar:0.0.1-SNAPSHOT: Could not transfer artifact com.sun.jdmk:jmxtools:jar:1.2.1 from/to java.net (https://maven-repository.dev.java.net/nonav/repository): Cannot access https://maven-repository.dev.java.net/nonav/repository with type legacy using the available connector factories: BasicRepositoryConnectorFactory

And when I remove it, I get the java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil error on my spark-submit command.


Answer:

The easiest way to solve this problem is to package the Cassandra library within your jar file.

In order to do this you can use the maven-assembly-plugin in your pom.xml:

       <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>

This plugin will package all of your dependencies with your jar file. If you want to prevent some dependencies from being packaged (e.g. Spark), you need to add the tag <scope>provided</scope>. For example:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
        <scope>provided</scope>
    </dependency>

Please note that if you use the assembly plugin as described above you will obtain two jar files in your target folder. If you want to use the full jar, you will need to run:

C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.5.2 --class "com.jtv.spark.atnt.App" --master local[4] target\spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar
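
If in doubt, you can check that the connector classes actually ended up inside the fat jar before submitting it, for example (on Windows):

jar tf target\spark.atnt-0.0.1-SNAPSHOT-jar-with-dependencies.jar | findstr CassandraJavaUtil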

Question:

I'm trying to create a keyspace and a table in Cassandra, but I'm getting an error. In fact, I'm trying to connect Spark and Cassandra.

I have the following code:

public static void main(String[] args){

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Substitute 127.0.0.1 with the actual address of your Spark Master (or use "local" to run in local mode)
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic: topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }
    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    System.out.println(" -----  trying to create tables ------ ");

    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS test");
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE test.users (id TEXT PRIMARY KEY, name TEXT)");
    }

    System.out.println("---- tables created ----");

But I'm getting the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Connection has been closed)))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:182)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

At first I thought it was the host, but then I changed the connection host to "local" and got the error below instead, and I don't know what I should set here to avoid it:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot build a cluster without contact points
at com.datastax.driver.core.Cluster.checkNotEmpty(Cluster.java:108)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:100)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:169)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1031)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:179)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

I should add that, before anything else, I started Cassandra by invoking 'bin/cassandra -f' from the command line.

Thanks!


Answer:

The problem was that I had the wrong dependencies.

If you want more information you can see the solution here.

Question:

I'm writing a simple data pipeline in Spark Streaming, using Java, to pull JSON data from Kafka, parse the JSON into a custom class (Transaction), and then insert that data into a Cassandra table, but I am unable to get the mapToRow() function to work.

I've seen tons of examples that say all you have to do is something along the lines of this:

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
        streamingContext,
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

JavaDStream<String> lines = stream.map(
    new Function<Tuple2<String,String>, String>(){
        @Override
        public String call(Tuple2<String,String> tuple2) {
            return tuple2._2();
        }
    }
);

javaFunctions(lines).writerBuilder("myKeyspace", "myTableName", mapToRow(Transaction.class)).saveToCassandra();

However, when I do this I get the error:

The method mapToRow(Class<Transaction>) is undefined for the type SaveTransactions

I think all I am lacking is some sort of decoration on my class but I have not been successful in figuring out which one. I've tried going bare bones, essentially making the class a property bag:

public class Transaction implements java.io.Serializable{

    public int TransactionId;
    ...

    public Transaction(){}
}

I've tried all of the DataStax mapping annotations:

@Table(keyspace = "myKeyspace", name = "myTableName",
       readConsistency = "QUORUM",
       writeConsistency = "QUORUM",
       caseSensitiveKeyspace = false,
       caseSensitiveTable = false)
public class Transaction implements java.io.Serializable{

    @PartitionKey(0)
    @Column(name="transaction_id")
    public int TransactionId;
    ...

    public Transaction(){}
}

I also tried establishing public get/set methods for each property and setting the properties to private:

public class Transaction implements java.io.Serializable{

    private int transactionId;
    ...

    public Transaction(){}

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }
}

I have been able to parse the DStream into an RDD of Transactions using the class below:

public class Transaction implements java.io.Serializable{

    ...

    public static class ParseJSON implements FlatMapFunction<Iterator<String>, Transaction> {
        public Iterable<Transaction> call(Iterator<String> lines) throws Exception {
            ArrayList<Transaction> transactions = new ArrayList<Transaction>();
                ObjectMapper mapper = new ObjectMapper();
                while (lines.hasNext()) {
                    String line = lines.next();
                    try {
                        transactions.add(mapper.readValue(line, Transaction.class));
                    } catch (Exception e) {
                        System.out.println("Skipped:" + e);
                    }
                }

                return transactions;
        }
    }
}

In conjunction with the following code, acting on the lines object from above:

JavaDStream<Transaction> events = lines.mapPartitions(new Transaction.ParseJSON());

However, once I have it in this form it still doesn't work with the writerBuilder().saveToCassandra() chain.

Any help here is greatly appreciated.


Answer:

It turns out the issue was just a missing import. I had imported com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.* thinking it would give me everything I needed; however, I also needed to bring in com.datastax.spark.connector.japi.CassandraJavaUtil.* for the .mapToRow() function.
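
For reference, both static imports end up in the file, looking something like this:

// javaFunctions(...) for DStreams comes from the streaming util,
// while mapToRow(...) comes from the plain CassandraJavaUtil.
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;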

Once I resolved this, I began getting the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
    at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
    at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
    at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
    at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
    at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
    at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
    at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
    at globalTransactions.Process.main(Process.java:77)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 9 more

That was resolved by pulling in the spark-sql project:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Hope this helps the next guy/gal.

Question:

I'm working on a project which uses Spark Streaming, Apache Kafka and Cassandra. I use the streaming-kafka integration. In Kafka I have a producer which sends data using this configuration:

props.put("metadata.broker.list", KafkaProperties.ZOOKEEPER);
props.put("bootstrap.servers", KafkaProperties.SERVER);
props.put("client.id", "DemoProducer");

where ZOOKEEPER = localhost:2181, and SERVER = localhost:9092.

Once I send the data I can receive it with Spark, and I can consume it too. My Spark configuration is:

SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", "localhost");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

After that I am trying to store this data in a Cassandra database. But when I try to open a session using this:

CassandraConnector connector = CassandraConnector.apply(jssc.sparkContext().getConf());
Session session = connector.openSession();

I get the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:220)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1231)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:334)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:182)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Regarding Cassandra, I'm using the default configuration:

start_native_transport: true
native_transport_port: 9042
- seeds: "127.0.0.1"
cluster_name: 'Test Cluster'
rpc_address: localhost
rpc_port: 9160
start_rpc: true

I can connect to Cassandra from the command line using cqlsh localhost, getting the following message:

Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 5.0.1 | Cassandra 3.0.5 | CQL spec 3.4.0 | Native protocol v4] Use HELP for help. cqlsh> 

I used nodetool status too, which shows me this:

http://pastebin.com/ZQ5YyDyB

To run Cassandra I invoke bin/cassandra -f.

What I am trying to run is this:

try (Session session = connector.openSession()) {
        System.out.println("dentro del try");
        session.execute("DROP KEYSPACE IF EXISTS test");
        System.out.println("dentro del try - 1");
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        System.out.println("dentro del try - 2");
        session.execute("CREATE TABLE test.users (id TEXT PRIMARY KEY, name TEXT)");
        System.out.println("dentro del try - 3");
    }

My pom.xml file looks like this:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.6.0-M1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0-M2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha2</version>
    </dependency>

    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20160212</version>
    </dependency>
</dependencies>

I have no idea why I can't connect to Cassandra using Spark. Is my configuration bad, or what am I doing wrong?

Thank you!


Answer:

com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces)

That error indicates an old driver being used against a new Cassandra version. Looking at the POM file, we find the spark-cassandra-connector dependency declared twice: one entry uses version 1.6.0-M2 (good) and the other 1.1.0-alpha2 (old).

Remove the references to the old 1.1.0-alpha2 dependencies from your POM, keeping the newer 1.6.0-M1/1.6.0-M2 entries:

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0-alpha2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0-alpha2</version>
</dependency>