Hot questions on using RxJava 2 and Project Reactor


Question:

I am attempting to write a Reactive Stream based on the following information:

We have a stream of Entity Events where each Event contains the ID of its Entity and a Type of either INTENT or COMMIT. It is assumed that a COMMIT with a given ID will always be preceded by one or more INTENTs with the same ID. When an INTENT is received, it should be grouped by its ID and a "buffer" for that group should be opened. The buffer should be "closed" when a COMMIT for the same group is received or a configured timeout has elapsed. The resulting buffers should be emitted.

Note that it is possible to receive multiple INTENTs before receiving a closing COMMIT. (Edit:) The bufferDuration should guarantee that any "opened" buffer is emitted once bufferDuration has elapsed since the INTENT that opened the buffer was received, with or without a COMMIT.

My latest attempt at this is the following:

public class EntityEventBufferFactory {
    private final Duration bufferDuration;

    public EntityEventBufferFactory(Duration bufferDuration) {
        this.bufferDuration = bufferDuration;
    }

    public Flux<List<EntityEvent>> createGroupBufferFlux(Flux<EntityEvent> eventFlux) {
        return eventFlux.groupBy(EntityEvent::getId)
            .map(groupedFlux -> createGroupBuffer(groupedFlux))
            .flatMap(Function.identity());
    }

    protected Flux<List<EntityEvent>> createGroupBuffer(Flux<EntityEvent> groupFlux) {
        return groupFlux.publish().buffer(groupFlux.filter(this::shouldOpenBufferOnEvent), createGroupBufferCloseSelector(groupFlux));
    }

    protected Function<EntityEvent, Publisher<EntityEvent>> createGroupBufferCloseSelector(Flux<EntityEvent> groupFlux) {
        return event -> Flux.firstEmitting(Flux.just(event).delay(bufferDuration), groupFlux.filter(this::shouldCloseBufferOnEvent).publish());
    }

    protected boolean shouldOpenBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.INTENT;
    }

    protected boolean shouldCloseBufferOnEvent(EntityEvent entityEvent) {
        return entityEvent.getEventType() == EventType.COMMIT;
    }
}

And here is the test I am attempting to get passing:

@Test
public void entityEventsCanBeBuffered() throws Exception {
    FluxProcessor<EntityEvent, EntityEvent> eventQueue = UnicastProcessor.create();

    Duration bufferDuration = Duration.ofMillis(250);

    Flux<List<EntityEvent>> bufferFlux = new EntityEventBufferFactory(bufferDuration).createGroupBufferFlux(eventQueue);

    List<List<EntityEvent>> buffers = new ArrayList<>();
    bufferFlux.subscribe(buffers::add);

    EntityEvent intent = new EntityEvent();
    intent.setId("SOME_ID");
    intent.setEventType(EventType.INTENT);

    EntityEvent commit = new EntityEvent();
    commit.setId(intent.getId());
    commit.setEventType(EventType.COMMIT);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    eventQueue.onNext(intent);
    eventQueue.onNext(commit);

    Thread.sleep(500);

    assertEquals(2, buffers.size());
    assertFalse(buffers.get(0).isEmpty());
    assertFalse(buffers.get(1).isEmpty());
}

With this test, I get two emitted buffers, but they are both empty. You'll note that after digging around, I had to add .publish() at certain points to avoid an exception from Reactor saying "This processor allows only a single Subscriber". The answer to this question, RxJava: "java.lang.IllegalStateException: Only one subscriber allowed!", is what led me to that approach.
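For reference, the general way to multicast one source to several consumers in Reactor looks something like this (a generic sketch, not the exact placement used in the factory above):

Flux<EntityEvent> shared = groupFlux.publish().autoConnect(2);              // connect once both consumers have subscribed
Flux<EntityEvent> opens  = shared.filter(this::shouldOpenBufferOnEvent);    // INTENT events
Flux<EntityEvent> closes = shared.filter(this::shouldCloseBufferOnEvent);   // COMMIT events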

I'm currently using Reactor, but I think this translates 1-to-1 with RxJava using Observable and methods of the same names.

Any thoughts?


Answer:

I think that is the definitive use case of Rx groupBy. From the documentation:

Groups the items emitted by a Publisher according to a specified criterion, and emits these grouped items as GroupedFlowables. The emitted GroupedPublisher allows only a single Subscriber during its lifetime and if this Subscriber cancels before the source terminates, the next emission by the source having the same key will trigger a new GroupedPublisher emission.

In your case, this criterion is the ID, and on each GroupedPublisher emitted you takeUntil the type is COMMIT:

source
    .groupBy(EntityEvent::getId)
    .flatMap(group ->
        group
            .takeUntil(Flowable.timer(10, TimeUnit.SECONDS)) // time-based close
            .takeUntil(this::shouldCloseBufferOnEvent)       // complete (and include) on COMMIT
            .toList()
            .toFlowable())                                   // flatMap needs a Publisher, not a Single

Edit: added time condition.
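Since the question itself is written against Reactor, a rough translation of the same idea using Reactor operator names might look like this (a sketch assuming Reactor 3's takeUntilOther/collectList; the exact "timeout per INTENT" behaviour may still need tuning):

eventFlux
    .groupBy(EntityEvent::getId)
    .flatMap(group -> group
        .takeUntilOther(Mono.delay(bufferDuration))  // close the group's buffer after the timeout
        .takeUntil(this::shouldCloseBufferOnEvent)   // ...or as soon as a COMMIT arrives
        .collectList());                             // emit the buffered events as a List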

Question:

This crashes with OOM:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .replay(
          fb ->
            fb.take(1)
              .concatMap(__ -> fb)
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

This is, I think, because replay is holding on to the emissions from upstream, even though I would have thought the buffer size hint of 1 would prevent that... What am I missing?

This doesn't crash:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .publish(
          fb ->
            fb.take(1)
              .concatMap(first -> fb.startWith(first))
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

But I am not sure if I am guaranteed that I will get ALL the emissions from upstream like that...


Answer:

I've investigated this and found the cause of the issue: a bug in replay in RxJava 2.

What happens is that replay holds references to two subscribers, one for take and one for concatMap's inner consumer, in a local variable. That leaves a GC root from the main thread to the now-defunct take subscriber, which still references the very first item. Since the bounded replay buffer is a linked list, that first item keeps referencing newer and newer items through its "next" links, and memory is eventually exhausted.

publish doesn't keep references to old values, so this is not an issue there.
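To make the retention mechanism concrete, here is a tiny, library-agnostic illustration (these are not RxJava's actual internal classes) of how one stale reference to the head of a linked buffer pins every later node:

class ReplayRetentionDemo {
    static class Node {
        final byte[] value;
        Node next;                                       // newer items are linked here
        Node(byte[] value) { this.value = value; }
    }

    public static void main(String[] args) {
        Node head = new Node(new byte[1024 * 1024]);     // what the defunct 'take' subscriber still references
        Node tail = head;
        for (int i = 0; i < 4999; i++) {
            tail.next = new Node(new byte[1024 * 1024]); // a bounded replay advances its own pointer,
            tail = tail.next;                            // but 'head' still reaches every node via 'next'
        }
        // As long as 'head' is strongly reachable, none of the ~5 GB of arrays can be collected,
        // which mirrors the OOM seen with the replay example above.
        System.out.println("retained chain length: 5000");
    }
}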

Question:

I want to branch logic depending on the number of emissions from upstream.

To be precise, I want:

  1. Nothing to happen when upstream is empty
  2. One branch to trigger when the upstream emits just one value and then completes
  3. One branch to trigger when the upstream emits more than one value and then completes.

I was scratching my head over how to approach this and came up with something that works but seems awfully verbose. I am wondering if there's a simpler way of doing this.

This solution is based on the valve operator from the RxJava2Extensions project.

The outline of the solution is as follows:

  1. Use publish(foo) to subscribe multiple times to the upstream
  2. Use merge for the two branches of logic
  3. For the 'more than one emission' logic, use an initially closed valve and open it on the second emission; break the valve if there were zero or one emissions. By 'breaking the valve' I mean terminating the controlling Publisher.
  4. For the 'just one emission' logic, use an initially closed valve. Use ambArray to either break the valve (on zero emissions or on a second emission) or open the valve when there was exactly one emission.

So this seems to work, though my concerns are:

  1. It looks over-engineered for what it's doing. Can this be coded up in a simpler, cleaner way?
  2. The whole valve-breaking business triggers an exception that I am just swallowing, but there could be other, non-valve-related exceptions that I should probably distinguish here and let propagate down the stream. [EDIT]The valve breaking is important so that the valve for the single-emission logic doesn't accumulate emissions that are meant for the multiple-emissions logic and doesn't hog memory that way.[/EDIT]

Here's the code:

Flowable.just(1,2,3,4,5) // +1 emissions
    //Flowable.just(1) // 1 emission
    //Flowable.empty() // 0 emissions
            .publish( //publish so that you get connectableFlowable inside
                f ->
                    Flowable.merge( //merge for the logic split
                        f.compose(
                            valve(f.scan(0, (sum, i) -> sum + 1) //scan to emit progressive count
                                   .filter(i -> i > 1) //filter for when count > 1
                                   .take(1) //take just first such count
                                   .concatMap(__ -> Flowable.<Boolean>never().startWith(true))  //and open the valve
                                   .switchIfEmpty(Flowable.empty()), //break the valve if there was just 1 element
                                  false) //start with the valve closed
                        )
                         .onErrorResumeNext(Flowable.empty()) //swallow the broken valve exception???
                         .map(__ -> "more than one elements!"), //here goes logic for +1 emissions
                        f.compose(
                            valve(
                                Flowable.ambArray(
                                    f.scan(0, (sum, i) -> sum + 1) //do progressive counts
                                     .switchIfEmpty(Flowable.never()) //if there was no elements then never end this guy
                                     .filter(i -> i > 1) //filter > 1
                                     .take(1) //take just first one
                                     .concatMap(
                                         __ -> Flowable.<Boolean>empty()) //if there was > 1 element then emit empty and break the valve so we
                                                                          //don't accumulate byte arrays that are meant for multipart upload
                                    ,
                                    f.count() //count the stream
                                     .map(c -> c == 1) //open valve if the count was 1
                                     .toFlowable()
                                     .concatWith(Flowable.never()) //and keep the stream opened forever
                                ),
                                false
                            )
                        )
                         .onErrorResumeNext(Flowable.empty())
                         .map(i -> "just one element") //here goes logic for just one emission
                    )
            )
            .doOnNext(i -> System.out.println("haya! " + i))
            .blockingSubscribe();

Answer:

As I suspected, I made it too complex. I came up with this much cleaner and simpler solution to the problem:

public static <U, D> FlowableTransformer<U, D> singleMultipleBranching(
        FlowableTransformer<U, D> singleBranchTransformer,
        FlowableTransformer<U, D> manyBranchTransformer)
{
    return fl -> fl.replay(           // replay so that the upstream is multicast inside the selector
        f -> f.buffer(2)              // collect at most the first two elements
              .take(1)
              .switchMap(buf -> {
                  switch (buf.size()) {
                      case 1:
                          return f.compose(singleBranchTransformer);
                      case 2:
                          return f.compose(manyBranchTransformer);
                      default:
                          return Flowable.empty();
                  }
              })
    );
}
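For completeness, a hypothetical usage sketch (the two transformers below are just placeholders) showing how the branching composes onto a source:

FlowableTransformer<Integer, String> single = f -> f.map(i -> "just one element: " + i);
FlowableTransformer<Integer, String> many   = f -> f.map(i -> "more than one element: " + i);

Flowable.just(1, 2, 3, 4, 5)                          // 5 emissions -> the 'many' branch sees all of them
        .compose(singleMultipleBranching(single, many))
        .doOnNext(s -> System.out.println("haya! " + s))
        .blockingSubscribe();

// Flowable.just(1) would go through the 'single' branch instead,
// and Flowable.<Integer>empty() would emit nothing at all.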