Hot questions about using Cassandra with Astyanax

Question:

This is my setup:

  • 4 nodes Cassandra 1.2.19
  • Astyanax 1.56.49

I am setting up the configuration like this:

AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
    .forCluster(service.getClusterName())
    .forKeyspace(service.getKeySpaceName())
    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
         .setDiscoveryType(NodeDiscoveryType.NONE)
         .setCqlVersion("3.0.0")
         .setDefaultReadConsistencyLevel(consistencyLevel.getAstyanaxValue())
         .setDefaultWriteConsistencyLevel(consistencyLevel.getAstyanaxValue())
    )
    .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("b2bConnectionPool")
         .setPort(service.getPort())
         .setMaxConnsPerHost(5)
         .setSeeds(StringUtils.join(hosts, ","))
         // increase default timeout for heavy operations (milliseconds)
         .setSocketTimeout(15000)
         .setSSLConnectionContext(sslContext)
         .setAuthenticationCredentials(credentials)
    )
    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
    .buildKeyspace(ThriftFamilyFactory.getInstance());

Now there is a reproducible query that takes a long time and finally throws an OperationTimeoutException:

com.netflix.astyanax.connectionpool.exceptions.OperationTimeoutException: OperationTimeoutException: [host=myhost(myip):13260, latency=10001(40007), attempts=4]TimedOutException()
    at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:171) ~[astyanax-thrift-1.56.49.jar:na]
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:65) ~[astyanax-thrift-1.56.49.jar:na]
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.execute(ThriftColumnFamilyQueryImpl.java:190) ~[astyanax-thrift-1.56.49.jar:na]
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1$2.execute(ThriftColumnFamilyQueryImpl.java:182) ~[astyanax-thrift-1.56.49.jar:na]
    at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151) ~[astyanax-thrift-1.56.49.jar:na]
    at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:119) ~[astyanax-core-1.56.49.jar:na]
    at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:338) ~[astyanax-core-1.56.49.jar:na]
    at com.netflix.astyanax.thrift.ThriftColumnFamilyQueryImpl$1.execute(ThriftColumnFamilyQueryImpl.java:180) ~[astyanax-thrift-1.56.49.jar:na]

The exception message says "latency=10001", so I assumed this was the socket timeout I configured to 15000 ms, but it obviously is not. How can I increase the timeout for a query operation in Astyanax?


Answer:

I was getting a similar timeout exception when querying nodes holding larger amounts of data. Modifying these four values in cassandra.yaml resolved all the timeout errors:

# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 15000

# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 30000

# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 30000

# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 15000

Note: you need to change this on every node in the cluster, and you also need to restart Cassandra on each of them.
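On the client side it may also help to keep the Thrift socket timeout above the largest of these server-side values, so that Astyanax does not give up before the coordinator answers. A minimal sketch, reusing ConnectionPoolConfigurationImpl as in the question (the pool name, port and seed list are placeholders):

import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;

ConnectionPoolConfigurationImpl poolConfig =
    new ConnectionPoolConfigurationImpl("b2bConnectionPool")
        .setPort(9160)                        // placeholder port
        .setSeeds("host1:9160,host2:9160")    // placeholder seed list
        .setMaxConnsPerHost(5)
        // Keep the client-side socket timeout (ms) above the largest
        // *_request_timeout_in_ms value configured above (30000 ms),
        // so the client does not abandon the request before the server replies.
        .setSocketTimeout(35000);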

Question:

[SOLVED] -

I am not sure why, but removing the

m.withRow(CF_test, "acct1234")
   .incrementCounterColumn("loginCount", 1); 

part of my code made it work. If someone could explain why that is, it would be appreciated. Thank you!


I've been having trouble with Astyanax for Cassandra. I couldn't figure out much from the Netflix GitHub site, so I wanted to ask people with more firsthand experience.

How would I go about inserting rows into a Column Family that I have already created in Cassandra? Currently, my code looks like this:

// Inserting data
MutationBatch m = keyspace.prepareMutationBatch();

// Initialize column family
ColumnFamily<String, String> CF_test =
    new ColumnFamily<String, String>(
        "users",                  // Column Family Name
        StringSerializer.get(),   // Key Serializer
        StringSerializer.get());  // Column Serializer

m.withRow(CF_test, "default")
  .putColumn("full_name", "john", null)
  .putColumn("email", "smith", null)
  .putColumn("state", "555 Elm St", null)
  .putColumn("gender", "Male", null)
  .putColumn("birth_year", 30, null);

m.withRow(CF_test, "acct1234")
  .incrementCounterColumn("loginCount", 1);

OperationResult<Void> result = m.execute();

However, it gives me the following error:

Exception in thread "main" com.netflix.astyanax.connectionpool.exceptions.BadRequestException: BadRequestException: [host=127.0.0.1(127.0.0.1):9160, latency=55(55), attempts=1]InvalidRequestException(why:invalid operation for non commutative columnfamily users)
at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:159)
at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:65)
at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:28)
at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:256)
at com.netflix.astyanax.thrift.ThriftKeyspaceImpl.executeOperation(ThriftKeyspaceImpl.java:485)
at com.netflix.astyanax.thrift.ThriftKeyspaceImpl.access$000(ThriftKeyspaceImpl.java:79)
at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$1.execute(ThriftKeyspaceImpl.java:123)
at Test1.main(Test1.java:76)

Caused by: InvalidRequestException(why:invalid operation for non commutative columnfamily users)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:20833)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:964)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:950)
at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$1$1.internalExecute(ThriftKeyspaceImpl.java:129)
at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$1$1.internalExecute(ThriftKeyspaceImpl.java:126)
at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:60)
... 8 more

Creating keyspaces worked; it's the column family mutations that I can't get to work.


Answer:

To use counters, i.e.

m.withRow(CF_test, "acct1234").incrementCounterColumn("loginCount", 1);

you have to define a column family (or super column family) whose columns act as counters: its default validation class must be set to CounterColumnType.

CounterColumnType may only be set as the default_validation_class, so a column family either contains only counters or no counters at all.
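For illustration, here is a minimal sketch of creating a separate counter column family up front and sending the increment there instead of into "users". The column family name user_counters and the schema options are assumptions, following the usual Astyanax schema API:

import com.google.common.collect.ImmutableMap;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.serializers.StringSerializer;

// Hypothetical counter column family: every column in it is a counter.
ColumnFamily<String, String> CF_COUNTERS =
    new ColumnFamily<String, String>(
        "user_counters",          // column family name (assumed)
        StringSerializer.get(),   // key serializer
        StringSerializer.get());  // column name serializer

// default_validation_class = CounterColumnType marks all columns as counters.
keyspace.createColumnFamily(CF_COUNTERS, ImmutableMap.<String, Object>builder()
    .put("default_validation_class", "CounterColumnType")
    .put("key_validation_class", "UTF8Type")
    .put("comparator_type", "UTF8Type")
    .build());

// Counter increments then go into the counter column family, not into "users".
MutationBatch m = keyspace.prepareMutationBatch();
m.withRow(CF_COUNTERS, "acct1234").incrementCounterColumn("loginCount", 1);
m.execute();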

Question:

I'm looking at https://github.com/Netflix/astyanax/wiki/Cassandra-compatibility and it doesn't seem to be updated to current versions. I'm currently using Cassandra 2.0.x and I am wondering if it's compatible with Astyanax 3.10.x, since it's causing a few tests to fail.


Answer:

From my understanding, Cassandra 2.0 was the last version added; it seems like Astyanax is retired. It's possible that some 2.0.x releases would work, but the errors you are seeing are probably due to new changes and functionality in Cassandra that Astyanax does not support.

In their last update they suggest switching to the DataStax driver.

If you really need to use Astyanax, I would suggest going down to Cassandra 2.0; otherwise it is probably best to switch to the DataStax driver.

Question:

I have a batch job that reads through approximately 33 million rows in Cassandra, using the AllRowsReader as described in the Astyanax wiki:

new AllRowsReader.Builder<>(getKeyspace(), columnFamily)
    .withPageSize(100)
    .withIncludeEmptyRows(false)
    .withConcurrencyLevel(1)
    .forEachRow(row -> {
        try {
            return processRow(row);
        } catch (Exception e) {
            LOG.error("Error while processing row!", e);
            return false;
        }
    })
    .build()
    .call();

If some sort of error stops the batch job, I would like to be able to pick up and continue reading from the row where it stopped, so that I don't have to start reading from the first row again. Is there any fast and simple way to do this?

Or isn't the AllRowsReader the right fit for this kind of task?


Answer:

Since nobody has answered, let me try this one. Cassandra uses a partitioner to determine which node a row is placed on. There are mainly two types of partitioners: 1) ordered and 2) unordered.

https://docs.datastax.com/en/cassandra/2.2/cassandra/architecture/archPartitionerAbout.html

With an ordered partitioner, rows are placed according to the lexicographic order of their keys. With an unordered partitioner, you have no way of knowing the order.

Ordered partitioners are regarded as an anti-pattern in Cassandra because they make even distribution of data across the cluster difficult. https://docs.datastax.com/en/cassandra/2.2/cassandra/planning/planPlanningAntiPatterns.html

I am assuming you are using an unordered partitioner. In that case there is currently no way to tell Cassandra to start reading from a particular row.

I hope this answers your question.

Question:

Maybe I misunderstood how the automatic node discovery in the Astyanax Cassandra API works, but here is my problem:

I have the following setup:

2 Datacenters with 2 nodes each and a replication factor of 2.

DC1: N1 and N2 and DC2: N3 and N4

The seed nodes are N1 and N3 (these are also the ones provided to the application). The automatic discovery of the other nodes (N2 and N4) seems to work, even though they are not shown in the host pool.

If N3 fails, the data is correctly written to N4 and it is also correctly synchronized to N3 when the node comes up again. The same goes for N1 and N2.

The problem happens when both seed nodes (N1 and N3) fail. Then the data is no longer written to N2 and N4 (as expected), and an exception causes the application to fail (Astyanax logs an exception at info level when one seed node is down, but that normally doesn't cause the application to fail).

It is clear that the seed nodes have to be online when the application starts, but I thought that the automatic node discovery in Astyanax would allow the seed nodes to fail later, so that the replica nodes can take over (using a consistency level of CL_ONE).

Is there a way to avoid this failure, or do I just misunderstand the automatic node discovery, or am I just doing something terribly wrong?

Some additional information: the nodes mostly use the default settings in cassandra.yaml, and the tokens were generated with the Python script proposed in the documentation.

private AstyanaxContext<Cluster> connect(final String hosts) {
    AstyanaxConfigurationImpl asConfig = new AstyanaxConfigurationImpl();
    asConfig.setDefaultWriteConsistencyLevel(ConsistencyLevel.CL_ONE);
    asConfig.setDefaultReadConsistencyLevel(ConsistencyLevel.CL_ONE);
    AstyanaxContext<Cluster> context = new AstyanaxContext.Builder()
            .forCluster("TestSuitCluster")
            .withAstyanaxConfiguration(
                    asConfig.setDiscoveryType(NodeDiscoveryType.TOKEN_AWARE)
                    .setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))
            .withConnectionPoolConfiguration(
                    new ConnectionPoolConfigurationImpl(
                            "CassandraConnectionPool").setSeeds(hosts)
                            .setMaxConnsPerHost(8).setMaxConns(8))
            .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
            .buildCluster(ThriftFamilyFactory.getInstance());
    context.start();
    return context;
}

The stack trace that is shown when the last seed node goes down:

com.netflix.astyanax.connectionpool.exceptions.PoolTimeoutException: PoolTimeoutException: [host=127.0.0.1(127.0.0.1):9160, latency=2000(2000), attempts=1]Timed out waiting for connection
    at com.netflix.astyanax.connectionpool.impl.SimpleHostConnectionPool.waitForConnection(SimpleHostConnectionPool.java:218)
    at com.netflix.astyanax.connectionpool.impl.SimpleHostConnectionPool.borrowConnection(SimpleHostConnectionPool.java:185)
    at com.netflix.astyanax.connectionpool.impl.RoundRobinExecuteWithFailover.borrowConnection(RoundRobinExecuteWithFailover.java:66)
    at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:67)
    at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:256)
    at com.netflix.astyanax.thrift.ThriftClusterImpl.describeKeyspaces(ThriftClusterImpl.java:165)
    at com.netflix.astyanax.thrift.ThriftClusterImpl.describeKeyspace(ThriftClusterImpl.java:184)
    at at.dbeg.cassandra.CasandraTestSuit.deleteKeyspace(CasandraTestSuit.java:134)
    at at.dbeg.cassandra.CasandraTestSuit.runTests(CasandraTestSuit.java:189)
    at at.dbeg.cassandra.CasandraTestSuit.main(CasandraTestSuit.java:50)    
com.netflix.astyanax.connectionpool.exceptions.ConnectionAbortedException: ConnectionAbortedException: [host=127.0.0.1(127.0.0.1):9160, latency=0(0), attempts=1]org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset by peer: socket write error
    at com.netflix.astyanax.thrift.ThriftConverter.ToConnectionPoolException(ThriftConverter.java:193)
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:65)
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:28)
    at com.netflix.astyanax.thrift.ThriftSyncConnectionFactoryImpl$ThriftConnection.execute(ThriftSyncConnectionFactoryImpl.java:151)
    at com.netflix.astyanax.connectionpool.impl.AbstractExecuteWithFailoverImpl.tryOperation(AbstractExecuteWithFailoverImpl.java:69)
    at com.netflix.astyanax.connectionpool.impl.AbstractHostPartitionConnectionPool.executeWithFailover(AbstractHostPartitionConnectionPool.java:256)
    at com.netflix.astyanax.thrift.ThriftKeyspaceImpl.executeOperation(ThriftKeyspaceImpl.java:485)
    at com.netflix.astyanax.thrift.ThriftKeyspaceImpl.access$000(ThriftKeyspaceImpl.java:79)
    at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$6$3.execute(ThriftKeyspaceImpl.java:355)
    at at.dbeg.cassandra.CasandraTestSuit.testWrite(CasandraTestSuit.java:269)
    at at.dbeg.cassandra.CasandraTestSuit.runTests(CasandraTestSuit.java:168)
    at at.dbeg.cassandra.CasandraTestSuit.main(CasandraTestSuit.java:50)
Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset by peer: socket write error
    at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
    at org.apache.thrift.transport.TFramedTransport.flush(TFramedTransport.java:156)
    at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:65)
    at org.apache.cassandra.thrift.Cassandra$Client.send_insert(Cassandra.java:833)
    at org.apache.cassandra.thrift.Cassandra$Client.insert(Cassandra.java:822)
    at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$6$3$1.internalExecute(ThriftKeyspaceImpl.java:367)
    at com.netflix.astyanax.thrift.ThriftKeyspaceImpl$6$3$1.internalExecute(ThriftKeyspaceImpl.java:358)
    at com.netflix.astyanax.thrift.AbstractOperationImpl.execute(AbstractOperationImpl.java:60)
    ... 10 more
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
    at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
    ... 17 more 

Answer:

I think I finally found the answer. This is not possible in a cluster context without your own HostSupplier. The easiest way to solve this problem is to iterate over all keyspaces in the cluster and use the logic of the RingDescribeHostSupplier to find all hosts.

If this HostSupplier is set on the AstyanaxContext, the expected behavior is achieved.
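A minimal sketch of that idea, assuming a fixed list of node addresses instead of the full ring-describe logic (host names, port and pool sizes are placeholders; withHostSupplier is the hook for plugging a custom supplier into the context builder):

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Supplier;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

// Supplies all four nodes of both datacenters, not only the seeds, so the
// connection pool still has hosts to talk to when N1 and N3 are down.
// The addresses below are placeholders.
Supplier<List<Host>> hostSupplier = new Supplier<List<Host>>() {
    @Override
    public List<Host> get() {
        return Arrays.asList(
            new Host("n1.example.com", 9160),
            new Host("n2.example.com", 9160),
            new Host("n3.example.com", 9160),
            new Host("n4.example.com", 9160));
    }
};

AstyanaxContext<Cluster> context = new AstyanaxContext.Builder()
    .forCluster("TestSuitCluster")
    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
        .setDiscoveryType(NodeDiscoveryType.TOKEN_AWARE)
        .setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE))
    .withHostSupplier(hostSupplier) // custom HostSupplier as described above
    .withConnectionPoolConfiguration(
        new ConnectionPoolConfigurationImpl("CassandraConnectionPool")
            .setSeeds("n1.example.com:9160,n3.example.com:9160")
            .setMaxConnsPerHost(8)
            .setMaxConns(8))
    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
    .buildCluster(ThriftFamilyFactory.getInstance());
context.start();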