Hot questions for Using RxJava 2 in observable

Question:

If an observable completes, do I still have to unsubscribe / dispose (in RxJava2) the observable to remove the Observer (prevent memory leaks) or is this handled internally by RxJava once a onComplete or onError event occurs?

what about other types like Single, Completable, Flowable etc.


Answer:

Yes you are correct.

After a stream is terminated ( onComplete / onError has been called ), subscriber unsubscribes automatically. You should be able to test these behaviors using isUnsubscribed() method on the Subscription object.

Question:

I have the following code:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

And here's the output:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

Important detail: I'm calling the subscribe method from another thread (main thread in Android).

So looks like Observable class is synchronous and by default and it performs everything (operators like map + notifying subscribers) on the same thread which emits events (s.onNext), right? I wonder... is it intended behaviour or I just misunderstood something? Actually I was expecting that at least onNext and onComplete callbacks will be called on the caller's thread, not on the one emitting events. Do I understand correctly that in this particular case actual caller's thread doesn't matter? At least when events are generated asynchronously.

Another concern - what if I receive some Observable as a parameter from some external source (i.e. I don't generate it on my own)... there is no way for me as its user to check if whether it is synchronous or asynchronous and I just have to explicitly specify where I want to receive callbacks via subscribeOn and observeOn methods, right?

Thanks!


Answer:

RxJava is unopinionated about concurrency. It will produce values on the subscribing thread if you do not use any other mechanisem like observeOn/ subscribeOn. Please don't use low-level constructs like Thread in operators, you could break the contract.

Due to the use of Thread, the onNext will be called from the calling Thread ('background-thread-1'). The subscription happens on the calling (UI-Thread). Every operator down the chain will be called from 'background-thread-1'-calling-Thread. The subscription onNext will also be called from 'background-thread-1'.

If you want to produce values not on the calling thread use: subscribeOn. If you want to switch the thread back to main use observeOn somewhere in the chain. Most likely before subscribing to it.

Example:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads
            .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
            .map(integer -> integer) // map happens on Computational-Threads
            .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
            .subscribe(integer -> {
                // called from mainThread
            });

Here is a good explanitation. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

Question:

I'm currently building a small Social Media style App which leverages RxJava 2 and Firebase. I'm using MVP style architecture, and I've abstracted out my AuthService with an interface called AuthSource.

For simplicity's sake, I'll work with a Single method in my Service:

public class FirebaseAuthService implements AuthSource {

private FirebaseAuth auth;
private FirebaseAuth.AuthStateListener listener;

//initialization code

@Override
public Maybe<User> getUser() {
    return Maybe.create(new MaybeOnSubscribe<User>() {
                            @Override
                            public void subscribe(final MaybeEmitter<User> e) throws Exception {
                                if (auth == null) {
                                    auth = FirebaseAuth.getInstance();
                                }

                                if (listener != null) {
                                    auth.removeAuthStateListener(listener);
                                }

                                listener = new FirebaseAuth.AuthStateListener() {
                                    @Override
                                    public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
                                        FirebaseUser firebaseUser = firebaseAuth.getCurrentUser();
                                        auth.removeAuthStateListener(listener);
                                        if (firebaseUser != null) {
                                            User user = new User(
                                                    firebaseUser.getDisplayName(),
                                                    firebaseUser.getEmail());

                                            user.setUserId(firebaseUser.getUid());


                                            Uri photoUrl = firebaseUser.getPhotoUrl();
                                            if (photoUrl != null){
                                                user.setProfilePhotoUrl(photoUrl.toString());
                                            }
                                            e.onSuccess(user);
                                        } else {
                                            e.onComplete();
                                        }
                                    }
                                };

                                auth.addAuthStateListener(listener);
                            }
                        }
    );

}

}

interface AuthSource {
    Maybe<User> getUser();
//Other methods etc.
}

Finally, I'll show my Presenter method which handles the call:

//from with a Presenter:
@Override
private void getUserData() {
    disposableSubscriptions.add(
            auth.getUser().subscribeOn(schedulerProvider.io())
                    .observeOn(schedulerProvider.ui())
                    .subscribeWith(
                            new DisposableMaybeObserver<User>() {

                                @Override
                                public void onError(Throwable e) {
                                    view.makeToast(R.string.error_retrieving_data);
                                    view.startDispatchActivity();
                                }

                                @Override
                                public void onComplete() {

                                }

                                @Override
                                public void onSuccess(User user) {
                                    ProfilePagePresenter.this.currentUser = user;
                                    view.setName(user.getName());
                                    view.setEmail(user.getEmail());
                                    if (user.getProfilePhotoUrl().equals("")) {
                                        view.setDefaultProfilePhoto();
                                    } else {
                                        view.setProfilePhotoURI(user.getProfilePhotoUrl());
                                    }

                                    getUserProfileFromDatabase();

                                }
                            }
                    )
    );
}

I realize the topic of the question is a bit general, so I'll try to narrow things down from here. The code I've posted above works insofar as I'm succesfully getting Data from Firebase's API using Create(). The problem is, I'm quite new to using RxJava 2, and I'm not certain what's going on under the hood here for garbage collection and memory leaks. I chose to use Observable.create() as per the RxJava 2 Docs:

"Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world."

RxJava 2 Docs Finally, the only proactive thing I'm doing at the moment to dispose of these Observables, is to call CompositeDisposable.clear() in my Presenter when events take the user to a new Activity.

Questions:
-Is it safe to assume that simply calling CompositeDisposable.clear() when the Presenter finishes, will handle my Garbage collection? (assuming I haven't created memory leaks in the rest of the code).

-If my understanding is correct, create() is a better option to use than fromCallable() in this case, as fromCallable() should be used for Synchronous events (i.e. not something like Firebase API callbacks)?

-Is it really as simple as just throwing my Asynchronous callbacks in Observable.create()? I'm terrified at how easy that is to do...


Answer:

Is it safe to assume that simply calling CompositeDisposable.clear() when the Presenter finishes, will handle my Garbage collection? (assuming I haven't created memory leaks in the rest of the code).

It's a little trickier than this. Non-disposed Observable won't create memory leak if everything referenced by the Observable belong to the Activity scope. Both the producer and the consumer will be garbage collected alongside Activity. Memory leak may occur if you referenced resources that will survive the Activity, a provider instantiated at Application level for example. So if you want to use CompositeDisposable.clear() make sure to implement emitter.setCancellable() inside Observable.create() to dispose those leaky resources.

If my understanding is correct, create() is a better option to use than fromCallable() in this case, as fromCallable() should be used for Synchronous events (i.e. not something like Firebase API callbacks)?

create() use to be named fromAsync(). Use fromCallable() to wrap a synchronous method call, create() when wrapping callback code.

Is it really as simple as just throwing my Asynchronous callbacks in Observable.create()? I'm terrified at how easy that is to do...

It is as easy ... if you take care of those pesky references outside of scope as mentioned at the first point.

Usually on Android, a memory leak involve the Context, which is big. Be sure to test your code. leakcanary is a great help for this matter.

Last, you could avoid doing the wrapping yourself by using an existing Firebase RxJava binding. Or take inspiration from them:

Question:

I am using onErrorReturn to emit a particular item rather than invoking onError if the observable encounters an error:

Observable<String> observable = getObservableSource();
observable.onErrorReturn(error -> "All Good!")
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.trampoline())
          .subscribe(item -> onNextAction(),
              error -> onErrorAction()
          );

This works fine but I want to consume the error in onErrorReturn only if certain conditions are met. Just like rethrowing an exception from inside a catch block.

Something like:

onErrorReturn(error -> {
    if (condition) {
        return "All Good!";
    } else {
        // Don't consume error. What to do here?
        throw error; // This gives error [Unhandled Exception: java.lang.Throwable]
    }
});

Is there a way to propagate the error down the observable chain from inside onErrorReturn as if onErrorReturn was never there?


Answer:

Using onErrorResumeNext I guess you can achieve what you want

observable.onErrorResumeNext(error -> {
              if(errorOk)
                  return Observable.just(ok)
              else
                  return Observable.error(error)
          })
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.trampoline())
          .subscribe(item -> onNextAction(),
              error -> onErrorAction()
          );

Question:

I have 2 service endpoints in my application a and b (Both are "Singles"). The request to service b depends on the response of a. After the the response of b I need access to both responses in my subscriber. I plan to do the call like this:

services.a()
    .flatMap(a -> services.b(a))
    .subscribe(b ->
        // Access a and b here
    )

But in this way, I only can access the result of b in my subscriber. How can I pass also the response of a to it?

My first attempt was to use something like this:

// Note: Code will not compile... Just to show the concept
services.a()
    .flatMap(
        a -> Observable.combineLatest(
                Observable.just(a)
                services.b(a).toObservable()
                Bifunction((a, b) -> {return Pair<ResponseA, ResponseB>(a, b)}))
    )
    .toSingle()
    .subscribe(pair -> {
        ResponseA a = pair.first();
        ResponseB b = pair.second();
    })

But as the use case gets a bit more complex, the code will evolve to an ugly monster.


Answer:

You can use a variant of flatMap() with resultSelector, resultSelector method will gather both input (a) and output (result of Observable b) of the flatMap and you can combine them together to whatever you need. See here.

Question:

AFAIK, the debounce() operator of rxJava is used for delaying emission of events. When I apply it with the search box it work normally:

RxTextView.textChangeEvents(editText)
                .debounce(1000, TimeUnit.MILLISECONDS) //Only emit after 1 sec
                .subscribe(new Observer<TextViewTextChangeEvent>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(TextViewTextChangeEvent event) {
                        //Get more information about text change event
                        Log.e(TAG, "Before: " + event.before() + ", start: " + event.start() + ", count: " + event.count());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "textChangeEvents: onComplete");
                    }
                });

But when I apply it with Observable.range() like this:

