Hot questions for Using RxJava 2 in flatmap

Question:

I just watched the conference by Jake Wharton The State of Managing State with RxJava.

He proposes to transform the events from view to action in this way:

Observable<Event> events = RxView.clicks(view).map(__ -> new Event());
ObservableTransformer<Event, Action> action = events -> events.flatMap(/* ... */);
events.compose(action).subscribe();

I would like to know the difference with this implementation:

Observable<Event> events = RxView.clicks(view).map(__ -> new Event());    
Observable<Action> action = events.flatMap(/* ... */);
action.subscribe();

What is the difference between using a compose() with an ObservableTransformer and a simple flatMap() with two Observable?


Answer:

There is a good explanation, from Daniel Lew, about the differences. In short:

The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items.

For more details look at the complete explanation in this article (in the section named What About flatMap()?)

Question:

Let's say I have a function that takes a String and a long and returns a Single<String>.

Single<String> stringAddition(String someString, long value) {
  return Single.just(someString + Long.toString(value));
}

Now I have this Observable...

Observable.interval(1, SECONDS)
  .scan("", (cumulativeString, item) -> {
    // Need to return the result of stringAddition(cummulativeString, item)
  });

I'm at a loss on how to do this. Scan needs me to return a String, but II would like to use the method that returns a Single<String>. To me it seems like I need something that can combine the behaviour of both scan and flatMap. Is there any RxJava2 wizardry that can help me?


Answer:

You can achieve it as follows. This can be somewhat shortened if stringAddition would have returned Observable

Observable<String> scanned = Observable.interval(1, TimeUnit.SECONDS)
            .scan(
                    Observable.just(""),
                    (cumulativeString, item) ->
                        cumulativeString
                          .flatMap(str -> stringAddition(str, item).toObservable())
            )
            .flatMap(it -> it);

Question:

In rxjava 1 there Observable had this flatmap method

public final Observable flatMap(Func1 collectionSelector, Func2 resultSelector)

That allowed you to pass/combine the initial result to the flatmap subscriber.

How can I achieve the same result with RxJava2?

I have a Single that emits A, I need to get B based on A and then use both A and B to perform an action.


Answer:

You have the same method on RxJava2, both on Observable and Flowable , but, in both RxJava1 and 2, there is no such operator for Single, you can transform Single to Observable and then apply this operators.

Question:

I'm working with an API of my own and a i'm hoping to chain a few paginated results using RxJava. I use cursor based pagination. (imagine there are 50 users in this first request):

{
    "data":{
        "status":"ok",
        "total":988, //users total
        "has_next_page":true,
        "end_cursor":"AQAxd8QPGHum7LSDz8DnwIh7yHJDM22nEjd",
        "users":[{"id":"91273813",
                "username":"codergirl",
                "full_name":"Code Girl",
                "picture_url":"https://cdn.com/21603182_7904715668509949952_n.jpg",
                },
                ...
                ]
        }
}

Right now, I'm getting the first 50 results like this, using retrofit:

public class DataResponse {
    @SerializedName("end_cursor")
    private String end_cursor;

    @SerializedName("users")
    private JsonArray users;

    @SerializedName("has_next_page")
    private Boolean has_next_page;

    public boolean hasNextCursor(){
        return has_next_page;
    }
    public String endCursor(){
        if (hasNextCursor()){
            return end_cursor;
        }
        return "";
    }
    public JsonArray getUsers(){
        return users;
    }
}

then:

public interface MyService  {
    @GET( "/users")
    Observable<DataResponse> getUsers(
            @Query("cursor") String cursor,
    );
}

and

MyService service = RetrofitClient.getInstance();
service.getUsers()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe( val->  showUsers(val.getUsers())); // getting the first 50 users

The next call should be to "/users?cursor=AQAxd8QPGHum7LSDz8DnwIh7yHJDM22nEjd"

I'd like to return all (in this case 988) users


Answer:

My solution

