Hot questions for using Cassandra in Apache Storm


Question:

I am trying to insert data from Storm into Cassandra. The data is initially of string type.

My java class has following code:

String insertQuery1
            = "insert into fault.as_fo_ag_uc ("
            + "host,"
            + "trigger,"
            + "eventtime,uuiddefault) "
            + "values(?,?,?,?)";    

BoundStatement boundStatement = new BoundStatement(statement);
boundStatement.setString(1, dto.getHost());
boundStatement.setString(2, dto.getTrigger());
Timestamp ts = Timestamp.valueOf(dto.getEventTime());
boundStatement.setDate(3, ts);
boundStatement.setString(4, dto.getUIDDefault());

Here I get an error that eventtime is of type timestamp. I have converted the value to a Timestamp, but it is not working. I have also tried using SimpleDateFormat to format it into a date type:

DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,z");
boundStatement.setDate(3, new java.sql.Date(df.parse(dto.getEventTime()).getTime()));

The error is "eventtime is of type timestamp". My Cassandra driver version is 2.1.7.

com.datastax.driver.core.exceptions.InvalidTypeException: Value eventime is of type timestamp
        at com.datastax.driver.core.AbstractGettableByIndexData.checkType(AbstractGettableByIndexData.java:89) ~[cassandra-driver-core-2.1.7.1.jar:?]
        at com.datastax.driver.core.AbstractData.setString(AbstractData.java:157) ~[cassandra-driver-core-2.1.7.1.jar:?]
        at com.datastax.driver.core.BoundStatement.setString(BoundStatement.java:499) ~[cassandra-driver-core-2.1.7.1.jar:?]
        at storm.starter.bolt.CassandraWriterBolt.execute(CassandraWriterBolt.java:219) [classes/:?]
        at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) [storm-core-0.10.0.jar:0.10.0]
        at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]


Answer:

The setXyz methods in BoundStatement that expect an int argument interpret that value as an index starting at 0.

So your

boundStatement.setString(2, dto.getTrigger());

is actually trying to set the placeholder for the eventtime column in the CQL query.

Change your bindings to start at index 0 and go up to 3:

BoundStatement boundStatement = new BoundStatement(statement);
boundStatement.setString(0, dto.getHost());
boundStatement.setString(1, dto.getTrigger());
Timestamp ts = Timestamp.valueOf(dto.getEventTime());
boundStatement.setDate(2, ts);
boundStatement.setString(3, dto.getUIDDefault());
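
If keeping the positions straight stays error-prone, the 2.1 driver also supports binding by name; for plain ? markers the bind variables take the names of their columns. A minimal sketch of the same bindings under that assumption:

BoundStatement boundStatement = new BoundStatement(statement);
// Bind by column name rather than by zero-based index
boundStatement.setString("host", dto.getHost());
boundStatement.setString("trigger", dto.getTrigger());
// setDate takes a java.util.Date; java.sql.Timestamp is a subclass of it
boundStatement.setDate("eventtime", Timestamp.valueOf(dto.getEventTime()));
boundStatement.setString("uuiddefault", dto.getUIDDefault());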

Question:

I am working on a requirement where I need to read sensor data from CSV/TSV files and insert it into a Cassandra database.

CSV Format:

sensor1 timestamp1 value
sensor1 timestamp2 value
sensor2 timestamp1 value
sensor2 timestamp3 value

Details:

A user can upload a file to our web application. Once the file is uploaded, I need to display the unique values from the first column to the user on the next page. For example:

  1. sensor1 node1
  2. sensor2 node2
  3. sensorn create

The user can either map sensor1 to an existing primary key called node1, in which case the timestamps and values for sensor1 are added to the table under that primary key, or create a new primary key, in which case the timestamps and values are added under the new key.

I was able to implement this using Java 8 streams and collections, and it works with small CSV files.

Question:
  1. How can I upload a huge CSV/TSV file (200 GB) to my web application? Should I upload the file to HDFS and specify the path in the UI? I have also tried splitting the huge file into small chunks (50 MB each).

  2. How can I get the unique values from the first column? Can I use Kafka/Spark here? I also need to insert the timestamps/values into the Cassandra database. Again, can I use Kafka/Spark here?

Any help is highly appreciated.


Answer:

How can I upload a huge CSV/TSV file (200 GB) to my web application? Should I upload the file to HDFS and specify the path in the UI? I have also tried splitting the huge file into small chunks (50 MB each).

It depends on how your web app is going to be used. Uploading a file of such a huge size in the context of an HTTP request from client to server is always going to be tricky: you have to do it asynchronously. Whether you put the file in HDFS or S3 or even a simple SFTP server is a design choice, and that choice will affect what kind of tooling you build around the file. I would suggest starting with something simple like FTP/NAS and, as your scaling needs grow, moving to something like S3. (Using HDFS as shared file storage is not something I have seen many people do, but that shouldn't prohibit you from trying.)
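
A minimal sketch of that asynchronous hand-off, assuming a web layer that exposes the raw upload stream (the names stagingDir and processFile are illustrative, not from the question):

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UploadHandler {

    // Staging area: an NFS/NAS mount to start with; swap in S3 later
    private final Path stagingDir = Paths.get("/data/staging");
    private final ExecutorService ingestPool = Executors.newFixedThreadPool(4);

    // Called from the web layer with the raw upload stream
    public void handleUpload(String fileName, InputStream body) throws Exception {
        Path target = stagingDir.resolve(fileName);
        // Stream straight to disk; never buffer the 200 GB file in memory
        Files.copy(body, target, StandardCopyOption.REPLACE_EXISTING);
        // Hand off to a background worker and return to the client immediately
        ingestPool.submit(() -> processFile(target));
    }

    private void processFile(Path file) {
        // Trigger the batch ingestion job (e.g. a Spark submit) from here
    }
}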

How can I get the unique values from the first column? Can I use Kafka/Spark here? I also need to insert the timestamps/values into the Cassandra database. Again, can I use Kafka/Spark here?

A Spark batch job, or even a plain M/R job, would do the trick. Getting the unique values is just a simple groupBy (or distinct) operation, though you should consider how much latency you are willing to pay, as groupBy operations are generally costly (they involve shuffles). From my limited experience, streaming is slight overkill for such use-cases unless you have a continuous stream of source data; the way you have described it, your use-case looks more like a batch candidate to me.
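
For the unique-values part, a Spark batch job in Java reduces to a distinct() over the first column. A sketch, with the input path and the tab separator as assumptions:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class UniqueSensors {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("unique-sensors");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Assumes tab-separated lines: sensor <TAB> timestamp <TAB> value
            JavaRDD<String> lines = sc.textFile("hdfs:///staging/sensors.tsv");
            List<String> sensors = lines
                    .map(line -> line.split("\t", -1)[0])
                    .distinct()   // shuffles, like groupBy, but keeps only the keys
                    .collect();   // small result: one entry per sensor
            sensors.forEach(System.out::println);
        }
    }
}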

Some things I would focus on: how do I transfer the file from the client app, what are my end-to-end SLAs for availability of the data in Cassandra, what happens when there are failures (do we retry, etc.), and how often will my jobs run (triggered every time a user uploads a file, or as a cron job)?

Question:

When I try to save an entity to Cassandra using the persist() method and the Kundera framework, I receive this error:

28462 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  d.d.pieceDAOImpl - start to insert data
28513 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Returning cql query  INSERT INTO "pieces"("width","height","depth","IdPiece") VALUES(10.0,11.0,12.0,'1') .
28543 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR c.i.c.c.CassandraClientBase - Error while executing query  INSERT INTO "pieces"("width","height","depth","IdPiece") VALUES(10.0,11.0,12.0,'1')
28544 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: com.impetus.kundera.KunderaException: com.impetus.kundera.KunderaException: InvalidRequestException(why:Unknown identifier IdPiece)
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:448) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:414) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__8226$fn__8239$fn__8292.invoke(executor.clj:851) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_99]
Caused by: com.impetus.kundera.KunderaException: com.impetus.kundera.KunderaException: InvalidRequestException(why:Unknown identifier IdPiece)
        at com.impetus.kundera.persistence.EntityManagerImpl.persist(EntityManagerImpl.java:180) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.dao.pieceDAOImpl.insert(pieceDAOImpl.java:54) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.controller.DatabaseController.saveSensorEntitie(DatabaseController.java:47) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at connector.bolt.PrinterBolt.execute(PrinterBolt.java:66) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at org.apache.storm.daemon.executor$fn__8226$tuple_action_fn__8228.invoke(executor.clj:731) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__8147.invoke(executor.clj:463) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$clojure_handler$reify__7663.onEvent(disruptor.clj:40) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:435) ~[storm-core-1.0.0.jar:1.0.0]
        ... 6 more

And I am sure that idpiece is the primary key of my table.

My table:

CREATE TABLE mykeyspace.pieces (
    idpiece text PRIMARY KEY,
    depth double,
    height double,
    width double
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

My entity class:

@Entity
@Table(name = "pieces", schema = "mykeyspace@cassandra_pu")
public class PieceEntitie implements Serializable{

    @Id
    private String IdPiece;
    @Column
    private double width;
    @Column
    private double height;
    @Column
    private double depth;
}

How can I resolve this problem? Thank you in advance.


Answer:

Kundera uses quoted identifiers in the generated query.

INSERT INTO "pieces"("width","height","depth","IdPiece") VALUES(10.0,11.0,12.0,'1')

In Cassandra, quoted identifiers are case-sensitive, so there is no column "IdPiece" in your table; it was created as lowercase idpiece.

The solution is to rename the field to idpiece in your entity class.
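
For example (keeping a Java-style field name and mapping it explicitly with @Column(name = "idpiece") may also work, assuming Kundera honors the JPA name attribute):

import java.io.Serializable;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "pieces", schema = "mykeyspace@cassandra_pu")
public class PieceEntitie implements Serializable {

    @Id
    private String idpiece; // lowercase, matching the column created in cqlsh
    @Column
    private double width;
    @Column
    private double height;
    @Column
    private double depth;
}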

Question:

When I try to save an entity to Cassandra using the persist() method and the Kundera framework, I receive this error:

18976 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Returning cql query  INSERT INTO "pieces"("width","depth","height","idpiece") VALUES(10.0,12.0,11.0,'1') .
18998 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  d.c.DatabaseController - insert piece to database: SUCCESS
18998 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  d.d.SensorDAOImpl - start to insert data
19011 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Returning cql query  INSERT INTO "sensors"("event_time","temperature","pressure","IdSensor","date","this$0") VALUES(1462959800344,10.0,10.0,'1',150055,sensor.entitie.predefinedModel.SensorEntitie@1c4a9b7b) .
19015 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR c.i.c.c.CassandraClientBase - Error while executing query  INSERT INTO "sensors"("event_time","temperature","pressure","IdSensor","date","this$0") VALUES(1462959800344,10.0,10.0,'1',150055,sensor.entitie.predefinedModel.SensorEntitie@1c4a9b7b)
19015 [Thread-15-localhostAMQPbolt0-executor[2 2]] INFO  c.i.c.c.CassandraClientBase - Returning delete query DELETE FROM "pieces" WHERE "idpiece" = '1'.
19018 [Thread-15-localhostAMQPbolt0-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: com.impetus.kundera.KunderaException: com.impetus.kundera.KunderaException: InvalidRequestException(why:line 1:184 mismatched character ')' expecting '-')
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:448) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:414) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$fn__8226$fn__8239$fn__8292.invoke(executor.clj:851) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.0.jar:1.0.0]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.7.0_99]
Caused by: com.impetus.kundera.KunderaException: com.impetus.kundera.KunderaException: InvalidRequestException(why:line 1:184 mismatched character ')' expecting '-')
        at com.impetus.kundera.persistence.EntityManagerImpl.persist(EntityManagerImpl.java:180) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.dao.SensorDAOImpl.insert(SensorDAOImpl.java:54) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at database.controller.DatabaseController.saveSensorEntitie(DatabaseController.java:49) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at connector.bolt.PrinterBolt.execute(PrinterBolt.java:66) ~[project-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
        at org.apache.storm.daemon.executor$fn__8226$tuple_action_fn__8228.invoke(executor.clj:731) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__8147.invoke(executor.clj:463) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.disruptor$clojure_handler$reify__7663.onEvent(disruptor.clj:40) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:435) ~[storm-core-1.0.0.jar:1.0.0]
        ... 6 more

As you can see, I want to use a OneToMany relationship. My piece entity class:

@Entity
@Table(name = "pieces", schema = "mykeyspace@cassandra_pu")
public class PieceEntitie implements Serializable{

    @Id
    private String IdPiece;
    @Column
    private double width;
    @Column
    private double height;
    @Column
    private double depth;
}

My sensor entity class:

@Entity
@Table(name = "sensors", schema = "mykeyspace@cassandra_pu")
public class SensorEntitie implements Serializable {

    @EmbeddedId
    private CompoundKey key;
    @Column
    private float temperature;
    @Column
    private float pressure;

    @OneToMany(cascade = { CascadeType.ALL }, fetch = FetchType.EAGER)
    @JoinColumn(name="idsensor")
    private List<PieceEntitie> pieces;



    @Embeddable
    public class CompoundKey
    {
            @Column 
            private String IdSensor;           
            @Column 
            private long date;           
            @Column(name = "event_time")
            private long eventTime;



    }
}

My tables:

CREATE TABLE mykeyspace.sensors (
    idsensor text,
    date bigint,
    event_time timestamp,
    pressure float,
    temperature float,
    PRIMARY KEY ((idsensor, date), event_time)
) WITH CLUSTERING ORDER BY (event_time ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

cqlsh:sensor> DESCRIBE table pieces ;

CREATE TABLE mykeyspace.pieces (
    idpiece text PRIMARY KEY,
    depth double,
    height double,
    idsensor text,
    width double
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

Tutorial followed: https://github.com/impetus-opensource/Kundera/wiki/Polyglot-Persistence

How can I resolve this problem?


Answer:

I resolved the problem by separating the CompoundKey class from the sensor class. Before, I had put the CompoundKey class inside the sensor class, so Kundera was trying to insert CompoundKey's reference to the enclosing sensor entity as an attribute.
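
In other words, when CompoundKey is a non-static inner class of the sensor entity, the compiler adds a hidden reference to the enclosing instance, which is the this$0 "column" visible in the generated INSERT. Moving it into its own top-level @Embeddable class removes that field. A sketch of the separated class, also using a lowercase field name per the earlier case-sensitivity answer:

import java.io.Serializable;

import javax.persistence.Column;
import javax.persistence.Embeddable;

@Embeddable
public class CompoundKey implements Serializable {

    @Column
    private String idsensor; // lowercase, matching the idsensor column

    @Column
    private long date;

    @Column(name = "event_time")
    private long eventTime;
}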