Observable.range(1, 10000)
                .debounce(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Long integer) {

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

the emission keep coming to onNext() very fast (about 1000 emissions/s) and continuously although I have applied debounce(1000, TimeUnit.MILISECONDS) operator.

Want I am expecting is: when I'm using debounce() only 1 number will be emitted after a delay of 1000 miliseconds (it can skip numbers when delaying between 2 emissions). Means emission will go downstream one by one like the search box example above.

I'm new to rx please help me to achieve this and explain why? I don't really know why I MUST use another operator but debounce() because the idea is the same!


Answer:

debounce prevents the downstream from getting overwhelmed by defining a grace period which must elapse between events in order to get the last event, in other words, it will emit the latest element after some quiet time. Range will go through its items as fast it can thus there won't be enough time between items and only the very last will be emitted.

debounce is simply not the operator your use case requires. The extensions project for 2.x has the spanout operator for your use case.

Question:

I'm using RxAndroid 2.0.1 with RxJava 2.0.6.

I have two observables: one returns Maybe<MyObject> based on some String (ID). When the optional object is returned, I have to call the second one that takes the MyObject instance and returns Single<Boolean> if object meets some conditions. Then I can do some further operations with the object instance.

My current implementation is as follows:

objectDAO.getById(objectId)
    .subscribe(
        myObject -> checkCondition(myObject),
        throwable -> /* Fallback */,
        () -> /* Fallback */
    );

private void checkCondition(final MyObject myObject) {
  otherDAO.checkCondition(myObject)
    .subscribe(
        isTrue -> {
          if (isTrue) {
            // yay! now I can do what I need with myObject instance
          } else {
            /* Fallback */
          }
        },
        throwable -> /* Fallback */
    );
}

Now I'm wondering how could I simplify my code. My ideas:

  1. Try to use zip - I can't because second Observable can't be subscribed until the first one returns the MyObject

  2. Try to use filter - Now the issue is that I need to use blocking get to call second observable. It will propably work, but looks like a code smell:

    objectDAO.getById(objectId)
      .filter(myObject ->
        otherDAO.checkCondition(myObject).blockingGet()
      )
      .subscribe(
          myObject -> checkCondition(myObject),
          throwable -> /* Fallback */,
          () -> /* Fallback */
      );
    
  3. Try to use flatMap - The second observable returns Boolean while I need to return the original object. Because of that I need to mape a code snippet with blockingGet and return original object or Maybe.empty()

Any suggestions how to do it in such a way that the code is "clean" (it's smaller and it's still clear what's happening inside)?


Answer:

One thing you could do:

objectDAO.getById(objectId)
    .flatMapSingle(myObject -> otherDAO
        .checkCondition(myObject)
        .map(isTrue -> Pair.create(myObject, isTrue))
    )

Then you have an Observable<Pair<MyObject, Boolean>> and can proceed however you want: subscribe directly and check the Boolean there, filter by the Boolean value, etc.

Question:

Is it OK to decorate Observable<>, Single<>, Maybe<>, Flowable<> in rx-java? E.g. like this:

public final class NonEmptyStringSource extends Observable<String> {

    private final Observable<String> source;

    public NonEmptyStringSource(final Observable<String> source) {
        this.source = source.filter(s -> s.length() > 0);
    }

    @Override
    protected void subscribeActual(final Observer<? super String> observer) {
        this.source.subscribe(observer);
    }
}

Does this approach have some pitfalls? Is it safe in use?


Answer:

Unlike 1.x, this pattern in 2.x has no penalty and is almost like how the standard operators are implemented. Depending on your needs, you may want to implement an ObservableTransformer instead:

ObservableTransformer<String, String> t = 
    upstream -> upstream.filter(s -> s.length() > 0);

Observable.fromArray("a", "b", "", "d", "", "f")
.compose(t)
.subscribe(System.out::println, Throwable::printStackTrace);

Question:

I have an observable sequence. When the first element is inserted, I would like to start a timer and batch subsequent inserted elements during the timespan of the timer. Then, the timer wouldn't start again until another element is inserted in the sequence.

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 

would produce:

[1,2,3,4,5], [6] 

I tried with Observable.buffer() and a timespan but from my experimentation, I can see that the timer is started as soon as we subscribe to the observable sequence and is restarted as soon as the previous timer is completed.

So having the same sequence as the previous example and using the buffer() with a timespan, I would have something like this:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4                          5 6           

which would produce this:

[1,2,3,4], [], [5,6], []

This is essentially the same question as 29858974, but for java instead.

So the problem is that since I don't want to delay my stream too much I want to have a timer that is quite short, and that timer would be quite intensive. I could simply filter empty lists, but I think that impacts the CPU too much.


Answer:

window operator will act as buffer, you cannot use it directly.

The idea is to control the timer by the emissions of the first observable (which I call insertions). For this you have to include a third parameter to link the two observables (stopWatch Subject in the solution bellow).

    @Test
    public void stop_watch_observable() {

        Subject<Long> stopWatch = PublishSubject.create();

        Observable<Long> insertions = insertions();

        //share to use it as a timer (looking for the first emission)
        //and to recieve the items
        Observable<Long> shared = insertions.share();

        //for each emission of insertions we start a new timer
        //but only the first one is emitted
        //the others are stopped by the takeUntil(stopWatch)
        Observable<Long> window = shared
                .flatMap(e -> Observable.timer(3, TimeUnit.SECONDS).takeUntil(stopWatch));

        shared.buffer(window)
                //each time a window is generated we kill all the current timers
                .doOnNext(e -> stopWatch.onNext(0L))
                .blockingSubscribe(System.out::println);
    }

    // insertions generator which is comming randomly
    private Observable<Long> insertions() {
        AtomicLong al = new AtomicLong(0);
        return Observable.generate((Emitter<Long> emitter) -> {
            if (al.getAndIncrement() % 4 == 0) {
                Long timeToWait = Long.parseLong(RandomStringUtils.randomNumeric(1));
                System.out.println("sleeping for: " + timeToWait);
                sleep(timeToWait * 1000);
            } else {
                sleep(500);
            }
            emitter.onNext(al.get());
        }).subscribeOn(Schedulers.newThread());
    }

Question:

I want to filter items emitted by an Observable, but I have many filter criterias and I'm wondering what is the better way to do that - performance wise.

One way would be to call one "filter" method which has all of the criterias in multiple "if" statements and returns the final filtering result, and to call:

observable
    .filter(this::filter)

Another way would be to have multiple "filterX" methods, each filters by a specific criteria, and call them in a chain:

observable
    .filter(this::filterX)
    .filter(this::filterY)
    .filter(this::filterZ)

My question is - is there any performance difference and which of the two is 'better practice'? I find the second one nicer and more readable, but currently I encountered a "filter" method with ~30 'if' statements and I'm wondering if I should bother and refactor it to the second approach.


Answer:

RxJava library tries to optimize the scenario described by you with the concept of Operator Fusion:

Operator fusion has the premise that certain operators can be combined into one single operator (macro-fusion) or their internal data structures shared between each other (micro-fusion) that allows fewer allocations, lower overhead and better performance.

It gives a specific example about the filter operator in the design document:

  • a is b and the two operator's parameter set can be combined into a single application. Example: filter(p1).filter(p2) combined into filter(p1 && p2).

So, in your case, the library will try its best to combine all the filters in order to not have much performance difference.

Question:

I have an Observable<Single<T>> that I would like to transform that to a new Observable<T> where any Single<T> that fails is ignored.

Here is my attempt at an implementation:

public static <T> Observable<T> skipErrors(final Observable<Single<T>> xs) {
    Preconditions.checkNotNull(xs);
    return xs.flatMap(single -> single.map(Optional::of)
        .onErrorReturn(error -> Optional.empty())
        .flatMapObservable(optional ->
            optional.map(Observable::just).orElseGet(Observable::empty)));
}

Basically it wraps every success in an Optional and maps every failure to Optional.empty(). The optionals are then filtered in a flatMapObservable.

Is there a more idiomatic way to do this?


Unit-test:

final Observable<Single<String>> observable = skipErrors(Observable.just(
    Single.error(new Exception()),
    Single.error(new Exception()),
    Single.just("Hello"),
    Single.error(new Exception()),
    Single.just("world"),
    Single.error(new Exception())));

final ImmutableList<String> expected = ImmutableList.of("Hello", "world");
final ImmutableList<String> actual = observable.toList()
    .blockingGet()
    .stream()
    .collect(ImmutableList.toImmutableList());

assertEquals(expected, actual);

Answer:

I think you should be able to do it a bit simpler:

public static <T> Observable<T> skipErrors(final Observable<Single<T>> singles) {
  Preconditions.checkNotNull(xs);
  return singles
    .flatMap(single -> single
        .toObservable()
        .onErrorResumeNext(Observable.empty())
    );
}

Question:

Say there is interface containing methods:

Observable<Data> makeHttpCall(int param1, boolean param2);

Completable storeInDatabase(Data data);

Completable combinedCall(int param1, boolean param2);

What is the best way to implement the combinedCall method that would:

  1. fetch data from makeHttpCall
  2. store it using storeInDatabase
  3. return Completable that completes when storeInDatabase completes?

Seems that in RxJava 1.0 it was possible to do Completable.merge(Observable) but merge does not seem to accept Observable any more.


Answer:

First of all I don't believe merge is a good fit for your needs, as the storeInDatabase has to be performed on the results of makeHttpCall instead of parallel to it.

This should work for you:

Completable combinedCall(int param1, boolean param2) {
    return makeHttpCall(param1, param2)
            .flatMapCompletable(new Function<Data, CompletableSource>() {
                @Override
                public CompletableSource apply(@NonNull Data d) throws Exception {
                    return storeInDatabase(d);
                }
            });
}

Question:

Sometimes I want to trigger a Runnable as part of my Observable sequence, but the Runnable does not report progress.

I have written a simple factory for wrapping a Runnable object into an Observable:

public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}

Usage:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));

But does RxJava 2 provide this functionality already?


Answer:

There is no such factory method for Observable, but Completable could be made from Runnable. So you could create a Completable first and then convert it to Observable:

Observable.concat(
    someTask, 
    Completable.fromRunnable(() -> {
        System.out.println("Done");
    }).toObservable()
);

Update: Dealing with exceptions

Completable.fromRunnable internally catches exceptions from its Runnable and pushes them into the stream as onError emissions. However, if you are using Java, you have to deal with checked exceptions inside the run() method by yourself. To avoid that you could utilize Callable instead of Runnable, since its call() method's signature declares that it can throw exceptions. Completable.fromCallable() wraps exceptions into onError emissions as well:

Observable.concat(
    someTask, 
    Completable.fromCallable(() -> {
        System.out.println("Done");
        return null;
    }).toObservable()
);

Also Callable could be used to create an Observable or Single with a single item emission.

P.S. Check out the source code, these methods are pretty straightforward.

P.P.S. Kotlin has no checked exceptions ;)


Update 2

There is also fromAction factory method for creating Completable. It accepts Action objects.

A functional interface similar to Runnable but allows throwing a checked exception.

So the code could be simplified to:

Observable.concat(
    someTask, 
    Completable.fromAction(() -> {
        System.out.println("Done");
    }).toObservable()
);

Question:

Short story: I have a situation where I have 2 Observables that have a single purpose:

  • they receive some data
  • they return modified data
  • throw an error if the data cannot be processed

They are each in charge of handling different types of data. Additionally I want to do something when both data has been processed.

My current best implementation is as follows, these are my Observables:

    Single<BlueData> blueObservable = Single.create(singleSubscriber -> {
        if (BlueDataProcessor.isDataValid(myBlueData)) {
            singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData));
        }
        else {
            singleSubscriber.onError(new BlueDataIsInvalidThrow());
        }
    });

    Single<RedData> redObservable = Single.create(singleSubscriber -> {
        if (RedDataProcessor.isDataValid(myRedData)) {
            singleSubscriber.onSuccess(RedDataProcessor.process(myRedData));
        }
        else {
            singleSubscriber.onError(new RedDataIsInvalidThrowable());
        }
    });

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable,
            (blueData, redData) -> PurpleGenerator.combine(blueData, redData));

I also have the following subscriptions:

    blueObservable.subscribe(
            result -> {
                saveBlueProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });

    redObservable.subscribe(
            result -> {
                saveRedProcessStats(result);
            },
            throwable -> {
                logError(throwable);
            });


    composedSingle.subscribe(
            combinedResult -> {
                savePurpleProcessStats(combinedResult)
            },
            throwable -> {
                logError(throwable);
            });

MY PROBLEM: The blue & red data is processed twice, because both subscriptions are run again with I subscribe to the combined observable created with Observable.zip().

How can I have this behaviour without running both operations twice?


Answer:

This is not possible with Single in 1.x because there is no notion of a ConnectableSingle and thus Single.publish. You can achieve the effect via 2.x and the RxJava2Extensions library:

SingleSubject<RedType> red = SingleSubject.create();
SingleSubject<BlueType> blue = SingleSubject.create();

// subscribe interested parties
red.subscribe(...);
blue.subscribe(...);

Single.zip(red, blue, (r, b) -> ...).subscribe(...);

// connect()
blueObservable.subscribe(blue);
redObservable.subscribe(red);

Question:

I'm stuck in a quite weird problem and ask for your help.

The overall setting is the following: I have a bunch of data providers that can occur or disappear at runtime. These providers provide - big surprise - data, modeled as io.reactivex.Observables. (To be precise: as BehaviorSubjects, remembering the latest data for new subscribers.)

Now, I need to combine the data of all current data providers, so that I get a new "main" Observable which gets updated whenever any data provider's observable changes or when new providers appear (or old ones disappear).

So far, that sounds like merging, but for the main Observable I need all provider's data combined, on any change, each provider's respective last state. This combining works fine for non-dynamic providers, which are known in advance, using Observable.combineLatest.

But the problems arise, when I embed that method into a dynamic context, listending for added or removed providers. Then an update of one of the provider's Observable triggers not only one update as expected, but several updates, some of them only containing partial data.

Have a look at the following (self-contained, using RxJava 2.1.9) example, wich should clarify on my problem. Commenting is done as println(), so that the produced output is readable, too. The first part is static, without adding or removing providers, and works as expected. The second part is the weird thing...

I'd appreciate any further ideas or assistance to solve this issue - thanks!

