Hot questions for Using RxJava 2 in reactive

Question:

I'm developing an Android app using Clean Architecture and I'm migrating it to RxJava 2.x. I have to make some network requests to a soap service, so I defined the api interface in the domain module:

public interface SiginterApi {
    Observable<User> login(String user, String password);
    ...
    Observable<List<Campaign>> getCampaigns(List<Long> campaignIds);
}

I've read that a network request should be made with "Flowable", because of the backpressure management since it's a 'cold observable'. On the other hand, I know the result of the request will be success (with the response) or error, so I don't know if I should use Flowable or Single or even Observable.

Furthermore, I have a database accesses like this:

public interface UserRepository extends Repository {
    Observable<Void> saveUser(String username, String hashedPassword, boolean logged, User user);
    ...
    Observable<User> findUser(String username, String hashedPassword);
}

I don't know if I should use Completable/Flowable/Observable in saveUser method and Single/Flowable/Observable in findUser method.


Answer:

Backpressure is what you get when a source Observable is emitting items faster than a Subscriber can consume them. It's most often a concern with hot observables, not cold ones like your network requests.

I think you should use Completable instead of Observable<Void> in your saveUser method, and use Single for all places where you follow a request/response or input/output pattern. Observable should be used when you actually want a continuous stream of events.

Question:

I have Observable stream, and I want to convert it to Completable, how I could do that?


Answer:

The fluent way is to use Observable.ignoreElements().

Observable.just(1, 2, 3)
.ignoreElements()

Convert it back via toObservable if needed.

Question:

I would like to build a Repository class that returns a Single<Something>.

The class should first look in a Cache which returns Maybe<Something> and if the Maybe completes go off to my Service that returns Single<Something>

interface Cache {
    fun getSomething(): Maybe<Something>   
}

interface Service {
    fun getSomething(): Single<Something>   
}

class Repository (
    private val cache: Cache,
    private val service: Service
) {

    fun getSomething(): Single<Something> {
      return cache.getSomething()
               .????(feed.getSomething()) //onCompleteTransformToSingle() or similar
    }
}    

I have searched through the JavaDoc but it does not seem that a transformer for this scenario exists.

Is there a nice way of handling this?


Answer:

Here are two fairly simple options you could use. The first I find more explicit. The second has the benefit that you will call the network on any error in the cache.

These assume a Maybe is returned from the cache. For example if no value is found in the cache, return Maybe.empty()

1) If there is an empty in the stream, switchIfEmpty() will use the alternate observable which calls the network. If no empty is in the stream, the network will never be called.

override fun getSomething(): Single<Something> {
      return cache.getSomething()
        .switchIfEmpty(
          Maybe.defer {
            feed.getSomething().toMaybe()
          }
        )
        .toSingle()
  }

2) An empty maybe will return an error when cast toSingle(), triggering the network call.

override fun getSomething(): Single<Something> {
      return cache.getSomething()
        .toSingle()
        .onErrorResumeNext {
           feed.getSomething()
        }
  }

Question:

In RxJava is there anything equivalent to the Subject class that works for a Single? Presumably I'd call its onSuccess( item ) or onError( Throwable ) methods and the output would be forwarded to the SingleSubscriber.

I guess I could use an Observable Subject and transform it to a Single but that seems a bit clunky.

Currently using RxJava 1 but interested in the situation with RxJava 2 also.


Answer:

In RxJava 2 there is SingleSubject that you can use as follows:

SingleSubject<Integer> subject1 = SingleSubject.create();

TestObserver<Integer> to1 = subject1.test();

// SingleSubjects are empty by default
to1.assertEmpty();

subject1.onSuccess(1);

// onSuccess is a terminal event with SingleSubjects
// TestObserver converts onSuccess into onNext + onComplete
to1.assertResult(1);

TestObserver<Integer> to2 = subject1.test();

// late Observers receive the terminal signal (onSuccess) too
to2.assertResult(1);

Unfortunately there is no equivalent available in RxJava 1. However, as you have mentioned, you can achieve the desired result by calling subject.toSingle().

Question:

I wanted to use RxJava but can't come up with alternative for method

public final Observable<T> first(Func1<? super T,java.lang.Boolean> predicate)

in RxJava2.

What I want to do is following:

return io.reactivex.Observable
    .concat(source1, source2, source3, source4)
    .first(obj -> obj != null);

Parameters source1 to source4 are io.reactivex.Observable instances that I concatenate and I want resulting Observable to emit only the very first item that isn't null but this of course fails because io.reactivex.Observable doesn't have method first(Func1 predicate) like rx.Observable.

What are my options if I have any in RxJava2 or is it better to stick with RxJava1?


Answer:

Consider using Maybe instead of Observable of nullable type (it won't work with RxJava2). Then use Maybe.concat to get only emitted items as Flowable. And just get the first element with first to return Single (you have to specify a default item) or firstElement to return Maybe:

Maybe.concat(source1, source2, source3, source4)
    .firstElement()

Question:

I have a Observable I want to repeat periodically, but only under a condition:

apiInterface.getData() // returns Observable<Data>
... // processing is happening here
.toList()
.repeatWhen(completed -> {
    if (autoReload){
        // Repeat every 3 seconds
        return completed.delay(3, TimeUnit.SECONDS);
    } else {
        return ??? // What do I have to return that it does not repeat?
    }
})
.subscribe(list -> callbackInterface.success(list));

My question is: What do I have to return in the else statement to not repeat the Observable (just execute the chain once)?


Answer:

You have to react to the completion indicator by something that signals completion in response to an item, for example:

completed.takeWhile(v -> false);

Unfortunately, empty() doesn't work there because it immediately completes the sequence before the source could even run.

Question:

Let's say I have a event-emitting data source that I want to transform into reactive stream. Data source is bound by a resource (for example a socket that periodically sends updated state) so I would want to share single Subscription to that resource. Using single observable with replay (for new subscribers to immediately get current value value) and refCount operators seems to be well suited for that. For example this his how MyDataProvider singleton would look like:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}

However, now let's say I have another data source that needs result of the first data source to compute its own value:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}

Here my setup starts to fall apart. Multicasting of the first observable only works until refCount operator. After that, everything is unicast again. That would mean that if two separate subscriptions to anotherDataProvider are made, flatMap operator would be called twice. I see two workarounds for this, but I dislike both:

1. Transform first observable before multicast happens

Simplest workaround seems to be for me to save unicast variant of myDataObservable somewhere, before multicast operation is made and then perform that multicast operation in anotherDataObservable However if those two observables are located in diferent modules, this workaround would make the code very inelegant, requiring MyDataProvider to expose two different observables that seemingly return same data.

2. Just use duplicate multicast operators

Second workaround seems to be to just apply those replay and refCount operators again in anotherDataObservable. But this creates inefficiency since first multicast operator in myDataObservable is already applied, but now does nothing, except waste memory and CPU cycles.

Both workarounds also involve coupling of the AnotherDataProvider to the MyDataProvider. If in the future MyDataProvider changes and multicasting is no longer desired, I would also have to update AnotherDataProvider to remove multicasting operators from there.

What would be the more elegant way to resolve this problem? Could I have architectured that any better to avoid the issue altogether?


Answer:

About your first approach, in the current setup, your anotherDataObservable uses myDataObservable and as I understand they are logically coupled because they use the same source. So you would need to have some base shared logic for them. I would extract it to a common module, that will expose unicast version of the observable and then make myDataObservable and anotherDataObservable use it in different modules each adding multicast logic.

Another option would be to have a class that will monitor your resource by subscribing to it like in myDataObservable, doing the processing in onNext and publishing the mapped result with a Subject, i.e. BehavioralSubject if you want to always have access to last published value, and the raw result with another subject. The clients will subscribe to that subjects and will get the mapped or raw values that were calculated only once in the monitoring class.

P.S. remember to add backpressure strategy to your Subject before you subscribe to it.

If those options do not suit you, think about if it is really important to avoid calling flatMap multiple times? Your code is quite straightforward and it is an important metric. If flatMap is not heavy you can just have it run multiple times.

Question:

I am struggling to find any RxJava2 examples of zipping two Flowables into one.

I am trying to modify this test to include something along the lines of

    Integer[] ints = new Integer[count];
    Integer[] moreints = new Integer[count];
    Arrays.fill(ints, 777);
    Arrays.fill(moreints, 777);

    Flowable<Integer> source = Flowable.fromArray(ints);
    Flowable<Integer> anothersource = Flowable.fromArray(moreints);

    Flowable<Integer> zippedsources = Flowable.zip(source, anothersource,
            new BiFunction<Flowable<Integer>, Flowable<Integer>, Flowable<Integer>>() {

                @Override
                public void apply(Flowable<Integer> arg0, Flowable<Integer> arg1) throws Exception {
                    return arg0.blockingFirst() + arg1.blockingLast();
                }

    }).runOn(Schedulers.computation()).map(this).sequential();

Edit: I am trying to take an Integer from source and anothersource and add them up but it seems fundamentally different from the RxJava1 way of doing that ... I have tried a bunch of variations returning Integer, Publisher, Flowable and void but keep getting and error in Eclipse on the zip operator itself.

I am unable to figure out what goes where in .zip(Iterable<? extends Publisher<? extends T>>, Function<? super Object[], ? extends R>).


Answer:

Since you only need to zip two flowables, you could use Flowable.zipWith Operator.

The way it is used is as following:

source.zipWith(anotherSource, new BiFunction<Integer, Integer, Integer>() {
    @Override public Integer apply(Integer a, Integer b) {
        return a + b;
    } 
};

Question:

I'm having issues getting always the last value from combination obtained with a combineLatest operator.

I have 2 hot flowables (a, b) generating events at high frequency (an event every 100ms each):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);

combined with a combineLatest, which takes almost 300ms to do it's job.

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,        OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
                System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
                System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
                Thread.sleep(300);
            }

After one execution of the combiner, I would like to process the very last combination of event generated, meaning (lastA, lastB).

The default behaviour of the combined flow is to cache all combination of events in its own buffer, so that the combined flow is receveing combinations that are very old and this time gap is exploding.

How should I change my code to disable this buffer and receive always the very last combinaton?


Answer:

You can apply onBackpressureLatest on both flowA and flowB and use the overload of combineLatest which let's you specify the prefetch amount.

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
    1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)

Unfortunately, there is no overload that takes both a BiFunction and a bufferSize thus you have to revert to casting array elements.

Edit

Applying a second onBackpressureLatest and limiting the buffer size on observeOn should get you closer to the desired pattern, although combineLatest is not aimed at this use case. You probably wanted some multi-sample operator.

Question:

Given two Maybe values, how can I combine them into a single Maybe that will either:

  1. call onSuccess() whenever either of the source Maybes calls onSuccess
  2. call onComplete() whenever both of the source Maybes call onComplete()?

(Cf. Option.orElse() in Scala or Vavr.)

E.g., assuming the existence of a combine() method that does what I want:

combine(Maybe.just(a), Maybe.empty())  ≍ Maybe.just(a)
combine(Maybe.empty(), Maybe.just(b))  ≍ Maybe.just(b)
combine(Maybe.empty(), Maybe.empty())  ≍ Maybe.empty()
combine(Maybe.never(), /*anything*/ )  ≍ /*the thing*/ 
combine(/*anything*/,  Maybe.never())  ≍ /*the thing*/ 

At first I thought amb() & family were what I was looking for, but that completes as soon as either source Maybe completes, meaning if the first Maybe completes without a value, you never get the value from the second Maybe.

Right now I'm using

Maybe.mergeArray(m1, m2).firstElement()

which seems to do what I want, but I’m not certain it’s correct and I’m not certain it’s the cleanest way to do it. (For instance, if there’s some delay, will it call onSuccess() immediately when one or the other source does, or will it wait for both onComplete()s?)

Is this correct? Is there a more idiomatic approach?


ETA: I'm happy taking the first value; I don't need to wait for both to complete:

combine(Maybe.just(a), Maybe.just(b))  ≍ Maybe.just(/* a or b, don't care */)

(I can imagine situations in which I might prefer one or the other and want to indicate that by order of the arguments, but in that situation I suspect sequential would be better than parallel.)


Answer:

There's a slightly different approach which might be a little nearer to your definition. This would be using Observable.switchMapMaybe():

Maps the upstream items into MaybeSources and switches (subscribes) to the newer ones while disposing the older ones (and ignoring their signals) and emits the latest success value of the current one if available while failing immediately if this Observable or any of the active inner MaybeSources fail.

Observable.just(m1, m2).switchMapMaybe(m -> m).firstElement()

But the approach using Maybe.mergeArray(m1, m2).firstElement() should be sufficient as well. The firstElement() operator emits the first element emitted by the mergeArray() flowable. This one is unordered and thus there's no information about the completion of any of the maybes.

Question:

I'm new to RxJava and I chose to use it because I thought it would be well suited to my use case.

I have some Integer values I want to observe over an infinite period of time. Whenever one of these values changes (i.e. an event) I want all of its observers to be called on another thread.

