Hot questions for using Neo4j in Apache Spark


Question:

I have a program that works with a Spark JavaStreamingContext. I have learned that only a few output operations are allowed when using DStreams, such as print(). This is a piece of the code:

private static void analyzeHashtags() throws InterruptedException {
    JavaPairDStream<String, String> messages =  KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
    lines.print();
    jssc.start();
    jssc.awaitTermination();
}

Now I'd like to add a query operation to this code, like below:

private static void analyzeHashtags() throws InterruptedException, SQLException {
    JavaPairDStream<String, String> messages =  KafkaUtils.createStream(jssc, zookeeper_server, kafka_consumer_group, topics);
    JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
    lines.print();
    String hashtag = "#dummy"; int frequencies = 59;
    String cql = " CREATE (n:Hashtag {name:'"+hashtag+"', freq:"+frequencies+"})";
    st.executeUpdate(cql);
    jssc.start();
    jssc.awaitTermination();
}

But this code just executes the query once. I'd like it to execute every time a batch is processed. How can I do that? Thanks in advance.


Answer:

To execute arbitrary operations on a DStream, we use foreachRDD. It provides access to the data at each batch interval, represented by the underlying RDD.

Java/Scala pseudo(mix)code:

JavaPairDStream<String, Integer> lines = messages.mapToPair((x)->(new Tuple2<String, Integer>(x._2, 1))).reduceByKey(sumFunc);
lines.foreachRDD{ rdd => 
    .. do something with the RDD here...
}
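Since the question's code is Java, the same skeleton with the Java API would look like this (a sketch reusing the question's lines variable):

lines.foreachRDD(rdd -> {
    // rdd is the JavaPairRDD<String, Integer> holding this batch's data
    // ... do something with the RDD here ...
});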

Normally, the "do something" part operates on the data in the RDD. We can operate on that data in a distributed way, using RDD functions such as foreachPartition.

But, considering that you're using a local Neo4j connection here, and assuming the data at each streaming interval is not very large, we can collect the data to the driver and do the operation locally. That seems like a fit in this case, since the data has already passed through a distributed reduce phase (reduceByKey).

So, the foreachRDD part would become:

lines.foreachRDD{ rdd =>
    val localDataCollection = rdd.collect
    localDataCollection.foreach{ keywordFreqPair =>
      val cql = "CREATE (n:Hashtag {name:'" + keywordFreqPair._1 + "', freq:" + keywordFreqPair._2 + "})"
      st.executeUpdate(cql)
    }
}
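In Java, a rough equivalent of the sketch above, assuming st is the same JDBC-style Statement used in the question and is reachable from the driver:

lines.foreachRDD(rdd -> {
    // Bring this batch's (hashtag, frequency) pairs to the driver
    List<Tuple2<String, Integer>> localData = rdd.collect();
    for (Tuple2<String, Integer> pair : localData) {
        String cql = "CREATE (n:Hashtag {name:'" + pair._1 + "', freq:" + pair._2 + "})";
        st.executeUpdate(cql);
    }
});

If the per-batch volume ever grows too large to collect, rdd.foreachPartition would let each executor open its own connection and write its partition in parallel instead of funnelling everything through the driver.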

Question:

I am fetching Neo4j data into a Spark dataframe using the neo4j-spark connector. I am able to fetch it successfully, as I am able to show the dataframe. Then I register the dataframe with the createOrReplaceTempView() method. Then I try running Spark SQL on it, but it gives an exception saying:

org.apache.spark.sql.AnalysisException: Table or view not found: neo4jtable;

This is what my whole code looks like:

import java.text.ParseException;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.neo4j.spark.Neo4JavaSparkContext;
import org.neo4j.spark.Neo4j;

import scala.collection.immutable.HashMap;

public class Neo4jDF {

    private static Neo4JavaSparkContext neo4jjsc;
    private static SparkConf sConf;
    private static JavaSparkContext jsc;
    private static SparkContext sc; 
    private static SparkSession ss;

    private static Dataset<Row> neo4jdf;
    static String neo4jip = "ll.mm.nn.oo";

    public static void main(String[] args) throws AnalysisException, ParseException 
    {
        setSparkConf();
        setJavaSparkContext();
        setNeo4jJavaSparkContext();
        setSparkContext();
        setSparkSession();

        neo4jdf = loadNeo4jDataframe();
        neo4jdf.createOrReplaceTempView("neo4jtable");

        neo4jdf.show(false); //this prints correctly

        Dataset<Row> neo4jdfsqled = ss.sql("SELECT * from neo4jtable");

        neo4jdfsqled.show(false); //this throws exception
    }

    private static Dataset<Row> loadNeo4jDataframe()
    {
        Neo4j neo4j = new Neo4j(jsc.sc());
        HashMap<String, Object> a = new HashMap<String, Object>();
        Dataset<Row> rdd = neo4j.cypher("cypher query deleted for irrelevance", a).loadDataFrame();
        return rdd;
    }

    private static void setSparkConf()
    {
        sConf = new SparkConf().setAppName("GetNeo4jToRddDemo");
        sConf.set("spark.neo4j.bolt.url", "bolt://" + neo4jip + ":7687");
        sConf.set("spark.neo4j.bolt.user", "neo4j");
        sConf.set("spark.neo4j.bolt.password", "admin");
        sConf.setMaster("local");
        sConf.set("spark.testing.memory", "471859200");
        sConf.set("spark.sql.warehouse.dir", "file:///D:/Mahesh/workspaces/spark-warehouse");
    }

    private static void setJavaSparkContext()
    {
        jsc = new JavaSparkContext(sConf);
    }

    private static void setSparkContext()
    {
        sc = JavaSparkContext.toSparkContext(jsc);
    }

    private static void setSparkSession()
    {
        ss = new SparkSession(sc);
    }

    private static void setNeo4jJavaSparkContext()
    {
        neo4jjsc = Neo4JavaSparkContext.neo4jContext(jsc);
    }
}

I feel the issue might be with how all the Spark environment objects are created. I first created SparkConf sConf. From sConf, I created JavaSparkContext jsc. From jsc, I created SparkContext sc. From sc, I created SparkSession ss. From jsc, I also created Neo4JavaSparkContext neo4jjsc.

So visually:

sConf -> jsc -> sc       -> ss 
             -> neo4jjsc 

Also note that

  • Inside loadNeo4jDataframe(), I use jsc.sc() to instantiate a Neo4j instance, which is then used for fetching Neo4j data.
  • Data is fetched using the Neo4j instance.
  • neo4jjsc is never used, but I kept it as a possible hint to the issue.

Given all these points and observations, can you tell me why I get the table or view not found exception? I must be missing something stupid. :\

Update

I tried setting ss (after the data is fetched using Neo4j's SparkContext) as follows:

private static void setSparkSession(SparkContext sc)
{
    ss = new SparkSession(sc);
}

private static Dataset<Row> loadNeo4jDataframe(String pAutosysBoxCaption)
{
    Neo4j neo4j = new Neo4j(sc);

    Dataset<Row> rdd = neo4j.cypher("deleted cypher for irrelevance", a).loadDataFrame();

    // initializing ss after the data is fetched using neo4j's SparkContext
    setSparkSession(neo4j.sc());  
    return rdd;
}

Update 2

From the comments, I just realised that Neo4j creates its own SparkSession using the SparkContext instance provided to it, and I don't have access to that SparkSession. So how am I supposed to add / register an arbitrary dataframe (here, neo4jdf) that was created in some other SparkSession (here, the session created by neo4j.cypher) with my SparkSession ss?


Answer:

Based on the symptoms we can infer that the two pieces of code use different SparkSession / SQLContext instances. Assuming there is nothing unusual going on in the Neo4j connector, you should be able to fix this by changing setSparkSession to:

private static void setSparkSession()
{
    ss = SparkSession.builder().getOrCreate();
}

or by initializing SparkSession before calling setNeo4jJavaSparkContext.

If that doesn't work, you can switch to using createGlobalTempView.
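If you go the global temp view route, note that global temp views live in the reserved global_temp database, so the query has to reference it. A minimal sketch using the question's neo4jdf and ss:

neo4jdf.createGlobalTempView("neo4jtable");
Dataset<Row> neo4jdfsqled = ss.sql("SELECT * FROM global_temp.neo4jtable");
neo4jdfsqled.show(false);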

Important:

In general I would recommend initializing a single SparkSession using the builder pattern, and deriving the other contexts (SparkContext, JavaSparkContext) from it when necessary.
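A minimal sketch of that recommendation, reusing the configuration values and variable names from the question:

SparkSession ss = SparkSession.builder()
        .appName("GetNeo4jToRddDemo")
        .master("local")
        .config("spark.neo4j.bolt.url", "bolt://" + neo4jip + ":7687")
        .config("spark.neo4j.bolt.user", "neo4j")
        .config("spark.neo4j.bolt.password", "admin")
        .getOrCreate();

// Derive the other contexts from the single session
SparkContext sc = ss.sparkContext();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
Neo4JavaSparkContext neo4jjsc = Neo4JavaSparkContext.neo4jContext(jsc);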

Question:

Summary

My question is about how Apache Spark Streaming can handle an output operation that takes a long time, either by improving parallelization or by combining many writes into a single, larger write. In this case, the write is a Cypher request to Neo4j, but it could apply to other data stores.


Environment

I have an Apache Spark Streaming application in Java that writes to 2 datastores: Elasticsearch and Neo4j. Here are the versions:

  • Java 8
  • Apache Spark 2.11
  • Neo4J 3.1.1
  • Neo4J Java Bolt Driver 1.1.2

The Elasticsearch output was easy enough as I used the Elasticsearch-Hadoop for Apache Spark library.


Our Stream

Our input is a stream from Kafka received on a particular topic, and I deserialize the elements of the stream through a map function to create a JavaDStream<OurMessage> dataStream. I then run transforms on each message to create a Cypher query String cypherRequest (using an OurMessage-to-String transformation) that is sent to a singleton that manages the Bolt Driver connection to Neo4j (I know I should use a connection pool, but maybe that's another question). The Cypher query produces a number of nodes and/or edges based on the contents of OurMessage.

The code looks something like the following.

dataStream.foreachRDD( rdd -> {
    rdd.foreach( cypherQuery -> {
        BoltDriverSingleton.getInstance().update(cypherQuery);
    });
});


Possibilities for Optimization

I have two thoughts about how to improve throughput:

  1. I am not sure whether Spark Streaming parallelization goes down to the RDD element level. Meaning, the output of RDDs can be parallelized (within `stream.foreachRDD()`), but can each element of the RDD be parallelized (within `rdd.foreach()`)? See the sketch after this list for what I mean. If the latter were the case, would a `reduce` transformation on our `dataStream` increase Spark's ability to output this data in parallel (each JavaRDD would contain exactly one cypher query)?
  2. Even with improved parallelization, our performance would further increase if I could implement some sort of builder that takes every element of the RDD and creates a single cypher query that adds the nodes/edges from all elements, instead of one cypher query per element. But how would I be able to do this without using another Kafka instance, which may be overkill?
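For concreteness, here is roughly the kind of per-partition output I have in mind for point 1, mirroring the snippet above where the RDD elements are the cypher query strings (just a sketch; BoltDriverSingleton is our existing wrapper):

dataStream.foreachRDD(rdd -> {
    // Each partition runs as its own task, so partitions are written in parallel
    rdd.foreachPartition(cypherQueries -> {
        while (cypherQueries.hasNext()) {
            BoltDriverSingleton.getInstance().update(cypherQueries.next());
        }
    });
});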

Am I overthinking this? I've tried to research so much that I might be in too deep.


Aside: I apologize in advance if any of this is completely wrong. You don't know what you don't know, and I've just started working with Apache Spark and Java 8 w/ lambdas. As Spark users must know by now, either Spark has a steep learning curve due to its very different paradigm, or I'm an idiot :).

Thanks to anyone who might be able to help; this is my first StackOverflow question in a long time, so please leave feedback and I will be responsive and correct this question as needed.


Answer:

I think all we need is a simple map/reduce. The following should allow us to parse each message in the RDD and then write the results to the graph DB all at once.

dataStream.map( message -> {
    // Parse each incoming message into a ParseResult on the executors
    return (ParseResult) Neo4JMessageParser.parse(message);
}).foreachRDD( rdd -> {
    // Collect this batch's parse results to the driver
    List<ParseResult> parseResults = rdd.collect();
    // Build one combined Cypher statement and send it in a single round trip
    String cypherQuery = Neo4JMessageParser.buildQuery(parseResults);
    Neo4JRepository.update(cypherQuery);
    // commit offsets
});

By doing this, we should be able to reduce the overhead associated with having to do a write for each incoming message.
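To make the "single, larger write" concrete, one hypothetical shape for Neo4JMessageParser.buildQuery is to fold every parse result into one UNWIND-based Cypher statement. The ParseResult accessors and the Item label below are made up for illustration:

// Hypothetical sketch: getName()/getValue() are assumed ParseResult accessors.
public static String buildQuery(List<ParseResult> results) {
    StringBuilder rows = new StringBuilder();
    for (ParseResult r : results) {
        if (rows.length() > 0) {
            rows.append(", ");
        }
        rows.append("{name:'").append(r.getName())
            .append("', value:'").append(r.getValue()).append("'}");
    }
    // One UNWIND statement creates all nodes in a single round trip
    return "UNWIND [" + rows + "] AS row CREATE (n:Item {name: row.name, value: row.value})";
}

In practice the rows would more likely be passed as a query parameter instead of concatenated into the string, but the effect is the same: one round trip per batch rather than one per message.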