import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CombineLatestWithDynamicSources {

    static final Function<Object[], List<String>> STRING_COMBINER = objects -> Arrays.stream(objects)
                                                                                     .map(Object::toString)
                                                                                     .collect(Collectors.toList());

    public static void main(String[] args) {

        System.out.println("*** STATIC ***");
        staticCombineWorksAsExpected();

        System.out.println("\n*** DYNAMIC ***");
        dynamicCombineBehavesWeird();

    }

    static void staticCombineWorksAsExpected() {

        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");

        List<Subject<String>> subjects = Arrays.asList(subjectA, subjectB); // potentially more...

        Observable<List<String>> combined = Observable.combineLatest(subjects, STRING_COMBINER);

        System.out.println("initial values:");
        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("updating A:");
        subjectA.onNext("A.2");

        System.out.println("updating B:");
        subjectB.onNext("B.2");

        System.out.println("\n... works as expected, but adding subjects isn't possible in this setting.");
    }

    static void dynamicCombineBehavesWeird() {

        List<Subject<String>> subjectsList = new ArrayList<>();
        Subject<List<Subject<String>>> subjectsObservable = BehaviorSubject.createDefault(subjectsList);

        System.out.println("subjects are initially empty:");
        subjectsObservable.subscribe(subjects -> System.out.println(">> Subjects: " + subjects));

        Observable<List<String>> combined = subjectsObservable.flatMap(
                subjects -> Observable.combineLatest(subjects, STRING_COMBINER));

        combined.subscribe(strings -> System.out.println(">> Combined: " + strings));

        System.out.println("add Subject A, providing default value 'A.1' - as expected:");
        Subject<String> subjectA = BehaviorSubject.createDefault("A.1");
        subjectsList.add(subjectA);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A - also as expected:");
        subjectA.onNext("A.2");

        System.out.println("add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:");
        Subject<String> subjectB = BehaviorSubject.createDefault("B.1");
        subjectsList.add(subjectB);
        subjectsObservable.onNext(subjectsList);

        System.out.println("updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!");
        subjectA.onNext("A.3");

        System.out.println("This doesn't happen on updating B...:");
        subjectB.onNext("B.2");

        System.out.println("digging deeper, add Subject C, providing default value 'C.1' - as expected again:");
        Subject<String> subjectC = BehaviorSubject.createDefault("C.1");
        subjectsList.add(subjectC);
        subjectsObservable.onNext(subjectsList);

        System.out.println("Now update A - three results pop up, only the last is expected!");
        subjectA.onNext("A.4");

        System.out.println("update B, which now emits also two results - last expected only:");
        subjectB.onNext("B.3");

        System.out.println("update C works as expected:");
        subjectC.onNext("C.2");

        System.out.println("\n... huh? Seems on updating the first source, the combined results gets computed for the first, " + "then for the first and second, then for first, second and third (and so on?) source...");

    }

}

... which produces the following output:

*** STATIC ***
initial values:
>> Combined: [A.1, B.1]
updating A:
>> Combined: [A.2, B.1]
updating B:
>> Combined: [A.2, B.2]

... works as expected, but adding subjects isn't possible in this setting.

*** DYNAMIC ***
subjects are initially empty:
>> Subjects: []
add Subject A, providing default value 'A.1' - as expected:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e]
>> Combined: [A.1]
updating A - also as expected:
>> Combined: [A.2]
add Subject B, providing default value 'B.1' - as expected, now both subject's last values show up:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd]
>> Combined: [A.2, B.1]
updating A again - I'd expect the second result only! Why is there '[A.3]' popping up before the expected result?!
>> Combined: [A.3]
>> Combined: [A.3, B.1]
This doesn't happen on updating B...:
>> Combined: [A.3, B.2]
digging deeper, add Subject C, providing default value 'C.1' - as expected again:
>> Subjects: [io.reactivex.subjects.BehaviorSubject@4157f54e, io.reactivex.subjects.BehaviorSubject@90f6bfd, io.reactivex.subjects.BehaviorSubject@47f6473]
>> Combined: [A.3, B.2, C.1]
Now update A - three results pop up, only the last is expected!
>> Combined: [A.4]
>> Combined: [A.4, B.2]
>> Combined: [A.4, B.2, C.1]
update B, which now emits also two results - last expected only:
>> Combined: [A.4, B.3]
>> Combined: [A.4, B.3, C.1]
update C works as expected:
>> Combined: [A.4, B.3, C.2]

... huh? Seems on updating the first source, the combined results gets computed for the first, then for the first and second, then for first, second and third (and so on?) source...

Answer:

Some time since - now I know better... ;-)

Simple Solution - so simple... :m

subjectsObservable.switchMap(subjects -> Observable.combineLatest(subjects, STRING_COMBINER))
                  .subscribe(strings -> System.out.println(">> Combined: " + strings));

Question:

Updated

I'm looking for a way to control the flow of one observable by another one. For example, let's have 2 monotically increasing (important) observables of integers:

source  : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------

I need to produce a new observable, whose elements exactly match the source, but their timing is controlled by control observable in the following way: the source values should always be less or equal to control values. This means that only all source values, which are greater than recently published control should wait until they are "released" by control

source         : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control        : -1----3----------------5-----------6-------9-----------12------
expected result: -1----2-2--2--3--3-----4-4--5------6-------8-9---------10-11---

Please take a look at the following code example:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    // ???
}

@Test
public void testControl() throws InterruptedException {
    Subject<Integer> control = PublishSubject.create();
    Observable<Integer> source = Observable.fromArray(1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 10, 11);
    Observable<Integer> combined = combine(source, control, (s, c) -> s <= c);
    control.subscribe(val -> System.out.println("Control: " + val));
    combined.observeOn(Schedulers.io()).subscribe(val -> System.out.println("Value: " + val));

    control.onNext(3); // should release 1,2,2,2,3,3
    Thread.sleep(1000);
    control.onNext(6); // should release 4,4,5,6
    Thread.sleep(1000);
    control.onNext(11); // should release 8,10,11
    Thread.sleep(1000);
}

Answer:

Since I didn't find any elegant solution, I ended up with implementing it by my own. I will be happy if I someone can suggest more elegant solution (in this case I will unaccept this answer and will accept the better one). Following is my solution:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    return Observable.create(emitter -> {
        Queue<T> buffer = new ArrayDeque<>();
        AtomicReference<C> lastControl = new AtomicReference<>();
        CompletableSubject sourceCompletable = CompletableSubject.create();
        CompletableSubject controlCompletable = CompletableSubject.create();
        Disposable disposable = new CompositeDisposable(
                control.subscribe(
                        val -> {
                            lastControl.set(val);
                            synchronized (buffer) {
                                while (!buffer.isEmpty() && predicate.apply(buffer.peek(), val)) {
                                    emitter.onNext(buffer.poll());
                                }
                            }
                        },
                        emitter::onError,
                        controlCompletable::onComplete),
                source.subscribe(
                        val -> {
                            C lastControlVal = lastControl.get();
                            synchronized (buffer) {
                                if (lastControlVal != null && predicate.apply(val, lastControlVal)) {
                                    emitter.onNext(val);
                                } else {
                                    buffer.add(val);
                                }
                            }
                        },
                        emitter::onError,
                        sourceCompletable::onComplete),
                controlCompletable.andThen(sourceCompletable).subscribe(emitter::onComplete));
        emitter.setDisposable(disposable);
    });
}

Question:

I have a map that contains key/value pair. From this map I store them by streaming into a database that returns as a result Observable<String>.

My problem comes when I try to reach this string without blocking and put it on the list.

It is always empty. here it is my implementation:

public Observable<List<String>> uploadKeys(Map<String, String> entries) {
       final List<String> collection = new ArrayList<>();
       entries.entrySet().stream()
              .forEach(entry -> storeKeysInTheDb(entry.getKey(), entry.getValue()).map(element -> element.isEmpty() ? "" : collection.add(element)));
       return Observable.just(collection);
}

Answer:

You have to subscribe to the observable returned by storeKeysInTheDb(); until you subscribe, nothing happens and no string results.