Because of the long observation time requirement I thought I needed to use the BehaviorSubject class (although initially I thought Observable was all that I needed .. seeing as I just need to 'observe'), and I could use the subscribeOn() method to set a scheduler and hence achieve calling the subscribers on a background thread:

private BehaviorSubject<Integer> rotationPositionSubject = BehaviorSubject.createDefault(getRotorPosition());
rotationPositionSubject.subscribeOn(scheduler);

And I have a rotate() method I used to update the rotationPositionSubject which will be called from the main thread:

@Override
public synchronized int rotate()
{
    final int newRotorPosition = super.rotate();
    rotationPositionSubject.onNext(newRotorPosition);

    return newRotorPosition;
}

However with the above code I found that the subscribers are called on the 'main' thread. Examining the docs for subscribeOn():

Returns:

the source ObservableSource modified so that its subscriptions happen on the specified Scheduler

So my above code won't work as I am not using the returned ObservableSource, but the return object is an Observable which is of no use for my application?

The question is then, how do I observe long-term any object and call subscribers on a background thread with RxJava, or is RxJava the wrong choice?


Answer:

After some experimentation it looks as though one needs some care when using BehaviorSubject objects and their use isn't quite as obvious as I inferred from the names of the various interfaces.

As a test method to demonstrate what I am currently doing:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    rotorBehaviour.subscribeOn(Schedulers.single());
    rotorBehaviour.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });


    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);

}

This results in the undesired result:

Executing test on thread ID: 1 onSubscribe() called on thread ID: 1 onNext() called on thread ID: 1 onNext() called on thread ID: 1

Process finished with exit code 0

(Undesired because onNext() is called on the main thread)

Modifying the code to use the Observable returned from the call to subscribeOn gives the same undesired result:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    Observable<Integer> rotorObservable = rotorBehaviour.subscribeOn(Schedulers.single());

    rotorObservable.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });
    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);
}

Result:

Executing test on thread ID: 1 onSubscribe() called on thread ID: 1 onNext() called on thread ID: 1 onNext() called on thread ID: 1

Process finished with exit code 0

However using the observeOn() method does give the desired result:

@Test
public void test()
{
    System.out.println("Executing test on thread ID: " + Thread.currentThread().getId());
    final BehaviorSubject<Integer> rotorBehaviour = BehaviorSubject.create();
    Observable<Integer>rotorObservable = rotorBehaviour.observeOn(Schedulers.single());

    rotorObservable.subscribe(new Observer<Integer>()
    {
        @Override
        public void onSubscribe(final Disposable d)
        {
            System.out.println("onSubscribe() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onNext(final Integer integer)
        {
            System.out.println("onNext() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onError(final Throwable e)
        {
            System.out.println("onError() called on thread ID: " + Thread.currentThread().getId());
        }

        @Override
        public void onComplete()
        {
            System.out.println("onComplete() called on thread ID: " + Thread.currentThread().getId());
        }
    });
    rotorBehaviour.onNext(1);
    rotorBehaviour.onNext(2);
}

Executing test on thread ID: 1 onSubscribe() called on thread ID: 1 onNext() called on thread ID: 13 onNext() called on thread ID: 13

Process finished with exit code 0

Also in all of the examples I am still using the BehaviorSubject object to initiate an event and I only found out by accident that this will give the desired result.

What concerns me is that I may be using Observable and BehaviorSubject in an incorrect way that just 'happens' to give me the correct result i.e. subscribers are called on the background thread. Unless I've missed it somewhere in the documentation it doesn't appear obvious how to use these objects to get the desired result.

Question:

I am using RxJava and I have an Observable with multiple items inside. What I would like to do is run function A on the first item, function B on all of them and function C when the Observable is completed:

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C

is there a clever way of expressing this with lambda functions? I have the following solution already, but it looks ugly and I suspect that there is a better way to do this:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(fax1);
            }
        },
        throwable -> {},
        () -> runC());

Answer:

Use Observable.defer to encapsulate per subscription state (being a boolean that indicates if we are on the first record).

Here's a runnable class that demos use:

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println);
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}

Even though OP was asking about RxJava1, here's the same solution above but for RxJava2:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println);
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}

Question:

I have two Observables, let's call them PeanutButter and Jelly. I'd like to combine them to a Sandwich Observable. I can do that using:

Observable<PeanutButter> peanutButterObservable = ...;
Observable<Jelly> jellyObservable = ...;
Observable<Sandwich> sandwichObservable = Observable.combineLatest(
    peanutButterObservable,
    jellyObservable,
    (pb, j) -> makeSandwich(pb, j))

The problem is that RX waits for the first PeanutButter and the first Jelly to be emitted before emitting the first combined Sandwich but Jelly may never be emitted which means I never get the first Sandwich.

I'd like to combine the two feeds such that a combined item is emitted as soon as the first item from either feed is emitted, regardless of whether the other feed has yet to emit anything, how do I do that in RxJava?


Answer:

one possible approach would be to use the startWith operator to trigger an emission of a known value from each stream upon subscription. this way combineLatest() will trigger if either stream emits a value. you'd just have to be mindful of looking out for the initial/signal values in the onNext consumer.

something like this...:

@Test
public void sandwiches() {
    final Observable<String> peanutButters = Observable.just("chunky", "smooth")
        .startWith("--initial--");

    final Observable<String> jellies = Observable.just("strawberry", "blackberry", "raspberry")
        .startWith("--initial--");

    Observable.combineLatest(peanutButters, jellies, (peanutButter, jelly) -> {
        return new Pair<>(peanutButter, jelly);
    })
    .subscribe(
        next -> {
            final String peanutButter = next.getFirst();
            final String jelly = next.getSecond();

            if(peanutButter.equals("--initial--") && jelly.equals("--initial--")) {
                // initial emissions
            } else if(peanutButter.equals("--initial--")) {
                // jelly emission
            } else if(jelly.equals("--initial--")) {
                // peanut butter emission
            } else {
                // peanut butter + jelly emissions
            }
        },
        error -> {
            System.err.println("## onError(" + error.getMessage() + ")");
        },
        () -> {
            System.out.println("## onComplete()");
        }
    );
}

Question:

I have tried various operators for iterating objects using map, concatMap,all but i am not able to remove element from my list.

Here is a piece of code:

  Observable.fromIterable(selectedJobs)
            .observeOn(AndroidSchedulers.mainThread()) // Added this from one answer in SO. but still no resolution.
            .all(homeJob -> {

                if (!homeJob.isCanCloseJob()) {
                    selectedJobs.remove(homeJob); // <- this is what causing Exception
                    //toast message
                } else {
                    //do something
                }

                return true;
            })
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(Disposable disposable) {

                }

                @Override
                public void onSuccess(Boolean aBoolean) {

                    baseRealm.executeTransaction(realm -> realm.copyToRealmOrUpdate(selectedJobs));

                }

                @Override
                public void onError(Throwable throwable) {
                    AppLogger.e(tag, throwable.getMessage());
                 // throws Caused by: java.util.ConcurrentModificationException
                }
            });

All I want is to check for condition then remove object from list.


Answer:

In functional programming you are working with streams. Instead of removing an item from the initial input you have to filter the stream itself and pass the filtered list to the consumer.

Seems like this is what you are looking for:

Observable.fromIterable(listOfElements)
          .filter(element -> {
              if (element.isValid()) {
                  // do some action with valid `element`
                  // NOTE: this action would be performed with each valid element
                  return true;
              } else {
                  // `element` is not valid, perform appropriate action
                  // NOTE: this action would be performed for each invalid element
                  return false;
              }
          })
          .toList() // collect all the filtered elements into a List
          .subscribe(
                  filteredElements -> /* use `filteredElements` which is List<Element> */, 
                  throwable -> /* perform appropriate action with this `throwable`*/)
          );

Question:

I have an observable that emits a single object which has two fields, something like this:

public class Details {
    private Link link;
    private List<Comment> comments;
}

Observable<Details> detailsObservable = ...;

Now, I was wondering: can this observable be somehow split into two observables? What I'd like to do is to have a subscriber listen for the link and another one to listen for Comments.


Answer:

You can use map method to achieve that. Example code shared below, let me know if it helps.

public static void main(String[] args) {
    Observable<Details> dObs = Observable.just(new Details("a", "c"));
    Observable<String> lObs = dObs.map(d -> d.link);
    Observable<String> cObs = dObs.map(d -> d.comments);
    lObs.subscribe(s -> System.out.println(s));
    cObs.subscribe(s -> System.out.println(s));
}


static class Details {
    String link;
    String comments;

    public Details(String link, String comments) {
        this.link = link;
        this.comments = comments;
    }
}

Question:

I am stuck on how to convert/transform the following observable type to my target type:

I have observable of type:

Observable<Observable<List<FooBar>>>

I want to convert it to:

Observable<List<FooBar>>

So when I subscribe on it, it emits List<FooBar> not Observable<List<FooBar>>

I tried with map, flatMap... I could not find a solution.

However, I found a strange looking operator called blockingFirst which my IDE indicates that it returns Observable<List<FooBar>> when applied to Observable<Observable<List<FooBar>>>

But the 'blocking' part is confusing me.

I am also looking for better solution than blockingFirst one, if any.


Answer:

flatMap is indeed the way to go:

Observable<Observable<List<FooBar>>> streamOfStreams = ...
Observable<List<FooBar>> listStream = 
          streamOfStreams.flatMap(listObservable -> listObservable);

I think you should look at it in a different perspective, it is not simple converting from 1 type to another type. Observable of Observables means stream that emit streams that each of them emit a list of some items. what you want to achieve is to flatten it to single stream that emit all the lists from all the streams. flatMap doing it exactly, you give it an item emission, and return an Observable, flatMap will subscribe to the returned Observable and will merge each item emitted from it to the source stream, in this case, as you simply return each item emission which is Observable<List<FooBar>>, you practically taking each emitted Observable , subscribe to it, and merge all its list emissions back, so you get back stream of all the lists from all the Observables.

blockingFirst is definitely not the way to go, what it does is to wait (block) until first emission and return this item only, as your items ar Observable<List<FooBar>> you'll get the only first Observable. so while it indeed has the same type, its clearly not the same stream you want.

Question:

I want to call an API api1 and pass the output returned by the API to a second API api2. The first API is a get request and second API is a POST request. Therefore, the api1 returns a Single<String> and api2 returns a Completable.

The functions look something like this:

// api1
public Single<String> getToken() {
  ...
}
// api2
public Completable saveTokenToBackend(String token, String userId) {
 ...
}

I want to chain both of these operations together. The subscriber is only interested in knowing if the process of getting token and saving it was successful or not. Therefore, the return type of the final chain of operations should be a Completable. However, when i do this the API calls for api2 stop happening. Only api1 works successfully as per logs.

Single<Completable> r1 = getToken().map(t -> saveTokenToBackend(t, userId));
Completable r2 = Completable.fromSingle(r1);

My broader question here is that how do i chain a response from a Single to a Completable?

Second question is why the above code is not working?

::EDIT::

Based on suggestions in comments, I tried:

