Hot questions for Using RxJava 2 in okhttp

Question:

I am trying use rxJava, rxAndroid, Retrofit2, and OkHTTP3 to download a file from a URL endpoint. My code is unable to create the call adapter for an "Observable< retrofit2.Response< okhttp3.ResponseBody>>". These methods are new to me so I believe I'm missing an important concept here. Any direction or points is greatly appreciated.

FATAL EXCEPTION: main Process: com.example.khe11e.rxdownloadfile, PID: 14130 java.lang.IllegalArgumentException: Unable to create call adapter for io.reactivex.Observable> for method RetrofitInterface.downloadFileByUrlRx at retrofit2.ServiceMethod$Builder.methodError(ServiceMethod.java:720) at retrofit2.ServiceMethod$Builder.createCallAdapter(ServiceMethod.java:234) at retrofit2.ServiceMethod$Builder.build(ServiceMethod.java:160) at retrofit2.Retrofit.loadServiceMethod(Retrofit.java:166) at retrofit2.Retrofit$1.invoke(Retrofit.java:145) at java.lang.reflect.Proxy.invoke(Proxy.java:393) at $Proxy0.downloadFileByUrlRx(Unknown Source) at com.example.khe11e.rxdownloadfile.MainActivity.downloadImage(MainActivity.java:46) at com.example.khe11e.rxdownloadfile.MainActivity$1.onClick(MainActivity.java:39) at android.view.View.performClick(View.java:5207) at android.view.View$PerformClick.run(View.java:21168) at android.os.Handler.handleCallback(Handler.java:746) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5491) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:728) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618) Caused by: java.lang.IllegalArgumentException: Could not locate call adapter for io.reactivex.Observable>. Tried: * retrofit2.adapter.rxjava.RxJavaCallAdapterFactory * retrofit2.ExecutorCallAdapterFactory at retrofit2.Retrofit.nextCallAdapter(Retrofit.java:237) at retrofit2.Retrofit.callAdapter(Retrofit.java:201) at retrofit2.ServiceMethod$Builder.createCallAdapter(ServiceMethod.java:232) ... 16 more

build.gradle:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

RetrofitInterface.java:

package com.example.khe11e.rxdownloadfile;
import io.reactivex.Observable;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Response;
import retrofit2.http.GET;
import retrofit2.http.Streaming;
import retrofit2.http.Url;

public interface RetrofitInterface {
    // Retrofit 2 GET request for rxjava
    @Streaming
    @GET
    Observable<Response<ResponseBody>> downloadFileByUrlRx(@Url String fileUrl);
}

MainActivity.java:

package com.example.khe11e.rxdownloadfile;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import java.io.File;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
import okio.BufferedSink;
import okio.Okio;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;

public class MainActivity extends AppCompatActivity {

Button downloadImgBtn;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    downloadImgBtn = (Button) findViewById(R.id.downloadImgBtn);
    downloadImgBtn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View v) {
            downloadImage();
        }
    });
}

public void downloadImage(){
    RetrofitInterface downloadService = createService(RetrofitInterface.class, "https://www.nasa.gov/");
    downloadService.downloadFileByUrlRx("sites/default/files/iss_1.jpg")
            .flatMap(processResponse())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(handleResult());
}

public <T> T createService(Class<T> serviceClass, String baseUrl){
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(baseUrl)
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    return retrofit.create(serviceClass);
}

public Function<Response<ResponseBody>, Observable<File>> processResponse(){
    return new Function<Response<ResponseBody>, Observable<File>>() {
        @Override
        public Observable<File> apply(Response<ResponseBody> responseBodyResponse) throws Exception {
            return saveToDiskRx(responseBodyResponse);
        }
    };
}

private Observable<File> saveToDiskRx(final Response<ResponseBody> response){
    return Observable.create(new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(ObservableEmitter<File> subscriber) throws Exception {
            String header = response.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            new File("/data/data/" + getPackageName() + "/images").mkdir();
            File destinationFile = new File("/data/data/" + getPackageName() + "/images/" + filename);

            BufferedSink bufferedSink = Okio.buffer(Okio.sink(destinationFile));
            bufferedSink.writeAll(response.body().source());
            bufferedSink.close();

            subscriber.onNext(destinationFile);
            subscriber.onComplete();
        }
    });
}

