Hot questions for Using Cassandra in logging

Question:

I'm always getting the following error. Can somebody help me, please?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.datastax.spark.connector.japi.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:24)
    at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:55)
    at SparkStream.main(SparkStream.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 20 more

I get the error when I run the following code. I've searched the web but couldn't find a solution; the error first appeared when I added the saveToCassandra call.

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

/**
 * Created by jonas on 10/10/16.
 */
public class SparkStream implements Serializable {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf(true)
                .setAppName("TwitterToCassandra")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "127.0.0.1")
                .set("spark.cassandra.connection.port", "9042");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        Set<String> topics = Collections.singleton("Test");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
                ssc,
                String.class,
                String.class,
                kafka.serializer.StringDecoder.class,
                kafka.serializer.StringDecoder.class,
                kafkaParams,
                topics
        );

        JavaDStream<Tweet> tweets = directKafkaStream.map(s -> createTweet(s._2));

        CassandraStreamingJavaUtil.javaFunctions(tweets)
                .writerBuilder("mykeyspace", "rawtweet", mapToRow(Tweet.class))
                .saveToCassandra();

        ssc.start();
        ssc.awaitTermination();

    }


    public static Tweet createTweet(String rawKafka){
        String[] splitted = rawKafka.split("\\|");
        Tweet t = new Tweet(splitted[0], splitted[1], splitted[2], splitted[3]);
        return t;
    }
}
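
The Tweet class referenced above isn't shown in the question. For mapToRow(Tweet.class) to work, the connector needs a JavaBean whose property names match the columns of mykeyspace.rawtweet. A minimal sketch (the four field names are assumptions, not from the question):

// Hypothetical POJO for the "rawtweet" table; field names must match the
// table's column names (these four are assumptions, not from the question).
public class Tweet implements java.io.Serializable {
    private String id;
    private String user;
    private String text;
    private String date;

    public Tweet() {} // the mapper needs a no-arg constructor

    public Tweet(String id, String user, String text, String date) {
        this.id = id;
        this.user = user;
        this.text = text;
        this.date = date;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getText() { return text; }
    public void setText(String text) { this.text = text; }
    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
}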

My pom is the following.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.company</groupId>
    <artifactId>Sentiment</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>twitter4j.org</id>
            <name>twitter4j.org Repository</name>
            <url>http://twitter4j.org/maven2</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.0.1</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>



        <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10 -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.6.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>[4.0,)</version>
        </dependency>

        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-async</artifactId>
            <version>4.0.4</version>
        </dependency>
    </dependencies>


</project>

Answer:

org.apache.spark.Logging is available in Spark 1.5.2 and earlier; it was removed in 2.0.0. The spark-cassandra-connector 1.6.2 artifact is compiled against Spark 1.x and still references it, hence the NoClassDefFoundError. Please change the versions as follows:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
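
Note that the original pom also mixes Scala 2.10 and 2.11 artifacts (spark-core_2.10 next to spark-streaming_2.11); all Scala-suffixed artifacts must share one suffix, or you trade this error for binary-incompatibility errors. A sketch of a consistent Spark 1.6-era set, assuming Scala 2.11 throughout (version numbers illustrative):

    <!-- Illustrative only: one Scala suffix (_2.11) everywhere, and a
         connector line (1.6.x) that matches the Spark 1.6.x line. -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>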

Question:

Is there an easy way to turn on query logging in Cassandra through XML configuration? I'm using the namespace:

xmlns:cassandra="http://www.springframework.org/schema/data/cassandra"

but I can't find a suitable solution. I tried turning tracing on through cqlsh, but it doesn't work for my app.

I also tried adding the line:

<logger name="com.datastax.driver.core.QueryLogger.NORMAL" level="TRACE" />

but that doesn't work either.

My versions: spring-data-cassandra 1.4.0, Cassandra 2.1.5.


Answer:

Add a QueryLogger @Bean and get the Cluster @Autowired in:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.QueryLogger;
import org.springframework.context.annotation.Bean;

@Bean
public QueryLogger queryLogger(Cluster cluster) {
    // Build the logger and register it with the cluster so that every
    // executed statement is reported to it.
    QueryLogger queryLogger = QueryLogger.builder().build();
    cluster.register(queryLogger);
    return queryLogger;
}

(+ obviously configure QueryLogger.Builder as required).
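
For example, a sketch of a builder configuration (the threshold values are illustrative, not from the original answer):

@Bean
public QueryLogger queryLogger(Cluster cluster) {
    QueryLogger queryLogger = QueryLogger.builder()
            .withConstantThreshold(300)      // statements slower than 300 ms go to the SLOW logger
            .withMaxQueryStringLength(1024)  // truncate long CQL strings in log output
            .build();
    cluster.register(queryLogger);
    return queryLogger;
}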

Don't forget to set log levels to DEBUG/TRACE in your application.yml:

logging.level.com.datastax.driver.core.QueryLogger.NORMAL: DEBUG
logging.level.com.datastax.driver.core.QueryLogger.SLOW: TRACE

Voilà!

Question:

I'd like to change the log level of the DataStax driver's logger, but after many tries I can't figure it out.

Here's the class I use:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;

import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.MappingManager;

public class CassandraSession {

    /**
     * CassandraSession singleton
     */
    private static CassandraSession INSTANCE = null;
    /**
     * The Cassandra Cluster
     */
    private static Cluster cluster;
    /**
     * The Cassandra Session
     */
    private static Session session;
    /**
     * MappingManager is used to create Cassandra mappers
     */
    private static MappingManager manager;
    /**
     * LOGGER
     */
    private static final Logger LOGGER = Logger.getLogger(CassandraSession.class);
    /**
     * Keyspace Name
     */
    private static final String KEYSPACE = "MY_KEYSPACE";

    /**
     * CassandraSession
     */
    private CassandraSession() {
        initialize();
    }

    /**
     * This method initializes the connection with the Cassandra Database
     */
    private void initialize() {
        cluster = Cluster.builder().withClusterName("TestCluster").addContactPoints("127.0.0.1").withPort(9042).build();
        final Metadata metadata = cluster.getMetadata();
        LOGGER.info("Connected to cluster: " + metadata.getClusterName());
    }

    /**
     * Get the instance of the singleton CassandraSession
     *
     * @return
     */
    public static synchronized CassandraSession getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new CassandraSession();
        }
        return INSTANCE;
    }

    /**
     * Get the Cassandra Session
     *
     * @return
     */
    public Session getSession() {
        if (session == null) {
            session = cluster.connect(KEYSPACE);
        }
        return session;
    }

    /**
     * Get the Cassandra MappingManager
     *
     * @return
     */
    public MappingManager getManager() {
        if (manager == null) {
            manager = new MappingManager(session);
        }
        return manager;
    }
}

I tried putting a log4j.properties file in src/main/resources and changing the log level programmatically; nothing changes. I'm still getting the following traces:

11:39:48.762 [http-bio-8080-exec-6] DEBUG c.d.driver.core.SystemProperties - com.datastax.driver.NEW_NODE_DELAY_SECONDS is undefined, using default value 1
11:39:48.768 [http-bio-8080-exec-6] DEBUG c.d.driver.core.SystemProperties - com.datastax.driver.NON_BLOCKING_EXECUTOR_SIZE is undefined, using default value 8
11:39:48.770 [http-bio-8080-exec-6] DEBUG c.d.driver.core.SystemProperties - com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS is undefined, using default value 60
11:39:48.812 [http-bio-8080-exec-6] DEBUG com.datastax.driver.core.Cluster - Starting new cluster with contact points [/127.0.0.1:9042]
11:39:48.827 [http-bio-8080-exec-6] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
11:39:48.924 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
11:39:48.924 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
11:39:48.924 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
11:39:48.925 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
11:39:48.926 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - Platform: Windows
11:39:48.926 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - Java version: 8
11:39:48.926 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
11:39:48.926 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
11:39:48.927 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
11:39:48.929 [http-bio-8080-exec-6] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable

What I can see from those traces is that SLF4J is used as the default logging framework. So how can I tell the DataStax driver to use my logger (whose properties are defined by my server)?

Server used : Apache TomEE Plume 1.7.2

Datastax driver version : 2.1.9

Cassandra version : 2.2.1

Thanks.


Answer:

Thank you for your answer Olivier, but that dependency was already declared in my pom.xml (I had seen the page you linked before asking the question).

It's OK though, I found the solution. The problem was a conflict between SLF4J bindings, which I spotted in TomEE's logs.

I had this binding in my pom.xml:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>

And another binding came in transitively through this library:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>2.2.1</version>
</dependency>

namely logback-classic.jar.

The solution is to exclude it:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>2.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Thanks.
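
For reference, once log4j is the only SLF4J binding on the classpath, a minimal log4j.properties sketch to quiet the driver's DEBUG chatter could look like this (logger names are taken from the traces above; the appender choice is an assumption):

    # Root logger: INFO to the console (or your server's appender of choice)
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-5p %d{HH:mm:ss,SSS} %m%n

    # Silence the DataStax driver and Netty debug output
    log4j.logger.com.datastax.driver.core=ERROR
    log4j.logger.io.netty=ERROR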

Question:

Please help with my problem. I use Ubuntu 14.04, Java 8, Cassandra 2.1.3. Sorry, I'm a beginner. How can I extract and deserialize a commit log from the directory /var/lib/cassandra/commitlog? As far as I understand, I can use the cassandra-all Maven library, which provides the method CommitLogReplayer.recover. If I try to recover a commit log, for example:

CommitLogReplayer.recover(new File("/var/lib/cassandra/commitlog/CommitLog-4-1453270991708.log"));

I get the exception:

Expecting URI in variable: [cassandra.config].  Please prefix the file with file:/// for local files or file://<server>/ for remote files. Aborting. If you are executing this from an external tool, it needs to set Config.setClientMode(true) to avoid loading configuration.
Fatal configuration error; unable to start. See log for stacktrace.

Config.setClientMode(true); didn't help.

What am I doing wrong?


Answer:

With setClientMode(true) there is no context to deserialize the data unless you create the CFMetaData yourself. Even then, this is a pretty non-trivial project, since applying mutations is tightly coupled to Cassandra internals. That said, it isn't impossible, but you won't be able to use CommitLogReplayer as-is, since it tries to apply the mutations. It's a good place to start when writing your own, though (up to CommitLogReplayer.replayMutation you can reuse a lot).
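
As a first step, the configuration error itself can usually be cleared by pointing the cassandra.config system property at a local cassandra.yaml before any org.apache.cassandra class is loaded (the path below is an assumption; adjust it to your install):

    // Must run before the first cassandra-all class is touched.
    // The file:/// prefix is required, as the error message says.
    System.setProperty("cassandra.config", "file:///etc/cassandra/cassandra.yaml");
    org.apache.cassandra.config.Config.setClientMode(true);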

Question:

I am using Spring Boot Data Cassandra in my Spring Boot project and am trying to log CQL queries, but setting spring.jpa.show-sql to true does not work (that property only applies to JPA, not Cassandra). Here is my application.properties:

spring.data.cassandra.keyspace-name=sample
spring.data.cassandra.contact-points=127.0.0.1
spring.data.cassandra.port=9042
spring.data.cassandra.schema-action=CREATE_IF_NOT_EXISTS
spring.jpa.show-sql=true

Answer:

It's possible using a LatencyTracker:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.LatencyTracker;
import com.datastax.driver.core.Statement;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application implements CommandLineRunner {

    private final Cluster cluster;

    @Autowired
    public Application(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public void run(String... strings) throws Exception {
        // Register a tracker that is invoked after every statement execution.
        cluster.register(new LatencyTracker() {
            @Override
            public void update(Host host, Statement statement, Exception exception, long newLatencyNanos) {
                System.out.println(statement);
            }

            @Override
            public void onRegister(Cluster cluster) {
            }

            @Override
            public void onUnregister(Cluster cluster) {
            }
        });
    }
}
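
As a side note, the driver's own QueryLogger (used in an earlier answer) is itself a LatencyTracker implementation, so registering a QueryLogger gives you a ready-made, configurable version of this hand-rolled tracker.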

Question:

I'm using DataStax Cassandra on Windows 10, accessed from Java. The log messages are appended to the Eclipse console, but I need to send them to a logger instead. I tried updating the logback and logback-tools XML files, with no effect. I changed the level of the STDOUT appender in logback.xml to ERROR so that only errors are printed, then restarted the service, but again no effect; all the messages are still displayed in the console:

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
    <level>ERROR</level>
  </filter>
  <encoder>
    <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
  </encoder>
</appender>

Any advice?


Answer:

Instead of using a ConsoleAppender (which writes to the console), use a FileAppender like this:

<configuration>
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>testFile.log</file>
    <append>true</append>
    <encoder>
      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
    </encoder>
  </appender>
  <root level="ERROR">
    <appender-ref ref="FILE" />
  </root>
</configuration>

(The encoder element is required; without it logback's FileAppender fails to start and writes nothing.)

You can read more about the different appenders here: http://logback.qos.ch/manual/appenders.html
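
One caveat worth checking: if the messages come from the DataStax driver inside your Java application, the logback.xml that matters is the one on the application's classpath (typically src/main/resources/logback.xml), not the configuration files of the Cassandra service, so restarting the service will not change what the client prints.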