public Completable getAndSaveToken() {
    getToken().flatMapCompletable(t -> saveTokenToBackend(t, "dummyuser");
}

In my application code i am doing:

getAndSaveToken()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(() -> {
                Log.v(TAG, "call success");
    }, error -> {
                Log.e(TAG, "Error", error);
    });

Results in: java.io.IOException: Must not be called on the main application thread


Answer:

You have 2 different problems, composition and scheduling. Context: getToken return a single and saveToken(token) returns a completable.

composition: to compose a single and a completable, as you have already noticed, you can use the flatMap operator, this returns a new completable that first gets the token, and then save it (probably, you'll want to modify the token before saving it ;)

getToken().flatMapCompletable(n -> saveToken(n)) // returns a completable

If you want to keep it as a single, so you can map it back to the first instance:

getToken().flatMap(n -> saveToken(n).toSingleDefault(n)) // returns a single

scheduling: in android, you must not start a request in the main thread. To avoid it you can use the subscribeOn operator, also looks like you have already noticed that

getToken().subscribeOn(io()).doOnNext(n -> {/*this should be evaluated in io thread*/})
getToken().subscribeOn(io()).observeOn(mainThread()).doOnNext(n -> {/*this on mainThread*/})

If you still get the error, then the observeOn or subscribeOn has been re-configured somewhere. Need more code to be sure. But anyways, you can assert that both requests are executed in the io thread applying the operator twice:

getToken().subscribeOn(io).flatMapCompletable(token -> saveToken(token).subscribeOn(io()))

Another alternative, if you are using retrofit, is to apply the RxJava2CallAdapterFactory using the createWithScheduler(io()) factory, instead of the default one. This will assert that all requests are created in the io(), so you can combine and prepare the data, and finally apply the observeOn(mainThread()) to update the UI.

Question:

All of the examples I have read, talks about having a separate boundary condition in buffer and window I would want to have a custom boundary condition based on the items being emitted. For example:

Read items from a list, group them till the sum is less than 100. I have a list -

[10, 30, 40, 50, 20, 30, 50, 30, 80]

Keep on grouping the items till their sum is less than 100 and then emit. Output of above list shall be:

[10, 30, 40]
[50, 20, 30]
[50, 30]
[80]

Any help will be highly appreciated


Answer:

As proposed by akarnokd, you can use bufferWhile from RxJava2 Extensions.

To have a condition on sum, you can use a custom Predicate:

Flowable.just(10, 30, 40, 50, 20, 30, 50, 30, 80)
        .compose(FlowableTransformers.bufferWhile(new Predicate<Integer>() {
            private int sum = 0;

            @Override
            public boolean test(Integer next) {
                if (sum + next > 100) {
                    sum = next;
                    return false;
                } else {
                    sum += next;
                    return true;
                }
            }
        }))
        .subscribe(System.out::println);

The output is exactly the expected:

[10, 30, 40]
[50, 20, 30]
[50, 30]
[80]

Question:

The following code emits items from observable1 only after observable2 is completed.

observable1.startWith(observable2)
           .subscribe()

I need to achieve another behavior

observable1 ->       0   1   2   3
observable2 -> 1   2   3   4   5   6

observable1.startWithDefault(observable2)
            -> 1   2 0   1   2   3

The second observable emits items only while first observable is empty and then items from first one are emited.

I could not find correct solution using only basic operators, what is correct RxJava 2 implementation of custom operator startWithDefault should look like?

P.S.

observable1.subscribe()
observable2.takeUntil(observable1).subscribe()

is not correct solution because of race in case of immediate emit from observable1


Answer:

The direction was good, but you need publish(Function) to share observable1's signals and concatEager to not lose elements from it when the switch appens:

observable1.publish(o -> 
    Observable.concatEager(observable2.takeUntil(o), o)
)
.subscribe(...)

Question:

I have a list of elements and I want to process n elements at a time from that list. How can I do it in RX way?

I took a look at take operator but that only takes first n or last n elements. I need to process all elements in a list but n at a time.

Ideally I should get multiple lists of size n from a bigger list.


Answer:

You could use window() (or one of its overloaded variants):

observable.window(batchSize).subscribe(...)

Or, if you don't want to have to wait for the window to 'fill' then perhaps buffer() (or one of its overloaded variants):

observable.buffer(batchSize).subscribe(...)

Question:

I have the following code:

    final Observable<String> a = Observable.just("a1", "a2");   
    final Observable<String> b = Observable.just("b1");

    final Observable<String> c = Observable.combineLatest(a, b, (first, second) -> first + second);

    c.subscribe(res -> System.out.println(res));

What is expected output? I would have expected

a1b1
a2b1

But the actual output is

a2b1

Does that make sense? What is the correct operator the generate the expected sequence?


Answer:

As the name of the operator should imply, it combines the latest value of each source. If the sources are synchronous or really fast, this could mean that one or more sources will run to their completion and the operator will remember only the very last values of each. You have to interleave the source values by some means, such as having asynchronous sources with ample amount of time between items and avoid close overlapping of items of multiple sources.

The expected sequence can be generated a couple of ways, depending on what your original intention was. For example, if you wanted all cross combination, use flatMap:

a.flatMap(aValue -> b, (aValue, bValue) -> first + second)
.subscribe(System.out::println);

If b is something expensive to recreate, cache it:

Observable<String> cachedB = b.cache();
a.flatMap(aValue -> cachedB, (aValue, bValue) -> first + second)
.subscribe(System.out::println);

Question:

I have a method that returns a Single<List<Item>>, and I would like to take each item in this list and pass it downstream to a method that returns Completable. I want to wait until each item has successfully completed and return a Completable result. My initial approach was to process each item separately using flatMapIterable and combine the results using toList, but I cannot call toList on a Completable object. Is there any other way to "aggregate" many Completable tasks into a single Completable in this fashion? Here's what I have so far:

public Single<List<Item>> getListOfItems() {
    ...
}

public Completable doSomething(Item item) {
    ...
}

public Completable processItems() {
    return getListOfItems()     
        .toObservable()
        .flatMapIterable(items -> items)
        .flatMapCompletable(item -> doSomething(item))
        .toList()    // ERROR: No method .toList() for Completable
        .ignoreElements();
}

Answer:

The flatMapCompletable operator do the trick, you don't need to apply additional operators further.

From the docs:

Maps each element of the upstream Observable into CompletableSources, subscribes to them and waits until the upstream and all CompletableSources complete.

flatMapCompletable will return Completable that will complete when all mapped Completables complete their operation:

 public Completable processItems() {
    return getListOfItems()
            .toObservable()
            .flatMapIterable(items -> items)
            .flatMapCompletable(item -> doSomething(item));
}

Question:

Say I have those too classes

class Event {
  int id;
  String name;
  List<Integer> facilityIds;
  List<Facility> facilities; // Empty, the one I need to load
}

class Facility {
  int id;
  String name;
}

Goal: Print out every events with their facilities' name.

Constraints: RxJava2, Facilities can only be loaded one by one (getFacility(facilityId))

From an Observable<Event>, I cannot find my way around loading the facilities and set them back to their respective event.

Basically I was thinking about something like this:

Observable<Event> events;
events
  .map(Event::getFacilityIds)
  .flatMap(Observable::fromIterable)
  .map(facilityId -> service.getFacility(facilityId))
  . // somehow get the event reference and
    // so some event.addFacility() or something similar

Then I go blind and cannot find a way to link them back to the event. I also thought about using zip, that it could be a solution but I did not find a way to keep and event reference to later set the facilities to them.

What is the reactive way to go? Any hint would be greatly appreciated.


Answer:

What's about forEach?

events.forEach(event -> Observable.fromIterable(event.facilityIds)
            //or just .map(service::getFacility)
            .map(facilityId -> service.getFacility(facilityId)
            .forEach(facility -> event.facilities.add(facility)))

Or use doOnNext instead of a first forEach if you wanna to continue the stream. This would be executed synchronously on a thread of the previous operator.

If you getFacility takes valuable amount of time you may parallel retrieving with flatMap:

events.doOnNext(event -> Observable.fromIterable(event.facilityIds)
            .flatMap(facilityId -> 
                    Observable.fromCallable(() -> service.getFacility(facilityId))
                    .subscribeOn(Schedulers.computation()))
            .blockingSubsribe(facility -> event.facilities.add(facility)))

But in this case an order of resulted facilities doesn't guaranteed.

Question:

I study RxJava and try to understand how to implement non standard reactive "debouce" logic. Depend on message new operator must delay some kind of messages or skip it if another type of message arrives from observable.

Debouce only A-messages or forget about it if another message arrived

Please help me compose this logic.


Answer:

This requires a non-trivial combination of operators:

public static <T> ObservableTransformer<T, T> debounceOnly(
        Predicate<? super T> condition, long time, 
        TimeUnit unit, Scheduler scheduler) {
    return o -> o.publish(f ->
        f.concatMapEager(v -> {
            if (condition.test(v)) {
                return Observable.just(v).delay(time, unit, scheduler).takeUntil(f);
            }
            return Observable.just(v);
        })
    );
}


@Test
public void test() {
    PublishSubject<String> subject = PublishSubject.create();

    TestScheduler sch = new TestScheduler();

    subject
    .compose(debounceOnly(v -> v.startsWith("A"), 
         100, TimeUnit.MILLISECONDS, sch))
    .subscribe(System.out::println, Throwable::printStackTrace, 
         () -> System.out.println("Done"));

    subject.onNext("A1");

    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onNext("B1");
    sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    subject.onNext("C1");
    sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);

    subject.onNext("A2");
    sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    subject.onNext("A3");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onNext("A4");
    sch.advanceTimeBy(50, TimeUnit.MILLISECONDS);

    subject.onNext("B2");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onNext("C2");
    sch.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    subject.onComplete();
}

Question:

Is there any difference between concat and concatArray in RxJava2 except of the number of ObservableSources?

Observable.concat(observable1, observable2)

Observable.concatArray(observable1, observable2, observable3, observable4)

The maximum number of ObservableSources in

concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> sourc4)

is 4, but in

concatArray(ObservableSource<? extends T>... sources)

Is this the only difference between those two?


Answer:

There is only one small difference between the two: using concatArray will likely result in a "heap pollution" warning and requires the use of @SuppressWarnings("unchecekd") because of the varargs. Being only a Java 6 level library, we can't apply @SafeVarargs. The 2-4 argument overloads help avoid the warning.

Question:

I am using RxJava Version 2.0.1. Using Android Studio 3.0 Canary 6.

I have some code like this

private void subscribeToObservable(Observable<List<CalendarDto>> observable) {
    DisposableObserver<List<CalendarDto>> d = getDisposableimportantDaysObserver();
    observable.subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(d);
    disposables.add(d);
}

But the problem is My app crashes whenever I run this code in android device with API 19( Kitkat, have not checked lower than that) , but it runs fine in my android 7.0 emulator. The Error log

java.lang.NoClassDefFoundError: io.reactivex.Flowable at io.reactivex.Observable.bufferSize(Observable.java:126) at io.reactivex.Observable.observeOn(Observable.java:8412)

This is not because of my proguard as I deleted all the code in the proguard config file but the issue was still there.

Does anyone have Idea what might be going wrong ?


Answer:

java.lang.NoClassDefFoundError: io.reactivex.Flowable at io.reactivex.Observable.bufferSize(Observable.java:126) at io.reactivex.Observable.observeOn(Observable.java:8412)

Don't

 .observeOn(AndroidSchedulers.mainThread())

Do

.observeOn(AndroidSchedulers.mainThread(),false,100) // Add SIZE

Question:

I'm tring to use rxjava with retrofit in android studio, but when I add observable in compositedisposable, I get this error:

cannot resolve method 'subscribeon(io.reactivex.scheduler)'

In fact, I am using this sample code

       @POST("/auth/register")
       Observable<Response>register(@Body User user);

Code

 private CompositeDisposable comdisposables= new CompositeDisposable();
 LoginRetrofitInterface requestInterface = new Retrofit.Builder()
    .baseUrl(Constants.BASE_URL)
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .addConverterFactory(GsonConverterFactory.create())
    .build().create(LoginRetrofitInterface.class);

     comdisposables.add(requestInterface.register(user)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse,this::handleError));

and dependencies

implementation 'com.google.code.gson:gson:2.6.1'

 implementation 'com.squareup.retrofit2:retrofit:2.5.0'

 implementation 'com.squareup.retrofit2:converter-gson:2.5.0'

//implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:2.1.0'

// com.jakewharton.retrofit:retrofit2-rxjava2-adapter:2.2.0

 implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

 // RxJava

 implementation 'io.reactivex.rxjava2:rxjava:2.2.8'

 implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

Answer:

tnx to @EpicPandaForce for his/her helpful comments , error :cannot resolve method 'subscribeon(io.reactivex.scheduler)' solved.

android.database.Observable was imported instead of io.reactivex.Observable.

Question:

I am working on maven app where the version of Rx Library i'm using here is 2.1.7. I have following code

Observable.just(listData).flatMap(data -> data)

The purpose of this trivial code is to iterate through listData object which is data. The problem is the data type is still List hence i can not access the attribute of data using

flatMap(data -> data.getAttribute)

In simple way, i just wanna this code

Observable.just(listData).flatMap(data -> data)

works just as Java8 Lambda Map like this

listData.map(data -> data.getAttribute)

I'm just exploring this library, so i have limited knowledge to this. Also some code on youtube and blogs just didn't help, i think it's because the version of their's different to mine. Any comments would be helpful, please help.


Answer:

Do you want Observable.fromIterable(list)?

Question:

Asking a very basic RxJava question here because I couldn't find it anywhere else.

I'm having this Observable in RxJava 2 -

Observable<String> database = Observable.just("1", "2", "3");

It works fine. No problems.

But when I try to pass an array like this -

arr = new String[]{"1", "2", "3"};
Observable<String> database = Observable.just(arr);

It throws an error for incompatible types.

  1. Isn't the second declaration same thing as the first declaration? If not, why?

  2. I need a way to emit a predefined Array and on onNext of the Observer, I should be getting the individual items of the Array. How to achieve that?


Answer:

Isn't the second declaration same thing as the first declaration? If not, why?

No. The type system of Java makes a distinction between plain types and an array of those types:

String s = new String("whatever")

String z = new String[0]; // <--------- compile error

A String[] is not a single String type.

The method just is defined as follows:

Observable<T> just(T item);

If we substitute T = String, you'll get a signature of Observable<String> just(String item).

If we substitute T = String[], what do we get? Observable<String[]> just(String[] item).

I need a way to emit a predefined Array and on onNext of the Observer, I should be getting the individual items of the Array. How to achieve that?

Use fromArray as it is defined as Observable<T> fromArray(T[] array):

Observable<String> database = Observable.fromArray(arr);

Question:

I've been trying to learn RxJava2 and I've been struggling with this one..

So, I have a structure that represents an events that goes something like the following:

class Event{
    public Date when;
    public String eventName;
}

And somewhere I query a list of events from the repository that I want to group by date.

So, given a list of events like:

  • Event1 at June
  • Event2 at June
  • Event3 at July
  • Event4 at August
  • Event5 at August