import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public void getAllUsers(){

    AtomicReference<String> cache = new AtomicReference<>();
    AtomicBoolean hasMore = new AtomicBoolean(true);

    io.reactivex.Observable.just(0)
        // getting the first 50 users
        .flatMap(users1-> service.getUsers( cache.get() ))

        // scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())

        // re-call variable
        .repeatUntil(() -> !hasMore.get())

        .subscribe(new Observer<DataResponse>() {
            @Override
            public void onSubscribe(Disposable d) { // on subscribe }

            @Override
            public void onNext(DataResponse response) {

                // saving boolean (If there are more users)
                hasMore.set(response.hasNextCursor());

                // saving next cursor
                cache.set(response.endCursor());

                // adding the new 50 users
                addToList(response.getUsers());

            }

            @Override
            public void onError(Throwable e) {// error}

            @Override
            public void onComplete() {// complete}
        });

}

Question:

I'm a newbie in RxJava and I was doing some RESTful programming using RxJava2. I had few API calls to be made which were all independent with respect to each other. What I observed from my use case is that, since the API calls were being async wrt each other and were all returning Observables<String>, and from all the API responses I was doing some computations, So at the time of computation, I didn't had responses from few APIs yet and as such it failed. For all those APIs whose response was not yet received, I was making use of subscribe like the below code :

Observable<String> res = someApiCall(data1,data2); res.subscribe(data -> { //Call Another Method.})

And for the ones for which response was received the code was :

return someApiCall.flatMap(data -> { // Call Another Method})

My Question is : Does using FlatMap makes it Blocking ? How does the 2 flows that i described above differ ? Is Subscribe always in async ?


Answer:

Subscriber in Rx it´s sync by default. The only way to make it run async your pipeline is using subscribeOn or observerOn operators.

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Also the use of flatMap in your case it´s correct, you must use the flatMap in order to compose functions, that call new API with the previous API response information.

And in case some of those API calls are not responding, you can always use timeout in your flatMap operator to do a compensation.

http://reactivex.io/documentation/operators/timeout.html

Question:

I'm relatively new on RxJava2 and I'm getting some weird behaviors, so it's likely that I'm using the tool on the wrong way.

It's a fairly big project, but I've separated the snippet below as a minimum reproducible code:

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private Subject<Integer> s = PublishSubject.create();
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      s.onNext(val);
      return s;
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });

This code simulates events from my rx-stream using an .interval and a flatMap receive those events "do some processing" and uses a Subject to push results down the stream.

The stream is an ongoing process which will have several several events.

This minimum code is silly because I'm pushing only on the apply callback, but in the real case there're several possible moments that a push can happen and the number of events being received during apply is not the same amount that will be sent via the Subject.

What I expected to see with this code is:

value: 2  // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc

what I actually got is:

value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
 ... etc

I've also tried to have an Observable<Integer> o = s.share(); and returning it, or return directly s.share(); with the same results.

I kind of understand why this is happening. The ObservableSource gets subscribed again n again n again so there're more events on every loop.

The question:

How can I achieve my expected behavior?

(in case my expected behavior was not clear, please ask more on the comments)


Answer:

Your PublishSubject is subscribed to multiple times, once per item from interval().

Edit: You will need to pass in a new PublishSubject each time (switch to BehaviorSubject if you'd like to retain the first/last emission); pass that to the long-running process, and ensure that its onComplete is called properly when the long-running process finishes.

Question:

I would expect this small example to print all numbers which are divisible by 3.

@Test
public void test() {
    Observable.range(1, 100)
            .groupBy(n -> n % 3)
            .toMap(g -> g.getKey())
            .flatMap(m ->  m.get(0))
            .subscribe(System.out::println);
}

The println is not printing anything instead, and I don't get why.

I reduced this example from a more complex one, I understand this can be done in a different way, but I need it this way as there are more groups involved which needs to be manipulated in the flatMap at the same time.

Thanks for your help!


Answer:

Use the method filter(Predicate<? super T> predicate) instead of groupBy(..) to emit the elements that satisfy a specified predicate.

Observable.range(1, 100)
    .filter(n -> n % 3 == 0)
    .toMap(g -> g.getKey())
    .flatMap(m ->  m.get(0))
    .subscribe(System.out::println);

Java Stream-API works on the same principle:

IntStream.range(1, 100).filter(n -> n%3 == 0).forEach(System.out::println);
// prints 3, 6, 9, 12... on the each line

Question:

I have this code:

private Single<Invoice> getInvoiceWithItems() {
    return getInvoice().flatMap(invoice -> getItems(invoice)); // <--- Here, I need invoice and items
}

private Single<Invoice> getInvoice() { ... }

private Single<List<Item>> getItems(Invoice invoice) { ... }

I want to do something like invoice.setItems(items). I tried passing an extra function parameter to flatMap but it doesn't accept it.

How can I do it?

I found this solution, but I'm not sure if it is the best one:

private Single<Invoice> getInvoiceWithItems() {
    return Single.zip(getInvoice(), getInvoice().flatMap(invoice -> getInvoiceItems(invoice)), (invoice, items) -> {
        invoice.setItems(items);
        return invoice;
    });
}

Answer:

private Single<Invoice> getInvoiceWithItems() {
    return getInvoice().flatMap(invoice -> getItems(invoice).map(items -> {
        invoice.setItems(items);
        return invoice;
    }));
}

Question:

I'm facing a confusion, giving by example 4 Single:

val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())

data class MyObj ( field1: String, field2: Int, field3: Int, field4: String... .... field10: Int )

and I have a service10.execute(s1 : String s2 : Int s3 : Int s4 : String)

If I do:

s1.flatMap { str -> 
    s2.flatMap { int1 ->
        s3.flatMap { int2 ->
            s4.flatMap { str2 ->
                ...
                s10.flatmap { int10
                  service10.execute(myObj(str, int1, int2, str2..., int10))
                }
            }
        }
    }
}

Is the same as doing:

Single.zip(
            listOf(
                s1,
                s2,
                s3,
                s4
              ...,
              ...,
              s10
            )
        ) { array ->
            val str = array[0] as String
            val int1 = array[1] as Int
            val int2 = array[2] as Int
            val str2 = array[3] as String
            ...
            val str10 = array[9] as Int
        }

1) Is the flatMap executing there in parallel o sequentially there? 2) If the nested flatMap are sequential, is there a way to make them parallel like the zip?


Answer:

No, the nested flatMaps do not make the Singles run in parallel, as proved by the following test:

    // so we can be sure service1 and service2 are active
    val bothSubscribed = CountDownLatch(2)
    // so we can simulate a blocking, long running operation on both services
    val subscribeThreadsStillRunning = CountDownLatch(1)

    val service5 = { str: String, str2: String ->
        Observable.just("service5: $str, $str2").singleOrError()
    }

    val scheduler = Schedulers.io()

    val createSingle = { value: String ->
        Observable
            .create<String> { emitter ->
                println("subscribe $value on ${Thread.currentThread().name}")
                bothSubscribed.countDown()
                subscribeThreadsStillRunning.await(10, SECONDS)
                emitter.onNext(value)
            }
            .singleOrError()
            .subscribeOn(scheduler)
    }

    val s1 = createSingle("outer")
    val s4 = createSingle("inner")

    s1.flatMap { outer ->
        s4.flatMap { inner ->
            service5(outer, inner)
        }
    }.subscribe()

    assert(bothSubscribed.await(5, SECONDS))
    subscribeThreadsStillRunning.countDown()

The reason can be understood by remembering that code within lambda's is not run until the lambda is executed (seems obvious saying like that, but it took me a bit of thinking to get it). s4.flatMap is what triggers the subscribe to s4, but this code doesn't execute until outer is available, i.e. until s1 has already emitted and is therefore complete.

Zip seems like the perfect solution for this, and I'm not sure why you want to use flat map. I can't think of a way to do it. It also has a type safe API so you don't have to use the array based API in your example.

Singles
        .zip(s1, s4) { outer, inner -> service5(outer, inner) }
        .flatMap { it }
        .subscribe()

Note that I have used Singles from "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1" as the lambdas work better with Kotlin.

Question:

I have an API that returns a Single. This Single contains a list of values, let's say String values. When I am calling this object, I get that Single and have to filter some values from it and return back another Single. I'm trying to achieve something like in this simplified test:

@Test
public void filterTest() {

    List<String> sourceList = Arrays.asList("email", "phone", "smoke", "email", "phone", "fax", "email");

    Single.just(sourceList)
            .toObservable()
            .flatMap(source -> {
                return Observable.from(source);
            })
            .filter(source -> !source.equals("email"))
            .groupBy(/* criteria? */)
            //how to extract single list from groupBy or 
            //is there another opposite function for flatMap?
            .toSingle()
            .subscribe(s -> System.out.println(s));
}

Answer:

Try this:

Single.just(sourceList)
        .flattenAsObservable(source -> source)
        .filter(source -> !source.equals("email"))
        .toList()
        .subscribe(s -> System.out.println(s));

or

 Observable.fromIterable(sourceList)
        .filter(source -> !source.equals("email"))
        .toList()
        .subscribe(s -> System.out.println(s));

Question:

As explained in the docs RxJava 2.x no longer accepts null values. So it is not surprising that both of the following two lines terminate with onError called:

Observable.fromCallable(() -> null);
Observable.just(1).flatMap(i -> Observable.error(new RuntimeException()));

what is unclear is why

Observable.just(1).flatMap(i -> Observable.fromCallable(() -> null))

terminates with success and no items emitted. It seams reasonable to expect for it behave in the same fashion as Observable.error

I can see in source code of rx-java 2.1.2

 public final <R> Observable<R> flatMap(...) {
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        ...
 }

which explains why it is happening in terms of code, but I still have two questions:

1) Is this is an intended behavior or a bug?

2) If intended, is there a reason for this?


Answer:

This is a bug with the Observable.fromCallable and will be fixed with PR 5517.

If, for some reason you can't avoid a null return in this setup, you can apply hide() to workaround this bug:

Observable.just(1).flatMap(i -> Observable.fromCallable(() -> null).hide())

or help RxJava throw:

Observable.just(1)
    .flatMap(i -> Observable.fromCallable(() -> 
         java.util.Objects.requireNonNull(apiReturningNull()))
    )

Question:

Please look at this code:

 Disposable disposable = mcityService.authLogin(request,Utils.prepareHeaders())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(resp ->
                {
                    mCompositeDisposable.add(mcityService.getUserDetails(selectedCity.id,Utils.prepareHeaders(resp.tokenType,resp.accessToken))
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(userDetails ->
                            {
                                /*process resp and user details*/

                            }));


                }, throwable ->
                {
                    process errors
                });

        mCompositeDisposable.add(disposable);
    }