public Observable<List<String>> uploadKeys(Map<String,String> entries) {
  return Observable.fromIterable( entries.entrySet )
           .flatMap( entry -> storeKeysInTheDb( entry.getKey(), entry.getValue() )
           .filter(element -> !element.isEmpty())
           .toList();
}

When you subscribe to the resulting observable, the entries will be mapped and stored and the resulting elements will be emitted as a list.

Question:

I'm trying to build an operator like s.startWith(x), but conditional one - let's call it s.startWithIfNothingAvailable(x). I want it to prefix stream with an x only if s has no elements available at the moment of subscription.

Let me illustrate the idea with an example.

s is a stream of reports from server.

  • If there is no report arrived yet, I want to prefix s with an empty one - just to update ui with something.
  • If s contains something(perhaps some reports were cached) prefixing s will result in rendering empty report and then non empty one. I would like to avoid such blinking.

I think the other way of solving that is to use something like .concat but which order observables by availability of its elements.

Observable.concatFirstAvailable(serverReport, emptyReport), if serverReport has no elements yet - switch to emptyReport and than get back to waiting on serverReport.


Answer:

You could merge with a delayed special report item:

// imitate infinite hot service
PublishSubject<Report> service = PublishSubject.create();

// special report indicating the service has no reports
Report NO_REPORT = new Report();

AtomicBoolean hasValue = new AtomicBoolean();

service
// we'll need the main value for both emission and control message
.publish(main ->
     // this will keep "listening" to main and allow a timeout as well
     main.mergeWith(
         // signal the empty report indicator
         Observable.just(NO_REPORT)
         // after some grace period so main can emit a real report
         .delay(100, TimeUnit.MILLISECONDS)
         // but if the main emits first, don't signal the empty report
         .takeUntil(main)
     )
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(report -> {
     if (report == NO_REPORT) {
         // even if this onNext is serialized, NO_REPORT may get emitted
         if (!hasValue.get()) {
             // display empty report
         }
     } else {
         // this indicates a NO_REPORT should be ignored onward
         hasValue.set(true);
         // display normal report
     }
}, error -> {  /* show error */ })

Thread.sleep(200); // Thread.sleep(50)
service.onNext(new Report());

Question:

I'm trying to figure out how to get a result from the network, persist it and return the reponse body to the Observable. Like this:

@Override
    public Observable<DefaultUserResponse> createUser(CreateUserCommand command) {
        return this.userService.createUser(command)
                .map(defaultUserResponse -> {
                    User user = new User();
                    defaultUserResponse.setUser(user);
                    return defaultUserResponse;
                }).flatMap(defaultUserResponse -> persist(defaultUserResponse.getUser()));
    }

Observable<User> persist(User user) {return null;}

Steps:

  1. I get a DefaultUserResponse from the server.
  2. Transform the command to a User (command = DTO)
  3. Persist the user locally.
  4. Return the DefaultUserResponse.

How should I proceed ?

Thanks


Answer:

Consider using Completable type for your persist method (if you don't care about the returned value(s)):

Completable persist(User user) {
    return Completable.fromCallable(() -> {
        //persisting
        return null;
    }
}

Then you could use andThen operator to wait until persisting completes and push your value further:

...
.flatMap(defaultUserResponse -> persist(defaultUserResponse.getUser()).andThen(Observable.just(defaultUserResponse)));

Or if you still want to use Observable for persisting, just use another flatMap instead of andThen.

Question:

I have an Observable that emits random bits/booleans. I need to make another Observable that combines those random bits to create and emit random integers. Every time the underlying Observable emits a bit, this Observable appends that bit to a bit string, once that bit string reaches a specific length, this Observable converts it to an integer and emits it.

Here's the illustration:

Here's how I implement it using Android LiveData:

final StringBuilder bitStringBuilder = new StringBuilder();
final MediatorLiveData<Integer> integerLiveData = new MediatorLiveData<>();
integerLiveData.addSource(
        randomSource.getBooleanLiveData(),
        new Observer<Boolean>() {
            @Override
            public void onChanged(Boolean b) {
                bitStringBuilder.append(b ? '1' : '0');
                if (bitStringBuilder.length() == 31) {
                    integerLiveData.setValue(Integer.parseInt(bitStringBuilder.toString(), 2));
                    bitStringBuilder.setLength(0); // clear the bit string builder
                }
            }
        }
);

How to achieve this using RxJava 2?


Answer:

Buffer bits:

source
.buffer(31)
.map(bits -> {
    int result = 0;
    for (int b : bits) {
        result = (result << 1) | (b ? 1 : 0);
    }
    return result;
})

Question:

I was experimenting with RxJava operators and came across this issue and don't know why it's behaving the way it is. Let me walk through an example and hopefully the problem will be clear.

I have a fruits Observable which I created using Observable.just

Observable<String> fruits = Observable.just("Apple", "Banana", "Mango", "Strawberry", "Raspberry", "Blood Oranges");

I have another languages observable which I created using Observable.create

Observable<String> pLanguages = Observable.create(emitter -> {
            emitter.onNext("Kotlin");
            emitter.onNext("Java");
            emitter.onNext("Python");
            emitter.onNext("Javascript");
            emitter.onNext("Go");
            emitter.onNext("C");
            emitter.onNext("Rust");
        });

The switchIfEmpty() operator will subscribe to the operator which we pass to it if the source operator doesn't emit any observables.

Using this filter, I should see the fruits being emitted based on the definition of switchIfEmpty().

pLanguages.filter(s -> s.startsWith("Z"))
                .switchIfEmpty(fruits)
                .subscribe(System.out::println, System.out::println, () -> System.out.println("Complete"));

But I don't see any emissions. However, if I switch the observables like so

fruits.filter(s -> s.startsWith("Z"))
                .switchIfEmpty(pLanguages)
                .subscribe(System.out::println, System.out::println, () -> System.out.println("Complete"));

I do see the pLanguages observer emitting events. I've tried to debug but not sure why the Observable.create() is not emitting an empty observable for the switchIfEmpty() operator while Observable.just() does. I've spent couple of hours on this and hoping someone here can answer the question.


Answer:

Summary

You need to signal a completion on the languages source

Example (based on the provided source code)

Observable<String> pLanguages = Observable.create(emitter -> {
    emitter.onNext("Kotlin");
    emitter.onNext("Java");
    emitter.onNext("Python");
    emitter.onNext("Javascript");
    emitter.onNext("Go");
    emitter.onNext("C");
    emitter.onNext("Rust");
    emitter.onComplete();
});

Example in Kotlin (addt'l)

fun main(args: Array<String>) {
    val numbers = Observable.just("one", "two", "three")
    val languages = Observable.create<String> { emitter ->
        emitter.onNext("java")
        emitter.onNext("kotlin")
        emitter.onNext("swift")
        emitter.onComplete() // <-- Invoke this
    }

    languages
            .filter { s -> s.toLowerCase().startsWith("z") }
            .switchIfEmpty(numbers)
            .subscribe({
                System.out.println(it)
            }, {
                System.out.println(it)
            })

    numbers
            .filter { s -> s.toLowerCase().startsWith("z") }
            .switchIfEmpty(languages)
            .subscribe({
                System.out.println(it)
            }, {
                System.out.println(it)
            })


}

Output

one
two
three
java
kotlin
swift

Read more on

Question:

I was wondering if the observable is eligible to be garbage collected in this scenario:

fun getObservable() = Observable.interval(500, TimeUnit.Milliseconds)

fun main(args: Array<String>) {
    getObservable().subscribe { println(it) }

    //Just to be able to observe the output.
    Thread.sleep(20*1000)
}

It doesn't stop emitting. But I can't use it to be sure that it will not be GC'd after some time since it may just be that GC isn't triggering in this short span.

My guess is that it should be GC'd. We don't have a reference to the Observable returned from the getObservable() function since we can't access it anymore. Calling the function again will give a different new Observable. Since we can't reach the Observable itself, the rest of the objects are also unreachable. So, I think all of this can be garbage collected at any random time.

Some of my code in an Android App uses somewhat similar code. And it is an Observable that I hope will last as long as the Application. Since, GC will collect any garbage, I don't want my Observable to be GC'd in the middle of my app. So, please keep that in mind that I'm not looking for answers telling me whether it will be GC'd or not. What I care about is whether it can be GC'd.

Thank you for taking out your time to help me.

EDIT: To make the context clearer, here's the basic idea of the concerned code as I use it in my app.

Repository.kt:

interface Repository {

    //Other code.

    fun getUserPrefs(): Flowable<UserPreference>
}

MyViewModel.kt:

class MyViewModel(private val repository: Repository): ViewModel() {

    init {
        repository.getUserPrefs()
            .subscribe { //Code with UI state side effects }
    }
}

In my app, Repository's implementation will be a singleton.For those not familiar with Android, just know that ViewModel's can last for pretty long time. It's work is basically to be responsible for managing UI state.


Answer:

So after the sleep main exits and the emitting stops. If you try it with Thread.sleep(1000) you'll see that it only outputs

0
1

The reason for this is that your program terminates.

What you want here might be blockingSubscribe:

getObservable().blockingSubscribe { println(it) }

which will only return once the Observable is finished.

Note that you can subscribe on 3 types of events: onNext, onError and onComplete:

getObservable().blockingSubscribe({
    println(it)
}, {
    throw it
}, {
    println("finished")
})

Subscribing to all 3 will not change the fact that the subscriber runs out of scope so you'll still need to use blockingSubscribe. If you want to offload work between threads you can use subscribeOn and observeOn:

getObservable()
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.single())
        .blockingSubscribe {
            println(it)
        }

You can read more about these in this question.

About garbage collection:

The Java garbage collector is intelligent enough to reclaim resources which have references to each other but are not referenced by the so called "root reference":

 (Root reference)

 (obj0) --> (obj1)
  \          /
   \        /
    \      /
    (obj2)

^^^--- obj0, 1 and 2 are eligible for garbage collection

So why does your Observable keep running despite it being detached from other objects? The answer is that there are multiple types of "root" references in Java:

  • Local variables
  • Active Java threads
  • Static variables
  • JNI References

In your case when you call subscribe on your Observable it offloads work to RxJava-s threads which are "root" references in themselves so even if you don't have a reference of any of those objects they will still keep running ad infinitum or until the Threads terminate.

Question:

In RxJava2 you can't do something like this

    observable.subscribe(s -> {
        System.out.println(s);
    }, e -> {
        System.err.println(e);
    }, c -> {
        System.err.println(c);
    });

because c-> {} is not Action type(obviously)

Instead you're forced to do something like this

    Action onComplete = new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("on complete");
        }       
    };
    observable.subscribe(s -> {
        System.out.println(s);
    }, e -> {
        System.err.println(e);
    },onComplete);

Any reason why onComplete is not made as Consumer type?


Answer:

The reason is that onComplete doesnt emit any data. A Consumer requires a Type. If something is completed, it's completed.

If you have a Completable it only calls onComplete() which doesnt have emitted data aswell.

onCompleted really means completed. onNext/onSuccess means it has data emitted with Consumer<Type> while onError means it has a Consumer<Throwable>.

Question:

Based on post type i need to push item to observable list. So in case when if closure is not used (for example post.type != SomeType) my observable list is empty.

So due to that im receiving NoSuchElementException all the time from Single.zip

Here is my sample code

val observableList = mutableListOf<Single<Response>>()

if (post.type == SomeType) {
      observableList.add(addNewObservable()) <-- adding API call to the lsit
}

return Single.zip(observableList) { arg -> arg } <-- Throws error NoSuchElementException

I figured that i can use for post type != SomeType :

observableList.add(Single.just(Response(""))

And this will work fine.

So is there a better solution for it? How can i avoid that error?


Answer:

If your list can legitimately be empty, maybe you want a Maybe instead of Single. One way would be

val observableList = mutableListOf<Maybe<Response>>()
...
if (post.type == SomeType) {
      observableList.add(addNewObservable().toMaybe())
}
...
return Maybe.zip(observableList) { arg -> arg }

Maybe.zip on an empty list should return a Maybe which completes immediately without emitting anything.

Or an alternate solution: just test if the list is empty before calling Single.zip.

It really depends on what behavior you want (except for not throwing an exception) in the first place.

Question:

I have a hard-coded observable sequence that is zipped with an interval sequence. The intention is to introduce a delay for each element.

The expected output is:

1
2
3
Done. 

Here is the code:

    Observable.just(1, 2, 3)
        .zipWith(Observable.interval(1000L, TimeUnit.MILLISECONDS), (x, y) -> x)
        .subscribe(
            System.out::println, 
            Throwable::printStackTrace, 
            () -> System.out.println("Done. "));

However, it never pushes an element, throws an error or calls complete.

Why is this?


Answer:

Your main thread probably exits so the process stops before you see any output.

Add a latch or some other mechanism to get the main thread not to quit.

CountDownLatch l = new CountDownLatch(1);
Observable.just(1, 2, 3)
          .zipWith(Observable.interval(1000L, TimeUnit.MILLISECONDS), (x, y) -> x)
          .subscribe(
              System.out::println,
              Throwable::printStackTrace,
              () -> {
                  System.out.println("Done. ");
                  l.countDown();
              }
          );

l.await();

Question:

I have a simple program like this:

public class MainApp {
    public static void main(String[] args) {
        getAcronyms()
                .flatMap(Observable::fromIterable)
                .flatMap(MainApp::getTitle)
                .filter(Objects::nonNull)
                .subscribe(System.out::println);

    }

    private static Observable<List<String>> getAcronyms(){
        List<String> strings = new ArrayList<>();
        strings.add("YOLO");
        strings.add("LMAO");
        strings.add("ROFL");
        strings.add("AYY LMAO");
        return new Observable<List<String>>() {
            @Override
            protected void subscribeActual(Observer<? super List<String>> observer) {
                observer.onNext(strings);
                observer.onComplete();
            }
        };
    }

    private static Observable<String> getTitle(String url) {
        return new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext(url + " title!");
                observer.onComplete();
            }
        };
    }
}

This works fine, but when I chain a take:

getAcronyms()
        .flatMap(Observable::fromIterable)
        .flatMap(MainApp::getTitle)
        .filter(Objects::nonNull)
        .take(2)
        .subscribe(System.out::println);

it prints 2 values but gives me an NPE:

YOLO title!

LMAO title!

Exception in thread "main" java.lang.NullPointerException at io.reactivex.internal.operators.observable.ObservableTake$TakeObserver.onComplete(ObservableTake.java:83) at io.reactivex.internal.operators.observable.ObservableTake$TakeObserver.onNext(ObservableTake.java:64) at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.tryEmit(ObservableFlatMap.java:262) at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onNext(ObservableFlatMap.java:559) at MainApp$2.subscribeActual(MainApp.java:41) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:436) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323) at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onSubscribe(ObservableFlatMap.java:546) at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:55) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162) at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139) at MainApp$1.subscribeActual(MainApp.java:31) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10842) at io.reactivex.Observable.subscribe(Observable.java:10828) at io.reactivex.Observable.subscribe(Observable.java:10731) at MainApp.main(MainApp.java:18)

Could someone help me figure out why this is happening and what I'm doing wrong?


Answer:

This exception occurs due to take, after emitting defined number of items, internally tries to dispose Disposable object that is not set.

Thus you have to provide it when creating initial Observable by calling observer.onSubscribe(disposable) inside subscribeActual method implementation. But don't reinvent the wheel, moreover creating an Observable by calling its public constructor is meant for custom operators. Just use the static factory methods. In your case the best choice is Observable.fromCallable:

private static Observable<List<String>> getAcronyms(){
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() throws Exception {
            List<String> strings = new ArrayList<>();
            strings.add("YOLO");
            strings.add("LMAO");
            strings.add("ROFL");
            strings.add("AYY LMAO");
            return strings;
        }
    });
}

Also check this article: RxJava 2 Disposable -  Under the hood

Question:

Is there a possibility to convert Observable<Single<Object>> to Single<List<Object>>? I'd also like to know whether it's possible to make the observable to wait for all the underlying singles to complete.


Answer:

I imagine you could do it with something like this (untested):

myObservable
    .flatMap(x -> x)    // Flatten to Observable<Object>
    .toList()           // Collect in an Observable<List<Object>>
    .single();          // Convert to Single

Question:

I'm writing my application with as much as I can of rxjava and I'm having the following problem.

I Have a method that syncs data with the api, and I download stuff based on a timestamp. So I would like the following data flow:

Get last timestamp -> Download data -> Save data -> Update timestamp if no errors ocurred

The problem is that I receive the timestamp from the api response to avoid date differences between devices, and as soon as I transform the response from the api I loose the timestamp.