I want to group them so that

  • June
    • Event1
    • Event2
  • July
    • Event3
  • August
    • Event4
    • Event5

What I have so far is, in my opinion, very ugly and I am pretty sure I am over-"engineering" this...

repository.getAllEvents()
            .toObservable()
            .flatMap(new Function<Events, Observable<Event>>() {
                @Override
                public Observable<Event> apply(@NonNull Events events) throws Exception {
                    return Observable.fromIterable(events.getEvents());
                }
            })
            .groupBy(new Function<Event, Date>() {
                @Override
                public Date apply(@NonNull Event event) throws Exception {
                    return event.when;
                }
            })
    .flatMap(new Function<GroupedObservable<Date, Event>, Observable<Object>>() {
        @Override
        public Observable<Object> apply(@NonNull GroupedObservable<Date, Event> dateEventGroupedObservable) throws Exception {
            final Date key = dateEventGroupedObservable.getKey();
            return dateEventGroupedObservable.toList().toObservable().flatMap(new Function<List<Event>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull List<Event> events) throws Exception {
                    return Observable.just(new Pair<Date, List<Event>>(key, events));
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Object o) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

So far, this gives me an observable that delivers a Pair> but as you can see it gets converted to Object and I honestly can't make sense out of the generics hell -.-'

Any tips on how I could approach this?

Thanks


Answer:

You can achieve this simply by using collect operator:

repository.getAllEvents()
    .flatMapIterable(events -> events.getEvents())
    .collect(() -> new HashMap<Date, List<Event>>(), 
             (map, event) -> putEventIntoMap(map, event)
    )
    ...

Without lambdas:

// I assume that getAllEvents returns Events class
repository.getAllEvents()
    .flatMapIterable(new Function<Events, Iterable<? extends Event>>() {
        @Override
        public Iterable<? extends Event> apply(@NonNull Events events) throws Exception {
           return events.getEvents();
        }
    })
    .collect(new Callable<HashMap<Date, List<Event>>>() {
        @Override
        public HashMap<Date, List<Event>> call() throws Exception {
            return new HashMap<Date, List<Event>>();
        }}, new BiConsumer<HashMap<Date, List<Event>>, Event>() {
        @Override
        public void accept(@NonNull HashMap<Date, List<Event>> map, @NonNull Event event) throws Exception {
            putEventIntoMap(map, event);
        }}
    )
    ...

Method to put event into map:

private void putEventIntoMap(HashMap<Date, List<Event>> map, Event event) {
    if (map.containsKey(event.when)) {
        map.get(event.when).add(event);
    } else {
        List<Event> list = new ArrayList<>();
        list.add(event);
        map.put(event.when, list);
    }
}

Question:

It's confusing enough that the .subscribe() method returns void if you pass it an Observer, but Disposable when passed anything else. I realize this has to do with the Reactive-Streams spec, but still…

Observer provides the .onSubscribe( Disposable ) method, but as I read the ReactiveX Observable contract, this method may or may not be called when the Observer subscribes. Is this true for RxJava2? [It seems that it is only required to be called by Flowable, which uses it to notify the Subscriber that it's ready to accept requests.]

I've read that .subscribeWith( Observer ) somehow addresses this issue but I'm having trouble seeing how. Evidently you can pass DisposableObserver, which implements Disposable, but what exactly is the .dispose() method supposed to do?


Answer:

this method may or may not be called when the Observer subscribes. Is this true for RxJava2?

The protocol definitions in each RxJava base class are quite clear:

Flowable via Publisher:

onSubscribe onNext* (onError | onComplete)?

Observable:

onSubscribe onNext* (onError | onComplete)?

Single:

onSubscribe (onSuccess | onError)?

Maybe:

onSubscribe (onSuccess | onError | onComplete)?

Completable:

onSubscribe (onError | onComplete)?

onSubscribe is mandatory, even in never().

I've read that .subscribeWith( Observer ) somehow addresses this issue

The definition is S subscribeWith(S observer) where S extends Observer<? super T>. It simply returns the observer or subclass of an observer provided to it.

but what exactly is the .dispose() method supposed to do?

Disposes the Disposable sent through Observer.onSubscribe in a thread-safe manner. In alignment, the DisposableSubscriber cancels the Subscription received through Subscriber.onSubscribe.

Question:

I have N Single sources and I want to concat these and get the latest successful Single. For ex:

DB Single:

--[A]--|->

API Single:

---X--->

Single.concat(DB, API):

--[A]--|->

Otherwise:

DB Single:

--[A]--|->

API Single:

--[B]--|->

Single.concat(DB, API):

--[B]--|->

Is that possible? I read the documentation but I didn't find anything like "lastSuccessfullOrError()" method. I tried "elementAt","lastOrError" and others but their behavior is not what I'm looking for

Thank you


Answer:

This can be achieved as follows:

public static <T> Single<T> latestSuccess(Single<T>... sources) {
     return Single.defer(() -> {
         AtomicReference<T> last = new AtomicReference<T>();
         return Observable.fromArray(sources)
             .concatMap(source ->
                  source.doOnSuccess(last::lazySet)
                  .toObservable()
                  .onErrorResumeNext(Observable.empty())
             )
             .ignoreElements()
             .andThen(Single.fromCallable(() -> {
                 if (last.get() == null) {
                     throw new NoSuchElementException();
                 }
                 return last.get();
             }));
     });
}

Question:

I have built some basic filter with rxjava2 which works as expected. I was wondering how I could pass values/pass arguments to the filter (return td.getTypeId() == **<value>**;). Also if somebody has ideas/clues/examples on how to build a (semi-)dynamic query (Object.<field> == <value>) with rxjava2 / filter that would be appreciated.

Predicate<TradeDetailed> testfilter;

Flowable<List<TradeDetailed>> td = tr.getTradesDetailedFlowable();

testfilter = new Predicate<TradeDetailed>() {
    @Override
    public boolean test(@NonNull TradeDetailed td) throws Exception {
        return td.getTypeId() == 0;
    }
};

Disposable d = td
        .flatMapIterable(e -> e)
        .filter(e-> testfilter.test(e))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(t -> {
                        System.out.println("filtered "+t.getReference()));

                    },
                    err -> {
                        System.out.println("error");
                    }
        );

Answer:

you can create a separate class for the Predicate:

class YourPredicate implements Predicate<TradeDetailed>(){

    private final int compareAgainst;

    public YourPredicate(int compareAgainst){
        this.compareAgainst = compareAgainst;
    }

    @Override
    public boolean test(@NonNull TradeDetailed td) throws Exception {
        return td.getTypeId() == compareAgainst;
    }

}

Question:

Suppose we have a set of flaky (sometimes failing) parsers that may or may not be able to handle a given file i.e. a given parser either succeeds with some probability p > 0, or fails always (p=0). Is it possible to use RxJava to have this set of parsers subscribe to a stream of incoming files and 'race' to parse the file?

Given that it is possible for the parser to fail initially but still be able to parse the file, it is necessary to have them retry with some backoff policy. Given that it is also possible for no parser to be able to handle a given file, the retry count should be capped.

Implementing exponential backoff is relatively easy to implement using retryWhen with something like this (source):

source.retryWhen(errors ->
                errors.zipWith(Observable.range(1, 3), (n, i) -> i)
                      .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);

However, setting up a parallel race is something I cannot figure out how to do. It seems like the amb operator is what we want here, but applying it to an arbitrary number of streams seems to require using blockingIterable, which (I think) defeats the purpose of the race as it blocks. I have been unable to find anything useful relating to this use case of amb on the internet.

My attempts thus far resemble something like this:

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers 
int numParsers = parserSet.size();

Flowable<Parser> parsers = Flowable.fromIterable(parserSet).repeat();

fileSource
    .flatMap(f -> parsers.take(numParsers)
                         .map(p -> p.parse(f))
                         .retryWhen(/* snippet from above */)
                         .onErrorReturn(/* some error value */)
             ).take(1) 

Flowable introduced the .parallel() operator which just recently got the addition of ParallelFailureHandling (see this pr) which has a RETRY method, but I can't seem to get the flowables to stop retrying after one of them has returned.

Is this problem solvable with RxJava?


Answer:

Making the reasonable assumption that your parsers are synchronous, something like

Set<Parser> parserSet = new HashSet<>();
parserSet.add(new Parser(..., ..., ...));
// Add more parsers 
int numParsers = parserSet.size();

ArrayList<Flowable<T>> parserObservableList = new ArrayList<>();

for (Parser p: parserSet) {
    parserObservableList.add(Flowable.fromCallable(() -> p.parse(f))
                                     .retryWhen(/* Add your retry logic */)
                                     .onErrorReturn(/* some error value */));
}

Flowable.amb(parserObservableList).subscribe(/* do what you want with the results */);

should meet your requirements.

Question:

I'm trying to unsubscribe after receiving the first item from an observable. And it seems to not work. What am I doing wrong?

    public class ObservableAndSubscriber {

    public static void main(final String[] args) {
        final Observable<String> strObservable = Observable.create(s -> {
            while (true) {
                s.onNext("Hello World!!");
            }
        });

        final Subscriber<String> strSubscriber = new Subscriber<String>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onNext(final String t) {
                System.out.println(t);
                this.unsubscribe();

            }
        };
        strObservable.subscribe(strSubscriber);
    }
}

The result seems to print "Hello World" in an infinite loop.


Answer:

For unsubscribe to work you need to check subscription status in your loop. Otherwise it will run infinitely draining your CPU.

The easiest way to deal with infinite sequencies is to utilize library provided methods Observable.generate() or Observable.fromIterable(). They do proper checking for you.

Also make sure you subscribe and observe on different threads. Otherwise you will only serve single subscriber.

Your example:

strObservable = Observable.generate(g -> g.onNext("Hello!"));
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);

Question:

I have two PublishSubjects that model a queue where I receive and push jobs to.

I want to be able to react to jobs that have been consumed from the first PublishSubject, but not put in the second PublishSubject in a given time window (e.g. 10s):

final Subject<Job> queue = PublishSubject.<Job>create().toSerialized();
final Subject<Job> done = PublishSubject.<Job>create().toSerialized();
// this is probably wrong already since I am consuming items from queue
queue.subscribe(done::onNext);

final Observable<Job> timeOut = queue.timeout(10, SECONDS, Observable.empty()); // ??

Answer:

You haven't described what you actually want to do after 10 seconds pass. Do you want to re-add that job to queue again? Do you want to just get information if processing takes more than 10 secs, or do you want to skip that job if it takes too long?

If you want to skip processing that job and resume with some new observable you can flatMap each element of queue and timeout them separately.

Observable<Job> observableThatWillTriggerOnTimeout = ...
queue.flatMap(job -> dispatchJob(job)
                    .timeout(10, TimeUnit.SECONDS, observableThatWillTriggerOnTimeout)
             )

Tell more about your scenario, I will update my answer if necessary

Question:

I want to implement some code that detect any file change (txt, doc, pdf etc.) to trigger some task with RXJava. Is it possible? Any suggestion? Thanks in advance.


Answer:

Of course it's possible. I'd do that in the following way:

  • Use FilteAlterationObserver from the Apache Commons I/O library
  • Create new observer basing on the documentation linked above (you can filter a concrete file if you don't want to monitor whole directory)
  • In the observer you can use FileAlterationListener, which has methods like onFileChange(File) required for this task
  • Create a method returning RxJava Flowable (backpressure-aware) or Observable type, which wraps listener created above - this will require some RxJava knowledge - you'll need to invoke onNext(...) method on the Emitter inside the Flowable implementation and define the type, you need to return within the Flowable - e.g. Flowable<FileEvent> where FileEvent have to be defined

Question:

I have the following 3 observables:

Observable<List<Action>> actions

Observable<List<Type>> types

Observable<List<Unit>> units

I want to observe all emissions from all observables and put them in a Hash Map:

From first observable

hashMap.put("Actions", actions);

From second observable

hashMap.put("Types", types);

From third observable

hashMap.put("Units", units);

Then I want to emit the hashMap once all observables are completed.

Note:

  • The hash map can be HashMap<String, List<Object>>

  • It is OK to use instanceof

  • I want to use current operators (no custom ones)


Answer:

Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

You are looking for the zip operator. Zip actions, types and units Observables and create your Hash Map in the combine function. The result will be an Observable of Hash Map.

Question:

The code below is simply checking null control and make mView.handleUrl(data) method work if all conditions are provided.

if( intent != null && intent.getExtras() != null ){
        String data = null;
        if( ( data = intent.getExtras().getString( AdvImagePagerFragment.ARG_PANAROMIC_IMAGE_DATA ) ) != null ){
            mView.handleUrl( data );
        }
    }

When I use code with RxJava I write code below.

 Single.just( intent )
                .filter( intent1 -> intent1 != null && intent1.getExtras() != null )
                .map( intent12 -> intent12.getExtras() )
                .filter( bundle -> bundle.getString( AdvImagePagerFragment.ARG_PANAROMIC_IMAGE_DATA  ) != null )
                .map( bundle -> bundle.getString( AdvImagePagerFragment.ARG_PANAROMIC_IMAGE_DATA ) )
                .subscribe( s -> mView.handleUrl( s ) );

Both code I write are working same. However the problem is starting when I try to add new condition to my code. For example, I can want to make some new operatins if the intent is null.

Example code;

if( intent != null && intent.getExtras() != null ){
        String data = null;
        if( ( data = intent.getExtras().getString( AdvImagePagerFragment.ARG_PANAROMIC_IMAGE_DATA ) ) != null ){
            mView.handleUrl( data );
        } else { // START Added a new condition
           mView.showError();
        }
    }

But I cannot write this code by using RxJava filter methods. As soon as filter return false, the operation is being completed. What is the solution in RxJava to add new conditions.


Answer:

As a subscriber to that stream you are interested for 2 cases: if data is null and if data is not null. That means, that you cannot filter out the stream just like you have done previously.


    Single.just(sourceIntent)
            .map(intent -> {
                String data = "";
                Bundle bundle = intent.getExtras();
                if (bundle != null) {
                    data = bundle.getString(KEY, "");
                }
                return data;
            })
            .subscribe(data -> {
                if ("".equals(data)) view.showError();
                else view.handleUrl(data);
            });

Question:

I've got multiple hot observables which may or may not emit items. As a result, I want to combine the observables and then handle the result if any of them emits a result, but if the other observables emits at item, they should be handled together.

eg.

observable1 = PublishSubject<>()  
observable2 = PublishSubject<>()

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)

Should emit:

(1, null) 
(2, null)
(2, "Test")
(3, "Test")

It's also possible that observable2 gets emitted before observable1

CombineLatest is the closest to what I need but that only emits a result when all the observables emit at least one item. Is there a reactive operator for this?


Answer:

You could use startWith with each source to provide an initial value, or use BehaviorSubject with an initial value, then apply combineLatest on these augmented Observables. However, null is not allowed in RxJava 2 so you have to find a neutral value in your observable element type.

PublishSubject<Integer> observable1 = PublishSubject.create()  
PublishSubject<String>  observable2 = PublishSubject.create()

Observable.combineLates(
    observable1.startWith(-100000),
    observable2.startWith(""),
    (a, b) -> a + b
)
.subscribe(System.out::println)
;

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)

