Hot questions for using RxJava 2 with MongoDB

Question:

I have a User model that embeds a list of Caracteristic models:

public class User {
    private List<Caracteristic> caracteristics;
}

Using Flowable and RxJava, I want to get a list of all the Caracteristic objects of all users, i.e. to transform a Single<List<User>> into a Single<List<Caracteristic>>.

@Controller("cars")
public class CaracteristicController {

    public Single<List<User>> list() {
        return Flowable.fromPublisher(
                getCollection().find(exists("caracteristics")))
                .toList();
    }

    private MongoCollection<User> getCollection() {
        return dbClient
            .getDatabase(dbConfiguration.getDatabaseName())
            .getCollection(dbConfiguration.getCollectionName(), User.class);
    }

}

The User model is stored as a document in a MongoDB database:

{
  "name" : "name",
  "caracteristics" : [
    {
      "desc" : "carac 1",
      "ref" : "ROO12"
    },
    {
      "desc" : "carac 2",
      "ref" : "ROO14"
    }
  ]
}

I don't know how to manipulate the Flowable or the Single to return a list of Caracteristic objects instead of users.


Answer:

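To get a list of all the characteristics of all users, first turn the list of users into a stream of users, then flatten each user's list into it:

Single<List<User>> users = ...;
Single<List<Caracteristic>> caracteristics = users
    .flatMapObservable(Observable::fromIterable)        // Single<List<User>> -> Observable<User>
    .flatMapIterable(user -> user.getCaracteristics())  // assumes User exposes a getter for its private list
    .toList();                                          // Observable<Caracteristic> -> Single<List<Caracteristic>>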
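
To see what that chain computes without pulling in RxJava, here is a minimal plain-JDK sketch of the same flattening with java.util.stream. It is only an analogue: the User and Caracteristic classes are stand-ins modeled on the question, and getCaracteristics() is an assumed getter that the question does not show.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenSketch {
    // Stand-in for the embedded model in the question (shape assumed from the JSON).
    public static class Caracteristic {
        public final String desc;
        public final String ref;
        public Caracteristic(String desc, String ref) {
            this.desc = desc;
            this.ref = ref;
        }
    }

    // Stand-in for the User model; the getter is an assumption.
    public static class User {
        private final List<Caracteristic> caracteristics;
        public User(List<Caracteristic> caracteristics) {
            this.caracteristics = caracteristics;
        }
        public List<Caracteristic> getCaracteristics() {
            return caracteristics;
        }
    }

    // Stream counterpart of flatMapIterable(...).toList():
    // every user's list is merged into one flat list.
    public static List<Caracteristic> allCaracteristics(List<User> users) {
        return users.stream()
                .flatMap(user -> user.getCaracteristics().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User(Arrays.asList(
                        new Caracteristic("carac 1", "ROO12"),
                        new Caracteristic("carac 2", "ROO14"))),
                new User(Arrays.asList(
                        new Caracteristic("carac 3", "ROO20"))));
        for (Caracteristic c : allCaracteristics(users)) {
            System.out.println(c.ref + " " + c.desc);
        }
    }
}
```

The RxJava version differs in that the result is delivered to a subscriber later instead of being returned directly, but the shape of the data transformation is the same.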

Question:

I'm studying reactive programming with RxJava2 and I have a question about using it with an async database driver such as MongoDB's.

If I use the blocking MongoDB driver to read from a collection, the approach would be this:

public class MyDao{
   ...
   public Document getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      return collection.find().first();
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return Observable.just(myDao.getFirstDocument(collectionName)); 
   }
}

With MongoDB's async driver, however, a read operation returns void (not a Document or a Future) and delivers its result to a callback, for example:

collection.find().first(
        (document, throwable) -> {
            myService.myCallback(document);
        }
);

So, how can I hand the result to MyService as an Observable<Document>?

public class MyDao{
   ...
   public void getFirstDocument(String collectionName){
      MongoCollection<Document> collection = database.getCollection(collectionName);
      collection.find().first(
        (document, throwable) -> {
            //SOME SORT OF CALLBACK
        }
     );
   }
}



public class MyService {
   ...
   public Observable<Document> getFirstOf(String collectionName){
       return ??????? 
   }
}

Answer:

When you use Observable.just() as in

public Observable<Document> getFirstOf(String collectionName){
    return Observable.just(myDao.getFirstDocument(collectionName)); 
}

it is equivalent to the following code:

public Observable<Document> getFirstOf(String collectionName){
    Document doc = myDao.getFirstDocument(collectionName);
    return Observable.just(doc); 
}

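As you can see, this is not asynchronous: the database request runs on the calling thread, before the Observable is even created. Observable.fromCallable() defers the request until something subscribes, and subscribeOn() moves it off the calling thread:

public Observable<Document> getFirstOf(String collectionName){
    return Observable.fromCallable(() -> myDao.getFirstDocument(collectionName))
            .subscribeOn(Schedulers.io()); // run the blocking call on an I/O thread
}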
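
The difference between just() and fromCallable() is when the blocking read runs. The sketch below illustrates that timing with plain JDK types rather than RxJava: a Supplier stands in for the Observable, and blockingRead() is a hypothetical stand-in for the DAO call that counts how often it has been invoked.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsDeferred {
    // Counts how many times the simulated database read has run.
    public static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for a blocking DAO read.
    public static String blockingRead() {
        calls.incrementAndGet();
        return "doc";
    }

    // Analogue of Observable.just(dao.read()):
    // the read runs right here, before the wrapper exists.
    public static Supplier<String> eager() {
        String value = blockingRead();
        return () -> value;
    }

    // Analogue of Observable.fromCallable(dao::read):
    // nothing runs until someone calls get().
    public static Supplier<String> deferred() {
        return EagerVsDeferred::blockingRead;
    }

    public static void main(String[] args) {
        Supplier<String> e = eager();     // the read has already happened
        Supplier<String> d = deferred();  // no additional read yet
        System.out.println("reads after wrapping: " + calls.get());

        d.get();                          // the deferred read happens now
        System.out.println("reads after get(): " + calls.get());

        // Running the deferred work on another thread is the rough
        // counterpart of adding subscribeOn(...) in RxJava.
        String result = CompletableFuture.supplyAsync(d).join();
        System.out.println("off-thread result: " + result);
    }
}
```

Deferring alone does not make the call asynchronous. It only postpones the work, which is why a scheduler (or, in this sketch, supplyAsync) is still needed to move it to another thread.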

If you are using the async MongoDB driver and want to wrap it in an Observable, you can write it like this:

public Observable<Document> getFirstDocument(String collectionName) {
    return Observable.create(emitter -> {
        MongoCollection<Document> collection = database.getCollection(collectionName);
        collection.find().first((document, throwable) -> {
            if (throwable != null) {
                // The driver reported a failure.
                emitter.onError(throwable);
                return;
            }
            if (document != null) {
                emitter.onNext(document);
            }
            // Complete even when nothing was found, so subscribers never hang.
            emitter.onComplete();
        });
    });
}
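
The same callback-to-async bridge can be tried without a database. The sketch below uses the JDK's CompletableFuture in place of Observable and a hypothetical AsyncRead interface in place of the MongoDB driver, so none of these names come from the driver itself. It covers the three outcomes a (result, error) callback can report: an error, a value, or neither.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackBridge {
    // Hypothetical stand-in for a callback-style read such as find().first(callback).
    public interface AsyncRead<T> {
        void first(BiConsumer<T, Throwable> callback);
    }

    // Adapts the callback API to a future that always terminates.
    public static <T> CompletableFuture<Optional<T>> toFuture(AsyncRead<T> read) {
        CompletableFuture<Optional<T>> future = new CompletableFuture<>();
        read.first((result, error) -> {
            if (error != null) {
                // Check the error first so a failure is never mistaken for "not found".
                future.completeExceptionally(error);
            } else {
                // A null result means nothing matched; report it as an empty Optional.
                future.complete(Optional.ofNullable(result));
            }
        });
        return future;
    }

    public static void main(String[] args) {
        AsyncRead<String> found = callback -> callback.accept("doc", null);
        AsyncRead<String> empty = callback -> callback.accept(null, null);
        AsyncRead<String> failing =
                callback -> callback.accept(null, new IllegalStateException("boom"));

        System.out.println(toFuture(found).join());
        System.out.println(toFuture(empty).join());
        System.out.println(toFuture(failing).isCompletedExceptionally());
    }
}
```

The case worth noticing is the one where both arguments are null. A bridge that only reacts to a non-null result or a non-null error would never signal anything for it, and the consumer would wait forever.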