Hot questions for Using RxJava 2 in concurrency

Question:

I'm trying to query an API that provides me with a list of files to download (as follows). I then proceed to download these files while also re-querying the API for anything that may have been missed in the initial call.

Completable#mergeDelayError(Iterable<? extends CompletableSource> sources) is used to ensure I can perform multiple tasks in parallel and get notified when everything has been completed.

fun fetchAndDownload(details: List<String>): Completable = 
    exampleApi.fetchPackages(details) // This is a Single
        .flatMapCompletable { (results, retry) -> 
            val completables = mutableListOf<Completable>()
            results.mapTo(completables) { value ->
                exampleApi.download(value).subscribeOn(Schedulers.io())
            }

            if (retry.isNotEmpty()) { 
                completables += fetchAndDownload(retry)
                    .delay(3L, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
            }
            Completable.mergeDelayError(completables)
        }

However, this implementation has the possibility of overwhelming the network and/or thread count by executing too many things at once. I am therefore wondering what the best approach would be to limit the number of completables that are running at once.

I am aware of Completable#mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency) but am not sure how to convert my List<Completable> to the required Publisher. An alternative solution is to provide a custom Scheduler that has a maximum thread count, but I am also not sure how to provide such a Scheduler (one that I can clean up and discard when it is no longer needed).


Answer:

The simplest approach is to use Flowable.fromIterable to convert the List of Completable to a Publisher.

This allows the use of Completable#mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency), where maxConcurrency caps how many of the completables are subscribed to at the same time.
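
As a rough illustration of the conversion, here is a minimal Java sketch. The class name, the helper names, and the sleep that stands in for exampleApi.download are made up for the example; only Flowable.fromIterable and the two-argument Completable.mergeDelayError come from the answer. It records the highest number of tasks seen running at once, which should not exceed the limit passed in.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedMergeSketch {

    /** Merges the tasks, subscribing to at most maxConcurrency of them at a time. */
    static Completable mergeBounded(List<Completable> tasks, int maxConcurrency) {
        // Flowable implements Publisher, which is what this overload expects
        return Completable.mergeDelayError(Flowable.fromIterable(tasks), maxConcurrency);
    }

    /** Runs taskCount fake downloads and returns the highest number seen running at once. */
    static int runAndMeasurePeak(int taskCount, int maxConcurrency) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Completable> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(Completable.fromAction(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                Thread.sleep(50); // hypothetical stand-in for exampleApi.download(value)
                running.decrementAndGet();
            }).subscribeOn(Schedulers.io()));
        }
        mergeBounded(tasks, maxConcurrency).blockingAwait();
        return peak.get();
    }

    public static void main(String[] args) {
        // With a limit of 2, the reported peak should be at most 2
        System.out.println("peak concurrency: " + runAndMeasurePeak(6, 2));
    }
}
```

In the Kotlin code from the question, the equivalent change would presumably be to replace the last line of the lambda with Completable.mergeDelayError(Flowable.fromIterable(completables), maxConcurrency), with maxConcurrency chosen by the caller.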

Question:

I'm trying to do the following:

                           A
                           |
                           |
                           V
                     Observable<B>
                           /\
                          /  \
                         /    \
                        V      V
            Observable<C>       Observable<D>
                        \      /
                         \    /
                          V  V
                      Observable<E>
  1. Given an input [A], an async call returns [B].
  2. Two tasks that each need [B] need to run in parallel and return [C] and [D] respectively.
  3. The two results are combined into [E], which is then shown in the UI.

I'm new to RxJava and have come across zip, merge, etc., but don't really understand what operators are required for this kind of problem. Any help will be highly appreciated.

PS. 1) While [C] and [D] are both required, [E] can still be created with only one of them. So, it would be nice to have a timeout at this point in case one (or both) of them fail(s). 2) Is it possible to have them run in specific threads - one in computation() and the other in io()?

Here's the conceptual code that I have at the moment. It runs linearly: A -> B -> C -> D -> E

    return a2b(a)
            .subscribeOn(Schedulers.io())
            .flatMap(this::b2c)
            .subscribeOn(Schedulers.computation())
            .map(this::c2d)
            .map(this::d2e)
            .cast(E.class)
            .startWith(e -> new E.loadingState());

Ideally, I should use the following function somewhere:

Observable<E> cd2e(C c, D d) {
    return Observable.just(new E());
}

Thanks.


Answer:

The publish() operator binds a single observable in a way that allows multiple subscriptions.

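return a2b(a)
        .subscribeOn(Schedulers.io())
        .publish( bObservable ->
               // b2c returns an Observable in the question, so it is flat-mapped;
               // b2d is assumed to have the same shape
               Observable.zip( bObservable.flatMap( this::b2c ),
                               bObservable.flatMap( this::b2d ),
                               (c, d) -> combine( c, d ) ) )
        .subscribe( ... );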

The operator binds the observer chain such that multiple subscriptions can be made; in this case the subscriptions are zipped together, combining the C and D types into the combined E type.

You are then free to add observeOn() operators to have the computation done on the threads you desire.
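
One way to place the two branches on different schedulers is to put observeOn inside the publish selector, one per branch. The sketch below is only an illustration: the class name, the String payloads, and the counter are invented for the example, and the map calls stand in for the real B to C and B to D work. The counter is there to show that the source is subscribed to once even though two branches consume it.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

public class PublishZipSketch {

    // Counts how often the upstream source is actually subscribed to
    static final AtomicInteger sourceSubscriptions = new AtomicInteger();

    static Observable<String> fanOutAndCombine(String a) {
        Observable<String> b = Observable
                .fromCallable(() -> a + ">B") // hypothetical stand-in for a2b(a)
                .doOnSubscribe(disposable -> sourceSubscriptions.incrementAndGet())
                .subscribeOn(Schedulers.io());

        return b.publish(shared -> {
            // Branch 1: B -> C on the computation scheduler
            Observable<String> c = shared
                    .observeOn(Schedulers.computation())
                    .map(value -> value + ">C");
            // Branch 2: B -> D on the io scheduler
            Observable<String> d = shared
                    .observeOn(Schedulers.io())
                    .map(value -> value + ">D");
            // Combine the pair into E once both branches have produced a value
            Observable<String> e = Observable.zip(c, d, (x, y) -> "E[" + x + ", " + y + "]");
            return e;
        });
    }

    public static void main(String[] args) {
        System.out.println(fanOutAndCombine("A").blockingFirst());
        System.out.println("source subscriptions: " + sourceSubscriptions.get());
    }
}
```

Because zip waits for a value from both branches, a branch that fails or never emits stalls the result. For the "E from only one of them" case in the question, each branch would need its own fallback (for example a timeout followed by onErrorReturnItem) before the zip.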

Question:

I have 3 Observables. The output of the first observable is required by the second, and the output of both the first and the second is required by the third.

    Observable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
            System.out.println("Executing Observable 1");
            return v;
        });

    Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
            System.out.println("Executing Observable 2");
            return "{2"+observableOneValue+"}";
        });

    Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) ->
        {
            System.out.println("Executing Observable 3");
            return "{3"+observableOneValue+observableTwoValue+"}";
        }
    ).blockingSubscribe(System.out::println);

This repeats the execution of the first observable. I can certainly make the first observable cacheable, but I was wondering if there is a better option than that. In particular, I am looking for some kind of message-passing construct from the first observable to the other two.


Answer:

I am not sure exactly what you are looking for by a "message passing construct". cache will work for your example above, but you mention you don't want to use that.

Another option that might fit your use case is using ConnectableObservable. It only starts emitting items when you call connect on it, not when it is subscribed to. Convert your observableOne to a ConnectableObservable by calling publish. Then set up all your subscribers. Then call connect on observableOne.

    ConnectableObservable<String> observableOne = Observable
        .just("{1}")
        .map(v -> {
          System.out.println("Executing Observable 1");
          return v;
        }).publish();

    Observable<String> observableTwo = observableOne
        .map(observableOneValue -> {
          System.out.println("Executing Observable 2");
          return "{2"+observableOneValue+"}";
        });

    Observable.zip(
        observableOne,
        observableTwo,
        (observableOneValue, observableTwoValue) ->
        {
          System.out.println("Executing Observable 3");
          return "{3"+observableOneValue+observableTwoValue+"}";
        }
    ).subscribe(System.out::println);

    // Call when all the subscribers are ready --
    observableOne.connect();

Notes

  • observableOne is now a ConnectableObservable
  • Use subscribe instead of blockingSubscribe. blockingSubscribe would block the calling thread before the connect call is reached, so the source would never start emitting.
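
If keeping blockingSubscribe matters, a possible alternative is the publish overload that takes a selector function: it shares the source among everything built inside the selector and connects on its own once the result is subscribed to, so no explicit connect call is needed. The sketch below is an assumption-laden variant of the code above, not the answerer's solution; the class name and the counter are invented, and the counter replaces the println calls to make the single execution checkable.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

public class PublishSelectorSketch {

    // Counts how many times the first observable's map function runs
    static final AtomicInteger firstExecutions = new AtomicInteger();

    static Observable<String> pipeline() {
        Observable<String> observableOne = Observable
                .just("{1}")
                .map(v -> {
                    firstExecutions.incrementAndGet();
                    return v;
                });

        // Everything inside the selector shares one subscription to observableOne
        return observableOne.publish(shared -> {
            Observable<String> observableTwo = shared.map(one -> "{2" + one + "}");
            Observable<String> zipped =
                    Observable.zip(shared, observableTwo, (one, two) -> "{3" + one + two + "}");
            return zipped;
        });
    }

    public static void main(String[] args) {
        // The connect happens as part of subscribing, so a blocking call is fine here
        pipeline().blockingSubscribe(System.out::println);
        System.out.println("first observable ran " + firstExecutions.get() + " time(s)");
    }
}
```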