or

BehaviorSubject<Integer> observable1 = BehaviorSubject.createDefault(-10000)  
BehaviorSubject<String>  observable2 = BehaviorSubject.createDefault("")

Observable.combineLates(
    observable1,
    observable2,
    (a, b) -> a + b
)
.subscribe(System.out::println)
;

observable1.onNext(1)  
observable1.onNext(2)  
observable2.onNext("Test")  
observable1.onNext(3)

Question:

I have stupid problem with RxJava2.

I need to run two long operations at the same time. I know that I should use Observable.zip() and I use it.

The problem, that my long operations is run one after another and another problem that my long operations starting before I subscribe to them.

Let's imagine that this is my long operation that I should run async.

private String doSomethingLong() {
        Random rand = new Random();
        int value = rand.nextInt(5);
        Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
        try {
            Thread.sleep(value * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
        }
        return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
    }

And let there is a method like test() that will try to make it parallel:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
            @Override
            public Combined call(String s, String s2) {
                return new Combined(s, s2);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Combined combined) {
                long total = System.currentTimeMillis() - started;
                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
            }
        });

    }

When I'm trying to run this I observe that two observables just1 and just2 runs one after another... And it's confused me...

But there is another staff that confused me more... I commented Observable.zip and noticed that just1 and just2 started method doSomethingLong() before I subscribed to them...

Let me show:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


//        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
//            @Override
//            public Combined call(String s, String s2) {
//                return new Combined(s, s2);
//            }
//        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
//            @Override
//            public void onCompleted() {
//
//            }
//
//            @Override
//            public void onError(Throwable e) {
//
//            }
//
//            @Override
//            public void onNext(Combined combined) {
//                long total = System.currentTimeMillis() - started;
//                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
//            }
//        });

    }

This code make almost same - it's run two times doSomethingLong() one after another...

What I'm expect: 1. I need that doSomethingLong() methods run parallel 2. I'm asking to explain why those methods runs before I start subscribe them. 3. How should I write me code well in this situation. I want that doSomethingLong() methods do not called before I subscribe to them.

Thanks a lot. Hope that I explain problem well.


Answer:

Observable.just doesn't run anything when you subscribe. It emits the elements when you subscribe, but your doSomethingLong will run as soon as you pass it as an argument. That's normal and it's how the language works.

What you're looking for is a way to say return this when we subscribe, but also only run it at that time and hopefully on a background thread.

There are a couple of answers for this, here are some:

Using defer

There's an operator called defer which takes a lambda which will be executed once you subscribe:

Observable.defer(() ->  doSomethingLong())

This will only execute doSomethingLong when you subscribe

Using fromCallable

You can create an observable from a lambda. This is known as fromCallable:

Observable.fromCallable(() -> doSomethingLong())

Similarly, this will only run doSomethingLong when you subscribe

Using create

I think this is perhaps the most discouraged way of doing it, since there's a couple of things you have to deal with, but I think for the she of completeness it's ok to mention:

Observable.create( emitter -> {
    if(emitter.isDisposed()) return;

    emitter.onNext(doSomethingLong());
    emitter.onComplete();
});

Again, I'm sure there's more ways of doing this. I just wanted to explain the issue and give some options.

Question:

I'm using a RxJava2 Observable to repeat an operation every second and later post the results to a TextView in my Android layout. The code I have right now (below) is working great, but I would rather have it execute with no initial delay.

Disposable disposable = Observable.interval(1000, java.util.concurrent.TimeUnit.MILLISECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(aLong -> myMethod());

I've noticed that others using RxJS have used timer() instead of interval() to achieve 0 delay. Is there any way to do this with RxJava2?


Answer:

There is a Observable.timer(long delay, TimeUnit unit, Scheduler scheduler) in RxJava 2. You can try using that.

Update 1: For an interval that starts with an initial delay and then emits on a regular interval, you can use Observable.interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

Question:

I'm new to RxJava, I know flatmaps are for mapping an emitted item to observable. I also know that based on the documentation the emitted observables all get combined (flatten) to a single observable stream.

I was wondering what happens if any of those inner observables get completed?

for example: I have an observable that emits a item data key. I have to make another async http call to get the item data from the server, so I call it by using another observable. I use a flat map to connect these two and create one main observable.

When does the run() method of following "SomeMethodThatWantsItems" get called?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine)
{
    Consumer<Item> onNextConsumer = 
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word");
    searchObservable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Consumer<Item>(){
                           @Override
                           public void accept(@NonNull Item item) throws Exception {
                               //Do stuff with the item
                           }
                       }
                , new Consumer<Exception>() { //some implementation of onErrorConsumer
                    }
                 //OnComplete
                , new Action(){

                        @Override
                        public void run() throws Exception {
                            //When does this get called??? after the search complete or when the first http call is successful? 
                        }
                    });

}

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord)
{
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception {

            //Assume that our search engine call onFind everytime it finds something
            searchEngine.addSearchListener(new searchEngineResultListener(){
                @Override
                public void onFind(String foundItemKey){
                    emitter.onNext(foundItemKey);
                }

                @Override
                public void onFinishedFindingResults(){
                    emitter.onComplete();
                }
            });

        }
    });
}

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key)
{

    return Observable.create(new ObservableOnSubscribe<Item>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception {

            //Call the server to get the item
            httpCaller.call(key, new onCompleteListener(){
                @Override
                public void onCompletedCall(Item result)
                {
                    emitter.onNext(result);
                    //The result is complete! end the stream
                    emitter.onComplete();
                }
            });
        }
    });
}

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){
    //Where everything comes together
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord);
    retuern searchResultObservable
            .observeOn(Schedulers.newThread())
            .flatMap(new Function<String, Observable<Item>>(){
                @Override
                public Observable<Item> apply(String key){
                    return getItemByKey(httpCaller, key);
                }
            });
}

Answer:

The onComplete() always get call once and then the streams stops. (this is part of the Observable Contract). That means that in your case, your onComplete() at SomeMethodThatWantsItems will be called after all items were retrieved. In case of flatMap(), completion of each inner Observable, simply will signal the source Observable to stop flatting item from the inner Observable to the source Observable, flatMap() merges items from the inner Observable as long as this stream sends items, so it's basically consume the entire inner Observable stream into the source stream, the entire stream is until termination event3 like onComplete(), so in case where inner Observable can emit more than 1 item, that means that it will make more than 1 emission on the source stream.

Question:

Given two methods, both which returns a Single, what is the correct way, using Rx, to chain the two method calls together so that one method is called first, and the second once, and only if, the first completes successfully.

Ideally, the second method will be able to access the value returned by the first.


Answer:

Assuming your methods are like this:

static Single<String> method1() {
  return Single.just("x");
}

static Single<String> method2(String in) {
  return Single.just(in+"y");
}

the chaining will simply be:

Single<String> result = method1().flatMap(v -> method2(v));

Question:

Say I have an expensive calculation that creates an object. I want to give the caller some flexibility as to where that happens, with subscribeOn(). But I also don't want to make that calculation more than once, because of side effects (e.g. the object is backed by some external data store).

I can write

MyObject myObject = MyObject.createExpensively(params);
return Single.just(myObject);

but this does the expensive work on the calling thread.

I can write

Callable<MyObject> callable = () -> MyObject.createExpensively(params);
return Single.fromCallable(callable);

but this will invoke createExpensively() (with side effects) once per subscription, which isn't what I want if there are multiple subscribers.

If I want to ensure that createExpensively() is only called once, and its side effects only occur once, what's the pattern I'm looking for here?


Answer:

You could use Single.cache():

Single.fromCallable(() -> MyObject.createExpensively(params)).cache();

Question:

I have this MyCollectionInteractor that loads all CollectionItemVO from a firebase database:

public interface MyCollectionInteractor extends BaseInteractor{
    Single<List<CollectionItemVO>> load ();
}

CollectionItemVO is:

public class CollectionItemVO {
    String beerId;
    long timestamp;
    int quantity;

    public CollectionItemVO() {
    }


    public CollectionItemVO(String beerId, long timestamp, int quantity) {
        this.beerId = beerId;
        this.timestamp = timestamp;
        this.quantity = quantity;
    }
}

So I have this CollectionItem:

public class CollectionItem {

    private final CollectionItemVO itemVOList;
    private final Beer beer;

    public CollectionItem(Beer beer, CollectionItemVO itemVOList) {
        this.beer = beer;
        this.itemVOList = itemVOList;
    }

}

That has a complete Beer object. To load that object I use this other interactor:

public interface LoadBeerInteractor extends BaseInteractor {
    Flowable<Beer> load(String beerId);
}

I want to transform this CollectionInteractor.load call into an Observable that emits CollectionItem and I want to use LoadBeerInteractor.load(beerId) to delivery CollectionItem with full beer object.

For what I studied, I believe it is possible to do that using flatmap but I have not been able to achieve the desired result yet.


Answer:

I think you need to do something like this:

MyCollectionInteractor collections = ...
LoadBeerInteractor beers = ...

Flowable<CollectionItem> items = collections.load()
    .toFlowable()
    .flatMapIterable(it -> it) // unpack from Flow<List<T>> to Flow<T>
    .flatMap(it ->
        beers
            .load(it.beerId)
            .map(beer -> new CollectionItem(beer, it))
    )

This might also work:

Flowable<CollectionItem> items = collections.load()
    .toFlowable()
    .flatMap(list ->
        Flowable
            .from(list)
            .flatMap(it -> 
                beers
                    .load(it.beerId)
                    .map(beer -> new CollectionItem(beer, it))
            )
    )

Question:

I'm using Maybe class in RxJava2.

I registered the doOnDispose callback to detect the Dispose event, but it is not fired.

Maybe.just("aaa")
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

I looked at the RxJava 2 code, but Maybe seemed not to support doOnDispose.

Maybe is create MaybePeek(not DoOnDisposeObserver) object in doOnDispose,,,

@CheckReturnValue
@SchedulerSupport("none")
public final Maybe<T> doOnDispose(Action onDispose) {
    return RxJavaPlugins.onAssembly(new MaybePeek(this, Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION, (Action)ObjectHelper.requireNonNull(onDispose, "onDispose is null")));
}

protected void subscribeActual(MaybeObserver<? super T> observer) {
    this.source.subscribe(new MaybePeek.MaybePeekObserver(observer, this));
}

But, Single is create DoOnDisposeObserver, and it is worked fine.

@CheckReturnValue
@SchedulerSupport("none")
public final Single<T> doOnDispose(Action onDispose) {
    ObjectHelper.requireNonNull(onDispose, "onDispose is null");
    return RxJavaPlugins.onAssembly(new SingleDoOnDispose(this, onDispose));
}

protected void subscribeActual(SingleObserver<? super T> s) {
    this.source.subscribe(new SingleDoOnDispose.DoOnDisposeObserver(s, this.onDispose));
}

