Hot questions for Using RxJava 2 in functional programming

Top Java Programmings / RxJava 2 / functional programming

Question:

Look at these 2 small tests:

@Test
public void test1() {
    Observable.range(1, 10)
        .groupBy(v -> v % 2 == 0)
        .flatMap(group -> {
            if (group.getKey()) {
                return group;
            }
            return group;
        })
        .subscribe(System.out::println);
}

@Test
public void test2() {
    Observable.range(1, 10)
        .groupBy(v -> v % 2 == 0)
        .toMap(g -> g.getKey())
        .flatMapObservable(m -> Observable.merge(
            m.get(true),
            m.get(false)))
        .subscribe(System.out::println);
}

I was expecting both to return a list of numbers in the same order, so:

1 2 3 4 5 6 7 8 9 10

but the second example returns

2 4 6 8 10 1 3 5 7 9

instead.

It seems that on the second example the merge is doing a concat instead, in fact if I change it to a concat, the result is the same.

What am I missing?

Thank you.


Answer:

Basically flatMap and merge do not guarantee the order of the emitted items.

From flatMap doc:

Note that FlatMap merges the emissions of these Observables, so that they may interleave.

From merge doc:

Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

Quote from this SO Answer:

In your case, with single-element, static streams, it is not making any real difference (but in theory, merge could output words in random order and still be valid according to spec)

If you need a guaranteed order use concat* instead.

First example

It works like this:

  • when 1 is emitted the groupBy operator will create a GroupedObservable with key false
    • flatMap will output the items from this observable - which is currently only 1
  • when 2 is emitted the groupBy operator will create a GroupedObservable with key true
    • flatMap will now also output the items from this 2nd GroupedObservable - which is currently 2
  • when 3 is emitted the groupBy operator will add it to the existing GroupedObservable with key false and flatMap will output this item right away
  • when 4 is emitted the groupBy operator will add it to the existing GroupedObservable with key true and flatMap will output this item right away
  • etc.

It may help you to add some more logging:

    Observable.range(1, 10)
            .groupBy(v -> v % 2 == 0)
            .doOnNext(group -> System.out.println("key: " + group.getKey()))
            .flatMap(group -> {
                if (group.getKey()) {
                    return group;
                }
                return group;
            })
            .subscribe(System.out::println);

Then the output is:

key: false
1
key: true
2
3
...
The second example

This is quite different, because toMap will block until the upstream completes:

  • when 1 is emitted the groupBy operator will create a GroupedObservable with key false
    • toMap will add this GroupedObservable to the internal map and uses the key false (the same key as the GroupedObservable has)
  • when 2 is emitted the groupBy operator will create a GroupedObservable with key true
    • toMap will add this GroupedObservable to the internal map and uses the key true (the same key as the GroupedObservable has) - so now the map has 2 GroupedObservables
  • the following numbers are added to the corresponding GroupedObservables and when the source completes, thetoMap operator is done and will pass the map to the next operator
  • in flatMapObservable you use the map to create a new observable where you first add the even elements (key = true) and then the odd elements (key = false)

Also here you could add some more logging:

    Observable.range(1, 10)
            .groupBy(v -> v % 2 == 0)
            .doOnNext(group -> System.out.println("key: " + group.getKey()))
            .toMap(g -> g.getKey())
            .doOnSuccess(map -> System.out.println("map: " + map.size()))
            .flatMapObservable(m -> Observable.merge(
                    m.get(true),
                    m.get(false)
            ))
            .subscribe(System.out::println);

Then the output is:

key: false
key: true
map: 2
2
4
6
8
10
1
3
5
7
9

Question:

Let's say I have an Observable emitting Integers. I would like to add 1 to all odd numbers and subtract 1 to all even numbers.

What would be a clean way of achieving this with RxJava?

I am struggling to understand how to achieve this with groupBy as I can't see a clean way to get odd and even Observable back after the groupBy, to then map a different function to each of them (and maybe merge them after).

Thanks for your help!


Answer:

How about simply mapping them conditionally?

Observable.range(1, 10)
    .map(v -> {
       if (v % 2 == 0) {
           return v - 1;
       }
       return v + 1;
    })
    .subscribe(System.out::println)

Using groupBy is more involved:

Observable.range(1, 10)
    .groupBy(v -> v % 2 == 0)
    .flatMap(group -> {
        if (group.getKey()) {
           return group.map(v -> v + 1);
        }
        return group.map(v -> v - 1);
    })
    .subscribe(System.out::println);