Here is a simplification of the current flow:

class Response
{
    Date timeStamp;
    Data data;
}

interface IRepository
{
    Completable insert(Data data);
}

interface IWebService
{
    Observable<Response> getByLastUpdate(Date date);
}

class SyncPreferences
{
    public Date getLastDownloadDate() { /**/ }
    public void setLastDownloadDate(Date date){ /**/ }
}

public class SyncService
{
    private final IRepository repository;
    private final IWebService webService;
    private final SyncPreferences syncPreferences;

    public SyncService(IRepository repository, SyncPreferences syncPreferences, IWebService webService)
   {
        this.repository = repository;
        this.webService = webService;
        this.syncPreferences = syncPreferences;
   }

   private Completable sync()
   {
        return webService.getByLastUpdate(syncPreferences.getLastDownloadDate())
            .doOnComplete((response) -> {
                syncPreferences.setLastDownloadDate(response.timeStamp)
            }) // What I would like to do
            .flatMapCompletable((response) -> {
                repository.insert(response.data);
            });
    }
}

The problem is that doOnComplete method does not receive the parameter (response) from the observable and I would like to set the timestamp as the last thing so if an error occurs I can just retry downloading with the old timestamp.

The only solution I found was make a transaction mechanism in the SyncPreferencesthat I could use like this:

    private Completable sync()
    {
        return webService.getByLastUpdate(syncPreferences.getLastDownloadDate())
            .doOnNext((response) -> {
                syncPreferences.setLastDownloadDate(response.timeStamp);
            })
            .flatMapCompletable((response) -> {
                repository.insert(response.data);
            })
            .andThen(syncPreferences.commitChanges());
    }

I am new to RxJava so if you have suggestions in general I would be glad to know =]

P.S I don't know if this code compiles since I just wrote it here for the example.


Answer:

What about this:

webService.getByLastUpdate(syncPreferences.getLastDownloadDate())
    .flatMapCompletable(response ->
        repository
            .insert(response.data)
            .doOnComplete(() ->
                syncPreferences.setLastDownloadDate(response.timeStamp)
            )
    );

Question:

Basis a condition in my Observable, I want to delay onNext / onError. My code is as follows:

 fun check3(){
        val list = arrayListOf(1,2,3,4,5,6,7, null)
        val obs = Observable.create<Int> { subscriber ->
           list.filter {
                it != null
            }.map {
                if (it!! %2 == 0 ) {
                    Thread.sleep(3000)
                    subscriber.onError(IllegalArgumentException("Mod is true"))
                } else {
                    subscriber.onNext(it)
                    subscriber.onComplete()
                }
            }
        }
    }

A sore here being Thread.sleep(3000)

Is there a better way of doing this? Basically I want to delay the onError notification to my subscriber if the if(it %2) condition is met


Answer:

You can use concatMap to turn the sleep into a non-blocking delay:

Observable.fromIterable(list.filter { it != null })
.concatMap {
    if (it!! % 2 == 0) {
        return@concatMap Observable.error(IllegalArgumentException("Mod is true"))
                         .delay(3, TimeUnit.SECONDS, true)
    }
    Observable.just(it)
}
.take(1)

Question:

I've got two observables - OnPeriodChanged and OnFilterChanged, and trying to figure out how to call a function for view adapter when one of them changes. I've tried .zip, but for some reason it does not get triggered:

Observable.zip(OnPeriodChanged, OnFilterChanged, (Date, Filter) -> HistoryViewModel.getScans(Date.first, Date.second, Filter)).subscribe(scans -> histAdapter.setScans(scans));

What I can use here to invoke the getter function and pass the results from it to setter?


Answer:

zip will emit the items to the downstream only after both of your observables (OnPeriodChanged, OnFilterChanged) emitted. I think you are trying to call HistoryViewModel.getScans whenever any of the item changes, with latest values of Date and Filter. You could use combineLatest instead of zip

Try changing it to

    Observable.combineLatest(OnPeriodChanged, OnFilterChanged, (Date, Filter) -> HistoryViewModel.getScans(Date.first, Date.second, Filter))
            .subscribe(scans -> histAdapter.setScans(scans));

Question:

Observable<List<Stop>> zippedObservable = Observable.zip(observableList, objects -> {
    List<Stop> stopList = Collections.emptyList();
    for (Object obj : objects) {
        stopList.add((Stop) obj);
    }
    return stopList;
});

I have the zippedObservable variable which was zipped by multiple observables.

disposable.add(zippedObservable.observeOn(AndroidSchedulers.mainThread())
    .subscribeWith(new DisposableObserver<List<Stop>>() {
    // onNext, onComplete, onError omitted
}));

This function emits the items (zipped stop list) successfully, but I'd like to emit these items every minute. I assumed that interval operator would be perfect for this case, but I couldn't figure out how to mix both zip and interval functionalities.

This is what I tried

zippedObservale.interval() // cannot call interval operator here.
Observable.zip(...).interval() // cannot call interval operator here too.

I am looking for someone to explain how to mix these two operators so that I can emit the items every minute. Thank you.


Answer:

interval is a static method that creates an Observable<Long> that emits a Long at a given period or interval.

To achieve what you describe, you need to use one such Observable to pace your zipped Observable:

Observable<List<Stop>> zipped = ...;
Observable<Long> interval = Observable.interval(...);
Observable<List<Stop>> everyMinute = zipped.sample(interval);

In that case, it will simply emit at most one result of zipped every minute, dis-regarding whatever else zipped is emitting. I'm not sure that's what you want.

If you want to simply re-emit the same value over and over, you might want to add a repeat() in between.

Question:

I have this code :

getLocationObservable() // ---> async operation that fetches the location. 
//  Once location is found(or failed to find) it sends it to this filter :
.filter(location -> {  // ---> I want to use this location in the the onNext in the end

     after finishing some calculation here, I either return 'true' and continue 
     to the next observable which is a Retrofit server call, or simply 
     return 'false' and quit.
})
.flatMap(location -> getRetrofitServerCallObservable( location )
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
     new Observer<MyCustomResponse>() {
          @Override
          public void onSubscribe(Disposable d) {
               _disposable = d;
          }
          @Override
          public void onNext(MyCustomResponse response) {
          // I want to be able to use the `location` object here
          }
          @Override
          public void onError(Throwable e) {

          }
          @Override
          public void onComplete() {

          }
     });

I want to be able to use the location object from line 3(first observable), in the "onNext" that is trigerred by the second observable. I can't manage to work it out.. any help would be much appreciated.


Answer:

Instead of

getRetrofitServerCallObservable( location )

you could map the result to be a Pair (from your favourite library) of the response and the location:

getRetrofitServerCallObservable( location ).map(response -> Pair.create(location, response))

Then, in your onNext, you'd be receiving Pair<Location,MyCustomResponse> instances.

If you don't want to use a Pair class, you could use Object[], but if you do, please don't tell me about it :P

Question:

Hi I am new to RXJava.

I am trying to get Users from multiple Observable which contains an arraylist. I want to merge both the data and publish it to Observer.

Here is my snippet.

    Observable<Users> baseDataObservable = mService.getUsers(Constants.SITE);

    Items items = new Items();
    items.setName("User Defined...");
    items.setImageURL("http://www.proto.gr/sites/www.proto.gr/files/styles/colorbox/public/images/fruits/cherry.png");
    ArrayList<Items> items1 = new ArrayList<>();
    items1.add(items);

    Users users = new Users();
    users.setItems(items1);
    Observable<Users> usersObservable = Observable.just(users);

    Observable.merge(baseDataObservable, usersObservable)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);

In above code, 1st observable brings data from API and 2nd one puts user defined data.

But when I use merge on observer I have only single data in ArrayList<Items> which is set defined data, the data which comes initially from api shows data provided by 2nd observable.

Can someone please help me


Answer:

If you are trying to "merge" the array lists you are getting from both the sources, and you need to combine those into a single list, you need to use zip operator instead of merge. The merge operator will emit items immediately when any of the observables triggers, where zip will collect result from all the input observables and proceed after that.

In your approach, the observer will trigger twice with 2 results. first the user-defined items, then the result from API (with a delay since its an API call)

Try like this to get a combined result

        Observable.zip(baseDataObservable, usersObservable, new BiFunction<Users, Users, ArrayList<Items>>() {
        @Override
        public ArrayList<Items> apply(Users usersFromApi, Users usersUserDefined) throws Exception {
            ArrayList<Items> itemsArrayList = new ArrayList<>();
            itemsArrayList.addAll(usersFromApi.getItems());
            itemsArrayList.addAll(usersUserDefined.getItems());
            return itemsArrayList;
        }
    }).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<ArrayList<Items>>() {
                @Override
                public void onNext(ArrayList<Items> items) {
                    // here is the combined Arraylist
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

Question:

I'm currently using rx-java 2 and have a use case where multiple Observables need to be consumed by single Camel Route subscriber. Using this solution as a reference, I have a partly working solution. RxJava - Merged Observable that accepts more Observables at any time?

I'm planning to use a PublishProcessor<T> that will be subscribed to one camel reactive stream subscriber and then maintain a ConcurrentHashSet<Flowable<T>> where I can dynamically add new Observable. I'm currently stuck on how can I add/manage Flowable<T> instances with PublishProcessor? I'm really new to rx java, so any help is appreciated! This is what I have so far :

PublishProcessor<T> publishProcessor = PublishProcessor.create();
CamelReactiveStreamsService camelReactiveStreamsService = 
CamelReactiveStreams.get(camelContext);
Subscriber<T> subscriber = 
     camelReactiveStreamsService.streamSubscriber("t-class",T.class);
}
Set<Flowable<T>> flowableSet = Collections.newSetFromMap(new ConcurrentHashMap<Flowable<T>, Boolean>());

public void add(Flowable<T> flowableOrder){
    flowableSet.add(flowableOrder);
}

public void subscribe(){
    publishProcessor.flatMap(x -> flowableSet.forEach(// TODO)
    }) .subscribe(subscriber);
}

Answer:

You can have a single Processor and subscribe to more than one observable stream. You would need to manage the subscriptions by adding and removing them as you add and remove observables.

PublishProcessor<T> publishProcessor = PublishProcessor.create();

Map<Flowable<T>, Disposable> subscriptions = new ConcurrentHashMap<>();

void addObservable( Flowable<T> flowable ) {
  subscriptions.computeIfAbsent( flowable, fkey -> 
    flowable.subscribe( publishProcessor ) );
}
void removeObservable( Flowable<T> flowable ) {
  Disposable d = subscriptions.remove( flowable );
  if ( d != null ) {
    d.dispose();
  }
}
void close() {
  for ( Disposable d: subscriptions.values() ) {
    d.dispose();
  }
}

Use the flowable as the key to the map, and add or remove subscriptions.

Question:

I've an observable which do some magic and I am using it as a singleton in activity scope. So who ever wanna get updates, can subscribe and can get the last emitted value and new emissions.

Observable<Object> currentZoneObservable;

    public Observable<Object> getCurrentZoneObservable() {
        if (currentZoneObservable == null) {
            currentZoneObservable = someUseCase.getObservable()
            .replay(1)
            .autoConnect();
        }
        return currentZoneObservable;
    }

I am making sure all those subscribers, unsubscribe as a good citizen. I am even counting number of subscribers in doOnSubsbscribe() and doOnUnSubscribe() When the app goes in the background, it has 0 subscribers.

My problem is the connectableObservable never completes and if it is doing some operation, it still keep going on with that. for eg. if it is retrying some network call when network is not available, it keeps doing it.

How can I make sure my connectableObservable stops when there are no subscribers? My understanding is that .autoConnect() should take care of this.


Answer:

autoConnect() does not unsubscribe when there are no observers anymore - refCount() does.

You might be interested in using a BehaviorSubject: some nice docs

Question:

I'm looking to create a LocationHandler class that returns an observable<Location> whose I can send a new Location and subscribers get the last one added and any subsequent values.

I've written this class, it works but I don't know if it's the correct way to do it because I've added a callback and I smell it bad.

Thanks for any help.

public class LocationHandler {
    private MessageHandler<Location> onNewItem;
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = getHookedObservable()
                .mergeWith(locationInitBuilder.build())
                .replay(1).autoConnect();
    }


    private Observable<Location> getHookedObservable() {
        return Observable.create(new ObservableOnSubscribe<Location>() {
            @Override
            public void subscribe(ObservableEmitter<Location> e) throws Exception {
                onNewItem = location -> e.onNext(location);
            }
        });
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){ // <---------- add new values
        if (onNewItem != null){
            onNewItem.handleMessage(address);
        } else {
            throw new IllegalStateException("Cannot add an item to a never subscribed stream");
        }
    }
}

