Hot questions for using the Azure Cosmos DB SQL API in Java

Question:

We have implemented an Azure Cosmos DB (SQL API) database in the cloud. Through Java, we'd like to generate reports based on the data stored in Cosmos DB. I'm not yet happy with the performance of my read queries, and I was wondering what could be improved in my current setup.

As said, I use Java to query the database, via the Microsoft Azure DocumentDB library:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-documentdb</artifactId>
    <version>1.16.2</version>
</dependency>

Currently, the best performance I have been able to get is querying around 38,000 documents into memory in around 20 seconds, with 50,000 RU/s configured (local Cosmos DB emulator). I would really like to improve on this, because we might be querying millions of documents soon.

I have the feeling that the way we store the data might not be optimal. Each document now looks as follows:

{
    "deviceid": "xxx",
    "devicedata": {
        "datetime": "2018-08-28T00:00:02.104Z",
        "sensors": [
            {
                "p_A2": "93095",
                "p_A3": "303883",
                "p_batterycurrent": "4294967.10000",
                "p_batterygauge": "38.27700",
                "p_batteryvoltage": "13.59400",
                ** ... around 200 more key-value pairs ... **
            }
        ]
    },
    "id": "aa5d3cf5-10fa-48dd-a0d2-a536284eddac",
    "_rid": "PtEIANkbMQABAAAAAAAAAA==",
    "_self": "dbs/PtEIAA==/colls/PtEIANkbMQA=/docs/PtEIANkbMQABAAAAAAAAAA==/",
    "_etag": "\"00000000-0000-0000-4040-006a7f2501d4\"",
    "_attachments": "attachments/",
    "_ts": 1535619672
}

A query that we would use a lot would look as follows:

SELECT c.deviceid, 
    c.devicedata.datetime, 
    c.devicedata.sensors[0].p_A2, 
    c.devicedata.sensors[0].p_A3,
    c.devicedata.sensors[0].p_batterycurrent,
    c.devicedata.sensors[0].s_humidity 
FROM c 
WHERE c.deviceid = 'xxx'
    AND c.devicedata.datetime >= '2018-08-28T00:00:00.000Z' 
    AND c.devicedata.datetime < '2018-08-30T00:00:00.000Z' 
ORDER BY c.devicedata.datetime DESC

I split these queries per deviceId: per device, I run a thread with this query (see the sketch below). This seems to go a lot faster than a single thread with a single query.

Such a query as above would take us around 20 seconds.
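
For reference, the per-device fan-out looks roughly like this. This is a simplified sketch, not my exact code: deviceIds, collectionLink and the buildQuery helper are placeholders.

import com.microsoft.azure.documentdb.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

// One query task per device; buildQuery(deviceId) is a hypothetical helper
// returning the SQL above for a single device.
ExecutorService pool = Executors.newFixedThreadPool(deviceIds.size());
List<Future<List<Document>>> futures = deviceIds.stream()
    .map(deviceId -> pool.submit(() -> {
        FeedOptions options = new FeedOptions();
        // The collection is partitioned on /deviceid, so routing each query
        // to its partition key avoids a cross-partition fan-out server-side.
        options.setPartitionKey(new PartitionKey(deviceId));
        return documentClient
            .queryDocuments(collectionLink, buildQuery(deviceId), options)
            .getQueryIterable().toList();
    }))
    .collect(Collectors.toList());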

I have noticed, however, that if I only query the deviceid and devicedata.datetime, the query is done within 2 seconds. It seems that getting the sensor data out of the sensors list is a really tough cookie. If I do a SELECT * (so no projection of individual sensor fields), it is also faster than when I let the SQL API pick the sensors out: around 15 seconds.

My question is: what can I do to improve this? Is the sensors list in my documents too long? Is there any way I can set this up differently? The sensor key-value pairs are not fixed and can differ per device.

Some more technical details: I have an unlimited collection, partitioned on /deviceid. I have tried both the standard Azure indexing policy (which indexes everything) and one that excludes the sensors from indexing.
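
For completeness, the indexing policy variant that excludes the sensors looked roughly like this (a sketch; the excluded path follows the document shape above):

{
    "indexingMode": "consistent",
    "includedPaths": [
        { "path": "/*" }
    ],
    "excludedPaths": [
        { "path": "/devicedata/sensors/*" }
    ]
}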

I have tried all the tips described here: https://docs.microsoft.com/en-us/azure/cosmos-db/performance-tips-java

This is my current Java setup, although I have tried lots of different things:

// This piece of code is currently in a separate thread; there is one thread per deviceId to query.
documentClient = new DocumentClient(HOST, MASTER_KEY,
                 ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);

FeedOptions options = new FeedOptions();
// Allows the query to fan out across partitions (no partition key is set on the options).
options.setEnableCrossPartitionQuery(true);

// Synchronously drains all result pages into a single in-memory list.
documentList = documentClient
    .queryDocuments(getAlldataCollection().getSelfLink(), query, options)
    .getQueryIterable().toList();

I'm fairly sure MongoDB can query hundreds of thousands of documents within seconds, so I'm pretty sure I'm doing something wrong with my current setup.

Any suggestions?


Answer:

I cannot provide a definitive solution to your problem, but hopefully these ideas will get you to a solution with the desired performance level.

Is NoSQL a good fit?

First, to get this off the table: are you sure your scenario is a good fit for NoSQL? Cosmos DB shines when the primary workload consists of pinpoint operations (create, select-by-id, update-by-id, delete-by-id). Yes, it can certainly do limited mass operations and aggregations, but querying millions of documents is pushing it. SQL, on the other hand, is designed to work with large sets of data and is really good at aggregations.

Let's assume this design decision was carefully weighed and NoSQL is the best fit for reasons not mentioned here.

Debug for hard data

Don't do performance tests against the local Cosmos DB emulator. Don't. The emulator is obviously not the real thing (consider network, storage bandwidth/seek times, system impact); it only emulates the service. You could get very misleading results. Spin up a real test instance.

The first step in debugging your query performance problems is to enable query execution metrics and see where those 20 seconds are actually spent.

Also, 38,000 documents will most likely never arrive in a single batch; check how many continuation queries are actually being made to the Cosmos DB server.

Also, run a profiler and make sure the bottleneck really is in Cosmos DB. If you are making many continuation calls and concurrently querying over many devices, a lot may be happening on the client as well, with many queries in flight on the network. Make sure you are not being throttled in the client (GC, HTTP stack, internal locking, connection/thread pools, etc.).
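
As a sketch of that instrumentation with the DocumentDB Java SDK (assuming your SDK version exposes setPopulateQueryMetrics and getQueryMetrics; if yours does not, the x-ms-documentdb-populatequerymetrics request header serves the same purpose):

import com.microsoft.azure.documentdb.*;
import java.util.List;

FeedOptions options = new FeedOptions();
options.setEnableCrossPartitionQuery(true);
options.setPopulateQueryMetrics(true); // ask the server for query execution metrics

FeedResponse<Document> response =
        documentClient.queryDocuments(collectionLink, query, options);

// Drain the results page by page to count the continuation round-trips.
// Note: fetchNextBlock() throws the checked DocumentClientException.
int pages = 0;
List<Document> page;
while ((page = response.getQueryIterable().fetchNextBlock()) != null) {
    pages++;
    System.out.println("page " + pages + ": " + page.size() + " docs");
}

// Per-partition breakdown of where the time was spent.
response.getQueryMetrics().forEach((partition, metrics) ->
        System.out.println(partition + " -> " + metrics));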

Data/Query design
Reduce queried data

If you already know the deviceid, then don't query for it 38,000+ times - that's just ballast.
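
For example, the same query with the constant dropped from the projection:

SELECT c.devicedata.datetime, 
    c.devicedata.sensors[0].p_A2, 
    c.devicedata.sensors[0].p_A3,
    c.devicedata.sensors[0].p_batterycurrent,
    c.devicedata.sensors[0].s_humidity 
FROM c 
WHERE c.deviceid = 'xxx'
    AND c.devicedata.datetime >= '2018-08-28T00:00:00.000Z' 
    AND c.devicedata.datetime < '2018-08-30T00:00:00.000Z' 
ORDER BY c.devicedata.datetime DESC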

Reduce model object size

/* around 200 more key-value pairs */

That's a huge object. I would test whether splitting it up into smaller objects helps Cosmos DB spend less time internally loading and processing the documents. E.g.:

{
    "p_A2": "93095",
    "p_A3": "303883",
    "battery" : {
        "current": "4294967.10000",
        "gauge": "38.27700",
        "voltage": "13.59400"
    }
   ...
}

I am not sure how DocumentDB stores documents internally (full graph vs. subdocuments), but you could test whether this makes an impact. The difference between 2 seconds and 20 seconds is so huge that it hints this may be relevant.

Sensors array?

The query only asks for the first measurement set. Is the array necessary? You could test whether omitting this level has any performance impact.
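
For example, if each document only ever carries one measurement set, the array level could be dropped (a sketch of the alternative shape, with most fields elided):

{
    "deviceid": "xxx",
    "devicedata": {
        "datetime": "2018-08-28T00:00:02.104Z",
        "sensors": {
            "p_A2": "93095",
            "p_A3": "303883"
        }
    }
}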

Data types in model

p_batterycurrent etc. store numerical sensor measurements as longish strings. If they are always numbers, then you could store them as numbers instead and reduce document size on the server and in the client. Client performance would probably be impacted more (string = heap allocation). Ex: "4294967.10000" is 13 chars = 26 B in the client (UTF-16).
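
For example (the same fields stored as numbers; your queries would then compare numbers instead of strings):

{
    "p_batterycurrent": 4294967.1,
    "p_batterygauge": 38.277,
    "p_batteryvoltage": 13.594
}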

App design

Do you really need all those 38,000 (or millions of) documents every time? Consider whether you could get by with a subset.

If this is for data movement, then consider other options (Data Factory, change feed processing) to transfer the measurements incrementally. If this is an on-request app need, then consider loading smaller timeframes (= fewer documents) and caching past timeframes. If you can, pre-aggregate results before caching. Past sensor data is most likely not going to change.

As always, consider the ROI for your business case. Optimization is always possible, but sometimes it's more beneficial to adjust a business requirement instead of the technical solution.

Question:

I am working on Azure Cosmos DB with the SQL API. I am using the Azure SDK from:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-documentdb</artifactId>
    <version>2.4.7</version>
</dependency>

in order to insert items into a collection.

DocumentClient documentClient = setupDocumentClient();
ResourceResponse<Document> document = documentClient.createDocument(
        "/dbs/" + databaseName + "/colls/" + colName, entity, new RequestOptions(), true);

I am getting an exception:

com.microsoft.azure.documentdb.DocumentClientException: Message: {"Errors":["Date header doesn't conform to the required format. Please ensure that the time is provided in GMT and conforms to RFC 1123 date-time specifications.","Date header doesn't conform to the required format. Please ensure that the time is provided in GMT and conforms to RFC 1123 date-time specifications."]}

What is weird is that I was using the same code in another project, where it worked without this exception and the data was inserted correctly into Azure Cosmos DB. Maybe someone has encountered this problem and found a solution for it?


Answer:

I have fixed this issue in my code by adding:

<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>LATEST</version>
</dependency>

After this I ran mvn clean install and the exception disappeared.

Apparently one of the other dependencies in my project was pulling in an old version of joda-time.
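
If you run into the same thing, mvn dependency:tree -Dincludes=joda-time shows which dependency drags the old version in. Pinning an explicit version (instead of LATEST, which Maven discourages) would look like this; 2.10.5 is just an example version:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.10.5</version>
        </dependency>
    </dependencies>
</dependencyManagement>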

Question:

I am trying to run 2 instances of ChangeFeedProcessor, both pointing at the same collection and using the same lease collection in a Cosmos account. I have specified a unique hostName in both instances.

My intention is that the feed load gets distributed among the instances according to the logical partitions (as per the Microsoft documentation).

When I try to bring the 2nd instance up, I get the following exception in the console.

Is there any different way to achieve this?

Exception in thread "pool-23-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115) at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Exception in thread "pool-19-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115) at reactor.core.publisher.MonoRunnable.block(MonoRunnable.java:66) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionSupervisorImpl$1.run(PartitionSupervisorImpl.java:89) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Exception in thread "pool-25-thread-3" java.lang.NullPointerException at com.azure.data.cosmos.internal.changefeed.implementation.ExceptionClassifier.classifyClientException(ExceptionClassifier.java:56) at com.azure.data.cosmos.internal.changefeed.implementation.PartitionProcessorImpl.lambda$run$0(PartitionProcessorImpl.java:115)...etc

I have used the Maven dependency below:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-cosmos</artifactId>
    <version>3.0.0</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>

CODE SNIPPET

  1. Creating a list of ChangeFeedProcessors (for all the containers found in a database):
        //FEED DATABASE
        CosmosDatabase feedDatabase = cosmosClient.getDatabase(cosmosDbName);

        //LEASE DATABASE
        CosmosDatabase leaseDatabase = cosmosClient.getDatabase(cosmosDbName + LEASES);

        //List of Containers in Feed Database
        List<CosmosContainerProperties> containerPropertiesList = null;
        try {
            Flux<FeedResponse<CosmosContainerProperties>> containers = feedDatabase.readAllContainers();
            List<FeedResponse<CosmosContainerProperties>> list = containers.toStream().collect(Collectors.toList()); // TODO: optimize
            containerPropertiesList = list.get(0).results();
        }
        catch (Exception e) {
            System.out.println("Fail to query Containers");
            throw new ServiceException("Fail to query Containers");
        }

        containerPropertiesList.parallelStream().forEach(cosmosContainerProperties -> {
                //FEED CONTAINER
                String containerName = cosmosContainerProperties.getString("id");
                CosmosContainer feedContainer = feedDatabase.getContainer(containerName);

                //LEASE CONTAINER
                String leaseContainerName = containerName + "-leases";
                CosmosContainer leaseContainer = leaseDatabase.getContainer(leaseContainerName);

                //Building ChangeFeedProcessor for current Container
                ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
                changeFeedProcessorOptions.startTime(OffsetDateTime.now());

                ChangeFeedProcessor changeFeedProcessor = null;
                try {
                    ChangeFeedProcessor.BuilderDefinition builderDefinition = ChangeFeedProcessor.Builder()                           
                            .hostName("Host1")//used Host2 in the other Host
                            .feedContainer(feedContainer)
                            .leaseContainer(leaseContainer)
                            .options(changeFeedProcessorOptions)
                            .handleChanges(docs -> {
                                documentChangeHandler.processChanges(containerName, docs);
                            });
                    changeFeedProcessor = builderDefinition.build();
                }
                catch (Exception e) {
                    System.out.println("Fail to initialize ChangeFeedProcessor for " + containerName);
                }
                resultList.add(changeFeedProcessor);

                System.out.println("processed:  " + leaseContainerName);
            });
  2. The resultList is then returned, and the ChangeFeedProcessors are started in the method below:
public void startChangeFeed() {
        if (null != changeFeedProcessors && !changeFeedProcessors.isEmpty()) {
            changeFeedProcessors.parallelStream().forEach(processor->processor.start().block());
        }
        else {
            System.out.println("changeFeedProcessors list is empty.. probably changeFeedProcessor has not been setup yet");
        }
    }

Answer:

From the comments, the issue was related to a VPN/proxy or something else blocking the required port ranges.

Direct mode needs a certain port range to be open and configured in the VPN/proxy/firewall.

If that configuration is not possible, you can switch to Gateway (HTTPS) mode.
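
As a sketch, forcing Gateway mode when building the client could look like this (assuming the ConnectionPolicy/ConnectionMode API of the azure-cosmos 3.x line used in the question; endpoint and key are placeholders):

import com.azure.data.cosmos.*;

// Gateway mode uses plain HTTPS, so only port 443 needs to be reachable.
ConnectionPolicy policy = ConnectionPolicy.defaultPolicy();
policy.connectionMode(ConnectionMode.GATEWAY);

CosmosClient cosmosClient = CosmosClient.builder()
        .endpoint("https://<account>.documents.azure.com:443/") // placeholder
        .key("<master-key>")                                    // placeholder
        .connectionPolicy(policy)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .build();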

The Change Feed Processor uses a second leases collection to store state (explained mainly here, https://docs.microsoft.com/azure/cosmos-db/change-feed-processor#components-of-the-change-feed-processor, along with .NET samples, but the concepts are the same). The current model creates 1 lease per physical partition (I say current model because this implementation may improve in the future for a better distribution), and each lease can be owned by only 1 instance. So if you have 2 leases and 2 instances, each instance will own 1 lease.

Each instance will process the changes in the partition(s) corresponding to the lease(s) it owns.

A load distribution of 90/10 means that the changes happening in your collection are skewed, occurring mainly in one partition (a hot partition) rather than being equally distributed.