private Observer<File> handleResult(){
    return new Observer<File>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("OnSubscribe", "OnSubscribe");
        }

        @Override
        public void onNext(File file) {
            Log.d("OnNext", "File downloaded to " + file.getAbsolutePath());
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            Log.d("Error", "Error " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d("OnComplete", "onCompleted");
        }
    };
}
}

I've tried adding Call as mentioned here so it looks like:

Call<Observable<Response<ResponseBody>>> downloadFileByUrlRx(@Url String fileUrl);

however this causes issues with the flatMap function as it cannot find symbol method flatMap(Function< Response< ResponseBody>,Observable< File>>).


Answer:

You are using RxJava1 adapter for Retrofit, replace it with RxJava2 variant:

//compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

UPDATE

Starting with Retrofit version 2.2.0 there is a first-party call adapter for RxJava2:

compile 'com.squareup.retrofit2:retrofit:2.2.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'

Question:

I am seeing the following crash on Crashlytics:

Fatal Exception: io.reactivex.exceptions.UndeliverableException: java.net.SocketTimeoutException: connect timed out
       at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.disposeAll(FlowableFlatMap.java:590)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.cancel(FlowableFlatMap.java:354)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.cancel(FlowableSubscribeOn.java:141)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestInnerSubscriber.cancel(FlowableCombineLatest.java:540)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.cancelAll(FlowableCombineLatest.java:454)
       at io.reactivex.internal.operators.flowable.FlowableCombineLatest$CombineLatestCoordinator.cancel(FlowableCombineLatest.java:209)
       at io.reactivex.internal.subscriptions.SubscriptionHelper.cancel(SubscriptionHelper.java:189)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.dispose(FlowableFlatMap.java:690)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:602)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:668)
       at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onError(FlowableSubscribeOn.java:102)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:566)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:374)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:605)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:668)
       at io.reactivex.internal.operators.single.SingleToFlowable$SingleToFlowableObserver.onError(SingleToFlowable.java:68)
       at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onError(ObservableElementAtSingle.java:104)
       at io.reactivex.internal.util.HalfSerializer.onError(HalfSerializer.java:133)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.innerError(ObservableRetryWhen.java:132)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver$InnerRepeatObserver.onError(ObservableRetryWhen.java:172)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:495)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:571)
       at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
       at io.reactivex.internal.operators.observable.ObservableError.subscribeActual(ObservableError.java:37)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
       at io.reactivex.internal.operators.observable.ObservableZip$ZipCoordinator.drain(ObservableZip.java:205)
       at io.reactivex.internal.operators.observable.ObservableZip$ZipObserver.onNext(ObservableZip.java:276)
       at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:309)
       at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:230)
       at io.reactivex.subjects.SerializedSubject.onNext(SerializedSubject.java:104)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.onError(ObservableRetryWhen.java:106)
       at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
       at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onError(SingleDoOnSuccess.java:64)
       at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onError(SingleMap.java:69)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle$SingleElementObserver.onError(ObservableSingleSingle.java:95)
       at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onError(BodyObservable.java:72)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:56)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:34)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleDoOnSuccess.subscribeActual(SingleDoOnSuccess.java:35)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
       at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)
       at io.reactivex.Observable.subscribe(Observable.java:11194)
       at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
       at io.reactivex.Single.subscribe(Single.java:3096)
       at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.Flowable.subscribe(Flowable.java:13180)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
       at io.reactivex.internal.operators.flowable.FlowableFromArray$ArraySubscription.slowPath(FlowableFromArray.java:164)
       at io.reactivex.internal.operators.flowable.FlowableFromArray$BaseArraySubscription.request(FlowableFromArray.java:89)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
       at io.reactivex.internal.operators.flowable.FlowableFromArray.subscribeActual(FlowableFromArray.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
       at io.reactivex.Flowable.subscribe(Flowable.java:13234)
       at io.reactivex.Flowable.subscribe(Flowable.java:13180)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
       at java.lang.Thread.run(Thread.java:761)

Now according to the official documentation this is because somewhere in some rx chain the exception cannot be delivered and thus rather than hiding it Rx handles it by causing a crash.

I know I could just avoid this behavior by using

RxJavaPlugins.setErrorHandler(e -> { });

but I'd rather find the source of the problem. However nowhere in the exception log can I see the actual api request or method call that is causing this, only the stack trace from Rx and Okhttp/retrofit.

My app is quite big so I'd have to go through all my repositories to see where I might have missed an onError handling.

Is there a better way to debug this issue?


Answer:

As stated in the question comments, I had to deal with a similar issue. Our problem was in our Android app. The network calls would take too long and the users would send the app to the background. When this happens, we dispose of the subscriptions. When the socket timeout happens there's no one listening for the exception and this causes an UndeliverableException.

We've replaced the default error handler with (it's in kotlin, I hope this is ok):