Why Maybe.doOnDispose is not supported?


Answer:

As the documentation says about doOnDispose(Action onDispose)

Calls the dispose Action if the downstream disposes the sequence.

Since your downstream never dispose it, it will never call.

Disposable disposable = Maybe.just("aaa")
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

disposable.dispose();

Now the action in doOnDispose should be called.

Note that if the completion of the stream takes less time than the going to the next operation (disposable.dispose()), then the onDispose action should not be called. So, in order to verify it you can use a delay:

Disposable disposable = Maybe.just("aaa")
    .delay(2000, TimeUnit.MILLISECONDS)
    .doOnDispose({ /* do something */ })
    .subscribe( ... )

disposable.dispose();

Now the action should be fired.

Question:

I´m trying to use subscribeOn and obsereOn with an Executor to allow me back to the main thread once the async task finish. I end up with this code but it does not work

@Test
    public void testBackToMainThread() throws InterruptedException {
        processValue(1);
        processValue(2);
        processValue(3);
        processValue(4);
        processValue(5);
//        while (tasks.size() != 0) {
//            tasks.take().run();
//        }
        System.out.println("done");
    }

    private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();


    private void processValue(int value) throws InterruptedException {
        Observable.just(value)
                .subscribeOn(Schedulers.io())
                .doOnNext(number -> processExecution())
                .observeOn(Schedulers.from(command -> tasks.add(command)))
                .subscribe(x -> System.out.println("Thread:" + Thread.currentThread().getName() + " value:" + x));
        tasks.take().run();
    }

    private void processExecution() {
        System.out.println("Execution in " + Thread.currentThread().getName());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Any idea how to accomplish what I want?

When I run I only printing

Execution in RxIoScheduler-2
Execution in RxIoScheduler-3
Execution in RxIoScheduler-4
Execution in RxIoScheduler-5
Execution in RxIoScheduler-6
done

Regards


Answer:

The problem with your approach is that you can't know how many tasks should be executed at a given time and also not deadlock on waiting for tasks that should happen after you unblock the main thread.

Returning to the Java main thread is not supported by any extension to 1.x I know. For 2.x, there is the BlockingScheduler from the extensions project that allows you to do that:

public static void main(String[] args) {
    BlockingScheduler scheduler = new BlockingScheduler();

    scheduler.execute(() -> {
        Flowable.range(1,10)
        .subscribeOn(Schedulers.io())
        .observeOn(scheduler)
        .doAfterTerminate(() -> scheduler.shutdown())
        .subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
    });

    System.out.println("BlockingScheduler finished");
}

Note the call to scheduler.shutdown() which has to be called eventually to release the main thread, otherwise your program may never terminate.

Question:

The following is what I'm trying to achieve:

|--------------------A-A-A-A-A-A| primary (behavior subject)
|-B-B-B-B-B-B-B-B-B-B-B-B-B-B-B-| secondary (flowable)
|-B-B-B-B-B-B-B-B-B-BA-A-A-A-A-A| result (flowable)

Basically, I'm waiting for user input (through primary observable) but I would like to provide system generated values if user hasn't inputted anything yet. Once user input is received, the secondary observable will not be used anymore.

I looked into switchIfEmpty and combineLatest but they don't fit my needs because:

  1. switchIfEmpty only works if primary observable signals onComplete.
  2. combineLatest only works if both observables emit a value

Is there any way to do this?


Answer:

I'm never sure if it's the easiest or most elegant way to do it, but this works:

const a = Observable.interval(5000);
const b = Observable.interval(1000);

const sharedA = a.shareReplay(1);

const result = b.takeUntil(sharedA).concat(sharedA);

Demo

Another, maybe simple way, would be

const sharedA = a.share();
const result = b.takeUntil(sharedA).merge(sharedA);

Demo

Question:

I'd like to transform a Flowable so that it defers emitting items until a specified number of items is collected, and then emits them to downstream in FIFO order maintaining constant delayed item count. Once upstream signals onComplete, the queued items should be flushed to downstream before emitting onComplete:

(in this example delayed item number is 3)

1 2 3 4 5 6 7 |
      1 2 3 4 5 6 7 |

I don't see any existing operators that do this or can be modified to get that behavior. Observable.delay seems to support only time-based delay, not count-based delay.

It should be easy to implement a custom operator to achieve this, but maybe there's a simpler way with existing operators?


Answer:

You can publish a sequence, skip the last N, then append the last N back:

Flowable.range(1, 7)
    .flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
    .doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
    .publish(f -> 
        f.skipLast(3).mergeWith(f.takeLast(3))
    )
// -------------------------------------------------------------------
    .blockingSubscribe(v -> System.out.println("<-- " + v));

Question:

I have an RxObservable that emits an item. I have an RxObserver that consumes the item, turns something on, waits 30 seconds, then turns that same thing off. If in the middle of that 30 second wait the Observable emits another item, the Observer needs to cancel the timer and restart the previous process. The solution was very simple to implement with a CountDownTimer. Here's a snippet of what I wrote:

.subscribe {
            ledTimer?.cancel()

            ledTimer = object : CountDownTimer(COUNTDOWN_MS, COUNTDOWN_INTERVAL_MS) {

                override fun onTick(timeRemaining: Long) {
                    Log.d(
                        TAG,
                        "%d seconds remaining from recent Pcu Event.".format(timeRemaining / COUNTDOWN_INTERVAL_MS)
                    )
                }

                override fun onFinish() {
                    Log.d(TAG, "Timer has completed.")
                    ledSubject.onNext(LedEvent.LedOff)
                }
            }
            ledSubject.onNext(it)
            ledTimer?.start()
        }

So as can you see I cancel the timer if it's currently running, create a new object and start it. I need to convert this to use Rx. The part I'm stuck on is the cancel feature of the timer. This is what I came up without the "interrupt" feature:

   ledStream
        .doOnNext{
            ledSubject.onNext(it)
        }.flatMap {
            Observable.timer(COUNTDOWN_S, TimeUnit.SECONDS)
        }.subscribe{
            ledSubject.onNext(LedEvent.LedOff)
        }

So it's very nice and concise but when subsequent items are emitted it doesn't reset. I've looked at operators like switchOnNext but just don't have a good enough grasp on Rx yet. If I could see a simple example with explanation that would be a great help.


Answer:

You can just use debounce operator. Here is the example:

ledStream
        .doOnNext{
            ledSubject.onNext(it)
        }
        .debounce(COUNTDOWN_S, TimeUnit.SECONDS)
        .subscribe{
            ledSubject.onNext(LedEvent.LedOff)
        }

Question:

My intention was to be able to pipeline a set amount of Completable however, it does not work as expected as seen from the following test case.

@Test
public void test_and_then() {

    Completable c1 = Completable.fromAction(() -> {
        System.out.println("<1>");
    });

    Completable c2 = Completable.fromAction(() -> {
        System.out.println("<2.1>");
        boolean test = true;
        if (test) {
            throw new Exception();
        }
        System.out.println("<2.2>");
    });

    Completable c3 = Completable.fromAction(() -> {
        System.out.println("<3>");
    });


    Completable.complete()
            .andThen(c1)
            .andThen(c2)
            .andThen(c3)
            .test()
            .assertError(t -> true)
            .dispose();

    c3.test().assertNotSubscribed();
}

This results in the following output.

<1>
<2.1>
<3>
java.lang.AssertionError: Subscribed! (latch = 0, values = 0, errors = 0, completions = 1)

As seen in the documentation of andThen it looks like it should not subscribe to any Completables added later if an error is detected in previous Completable. My problem is, why "<3>" is printed, basically, that Completable is run. How can I prevent this and simply end the chain of compietables instead? And why does it subscribe to the Completable c3 when the documentation indicates that this should not happen?


Answer:

Actually, it works as you expected. However, you are subscribing to c3:

c3.test().assertNotSubscribed();

This will subscribe to c3 and print <3>.

If you remove that line, it'll work as you expect.

test() subscribes and gives you a test observer that you can do assertions on. It's just unfortunate the output order - it makes you think the stream is not stopped.

Edit

Regarding the comment "what's the purpose of assertNotSubscribed if it always fails".

This is quite a misconception to be honest. test() is not the single way you have to create TestObservers. You can create a test observer like so

TestObserver<Void> testObserver = new TestObserver<>();

You can use assertNotSubscribed to make sure you haven't subscribed the test observer yet. Actually, that method is supposed to check that onSubscribe hasn't been called. It's not meant to test that dispose() was called.

For this scenario, you'd have to do something like assertTrue(testObserver.isDisposed()). You can follow the discussion here.

Question:

I'm using RxJava2 Observable.fromIterable() in a Rest web service. My example iterable is composed of three elements but my Non-Blocking rest service returns just one element on three.

class ToDoDaoImpl implements ToDoDao {
  Map<String, ToDo> toDos;
  ...
  public Observable<ToDo> readAll() {
    return Observable.fromIterable(toDos.entrySet().stream().map(entry -> entry.getValue()).collect(Collectors.toList()));
  }
}

When I call the readAll() method from my Non-Blocking Rest library, I only get one element on three:

@Api(path = "/api/v2/read", method = "GET", produces = "application/json")
Action readAllToDos = (HttpServletRequest request, HttpServletResponse response) -> {
    Observable.just(request)
            .flatMap(req -> toDoDao.readAll())
            .subscribe(output  -> toJsonResponse(request, response, new ResponseDto(200, output)),
                       error   -> toJsonResponse(request, response, new ResponseDto(200, error))
            );
};

My Output:

{
"status": 200,
"response": {
    "id": "5dc74dd8-1ea9-427e-8bb7-482cc6e24c71",
    "title": "learn ReactiveJ",
    "description": "learn to use ReactiveJ library",
    "date": {
        "year": 2018,
        "month": 10,
        "day": 29
    }
},
"datetime": "Oct 29, 2018 4:19:51 PM"
}

If I call the Non-reactive equivalent of my Dao, instead I get what I expect:

{
"status": 200,
"response": [
    {
        "id": "25cbe3bf-12be-42e4-82ce-d4780f6469f6",
        "title": "study reactive",
        "description": "learn reactive programming",
        "date": {
            "year": 2018,
            "month": 10,
            "day": 29
        }
    },
    {
        "id": "51879241-f005-43fa-80fb-78386b663cb7",
        "title": "learn ReactiveJ",
        "description": "learn to use ReactiveJ library",
        "date": {
            "year": 2018,
            "month": 10,
            "day": 29
        }
    },
    {
        "id": "80a07c1b-2317-4eb8-9a39-ac35260f37a2",
        "title": "exercise",
        "description": "do some exercises",
        "date": {
            "year": 2018,
            "month": 10,
            "day": 29
        }
    }
],
"datetime": "Oct 29, 2018 4:37:05 PM"
}

Answer:

If you put doOnNext before subscribe you'll see that you get multiple elements, but apparently toJsonResponse can only deliver one. I bet your non-reactive version simply passed the whole List to ResponseDto.

I'm not sure why you complicated the task but this should work:

class ToDoDaoImpl implements ToDoDao {
    Map<String, ToDo> toDos;
    // ...
    public Observable<List<ToDo>> readAll() {
        return Observable.fromCallable(() -> new ArrayList<>(toDos.values()));
    }
}

@Api(path = "/api/v2/read", method = "GET", produces = "application/json")
Action readAllToDos = (HttpServletRequest request, HttpServletResponse response) -> 
{
    toDoDao.readAll()
        .subscribe((List<ToDo output)  -> 
            toJsonResponse(request, response, new ResponseDto(200, output)),
                   error   -> 
            toJsonResponse(request, response, new ResponseDto(200, error))
        );
};

Question:

I have a stream:

val symbols: Single<List<Symbol>>

Now I want to transform the stream into a UI State with map():

private fun cool(): Single<SymbolContract.State> =
    symbols.map { SymbolContract.State.Symbols(it) }

What I want to do is catch an error on the upstream symbols single, so that I can catch any errors and then return SymbolContract.State.GeneralError().

I want something like a onErrorMap() or something. Unfortunately, putting onErrorResumeItem on symbols doesn't work because it needs to return a List<Symbol>.

I can think of a few ugly ways to do this, but what's the cleanest?


Answer:

I suggest you to use global handling error. I give you a sample so you can get the idea. (It is kotlin) and you can catch as many as exception you would like, some of them are my custom exceptions. Just bear in mind, this sample is about Reactive Webflux but you get the idea. It would be similar in others

@Configuration
class ExceptionTranslator {

    @Bean
    @Order(-1)
    fun handle(objectMapper: ObjectMapper): ErrorWebExceptionHandler {
        return ErrorWebExceptionHandler { exchange, ex ->
            if (exchange.response.isCommitted) {
                return@ErrorWebExceptionHandler Mono.error(ex)
            }

            val response = exchange.response
            response.statusCode = HttpStatus.INTERNAL_SERVER_ERROR
            response.headers.contentType = MediaType.APPLICATION_PROBLEM_JSON_UTF8

            val url: String

            var message = ex.message
            var params = mutableMapOf<String, Any>()

            when (ex) {
                is ParametricException -> {
                    url = ex.url
                    params = ex.params
                }
                is BaseException -> {
                    url = ex.url
                }
                is BadCredentialsException -> {
                    url = INVALID_CREDENTIAL_TYPE
                    message = ex.message ?: "Wrong Credentials"
                }
                is ConcurrencyFailureException -> {
                    url = INTERNAL_TYPE
                    message = ERR_CONCURRENCY_FAILURE
                }
                is MethodArgumentNotValidException -> {
                    val result = ex.bindingResult
                    val fieldErrors = result.fieldErrors.map {
                        FieldErrorVM(it.objectName, it.field, it.code ?: "Unknown")
                    }

                    url = CONSTRAINT_VIOLATION_TYPE
                    message = ERR_VALIDATION
                    params = Collections.singletonMap("errors", fieldErrors)
                }
                else -> url = INTERNAL_TYPE
            }

            if (ex is BaseException) {
                response.statusCode = HttpStatus.valueOf(ex.status.code())
            }

            val bytes = objectMapper.writeValueAsBytes(ProblemVM(url, message ?: "Internal Error", params))
            val buffer = response.bufferFactory().wrap(bytes)
            response.writeWith(Mono.just(buffer))
        }
    }

}

Question:

I have the following minimal (not) working example:

import org.junit.Test;
import io.reactivex.Observable;
import com.google.common.util.concurrent.SettableFuture;

@Test
public void testTest() {
    final Observable<Long> initialValues = Observable.fromArray(100L, 200L, 300L);
    final SettableFuture<Long> future = SettableFuture.create();
    final Observable<Long> newValues = Observable.fromFuture(future);

    new Thread(
            () -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                future.set(400L);
            }
    ).start();

    initialValues.takeUntil(newValues).concatWith(newValues).subscribe(System.out::println, Throwable::printStackTrace);
}

