Hot questions for using RxJava 2 in asynchronous code

Question:

I have a question. Is it possible to run two tasks asynchronously in a single-threaded environment in RxJava? I know that Java should contain a library for this functionality, but I think that RxJava does not contain it.


Answer:

Of course it does: RxJava supports single-threaded asynchronous processing, just as it supports processing on any number of threads.

Example

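Flowable.fromCallable(() -> {
            // do something
            return true; // a Callable must return a (non-null) value
        })
                .subscribeOn(Schedulers.single())
                .subscribe(); // nothing runs until you subscribe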

An alternative to Schedulers.single() is Schedulers.from(Executors.newFixedThreadPool(1)), which lets you supply your own thread pool.
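To make the single-threaded behaviour visible, here is a minimal sketch, assuming RxJava 2.x (package io.reactivex) is on the classpath. The class and method names are made up for illustration. Two tasks are subscribed on Schedulers.single(), and each one reports the name of the thread it ran on:

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

class SingleThreadDemo {

    /** Runs two tasks on the shared single-thread scheduler and returns the thread names they saw. */
    static String[] runBoth() {
        Single<String> first = Single.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single());
        Single<String> second = Single.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single());

        // zip subscribes to both sources; both tasks are queued on the same worker thread
        return Single.zip(first, second, (a, b) -> new String[] {a, b}).blockingGet();
    }

    public static void main(String[] args) {
        String[] names = runBoth();
        System.out.println(names[0] + " / " + names[1]);
    }
}
```

Both tasks run off the calling thread but share one worker, so they execute one after the other rather than in parallel.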

Question:

Let's say I have this synchronous method:

public FruitBowl getFruitBowl() {
    Apple apple = getApple(); // IO intensive
    Banana banana = getBanana(); // CPU intensive
    return new FruitBowl(apple, banana);
}

I can use the Java concurrency API to turn it into an async method, which would turn out somewhat like this:

public Future<FruitBowl> getFruitBowl() {
    Future<Apple> appleFuture = getAppleAsync(); // IO intensive
    Future<Banana> bananaFuture = getBananaAsync(); // CPU intensive
    return createFruitBowlAsync(appleFuture, bananaFuture); // Awaits appleFuture and bananaFuture and then returns a new FruitBowl
}

What is the idiomatic Rx way of doing this while taking advantage of its schedulers (io and computation) and returning a Single?


Answer:

You can use the zip operator and assign a different thread to each async operation. If you don't, the methods will be executed one after the other on the same thread.

I would create an Observable version of both methods, so that they return Observable<Apple> and Observable<Banana> respectively, and use them in this way:

Observable.zip(getAppleObservable().subscribeOn(Schedulers.newThread()),
               getBananaObservable().subscribeOn(Schedulers.newThread()),
     (apple, banana) -> new FruitBowl(apple, banana))
     .subscribe(/* do your work here with FruitBowl object */);

Here are more details about how to parallelize operations with the zip operator.
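Since the question asks for a Single and for the io and computation schedulers specifically, the same idea can be sketched with Single.zip. This assumes RxJava 2.x on the classpath. The Apple, Banana and FruitBowl classes and the blocking getApple()/getBanana() methods below are placeholder stand-ins for the ones in the question:

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

class FruitBowlExample {
    // Placeholder types standing in for the question's classes
    static class Apple {}
    static class Banana {}
    static class FruitBowl {
        final Apple apple;
        final Banana banana;
        FruitBowl(Apple apple, Banana banana) {
            this.apple = apple;
            this.banana = banana;
        }
    }

    static Apple getApple() { return new Apple(); }     // imagine blocking IO here
    static Banana getBanana() { return new Banana(); }  // imagine heavy computation here

    static Single<FruitBowl> getFruitBowl() {
        Single<Apple> apple = Single.fromCallable(FruitBowlExample::getApple)
                .subscribeOn(Schedulers.io());          // IO-bound work
        Single<Banana> banana = Single.fromCallable(FruitBowlExample::getBanana)
                .subscribeOn(Schedulers.computation()); // CPU-bound work
        // zip waits for both results and combines them
        return Single.zip(apple, banana, FruitBowl::new);
    }
}
```

Nothing runs until the returned Single is subscribed to, so the caller decides when the work starts and on which thread the FruitBowl is consumed.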

Question:

I have the following code wrapped in an AsyncTask:

        AsyncTask<Void, Void, Void> task = new AsyncTask<Void, Void, Void>() {
        @Override
        protected Void doInBackground(Void... params) {
            try {
                if (NetworkUtils.isNetworkConnected(mContext))
                    mIndicatorTable.sync().blockingGet();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                try {
                    final MobileServiceList<IndicatorModel> results = mIndicatorTable.table().read(null).blockingGet();
                    if (isViewAttached()) {
                        getMvpView().refreshIndicatorList(results);
                        getMvpView().hideLoading();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return null;
        }
        };

mIndicatorTable.sync() and mIndicatorTable.table().read(null) both return Single<Boolean>. mIndicatorTable.sync() syncs local storage with the remote one. If the network is unavailable, we just read from local storage using mIndicatorTable.table().read(null); when the network is available, we sync first and then read from local storage (we don't care if syncing is canceled or interrupted). After all that, we should call the View to refresh the RecyclerView. How can this be implemented with RxJava 2?

#UPDATE

Working version

getDataManager().syncAll()
     .onErrorResumeNext(Single.just(false))
     .flatMap(list -> toSingle(mIndicatorTable.getTable().read(null)))
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(results -> {
        if (isViewAttached()) {
            getMvpView().refreshIndicatorList(results);
            getMvpView().hideLoading();
        }
     });

Answer:

RxJava makes chaining async calls easy: just call .flatMap().

mIndicatorTable.sync()
        .flatMap(new Function<Boolean, SingleSource<MobileServiceList<IndicatorModel>>>() {
            @Override
            public SingleSource<MobileServiceList<IndicatorModel>> apply(Boolean aBoolean) throws Exception {
                return mIndicatorTable.table().read(null);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...)

But you want to run the sync() call on another thread, and the RecyclerView can only be accessed from the main thread. That's where RxAndroid comes in handy. .subscribeOn(Schedulers.io()) makes the subscription (and therefore the sync() work) happen on another thread, and .observeOn(AndroidSchedulers.mainThread()) ensures that everything below that line (here, .subscribe(...)) is executed on the main thread.
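The "read from local storage even if syncing fails" requirement can be sketched without Android dependencies. This is only an illustration, assuming RxJava 2.x on the classpath. sync(boolean) and readLocal() are hypothetical stand-ins for mIndicatorTable.sync() and mIndicatorTable.table().read(null), and on Android you would add .observeOn(AndroidSchedulers.mainThread()) before subscribing:

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

class SyncThenRead {

    /** Hypothetical sync call: fails when the network is down. */
    static Single<Boolean> sync(boolean networkUp) {
        if (networkUp) {
            return Single.just(true);
        }
        return Single.error(new IllegalStateException("no network"));
    }

    /** Hypothetical local read. */
    static Single<List<String>> readLocal() {
        return Single.fromCallable(() -> Arrays.asList("indicator-1", "indicator-2"));
    }

    static Single<List<String>> load(boolean networkUp) {
        return sync(networkUp)
                .onErrorReturnItem(false)         // a failed sync is not fatal
                .flatMap(ignored -> readLocal())  // always read local storage afterwards
                .subscribeOn(Schedulers.io());    // do the work off the calling thread
    }
}
```

onErrorReturnItem(false) plays the same role as onErrorResumeNext(Single.just(false)) in the asker's working version.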

Question:

I need to chain consecutive requests in my Android project: first an API request is made, and when the asynchronous response comes back, that response is used in the second request, and so on.

I've been studying the RxJava2 library, but I haven't fully understood it yet. Furthermore, the RxJava code will be in an Interactor class that calls functions residing in a repository, so I don't want to write the logic directly inside the RxJava2 code but call functions from another class. A GitHub repo that covers these areas would be very useful for me.


Answer:

The flatMap operator is the canonical way to specify continuations that depend on the result(s) of previous source(s):

retrofitAPI.getData(params)
.flatMap(data -> 
     retrofitAPI.getMoreData(data)
     .flatMap(moreData -> retrofitAPI.getEvenMoreData(data, moreData))
)
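To show how this could sit in an Interactor that only calls repository functions, here is a rough sketch, assuming RxJava 2.x on the classpath. The Repository interface, the Interactor class and the in-memory FakeRepository are all hypothetical; in a real project the repository would wrap the Retrofit calls:

```java
import io.reactivex.Single;

class ChainedRequests {

    /** Hypothetical repository; real implementations would wrap Retrofit calls. */
    interface Repository {
        Single<String> getData(String params);
        Single<String> getMoreData(String data);
        Single<String> getEvenMoreData(String data, String moreData);
    }

    /** Hypothetical interactor that only composes repository calls. */
    static class Interactor {
        private final Repository repository;

        Interactor(Repository repository) {
            this.repository = repository;
        }

        Single<String> loadEverything(String params) {
            return repository.getData(params)
                    .flatMap(data -> repository.getMoreData(data)
                            // nested so that both data and moreData are in scope
                            .flatMap(moreData -> repository.getEvenMoreData(data, moreData)));
        }
    }

    /** In-memory stand-in used only to exercise the chain. */
    static class FakeRepository implements Repository {
        public Single<String> getData(String params) {
            return Single.just("data(" + params + ")");
        }
        public Single<String> getMoreData(String data) {
            return Single.just("more(" + data + ")");
        }
        public Single<String> getEvenMoreData(String data, String moreData) {
            return Single.just("even(" + data + "," + moreData + ")");
        }
    }
}
```

The inner flatMap is nested rather than chained only because the third call needs both earlier results. If each request needed just the previous response, the flatMap calls could be written one after another at the same level.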

Question:

I am wrapping some async code using Observable.create(...). The async code needs to be "active" for five seconds, and then it should be stopped. This is how I am currently stopping it:

Observable.<MyObject>create(emitter -> {

    // Some async code

    emitter.setDisposable(Disposables.fromRunnable(() -> {
        // Stop the above async code
    }));

    // Wait for 5 seconds until the async code above has had enough time to finish its task.
    Thread.sleep(5000);

    if (!emitter.isDisposed()) {
        // Stop the above async code
        emitter.setDisposable(null);
    }
})...

I'm not sure if this is the best way because Thread.sleep(5000) is used. Is there a better way to do this with some RxJava code?


Answer:

There exists a timed overload of take that relays items from a source until the specified time elapses:

Observable.<MyObject>create(emitter -> {

    // Some async code

    emitter.setDisposable(Disposables.fromRunnable(() -> {
        // Stop the above async code
    }));
})
.take(5, TimeUnit.SECONDS);

You still have to set up the Disposable in create.
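A small runnable sketch of this pattern, assuming RxJava 2.x on the classpath: the background thread below is a made-up stand-in for "some async code", and the window is passed in milliseconds so the demo finishes quickly. When the time elapses, take disposes the upstream, which triggers the Disposable registered in create:

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposables;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TimedTake {
    // Set to true once the cleanup Disposable has run
    static final AtomicBoolean stopped = new AtomicBoolean(false);

    /** Emits an increasing counter from a background thread until disposed. */
    static Observable<Long> ticks() {
        return Observable.<Long>create(emitter -> {
            Thread worker = new Thread(() -> {
                long n = 0;
                while (!emitter.isDisposed()) {
                    emitter.onNext(n++);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        return; // interrupted by the cleanup below
                    }
                }
            });
            // Register the cleanup before starting the async work
            emitter.setDisposable(Disposables.fromRunnable(() -> {
                stopped.set(true);
                worker.interrupt();
            }));
            worker.start();
        });
    }

    /** Collects everything emitted within the given time window. */
    static List<Long> collect(long millis) {
        return ticks().take(millis, TimeUnit.MILLISECONDS).toList().blockingGet();
    }
}
```

No Thread.sleep is needed inside create itself; the timer lives in the take operator.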

Question:

I implemented a service that asynchronously fetches data from an eyetracker and provides a PublishSubject that anyone who is interested can subscribe to to get a stream of the latest eyetracking events.

What I have no clue about is how to get these events into my GUI thread (since eyetracking is pretty useless without the information of what is being displayed) and how to throttle the received events enough that the GUI does not lag because the thread is blocked.

Can someone give me a hint on how to do that using RxJava?


Answer:

Reactive Extensions offer something called Schedulers. They instruct a stream to change the threads on which it either produces or observes items. You can set them through the subscribeOn operator, which modifies the stream so that your producer emits its items (in other words, calls the onNext method on its subscribers) on the specified scheduler, or through the observeOn operator, which tells the stream to schedule its observing work on the specified scheduler.

getSomeObservable() // the Observable does its work on the computation scheduler (see subscribeOn below)
    .observeOn(Schedulers.io()) // every operator after this point observes items on the io scheduler
    .map(...) // this callback is called on a thread provided by the io scheduler
    .filter(...) // and this one as well
    .observeOn(Schedulers.newThread()) // switch to the new-thread scheduler
    .doOnNext(...) // runs on the new thread
    .subscribeOn(Schedulers.computation()) // instructs the source to produce items on the computation scheduler
    .subscribe(...) // the subscriber's callbacks still run on the new-thread scheduler

Frameworks with some level of Rx support usually have a Scheduler implementation that performs work on the UI thread. Android, for example, has a library that provides AndroidSchedulers.mainThread(), which is exactly what you are looking for, only for your framework.
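For the throttling part of the question, one option is the sample operator, which forwards only the most recent item in each time window. The sketch below assumes RxJava 2.x on the classpath and uses a single-thread executor named "gui-thread" as a stand-in for a real GUI thread. With a real toolkit you would wrap its "run on the UI thread" hook instead, for example Schedulers.from(SwingUtilities::invokeLater) for Swing. The event source, rates and names are made up:

```java
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class GuiThrottle {

    /** Pushes 100 fast events and returns the thread name seen for each event that reached the "GUI". */
    static List<String> demo() {
        // Stand-in for the GUI thread (assumption: a real app would use its toolkit's UI thread)
        ExecutorService gui = Executors.newSingleThreadExecutor(r -> new Thread(r, "gui-thread"));
        Scheduler guiScheduler = Schedulers.from(gui);

        PublishSubject<Integer> events = PublishSubject.create();
        List<String> seenOn = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        events.sample(50, TimeUnit.MILLISECONDS)  // at most one event per 50 ms window
                .observeOn(guiScheduler)          // deliver on the "GUI" thread
                .subscribe(e -> seenOn.add(Thread.currentThread().getName()),
                        Throwable::printStackTrace,
                        done::countDown);

        try {
            for (int i = 0; i < 100; i++) {       // simulate a fast eyetracker
                events.onNext(i);
                Thread.sleep(2);
            }
            events.onComplete();
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            gui.shutdown();
        }
        return seenOn;
    }
}
```

Only a handful of the 100 events reach the subscriber, and all of them arrive on the stand-in GUI thread.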

Question:

I am looking to make a series of function calls with an interval between each call. You can assume each function call is a print statement printing a unique String, and that we want to wait 500 ms between calls. How can I go about this in RxJava?


Answer:

Found a solution to this by using a Completable that executes an Action.

A sample is below:

    println("33")
    Completable.fromAction { println("75") }
            .delay(5, TimeUnit.SECONDS)
            .andThen(Completable.fromAction { println("82") })
            .delay(5, TimeUnit.SECONDS)
            .andThen(Completable.fromAction { println("93") })
            .delay(5, TimeUnit.SECONDS)
            .andThen(Completable.fromAction { println("101") })
            .subscribe()

The above code prints 33 and 75 immediately, and then waits 5 seconds, prints 82, waits 5 seconds, ...and so on.
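The same chain can be built in a loop for an arbitrary list of calls. The following Java sketch assumes RxJava 2.x on the classpath. The class name, the run method and the use of a list to record the "printed" strings are illustrative only:

```java
import io.reactivex.Completable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class SpacedCalls {

    /** Runs one action per message, waiting delayMs after each, and returns the messages in execution order. */
    static List<String> run(List<String> messages, long delayMs) {
        List<String> out = new CopyOnWriteArrayList<>();
        Completable chain = Completable.complete();
        for (String message : messages) {
            chain = chain
                    .andThen(Completable.fromAction(() -> out.add(message))) // the "function call"
                    .delay(delayMs, TimeUnit.MILLISECONDS);                  // pause before the next one
        }
        chain.blockingAwait(); // block only for the demo; use subscribe() in real code
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(java.util.Arrays.asList("75", "82", "93"), 500));
    }
}
```

Note that this version also waits once after the last call, because each step carries its own trailing delay.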