Hot questions for Using RxJava 2 in android room

Top Java Programmings / RxJava 2 / android room

Question:

I am working on the application, where on the application startup I am downloading categories and posts from Rest service for storing them in the SQLite database. I have a few questions:

  1. How can I determine which object is Category and which Post? Or how can I access them at all? objects variable is pretty strange.
  2. Where I should put the code for inserting items in the database using Room library?
  3. I need to download images for each post, where I should do so?

Code:

ItemsApi client = this.getClient(); // Retrofit2

List<Observable<?>> requests = new ArrayList<>();
requests.add(client.getCategories());
requests.add(client.getPosts());

Observable<Object> combined = Observable.zip(
        requests,
        new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] objects) throws Exception {
                Timber.d("Length %s", objects.length); // Length 2
                Timber.d("objects.getClass() %s", objects.getClass()); // objects.getClass() class [Ljava.lang.Object;

                return new Object();
            }
        });

Disposable disposable = combined.subscribe(
        new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
               Timber.d("Object %s", o.toString());
            }
        },

        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                Timber.d("error: %s", e.toString());
            }
        }
);


private ItemsApi getClient() {
    Retrofit.Builder builder = new Retrofit
            .Builder()
            .client(this.getOkHttpClient())
            .addConverterFactory(GsonConverterFactory.create(this.getGson()))
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .baseUrl(Config.WEBSERVICE_URL_PREFIX);

    return builder.build().create(ItemsApi.class);
}

ItemsApi.class:

public interface ItemsApi {
    @GET("categories")
    Observable<List<CategoryEntity>> getCategories();

    @GET("posts")
    Observable<List<ArticleEntity>> getPosts();
}

Answer:

Here are answers:

1) For parallel requests, you should use Observable.zip, like this

Observable<Boolean> obs = Observable.zip(
    client.getCategories(),
    client.getPosts(), 
    (categoriesList, postsList) -> {
         // you have here both categories and lists
         // write any code you like, for example inserting to db
         return true;
});

Here you have parameters (categoriesList, postsList) each of its types, List and List.

2) You should put your code where I specified in comments. Make sure you have it in correct Thread

3) Downloading images could also be done there. You can have another zip-s in the function, combining parallel downloading images, insertions to db etc. All of them should be observables, combined with zip.

In zip you can combine as many observables, as you wish, their results will be available as combining function's parameters.

Question:

When we use a Flowable to get an update notification after inserting a new data-base row, works fine. But when the insertion is done inside another explicit transaction, the Flowable does not get a notification.

To illustrate the issue, I've forked the BasicRxJavaSample from android-architecture-components and added 2 test-methods to UserDaoTest.java

@Test
public void testFlowable() {
    // When subscribing to the emissions of the user
    final TestSubscriber<User> userTestSubscriber = mDatabase.userDao().getUser().test();

    userTestSubscriber.assertValueCount(0);

    // When inserting a new user in the data source
    mDatabase.userDao().insertUser(USER);

    userTestSubscriber.assertValueCount(1);
}

This works fine. But when I do the same inside of an explicit transaction it does not work:

@Test
public void testFlowableInTransaction() {
    // When subscribing to the emissions of the user
    final TestSubscriber<User> userTestSubscriber = mDatabase.userDao().getUser().test();

    userTestSubscriber.assertValueCount(0);

    // When inserting a new user in the data source
    mDatabase.beginTransaction();
    try {
        mDatabase.userDao().insertUser(USER);
        mDatabase.setTransactionSuccessful();

    } finally {
        mDatabase.endTransaction();
    }

    userTestSubscriber.assertValueCount(1);
    // this fails - the userTestSubscriber is still empty!
}

Note: this example is of course simplified, just to illustrate the issue.

Here's the generated DaoImpl.insertUser():

  public void insertUser(User user) {
    __db.beginTransaction();
    try {
      __insertionAdapterOfUser.insert(user);
      __db.setTransactionSuccessful();
    } finally {
      __db.endTransaction();
    }
  }

We can see that this code uses a transaction and also my test-code uses another transaction. According to the SupportSQLiteDatabase-beginTransaction() docs, nesting transactions should be okay.

Is this maybe a room-bug?

The room version of this project is 1.0.0-alpha3, but I can also see this problem with version 1.0.0-alpha8 (in another project)


Answer:

This was a bug in room: #65471397

I verified that the bug has been fixed with version 1.0.0-rc1

Question:

I have a user table. I need to check if a user with a specific name exists. I am doing this with this code

UserDao

  @Query("SELECT * FROM User WHERE userName = :userName")
  fun getUserByUserName(userName :String) : Observable<User>

Using

  repository.getUserByUserName(input)
                .observeOn(mainThread())
                .subscribe(user -> {

                });

After I subscribe to this Observable, but this only works for the case when such a user exists, but if the user is not in the database, subscribe does not work. How can I handle the situation when the user is in the database and when he is not?


Answer:

You can use Single(or Maybe as Beloo suggested) instead of Observable.

You can try something like this :

repository.getUserByUserName(input)
                .observeOn(mainThread())
                .subscribe(user -> {

                }, 
                throwable -> {
                // catch your exception here.                   
                }));