Following @Blackbelt advice I've modified it with a ReplaySubject.

public class LocationHandler {
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1);
    private Observable<Location> locationObservable;

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) {
        locationObservable = locationInitBuilder.build()
                .mergeWith(inputStream)
                .replay(1).autoConnect();
    }

    public Observable<Location> getLocation(){
        return locationObservable;
    }

    public void setLocation(Location address){
        inputStream.onNext(address);
    }
}

Answer:

you could use a Subject instead of MessageHandler. Subject can act as observable and subscriber at the same time. You could have a method in your LocationHandler that returns Subject#asObservable to which you will subscribe. Internally, when setLocation, you will have to invoke Subject#onNext providing the location. There are different types of Subjects available. Please refer the documentation to choose the one that suits better your needs. E.g.

  public class LocationHandler {
     BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create();

     public Observable<GeevLocation> getLocation() {
        return mLocationSubject.asObservable();
     }

    public void setLocation(GeevLocation address){
        mLocationSubject.onNext(address);
    }
 }

from the outside call getLocation and subscribe to the returned Observable. When a setLocation is called you will get the object onNext

Question:

I have this code:

    int finalAttempts = attempts;
    Certificate certificate = Observable.range(1, attempts)
            .delay(3, TimeUnit.SECONDS)
            .map(integer -> {
                try {
                    order.update();
                    if(order.getStatus() != Status.VALID) {
                        if(integer == finalAttempts) {
                            Exceptions.propagate(new AcmeException("Order failed... Giving up."));
                        }
                    } else if(order.getStatus() == Status.VALID) {
                        Certificate cert = order.getCertificate();
                        return cert;
                    }
                } catch (AcmeException e) {
                    Exceptions.propagate(e);
                }
                return null; // return only if this is TRUE: order.getStatus() == Status.VALID
            }).toBlocking().first();

I'd like to know the best way to prevent this Observable from returning at all when the order.getStatus() == Status.VALID is still not true. At the same time if all try or attempt has been consumed and the status is still not true it should throw the exception.


Answer:

The filter() operator could be your friend in this case. Something like this comes to my mind:

int finalAttempts = attempts;
Certificate certificate = Observable.range(1, attempts)
        .delay(3, TimeUnit.SECONDS)
        .filter(integer -> {
            order.update();
            return order.getStatus() == Status.VALID;
        })
        .map(integer -> {

            // do your stuff

        }).toBlocking().first();

Question:

Here is my code:

package com.example.myapplication;

import io.reactivex.Observable;

public class SampleRx {

    Observable<String> getBoth() {
        return Observable.merge(getSeq1(), getSeq2());
    }

    Observable<String> getSeq1() {
        return Observable.create(emitter -> {
            emitter.onNext("A");

            Thread.sleep(1_500);
            emitter.onNext("B");

            Thread.sleep(500);
            emitter.onNext("C");

            Thread.sleep(250);
            emitter.onNext("D");

            Thread.sleep(2_000);
            emitter.onNext("E");
            // Thread.sleep(2_000);
            emitter.onComplete();
        });
    }

    Observable<String> getSeq2() {
        return Observable.create(emitter -> {
            Thread.sleep(200);
            emitter.onNext("1");

            Thread.sleep(500);
            emitter.onNext("2");

            Thread.sleep(400);
            emitter.onNext("3");

            Thread.sleep(300);
            emitter.onNext("4");

            Thread.sleep(1_800);
            emitter.onNext("5");
            emitter.onComplete();
        });
    }
}

Here is the output:

val=A
val=D
val=4
val=5

Why is there 5 whereas E is ignored (because it's followed by onComplete() as I guess).


Answer:

Running your code :

SampleRx().getBoth().subscribe(System.out::println);

I get :

A
B 
C 
D 
E 
1 
2 
3 
4 
5 

This is correct behaviour, results will not be interleaved as this uses the calling/same thread for all emissions, and the merge only completes when both Obervables signal completed.

To achieve interleaving for the merge each Observable need to run on a different thread, so they do not block one another, so if each observable is subscribed to on io i.e.

Observable.<String>create(emitter -> {
            emitter.onNext(value);
            ...
            ...
            emitter.onComplete();
        }).subscribeOn(Schedulers.io()); 

Then you get this output :

A, Thread : RxCachedThreadScheduler-1
1, Thread : RxCachedThreadScheduler-2
2, Thread : RxCachedThreadScheduler-2
3, Thread : RxCachedThreadScheduler-2
4, Thread : RxCachedThreadScheduler-2
B, Thread : RxCachedThreadScheduler-1
C, Thread : RxCachedThreadScheduler-1
D, Thread : RxCachedThreadScheduler-1
5, Thread : RxCachedThreadScheduler-2
E, Thread : RxCachedThreadScheduler-1

Which then honours independent emissions that do not block each other.

You have not provided any information regarding a debounce, which you included in your title, so I cannot comment.

Question:

I need to combine two observables, A & B, using the following rules:

  1. The type of the combined observable must be Maybe<A>
  2. A may emit many times or none
  3. B may emit 1 time or none
  4. As long as B hasn't emitted anything, the combined observable will emit nothing.
  5. When B emits, if A has emitted, the last result of A will be emitted by the combined observable. If A hasn't emitted yet, Maybe.empty() will be emitted.

I have tried using combineLatest but that doesn't work if A hasn't emitted yet. Also tried with mergeWith and doing flatMap { if(it-was-**B**-who-emitted) Maybe.empty() else Maybe.just(emitted-item) } but I am unable to convert from ObservableSource to Maybe

Is it possible to create such an observable in RxJava 2?


Answer:

I don't think it can be done simply with default RxJava operators, but as a workaround you can make A emit "empty element" with your value just at the beginning. And then, using combineLatest you could check if emitted element is "empty element" and if it is, transform it to Maybe.empty() like so (example with Observable<String>):

Observable.combineLatest(
            a.startWith("EMPTY"),
            b,
            BiFunction<String, String, String> { a1, b1 -> a1 })
            .firstElement()
            .flatMap{ if (it == "EMPTY") Maybe.empty() else Maybe.just(it) }

Question:

In this thread, a question is posed about how to observe the unsubscribe event so that you can clean up and remove the listener after it's unsubscribed. However, in RxJava2, the method that the above thread no longer works.

def myObservable = Observable.create({ aEmitter ->
    val listener = {event -> 
      aEmitter.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Fails since aEmitter doesn't have an add() method nor does Subscriptions exist.
    aEmitter.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

What is the proper way of addressing this in RxJava2?


Answer:

Please look at the stringObservable Observable, how to handle subscriptions.

public class MyTest {
  @Mock private MyService mock;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
  }

  @Test
  public void nam3e() {
    ArrayList<Listener> listeners = new ArrayList<>();

    doAnswer(
            invocation -> {
              Object[] args = invocation.getArguments();
              Listener arg = (Listener) args[0];

              listeners.add(arg);

              return null;
            })
        .when(mock)
        .addListener(any());

    Observable<String> stringObservable =
        Observable.create(
            e -> {
              Listener listener =
                  s -> {
                    e.onNext(s);
                  };

              mock.addListener(listener);

              e.setCancellable(
                  () -> {
                    mock.removeListener(listener);
                  });
            });

    TestObserver<String> test = stringObservable.test();

    Listener listener = listeners.get(0);
    listener.onNext("Wurst");

    test.assertNotComplete().assertValue("Wurst");
    verify(mock, times(1)).addListener(any());

    test.dispose();

    verify(mock, times(1)).removeListener(any());
  }

  public interface MyService {
    void addListener(Listener listener);

    void removeListener(Listener listener);
  }

  @FunctionalInterface
  public interface Listener {
    void onNext(String s);
  }
}

Question:

I want observable code to run on different thread than main thread. How can I do this, I'm doing like this:

Observable operationObservable = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            e.onNext(longRunningOperation());
            e.onComplete();
        }
    })
    .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread
    .observeOn(AndroidSchedulers.mainThread());

Answer:

If you need a new thread to run something on you can just use subscribeOn(Schedulers.newThread()).

Another alternative would be to create your own scheduler and executors which is really not necessary for most cases.

Further reading: link1 link2 link3

Question:

I'm trying to combine two forms insertion in one using RxJava, RxAndroid and Mosby3, but I can't find a way to make it work.

My structure:

public final class CheckinIntent {

    private final CheckinCommand checkinCommand;
    private final Bitmap signature;

    public CheckinIntent(CheckinCommand checkinCommand, Bitmap signature) {
        this.checkinCommand = checkinCommand;
        this.signature = signature;
    }

    public CheckinCommand getCheckinCommand() {
        return checkinCommand;
    }

    public Bitmap getSignature() {
        return signature;
    }
}

Where I fire my intent (MVI pattern):

final Observable<Bitmap> signatureObservable = Observable.just(BitmapFactory.decodeFile(storage.getFile("signs", booking.getBookingId()).getAbsolutePath()));
        final Observable<CheckinCommand> checkinCommandObservable = Observable.just(new CheckinCommand(booking.getBookingId(), booking.getUserId(), booking.getPartnerId(), userDetailsTextView.getText().toString(), "google.com"));

        final Observable<CheckinIntent> intentObservable = Observable.zip(signatureObservable, checkinCommandObservable, (image, command) -> new CheckinIntent(command, image));

        return saveButtonClickObservable
                .flatMap(bla -> intentObservable);

And binding it all together:

 @Override
    protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> imageRepository.uploadImage(checkinIntent.getSignature())
                        .flatMap(command ->  bookingRepository.doCheckin(command) <------ PROBLEM HERE, HOW CAN I ACCESS THE COMMAND FROM ABOVE ??
                                .subscribeOn(Schedulers.from(threadExecutor))
                                .map(CheckinViewState.Success::new)
                                .cast(CheckinViewState.class)
                                .startWith(new CheckinViewState.LoadingState())
                                .onErrorReturn(CheckinViewState.ErrorState::new))
                        .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

Observable<CnhImageResponse> uploadImage(Bitmap bitmap);

My problem is, my uploadImage returns an internal structure that ends of on a String, but, how can I get the returned string, add it to my command object (setting the returned URL in this object) and continue the flow (sending my command to the cloud) ?

Thanks!


Answer:

Just flatMap on the observable directly within the first flatMap. In that case you have reference to both, the checkinIntent and command

 @Override
 protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> { 
                          return imageRepository.uploadImage(checkinIntent.getSignature()
                                                .flatMap(imageResponse ->  bookingRepository.doCheckin(command) <-- Now you have access to both, command and CnhImageResponse 
                         }) 
                         .subscribeOn(Schedulers.from(threadExecutor))
                         .map(CheckinViewState.Success::new)
                         .cast(CheckinViewState.class)
                         .startWith(new CheckinViewState.LoadingState())
                         .onErrorReturn(CheckinViewState.ErrorState::new))
                         .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

Alternative solution: Pass a Pair<CheckinIntent, Command> to the Observable from bookingRepository.doCheckin(...) like this:

@Override
protected void bindIntents() {
        Observable<CheckinViewState> checkinViewStateObservable =
                intent(CheckinView::sendCheckin)
                        .flatMap(checkinIntent -> imageRepository.uploadImage(checkinIntent.getSignature()
                                                                 .map(imageResponse -> Pair.create(checkinIntent, imageResponse))) // Returns a Pair<CheckinIntent, CnhImageResponse>
                        .flatMap(pair ->  bookingRepository.doCheckin(pair.first) <-- Now you can access the pair holding both information
                                .subscribeOn(Schedulers.from(threadExecutor))
                                .map(CheckinViewState.Success::new)
                                .cast(CheckinViewState.class)
                                .startWith(new CheckinViewState.LoadingState())
                                .onErrorReturn(CheckinViewState.ErrorState::new))
                        .observeOn(postExecutionThread.getScheduler());

        subscribeViewState(checkinViewStateObservable, CheckinView::render);
}

Just a few other notes:

You almost ever want to prefer switchMap() over flatMap() in MVI. switchMap unsubscribes previous subscription while flatMap doesnt. That means that if you flatMap as you did in your code snipped and if for whatever reason a new checkinIntent is fired while the old one hasn't completed yet (i.e. imageRepository.uploadImage() is still in progress) you end up having two streams that will call CheckinView::render because the first one still continue to work and emit results down through your established observable stream. switchMap() prevents this by unsubscribing the first (uncompleted) intent before starting "switchMaping" the new intent so that you only have 1 stream at the time.

The way you build your CheckinIntent should be moved to the Presenter. This is kind of too much "logic" for a "dump" View. Also Observable.just(BitmapFactory.decodeFile(...)) is running on the main thread. I recommend to use Observable.fromCallable( () -> BitmapFactory.decodeFile(...)) as the later deferres his "work" (bitmap decoding) until this observable is actually subscribed and then you can apply background Schedulers. Observable.just() is basically the same as:

Bitmap bitmap = BitmapFactory.decodeFile(...); // Here is the "hard work" already done, even if observable below is not subscribed at all.
Observable.just(bitmap);

Question:

I'm new with RxJava. I'm trying to create an observable that when it completes it will start all over again until I call dispose, but I'm facing an OutofMemory error after a while, below is a simplified example of what I'm trying to do

  public void start() throws RuntimeException {
        log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());

        try {

            executeObserve();

            log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
        } catch (Exception e) {
            log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
        }
    }

start is invoked at the initialization phase once, the executeObserve is as follows (in a simplified form..). Notice that on the onComplete I "resubmit" executeObserve

public void executeObserve() throws RuntimeException {

        Observable<Book> booksObserve
                = manager.getAsObservable();

        booksObserve
                 .map(Book::getAllOrders)
                 .flatMap(Observable::fromIterable)
                 .toList()
                 .subscribeOn(Schedulers.io())
                 .subscribe(collectedISBN ->  
                      Observable.fromIterable(collectedISBN)
                       .buffer(10)
                       // ...some more steps here...
                       .toList()
                       .toObservable()
                       // resubmit
                      .doOnComplete(this::executeObserve)
                      .subscribe(validISBN -> {
                             // do something with the valid ones
                      })
             )
        );
    }

My guess is that this is not the way to go if I want to resubmit my tasks but it was not possible to find any documentation.

the booksObserve is implemented as follows

public Observable<Book> getAsObservable() {
    return Observable.create(e -> {
        try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll())) {
            for (Book r : rs) {
                e.onNext(r);
            }
            e.onComplete();
        } catch (Exception ex) {
            e.onError(ex);
        }
    });
}

What is the correct way to constantly resubmit an operation until we call dispose or equivalent? I'm using RxJava 2


Answer:

You have created an endless recursion, the loop will create more and more resources and sometime it will blow with OutOfMemory/Stack overflow exception.

In order to repeat the Observable work you should use repeat() operator, it will resubscribes to the Observable when it receives onComplete().

Besides that, some general comments on your code:

  • why are you nesting the second Observable inside the subscriber? you are breaking the chain, you can just continue the chain instead of creating new Observable at the Subscriber.
  • Moreover, it's seems (assuming Observable.fromIterable(collectedBets) using the collectedISBN that gets with the onNext() o.w. from where does it comes?) you're collecting all items to a list, and then flatting it again using from iterable, so it's seems you can just continue on the stream , something like that:

    booksObserve
       .map(Book::getAllOrders)
       .flatMap(Observable::fromIterable)
       .buffer(10)
       // ...some more steps here...
       .toList()
       .toObservable()
       // resubmit
       .doOnComplete(this::executeObserve)
       .subscribeOn(Schedulers.io())
       .subscribe(validISBN -> {
             // do something with the valid ones
        });       
    
  • Anyhow, with the nested Observable, the repeat() operator will just repeat the nested one, and not the entire stream (which is what you want) as it is not connected to it.

Question:

I am trying to convert Void and Response type to Observable corresponding. I tried .just but .create not sure if I can use just or create to do this conversion. Thanks.

void getSomeValue(){
    Observable<Response> returnedObservable=getResponse();//How to convert this from Response to Observable<Response>
    Observable<Void> returnedObservable=doSomething();//How to convert this from Void to Observable<Void>
}

Void doSomething(){
   //some code...
   return null;
}

Response getResponse(){
  //some code....
  return someResponse;
}

Answer:

Small note: RxJava provides several types of observables (see here). For this case, there is no reason for returning an Observable<Void>, you can use a Completable: in fact, it represents a deferred computation without any value.

You can use the method fromCallable: this defers the execution of getResponse function until the Observable is subscribed. The eager approach consists to use just: it evaluates the function immediately in the current thread.

Observable<Response> response = Observable.fromCallable(this::getResponse);
Observable<Response> response = Observable.just(getResponse());

Same for Completable:

Completable something = Completable.fromAction(this::doSomething);

Question:

I am trying to understand how Observables are executed but can't seem to get this simple code to work.

public class RxJavaExample {
    public static void main(String[] args) {
        Observable<String> hello = Observable.fromCallable(() -> 
            getHello()).subscribeOn(Schedulers.newThread());

        hello.subscribe();

        System.out.println("End of main!");
    }

    public static String getHello() {
        System.out.println("Hello called in " + 
            Thread.currentThread().getName());
        return "Hello";
    }
}

Shouldn't hello.subscribe() execute getHello()?


Answer:

It is because your main thread finishes, before the background thread gets to getHello. Try to add a Thread.sleep(5000) in your main method before exiting.

Alternatively, wait until the onCompleted of your subscription is called.

EDIT: The reason why the program terminates is because RxJava spawns daemon threads. On the search for a good source, I also found this question, which probably answers it as well.

Question:

I am doing my first steps with RxJava und Retrofit. I have a rest API which returns an Observable<Department>. The class Department has a nested list of Team objects. How can I get an Observable<Team> which delivers all teams of the nested list from the Observable<Department>?

I tried playing around with map()and switchMap() but I still can't get the desired Observable<Team>.

My idea is to subscribe to the Observable<Team> and for every delivered team I trigger some update logic.


Answer:

You can just create a pipeline using map to get the inner values and then flatMapIterable to unwrap the arrays.

Suppose dept$ is your Observable<Department>

dept$
  .map(d -> d.getTeam())
  .flatMapIterable(teamArr -> teamArr)
  .doOnNext(System.out::println)
  .subscribe();

Question:

I'm using RxJava in an Android app I'm using. I want to know what happens when I unsubscribe from an Observable chain.

Assume I'm referring to the following observable chain:

sourceObservable
    .map { ... }
    .doOnNext { ... }
    .flatMap { ... }

I see 3 possible answers:

  1. When I unsubscribe, the source observable is stopped i.e the sourceObservable stops emitting items. The last emitted item goes down the chain till it's end. It causes each operator to fire.

  2. When I unsubscribe, the source observable stops. The last emitted item doesn't go down but the operator it is currently going through(say doOnNext) is allowed to finish.

  3. When I unsubscribe, the source observable stops and no more code is executed at all. It means that say I was using doOnNext, the code in doOnNext will stop where it is and will not be allowed to finish.

Which one(if any) of them is correct? Does the answer depend on whether the source observable is hot or cold? On the operator?


Answer:

if an item is emitted and you imediatly dispose the stream the item want stop it will be processed till it finish processing but no more items will be emitted.

so if you watch this example maybe you will understands better

     observable = Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(aLong -> {
                System.out.println("1");
                return aLong;
            })
            .map(aLong -> {
                observable.dispose();
                System.out.println("2");
                return aLong;
            })
             .map(aLong -> {
                 observable.dispose();
                 System.out.println("3");
                 return aLong;
             })
             .doOnNext(aLong -> System.out.println("4"))
             .map(aLong -> {
                 observable.dispose();
                 System.out.println("5");
                 return aLong;
             })
            .subscribe();
}

In this stream only the first item will be processed till the end of stream so even if the first job that we do is unsubscribe we will still process emitted item so we unsubscribe from the source of emmiting

Question:

I am currently using RxJava and I have something like this,

List<Integer> taskIds = ...