So basically I need to call authLogin, if it succedes, call getUserDetails (some fields from authLogin call results are required), if getUserDetails succeded, chain is finished and I need some additional processing result from both calls. If authLogin fails or getUserDetails fails, error processing should be performed (for example, get http error code or message from throwable).

As my approach works, I know it's not goot approach, how to optimize it? Can I use flatMap operator instead nested observables?

edit: Methods declarations:

public static Map<String, String> prepareHeaders(String tokenType, String accessToken);
Observable<UserDetails> getUserDetails(@Path(value = "cityId", encoded = true) String cityId, @HeaderMap Map<String, String> headers);

Final attempt:

mcityService.authLogin(request, Utils.prepareHeaders())
                .concatMap(response ->
                {
                    final Map<String, String> headers = Utils.prepareHeaders(response.tokenType,response.accessToken);
                    return mcityService.getUserDetails(selectedCity.id, headers)
                            .map(userDetails -> new Object()
                            {
                                public AuthResponse ar = response;
                                public UserDetails ud = userDetails;
                            });
                })
                .doOnNext(responseDetails ->
                {
                   AuthResponse ar = responseDetails.ar;  
                   UserDetails ud = responseDetails.ud;   


                })
                .doOnError(throwable ->
                {

                    final String message = throwable.getMessage();

                });