Question:

I'm using rxjava & Room where I'm trying to update list of rows in db. Stuck in a loop where events are continuously firing

Dao

@Query("SELECT * FROM movies")
Flowable<List<Movie>> getMovies();

@Update
int updateMovie(Movie movie);

UpdateClass - helper classes is updating the Db like this

// Trying to get all existing movies and update one value in all of them.

@WorkerThread
Flowable<Integer> updateMovies(Helper help) {
    return movieDao.getMovies().flatMapIterable(movies -> movies)
        .flatMap(movie -> {
            LogUtils.debug("movieusecase", "movieid" + movie.getMovieId());
            movie.updateRating(help.getUpdatedVal(movie));
            return Flowable.just(movie);
        }).map(movie -> {
           return movieDao.updateMovie(movie);
        });
}

As soon as I include the updateMovie call I get stuck in an infinite loop where duplicate events keep on coming from room db.

Presenter Class - calls update class to update stuff in db. Gets triggered in activity onCreate

updateClass.updateMovies()
    .observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(movie -> {
        // update stuff
    }, throwable -> {
        LogUtils.error(TAG, throwable.getMessage());
    });

Thanks in advance any help would be really appreciated.


Answer:

Sounds like a job for a Single instead of a Flowable.

The difference is that Single will only fire once with the current list of movies in the DB, It will not keep emitting changes like Flowable does.

Single<List<Movie>> getMovies();

Or if you for some reason want your method to return Flowable, You can also use Flowable.firstOrError() which will transform your Flowable into a Single with a cost.

@WorkerThread
Flowable<Integer> updateMovies(Helper help) {
    return movieDao.getMovies()
        .firstOrError()
        .flatMapIterable(movies -> movies)
        .flatMap(movie -> { ... })
        .map(movie -> { ... });
}

I prefer the first option though for the sake of intention clarity.

Question:

I am trying to init data with RxJava2, Dagger2 and Room and MVP in Android, but Android doesn't execute my room callbacks.

This is my method that creates database, and this is what I see in logs:

public synchronized static AppDatabase getInstance(Context context) {
    if (INSTANCE == null) {
        INSTANCE = buildDatabase(context);
   }
   return INSTANCE;
}

private static AppDatabase buildDatabase(Context context) {
    Log.i("jestem", "builddatabase");

    RoomDatabase.Callback callback = new Callback() {
        @Override
        public void onCreate(@NonNull SupportSQLiteDatabase db) {
            Log.i("jestem", "oncreatebuildda22tabase");
            super.onCreate(db);
          Single.just(getInstance(context).initData());
        }

        @Override
        public void onOpen(@NonNull SupportSQLiteDatabase db) {
            getInstance(context).onOpenDb();
            super.onOpen(db);
        }
    };

    return Room.databaseBuilder(context,
            AppDatabase.class,
            "pijatyka.db")
            .addCallback(callback)
            .build();}

public static void onOpenDb() {
    Log.i("jestem", "onopendb");
    hasFinishedCreating = true;

}

public boolean initData() {
    Log.i("Im here", "oncreatebuilddatabase");
    for (Cards card : DataGenerator.initCards()
            ) {
        myDAO().insertCard(card);
    }
    for (Awards award : DataGenerator.initAwards()
            ) {
        myDAO().insertAwards(award);
    }
    hasFinishedCreating = true;
    return true;
}

But I don't see any of this in logs, and I don't know why...inside the function onCreate and onOpen

This is my activity module:

@Module
public class StartModule {

    private final StartActivity activity;


    public StartModule(StartActivity activity) {
        this.activity = activity;
    }

    @Provides
    @StartScope
    public Activity activityProvides(){
        return  activity;
    }

    @Provides
    @StartScope
    public  StartView view(){
        return new StartView(activity);
    }

    @Provides
    @StartScope
    public StartPresenter presenter(StartView view, StartModel model){
        return new StartPresenter(view, model);
    }

    @Provides
    @StartScope
    public StartModel model(AppDatabase database){
        return new StartModel(activity, database);
    }
}

And my application module

@Module
public class MyAppModule {
    private final Context context;

    public MyAppModule(Application application){
        context = application.getApplicationContext();
    }


    @MyAppScope
    @ApplicationContext
    @Provides
    public Context context(){
        return context;
    }


    @MyAppScope
    @Provides
    public AppDatabase appDatabase(@ApplicationContext Context context){
         return AppDatabase.getInstance(context);
    }

    @Provides
    @MyAppScope
    public MyDAO providesMyDao(AppDatabase database){
        return database.myDAO();
    }

}

Answer:

The code from just won't be executed until you subscribe to the Single object, so change:

Single.just(getInstance(context).initData());

to

Single.just(getInstance(context).initData())
    .subscribeOn(Schedulers.io())   // init Data off main thread
    .subscribe();                   // pass some results handler, if you need

Example of result handlers:

Single.just(getInstance(context).initData())
        .subscribeOn(Schedulers.io())
        .subscribe(result -> {
            if(result)
                System.out.println("Init successful");
            else
                System.out.println("Init unsuccessful");
        }, error -> {
            System.out.println("Error occurred: " + error);
        });