Hot questions for using RxJava 2 in Java 8

Question:

I am gathering information about a person from several service calls. Some of them can be executed in parallel; others depend on data returned by the parallel calls. Once all the info is computed, I save the person info to a repository.

Here is how I have set up my flow:

Parallel calls:

private Observable<Person> getDataForEval(Person person) {
    // subscribeOn(Schedulers.io()) lets each source run on its own IO thread
    Observable<Salary> observeSalary = getSalary(person).subscribeOn(Schedulers.io());
    Observable<HomeAddress> observeHome = getHomeAddress(person).subscribeOn(Schedulers.io());
    return Observable.zip(observeSalary, observeHome, (salary, home) -> buildPersonAfterGettingData(salary, home));
}

Dependent calculation:

private Observable<Person> getDataAfterCalc(Person person) {
  Observable<LoanEligiblity> loanEligible= getLoanEligibility(person);
  Observable<Tax> observeTax= getTax(person);
  return Observable.zip(loanEligible, observeTax, (loan, tax) ->
                buildFinalPersonInfo(loan, tax));
}

Chaining both, in the main thread:

Observable<Person> finalPersonInfo = getDataForEval(person).flatMap(evaluated -> getDataAfterCalc(evaluated));
finalPersonInfo.subscribe(finalPerson -> save(finalPerson));

Question - When is this flow triggered? My understanding is that when the main thread calls the subscribe method, the Observable.zip() - used to make the parallel calls - is triggered and the subsequent subscribers get the responses.

Is that correct? If I need to know the time taken to process one person, can I calculate it as follows:

finalPersonInfo.onSubscribe(()->start).onTerminate(()->finish);

Answer:

Your understanding is almost right.

The zip operator subscribes to its sources when the downstream subscribes to it. See ObservableZip#72 and ObservableZip#110.

However, zip itself has nothing to do with parallelism; the calls run in parallel because of the subscribeOn operator.

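Your approach to measuring the time taken to process one person is right in principle, although the corresponding Observable operators are named doOnSubscribe and doOnTerminate.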

By the way, you should use Single rather than Observable in this scenario.

The Single class implements the Reactive Pattern for a single value response. Single behaves the same as Observable except that it can only emit either a single successful value, or an error (there is no "onComplete" notification as there is for Observable)

The source references above are from RxJava 2.1.13.

Question:

I've got a data access object that passes each item in a data source to a consumer:

public interface Dao<T> {
    void forEachItem(Consumer<T> item);
}

This always produces items in a single threaded way - I can't currently change this interface.

I wanted to create a Flowable from this interface:

private static Flowable<String> flowable(final Dao<String> dao) {
    return Flowable.create(emitter -> {
        dao.forEachItem(item ->
                emitter.onNext(item));
        emitter.onComplete();
    }, ERROR);
}

If I use this Flowable in a situation where the processing takes longer than the rate at which items are emitted then I understandably get a missing back pressure exception as I am using ERROR mode:

    Dao<String> exampleDao =
            itemConsumer ->
                    IntStream.range(0, 1_000).forEach(i ->
                            itemConsumer.accept(String.valueOf(i)));

    flowable(exampleDao)
            .map(v -> {
                Thread.sleep(100);
                return "id:" + v;
            })
            .blockingSubscribe(System.out::println);

I don't wish to buffer items: on very large data sets that could exhaust memory if the operation is significantly slower than the producer.

I was hoping there would be a backpressure mode that would allow the emitter to block when passed next/completion events when it detects back pressure but that does not seem to be the case?

In my case as I know that the dao produces items in a single threaded way I thought I would be able to do something like:

  dao.forEachItem(item -> {
    while (emitter.requested() == 0) {
      waitABit();
    }
    emitter.onNext(item);
  });

but this seems to hang forever.

How wrong is my approach? :-) Is there a way of producing items in a way that respects downstream back pressure given my (relatively restrictive) set of circumstances?

I know I could do this with a separate process writing to a queue and then write a Flowable based on consuming from that queue. Would that be the preferred approach instead?


Answer:

Check the documentation of Flowable, especially the part about Subscription.request(long). I hope that points you in the right direction.


The TestProducer from this example produces Integer objects in a given range and pushes them to its Subscriber. It extends the Flowable<Integer> class. For a new subscriber, it creates a Subscription object whose request(long) method is used to create and publish the Integer values.

It is important for the Subscription that is passed to the subscriber that the request() method, which calls onNext() on the subscriber, can be called recursively from within this onNext() call. To prevent a stack overflow, the shown implementation uses the outStandingRequests counter and the isProducing flag.

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // if a request is already being fulfilled, return to prevent recursion between request() and subscriber.onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}
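
The re-entrancy guard described above can also be expressed with a single atomic counter: only the caller that moves the counter from zero to a positive value runs the emission loop, and a nested request() from inside onNext() merely bumps the counter. The following is a minimal JDK-only sketch under stated assumptions: ReentrantSafeRange, its callbacks and runDemo are hypothetical names (not RxJava API), and overflow of the counter for very large request amounts is ignored.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

/** Hypothetical range source illustrating a re-entrancy-safe request(); not part of RxJava. */
final class ReentrantSafeRange {
    private final int to;
    private final IntConsumer onNext;
    private final Runnable onComplete;
    private final AtomicLong requested = new AtomicLong();
    private int next;
    private volatile boolean done;

    ReentrantSafeRange(int from, int to, IntConsumer onNext, Runnable onComplete) {
        this.next = from;
        this.to = to;
        this.onNext = onNext;
        this.onComplete = onComplete;
    }

    void request(long n) {
        if (n <= 0 || done) {
            return;
        }
        // Only the caller that moves the counter away from 0 becomes the emitter.
        // A nested call from inside onNext sees a non-zero counter and returns at once.
        if (requested.getAndAdd(n) != 0) {
            return;
        }
        do {
            if (done) {
                return;
            }
            if (next > to) {
                // Like the producer above, completion is signalled when a request finds the range exhausted.
                done = true;
                onComplete.run();
                return;
            }
            onNext.accept(next++); // may call request() again; that only increases the counter
        } while (requested.decrementAndGet() != 0);
    }

    /** Consumes the whole range one item at a time, re-requesting from inside onNext. */
    static long runDemo(int count) {
        long[] received = {0};
        boolean[] completed = {false};
        ReentrantSafeRange[] self = new ReentrantSafeRange[1];
        self[0] = new ReentrantSafeRange(1, count,
                i -> { received[0]++; self[0].request(1); },
                () -> completed[0] = true);
        self[0].request(1);
        return completed[0] ? received[0] : -1;
    }

    public static void main(String[] args) {
        System.out.println("received " + runDemo(200_000) + " items without deep recursion");
    }
}
```

Because the nested call returns immediately, the call stack stays flat no matter how many items are requested from within onNext().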

The consumer in this example extends DefaultSubscriber<Integer>; on start, and after consuming each Integer, it requests the next one. Consuming the Integer values involves a short delay on every fifth item, so backpressure builds up toward the producer.

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

In the following main method of a test class, the producer and consumer are created and wired up:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

When running the example, the log file shows that the consumer runs continuously, while the producer only becomes active when the internal Flowable buffer of RxJava 2 needs to be refilled.
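
The queue-based idea raised in the question can be sketched without changing the Dao interface: run forEachItem on its own thread and let it block on a small bounded queue, exposing the consumer side as a pull-style Iterable. This is only a sketch under stated assumptions: BlockingBridge, asIterable and collect are hypothetical names, null items are not supported, and a consumer that abandons the iteration early leaves the daemon producer thread blocked. Such an Iterable should be usable with Flowable.fromIterable, which pulls items only as the downstream requests them.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical bridge from a push-style Dao to a pull-style Iterable. */
final class BlockingBridge {
    /** Same shape as the Dao interface in the question. */
    interface Dao<T> {
        void forEachItem(Consumer<T> item);
    }

    private static final Object END = new Object(); // marks the end of the data

    static <T> Iterable<T> asIterable(Dao<T> dao, int capacity) {
        return () -> {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
            Thread producer = new Thread(() -> {
                try {
                    dao.forEachItem(item -> put(queue, item)); // blocks while the queue is full
                } finally {
                    put(queue, END);
                }
            }, "dao-producer");
            producer.setDaemon(true);
            producer.start();
            return new Iterator<T>() {
                private Object buffered;

                @Override
                public boolean hasNext() {
                    if (buffered == null) {
                        buffered = take(queue); // blocks until the producer delivers
                    }
                    return buffered != END;
                }

                @Override
                @SuppressWarnings("unchecked")
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    T value = (T) buffered;
                    buffered = null;
                    return value;
                }
            };
        };
    }

    private static void put(BlockingQueue<Object> queue, Object value) {
        try {
            queue.put(value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
    }

    private static Object take(BlockingQueue<Object> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the producer", e);
        }
    }

    /** Demo helper: drains a Dao that emits "0".."n-1" through the bridge. */
    static List<String> collect(int n, int capacity) {
        Dao<String> dao = consumer -> {
            for (int i = 0; i < n; i++) {
                consumer.accept(String.valueOf(i));
            }
        };
        List<String> out = new ArrayList<>();
        for (String s : asIterable(dao, capacity)) {
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(5, 1));
    }
}
```

With a capacity of 1 the producer can run at most a couple of items ahead of the consumer, so memory use stays bounded regardless of the size of the data set.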

Question:

I'm trying to learn RxJava 2 but since yesterday afternoon I'm having a bad time trying to translate from v1 to v2 of the RxJava library... I came across this function that I could use for something I'm doing to help me understand the whole Reactive-Programming paradigm.

List<Integer> emitList = ...;
Observable<Integer> observable = Observable.from(emitList);

observable
        .subscribeOn(Schedulers.newThread())
        .parallel((a) -> {
            return a
            .filter((i) -> {
                return i % 2 == 0;
            })
            .doOnNext((xx) -> {
                System.out.println("parallel thread in: " + ThreadUtils.currentThreadName());
                System.out.println("parallel: " + xx);
                ThreadUtils.sleep(10);
                System.out.println("parallel thread out: " + ThreadUtils.currentThreadName());
            });
        },
        Schedulers.io()
        )
        .subscribe(
                (i) -> {
                    System.out.println("onNext thread entr: " + ThreadUtils.currentThreadName());
                    System.out.println(i);
                    System.out.println("onNext thread exit: " + ThreadUtils.currentThreadName());
                },
                (t) -> {
                    t.printStackTrace();
                },
                () -> {
                    System.out.println("onCompleted()");
                }
        );

And the farthest I've got is this:

Observable<Integer> observable = ....
observable.subscribeOn(Schedulers.newThread())
        .filter(i -> i % 2 == 0)
        .doOnNext(i -> {
          System.out.println("parallel thread in: " + threadName());
          System.out.println("parallel: " + i);
          Thread.sleep(10);
        })
        .subscribe(
                number -> System.out.println(threadName() + ": " + number),
                throwable -> System.err.println(threadName() + ": " + throwable.toString()),
                () -> System.out.println(threadName() + ": Completed!")

        );

I know there's a lot wrong with what I'm doing. For starters, the filtering and doOnNext are inside the parallel clause in the original, whereas in my "approach" they are outside, and who knows what else. I tried going through the tests in the RxJava repo, but I couldn't identify any that would be similar to this. I looked at Flowable and ParallelFlowable, but they are waaaaay different, to the point that I can't find how to achieve the parallelism in my version... which doesn't print a thing, by the way.


Answer:

Parallel processing in RxJava 2 is tied to Flowable and uses the same fluent API design as the Observable:

Flowable<Integer> f = ....
f.subscribeOn(Schedulers.newThread())
    .parallel()                          // <---------------------------------
    .runOn(Schedulers.computation())     // <---------------------------------
    .filter(i -> i % 2 == 0)
    .doOnNext(i -> {
      System.out.println("parallel thread in: " + threadName());
      System.out.println("parallel: " + i);
      Thread.sleep(10);
    })
    .sequential()                        // <---------------------------------
    .subscribe(
            number -> System.out.println(threadName() + ": " + number),
            throwable -> System.err.println(threadName() + ": " + throwable.toString()),
            () -> System.out.println(threadName() + ": Completed!")
    );

Thread.sleep(10000); // keep the main thread alive while the scheduler threads do the work

Question:

I have some inherently asynchronous code in an Android app. I'm using RxJava2. The class Thing is not under my control (but ThingFactory is). In one method (createThing()), a new Thing is instantiated. Thing's constructor does some work and, when it is complete, notifies us via a callback onThingInitialized(). At the point that callback is called, we should be guaranteed that thing exists. In the callback, I schedule work to happen on a separate thread (in this case, using RxJava2, but I don't think it should matter). There is nowhere in this code that I call anything like thing = null. So, once it's set, it's set forever.

I threw a volatile onto it because the instance does get updated, but never nulled. If I'm mis-using it, please feel free to berate me.

public class UsesAThing implements ThingCallbacks {

    private volatile Thing thing; // I feel like I don't understand 'volatile'

    // I call this method
    public void createThing() {
        thing = thingFactory.newThing(param1, param2);
    }

    // Thing's constructor does some work and notifies us when it's done
    @Override
    public void onThingInitialized() {
        // Called on main thread, but I want to do some IO work, so:
        Schedulers.io().scheduleDirect(() -> {
            thing.doStuff(); // NPE!
        });
    }
}

How is an NPE possible there?

EDIT:

Thing's constructor does its work asynchronously. As I said, this is in an Android environment, so the work it's actually doing is binding to a Service. When the Service is bound, its ServiceConnection::onServiceConnected() callback is hit, which itself actually fires up an AsyncTask which, in its onPostExecute() callback, calls the onThingInitialized() callback.

EDIT 2:

I should also note that this NPE doesn't happen all the time. I've run through this code hundreds of times, and I've only seen it occur once.

EDIT 3: Sample calling code

I didn't provide sample code because it's about as simple as one might imagine, but here's what it looks like:

Flowable.just(1)
    .subscribeOn(Schedulers.io())
    .subscribe(i -> createThing());

Answer:

If I understand your comment, createThing() is called in a worker thread. It's unlikely but possible that the thread scheduler will halt this worker thread after the Thing constructor initiates the sequence of events that leads to the callback, but before newThing() returns and thing is assigned. If the whole callback sequence runs before the thread calling createThing() runs again, you will see this NPE.

To test this theory, first create a test that runs repeatedly to reproduce the issue. Then change it so createThing() is called in the main thread and see if the problem goes away. That would be a workaround, not a fix. But a real fix would involve not doing work in Thing's constructor, which you stated is out of your control.
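
The race described above can be made deterministic in a small JDK-only sketch. Everything here is a hypothetical stand-in: this Thing starts a thread that fires the callback and then joins it, which simulates the unlucky schedule where the whole callback runs before the constructor returns. The SafeUser variant shows one possible mitigation that is not part of the answer above: hold the instance in a CompletableFuture and let the callback register a continuation instead of reading the field directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class CallbackRaceDemo {
    interface ThingCallbacks {
        void onThingInitialized();
    }

    /** Hypothetical stand-in for Thing: its constructor triggers the callback on another thread. */
    static final class Thing {
        Thing(ThingCallbacks callbacks) {
            Thread t = new Thread(callbacks::onThingInitialized, "callback-thread");
            t.start();
            try {
                t.join(); // simulate the constructing thread being paused until the callback has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        void doStuff() {
        }
    }

    /** Mirrors the question: the field is assigned only after the constructor returns. */
    static final class RacyUser implements ThingCallbacks {
        private volatile Thing thing;
        final AtomicBoolean sawNull = new AtomicBoolean();

        void createThing() {
            thing = new Thing(this);
        }

        @Override
        public void onThingInitialized() {
            sawNull.set(thing == null); // volatile does not help: the write has not happened yet
        }
    }

    /** Possible mitigation: the callback defers its work until the instance is published. */
    static final class SafeUser implements ThingCallbacks {
        private final CompletableFuture<Thing> ready = new CompletableFuture<>();
        final AtomicBoolean ranWithThing = new AtomicBoolean();

        void createThing() {
            ready.complete(new Thing(this));
        }

        @Override
        public void onThingInitialized() {
            // Runs immediately if the Thing is already published, otherwise when complete() is called.
            ready.thenAccept(t -> {
                t.doStuff();
                ranWithThing.set(true);
            });
        }
    }

    static boolean racyCallbackSawNull() {
        RacyUser user = new RacyUser();
        user.createThing();
        return user.sawNull.get();
    }

    static boolean safeCallbackRanWithThing() {
        SafeUser user = new SafeUser();
        user.createThing();
        return user.ranWithThing.get();
    }

    public static void main(String[] args) {
        System.out.println("racy callback saw null: " + racyCallbackSawNull());
        System.out.println("safe callback ran with a Thing: " + safeCallbackRanWithThing());
    }
}
```

Note that thenAccept runs the continuation on whichever thread registers or completes the future, so IO work would still need to be moved to an IO scheduler inside the continuation, as the question does with Schedulers.io().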

Question:

I would like to know how I can identify which element in the array is currently being processed. For example, I would like to print the following:

e
1
Element1 consumed
e
2
Element2 consumed
e
3
Element3 consumed

How can I modify the code below to achieve these results?

Code:

public static void main(String[] args) {
    Observable<String> stringObservable = Observable.just("e1", "e2", "e3");
    stringObservable
    .map(x->x.split(""))//return non-observable
    .flatMap(y->Observable.fromArray(y)//return observable
            .map(n->n))
    .observeOn(Schedulers.io())
    .blockingSubscribe(getObserver());
}

Answer:

Use doOnNext and doOnComplete on the inner sequence that splits your source item:

public static void main(String[] args) {
    Observable<String> stringObservable = Observable.just("e1", "e2", "e3");

    stringObservable
    .flatMap(x ->
        Observable.fromArray(x.split(""))
        .doOnNext(n -> 
            System.out.println(n)) // <-------------------- prints each split item
        .map(n->n)
        .doOnComplete(() -> 
            System.out.println(
               "Element " + x + " consumed")) // <--------- end of processing of x
    )
    .observeOn(Schedulers.io())
    .blockingSubscribe(getObserver());
}