private object DefaultErrorHandler : Consumer<Throwable> {
  override fun accept(t: Throwable) {
    when (t) {
        is UndeliverableException -> accept(t.cause!!)
        is NullPointerException,
        is IllegalArgumentException -> Thread.currentThread().run {
            uncaughtExceptionHandler.uncaughtException(this, t)
        }
        else -> // Swallow the exception here. We logged it to Crashlytics...
    }
  }
}

val defaultErrorHandler: Consumer<Throwable> = DefaultErrorHandler

// Then on application start we would replace the error handler
RxJavaPlugins.setErrorHandler(defaultErrorHandler)

I'm pretty sure defaultErrorHandler is a horrible name. Sorry about that.

A bit of explanation. The exceptions we don't swallow are NullPointerException and IllegalArgumentException. These get forwarded to the current thread's uncaught exceptions handler. We did this because these are usually related to programming errors.

We check for UndeliverableExceptions and unwrap them to rerun again through the same consumer. This is just to make sure we run the correct logic on the exception that couldn't be delivered.

All other exceptions are swallowed and logged into crashlytics for further assessment.

One key thing, this works for our use cases. It might be that you need to adapt it to yours. I'm not saying this is the best way to do it. It's an example. Perhaps for you, you want to just ignore socket timeouts.

Question:

I have an Android project developed using sdk version 25. I'm using RXJava as my thread management and Retrofit library for hit the network.

I also implemented custom Interceptor for adding override fun intercept(chain: Interceptor.Chain): Response? {

    val request = addHeader(chain)

    val response = chain.proceed(request)
    checkErrorResponse(response)

    return response

The api call will always inside the RX Java flow and I'm making sure of it. After I put my APK to the Playstore, a crash is detected by Crashlytics.

#0. Crashed: main: 0 0 0x0000000000000000
   at okio.Okio$4.newTimeoutException(Okio.java:230)
   at okio.AsyncTimeout.exit(AsyncTimeout.java:285)
   at okio.AsyncTimeout$2.read(AsyncTimeout.java:241)
   at okio.RealBufferedSource.indexOf(RealBufferedSource.java:345)
   at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:217)
   at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)
   at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
   at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
   at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
   at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
   at com.payfazz.data.base.net.PayfazzInterceptor.intercept(PayfazzInterceptor.kt:24)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
   at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
   at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
   at okhttp3.RealCall.execute(RealCall.java:69)
   at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
   at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
   at io.reactivex.Observable.subscribe(Observable.java:10842)
   at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
   at io.reactivex.Observable.subscribe(Observable.java:10842)
   at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
   at io.reactivex.Observable.subscribe(Observable.java:10842)
   at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:39)
   at io.reactivex.Observable.subscribe(Observable.java:10842)
   at io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:39)
   at io.reactivex.Observable.subscribe(Observable.java:10842)
   at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
   at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
   at java.lang.Thread.run(Thread.java:818)

As I investigated the stacktrace, I found my code here exactly at my custom interceptor. And as far as I know, the crash is caused by SocketTimeoutException, then a timed out request. Then I try to reproduce the timeout but it cannot be done. In my environment the exception is always catched by RX Java and sent to onError() method.

How can the crash not catched by RX Java? Should I wrap proceed() method with catch block just for safety?


Answer:

These kinds of crashes are caused when an exception is thrown outside of the RxJava lifecycle. This means your exception can be caused by either of these options:

  1. Going by the retrofit sourcecode, there might be a race condition where Disposable.dispose() is called but the timeout occurs before the underlying Call is cancelled. This would cause the timeout to be sent to the observer after it has been disposed, triggering the crash.
  2. An error is thrown inside of the onError handler. This should not occur according to the RxJava protocol and will cause a crash with a CompositeException. Since the error you've posted does not include the exact Exception class this cannot be ruled out.