Hot questions for using Cassandra in Hadoop

Question:

The class org.apache.cassandra.hadoop.pig.CqlStorage is a Pig storage driver for Cassandra.

This class is present in the following Cassandra versions:

  • 2.0.5 ( apache-cassandra-2.0.5.jar )
  • 2.1.12 ( apache-cassandra-2.1.12.jar )

But it is missing in the following later releases:

  • 2.2.4 ( apache-cassandra-2.2.4.jar )
  • 3.0.0 ( apache-cassandra-3.0.0-alpha1.jar )

I'm not sure why it was discontinued or what the alternative is.


Answer:

Check out the JIRA ticket about it here -- https://issues.apache.org/jira/browse/CASSANDRA-10542. Quoting from the ticket:

Nobody's currently responsible for Pig code. As a result, there is nobody to fix the issues, or even fix the failing tests (of which we unfortunately have plenty). Those tests take time to run, constantly hang, and fail with cryptic errors that we don't know how to fix and don't have enough resources to investigate. Thus I propose we deprecate Pig support in 2.2 and remove it in 3.0.

Question:

I'm fairly new to Cassandra. I'm using Hadoop to bulk load data into a Cassandra cluster using CqlOutputFormat. I'm unable to find sufficient examples on the internet to tailor it to my use case.

I'm specifically using it to insert data into the cluster using the statement:

insert into pinseries (pin, timeseries) values(?, ?)

I'm not sure what the context.write() should look like to make this work. There seem to be enough examples showing how it works for an update statement (the WordCount from the examples will do), but can someone tell me how to use it in insert mode?


Answer:

The CqlRecordWriter used by CqlOutputFormat doesn't support insert statements, only update statements, so you will need to use an update to insert your data, along the lines of:

update pinseries set timeseries = ? where pin = ?

I'm assuming that pin is your primary key.
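
For reference, here is a minimal sketch of what the reducer and its context.write(...) could look like for that update, modelled on the hadoop_cql3_word_count example that ships with Cassandra. The class name and the assumption that each pin carries a single Text value are placeholders for illustration:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch only: assumes the output CQL has been set to
    // "update pinseries set timeseries = ? where pin = ?"
    public class PinSeriesReducer
            extends Reducer<Text, Text, Map<String, ByteBuffer>, List<ByteBuffer>> {

        @Override
        protected void reduce(Text pin, Iterable<Text> series, Context context)
                throws IOException, InterruptedException {
            // Key map: bound values for the WHERE clause ("where pin = ?")
            Map<String, ByteBuffer> keys = new LinkedHashMap<>();
            keys.put("pin", ByteBufferUtil.bytes(pin.toString()));

            // Value list: bound values for the SET clause ("set timeseries = ?"),
            // in the order they appear in the prepared statement
            List<ByteBuffer> variables = new ArrayList<>();
            variables.add(ByteBufferUtil.bytes(series.iterator().next().toString()));

            context.write(keys, variables);
        }
    }

The update query itself is configured on the job, as in the word-count example, with something like CqlConfigHelper.setOutputCql(job.getConfiguration(), "update pinseries set timeseries = ? where pin = ?").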

Question:

I want to load a Cassandra table into a DataFrame in Spark. I have followed the sample programs below (found in this answer), but I am getting the exception mentioned below. I tried to load the table into an RDD first and then convert it to a DataFrame; loading the RDD is successful, but when I try to convert it to a DataFrame I get the same exception as in the first approach. Any suggestions? I am using Spark 2.0.0, Cassandra 3.7, and Java 8.

 import java.util.HashMap;

 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;

 public class SparkCassandraDatasetApplication {
     public static void main(String[] args) {
         SparkSession spark = SparkSession
                 .builder()
                 .appName("SparkCassandraDatasetApplication")
                 .config("spark.sql.warehouse.dir", "/file:C:/temp")
                 .config("spark.cassandra.connection.host", "127.0.0.1")
                 .config("spark.cassandra.connection.port", "9042")
                 .master("local[2]")
                 .getOrCreate();

         // Read data to a DataFrame
         // this is throwing an exception
         Dataset<Row> dataset = spark.read().format("org.apache.spark.sql.cassandra")
                 .options(new HashMap<String, String>() {
                     {
                         put("keyspace", "mykeyspace");
                         put("table", "mytable");
                     }
                 }).load();

         // Print data
         dataset.show();
         spark.stop();
     }
 }

When submitted, I get this exception:

Exception in thread "main" java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.s3.S3FileSystem not found
at java.util.ServiceLoader.fail(ServiceLoader.java:239)
at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2623)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2634)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2651)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:115)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145)

Using the RDD method to read from Cassandra is successful (I have tested it with a count() call), but converting the RDD to a DataFrame throws the same exception as in the first method.

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.datastax.spark.connector.japi.CassandraJavaUtil;

public class SparkCassandraRDDApplication {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("App")
                .config("spark.sql.warehouse.dir", "/file:/opt/spark/temp")
                .config("spark.cassandra.connection.host", "127.0.0.1")
                .config("spark.cassandra.connection.port", "9042")
                .master("local[2]")
                .getOrCreate();

        SparkContext sc = spark.sparkContext();

        // Read (UserData is a simple JavaBean with id and username fields)
        JavaRDD<UserData> resultsRDD = javaFunctions(sc)
                .cassandraTable("mykeyspace", "mytable", CassandraJavaUtil.mapRowTo(UserData.class));

        // This is again throwing an exception
        Dataset<Row> usersDF = spark.createDataFrame(resultsRDD, UserData.class);

        // Print
        resultsRDD.foreach(data -> {
            System.out.println(data.id);
            System.out.println(data.username);
        });

        sc.stop();
    }
}

Answer:

Please check whether "hadoop-common-2.2.0.jar" is available on the classpath. You can test your application by creating a jar that includes all the dependencies. Use the pom.xml below, in which the maven-shade-plugin is used to bundle all the dependencies into an uber jar.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.abaghel.examples.spark</groupId>
<artifactId>spark-cassandra</artifactId>
<version>1.0.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.0.0-M3</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.abaghel.examples.spark.cassandra.SparkCassandraDatasetApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
  </build>
</project>

You can run the jar as shown below:

spark-submit --class com.abaghel.examples.spark.cassandra.SparkCassandraDatasetApplication spark-cassandra-1.0.0-SNAPSHOT.jar

Question:

I am currently using Hadoop and Cassandra to run MapReduce tasks.

It works fine, but I want the reduce output to insert data into a Cassandra table with a Map column type.

I have tried simply changing the value output type to Map in the Reducer, but this causes issues inside the cassandra.hadoop implementation, which always requires a List.

I have tried changing the OutputValueClass setting, but to no avail.

Has anyone else had luck outputting something other than a List, or can someone point me in the right direction to understand how it might be achievable?


Answer:

I managed to figure out what I wanted to do. Apologies for answering my own question; I just thought it might help people in a similar situation, or that someone will be able to tell me that my new way of thinking is also wrong.

I had misunderstood what the value output type of a reducer was actually doing. My assumption was that I could change it to be anything I wanted to write to Cassandra, for example Map or Text or Blob, and that the underlying driver would just pick it up. However, I now think that it always has to be a List of ByteBuffers for anything to go into Cassandra from Hadoop.

I managed to get it working with the correct type by using the following:

    private ByteBuffer ExampleForMapTypes(JSONObject data) {
        Map<String, String> mapper = new HashMap<>();
        String user = data.get("Map_Left").toString();
        String agent = data.get("Map_Right").toString();
        mapper.put(user, agent);

        return MapType
            .getInstance(UTF8Type.instance, UTF8Type.instance)
            .decompose(mapper);
    }

The decompose method returns the ByteBuffer, which in turn allows the Cassandra driver to read what the output is going to be and handle it properly in the query. In this example it will write to a Cassandra column of type map<text, text>.
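
For completeness, here is a rough sketch of how that ByteBuffer could then be handed to context.write() from the same reducer, following the List<ByteBuffer> convention described above. The table, column names, and the writeMapRow helper are assumptions for illustration, assuming an output query like update mytable set mapcolumn = ? where id = ?:

    // Sketch only: assumed to live in the same reducer class as ExampleForMapTypes,
    // with output key/value types Map<String, ByteBuffer> and List<ByteBuffer>
    private void writeMapRow(String rowId, JSONObject data, Context context)
            throws IOException, InterruptedException {
        Map<String, ByteBuffer> keys = new LinkedHashMap<>();
        keys.put("id", ByteBufferUtil.bytes(rowId));      // binds "where id = ?"

        List<ByteBuffer> variables = new ArrayList<>();
        variables.add(ExampleForMapTypes(data));          // binds "set mapcolumn = ?"

        context.write(keys, variables);
    }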

Question:

I'm a seasoned LAMP developer with decent experience in PHP, nginx, HAProxy, Redis, MongoDB, and AWS services. Whenever a large data requirement comes to the table I go with AWS web services, and I recently started reading about big data, expecting to play with the technology on my own instead of using a hosted service for large data handling, stream processing, etc.

However, it's not the same journey as learning LAMP, and because of the nature of the use cases it's hard to find good resources for a newbie, especially for someone who hasn't been in the Java ecosystem. (To my understanding, Java software pretty much covers the popular big data stacks.) The list of software below pops up pretty much everywhere when talking about big data, but it's very hard to grasp the concept of each, and the descriptions available on the home pages of each project are pretty vague.

For instance, "Cassandra": on the surface it's a good database for storing time series data, but when reading more about analytics, other stacks come up: Hadoop, Pig, ZooKeeper, etc.

  • Cassandra
  • Flink
  • Flume
  • Hadoop
  • Hbase
  • Hive
  • Kafka
  • Spark
  • Zookeeper

So, in a nutshell, what do these pieces of software do? In the context of big data, some of these projects share the same aspects, so why do they co-exist? What's the advantage? When do you use what?


Answer:

As for Hadoop, you have to understand that Hadoop can mean two things, depending on the context. A bit like the term "Linux", if you're familiar with that.

  • only the core: the real "Hadoop" is only a file system for decentralized storage of very large files, plus a request framework for these files via Map/Reduce.
  • the whole ecosystem: this includes the core and all the other tools that have been put on top of Hadoop for data analytics. Flume, HBase, Hive, Kafka, Spark, and ZooKeeper are terms belonging to this category; Flink might be too, I am not sure.

Cassandra might also belong to the second category, because "Hadoop integration was added way back in version 0.6 of Cassandra".

To understand the whole ecosystem better, you have to understand how this is all structured:

From bottom to top:

  • bottom layer: Here you have your distributed file system and the Map/Reduce request framework. HDFS is the name of the file system, and you will see this term a lot. On top of HDFS you can use HBase, which is a column-oriented database¹.
  • middle layer, execution engines: In the middle we have several different engines which can query the Hadoop file system for information. Actually, some people put Map/Reduce in this layer too, because the Hadoop environment now also includes Tez and Spark. Tez speeds up queries by using graphs for map/reduce execution, I think. And Spark is an in-memory engine.
  • top layer, user abstractions: On top of the execution engines you have the user APIs/abstractions. This includes Apache Hive (SQL-like queries) and Pig (in my eyes, a mixture of SQL and a programming language). But there are also more specialized abstractions like MLlib, which is a library for machine learning on top of a Hadoop system, using Spark as the middle layer.

Somewhat off to the side, we also have management tools for this whole ecosystem: managing servers, managing the task execution order (job scheduling), and so on. This is where Kafka and ZooKeeper belong.

¹ I currently do not understand the relationship between HBase and ORC files or Parquet.