I was expecting the following output:

100

200

300

but the output was

400

Is there any fault in my logic? I'm trying to accomplish the following behavior:

Let there be 2 observables: A, B. Take values from A until B start publishing values, then switch exclusively on B.


Answer:

fromFuture is blocking the thread that subscribes to it so the initialValues never get a chance to run due to takeUntil subscribing to newValues first and then getting blocked. Try this:

initialValues
.takeUntil(newValues.subscribeOn(Schedulers.io()))
.concatWith(newValues)
.subscribe(System.out::println, Throwable::printStackTrace);

Question:

I'm a begginer when it comes to ReactiveX, so this question might be trivial, but I haven't found an answer through searching.

I have an observable that emits items quite irregularly (from ~4 times per second to once every 5 seconds) and I want to make sure it never emits an item more often that once per second. I thought about using zip operator with Observable.interval(), but I realized that if it emits an item after 5 seconds and then emits 3 items in less than a second, all of those items will be emitted in one second.

Is there any simple way to do it?


Answer:

There are a couple of ways to achieve what you want. What you end up using depends on you use case.

throttleLast

throttle let's you specify that you only want a new value at a certain interval. This is probably closest to what you were trying to achieve with zip and interval:

myObservable.throttleLast(1, TIMEUNIT.SECOND)

This will emit the most recent signal emitted every second. The other signals emitted inside the second are discarded.

buffer

buffer does much the same as throttleLast, but instead of passing the most recently emitted value, it will return a Flowable with all the values emitted in the timespan.

myObservable.buffer(1, TIMEUNIT.SECOND)

Question:

I know that share() is a replacement of publish().refCount(). Then from the RxJava wiki:

Observable.publish( ) — represents an Observable as a Connectable Observable ConnectableObservable.refCount( ) — makes a Connectable Observable behave like an ordinary Observable

This make me confused. If after publish().refCount(), it just behave like an ordinary Observable, why should I use it, how does this api make sense?


Answer:

You're right - Observable.share is just a shortcut for publish().refCount(). I think that description you have quoted above is not entirely clear as ConnectedObservable.refCount does a little bit more :)

If you transform your Observable to ConnectableObservable - it will not emit items (even if something is subscribed) unless explicitly called ConnectableObservable.connect - it basically defers execution of subscribe method and prevents from executing it multiple times for every subscriber. This technique is often used to make sure that all subscribers are subscribed before observable starts emitting items (in other words - after everyone has subscribed - connect() method is called).

If you have more than one subscriber (what often happens), you have to handle their subscriptions and unsubscriptions and this is where things are getting tricky. This is why refCount() was introduced. This operator returns new Observable, keeps track of how many subscribers are subscribed to it and stays connected as long as there is at least one subscription. It will also automatically connect when the first subscriber appears.

PS. I'm learning how to use RxJava, if I am wrong - please point it out!

Question:

I am working on Apigateway using vert.x and RxJava. I want to send reactive request for 2 Apis, get response from both of them and send combined JSON by HttpServer. But onComplete() executes to early and returns empty JSON. I think the problem arises from asynchronous character of the vert.x but I don't exactly what's wrong.

Here is my method:

private void dispatchBoth(RoutingContext routingContext) {

    Observer<String> observer = new Observer<String>() {

        JsonArray jsonArray = new JsonArray();

        @Override
        public void onSubscribe(Disposable disposable) {
            System.out.println("Start");

        }

        @Override
        public void onNext(String s) {
            Thread t = new Thread(() -> {
                if(s=="/api/userApi/selectAllUsers") {

                        WebClient client = WebClient.create(vertx);
                            client
                                .get(8081, "localhost", s)
                                .send(ar->{
                                    if (ar.succeeded()) {
                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                        System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });

                }else if(s=="/api/holidayApi/selectAllHolidays") {
                        WebClient client = WebClient.create(vertx);
                        client
                                .get(8080, "localhost", s)
                                .send(ar -> {

                                    if (ar.succeeded()) {

                                        HttpResponse<Buffer> response = ar.result();

                                        jsonArray.addAll(response.bodyAsJsonArray());
                                       //  System.out.println(jsonArray.encodePrettily());

                                    } else {
                                        System.out.println("Something went wrong " + ar.cause().getMessage());
                                    }
                                });
                    }
                });
                t.start();
        }
        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {
                System.out.println(jsonArray.encodePrettily());
                routingContext.response().end(jsonArray.encodePrettily());
        }
    };
    Observable.fromArray(com).subscribe(observer);
}

And that's the output I get on the console:

[ ]
[ {
  "holidayId" : 2,
  "userId" : 3,
  "place" : "Poland",
  "date" : {
    "year" : 2016,
    "month" : "DECEMBER",
    "dayOfMonth" : 29,
    "dayOfWeek" : "THURSDAY",
    "era" : "CE",
    "dayOfYear" : 364,
    "leapYear" : true,
    "monthValue" : 12,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "holidayId" : 10,
  "userId" : 1,
  "place" : "Netherland",
  "date" : {
    "year" : 2020,
    "month" : "JANUARY",
    "dayOfMonth" : 21,
    "dayOfWeek" : "TUESDAY",
    "era" : "CE",
    "dayOfYear" : 21,
    "leapYear" : true,
    "monthValue" : 1,
    "chronology" : {
      "id" : "ISO",
      "calendarType" : "iso8601"
    }
  }
}, {
  "userId" : 1,
  "name" : "Kacper",
  "phone_number" : "667667202"
}, {
  "userId" : 3,
  "name" : "Kamil",
  "phone_number" : "6734583443"
}, {
  "userId" : 4,
  "name" : "Janek",
  "phone_number" : "231253575"
}, {
  "userId" : 5,
  "name" : "Grzegorz",
  "phone_number" : "123456789"
}, {
  "userId" : 6,
  "name" : "Justin",
  "phone_number" : "111000111"
}, {
  "userId" : 8,
  "name" : "Mike",
  "phone_number" : "997"
}, {
  "userId" : 9,
  "name" : "Gorge",
  "phone_number" : "991"
} ]

Answer:

onComplete executes on time: when the input flow of strings is finished. You need another way to wait the moment when all I/O operations are completed. This is a tricky part, I do not know if Vertx or RxJava can do this, but standard Java API can, using CompletableFuture. So we create adapter from CompletableFuture to Handler, which holds the result of one I/O operation:

class HandelerFuture extends CompletableFuture<JsonArray> 
         implements Handler<AsyncResult<HttpResponse<Buffer>>> {
    @Override
    public void handle(AsyncResult<HttpResponse<Buffer>> ar) {
        if (ar.succeeded()) {
            JsonArray array = ar.result().bodyAsJsonArray();
            super.complete(array);
        } else {
            super.completeExceptionally(ar.cause());
        }
    }
}

Besides, you need not to wrap the body of onNext method in a Tread. Second, you need not to check what the url string is passed, as it does not make any difference. Third, you use Observer only to handle a list of url strings - this is overcomplication. Plain loop is sufficient.

CompletableFuture<HandelerFuture[]> dispatchBoth(String... urls) {
    ArrayList<HandelerFuture> futures = new ArrayList<>(); // all results
    for (String url : urls) {
        HandelerFuture future = new HandelerFuture();
        futures.add(future);
        WebClient client = WebClient.create(vertx);
        client.get(8081, "localhost", url)
              .send(future);
    }
    CompletableFuture all = new CompletableFuture();
    HandelerFuture[] array = futures.toArray(new HandelerFuture[0]);
    CompletableFuture.allOf(array)
            .thenRunAsync(() -> all.complete(array));
    return all;
}

then it can be run as follows:

    CompletableFuture<HandelerFuture[]> future = dispatchBoth(com);
    HandelerFuture[] results = future.get();
    JsonArray finalArray;
    for (HandelerFuture result:results) {
        try {
            // extract partial json array
            JsonArray partialArray = result.get();
            // combine partialArray with finalArray somehow
        } catch (Exception e) {
            // this is the exception got in handle() method as ar.cause().
            e.printStackTrace();
        }
    }
    routingContext.response().end(finalArray.encodePrettily());

You did not tell how you are going to combine json arrays, so I left this unimplemented.

Question:

I am trying to load the data and put it into a SparseArray using RxJava2. I get the data by calling an URL from an array, but I need the response parsed and inserted into the SparseArray with the order of the URL's in the array, so I need to pass the index of the String item in mUrls.getGroups()

Thanks in advance!

@GET
Single<ResponseBody> getChannels(@Url String url);


groups = new SparseArray<>();


Observable.fromIterable(mUrls.getGroups())
    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        //
        // How can I access the index of the String item in the array?
        //
        .subscribe(new Observer<ResponseBody>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(ResponseBody responseBody) {
                Group group = GroupParser.parseList(responseBody.byteStream(), index);


                groups.put(index, group);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, e.getMessage());
            }

            @Override
            public void onComplete() {

            }
 });

EDIT:

Here's the implemented solution:

 Observable.defer(() -> {
                        AtomicInteger counter = new AtomicInteger();
                        return Observable.fromIterable(mUrls.getGroups())
                                .map(url -> new Pair(url, counter.getAndIncrement()));
                    }).flatMapSingle(pair ->
                            aPI.getChannels(pair.first.toString())
                                    .map(responseBody -> new Pair(responseBody, pair.second))
                                    .subscribeOn(Schedulers.io())
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Pair>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Pair pair) {
                            Pair<ResponseBody, Integer> resultPair =  (Pair<ResponseBody, Integer>) pair;
                            Group group = GroupParser.parseList(resultPair.first.byteStream(),
                                    resultPair.second);

                            groups.put(resultPair.second, group);
                        }

                        @Override
                        public void onError(Throwable e) {
                            Log.e(TAG, "***** message: " + e.getMessage());
                        }

                        @Override
                        public void onComplete() {
                            Log.i(TAG, "***** onComplete.");
                        }
                    });

Answer:

If you are processing the URLs sequentially, you can just introduce an index field in the Observer:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {

         int index;  // <----------------------------------------------------

         // ...

         @Override
         public void onNext(ResponseBody responseBody) {
             Group group = GroupParser.parseList(responseBody.byteStream(), index);


             groups.put(index, group);

             index++;  // <---------------------------------------------------------
         }

         // ...
    });

However, if you process the URLs concurrently, you have to pair up each URL with an index and tag it along. For example, given a Pair class:

 Observable.defer(() -> {
     AtomicInteger counter = new AtomicInteger();
     return Observable.fromIterable(mUrls.getGroups())
     .map(url -> Pair.of(url, counter.getAndIncremenet()));
 })
 .flatMapSingle(urlIndex -> 
     getChannels(urlIndex.first)
     .map(v -> Pair.of(v, urlIndex.second))
     .subscribeOn(Schedulers.io())
 )
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(new Observer<Pair<ResponseBody, Integer>>() {

         // ...

         @Override
         public void onNext(Pair<ResponseBody, Integer> pair) {
             Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);


             groups.put(pair.second, group);
         }

         // ...
    });

Question:

I'm building an application with Quarkus and it's vert.x extension. Now I want to build a REST endpoint which should stream all saved addresses. To test this whiout a reactive data source I wan't to create a ArrayList with example addresses and stream them so I can check if my test works. But I don't find how I can stream a collection.

