Hot questions for using Cassandra with the Apache Spark Dataset API


Question:

I have a column of type set, and when I use collect_set() from the Spark Dataset API it returns a wrapped array of wrapped arrays. I want a single array containing all the values from the nested wrapped arrays. How can I do that?

E.g., Cassandra table:

Col1  
{1,2,3}
{1,5}

I'm using the Spark Dataset API; row.get(0) returns a wrapped array of wrapped arrays.


Answer:

Suppose you have a Dataset<Row> ds with a value column:

+-----------------------+
|value                  |
+-----------------------+
|[WrappedArray(1, 2, 3)]|
+-----------------------+

It has the following schema:

root
 |-- value: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = false)
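
For reference, a minimal way to build a Dataset with exactly this shape for testing (this is a sketch, not part of the original answer; it assumes a SparkSession named spark already exists, as used later when registering the UDF):

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Column "value" of type array<array<int>>, matching the schema above
StructType schema = new StructType().add(
        "value",
        DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.IntegerType, false), true));

// One row holding the nested array [[1, 2, 3]]
Dataset<Row> ds = spark.createDataFrame(
        Collections.singletonList(RowFactory.create(Arrays.asList(Arrays.asList(1, 2, 3)))),
        schema);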

Using UDF

Define a UDF1 like the one below.

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.api.java.UDF1;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

// Flattens a WrappedArray of WrappedArrays into a single Java list
static UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>> getValue =
    new UDF1<WrappedArray<WrappedArray<Integer>>, List<Integer>>() {
        public List<Integer> call(WrappedArray<WrappedArray<Integer>> data) throws Exception {
            List<Integer> intList = new ArrayList<Integer>();
            // Convert each inner Scala sequence to a Java list and append its elements
            for (int i = 0; i < data.size(); i++) {
                intList.addAll(JavaConversions.seqAsJavaList(data.apply(i)));
            }
            return intList;
        }
    };

Register and call the UDF as below:

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.types.DataTypes;

// Register the UDF with the return type array<int>
spark.udf().register("getValue", getValue, DataTypes.createArrayType(DataTypes.IntegerType));

// Call the UDF on the "value" column and keep the original columns alongside it
Dataset<Row> ds1 = ds.select(col("*"), callUDF("getValue", col("value")).as("udf-value"));
ds1.show(false);
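
The UDF approach keeps one output row per input row. With the sample data above, ds1.show(false) should print roughly the following (the udf-value column name comes from the alias used in the select):

+-----------------------+---------+
|value                  |udf-value|
+-----------------------+---------+
|[WrappedArray(1, 2, 3)]|[1, 2, 3]|
+-----------------------+---------+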

Using explode function

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

Dataset<Row> ds2 = ds.select(explode(col("value")).as("explode-value"));
ds2.show(false);
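
Note that explode behaves differently from the UDF: each inner array becomes its own output row, so an input row containing several nested arrays yields several rows, each holding one inner array, rather than one flattened array. With the single sample row above, ds2.show(false) should print roughly:

+-------------+
|explode-value|
+-------------+
|[1, 2, 3]    |
+-------------+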

Question:

The table structure in Cassandra:

identifier, date, set(integer)

What I want to achieve with Spark is to group rows by identifier and date, and then aggregate all the set values. An example will make this clearer:

Raw data (consider the letters as placeholders for integers):

id1, 05-05-2017, {a,b,c}
id1, 05-05-2017, {c,d}
id1, 26-05-2017, {a,b,c}
id1, 26-05-2017, {b,c}
id2, 26-05-2017, {a,b,c}
id2, 26-05-2017, {b,c,d}

Output:

id1, 05-05-2017, {a,b,c,d}
id1, 26-05-2017, {a,b,c}
id2, 26-05-2017, {a,b,c,d}

Since this is a set, I want unique values in the aggregated results. I am using Java and the Dataset API.


Answer:

If your dataframe has the columns you mention, you can do it like this:

df.withColumn("set", explode(col("set"))).groupBy("identifier", "date").agg(collect_set("set"))
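
For completeness, here is a self-contained Java sketch of the same pipeline. The format string is the one used by the Spark Cassandra Connector; the keyspace and table names are placeholders, and the set column is assumed to be called set, as in the one-liner above:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_set;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load the table through the Spark Cassandra Connector
// (keyspace and table names are placeholders)
Dataset<Row> df = spark.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "my_keyspace")
        .option("table", "my_table")
        .load();

// Explode the set column into one row per element, then regroup by
// (identifier, date) and collect the distinct elements back into a set
Dataset<Row> result = df
        .withColumn("set", explode(col("set")))
        .groupBy(col("identifier"), col("date"))
        .agg(collect_set(col("set")).as("set"));

result.show(false);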