Hot questions for using RxJava 2 with Java streams

Question:

I have a chain of calls to the internet and the database, and as a result I show the collected info to the user. Right now I have a very ugly 3-level nested RxJava stream. I really want to make it smooth and easy to read, but I'm really stuck.

I already read everything about map, flatMap, zip, etc. I can't make them work together.

Code: make an API call. The received info is put in the database by subscribing to another stream in the onSuccess method of the first stream, and in the onSuccess method of the second stream the info loaded from the DB is finally shown to the user.

Dat Frankenstein:

disposables.add(modelManager.apiCall()
        .subscribeOn(Schedulers.io())
        .observeOn(mainThread)
        .subscribeWith(new DisposableSingleObserver<ApiResponse>() {
            @Override
            public void onSuccess(ApiResponse apiResponse) {
                modelManager.storeInDatabase(apiResponse);
                // level 1 nested stream:
                disposables.add(modelManager.loadFromDatabase()
                        .subscribeOn(Schedulers.io())
                        .observeOn(mainThread)
                        .subscribeWith(new DisposableSingleObserver<Data>() {
                            @Override
                            public void onSuccess(Data data) {
                                view.showData(data);
                            }

                            @Override
                            public void onError(Throwable e) {
                            }
                        }));
            }

            @Override
            public void onError(Throwable e) {
            }
        }));

Answer:

"I already read everything about Map, flatMap, zip, etc. Can't make things work together."

Well, you missed something about flatMap, because this is what it's for ;)


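disposables.add(
    modelManager.apiCall()
        .subscribeOn(Schedulers.io())
        // side effect: persist the response (assumes storeInDatabase is a plain synchronous call)
        .doOnSuccess(apiResponse -> modelManager.storeInDatabase(apiResponse))
        // continue with the Single returned by loadFromDatabase()
        .flatMap(apiResponse -> modelManager.loadFromDatabase())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            data -> view.showData(data),
            error -> { /* handle the error here */ }
        )
);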

But if you use a reactive database layer like Room's LiveData<List<T>> support, then you can actually ditch the modelManager.loadFromDatabase() part.

Question:

The simplest example would be a stream of strings like this:

["3", "a", "b", "c", "1", "a", "2", "a", "b"]

The elements that are numbers describe how many more elements their group should contain.

It is very important that the stream is continuous, so we can't just wait for the next number to split the stream.

As far as I know, there is no built-in functionality for this in RxJava 2.

var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());

flowable/*Something here*/.blockingSubscribe(System.out::println);

And the expected output would be:

[3, a, b, c]
[1, a]
[2, a, b]

Answer:

I've later found akarnokd's RxJava2Extensions package. Using that, I was able to construct this, which does what I want:

var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable.compose(FlowableTransformers.bufferUntil(new Predicate<>() {
    private int remaining = 0;
    @Override
    public boolean test(String next) {
        if(next.chars().allMatch(Character::isDigit)) {
            remaining = Integer.parseInt(next);
        }
        return --remaining < 0;
    }
})).blockingSubscribe(System.out::println);
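The counting logic in that predicate can be checked without RxJava. Below is a minimal plain-Java sketch: the predicate is copied from the answer, while the group helper and the class name LengthPrefixedGroups are made up for illustration. The helper assumes that the item on which the predicate returns true stays in the buffer that is being closed, which is the behaviour the answer relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class LengthPrefixedGroups {

    // Same counting logic as the predicate above: returns true on the
    // element that closes the current group.
    static Predicate<String> closesGroup() {
        return new Predicate<String>() {
            private int remaining = 0;

            @Override
            public boolean test(String next) {
                if (next.chars().allMatch(Character::isDigit)) {
                    remaining = Integer.parseInt(next);
                }
                return --remaining < 0;
            }
        };
    }

    // Hypothetical helper: collects items into a buffer until the predicate
    // fires, keeps the firing item in that buffer, then starts a new one.
    static List<List<String>> group(List<String> items) {
        Predicate<String> closes = closesGroup();
        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String item : items) {
            current.add(item);
            if (closes.test(item)) {
                groups.add(current);
                current = new ArrayList<>();
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // prints [3, a, b, c] then [1, a] then [2, a, b], one group per line
        group(List.of("3", "a", "b", "c", "1", "a", "2", "a", "b"))
                .forEach(System.out::println);
    }
}
```

Because the predicate keeps state in the remaining field, one instance should serve only one subscription. An empty string would also count as "all digits" and make Integer.parseInt throw, so such input would need an extra check.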

Question:

Assume I have an observable and somewhere downstream I choose to filter using Java 8 streams instead of RxJava operators. Will this cause any issues?

List<String> stringList = new ArrayList<>();
Observable.just(stringList)
        .map(new Function<List<String>, List<String>>() {
            @Override
            public List<String> apply(List<String> strings) throws Exception {
                return strings.stream()
                        .filter(it -> it.contains("randomText"))
                        .collect(Collectors.toList());
            }
        })
        .subscribe(new Consumer<List<String>>() {
            @Override
            public void accept(List<String> strings) throws Exception {
                // do something with list
            }
        });

Answer:

Consumer<T> accepts T, so you can use all the methods provided by T.

Anyway, check the documentation: RxJava provides several operators, such as map, flatMap and filter. For example, you can replace the first map in your code as follows (note that fromIterable emits the strings one at a time instead of the whole list):

Observable.fromIterable(stringList).filter(x -> x.contains(RANDOM_TEXT))

Question:

I am trying to upgrade RxJava from 1 to 2. In my old code I have a method like the one below:

private Observable<Integer> reversRange(int from, int to) {
    Stream<Integer> intStream = Stream.iterate(to, p -> p - 1);
    return Observable.from(() -> intStream.iterator())
            .takeWhile(n -> n > from)
            .map(n -> n );
}

But now in RxJava 2 I cannot use from. What would be the equivalent of this code? I have found in this question that it is fromIterable, but I do not know how to use it with a Stream.

Here is another example, because this should work not only for a range but for any infinite stream.

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return Observable.from(() -> intStream.iterator());
}

Answer:

Use the generate() function:

This is Kotlin code (an extension function), but for Java you only need to change the lambda syntax slightly. It works with any stream.

fun <T> Stream<T>.toFlowable(): Flowable<T> {
  return Flowable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

You can also use Observable if you prefer, but I don't see why you should.

fun <T> Stream<T>.toObservable(): Observable<T> {
  return Observable.generate(
    Callable { iterator() },
    BiConsumer { ite, emitter ->
      if (ite.hasNext()) {
        emitter.onNext(ite.next())
      } else {
        emitter.onComplete()
      }
    }
  )
}

I think in Java it would be something like:

public <T> Observable<T> streamToObservable(Stream<T> stream) {
  return Observable.generate(
    () -> stream.iterator(),
    (ite, emitter) -> {
      if (ite.hasNext()) {
        emitter.onNext(ite.next());
      } else {
        emitter.onComplete();
      }
    }
  );
}

and so your code would become:

private Observable<Integer> numbers() {
    Stream<Integer> intStream = Stream.iterate(0, p -> p + 1);
    return streamToObservable(intStream);
}
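One caveat: a Java Stream can be consumed only once, so an Observable built from a single Stream instance can be subscribed to only once. A second call to iterator() on the same stream throws IllegalStateException. A possible way around this, sketched below in plain Java, is to wrap a stream factory in an Iterable so that every iteration opens a fresh stream. The names StreamIterables, fromStream and take are hypothetical helpers, not part of RxJava or the JDK. Such an Iterable could then presumably be handed to Observable.fromIterable, which is what the question was asking about.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamIterables {

    // Hypothetical helper: wraps a stream factory as an Iterable.
    // Each call to iterator() opens a fresh stream, so it can be iterated repeatedly.
    static <T> Iterable<T> fromStream(Supplier<Stream<T>> streams) {
        return () -> streams.get().iterator();
    }

    // Pulls at most `limit` items one at a time, similar to how the
    // generate callback above asks the iterator for one item per call.
    static <T> List<T> take(Iterable<T> source, int limit) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = source.iterator();
        while (out.size() < limit && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<Integer> numbers = fromStream(() -> Stream.iterate(0, p -> p + 1));
        System.out.println(take(numbers, 3)); // [0, 1, 2]
        System.out.println(take(numbers, 3)); // [0, 1, 2] again: a new stream was created
    }
}
```

The stream is infinite, but its iterator is lazy, so only the requested items are ever computed.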