Hot questions for using Cassandra with Apache Spark SQL


Question:

I have a Cassandra database whose data I analyzed with Spark SQL through Apache Spark. Now I want to insert that analyzed data into PostgreSQL. Is there any way to achieve this directly, apart from using the PostgreSQL driver? (I already achieved it using PostgREST and the driver; I want to know whether there is a method like saveToCassandra().)


Answer:

At the moment there is no native implementation of writing an RDD to any DBMS. Here are links to the related discussions on the Spark user list: one, two

In general, the most performant approach would be the following:

  1. Check the number of partitions in the RDD; it should be neither too low nor too high. 20-50 partitions is usually fine: if the number is lower, call repartition with 20 partitions; if it is higher, call coalesce down to 50 partitions.
  2. Call the mapPartitions transformation and, inside it, call a function that inserts the records into your DBMS using JDBC. In that function, open a connection to your database and use the COPY command through its API; this removes the need for a separate command per record, so the insert is processed much faster.

This way you would insert the data into Postgres in a parallel fashion, utilizing up to 50 parallel connections (depending on your Spark cluster size and its configuration). The whole approach could be implemented as a Java/Scala function that accepts the RDD and the connection string.
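A minimal sketch of this approach in Java, assuming the analyzed records have already been rendered as CSV lines; the target table analysis_results, the connection string, and the class name are illustrative only. It uses foreachPartition (the action counterpart of mapPartitions, since nothing needs to be returned) together with the PostgreSQL JDBC driver's CopyManager:

import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.spark.api.java.JavaRDD;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

public class PostgresBulkWriter {
    // csvLines: analyzed records already rendered as CSV lines (assumption)
    public static void save(JavaRDD<String> csvLines, String jdbcUrl) {
        // Step 1: keep the partition count in the 20-50 range
        int parts = csvLines.partitions().size();
        JavaRDD<String> sized = parts < 20 ? csvLines.repartition(20)
                              : parts > 50 ? csvLines.coalesce(50)
                              : csvLines;

        // Step 2: one connection per partition, all rows streamed via a single COPY
        sized.foreachPartition(rows -> {
            try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
                StringBuilder batch = new StringBuilder();
                while (rows.hasNext()) {
                    batch.append(rows.next()).append('\n');
                }
                CopyManager copy = new CopyManager(conn.unwrap(BaseConnection.class));
                copy.copyIn("COPY analysis_results FROM STDIN WITH (FORMAT csv)",
                            new StringReader(batch.toString()));
            }
        });
    }
}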

Question:

Can someone help me with this error?

Here is my table in Cassandra:

CREATE TABLE zz("timestamp" timestamp, "sessionId" text, "userId" text, PRIMARY KEY ("userId", "sessionId", "timestamp"));

with some data

INSERT INTO rr ("userId", "sessionId", "timestamp") VALUES ('1', '1', 1);

I use Spark with Java to query my data with SQL

SparkConf conf = new SparkConf(true).setMaster("local").setAppName("DatastaxTests").set("spark.cassandra.connection.host", "localhost");
SparkContext ctx = new SparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ctx);

CassandraSQLContext cassandraContext = new CassandraSQLContext(ctx);
cassandraContext.setKeyspace("crm360retail");

DataFrame userIdRDD = cassandraContext.sql("SELECT * FROM rr");
System.out.println("Data fetched: \n" + StringUtils.join(userIdRDD.collect(), "\n"));

But when I execute the code I get this error:

java.util.NoSuchElementException: key not found: userId
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.cassandra.CassandraTableScan$$anonfun$1$$anonfun$apply$1.apply(CassandraTableScan.scala:29)
at org.apache.spark.sql.cassandra.CassandraTableScan$$anonfun$1$$anonfun$apply$1.apply(CassandraTableScan.scala:29)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.cassandra.CassandraTableScan$$anonfun$1.apply(CassandraTableScan.scala:29)
at org.apache.spark.sql.cassandra.CassandraTableScan$$anonfun$1.apply(CassandraTableScan.scala:28)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.cassandra.CassandraTableScan.inputRdd(CassandraTableScan.scala:48)
at org.apache.spark.sql.cassandra.CassandraTableScan.execute(CassandraTableScan.scala:99)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at com.worldline.ms.pcop.crm360.retail.tracking.service.dao.jpa.TrackingDaoImpl.getLastVisitedProduct(TrackingDaoImpl.java:37)
at com.worldline.ms.pcop.crm360.retail.tracking.service.functional.DivolteTrackingFunctionnalServiceImpl.getLastVisitedProduct(DivolteTrackingFunctionnalServiceImpl.java:35)
at com.worldline.ms.pcop.crm360.retail.tracking.service.business.DivolteTrackingBusinessServiceImpl.getLastVisitedProduct(DivolteTrackingBusinessServiceImpl.java:29)
at com.worldline.ms.pcop.crm360.retail.tracking.service.test.DivolteTrackingBusinessServiceTest.testGetLastVisitedProduct(DivolteTrackingBusinessServiceTest.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:72)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:81)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:216)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:82)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:60)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:67)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:162)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

I know the problem is that my column names are case sensitive, but if I use the query cassandraContext.sql("SELECT 'userId', timestamp FROM rr") the result is

userIdRDD.collectAsList().get(0) = [userId,1970-01-01 01:00:00.001]

How can I get my data from a table whose column names are case sensitive?


Answer:

CQL3 treats column names as case insensitive: names are always converted to lower case. To avoid this you have to double-quote the names in the CREATE query. If you execute it from Java, your query should look like this:

CREATE TABLE zz(\"timestamp\" timestamp, \"sessionId\" text, \"userId\" text, PRIMARY KEY (\"userId\", \"sessionId\", \"timestamp\"));

Note that the quotes are escaped. In this case userId and sessionId will not be converted to lower case.
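For illustration only, a minimal sketch that issues the quoted CREATE statement through the DataStax Java driver; the contact point and the keyspace (crm360retail, taken from the question's Spark config) are assumptions:

try (Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
     Session session = cluster.connect("crm360retail")) {
    // The double-quoted identifiers keep the camel-case column names intact
    session.execute("CREATE TABLE zz(\"timestamp\" timestamp, \"sessionId\" text, \"userId\" text, "
        + "PRIMARY KEY (\"userId\", \"sessionId\", \"timestamp\"))");
}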

Hope this helps,

Zoran

Question:

I'm trying to pull certain data out of a Cassandra table and then write it back to a different table in Cassandra.

This is what I have:

JavaRDD<MeasuredValue> mvRDD = javaFunctions(sc).cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class))
  .where ("\"Time_Key\" IN (1601823,1601824)")
  .select("Time_Key","Start_Frequency","Bandwidth", "Power");  

Then I write back to a new table with:

javaFunctions(mvRDD).writerBuilder("spark_reports","SB1000_47130646", mapToRow(MeasuredValue.class)).withColumnSelector(someColumns("Time_Key", "Start_Frequency", "Bandwidth", "Power")).saveToCassandra();

My MeasuredValue class looks like this:

public static class MeasuredValue implements Serializable {

    public MeasuredValue() { }

    public MeasuredValue(Long Time_Key, Double Start_Frequency, Double Bandwidth, Float Power) {
        this.Time_Key = Time_Key;
        this.Start_Frequency = Start_Frequency;
        this.Bandwidth = Bandwidth;
        this.Power = Power;
    }

    private Long Time_Key;
    public Long gettime_key() { return Time_Key; }
    public void settime_key(Long Time_Key) { this.Time_Key = Time_Key; }

    private Double Start_Frequency;
    public Double getstart_frequency() { return Start_Frequency; }
    public void setstart_frequency(Double Start_Frequency) { this.Start_Frequency = Start_Frequency; }

    private Double Bandwidth;
    public Double getbandwidth() { return Bandwidth; }
    public void setbandwidth(Double Bandwidth) { this.Bandwidth = Bandwidth; }

    private Float Power;
    public Float getpower() { return Power; }
    public void setpower(Float Power) { this.Power = Power; }
}

The error I get when running is:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in class com.neutronis.spark_reports.Spark_Reports$MeasuredValue: [Time_Key, Start_Frequency, Bandwidth, Power]

Answer:

I discovered that this is due to the fact that the getters/setters follow the Java naming scheme with respect to capitalization. Since the columns in my table were camel case, I had to rename the columns to an all lower-case naming convention for it to work correctly.

In order to use capital letters I had to use a HashMap:

HashMap<String, String> colmap = new HashMap<String, String>();
colmap.put("start_frequency", "Start_Frequency");
colmap.put("bandwidth", "Bandwidth");
colmap.put("power", "Power");
RowReaderFactory<MeasuredValue> mapRowTo = mapRowTo(MeasuredValue.class, colmap);
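For completeness, a hedged sketch of how that RowReaderFactory can then be passed to cassandraTable in place of the plain mapRowTo(MeasuredValue.class); the keyspace, table, and column names are taken from the question's own snippet:

JavaRDD<MeasuredValue> mvRDD = javaFunctions(sc)
    .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo)
    .where("\"Time_Key\" IN (1601823,1601824)")
    .select("Time_Key", "Start_Frequency", "Bandwidth", "Power");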

Question:

I'm using Spark 2.1 to read data from Cassandra in Java. I tried the code posted in https://stackoverflow.com/a/39890996/1151472 (with SparkSession) and it worked. However, when I replaced the spark.read() method with the spark.sql() one, the following exception was thrown:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: `wiki`.`treated_article`; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation `wiki`.`treated_article`

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)

I'm using the same Spark configuration for both the read and sql methods.

read() code:

Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "wiki");
                        put("table", "treated_article");
                    }
                }).load();

sql() code:

spark.sql("SELECT * FROM WIKI.TREATED_ARTICLE");

Answer:

Spark SQL uses a catalogue to look up database and table references. When you use a table identifier that isn't in the catalogue, it will throw errors like the one you posted. The read command doesn't require a catalogue, since you are required to specify all of the relevant information in the invocation.

You can add entries to the catalogue in one of two ways.

Registering Datasets as Views

First, create your Dataset:

Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "wiki");
                        put("table", "treated_article");
                    }
                }).load();

Then use one of the catalogue registration functions on that Dataset:

void    createGlobalTempView(String viewName)
Creates a global temporary view using the given name.
void    createOrReplaceTempView(String viewName)
Creates a local temporary view using the given name.
void    createTempView(String viewName)
Creates a local temporary view using the given name.
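
For example, registering the Dataset created above under a name matching the question's query (the view name itself is arbitrary):

dataset.createOrReplaceTempView("treated_article");
spark.sql("SELECT * FROM treated_article").show();
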
Or, using a SQL CREATE statement:
   CREATE TEMPORARY VIEW words
     USING org.apache.spark.sql.cassandra
     OPTIONS (
       table "words",
       keyspace "test",
       cluster "Test Cluster",
       pushdown "true"
     )

Once added to the catalogue by either of these methods, you can reference the table in all SQL calls issued by that context.


Example

CREATE TEMPORARY VIEW words
  USING org.apache.spark.sql.cassandra
  OPTIONS (
    table "words",
    keyspace "test"
  );

SELECT * FROM words;
// Hello    1
// World    2

The DataStax (my employer) Enterprise software automatically registers all Cassandra tables by placing entries in the Hive metastore that Spark uses as a catalogue. This makes all tables accessible without manual registration.

This method allows SELECT statements to be used without an accompanying CREATE VIEW.

Question:

How do I execute prepared and batch statements against Cassandra in Java using SparkSession.sql()? I'm using Spark 2.1.


Answer:

Short answer: Batching and Prepared Statements happen automatically.

Long answer: Spark SQL works through the Spark Cassandra Connector's Cassandra Datasource. The Datasource relation defines how data is read from and written to Cassandra. Under the hood, this means any writes via the Spark SQL or Dataset API will use all the features that come with the Spark Cassandra Connector.

All writes will be done using prepared statements and partition-key batches. To adjust how the batching is done (or any other write configuration), you can change the parameters listed here:

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#write-tuning-parameters
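
A hedged sketch of how those write-tuning properties can be set when building the session; the property names come from the reference page linked above, and the values shown are just the documented defaults:

SparkSession spark = SparkSession.builder()
    .appName("CassandraWriteTuning")
    .config("spark.cassandra.connection.host", "localhost")
    // rows per batch; "auto" lets the connector pick based on the byte-size limit
    .config("spark.cassandra.output.batch.size.rows", "auto")
    // group statements into batches that share a Cassandra partition key
    .config("spark.cassandra.output.batch.grouping.key", "partition")
    // number of batches executed in parallel per Spark task
    .config("spark.cassandra.output.concurrent.writes", "5")
    .getOrCreate();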

Since Spark SQL only communicates through this relation, there is no way to issue Cassandra-specific constructs like batch or prepared statements yourself; the connector applies them for you.