Observable.just(new Task(taskIds.get(0)))
        .compose(executeWorkFlow())
        .filter(Response::isSuccess)
        .flatMap(res -> Observable.just(new Task(taskIds.get(1)))
        .compose(executeWorkFlow())
        .filter(Response::isSuccess)
        .flatMap(res -> Observable.just(new Task(taskIds.get(2)))
        .compose(executeWorkFlow())
        .filter(Response::isSuccess)
        ...

execute() is a Transformer which performs an API call and returns the response i.e.

ObservableTransformer<Task, Response> execute() { ... }

Is there a better way of writing this? Given I will not know how many task ids will be emitted from previous Observable.


Answer:

Use fromIterable, concatMap and takeUntil(Predicate):

Observable.fromIterable(tasks)
    .concatMap(task ->
        Observable.just(new Task(task))
            .compose(executeWorkFlow())
    )
    .takeUntil(response -> !response.isSuccess)

Question:

I'm new to rx java and in the below code whenever there is a exception the subscriber stop's subscribing the data. In this case "hello3" is never printed. How can I make the subscriber to continue receiving data?

public class ReactiveDataService
{
  public Observable<String> getStreamData()
  {
    return Observable.create(o -> {
        o.onNext("hello1");
        o.onNext("hello2");
        o.onError(new TimeoutException("Timed Out!"));
        o.onNext("hello3");
    });
  } 
}

Observable<String> watcher = new ReactiveResource().getData()
    .timeout(3, TimeUnit.SECONDS)
    .onErrorResumeNext(th -> {
        return Observable.just("timeout exception");
    });

    watcher.subscribe(
            ReactiveResource::callBack, 
            ReactiveResource::errorCallBack,
            ReactiveResource::completeCallBack
    );

public static Action1 callBack(String data)
{
    System.out.println("--" + data);
    return null;
}

public static void errorCallBack(Throwable throwable)
{
    System.out.println(throwable);      
}

public static void completeCallBack()
{
    System.out.println("On completed successfully");
}


private Observable<String> getData()
{
    return new ReactiveDataService().getStreamData();
}

Answer:

RxJava only allows at most one onError which means the end of the flow. If you find yourself wanting to emit more errors, possibly interleaved with normal items, it means you need a data type, a pair or record class, that can represent both. The io.reactivex.Notification is one option.

public Observable<Notification<String>> getStreamData() {
    return Observable.create(o -> {
        o.onNext (Notification.createOnNext("hello1"));
        o.onNext (Notification.createOnNext("hello2"));
        o.onError(Notification.createOnError(new TimeoutException("Timed Out!")));
        o.onNext (Notification.createOnNext("hello3"));
        o.onComplete();
    });
}

Question:

I'm using RXJava 2 to perform an operation. I would like the operation to happen every 5 seconds. However after introducing the interval method it changes my function and breaks my .subscribe(DisponsableObserver<Boolean>)

mObserver = getObserver();              //return DisponsableObserver<Boolean>
observable = getSolultionObservable();  //return Observer<Boolean>

observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .interval(5, TimeUnit.SECONDS) 
            .subscribe(mSolutionObserver); //Now Produces Cannot Resolve Method error

How can I succesfully run my observable on a timer and return a boolean?


Answer:

You can use the flatMap operator for converting the items (long) emitted by the interval every 5 seconds, into your Observable

Observable.interval(5, TimeUnit.SECONDS) 
    .flatMap(long -> observable)
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(mSolutionObserver);

EDIT: without lambda expression:

Observable.interval(5, TimeUnit.SECONDS) 
    .flatMap(new Function<Long, ObservableSource<Boolean>>() {
                @Override
                public ObservableSource<Boolean> apply(Long aLong) throws Exception {
                    return observable;
                }
            })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(mSolutionObserver);

Question:

I have a List<Observable<T>> that I would like to transform into an Observable<List<T>>.

I am aware of Observable.zip, which seems like the right function, but I'm not sure how to define the zipper parameter.

Here is what I have tried:

final List<Observable<T>> tasks = getTasks();

final Observable<List<T>> task = Observable.zip(
    tasks, 
    x -> ImmutableList.copyOf(x)
        .stream()
        .map(x -> (T)x)
        .collect(ImmutableList.toImmutableList()));

However, this requires an unchecked cast.

How should I go about this in RxJava 2?


Note that this question refers to RxJava 1.


Answer:

Use the primitives:

List<Observable<T>> tasks = getTasks();
Observable<List<T>> task = Observable.merge(tasks).toList();

However, do you really need all the tasks at once? You could skip the toList and proccess the tasks as they come; this will give you both better responsiveness and easier concurrency control.

Question:

I'm writing client-server application. I'm getting data from database and put it in ReplaySubject (ReplaySubject is necessary because i need to guaranteed same data on every client) of rxjava2 when client connect subscribe to it i would like to send this data to him but when i try it in my head "possible way ^_^" it blocks. By blocks i mean it don't send data but when I shutdown server data instantly show at client side.

I try to add some threads at client and server side eventloop (I was thinking maybe thread block because i use 'Infinite' source so to receive this i need another thread or something like that).

Server side channel code:

public
    class ClientHandler
        extends SimpleChannelInboundHandler<DataWrapper> {


    private final Observable<DataWrapper> data;

    public ClientHandler(Observable<DataWrapper> data) {
        this.data = data;
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // super.channelRegistered(ctx);
        final Channel channel = ctx.channel();
        Server
            .INSTANCE
            .appendToChannelGroup(channel);

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // super.channelActive(ctx);
        // i believe there is something wrong
        data.subscribe(ctx::writeAndFlush);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    // rest skip
}

Client side:

public
    class DirectNetworkCommunicator
        extends SimpleChannelInboundHandler<DataWrapper> {


    private Observable<DataWrapper> generatedData;
    private ExecutorService fallbackThread;


    DirectNetworkCommunicator(Observable<DataWrapper> generatedData) {
        this.fallbackThread = Executors.newSingleThreadExecutor();
        this.generatedData = generatedData;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // super.channelRead(ctx, msg);
        DataWrapper inComingData = (DataWrapper) msg;
        Adapter
            .INSTANCE
            .appendFromNettworkData(inComingData);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // super.channelReadComplete(ctx);
        ctx.flush();
    }
    // rest skip
}

So how i mention before I would love it to recive data just when it become not when the server is down ^_^. If that gonna help netty version 4.1.37 final.


Answer:

Ok, so future peoples face the same problem i found answer on my own. Netty from client side use background thread as main for communication witch means I've wait for main thread to release before it can make operation on observable. Hope it helps someone.

Question:

As shown below, I am creating Observables. I would like to wait specific amount of time in seconds as shown in the code. therefore I used either delay or interval operator. I expected the code to wait i.e. 5 seconds then System.out.println from the observer to be printed.

but what happens is, doOnNext is executed and the code never goes further. I mean the execution stops at doOnNext even after the 5 seconds elapsed.

Code:

public static void main(String[] args) {
    Observable<List<Person>> observables = Observable.create(e-> {
        for(List<Person> p : Main.getPersons()) {
            e.onNext(p);
        }
        e.onComplete();
    });
     observables
    //.subscribeOn(Schedulers.newThread())//newThread
    .flatMap(p->Main.toObservable(p.get(0).getName()))
    .doOnNext(p-> System.out.println(p.length()) )
    .map(p->p+"..STRING")
    //.delay(5, TimeUnit.SECONDS)
    //.interval(0, 5, TimeUnit.SECONDS)
    .observeOn(Schedulers.io())
    .subscribe(new Observer() {
        @Override
        public void onComplete() {
            // TODO Auto-generated method stub
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void onNext(Object arg0) {
            // TODO Auto-generated method stub
            System.out.println("onNextFromObserver: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // TODO Auto-generated method stub
        }
    });
}

private static <T> Observable<T> toObservable(T s) {
    return Observable.just(s);
}

private static List<List<Person>> getPersons() {
    return Arrays.asList(
            Arrays.asList(new Person("Sanna1", 59, "EGY"), new Person("Sanna2", 59, "EGY"), new Person("Sanna3", 59, "EGY")),
            Arrays.asList(new Person("Mohamed1", 59, "EGY"), new Person("Mohamed2", 59, "EGY")),
            Arrays.asList(new Person("Ahmed1", 44, "QTR"), new Person("Ahmed2", 44, "QTR"), new Person("Ahmed3", 44, "QTR")),
                    Arrays.asList(new Person("Fatma", 29, "KSA")),
                    Arrays.asList(new Person("Lobna", 24, "EGY"))
                    );
}

Answer:

You have to wait in the main method. Put Thread.sleep(10000) at the very end of the main() method so the Observable has chance to run. RxJava threads are daemon threads that stop when the application thread falls out of the main() method.

public static void main(String[] args) {

    Observable.just("Hello World!", "Keep printing values!")
    .zipWith(Observable.interval(0, 5, TimeUnit.SECONDS), (a, b) -> a)
    .subscribe(v -> 
        System.out.println(Thread.currentThread() + ": " + v)
    );

    Thread.sleep(10000);  // <-----------------------------------

}

Question:

I need to fill the RecylcerView with list of data when activity starts. I implemented this by subscribe, where in onSuccess method I showing my list. But how can I again show updated list when Activity onResume. I mean after switching to another activity and then on returned to previous, I need to show updated RecylcerView again. How can I implemented this, or I need every time subscribe for showing. Maybe multiple subscribe on same observable in Activity's onResume is a bad practice?

Presenter

@Override
public void setCitiesList() {
    disposables.add(getCitiesListObservable()
            .subscribe(list -> view.showCitiesList(list)));

}

private Single<List<City>> getCitiesListObservable() {
    Observable<City> citiesFromDb = dataManager.getCitiesFromDb();
    return citiesFromDb
           .switchMapSingle(city ->
            // logic
}

Activity

@Override
protected void onResume() {
    super.onResume();
    presenter.setCitiesList();
}

In a word, how can I avoid multiple subscribing? I need to show updated cities on each onResume.


Answer:

The Observable subscribe() method returns a Disposable object that you are adding to disposables. Calling the Disposable dispose() method and removing the Subscription from your disposables should solve this.

Because you are targeting a specific disposable in your onResume, it would likely be cleaner to store the disposable as an independent instance variable.

Disposable cityListDisposable;

@Override
public void setCitiesList() {
    cityListDisposable = getCitiesListObservable()
            .subscribe(list -> view.showCitiesList(list));

}

@Override
public void onPause() {
    if (cityListDisposable != null) {
        cityListDisposable.dispose();
    }
}

Question:

I am having trouble creating an observable that will return a list of Objects. I have a list of ids and want to make a request to my database. In this case I am using firebase. When a result is gotten i will like to compile each of these objects into a list and then return that list. I need to wait for all the objects to be returned before i return. I am doing this in my view model deserializer class. Here is my code.

private class Deserializer implements Function<QuerySnapshot, ArrayList<User>>{
    @Override
    public ArrayList<User> apply(QuerySnapshot input) {

        List<DocumentSnapshot> temp = input.getDocuments();
        final ArrayList<User> users = new ArrayList<>();
        List<String> idList = new ArrayList<>();

        for (DocumentSnapshot snapshot: temp){
            String userId= snapshot.get("userId").toString();
            idList.add(userId);
        }

        //Create observable and make request to firebase
        //compile all the returned into a list return that list
    }

}

There are several ways ways i could get data back from my firebase database, i could return the Documentsnapshot Task or i could use a call back to get my data back like so

public void getUser(String userId, final Callback callback){

    DocumentReference docRef = db.collection(DATABASE_COLLECTION_USERS).document(userId);
    docRef.get().addOnSuccessListener(new OnSuccessListener<DocumentSnapshot>() {
        @Override
        public void onSuccess(DocumentSnapshot documentSnapshot) {
            callback.result(documentSnapshot.toObject(User.class));
        }
    });
} 

and then add a callback when calling this function. like getUser(id, new Callback {});

The real problem i am having is how to tie all the request together. I know using RxJava is would be a solution. But i am not knowledgeable on RxJava. It is pretty similar to this question resolving a promise using mongodb and nodejs i asked earlier. That was in nodejs. But i would want to do this using RxJava


Answer:

To anyone wandering I solved this problem by creating a cloud function that takes a list of ids and searches the database for the users and returns the list of users. This makes it so my client does not need to do any work. I used the same idea with promises from the link in my question. Thanks

Question:

I have a method,

Observable<String> uploadFile(File file);

then i implement method

Single<List<String>> uploadFile(List<file> files){
     return Observable.fromIterable(files).flatMap(file -> upLoadFile(context, file))
            .toList();
 }

but

input file1, file2, file3 output is List{file2, file1, file3}

How can I keep the files in the correct order?


Answer:

To preserve the order of the observables use concatMap instead of flatMap

Single<List<String>> uploadFile(List<file> files){
     return Observable.fromIterable(files).concatMap(file -> 
            upLoadFile(context, file))
            .toList();
 }

Question:

I have an Observable which makes an emission every half second. When this Observable makes an emission, I do not care about the object which is emitted.

In this situation using a Completable is inadequate as a Completable can only make one zero argument emission.

This is what I am currently using, which works fine, but is imperfect

compositeDisposable.add(
    Observable.interval(500L, TimeUnit.MILLISECONDS)
    .timeInterval()
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(longTimed -> {
        if (emissionBoolean) {
            //todo: find an observable that can emit 0 arguments
            return Observable.just(true);
            }
        return Observer::onComplete;
        })
    .subscribe(wishIWasAZeroArgumentBoolean -> {
        onTick();
        }));

this is what I want to have for my subscribe instead

.subscribe(() -> {
    onTick();
}));

Answer:

I think this is a valid question. Maybe cannot be used because Maybe does not emit more than once. _ can't be used as a parameter name in Java 9 or above.

There isn't a way you can send "empty" notifications. RxJava wiki suggests using Object in case you want to explicitly ignore the emitted value. For example:

compositeDisposable.add(
    Observable.interval(500L, TimeUnit.MILLISECONDS)
    .timeInterval()
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(longTimed -> {
        if (emissionBoolean) {
            //todo: find an observable that can emit 0 arguments
            return Observable.just(new Object());
        }
        return Observable.empty()
    })
    .subscribe(object -> { // you still need to declare though.
        onTick();
    }));

Also, the code can be cleaner if you can switch to Kotlin, because in Kotlin you don't need to explicitly declare the name of the parameter if there is only one parameter.

compositeDisposable.add(
    Observable.interval(500L, TimeUnit.MILLISECONDS)
    .timeInterval()
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap {
        if (emissionBoolean) {
            Observable.just(Any())
        } else {
            Observable.empty<Any>()
        }}
    .subscribe {
        onTick()
    }

Question:

I'm making a basic chat app. I have 3 basic classes. The User class stores the userName. The Message class stores the userId of the sender and the content. The Chat class contains a list of Messages. These classes also contain other data but that's not related to this question.

I can retrieve users by their ids. The Users are returned in the form of Single<User>.

To display a message in my UI, I need 2 things: the name of the sender and the content of the message. I put these into a helper class(SimplifiedMessage). I need to obtain a list of SimplifiedMessages.

I already have the Chat object.

Basically, I need to get the list of all the messages and then for each message, get the name of the sender and then put the name and content together into a SimplifiedMessage. Finally, I have to collect all the SimplifiedMessages together into a list. Also, this has to be done in order. If the chat holds the messages [M1, M2, M3], the list of SimplifiedMessages should also be in the same order. How can I achieve this?


Answer:

Reference RxJava Continuations as suggested by Akarnokd for explanation. My use case resembles the second snippet in the dependent continuations section. This code snippet will do the work:

Flowable.fromIterable(chat.messages)
    .concatMap { message ->
        repository.getUserById(message.senderId)
            .concatMap { user ->
                Flowable.just(SimpifiedMessage(user.name, message.content))
            }
    }
    .toList()

For those who are not familiar with concatMap, it is a flatMap that doesn't allow interleaving of emissions. This allows us to satisfy the order condition.