Results: .doOnNext never called, mcityService.getUserDetails seems to be never called, .doOnError also never called (so there was no error). First mcityService.authLogin call returns Observable<AuthResponse> don't I really need subscribe?


Answer:

Yes, you can, and should use flatMap / concatMap / switchMap. Also, sorry if it is not coded well, I primarily use RxJS, which has pipable operators (much better!).

mcityService.authLogin(request, Utils.prepareHeaders())
            .concatMap(response -> { 
               final Map<String, String> headers = Utils.prepareHeaders(resp.tokenType,resp.accessToken);
               return mcityService.getUserDetails(selectedCity.id, headers)
                                  .map(userDetails -> ResponseUserDetails.of(response, userDetails));
            })
            .doOnNext(responseDetails -> {
               // Hanlde ResponseUserDetails object
            })
            .doOnError(throwable -> {
               // Handle exception
               final String message = throwable.getMessage();
               ...
            })
            .subscribe(
               responseDetails -> { ... },
               throwable -> { ... }
            );

If you don't want to use an additional class, you can create an Object on the fly

return mcityService.getUserDetails(selectedCity.id, headers)
                   .map(userDetails -> new Object() { 
                           public Response r = response;
                           public UserDetails ud = userDetails;
                   });

And access its fields via

.doOnNext(responseDetails -> {
     final Response r = responseDetails.r;
     final UserDetails ud = responseDetails.ud;
     ...
})

