Hot questions for using RxJava 2 with retryWhen

Question:

I'm implementing an observable that retries on error after a delay of 5 s. I'm using Retrofit for networking. The problem I'm facing is that when the API returns an error, far too many retries are made. I want to retry only after 5 s, but the retries happen at an insane rate (about twice a second, judging by the log below). Any idea why?

userAPI.getUsers()
       .filter { it.users.isNotEmpty() }
       .subscribeOn(Schedulers.io())
       .retryWhen { errors -> errors.flatMap { errors.delay(5, TimeUnit.SECONDS) } }
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe({}, {})

where userAPI.getUsers() returns an observable.

Insane number of API requests:

08-13 12:31:31.308 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:31.825 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.370 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:32.897 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.436 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:33.952 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:34.477 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:35.020 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:35.609 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo
08-13 12:31:36.205 26277-26453/com.app.user.dummy D/OkHttp: --> GET https://userapi.com/foo

P.S.: I've already read Dan Lew's blog post on retryWhen and repeatWhen.


Answer:

Take another look at Dan Lew's blog post: it describes a solution for your use case under the "Uses" heading.

source.retryWhen(errors -> errors.flatMap(error -> Observable.timer(5, TimeUnit.SECONDS)))

Explanation:

Note the difference between Observable.delay and Observable.timer. The documentation for Observable.delay states:

Error notifications from the source Observable are not delayed.

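In retryWhen, your errors Observable is subscribed to immediately (Observable.delay only delays onNext). The source then fails again and emits its error immediately, which triggers retryWhen again, and you end up in a tight retry loop. Note also that the lambda in your flatMap ignores the error it receives and applies delay to the errors stream itself, whereas the version above returns a fresh timer for each error.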
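
For comparison, the timing the question asks for (one pause per failed attempt, then a fresh request) can be modelled without RxJava at all. The sketch below is plain JDK code, not part of the original answer. FixedDelayRetryModel, its parameters, and the cap on attempts are hypothetical names and assumptions chosen for illustration, and the pause is injected so that the demo records the requested delays instead of really sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Plain-JDK model of "retry after a fixed delay"; not RxJava code.
final class FixedDelayRetryModel {

    // Runs the task until it succeeds, pausing once after each failed attempt.
    // Gives up and rethrows the last failure after maxAttempts attempts.
    static <T> T call(Callable<T> task, int maxAttempts, long delayMillis,
                      LongConsumer pause) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause.accept(delayMillis); // wait before the next attempt
                }
            }
        }
        throw last;
    }

    // Demo: a task that fails twice and then succeeds.
    // Returns the pauses that were requested instead of really sleeping.
    static List<Long> demoPauses() {
        List<Long> pauses = new ArrayList<>();
        int[] calls = {0};
        try {
            call(() -> {
                if (++calls[0] <= 2) {
                    throw new IllegalStateException("API error");
                }
                return "users";
            }, 5, 5000L, ms -> pauses.add(ms));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return pauses;
    }
}
```

In this model, two failures lead to exactly two pauses of 5000 ms, which is the behaviour Observable.timer gives per error inside retryWhen. In real code, the injected pause would be an actual sleep or a scheduler.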

Question:

I'm trying to re-authenticate (revalidate a token) every time the API responds with an invalid-token error. I have a small example that reproduces the problem I'm facing. The first call throws an exception, which triggers a retry. On the retry, the auth method is not fully called again: it prints "authing..." but not "Entered Auth".

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Example {

    // Incremented on every getId() subscription: 1 on the first attempt, 2 on the retry.
    AtomicInteger atom = new AtomicInteger(1);

    public Example() {}

    public void start() {
        auth().andThen(call())
                .retryWhen(throwableFlowable -> throwableFlowable.flatMap(throwable -> {
                    System.out.println("Retrying...\n");
                    return Flowable.timer(1, TimeUnit.SECONDS);
                }))
                .subscribe(integer -> System.out.println("Result: " + integer), e -> System.out.println("Error" + e.getMessage()));
    }

    public Completable auth() {
        // Runs when auth() is called, not when the Completable is subscribed to.
        System.out.println("Entered Auth");
        return Completable.create(emitter -> {
            // Runs on every subscription.
            System.out.println("authing...");
            emitter.onComplete();
        });
    }

    public Single<String> call() {
        return getId()
                .flatMap(this::getNameById);
    }

    public Single<Integer> getId() {
        return Single.create(emitter -> {
            emitter.onSuccess(atom.getAndIncrement());
        });
    }

    public Single<String> getNameById(int id) {
        return Single.create(emitter -> {
            HashMap<Integer, String> hash = new HashMap<>();
            hash.put(1, "s");
            hash.put(2, "b");
            if (id == 1) {
                // Simulates the invalid-token failure on the first attempt.
                emitter.onError(new Throwable());
            } else {
                emitter.onSuccess(hash.get(id));
            }
        });
    }

}

Again, here's my output:

Entered Auth
authing...
Retrying...

authing...
Result: b

How can I force the entire auth() method to run on retry?


Answer:

Use Completable.defer. It wraps the creation of your Completable, so auth() is called again on every retry instead of the existing Completable merely being resubscribed.

Completable.defer(() -> auth()).andThen(call())
        .retryWhen(throwableFlowable -> throwableFlowable.flatMap(throwable -> {
            System.out.println("Retrying...\n");
            return Flowable.timer(1, TimeUnit.SECONDS);
        }))
        .subscribe(integer -> System.out.println("Result: " + integer), e -> System.out.println("Error" + e.getMessage()));

Output:

Entered Auth
authing...
Retrying...

Entered Auth
authing...
Result: b
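
The difference comes down to when the body of auth() runs: once, when the chain is assembled, or again on each subscription. The following is a plain-JDK model of that distinction, not RxJava code. As an assumption for illustration, a Supplier stands in for Completable.defer and a Runnable stands in for the work done on subscription; DeferModel and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-JDK model of eager assembly versus deferred assembly; not RxJava code.
final class DeferModel {

    // Stand-in for auth(): the method body runs when auth() is called,
    // while the returned Runnable runs each time the work is "subscribed".
    static Runnable auth(List<String> log) {
        log.add("Entered Auth");
        return () -> log.add("authing...");
    }

    // Obtains and runs the work twice: a failed first attempt plus one retry.
    static void subscribeTwice(Supplier<Runnable> source) {
        source.get().run(); // first attempt
        source.get().run(); // retry
    }

    // Eager: auth() is called once up front and the same Runnable is reused.
    static List<String> eager() {
        List<String> log = new ArrayList<>();
        Runnable built = auth(log);
        subscribeTwice(() -> built);
        return log;
    }

    // Deferred: auth() is called again for every attempt.
    static List<String> deferred() {
        List<String> log = new ArrayList<>();
        subscribeTwice(() -> auth(log));
        return log;
    }
}
```

In this model, eager() logs "Entered Auth" once and "authing..." twice, matching the output in the question, while deferred() logs the pair twice, matching the output above.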

Question:

Here is the code:

import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

public class RxJavaTest {

    @Test
    public void onErr() {

        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                            .current()
                            .nextInt(10) == 5) {
                    observer.onError(new Exception("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        AtomicBoolean finished = new AtomicBoolean(false);
        values1
                .retryWhen(throwableObservable -> throwableObservable
                        .takeWhile(throwable -> {
                            boolean result = (throwable instanceof IllegalArgumentException);
                            if (result) {
                                System.out.println("Retry on error: " + throwable);
                                return result;
                            }
                            System.out.println("Error: " + throwable);
                            return result;
                        })
                        .take(20))
                .onErrorReturn(throwable -> "Saved the day!")
                .doOnTerminate(() -> finished.set(true))
                .subscribe(v -> System.out.println(v));
    }
}

The goal is to

  • retry only when there is an IllegalArgumentException,
  • for any other exception, return immediately (by onErrorReturn).

The code above accomplishes the first goal but fails at the second: it stops retrying, but the .onErrorReturn part is ignored.

Any idea how to make it work?


Answer:

You can make it work by changing your retryWhen to:

                .retryWhen(throwableObservable ->
                                throwableObservable.flatMap(throwable -> {
                                    if (throwable instanceof IllegalArgumentException) {
                                        System.out.println("Retry on error: " + throwable);
                                        return Observable.just(1);
                                    } else {
                                        System.out.println("Error: " + throwable);
                                        return Observable.<Integer>error(throwable);
                                    }
                                })
                )

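To trigger a retry, it doesn't matter which value the Observable returned inside retryWhen emits (the example above emits 1). Your original takeWhile completes the handler's Observable when the predicate fails, so the downstream receives onComplete rather than onError, and onErrorReturn never sees an error. Per the Javadoc: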

If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
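
The resulting control flow (retry on IllegalArgumentException, fall back immediately on anything else) can be sketched in plain JDK code. This is a simplified model, not RxJava code. It assumes a source that yields a single value rather than several, and it also returns the fallback once a retry cap is reached, which is a choice made for this model only. SelectiveRetryModel, run and scripted are hypothetical names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;

// Plain-JDK model of "retry on one exception type, fall back on the rest".
final class SelectiveRetryModel {

    static String run(Callable<String> source, int maxRetries, String fallback) {
        int retries = 0;
        while (true) {
            try {
                return source.call();
            } catch (IllegalArgumentException e) {
                if (retries++ >= maxRetries) {
                    return fallback; // model assumption: give up at the cap
                }
                // otherwise loop again: the equivalent of resubscribing
            } catch (Exception e) {
                return fallback; // the equivalent of onError reaching onErrorReturn
            }
        }
    }

    // Test helper: a source that throws the given failures in order,
    // then succeeds with "New".
    static Callable<String> scripted(Exception... failures) {
        Iterator<Exception> remaining = Arrays.asList(failures).iterator();
        return () -> {
            if (remaining.hasNext()) {
                throw remaining.next();
            }
            return "New";
        };
    }
}
```

With a source scripted to throw IllegalArgumentException twice, run returns "New" after two retries. With one IllegalArgumentException followed by a plain Exception, it retries once and then returns the fallback.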

Question:

I've been playing around with the retryWhen() method, and I noticed that if you use filter() inside retryWhen() and the filter rejects the error, no callback is executed, not even onCompleted(). Can you please explain why this happens? Thanks in advance.

The working case:

    Observable.error(new RuntimeException())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(errors -> errors
                    .filter(throwable -> throwable instanceof RuntimeException)
                    .zipWith(Observable.range(1, 3), (throwable, retryCount) -> {
                        Log.i("lol", "retry " + retryCount);
                        return retryCount;
                    }))
            .subscribe(e -> Log.i("lol", "onNext"), throwable -> Log.i("lol", "onError"), () -> Log.i("lol", "onCompleted"));

The working output:

I: retry 1
I: retry 2
I: retry 3
I: onCompleted

But when I change the filter to filter(throwable -> throwable instanceof IOException), the observable appears frozen: no callback fires.

    Observable.error(new RuntimeException())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(errors -> errors
                    .filter(throwable -> throwable instanceof IOException)
                    .zipWith(Observable.range(1, 3), (throwable, retryCount) -> {
                        Log.i("lol", "retry " + retryCount);
                        return retryCount;
                    }))
            .subscribe(e -> Log.i("lol", "onNext"), throwable -> Log.i("lol", "onError"), () -> Log.i("lol", "onCompleted"));

Answer:

You don't want to use filter() inside the retryWhen() operator. Instead, use an if statement or switch statement to ensure you have complete coverage of all cases.

retryWhen() works by creating an observable and invoking your function with it. When it catches a throwable in its onError() method, it emits that throwable into the observable and waits for a result. If no result ever arrives, as happens when the throwable is filtered out, it waits forever.
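
A small plain-JDK model may make the "no result" case concrete. It is not RxJava code. As an assumption for illustration, the handler is reduced to a function from the error to an optional signal, where an empty Optional represents the handler emitting nothing. RetryDecisionModel and its members are hypothetical names.

```java
import java.util.Optional;
import java.util.function.Function;

// Plain-JDK model of how a retry handler answers (or fails to answer) an error.
final class RetryDecisionModel {

    enum Signal { RESUBSCRIBE, FAIL }

    // filter-style handler: answers only for matching errors, otherwise stays silent.
    static Function<Throwable, Optional<Signal>> filterStyle(Class<? extends Throwable> type) {
        return t -> type.isInstance(t)
                ? Optional.of(Signal.RESUBSCRIBE)
                : Optional.<Signal>empty();
    }

    // if/else-style handler: every error gets an explicit answer.
    static Function<Throwable, Optional<Signal>> ifElseStyle(Class<? extends Throwable> type) {
        return t -> Optional.of(type.isInstance(t) ? Signal.RESUBSCRIBE : Signal.FAIL);
    }

    // Describes what the retry logic would do with the handler's answer.
    static String outcome(Function<Throwable, Optional<Signal>> handler, Throwable error) {
        return handler.apply(error)
                .map(Signal::name)
                .orElse("NO SIGNAL (stream stalls)");
    }
}
```

In this model, a filter-style handler for IOException gives no signal for a RuntimeException, which corresponds to the frozen stream in the question. The if/else-style handler answers FAIL for the same error, so the stream can terminate.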