My actual code:

    import io.vertx.reactivex.core.Vertx;
    import org.reactivestreams.Publisher;

    import javax.inject.Inject;
    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import java.util.ArrayList;
    import java.util.Collection;

    @Path("/addresses")
    public class AddressResource {
      private Collection<Address> adresses;

      @Inject private Vertx vertx;

      public AddressResource() {
        super();
        adresses = new ArrayList<>();
      }

      @GET
      @Produces(MediaType.SERVER_SENT_EVENTS)
      public Publisher<Address> get() {
        Address address = new Address();
        address.setStreet("590 Holly Street");
        address.setCity("Townsend");
        address.setState("Ohio");
        address.setZip(6794);
        adresses.add(address);
        adresses.add(address);
        adresses.add(address);
        adresses.add(address);
        adresses.add(address);
        adresses.add(address);

        // What to do here?
        return null;
      }
    }

And this is my test:

    import io.quarkus.test.junit.QuarkusTest;
    import org.hamcrest.Matchers;
    import org.junit.jupiter.api.Test;

    import javax.json.bind.JsonbBuilder;
    import java.util.ArrayList;
    import java.util.List;

    import static io.restassured.RestAssured.given;

    @QuarkusTest
    public class DBServiceTest {

      @Test
      void testGetAddresses() throws InterruptedException {
        given()
            .when()
            .get("/addresses")
            .then()
            .statusCode(200)
            .body(Matchers.containsInAnyOrder(readTestAdresses().toArray()));
      }

      private List<Address> readTestAdresses() {
        return JsonbBuilder.create()
            .fromJson(
                this.getClass().getResourceAsStream("test-addresses.json"),
                new ArrayList<Address>() {}.getClass().getGenericSuperclass());
      }
    }

Edit 1: I tried the following:

    @GET
      @Produces(MediaType.SERVER_SENT_EVENTS)
      public Publisher<String> get() {
        Address address = new Address();
        address.setStreet("590 Holly Street");
        address.setCity("Townsend");
        address.setState("Ohio");
        address.setZip(6794);
        adresses.add(address);
        return Flowable.just("Test 1","Test 2","Test 3");
      }

And this works. So the problem must have something to do with the address objects.


Answer:

You can use something like

  @GET
  @Produces(MediaType.SERVER_SENT_EVENTS)
  public Publisher<String> get() {
    Address address = new Address();
    address.setStreet("590 Holly Street");
    address.setCity("Townsend");
    address.setState("Ohio");
    address.setZip(6794);
    return Flowable.just(address, address, address,....).map(a -> yourToString(a));
  }

Where yourToString is a method that will create the proper string representation (json perhaps).

Question:

I'm pretty new to RX in general, and rxjava in particular, pardon mistakes.

This operation depends on a two async operations.

The first uses a filter function to attempt to get a single entity from a list returned by an async Observable.

The second is an async operation that communicates with a device and produces an Observable of status updates.

I want to take the Single that is created from the filter function, apply that to pairReader(...), and subscribe to its Observable for updates. I can get this to work as shown, but only if I include the take(1) commented, otherwise I get an exception because the chain tries to pull another value from the Single.

  Observable<DeviceCredential> getCredentials() {
    return deviceCredentialService()
            .getCredentials()
            .flatMapIterable(event -> event.getData());
  }

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      // A device is logically constrained to only have a single cred per org
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .take(1)  // Without this I get an exception
      .singleOrError();
  }

  Function<Organization, Observable<Reader.EnrollmentState>> pairReader(String name) {
    return org -> readerService().pair(name, org);
  }

getOrgFromCreds(orgid)
  .flatMapObservable(pairReader(readerid))
  .subscribe(state -> {
     switch(state) {
       case BEGUN:
         LOG.d(TAG, "Pairing begun");
         break;
       case PAIRED:
         LOG.d(TAG, "Pairing success");
         callback.success();
         break;
       case NOTIFIED_SERVER:
         LOG.d(TAG, "Pairing server notified");
         break;
     }},
     error -> {
       Crashlytics.logException(error);
       callback.error(error.getLocalizedMessage());
     });

Answer:

If the source stream emits more than one item, singleOrError() is supposed to emit an error. Doc

For your case, use either first() or firstOrError() instead.

  Single<Organization> getOrgFromCreds(String orgid) {
    return getCredentials()
      .map(DeviceCredential::getOrganization)
      .filter(org -> org.getId().equals(orgid))
      .firstOrError();
  }

Question:

I call a list of methods in a loop, wherein each return an Observable & I add them to a List for processing later.

Code:

List<Observable> productViewObservables = new ArrayList<>();

for (ProductEnricher enricher : orchestrationStrategy.getEnrichers()) {
      productViewObservables.add(enricher.asyncEnrich(productView, productId); }

But Im not sure if the Observable responses get added to that list in the same order that I invoke them, which is essential for my processing. Can someone clarify this?


Answer:

Yes, they are added in the same order, because Observable is just a regular object. Your code is simple sequential code - you are not executing any Observable yet in your example, they are just objects which represent computation that might be done in the future.

If you would like to run these Observables (using subscribe method) then you get into asynchronous world and the results might not be that obvious.

If you'd want to run your List of Observables sequentially then you could use concat method on Observable which takes Iterable as an argument

Question:

I want to process a List<Observable<MyResponse>>. How can I do this?Had a look at Observable.zip() but unable to arrive at a conclusion. Code snippet of what I wanna do:

List<String> myList = myMethod(); // myMethod() returns a List<String> of any possible length.
List<Observable<MyResponse>> observables = new ArrayList<>();
for (String value : myList) {
observableList.add(myAPI.getData(value))
} 
// How to? : For each Observable<MyResponse> in observables call a method myProcessor(MyResponse, ...)

Answer:

You can flatMap the myList:

Flowable.fromIterable(myList)
    .flatMap(value -> myAPI.getData(value))
    .doOnNext(myResponse -> { /* ... */ })
    ...

Question:

I'm new at this library, so if you got any tips or suggestions are more than welcome. I got RxJava2 running and executing a longNetworkOperation fake method, the flow is this:

  • Create the observable: Observable.just("str1")
  • Change the execution thread: .subscribeOn(Schedulers.newThread())
  • Implement the map interface which will execute the LongRunningProcess:

    .map(new Function<String, String>() { 
         @Override
         public String apply(String cad){ // cad is the input value
                Log.d(TAG,"long operation in thread: "+Thread.currentThread().getName()); //output thread name
                return " longNetworkOperation result => "+doLongNetworkOperation(); //here we do the background operations
                }})   
    
  • Get the results in Main Thread: .observeOn(AndroidSchedulers.mainThread())

  • Add the observer which get notified for the results: .subscribe(observer);

And the observer:

 Observer observer = new Observer() {
    @Override
    public void onSubscribe(Disposable d) { Log.d(TAG,"onSubscribe, thread name >> "+Thread.currentThread().getName()); }

    @Override
    public void onNext(Object value) { Log.d(TAG,"on next, input value:<<"+value.toString()+">>  thread name >>"+Thread.currentThread().getName());}

    @Override
    public void onError(Throwable e) { Log.d(TAG,"error >>"+e.toString()); }

    @Override
    public void onComplete() {
        Log.d(TAG,"onCompleted  thread name >> "+Thread.currentThread().getName());
        tvRxJava = (TextView) findViewById(R.id.tv_rxjava); //update the reference of tvRxJava.
        tvRxJava.setText("process executed..."); //this line does not change the value of the TextView tvRxJava.
    }
};

This works fine, with no device rotation, but if you rotate the device while doLongNetworkOperation is running in the background the TextView tvRxJava does not update value. not even with a new inject.

tvRxJava = (TextView) findViewById(R.id.tv_rxjava); 

How do you manage orientation change in android with RxJava2??

This is my gradle dep:

compile 'io.reactivex.rxjava2:rxjava:2.0.5'//rxjava2
compile 'io.reactivex.rxjava2:rxandroid:2.0.1' //rxjava2 for android

Answer:

Normally I would like to have cache mechanism for the observable in order to get back the result again in case of configuration changes (which forces activity recreation). I am sharing two very good article which will give you very clear picture, in order to solve this problem.

  1. First
  2. Second

I am currently following the second approach in my application.

Question:

I have a Source as a cold Observable<List<T>> that emits elements in chunks (lists), I want to process every single item from the chunk in a separate thread, while the emiter (source) is waiting for termination of processing all items from the emited chunk to proceed with the next one and so on.

This code (rxjava 2.0.6) do the stuff but only in one Thread. If I want to fork the observer-computation in many Threads with observeOn(Schedulers.io), the source-thread continue emiting everything until completed and do not block by every chunk.

Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
        -> {
    for (int i = 0; i < 1000; i++) {
        List<T> chunk = produceChunkOf(10);
        e.onNext(chunk);
    }

    e.onComplete();
});    
lazy_source
        .subscribeOn(Schedulers.io())
        .flatMap(chunk -> 
                Observable.fromIterable(chunk)
                    // .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
                    .doOnNext(item -> consume(item))
                , 10) // Number of concurent Threads
        .subscribe();

I will appreciate any help.


Answer:

how about something like this:

 Observable.range(0, 1000)
            .concatMap(new Func1<Integer, Observable<?>>() {
                @Override
                public Observable<?> call(Integer integer) {
                    return produceChunkOf(10)
                            .flatMap(new Func1<Object, Observable<?>>() {
                                @Override
                                public Observable<?> call(Object item) {
                                    return consume(item)
                                            .observeOn(Schedulers.io());
                                }
                            }, 10)
                            .toList();
                }
            });

first, you create an Observable that emits the inputs to the produceChunkOf, then for each input item,you concatMap for your requirement for sequential execution for each chunk, for each input you produce the chunk , and process it in parallel with flatMap, then collect it after all items processed using toList()

Question:

I need to return Observable specified generic type. But after zip operator I get only Observable, without any type.

How to correct cast usual Observable to Observable<T> ? Perhaps there is some rx operator for this?

Example of code:

public Observable<Schedule> updateSchedules(final List<ScheduleInfo> schedulesInfo) {
    List<Observable<List<ScheduleItem>>> observables = makeScheduleTasks(schedulesInfo);
    Observable observable = Observable.zip(observables, objects -> getSchedules(schedulesInfo, objects));
    return (Observable<Schedule>)observable; //How to cast?
}


private List<Schedule> getSchedules(List<ScheduleInfo> schedulesInfo, Object[] objects) {
        if(objects.length == schedulesInfo.size()){
            List<Schedule> schedules = new ArrayList<>(schedulesInfo.size());
            for (int i = 0; i < schedulesInfo.size(); i++) {
                Object object = objects[i];
                schedules.add(new Schedule(schedulesInfo.get(i), (List<ScheduleItem>) object));
            }
            return schedules;
        }
        return null;
    }

Answer:

You need to provide type information at the places that the compiler can use it:

public Observable<Schedule> updateSchedules(final List<ScheduleInfo> schedulesInfo) {
    List<Observable<List<ScheduleItem>>> observables = makeScheduleTasks(schedulesInfo);
    Observable<Schedule> observable = Observable.zip(observables, objects -> getSchedules(schedulesInfo, objects));
    return observable;
}

should work just fine, as long as getSchedules() returns a Schedule.

Question:

Lets say I have a very long string:

    trillions of chunks
            |
            v
    /asdf/........./bar/baz/foo
                ^
                |
    what I try to find is closer to the right:
        the data after 9999999th '/'

I need all the chunks of data up to this slash, but not any slashes. I see this as a stream and want to do the following:

  1. I start to read symbols from the back and count slashes.
  2. Anything but slash I put into Last-In-First-Out data structure.
  3. In order not to wait for the whole operation to finish, I start reading data from the lifo datastructure as it becomes available.
  4. I terminate after the 9999999th '/'

Can something like this be accomplished with reactive streams and how?


Answer:

I think the following code will achve what you want

@Test
public void reactiveParser() throws InterruptedException {
    ConnectableFlux<String> letters = Flux.create((Consumer<? super FluxSink<String>>) t -> {
        char[] chars = "sfdsfsdf/sdf/sdfs/dfsdfsd/fsd/fsd/fs/df/sdf".toCharArray();
        for (char c : chars) {
            t.next(String.valueOf(c));
        }
    }).publish();

    letters
            .window(
                    letters.filter( t -> t.equals("/"))
            )
            .flatMap( t -> t.collectList())
            .map( t -> t.stream().collect(Collectors.joining()))
            .subscribe(t -> {
                System.out.println(t);
            });

    letters.connect();
}

The example above utilizes the project reactor. Which is pretty cool way of doing the reactive stuff inside of java.

There is plenty of optimization that can be done in the following code. Not using Strings to represent a single letter would be one of them.

But the basic idea is there. You create flux/observable that emits a letters as they come in and make that observable shareable (you have to window over emitting values) and then just collect them in to a single sting. The code bellow should give the following output:

sfdsfsdf
/sdf
/sdfs
/dfsdfsd
/fsd
/fsd
/fs
/df

Of course you have to utilize non-blocking connection so the bytes could be read asynchronously.