static class ResponseUserDetails {
   final Response response;
   final UserDetails userDetails;

   ResponseUserDetails(
            final Response response,
            final UserDetails userDetails) {
      this.response = response;
      this.userDetails = userDetails;
   }

   static ResponseUserDetails of(
            final Response response,
            final UserDetails userDetails) {
      return new ResponseUserDetails(response, userDetails);
   }
}

Question:

I'm creating an app that takes a list of apps installed on the device and checks for version updates from the google play store.

This is my method to get the app information based on package name:

    public Observable<DetailsResponse> getUpdates(@NonNull List<ApplicationInfo> apps) {
        return Observable.fromIterable(apps)
            .flatMap(appInfo -> googlePlayApiService.getDetails(appInfo.packageName));
    }

It works fine if the package is actually on the google play store, but it returns retrofit2.adapter.rxjava2.HttpException: HTTP 404 if the package name is not found (ie: sideloaded app)

This is my method to handle the observables:

 updatesViewController.getUpdates(apps)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(AutoDispose.autoDisposable(ViewScopeProvider.from(this)))
            .subscribe(responseItem -> responseList.add(responseItem),
                    throwable -> responseList.add(null), //404's here, doesn't do the onComplete at all.
                    () -> { // onComplete
                        for (int i = 0; i < apps.size(); ++i) {
                          if (responseList.get(i) != null && apps.get(i).isLowerVersion(responseList.get(i)) {
                              doSomething();
                          }
                     });

If all the apps are on the playstore, this works as intended. I want to make it so that if one or more of the apps are not found in the playstore, it can still doSomething() on the apps that are found, while ignoring the apps that aren't. Any help is appreciated!


Answer:

You add null (responseList.add(null)) to the list of responses when you hit one of those 404s (apps not registered on playstore).

Then as logically you are checking and doing something if the version of the app is lower so you can doSomething(). But the check also checks nulls (if (responseList.get(i) != null[...]), therefore you have nulls in the list and those will not doSomething.

Is doSomething dependant on some data in the item? If not, you could do something like:

if(responseList.get(i) == null || (responseList.get(i) != null && apps.get(i).isLowerVersion(responseList.get(i)))

This will call doSomething for all the apps that is lowerVersion OR the missing ones (e.g: ones resulted in 404s) Remember the assumption above, that the doSomething() doesn't need actual data from this retrieval - otherwise subsequent actions in the doSomething() will fail.