Hot questions for Using Cassandra in list

Question:

I am using Scala to connect to Cassandra and run my queries against it. I have created a simple table in Cassandra which has two columns, row_id and row_values. row_id has the datatype "varchar" and row_values stores a list of elements. I inserted some random values into the table and want to retrieve them. To create the table:

CREATE TABLE Matrix1(row_id VARCHAR PRIMARY KEY, row_values LIST<VARCHAR>);

To insert into the table:

INSERT INTO Matrix1(row_id, row_values) VALUES ('abcd3', ['dsf23', 'fsf1','dsdf1']);

Now I want to retrieve the values and print them using Scala. I am using the following code to capture the result of the query:

val results: ResultSet = session.execute("SELECT * FROM Matrix1 where row_id = 'abcd3'") 

Now I want to print the "row_id" and "row_values"

var rowid: String = null
var rowval: List[String] = null
for (row <- results) {
      rowid = row.getString("row_id")
      rowval = row.getList("row_values")
      }   

With this code I am getting the value into "rowid" because it is a String, but there is an error for "rowval". How can I retrieve the column (list type) from the ResultSet?


Answer:

Pass the element class to the getList method:

row.getList("row_values", classOf[String])

Question:

I am trying to insert data into Cassandra (2.1.9). My Java object has a map of a list of a UDT. On running the code I am getting an error regarding the @Frozen annotation. I am using the DataStax (2.1.9) library. http://docs.datastax.com/en/drivers/java/2.1/index.html?com/datastax/driver/mapping/annotations/FrozenValue.html

create table user(
        name text,
        addresses map<text, frozen<list<frozen<address>>>>
)

My Java Class

public class User{
    private String name;
    @FrozenValue
    private Map<String, List<AddressUDT>> addresses;
}

But I am getting the following error:

java.lang.IllegalArgumentException: Error while checking frozen types on field addresses of entity com.dante.data.model.User: expected AddressUDT to be frozen but was not frozen
    at com.datastax.driver.mapping.AnnotationChecks.validateAnnotations(AnnotationChecks.java:73) ~[cassandra-driver-mapping-2.1.7.jar:na]
    at com.datastax.driver.mapping.AnnotationParser.parseEntity(AnnotationParser.java:81) ~[cassandra-driver-mapping-2.1.7.jar:na]
    at com.datastax.driver.mapping.MappingManager.getMapper(MappingManager.java:148) ~[cassandra-driver-mapping-2.1.7.jar:na]
    at com.datastax.driver.mapping.MappingManager.mapper(MappingManager.java:105) ~[cassandra-driver-mapping-2.1.7.jar:na]
    at com.dante.data.migration.dao.UserMigrationDao.insertUsersToCassandra(UserMigrationDao.java:389) ~[UserMigrationDao.class:na]
    at com.dante.data.migration.service.impl.UserMigration.insertUsersToCassandra(UserMigration.java:32) ~[UserMigration.class:na]
    at com.dante.data.migration.controller.DataMigrationController.migrateUserDetails(DataMigrationController.java:93) ~[DataMigrationController.class:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:137) ~[spring-web-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandleMethod(RequestMappingHandlerAdapter.java:776) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:705) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:959) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:893) ~[spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:966) [spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:857) [spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:617) [servlet-api.jar:na]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:842) [spring-webmvc-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:723) [servlet-api.jar:na]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290) [catalina.jar:6.0.44]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) [catalina.jar:6.0.44]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233) [catalina.jar:6.0.44]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191) [catalina.jar:6.0.44]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127) [catalina.jar:6.0.44]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103) [catalina.jar:6.0.44]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109) [catalina.jar:6.0.44]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:293) [catalina.jar:6.0.44]
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:861) [tomcat-coyote.jar:6.0.44]
    at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:620) [tomcat-coyote.jar:6.0.44]
    at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:489) [tomcat-coyote.jar:6.0.44]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]

Please suggest the resolution


Answer:

Declare the full frozen type in the @Frozen annotation so that it matches the CQL column definition:

@Frozen("map<text, frozen<list<frozen<AddressUDT>>>>")
private Map<String, List<AddressUDT>> addresses;
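
For context, a minimal sketch of how the whole entity might look with the driver's mapping annotations (the keyspace name and the @PartitionKey placement are assumptions, since the question does not show them):

import java.util.List;
import java.util.Map;
import com.datastax.driver.mapping.annotations.Frozen;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "mykeyspace", name = "user") // keyspace name is an assumption
public class User {
    @PartitionKey // assumed: name acts as the partition key
    private String name;

    // the frozen type string mirrors the CQL column definition
    @Frozen("map<text, frozen<list<frozen<AddressUDT>>>>")
    private Map<String, List<AddressUDT>> addresses;

    // getters and setters required by the mapper are omitted
}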

Question:

I would like to store an object like:

@Table(value = "my_table")
public class MyTableDto {
  @PrimaryKeyColumn(name = "uid", type = PrimaryKeyType.PARTITIONED)
  @CassandraType(type = DataType.Name.UUID)
  private UUID uid;

  @Column(value = "child_ids")
  private List<ChildIdDto> childIds;
}

Then I get the exception:

Caused by: org.springframework.dao.InvalidDataAccessApiUsageException: Only primitive types are allowed inside Collections for property [childIds] of type ['interface java.util.List'] in entity [de.myapplication.repository.dto.MyTableDto]

I do understand the exception, but is there another way to persist custom objects?

EDIT:

  • When I comment out this attribute, everything works

Answer:

Never say never, I found the solution.

To give a good example, I will list all of the relevant classes.

MyTableDto.java

@Table(value = "my_table") //OPT
public class MyTableDto {
  @PrimaryKeyColumn(name = "uid", type = PrimaryKeyType.PARTITIONED) 
  @CassandraType(type = DataType.Name.UUID)
  private UUID uid;

  @Column(value = "child_ids") //OPT
  private List<ChildDto> childIds;
}

ChildDto.java

@UserDefinedType // THE SOLUTION
public class ChildDto {
  @Column(value = "child") //OPT
  @CassandraType(type = DataType.Name.TEXT) //OPT
  private String groupId;

  @Column(value = "description") //OPT
  @CassandraType(type = Name.TEXT) //OPT
  private String description;
}

The @UserDefinedType annotation is the solution. For more information, see here.

NOTE: Each annotation marked with "OPT" is NOT required.

Question:

How can I define a table schema for a custom column type, as described below?

I did look at the DataStax documentation for UDTs and frozen types, but I am not able to apply that to my Java code. What changes are required to the Cassandra TYPE node so that I can serialize/deserialize easily? It should store a list of Double arrays in the nodes column.

static class Testf {
        String id;
        String name;
        List<Double[]> nodes;
    }

Table schema:

CREATE TABLE IF NOT EXISTS myks.testf(
id text,
name text,
nodes list<FROZEN<node>>,
PRIMARY KEY (id) );

CREATE TYPE myks.node (
     node map<double>
);

Answer:

The easiest way will be to use the object mapper from the Java driver. You can add the necessary annotations to your class and then map the class to the Cassandra table and back. But you'll need to create a separate class to match your node UDT.
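
A minimal sketch of what that could look like (this assumes the node UDT is redefined with a list<double> field, e.g. CREATE TYPE myks.node (coords list<double>), since map<double> is not valid CQL; all class and field names here are illustrative):

import java.util.List;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.annotations.Field;
import com.datastax.driver.mapping.annotations.FrozenValue;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import com.datastax.driver.mapping.annotations.UDT;

// Separate class matching the UDT (assumed definition: CREATE TYPE myks.node (coords list<double>))
@UDT(keyspace = "myks", name = "node")
public class Node {
    @Field(name = "coords")
    private List<Double> coords;

    public List<Double> getCoords() { return coords; }
    public void setCoords(List<Double> coords) { this.coords = coords; }
}

// Entity class mapped to the table
@Table(keyspace = "myks", name = "testf")
public class Testf {
    @PartitionKey
    private String id;
    private String name;
    @FrozenValue              // the list elements are frozen UDT values
    private List<Node> nodes;
    // getters and setters required by the mapper are omitted
}

// Usage (inside some method, with an existing Session):
Mapper<Testf> mapper = new MappingManager(session).mapper(Testf.class);
mapper.save(someTestf);            // serialize and write
Testf loaded = mapper.get("id1");  // read back by primary key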

Question:

I have queried a Cassandra table and selected current_time as shown below:

 Dataset getTime = spark.sql("select current_time from trafficdata where current_time between "+ time1 +" and "+ time2 );
 getTime.show();
 List<Rows> list = getTime.collectAsList();

I want to convert this List<Rows> to List<Long>. Does anyone know how to do that?


Answer:

You can't cast List<Row> to List<Long>, but you can convert it.

One thing: Dataset.collectAsList returns List<T>; for Dataset<Row> that would be List<Row>, not List<Rows>, so that is probably a typo.

You can do:

List<Long> longs = new ArrayList<>();
for(Row row : list) {
    longs.add(row.getLong(0));
} 

Java 8 Stream option:

List<Long> longs = list.stream().map(row -> row.getLong(0)).collect(Collectors.toList());

Question:

I am fetching a lot of rows from Cassandra using the Datastax Driver and I need to process them as quickly as possible.

I have looked into using List::parallelStream().forEach(), which seems great at first since ResultSet acts a lot like a List, but sadly I am unable to use parallelStream() directly on ResultSet. To get this to work I first have to call ResultSet::all(), which is really slow - I assume it iterates over each element.

ResultSet rs = this.getResultSet(); // Takes <1 second

// Convert the ResultSet to a list so as I can use parallelStream().
List<Row> rsList = rs.all(); // Takes 21 seconds

rsList.parallelStream().forEach(this::processRow); // Takes 3 seconds

Is there any faster way I can process each row of the result set?


Answer:

To get this to work I first have to use ResultSet::all() which really is slow

ResultSet.all() will fetch all rows using server-side paging. You can control the page size with statement.setFetchSize().
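
A minimal sketch of that approach (DataStax Java driver 3.x API assumed; the query text is a placeholder): tune the fetch size and process rows as they are paged in, instead of materializing them all with all() first.

import java.util.stream.StreamSupport;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

Statement stmt = new SimpleStatement("SELECT * FROM my_table"); // placeholder query
stmt.setFetchSize(5000);              // rows fetched per page from the server
ResultSet rs = session.execute(stmt);

// ResultSet implements Iterable<Row>, so it can be wrapped in a Stream
// and each row handled as soon as its page has been fetched.
StreamSupport.stream(rs.spliterator(), false)
             .forEach(this::processRow);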

Is there any faster way I can process each row of the result set?

It depends on your query; what is it? If you're doing a full partition scan, there are only a couple of machines doing the job, but if you're fetching data from multiple partitions, you can try to parallelize them with multiple queries, one for each partition.
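
A minimal sketch of that idea (the table name, partition key column, and partitionKeys list are assumptions): issue one asynchronous query per partition and process the results as they complete.

import java.util.ArrayList;
import java.util.List;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;

List<ResultSetFuture> futures = new ArrayList<>();
for (String pk : partitionKeys) {   // hypothetical list of partition key values
    futures.add(session.executeAsync("SELECT * FROM my_table WHERE pk = ?", pk));
}
for (ResultSetFuture future : futures) {
    for (Row row : future.getUninterruptibly()) {  // blocks until that query completes
        processRow(row);
    }
}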

Question:

I'm trying to append to a frozen list of UDTs but I receive this exception:

org.springframework.data.cassandra.CassandraTypeMismatchException: Query; CQL [UPDATE docs SET items=items+? WHERE ip=?]; Value 0 of type class java.util.ArrayList does not correspond to any CQL3 type; nested exception is com.datastax.driver.core.exceptions.InvalidTypeException: Value 0 of type class java.util.ArrayList does not correspond to any CQL3 type

I created the UDT like this:

CREATE TYPE item (
    type text,
    uuid text,
    timestamp bigint,
    size bigint,
    content text
);

And the table like this:

CREATE TABLE docs (
    ip text PRIMARY KEY,
    items list<frozen<item>>,
    keys list<text>
);

Here is the POJO for docs:

@Table("docs")
public class Document implements Serializable {

    private static final long serialVersionUID = 1L;

    @PrimaryKey
    private String ip;
    private List<String> keys;
    private List<Item> items;

    public void setIp(String ip) {
        this.ip = ip;
    }
    public void addNewKey(String key) {
        this.keys.add(key);
    }
    public void addNewItem(Item item) {
        this.items.add(item);
    }

    public Document(String ip) {
        this.ip = ip;
        this.keys = new ArrayList<String>();
        this.items = new ArrayList<Item>();
    }

    public List<Item> getAllItems() {
        return items;
    }
    public int getAllItemsLength() {
        try {
            return items.size();
        } catch (Exception e) {
            return 0;
        }
    }
    public List<String> getAllKeys() {
        return keys;
    }
    public int getAllKeysLength() {
        try {
            return keys.size();
        } catch (Exception e) {
            return 0;
        }
    }

    public String getAsJsonString() {
        return new Gson().toJson(this);
    }
}

Here is the POJO for the UDT:

@UserDefinedType(value="item")
public class Item implements Serializable {

    private static final long serialVersionUID = 1L;

    @NotNull
    private String type;
    @NotNull
    private String uuid;
    @NotNull
    private long timestamp;
    @NotNull
    private long size;
    @NotNull
    private String content;

    public Item(String type, String uuid, long timestamp, long size, String content) {
        this.type = type;
        this.uuid = uuid;
        this.timestamp = timestamp;
        this.size = size;
        this.content = content;
    }
}

Here is the repository:

@Query(value="UPDATE docs SET items=items+?0 WHERE ip=?1")
@AllowFiltering
public void updateItemByIp(List<Item> item, String ip);

Here is the service where I call updateItemByIp():

Item thatItem = new Item(
  "text/plain",
  "hrT4qLrWt1m3vwLU0smlIkwJJS7Y+/KhTudPwVCWf3w=",
  1583782576724,
  26,
  "content"
);
List<Item> itemList = new ArrayList<Item>();
itemList.add(thatItem);

DBRepo.updateItemByIp(itemList, ip); // <-- The exception is triggered here with IDE breakpoints

When I do the same query in cqlsh, it works perfectly.

This is my first question here, I hope I've been understood, thank you all for the help :)


Answer:

The solution was to use the repository's built-in methods findById() and save() instead of my own updateItemByIp() method, like this:

// inject the repository in the class
@Autowired
private DocumentsRepository DBRepo;

// ...

final Optional<Document> query = DBRepo.findById(923216);
if (query.isPresent()) {

    // create new item
    Item thatItem = new Item(
        "text/plain",
        "hrT4qLrWt1m3vwLU0smlIkwJJS7Y+/KhTudPwVCWf3w=",
        1583782576724,
        26,
        "content"
    );

    // add item to database
    query.map(dbDocument -> {
        dbDocument.addNewItem(thatItem);
        return DBRepo.save(dbDocument);
    });
};

And it works perfectly!

Question:

I'm attempting to make async writes to a Cassandra cluster using ListenableFuture as follows:

private static Cluster cluster = null;
private ListeningExecutorService executorService;
private PreparedStatement preparedStatement;
private Session session = null;

... 

executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));
...

public void writeValue(Tuple tuple) {
    ListenableFuture<String> future = executorService.submit(new Callable<String>() {
       @Override
       public String call() throws Exception {
           if(session == null) {
               session = getCluster().connect("dbname");
               preparedStatement = session.prepare(queryString);
           }

           try {
               BoundStatement boundStatement = preparedStatement.bind(tuple values);
               session.execute(boundStatement);
           } catch(Exception exception) {
                // handle exception
           }

           return null;
       }
    });

If I set POOL_SIZE to 1 everything works. If I set POOL_SIZE to > 1 I get errors as follows:

Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Tried to execute unknown prepared query : 0x75c5b41b9f07afa5384a69790503f963. You may have used a PreparedStatement that was created with another Cluster instance.

So I moved session and preparedStatement into local vars. Then I got warnings about Re-preparing already prepared query ..., plus it creates a new session every time.

I want to reuse as much as possible. What am I doing wrong and what are my options?

Would it help to make this class static?


Answer:

You have all sorts of race conditions here and execution isn't thread safe.

Cluster, Session, and PreparedStatement are each meant to be application-scoped singletons, i.e. you only need one of each (one per query in the case of PreparedStatement).

However, you are recreating a Session and potentially preparing PreparedStatement multiple times.

Don't. Initialize your Session once, in a constructor or some location that only runs once, and prepare your statements at the same time. Then use the Session and PreparedStatement where appropriate.


Using a single-threaded executor, everything runs as if it were synchronous. When you add more threads, many of them may call

session.prepare(queryString);

at the same time. Or the PreparedStatement you use here

BoundStatement boundStatement = preparedStatement.bind(tuple values);
session.execute(boundStatement);

might be different from the one you initialized

preparedStatement = session.prepare(queryString);

even within the same thread of execution. Or you might be attempting to execute the PreparedStatement with a different Session than the one used to initialize it.
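
A minimal sketch of that structure (class and field names are assumptions; Tuple and POOL_SIZE are reused from the question): the Session and PreparedStatement are created once, and the worker threads only bind and execute.

import java.util.concurrent.Executors;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class TupleWriter {
    private final Session session;
    private final PreparedStatement preparedStatement;
    private final ListeningExecutorService executorService =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE));

    public TupleWriter(Cluster cluster, String queryString) {
        this.session = cluster.connect("dbname");               // one Session per application
        this.preparedStatement = session.prepare(queryString);  // one per query
    }

    public ListenableFuture<?> writeValue(Tuple tuple) {
        return executorService.submit(() -> {
            // worker threads only bind and execute; no shared state is mutated here
            BoundStatement boundStatement = preparedStatement.bind(/* tuple values */);
            session.execute(boundStatement);
            return null;
        });
    }
}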


Here are some things you should be doing when using CQL drivers.

  1. Is a prepared statement bound to one session or is it usable on another session?

    A prepared statement is derived from a particular session instance. So when you prepare a statement and it is sent over to the server, it is sent to the cluster with which this session instance is associated.

The javadoc of Session states

Session instances are thread-safe and usually a single instance is enough per application.

Question:

I have a column info of type map<text, frozen<list<text>>> in Cassandra. I am trying to retrieve this map data in Java with the code below.

for (Row row : conn.getSession().execute("SELECT info FROM demo.variants where chr = '" +chr + "' and pos = " + pos)) {
    Map<String, List> map = row.getMap("info", String.class, List.class);
    System.out.println(map);
}

When doing it this way, I am getting the error

com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [list<varchar> <-> java.util.List]

I could not figure out how to resolve this nested type codec exception. What are the ways to resolve this?


Answer:

I would recommend using the TypeTokens.listOf utility method to create a TypeToken<List<String>> and use that as the element type when retrieving the map, i.e.:

Map<String, List<String>> info = row.getMap("info", TypeToken.of(String.class), TypeTokens.listOf(String.class));

Question:

I have a table emp(id, name, list<frozen<address>>). Here address is a Cassandra UDT defined as create TYPE address (hno int, street text);. I am trying to read all addresses for a given id in emp using the code below, and I get the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [frozen<'address'> <-> com.xyz.cassandra.address]

String query1="select * from iotbilling.emp where id=?";
PreparedStatement preparedStatement2=this.session.prepare(query1);
BoundStatement boundStatement2=preparedStatement2.bind(4);
ResultSet rs2=this.session.execute(boundStatement2);
Row row2=rs2.one();
List<address> addresses=row2.getList("adresses",address.class);
System.out.println("Addresses retrieved");
for(address adr:addresses)
    System.out.println(adr.toString());


How can I capture the list of frozen address values returned from Cassandra in Java code?


Answer:

You can read the value from the row as a UDTValue and then access its fields:

UDTValue udtData = row.getUDTValue("address");

For example:

udtData.getString("name");

Update with list example

For a list it should probably look like:

List<UDTValue> udtDataList = row.getList("adresses", UDTValue.class)

And then you can easily iterate through the list and access the fields of your data.
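
For instance, a minimal sketch that reads the two fields declared in the address type:

for (UDTValue adr : udtDataList) {
    int hno = adr.getInt("hno");             // fields from: create TYPE address (hno int, street text)
    String street = adr.getString("street");
    System.out.println(hno + " " + street);
}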

Best

Question:

I use Cassandra 2.1.7 and CQL3 for processing it.

I have a column of type list<text> in my table. I use DataStax cassandra-driver-core-2.1.5 to read and write my data from Cassandra. While building my query, I saw that the query being built for my list<text> column was wrong. (I checked both queries in DataStax DevCenter.)

My code to build the query:

Clause clause = null;
for (Entry<String, Object> val : conditionMap.entrySet()) {
    clause = QueryBuilder.eq("\"".concat(val.getKey()).concat("\""), val.getValue());
    where = where.and(clause);
}

The query that is built when I use the above method:

SELECT * FROM "ashokkeyspace"."table150" WHERE "id"=150 AND "listdata" =['apple'];

The query that I actually require:

SELECT * FROM "ashokkeyspace"."table150" WHERE "id"=150 AND "listdata" contains 'apple';

Answer:

UPDATE: 2.1.7 dropped today

Looks like this is supported as of the driver version 2.1.7.

Which is not yet officially released / documented.

But you can build yourself or hold off until it is GA.

I recommend waiting before pushing to Prod because there are some known performance issues with the current 2.1.7 branch.

You want the new ContainsClause in the query builder.

For updates on release dates etc. see the driver mailing list.
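
Once that version is available, a minimal sketch of the intended usage (assuming the contains clause is exposed through QueryBuilder, as in later driver releases) would be:

import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;

// equivalent to: SELECT * FROM table150 WHERE id = 150 AND listdata CONTAINS 'apple'
Statement select = QueryBuilder.select().all()
        .from("ashokkeyspace", "table150")
        .where(QueryBuilder.eq("id", 150))
        .and(QueryBuilder.contains("listdata", "apple"));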

Question:

Model

    @Table(value = "bad_data")
    public class BadData {
        private String objectID;
        private String type;
        private String problems;
        private String owner;
        private String formattedID;
        private String project;

    @PrimaryKey
    private String train;
    private String status;

    /* Default constructor. */
    public BadData () {
        this.objectID = "";
        this.type = "";
        this.problems = "";
        this.owner = "";
        this.formattedID = "";
        this.project = "";
        this.train = "";
        this.status = "";
    }


    /* Getters and setters. */
    public void setObjectID (String objectID) {
        this.objectID = objectID;
    }

    public String getObjectID () {
        return this.objectID;
    }

    public void setType (String type) {
        this.type = type;
    }

    public String getType () {
        return this.type;
    }

    public void setProblems (String problems) {
        this.problems = problems;
    }

    public String getProblems () {
        return this.problems;
    }

    public void setOwner (String owner) {
        this.owner = owner;
    }

    public String getOwner () {
        return this.owner;
    }

    public void setFormattedID (String formattedID) {
        this.formattedID = formattedID;
    }

    public String getFormattedID () {
        return this.formattedID;
    }

    public void setProject (String project) {
        this.project = project;
    }

    public String getProject () {
        return this.project;
    }

    public void setTrain (String train) {
        this.train = train;
    }

    public String getTrain () {
        return this.train;
    }

    public void setStatus (String status) {
        this.status = status;
    }

    public String getStatus () {
        return this.status;
    }

}

Repository

@Autowired
public void save (CassandraOperations db) {
        BadData badData1 = new BadData();
        badData1.setTrain("train");
        badData1.setFormattedID("fid");
        badData1.addProblems("problem1");
        badData1.setObjectID("id");
        badData1.setOwner("lokesh");
        badData1.setType("story");
        badData1.setProject("om");
        badData1.setStatus("open");

        BadData badData2 = new BadData();
        badData2.setTrain("train");
        badData2.setFormattedID("fid");
        badData2.addProblems("problem2");
        badData2.setObjectID("id");
        badData2.setOwner("lokesh");
        badData2.setType("story");
        badData2.setProject("om");
        badData2.setStatus("open");

        BadData badData3 = new BadData();
        badData3.setTrain("train");
        badData3.setFormattedID("fid");
        badData3.addProblems("problem3");
        badData3.setObjectID("id");
        badData3.setOwner("lokesh");
        badData3.setType("story");
        badData3.setProject("om");
        badData3.setStatus("open");

        List<BadData> data = new ArrayList<>();
        data.add(badData1);
        data.add(badData3);
        data.add(badData2);

        db.insert(data);
    }

I am trying to save three objects by placing them in a list. Only one object (badData3) got saved into the database. I changed the order of the objects in the list and noticed that whichever object is in the middle of the list is the one that gets saved. Can someone guess what the possible error could be?


Answer:

Yes (krsyk is correct), the value ("train") of your @PrimaryKey field (train) is the same for all entities in your List.

Also, you should have a look at the corresponding test case (insertBatchTest) in test class (CassandraDataOperationsTest) in the SD Cassandra test suite.

For reassurance, I added the following code snippet to the end of the test...

assertThat(template.count(Book.class), is(equalTo(80l)));

And, the test passed as expected.

Note, I was using the latest Spring Data Cassandra 1.4.2.RELEASE.

Also note, because every INSERT or UPDATE in Cassandra is actually an "UPSERT" (see here), you are effectively overwriting each entry (entity) in your List because of the duplicate primary key value.
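
As a minimal sketch of the fix (the train values below are made up for illustration), giving each entity a distinct value for the @PrimaryKey field makes the three inserts target three different rows instead of upserting the same one:

import java.util.Arrays;

BadData badData1 = new BadData();
badData1.setTrain("train-1");   // distinct primary key value
// ... remaining setters as in the question ...

BadData badData2 = new BadData();
badData2.setTrain("train-2");   // distinct primary key value
// ... remaining setters as in the question ...

BadData badData3 = new BadData();
badData3.setTrain("train-3");   // distinct primary key value
// ... remaining setters as in the question ...

db.insert(Arrays.asList(badData1, badData2, badData3));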

Question:

I'm currently writing a very basic CQL access layer using the official Datastax Driver (V2.0) and struggling a bit on passing parameter values to a statement.

Here's an example

Column family (simplified)

USE myKeyspace;

CREATE TABLE MyTable (
    myId timeuuid,
    myTypeId int,
    myVal varchar,
    PRIMARY KEY( myId, myTypeId )
);

CREATE INDEX MyTable_myTypeID 
    ON MyTable( myTypeId );

The basic idea is storing some event data with multiple values per "event"; that's why I'm using the combined PK. Every event has its time-based UUID, and there might be multiple entries per typeId. Does that even make sense from a modelling perspective?

What I'm trying to do now is to fetch only the entries for an event with a selection of 'typeIds'.

public void myQueryCode() {

    Cluster.Builder builder = Cluster.builder();
    builder.withPort( 9142 );
    builder.addContactPoints( "127.0.0.1" );
    cluster = builder.build();
    Session session = cluster.connect( "myKeyspace" );

    List<Integer> typeFilter = new ArrayList<> ();
    typeFilter.add( 1 );
    typeFilter.add( 2 );

    Statement stmt = new SimpleStatement( 
        "SELECT * FROM MyTable where myId = ?" +
        " AND myTypeId IN (?,?)" +
        " ALLOW FILTERING",
        UUID.randomUUID(),
        typeFilter );

    ResultSet result = session.execute( stmt );

    // do something with results
}

However, I'm just getting an exception deep down in serialization of the Statement's values.

com.datastax.driver.core.exceptions.InvalidQueryException: Expected 4 or 0 byte int (8)
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)

I'm not sure about passing the list as the second parameter; the driver accepts it, but maybe that is only suitable for inserts into collection-typed columns?


Answer:

You are correct that the List you are passing is being forced into the second parameter and this causes the InvalidQueryException.

Statement stmt = new SimpleStatement( 
        "SELECT * FROM MyTable where myId = ?#1" +
        " AND myTypeId IN (?#2,?#3)" +
        " ALLOW FILTERING",
        #1 UUID.randomUUID(),
        #2 typeFilter );

Since typeFilter is a list, the driver then attempts to put a List collection object into parameter 2 and things go wonky. This is because when you run statements like this (without preparing them) the driver is unable to check the types (see the comment in the driver code). If instead you passed in

Statement stmt = new SimpleStatement( 
            "SELECT * FROM MyTable where myId = ?#1" +
            " AND myTypeId IN (?#2,?#3)" +
            " ALLOW FILTERING",
            #1 UUID.randomUUID(),
            #2 typeFilter.get(0)
            #3 typeFilter.get(1));

you would be fine. Or, if you had prepared the statement first, you would have been warned with an error when binding the values.
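
A minimal sketch of the prepared-statement variant (reusing the names from the question): once the statement is prepared, the driver can type-check each bound value.

import java.util.UUID;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;

PreparedStatement ps = session.prepare(
        "SELECT * FROM MyTable WHERE myId = ? AND myTypeId IN (?, ?) ALLOW FILTERING");
BoundStatement bound = ps.bind(
        UUID.randomUUID(), typeFilter.get(0), typeFilter.get(1));
ResultSet result = session.execute(bound);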

Question:

I have a User Defined Type in Cassandra which I created using the following syntax in cqlsh:

CREATE TYPE order_items (
    qty int,
    name text,
    milk_type text,
    size text,
    price decimal
);

Now in my table, I am storing a list of the type "order_items" so I can store multiple items for one object.

Something like this :

CREATE TABLE order (
     order_id uuid PRIMARY KEY,
     amount decimal,
     location text,
     items list<frozen<order_items>>,
     status text,
     message text
);

If I want to store a record using CQLSH, I can do so with the following syntax,

INSERT INTO order (order_id,amount,location,items,status,message) VALUES 
(e7ae5cf3-d358-4d99-b900-85902fda9bb0, 5, 'San Jose',[{qty:2,name:'mocha',milk_type:'whole',size:'small',price:2.5}], 'PLACED','order is in process');

but when I try to do the same using the DataStax Java driver for Cassandra, I am not able to provide the user-defined type as a list of objects. I could only come up with a string syntax, but obviously it is throwing the error below.

I have tried referring to the DataStax documentation, but apparently it is still a work in progress: http://docs.datastax.com/en/developer/java-driver/3.1/manual/udts/

Here is my Java syntax:

 session.execute(insertorderPstmt.bind(UUID.randomUUID(),new BigDecimal("4"),"Santa Clara","[{qty:2,name:'mocha',milk_type:'whole',size:'small',price:2.5}]","PLACED","in process"));

and the error:

Exception in thread "main" com.datastax.driver.core.exceptions.InvalidTypeException: Invalid type for value 3, column is a list but class java.lang.String provided

Is there anyone who has been able to store custom type lists using java driver?


Answer:

Your insert query is not correct.

If you use single quotes, use them consistently and separate all fields with commas. If a field is a string, enclose it in quotes.

Use the insert query below:

INSERT INTO order (
     order_id,
     amount,
     location,
     items,
     status,
     message
 ) VALUES (
    e7ae5cf3-d358-4d99-b900-85902fda9bb0,
    5,
    'San Jose', 
     [
       {qty:2, name:'mocha', milk_type:'whole', size:'small', price:2.5}
     ],
     'PLACED',
     'order is in process'
);

Using Java Driver :

Since you are inserting a user-defined type (UDT) value, you have to create a custom codec, or insert the value using JSON or a UDTValue.

Here is how you can insert the value through a UDTValue:

//First get your UserType from cluster object
UserType oderItemType = cluster.getMetadata()
        .getKeyspace(session.getLoggedKeyspace())
        .getUserType("order_items");

//Now you can create a UDTValue from your UserType order_items and set value 
UDTValue oderItem = oderItemType.newValue()
        .setInt("qty", 2)
        .setString("name", "mocha")
        .setString("milk_type", "whole")
        .setString("size", "small")
        .setDecimal("price", new BigDecimal(2.5));

// The column is a list of the UDT, so create a list and add the value
List<UDTValue> orders = new ArrayList<>();
orders.add(oderItem);

Now you can insert data like below :

//Let your prepared statement be like
PreparedStatement insertorderPstmt = session.prepare("insert into order(order_id,amount,location,items,status,message) values(?, ?, ?, ?, ?, ?)");

//Now you can bind the value and execute query
session.execute(insertorderPstmt.bind(UUID.randomUUID(), new BigDecimal("4"), "Santa Clara", orders, "PLACED", "in process"));

Question:

I am following the Cassandra Bulk Loader example for Cassandra 3.4.0 from here: https://github.com/yukim/cassandra-bulkload-example . The data files are generated under the data folder. These are as follows:

quote-historical_prices-ka-1-CompressionInfo.db  quote-historical_prices-ka-1-Index.db
quote-historical_prices-ka-1-Data.db             quote-historical_prices-ka-1-Statistics.db
quote-historical_prices-ka-1-Digest.sha1         quote-historical_prices-ka-1-TOC.txt
quote-historical_prices-ka-1-Filter.db

However, when trying to load these using sstableloader -d 127.0.0.1 ~/workspace/cassandra-bulkload-example/data/quote/historical_prices/ I get an error as follows:

Established connection to initial hosts
Opening sstables and calculating sections to stream
Failed to list files in /home/srai/workspace/cassandra-bulkload-example/data/quote/historical_prices
java.lang.NullPointerException
java.lang.RuntimeException: Failed to list files in /home/srai/workspace/cassandra-bulkload-example/data/quote/historical_prices
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.list(LogAwareFileLister.java:53)
        at org.apache.cassandra.db.lifecycle.LifecycleTransaction.getFiles(LifecycleTransaction.java:544)
        at org.apache.cassandra.io.sstable.SSTableLoader.openSSTables(SSTableLoader.java:76)
        at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:165)
        at org.apache.cassandra.tools.BulkLoader.load(BulkLoader.java:80)
        at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:48)
Caused by: java.lang.NullPointerException
        at org.apache.cassandra.io.sstable.format.SSTableReader.openForBatch(SSTableReader.java:431)
        at org.apache.cassandra.io.sstable.SSTableLoader.lambda$openSSTables$220(SSTableLoader.java:121)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.lambda$innerList$208(LogAwareFileLister.java:75)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.TreeMap$EntrySpliterator.forEachRemaining(TreeMap.java:2965)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.innerList(LogAwareFileLister.java:77)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.list(LogAwareFileLister.java:49)
        ... 5 more
Exception in thread "main" org.apache.cassandra.tools.BulkLoadException: java.lang.RuntimeException: Failed to list files in /home/srai/workspace/cassandra-bulkload-example/data/quote/historical_prices
        at org.apache.cassandra.tools.BulkLoader.load(BulkLoader.java:93)
        at org.apache.cassandra.tools.BulkLoader.main(BulkLoader.java:48)
Caused by: java.lang.RuntimeException: Failed to list files in /home/srai/workspace/cassandra-bulkload-example/data/quote/historical_prices
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.list(LogAwareFileLister.java:53)
        at org.apache.cassandra.db.lifecycle.LifecycleTransaction.getFiles(LifecycleTransaction.java:544)
        at org.apache.cassandra.io.sstable.SSTableLoader.openSSTables(SSTableLoader.java:76)
        at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:165)
        at org.apache.cassandra.tools.BulkLoader.load(BulkLoader.java:80)
        ... 1 more
Caused by: java.lang.NullPointerException
        at org.apache.cassandra.io.sstable.format.SSTableReader.openForBatch(SSTableReader.java:431)
        at org.apache.cassandra.io.sstable.SSTableLoader.lambda$openSSTables$220(SSTableLoader.java:121)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.lambda$innerList$208(LogAwareFileLister.java:75)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
        at java.util.TreeMap$EntrySpliterator.forEachRemaining(TreeMap.java:2965)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.innerList(LogAwareFileLister.java:77)
        at org.apache.cassandra.db.lifecycle.LogAwareFileLister.list(LogAwareFileLister.java:49)
        ... 5 more

Answer:

The bulk loader example by default builds sstables for 2.1.12. Streaming tables from versions older than 3.0.0 is not supported; the fact that sstableloader neither skips such files nor provides a good error was brought up in CASSANDRA-10940. However, in 3.5 streaming of older sstables will be supported again per CASSANDRA-10990.

So either build the sstables with the newer version or wait for 3.5.

Question:

I currently have a LinkedList which contains usernames that I wish to use in a Cassandra query. I am trying to get posts from all the users in that LinkedList.

The queries I am currently using are:

select * from userposts and

select * from userposts where user =?


Answer:

Is user the partition key? If so, you should be able to do:

Select * from userposts where user in (val1, val2, val3);

Check http://www.datastax.com/documentation/cql/3.0/cql/cql_reference/select_r.html?scroll=reference_ds_d35_v2q_xj__selectIN for details.
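
If you want to build that query from the LinkedList in Java, a minimal sketch with the driver's QueryBuilder (the table and column names come from the question; everything else is illustrative) could look like:

import java.util.LinkedList;
import java.util.List;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;

List<String> usernames = new LinkedList<>();
usernames.add("val1");
usernames.add("val2");

Statement select = QueryBuilder.select().all()
        .from("userposts")
        .where(QueryBuilder.in("user", usernames.toArray()));

for (Row row : session.execute(select)) {
    // process each post row
}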

Question:

I have written a Java program to perform upsert operations on my Cassandra DB, which contains a table with multiple list columns. Using my Java code I am trying to write to many such lists at the same time, say 10 writes at a moment. But when I view my Cassandra table, the values in the lists are not stored in a synchronized way: a few of the values switch places from their original position. The timestamp is correct for each value, but somehow the list is not represented in the correct order. I am sharing files and sample code that should be helpful to identify the issue.

String newInsertQuery1 =  "UPDATE events.generated_event SET attributes = ['100'] + attributes, channels = ['100'] + channels, " + "event_types = ['100'] + event_types, ip = ['100'] + ip, library_info = ['100'] + library_info, property_ids = ['100'] + property_ids," + "texts = ['100'] + texts, user_agent = ['100'] + user_agent WHERE profile_id = '1111' AND project_id = '5bbc83f4bf52016962b695da' AND bucket_id = 1555977600000"; 

String newInsertQuery2 =  "UPDATE events.generated_event SET attributes = ['300'] + attributes, channels = ['300'] + channels, " + "event_types = ['300'] + event_types, ip = ['300'] + ip, library_info = ['300'] + library_info, property_ids = ['300'] + property_ids," + "texts = ['300'] + texts, user_agent = ['300'] + user_agent WHERE profile_id = '1111' AND project_id = '5bbc83f4bf52016962b695da' AND bucket_id = 1555977600000";

String newInsertQuery3 =  "UPDATE events.generated_event SET attributes = ['400'] + attributes, channels = ['400'] + channels, " + "event_types = ['400'] + event_types, ip = ['400'] + ip, library_info = ['400'] + library_info, property_ids = ['400'] + property_ids," + "texts = ['400'] + texts, user_agent = ['400'] + user_agent WHERE profile_id = '1111' AND project_id = '5bbc83f4bf52016962b695da' AND bucket_id = 1555977600000";

Similarly for queries 4, 5, 6 and 7.

I have made many threads that run at the same time, and each thread executes a single query.

Expected result:

 profile_id   | 1111
 project_id   | 5bbc83f4bf52016962b695da
 bucket_id    | 1555977600000

 anonymous_id | 150698a7-5d02-f634-3c8d-4d7bf615f13e

 attributes   | ['300', '700', '400', '600', '500', '800', '00']

 channels     | ['300', '700', '400', '600', '500', '800', '00']

 event_types  | ['300', '700', '400', '600', '500', '800', '00']

 ip           | ['300', '700', '400', '600', '500', '800', '00']

 library_info | ['300', '700', '400', '600', '500', '800', '00']

 property_ids | ['300', '700', '400', '600', '500', '800', '00']

 texts        | ['300', '700', '400', '600', '500', '800', '00']

 timestamps   | null

 user_agent   | ['300', '700', '400', '600', '500', '800', '00']

Actual Result:

 profile_id   | 1111
 project_id   | 5bbc83f4bf52016962b695da
 bucket_id    | 1555977600000
 anonymous_id | 150698a7-5d02-f634-3c8d-4d7bf615f13e

attributes   | ['300', '700', '500', '400', '800', '600', '00']

channels     | ['300', '700', '400', '600', '800', '500', '00']

event_types  | ['300', '700', '400', '600', '500', '800', '00']

ip           | ['300', '700', '400', '600', '500', '800', '00']

library_info | ['300', '700', '400', '600', '500', '800', '00']

property_ids | ['300', '700', '400', '600', '500', '800', '00']

texts        | ['300', '700', '400', '600', '800', '500', '00']

timestamps   | null

 user_agent   | ['300', '700', '400', '600', '500', '800', '00']

The attached image consists of the output of the sstabledump command, and as you can see the timestamps are the same for a value, say 800, in each row. But when I perform a read on this table, the values printed to me are not sorted; they are printed in the same order as written in the sstabledump output.


Answer:

If you're issuing multiple queries on the same row from different threads, you can't guarantee that the updates will be performed in a specific order.

Question:

Hi, I am very new to Spark and Scala. I am facing some issues with saving data into Cassandra; below is my scenario.

1) I get a list of user-defined objects (say User objects which contain firstName, lastName etc.) from my Java class into a Scala class, and up to here it's fine: I am able to access the User objects and print their contents.

2) Now I want to save that usersList into a Cassandra table using the Spark context. I have gone through many examples, but everywhere I see a Seq being created with a case class and hardcoded values and then saved to Cassandra. I have tried that and it works fine for me, as below:

import scala.collection.JavaConversions._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import com.datastax.spark.connector._
import java.util.ArrayList

object SparkCassandra extends App {
    val conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("SparkCassandra")
        //set Cassandra host address as your local address
        .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)
     val usersList = Test.getUsers
     usersList.foreach(x => print(x.getFirstName))
    val collection = sc.parallelize(Seq(userTable("testName1"), userTable("testName1")))
    collection.saveToCassandra("demo", "user", SomeColumns("name"))
    sc.stop()
}

case class userTable(name: String)

But my requirement here is to use dynamic values from my usersList instead of hardcoded values. Is there any other way to achieve this?


Answer:

If you create an RDD of CassandraRow objects, you can directly save the result without having to specify columns or case classes. In addition, CassandraRow has the hugely convenient fromMap function, so you can define your rows as Map objects, convert them, and save the result.

Example:

val myData = sc.parallelize(
  Seq(
    Map("name" -> "spiffman", "address" -> "127.0.0.1"),
    Map("name" -> "Shabarinath", "address" -> "127.0.0.1")
  )
)

val cassandraRowData = myData.map(rowMap => CassandraRow.fromMap(rowMap))

cassandraRowData.saveToCassandra("keyspace", "table")

Question:

I'm iterating over billions of rows in Cassandra with the Spark/Cassandra driver and pulling out data to run statistics on. To accomplish this I am running a FOR loop over each row of data, and if it falls within the criteria of a bucket of data I'm calling a "channel", then I add it to an ArrayList in the form of a K,V pair of channel,power.

[[Channel,Power]]

The channels should be static, based on the iteration increment of the for loop. For example, if my channel range is 0 through 10 with an increment of 2, then the channels would be 0, 2, 4, 6, 8, 10.

The FOR loop runs on the current row of data and checks to see if the data falls within the channel, and if so adds it to the ArrayList Data in the format of [[Channel,Power]].

Then it proceeds to the next row and does the same. Once it has gone over all the rows, it increments to the next channel and repeats the process.

The issue is that there are billions of rows that qualify for the same channel, so I'm not sure if I should be using an ArrayList and flatMap or something else, since my results are slightly different each time I run it and the channels are not static as they should be.

A small sample of data [[Channel,Power]] would be:

[[2,5]]
[[2,10]]
[[2,5]]
[[2,15]]
[[2,5]]

Notice that there may be duplicate items that need to remain, since I run min, max and average stats on each of these channels.

Channel 2: Min 5, Max 15, Avg 8

My Code is as follows:

JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SparkTestB", "Measured_Value", mapRowTo )
            .select("Start_Frequency","Bandwidth","Power");
    JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){
      @Override
      public Iterable<Value> call(MeasuredValue row) throws Exception {
        long start_frequency = row.getStart_frequency();
        float power = row.getPower();
        long bandwidth = row.getBandwidth();

        // Define Variable
        long channel,channel_end, increment; 

        // Initialize Variables
        channel_end = 10;
        increment = 2;

        List<Value> list = new ArrayList<>();
        // Create Channel Power Buckets
        for(channel = 0; channel <= channel_end; ){
          if( (channel >= start_frequency) && (channel <= (start_frequency + bandwidth)) ) {
            list.add(new Value(channel, power));
          } // end if
          channel+=increment;
        } // end for 
        return list; 
      }
    });

     sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel"))
     .agg(min("power"), max("power"), avg("power"))
     .write().mode(SaveMode.Append)     
     .option("table", "results")
     .option("keyspace", "model")
     .format("org.apache.spark.sql.cassandra").save();

My classes are as follows for the reflection:

public class Value implements Serializable {
    public Value(Long channel, Float power) {
        this.channel = channel;
        this.power = power;
    }
    Long channel;
    Float power;

    public void setChannel(Long channel) {
        this.channel = channel;
    }
    public void setPower(Float power) {
        this.power = power;
    }
    public Long getChannel() {
        return channel;
    }
    public Float getPower() {
        return power;
    }

    @Override
    public String toString() {
        return "[" +channel +","+power+"]";
    }
}

public static class MeasuredValue implements Serializable {
        public MeasuredValue() { }

        public long start_frequency;
        public long getStart_frequency() { return start_frequency; }
        public void setStart_frequency(long start_frequency) { this.start_frequency = start_frequency; }

        public long bandwidth ;
        public long getBandwidth() { return bandwidth; }
        public void setBandwidth(long bandwidth) { this.bandwidth = bandwidth; }

        public float power;    
        public float getPower() { return power; }
        public void setPower(float power) { this.power = power; }

    }

Answer:

I discovered that the discrepancies were due to my channelization algorithm. I replaced it with the following to solve the problem.

        // Create Channel Power Buckets
        for(; channel <= channel_end; channel+=increment ){ 
            //Initial Bucket
            while((start_frequency >= channel) && (start_frequency < (channel + increment))){
                list.add(new Value(channel, power));
                channel+=increment;
            }
            //Buckets to Accomodate for Bandwidth
            while ((channel <= channel_end) && (channel >= start_frequency) && (start_frequency + bandwidth) >= channel){
                list.add(new Value(channel, power));                           
                channel+=increment;
            }                   
        }