Hot questions for Using Cassandra in Scala

Question:

Can someone explain the differences between --packages and --jars in a spark-submit script?

nohup ./bin/spark-submit   --jars ./xxx/extrajars/stanford-corenlp-3.8.0.jar,./xxx/extrajars/stanford-parser-3.8.0.jar \
--packages datastax:spark-cassandra-connector_2.11:2.0.7 \
--class xxx.mlserver.Application \
--conf spark.cassandra.connection.host=192.168.0.33 \
--conf spark.cores.max=4 \
--master spark://192.168.0.141:7077  ./xxx/xxxanalysis-mlserver-0.1.0.jar   1000  > ./logs/nohup.out &

Also, do I require the --packages configuration if the dependency is in my application's pom.xml? (I ask because I just blew up my application by changing the version in --packages while forgetting to change it in the pom.xml.)

I am currently using --jars because the jars are massive (over 100GB) and thus slow down compilation of the shaded jar. I admit I am not sure why I am using --packages, other than that I am following the DataStax documentation.


Answer:

If you run spark-submit --help, it will show:

--jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.

--packages                  Comma-separated list of maven coordinates of jars to include
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.

If it is --jars:

then Spark doesn't hit Maven; it searches for the specified jar on the local file system, and it also supports the hdfs/http/https/ftp URL schemes.

If it is --packages:

then Spark searches for the specified package in the local Maven repository, then Maven Central, then any repository provided via --repositories, and downloads it.
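For illustration, the connector from your command could be supplied either way (the local path and the extra repository URL below are illustrative, not taken from your setup):

spark-submit --jars /opt/libs/spark-cassandra-connector_2.11-2.0.7.jar ...

spark-submit --packages datastax:spark-cassandra-connector_2.11:2.0.7 --repositories https://repo.example.com/releases ...

The first form does no Maven resolution and simply ships the file you point it at; the second resolves the coordinates (and their transitive dependencies) from the Maven repositories and downloads them.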

Now, coming back to your questions:

Also, do I require the --packages configuration if the dependency is in my application's pom.xml?

Ans: No, if you are not importing/using classes from the jar directly but need to load classes via some class loader or service loader (e.g. JDBC drivers); yes otherwise.

BTW, if you are using a specific version of a specific jar in your pom.xml, then why don't you make an uber/fat jar of your application, or provide the dependency jar in the --jars argument, instead of using --packages?

Links for reference:

spark advanced-dependency-management

add-jars-to-a-spark-job-spark-submit

Question:

I am getting the following error when trying to insert into Cassandra through Phantom from a Scala application.

The Cassandra version is the one bundled with dsc-cassandra-3.0.1.

[error] (run-main-0) com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces), localhost/0:0:0:0:0:0:0:1:9042 (com.datastax.driver.core.TransportException: [localhost/0:0:0:0:0:0:0:1:9042] Cannot connect)) com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces), localhost/0:0:0:0:0:0:0:1:9042 (com.datastax.driver.core.TransportException: [localhost/0:0:0:0:0:0:0:1:9042] Cannot connect))

I have read through other such questions in StackOverflow but have not found a resolution to my issue.

Additionally, I have not noticed the presence of the following in any of the other error logs:

Am I correct in reading localhost/127.0.0.1:9042?

Doesn't this boil down to 127.0.0.1/127.0.0.1:9402, which would explain why it can't find the correct port opening?

Going down this track now, trying to figure out if that's a thing.

I have ensured that Cassandra is running.

I also ran sudo lsof -i -P | grep -i "listen" with the following output (just pulling out the java ones):

java 4053 dan_mi_sun 85u IPv4 0xdbcce7039c377b9d 0t0 TCP localhost:7199 (LISTEN)
java 4053 dan_mi_sun 86u IPv4 0xdbcce703986952cd 0t0 TCP localhost:53680 (LISTEN)
java 4053 dan_mi_sun 92u IPv4 0xdbcce7039869b46d 0t0 TCP localhost:7002 (LISTEN)
java 4053 dan_mi_sun 145u IPv4 0xdbcce7039c37846d 0t0 TCP localhost:9042 (LISTEN)

Any thoughts on what the issue could be?

Have found this, but not sure if it is relevant:

https://datastax-oss.atlassian.net/browse/JAVA-897

In case it is of use, here is the build.sbt:

name := "SuperChain"

organization := "org.dyne.danielsan"

version := "0.1.0-SNAPSHOT"

scalaVersion := "2.11.7"

crossScalaVersions := Seq("2.10.4", "2.11.2")

resolvers ++= Seq(
  "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
  "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/",
  "Sonatype repo"                    at "https://oss.sonatype.org/content/groups/scala-tools/",
  "Sonatype releases"                at "https://oss.sonatype.org/content/org.dyne.danielsan.superchain.data.cassandra.init.repositories/releases",
  "Sonatype snapshots"               at "https://oss.sonatype.org/content/org.dyne.danielsan.superchain.data.cassandra.init.repositories/snapshots",
  "Sonatype staging"                 at "http://oss.sonatype.org/content/org.dyne.danielsan.superchain.data.cassandra.init.repositories/staging",
  "Java.net Maven2 Repository"       at "http://download.java.net/maven/2/",
  "Twitter Repository"               at "http://maven.twttr.com",
  "Wedsudos Bintray Repo"            at "https://dl.bintray.com/websudos/oss-releases/"
)

libraryDependencies ++= Seq(
  "com.websudos" %% "phantom-dsl" % "1.12.2",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "org.scalacheck" %% "scalacheck" % "1.11.5" % "test"
)

initialCommands := "import org.dyne.danielsan.superchain._"

Answer:

This error: com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces leads me to believe that the version of phantom you are using is not using datastax java-driver 3.0+. Since you are connecting to a 3.0 cluster, you need a 3.0 driver that understands the schema tables (system_schema.* instead of system.schema*). If you upgrade to phantom-dsl 1.21.0, that should fix the issue.
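In the build.sbt above, that would be a one-line change (assuming the artifact keeps the same organization and name at that version):

  "com.websudos" %% "phantom-dsl" % "1.21.0"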

Question:

I use DataStax Enterprise 4.5. I hope I did the config right; I did it as explained on the DataStax website. I can write into the Cassandra DB from a Windows service, and this works, but I can't query with Spark using the where function.

I start the Cassandra node (there is only one, for test purposes) with "./dse cassandra -k -t" (in the /bin folder), so Hadoop and Spark are both running. I can write into Cassandra without a problem.

You cannot use a 'where' clause in a Cassandra query when the 'where' column isn't the row key, so I need to use Spark/Shark. I can start Shark (./dse shark) and run all the queries I need, but I need to write a standalone program in Scala or Java.

So I tried this link: https://github.com/datastax/spark-cassandra-connector

And I can query a simple statement like:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

and this works well, but if I ask for more lines or a count:

println(rdd.count)
rdd.toArray.foreach(println)

then I get this exception:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

When I try this in Java I have the same problem. Does anyone know this problem? I don't know if the DB config is correct or if the Scala/Java program works correctly. Maybe some ports are blocked, but 7077 and 4040 are open.

Side note: if I start Spark on the Cassandra node, I can run queries like:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

But if I use a "where" clause like:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

I get this exception:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

Do you have an idea why? I thought I could use where clauses in Spark.

Thank you!


Answer:

All masters are unresponsive!

This implies that the IP you are attempting to connect to is not actually bound by Spark, so this is basically a networking configuration error. Scan to see which interfaces are listening on 7077 and make sure you are connecting to the correct interface.

As for the second question, the where operator implies you are going to do a predicate pushdown on that clause. Currently you cannot do this with primary keys. If you want to 'where' on a single primary key, you can use a filter to accomplish that, but you will not see great performance, as this will do a whole-table scan.
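As a minimal sketch of the filter alternative, using the test.words table from your side note (the predicate is evaluated on the Spark side, so every row is still read from Cassandra):

val words = sc.cassandraTable("test", "words").select("word")
words.filter(row => row.getString("word") == "foo").collect.foreach(println)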

Question:

I am using Cassandra prepared statements to execute my queries in a Scala application using the Datastax Cassandra Java driver.

We have conditional logic to check and see if a bound statement should execute based on some internal state of an object. Something to this effect:

def updateDatabase(x: String, y: String, z: String) = {
  val bound = statement.bind(x, y, z)

  if (sequence_nr < current) {
    session.execute(bound)
  }
}

Does this introduce a memory leak in either our Scala application or in Cassandra? My inclination is to say no, but I didn't want to make that assumption, as I don't fully understand what the driver is doing behind the scenes with the bind call.

I appreciate the help.


Answer:

The bind() method generates a new instance of BoundStatement at each invocation, and session.execute() does not keep any reference to it, so it will be GC'ed. There is no risk of memory leaks here.
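For illustration, a sketch of the usual pattern, prepare once and bind per call (the contact point, keyspace, and CQL statement here are hypothetical, not taken from your code):

import com.datastax.driver.core.{Cluster, Session}

val cluster: Cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session: Session = cluster.connect("my_keyspace")  // hypothetical keyspace

// Prepared once and reused; each bind() creates a new, short-lived BoundStatement.
val statement = session.prepare("UPDATE my_table SET y = ?, z = ? WHERE x = ?")  // hypothetical CQL

def updateDatabase(x: String, y: String, z: String): Unit = {
  val bound = statement.bind(y, z, x)  // unreferenced, and therefore GC-eligible, once execute returns
  session.execute(bound)
}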

Question:

I want to connect to Cassandra, which is running as a service in Mesosphere DC/OS. Is there any programmatic way (any API exposed) to get the Cassandra port and IP details?

From the command line I can connect as shown below; I want to do the same thing programmatically, maybe with a REST API or a Java client, to connect to these services.

As per this doc https://github.com/mesosphere/dcos-cassandra-service/blob/master/docs/connecting-clients.md

$ dcos cassandra --name=<service-name> connection

{
  "address": [
    "10.0.0.47:9042",
    "10.0.0.50:9042",
    "10.0.0.49:9042"
  ],
  "dns": [
     "node-0.cassandra.mesos:9042",
     "node-1.cassandra.mesos:9042",
     "node-2.cassandra.mesos:9042"
  ]
}

Note: the reason for doing this is that the Cassandra IP and port change every time, and I have to manually adjust my property file to get the latest details. If it is done through a program, it is very easy to set the property without manual interaction.


Answer:

If you use the entries from the DNS section (shown below), they will not change even if the task relocates to another node:

 "node-0.cassandra.mesos:9042",
 "node-1.cassandra.mesos:9042",
 "node-2.cassandra.mesos:9042"

Question:

I am trying to run the command-line tutorial for Cassandra and am running into an error. This is what I ran:

bin/geomesa-cassandra ingest --contact-point localhost --key-space mykeyspace --catalog mycatalog --converter example-csv --spec example-csv examples/ingest/csv/example.csv

This is what I get in return.

What would cause this?


Answer:

You need to specify the contact point as host:port. See the documentation for the connection parameters. I opened a ticket to improve the error handling for an incorrectly formatted contact point, so that the error will be more obvious.
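For example, the ingest command from the question would become something like the following (9042 is just the default CQL native transport port; adjust it to your cluster):

bin/geomesa-cassandra ingest --contact-point localhost:9042 --key-space mykeyspace --catalog mycatalog --converter example-csv --spec example-csv examples/ingest/csv/example.csv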


Question:

I am currently trying out the gigantic key-value store Cassandra in combination with a few other libraries such as Akka. After setting up a Cluster and connecting to a keyspace:

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
self ! AddKeySpaceSession(keySpace, cluster.connect(keySpace))

I get the infamous "can't find the StaticLoggerBinder" warning message:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Now usually, of course, you would simply place one (and only one) of slf4j-nop.jar, slf4j-simple.jar, slf4j-log4j12.jar, slf4j-jdk14.jar, or logback-classic.jar on the class path. This is what I have done, as is evident here:

libraryDependencies ++= {
  val akkaVersion = "2.5.8"
  val akkaDeps = Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
    "com.typesafe.akka" %% "akka-testkit" % akkaVersion
  )

  val logbackVersion = "1.2.3"
  val loggingDeps = Seq(
    "ch.qos.logback" % "logback-classic" % logbackVersion % Test
  )

  val cassandraDriverVersion = "3.3.2"
  val cassandraDeps = Seq(
    "com.datastax.cassandra" % "cassandra-driver-core" % cassandraDriverVersion
  )

  akkaDeps ++ loggingDeps ++ cassandraDeps  // combine the dependency groups
}

The issue remains, though, and Cassandra is actually the only dependency that complains about it.


Answer:

The problem might be related to the scope of the logback-classic dependency. Instead of putting it in the Test scope:

"ch.qos.logback" % "logback-classic" % logbackVersion % Test

Try putting it under the default (compile) scope, so the SLF4J binding is available on the main runtime classpath rather than only during tests:

"ch.qos.logback" % "logback-classic" % logbackVersion

Question:

I'm trying to get the min, max, and mean of some Cassandra/Spark data, but I need to do it with Java.

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.*;

DataFrame df = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("table",  "someTable")
        .option("keyspace", "someKeyspace")
        .load();

df.groupBy(col("keyColumn"))
        .agg(min("valueColumn"), max("valueColumn"), avg("valueColumn"))
        .show();

EDITED to show the working version: make sure to put quotes around someTable and someKeyspace.


Answer:

Just load your data as a DataFrame and apply the required aggregations:

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.*;

DataFrame df = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("table", someTable)
        .option("keyspace", someKeyspace)
        .load();

df.groupBy(col("keyColumn"))
        .agg(min("valueColumn"), max("valueColumn"), avg("valueColumn"))
        .show();

where someTable and someKeyspace store table name and keyspace respectively.