Hot questions for Using RxJava 2 in backpressure


Question:

Recently I realized that I don't understand how RxJava 2 backpressure works.

I made a small test that I expected to fail with a MissingBackpressureException:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

System.out shows the following:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999

Processed:0
Processed:1
Processed:2
...
Processed:9999

Why doesn't it produce a MissingBackpressureException?

I expected e.onNext(i); to put each item into the buffer of ObservableObserveOn, and once its size exceeded static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

it should have thrown a MissingBackpressureException, which doesn't happen. Does the buffer grow automatically? If not, where are the items stored?


Answer:

That's because in RxJava 2 backpressure support moved out to Flowable only, see here. If you switch to Flowable with BackpressureStrategy.MISSING you will get the exception. That also means that in your case you indeed have a buffer that grows automatically; from the observeOn docs:

Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
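
For comparison, here is a minimal sketch (assuming the same emit loop as the test above) of the Flowable equivalent with BackpressureStrategy.MISSING; once the fast loop outruns observeOn's 128-element buffer, observeOn signals the expected MissingBackpressureException:

Flowable.<Integer>create(e -> {
    for (int i = 0; i < 10000; i++) {
        e.onNext(i);
    }
    e.onComplete();
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> Thread.sleep(100)) // slow consumer, as in the original test
.blockingSubscribe();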

Question:

I have a producer which emits items periodically and a consumer which is sometimes quite slow. It is important that the consumer only works with recent items. I thought onBackpressureLatest() would be the perfect solution for this problem, so I wrote the following test code:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}

I expected it to log something like:

Produce: 0
...
Produce: 9
Consume: 0
Consume: 9

Instead, I get

Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9

Neither onBackpressureLatest() nor onBackpressureDrop() has any effect. Only onBackpressureBuffer(i) causes an exception.

I use RxJava 2.1.9. Any ideas what the problem or my misunderstanding could be?


Answer:

observeOn has an internal buffer (128 elements by default) that picks up all source items immediately, so onBackpressureLatest is always fully consumed and never gets a chance to drop anything.

Edit:

The smallest buffer you can create is 1, which should provide the required pattern:

source.onBackpressureLatest()
      .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
      .subscribe(v -> { /* ... */ });

(the earlier delay + rebatchRequests combination is practically equivalent to this).
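
Putting that fix back into the test from the question (a sketch, with the PublishProcessor setup unchanged), the consumer now sees the first item and then whichever item is the latest once it becomes ready again, roughly the Consume: 0 / Consume: 9 pattern that was expected:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1) // buffer of 1
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}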

Question:

I am using RxJava 2 Flowables by subscribing to a stream of events from a PublishSubject. It's used in an enterprise-level application and we don't have the option of dropping any events. I am using RxJava 2.2.8.

I am using BackpressureStrategy.BUFFER as I don't want to lose any of my events.

Also, I buffer again for 50,000 items or 3 minutes, whichever comes first. I do this because I want to consolidate events and then process them.

But I get the following error within a few minutes of running:

io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)

I tried increasing the buffer size with the system property below, but there is no change in the behavior.

System.setProperty("rx2.buffer-size", "524288");

Also, if I buffer for longer than 3 minutes, I get the exception after a much longer time, probably because my downstream performs better when the events are consolidated more. However, I don't have that choice, because these are live events that need processing immediately (within 3-5 minutes).

I also tried a Thread.sleep() before invoking "subscription.next" in the error case, but I still get the same results.

keySubject.hide()
    .toFlowable(BackpressureStrategy.BUFFER)
    .parallel()
    .runOn(Schedulers.computation())
    .map(e -> e.getContents())
    .flatMap(s -> Flowable.fromIterable(s))
    .sequential()
    .buffer(3, TimeUnit.MINUTES, 50000)
    .subscribe(new Subscriber<List<String>>() {

        @Override
        public void onSubscribe(Subscription var1) {
            innerSubscription = var1;
            innerSubscription.request(1L);
        }

        @Override
        public void onNext(List<String> logs) {
            innerSubscription.request(1L);

            // Do some logic here
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    });

I want to know: how do I handle the backpressure to avoid this exception? Is this exception caused by the .buffer() operator? Is there a way for me to check the status of these buffers? Also, why do I still get the exception after the same amount of time even when I increase rx2.buffer-size? Ideally, if the exception is caused by the buffer filling up, the system should run longer with a larger buffer.

Any help on the reason for the message "Could not emit buffer due to lack of requests" would be great.


Answer:

The thing is, why do you use a subject that isn't backpressure-aware? Are you using it as a poor man's event bus? Also, assuming e.getContents() is a simple getter, I believe you can replace this whole block

.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });

with

.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();
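
For clarity, the full chain with that replacement would read roughly as follows (a sketch: toFlowable(BackpressureStrategy.BUFFER) is kept, since rebatchRequests is a Flowable-only operator, and rebatchRequests(1) takes over the one-at-a-time request bookkeeping the original Subscriber did by hand):

keySubject.hide()
    .toFlowable(BackpressureStrategy.BUFFER)
    .flatMapIterable(e -> e.getContents())
    .buffer(3, TimeUnit.MINUTES, 50000)
    .rebatchRequests(1)                      // request one buffered list at a time
    .observeOn(Schedulers.computation())
    .doOnNext(logs -> { /* Do some logic here */ })
    .subscribe();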

Question:

I am observing the lines produced by a NetworkResource, wrapping it in an Observable.create. Here is the code, with try/catch and cancellation omitted for simplicity:

fun linesOf(resource: NetworkResource): Observable<String> =
        Observable.create { emitter ->
            while (!emitter.isDisposed) {
                val line = resource.readLine()
                Log.i(TAG, "Emitting: $line")
                emitter.onNext(line)
            }
        }

The problem is that later I want to turn it into a Flowable using observable.toFlowable(LATEST) to add backpressure in case my consumer can't keep up, but depending on how I do it, the consumer stops receiving items after item 128.

A) this way everything works:

val resource = ...
linesOf(resource)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribe { Log.i(TAG, "Consuming: $it") }

B) here the consumer gets stuck after 128 items (but the emitting continues):

val resource = ...
linesOf(resource)
    .toFlowable(BackpressureStrategy.LATEST)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { Log.i(TAG, "Consuming: $it") } // <-- stops after 128

In option A) everything works without any issues, and I can see the Emitting: ... log side by side with the Consuming: ... log.

In option B) I can see the Emitting: ... log message happily emitting new lines, but I stop seeing the Consuming: ... log message after item 128, even though the emitting continues.

Question: Can someone help me understand why this happens?


Answer:

First of all, you are using the wrong type and wrong operator. Using Flowable removes the need for conversion. Using Flowable.generate gets you backpressure:

Flowable.generate(emitter -> {
    String line = resource.readLine();
    if (line == null) {
        emitter.onComplete();
    } else {
        emitter.onNext(line);
    }
});

Second, the reason your version hangs is a same-pool deadlock caused by subscribeOn: requests from downstream are scheduled behind your eager emission loop and cannot take effect, so delivery downstream stops at the default 128 elements. Use Flowable.subscribeOn(scheduler, false) to avoid this case.
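
In Java terms, option B from the question would then look roughly like this (a sketch; linesOf is the source defined in the question, and the second argument to subscribeOn disables scheduling of requests on the busy io() worker):

linesOf(resource)
        .toFlowable(BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.io(), false) // requestOn = false: requests reach the source directly
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(line -> Log.i(TAG, "Consuming: " + line));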

Question:

What I wanted was a Flowable with a backpressure buffer of one item that keeps the latest item produced by the stream.

I tried Flowable.onBackpressureBuffer(1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST). However, it doesn't work as I expected:

  Flowable.range(0, 10_000)
      .onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
      .observeOn(Schedulers.computation())
      .subscribe {
        println(it)
        Thread.sleep(5)
      }

The output I expected was a sequence of integers, not necessarily contiguous, that should include the last item 9,999. However, it only printed the first few contiguous numbers, like 0, 1, 2, 3, 4..., different each time, and never the last number 9,999.


Answer:

I am using the code below and it always prints 9999 at the end. It first prints consecutive numbers (up to 127) and then 9999. Maybe in your case the main thread ends much earlier than the threads printing the numbers. In order to print all the numbers up to 9999, I tried changing the backpressure buffer to 10000 (and the main thread sleep to a much higher value), and this obviously made sure that all numbers were printed, as the buffer is quite large.

public class FlowableTest {

    public static void main(String[] args) throws InterruptedException {
        Flowable.range(0, 10_000)
                .onBackpressureBuffer(1, () -> { }, BackpressureOverflowStrategy.DROP_OLDEST)
                .observeOn(Schedulers.computation())
                .subscribe(it -> {
                    System.out.println(it);
                    Thread.sleep(5);
                });

        // give the computation threads enough time to finish before main exits
        Thread.sleep(50000);
    }
}

Question:

I have a publisher that may publish faster than the subscriber can handle data. To handle this, I started working with backpressure. Because I do not want to discard any data, I use reactive pull backpressure. I understood this as the Subscriber being able to tell the Publisher when to publish more data, as described in this and the following paragraphs.

The publisher is a Flowable that does its work asynchronously in parallel and is merged into a sequential Flowable afterwards. Data should be buffered up to 10 elements, and when this buffer is full, the Flowable should not publish any more data and should wait for the next request.

The subscriber is a DisposableSubscriber that requests 10 items at start. Every consumed item requires some computation, and after that a new item will be requested.

My MWE looks like this:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

What I expect this code to do is the following: The subscriber requests the first 10 items. The publisher publishes the first 10 items. The subscriber then does its computation in onNext and requests more items, which the publisher will publish.

What actually happens: at first, the publisher seems to publish items without any bound. At some point, e.g. after 14 published items, the subscriber handles its first item. While that happens, the publisher continues to publish items. After around 30 published items, an io.reactivex.exceptions.MissingBackpressureException: Buffer is full is thrown and the stream ends.

My question: what am I doing wrong? How can I let the subscriber control if and when the publisher publishes data? Obviously, I am doing something horribly wrong; otherwise, the expectation would not be so different from reality.

Example output of the above MWE:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

Answer:

Not an expert in Rx, but let me take a stab at it... observeOn(...) has its own default buffer size of 128. So, right from the start it's going to request more from upstream than your buffer can hold.

observeOn(...) accepts an optional buffer size override, but even if you supply it, the ParallelFlowable is going to be invoking your flatMap(...) method more frequently than you want. I'm not 100% sure why; maybe it has its own internal buffering that it performs when merging the rails back to sequential.

I think you can get closer to your desired behavior by using flatMap(...) instead of parallel(...), supplying a maxConcurrency argument.

One other thing to keep in mind is that you don't want to call subscribeOn(...) - it's meant to affect the upstream Flowable in its entirety. So if you're already calling parallel(...).runOn(...), it has no effect or the effect will be unexpected.

Armed with the above, I think this gets you closer to what you're looking for:

    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
    Flowable.fromIterable(src)
            .flatMap(
                    i -> Flowable.just( i )
                            .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                            .map( __ -> {
                                System.out.println("publisher: " + i);
                                Thread.sleep(200);
                                return i;
                            } ),
                    10) // max concurrency
            .observeOn(Schedulers.newThread(), false, 10) // override buffer size
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

Question:

I need to create a custom Flowable with backpressure implemented. I'm trying to achieve some sort of paging. That means when downstream requests 5 items I will "ask the data source" for items 0 - 5. Then when downstream needs another 5, I'll get items 5 - 10 and emit back.

The best thing I've found so far is the Flowable.generate method, but I really don't understand why there is no way (as far as I know) to get the number of items the downstream is requesting. I can use the state property of the generator to save the index of the last requested item, so then I only need the number of newly requested items. The emitter instance I get in the BiFunction's apply is a GeneratorSubscription, which extends AtomicLong, so casting the emitter to AtomicLong gets me the requested amount. But I know this can't be the "recommended" way.

On the other hand, when you use Flowable.create you get a FlowableEmitter, which has a long requested() method. generate suits my use case better, but now I'm also curious what the "correct" way to use Flowable.generate is.

Maybe I'm overthinking the whole thing so please point me in the right direction. :) Thank you.

This is what the actual code looks like (in Kotlin):

Flowable.generate(Callable { 0 }, BiFunction { start /*state*/, emitter ->
        val requested = (emitter as AtomicLong).get().toInt()  // this is bull*hit
        val end = start + requested
        // get items [start to end] -> items
        emitter.onNext(items)
        end /*return the new state*/
    })

Answer:

OK, I found out that the apply function of the BiFunction is called as many times as the request amount (n), so there's no reason to have a getter for it. It's not what I had hoped for, but that is apparently how generate works. :)
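
So a paged source built with generate ends up looking roughly like this (a sketch in Java; Item and dataSource.fetchItem are hypothetical placeholders for your data-source call). Because the generator is invoked once per requested item, the state only needs to carry the next index, and each invocation emits at most one item:

Flowable<Item> paged = Flowable.generate(
        () -> 0, // initial state: index of the next item to load
        (Integer index, Emitter<Item> emitter) -> {
            Item item = dataSource.fetchItem(index); // hypothetical one-item fetch
            if (item == null) {
                emitter.onComplete();
            } else {
                emitter.onNext(item);
            }
            return index + 1; // new state for the next invocation
        });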

Question:

I use sqlbrite to listen for changes to tables a and b, and the combineLatest operator to combine the observables produced by sqlbrite. The BiFunction processes the items emitted by observableA and observableB.

private CompositeDisposable mSubscriptions = new CompositeDisposable();
private void initialize(){
    QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null);
    QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null);
    ResourceSubscriber subscriber = Flowable.combineLatest(
            RxJavaInterop.toV2Observable(observableA
                    .mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
            , 
            RxJavaInterop.toV2Observable(observableB
                    .mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
            , new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() {
                @Override
                public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception {
                    List<ResultItem> resultItems = convertToResultItems(aItems, bItems);    // long process here, convert aItems and bItems to resultItems
                    return resultItems;
                }
            }
    )
            .onBackpressureLatest()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new ResourceSubscriber<List<ResultItem>>() {
                @Override
                public void onNext(List<ResultItem> resultItems) {
                    adapter.addData(resultItems);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });
    mSubscriptions.add(subscriber);
}

Question: If the BiFunction runs for too long (e.g. 10 seconds), longer than the interval at which the observables trigger (e.g. every 1 second), it ends up doing unnecessary work: I only need the latest emitted item, but the BiFunction handles the emitted items one by one, so it also processes old items I no longer need. I want the BiFunction to skip the old items and handle only the latest one after each completed apply(), to reduce wasted resources and save time. Does RxJava have something like backpressure for the BiFunction, or another way to solve this problem?

The figure shows the current and expected BiFunction timelines: figure link

I found two methods to solve this problem, but both have flaws.

Method 1: combine "aItems" and "bItems" into a Pair, then pass the reference to switchMap and process the job there.

Flaw: switchMap only emits the latest item to the subscriber but still does the unnecessary work.

Method 2: likewise combine "aItems" and "bItems", then pass the reference to onNext and process the job there.

Flaw: it blocks the UI thread.


Answer:

You can just pass the pair of values along in combineLatest's combiner function and use observeOn to move the computation off the original source threads:

 .combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b))
 .onBackpressureLatest()
 .observeOn(Schedulers.computation(), false, 1)
 .map(pair -> compute(pair))
 .observeOn(AndroidSchedulers.mainThread())
 ...
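
Applied to the code in the question, the chain would read roughly like this (a sketch: flowableA and flowableB stand for the two RxJavaInterop-converted, mapToList-ed Flowables, Pair for any simple two-field holder such as Apache Commons Lang's Pair, and convertToResultItems is the long-running combiner from the original code):

Flowable.combineLatest(flowableA, flowableB, (a, b) -> Pair.of(a, b))
        .onBackpressureLatest()
        .observeOn(Schedulers.computation(), false, 1) // buffer of 1: only the freshest pair is processed
        .map(pair -> convertToResultItems(pair.getLeft(), pair.getRight()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(resultItems -> adapter.addData(resultItems));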