Hot questions for Using RxJava 2 in exception

Question:

As I understand RxJava2 values.take(1) creates another Observable that contains only one element from the original Observable. Which MUST NOT throw an exception as it is filtered out by the effect of take(1) as it's happened second.

as in the following code snippet

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );
Output
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
My questions :
  1. Am I understanding it correct ?
  2. What's really happening to cause the exception.
  3. How to solve this from the consumer ?

Answer:

  1. Yes, but because the observable 'ends' does not mean the code running inside create(...) is stopped. To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream.
  2. The exception is there because RxJava 2 has the policy of NEVER allowing an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an Exception occurs.
  3. The problem is the producer (Observable) and the consumer (Subscriber) disagreeing on when the stream ends. Since the producer is outliving the consumer in this case, the problem can only be fixed in the producer.

Question:

I get the following error, if I change the orientation of my device while my app is fetching new redditNews.

  E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
    Process: com.spicywdev.schmeddit, PID: 26522
    io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | null
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:73)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:237)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
        at java.lang.Thread.run(Thread.java:762)
     Caused by: java.io.InterruptedIOException
        at okhttp3.internal.http2.Http2Stream.waitForIo(Http2Stream.java:579)
        at okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:143)
        at okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:125)
        at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:88)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
        at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall.execute(ExecutorCallAdapterFactory.java:91)
        at com.spicywdev.schmeddit.features.news.NewsManager$getNews$1.subscribe(NewsManager.kt:16)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

That's what the class looks like. Error happens in function requestNews().

class NewsFragment : RxBaseFragment() {

    companion object {
        private val KEY_REDDIT_NEWS = "redditNews"
    }

    private var redditNews: RedditNews? = null
    private val newsManager by lazy { NewsManager() }

    override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View? {
        return container?.inflate(R.layout.news_fragment)
    }

    override fun onActivityCreated(savedInstanceState: Bundle?) {
        super.onActivityCreated(savedInstanceState)

        news_list.apply {
            setHasFixedSize(true)
            val linearLayoutManager = LinearLayoutManager(context)
            layoutManager = linearLayoutManager
            clearOnScrollListeners()
            addOnScrollListener(InfiniteScrollListener({ requestNews()}, linearLayoutManager))
        }

        initAdapter()

        if (savedInstanceState != null && savedInstanceState.containsKey(KEY_REDDIT_NEWS)) {
            redditNews = savedInstanceState.get(KEY_REDDIT_NEWS) as RedditNews
            (news_list.adapter as NewsAdapter).clearAndAddNews(redditNews!!.news)
        } else {
            requestNews()
        }
    }

    override fun onSaveInstanceState(outState: Bundle) {
        super.onSaveInstanceState(outState)
        val news = (news_list.adapter as NewsAdapter).getNews()
        if (redditNews != null && news.isNotEmpty()) {
            outState.putParcelable(KEY_REDDIT_NEWS, redditNews?.copy(news = news))
        }
    }

    private fun requestNews() {
        /**
         * first time will send empty string for after parameter.
         * Next time we will have redditNews set with the next page to
         * navigate with the after param.
         */
        val subscription = newsManager.getNews(redditNews?.after ?: "")
            .subscribeOn(Schedulers.io())
            .subscribe(
                {
                    retrievedNews ->
                    redditNews = retrievedNews
                    (news_list.adapter as NewsAdapter).addNews(retrievedNews.news)
                },
                {
                    e -> Snackbar.make(news_list, e.message ?: "WZF", Snackbar.LENGTH_LONG).show()
                }
        )
        subscriptions.add(subscription)
    }


    private fun initAdapter() {
        if (news_list.adapter == null) {
            news_list.adapter = NewsAdapter()
        }
    }
}

I'm pretty new to RxJava - I'd be really glad if someone can help me with this. Thank you.


Answer:

You can override Rx Error Handler with following way:

  1. Extends Application class with your custom MyApplication.
  2. Then write this in onCreate method of your application class:

    @Override
    public void onCreate() {
        super.onCreate();
        RxJavaPlugins.setErrorHandler(throwable -> {}); // nothing or some logging
    }
    
  3. Add MyApplication class to Manifest:

    <application
        android:name=".MyApplication"
        ...>
    

For more information, please take a look at RxJava Wiki

Question:

How can I map one occurred exception to another one in RxJava2? For example:

doOnError(throwable -> {
    if (throwable instanceof FooException) {
        throw new BarException();
    }
})

In this case I finally receive CompositeException that consists of FooException and BarException, but I'd like to receive only BarException. Help!


Answer:

You can use onErrorResumeNext and return Observable.error() from it:

source.onErrorResumeNext(e -> Observable.error(new BarException()))

Edit

This test passes for me:

@Test
public void test() {
    Observable.error(new IOException())
    .onErrorResumeNext((Throwable e) -> Observable.error(new IllegalArgumentException()))
    .test()
    .assertFailure(IllegalArgumentException.class);
}

Question:

Suppose I have

// binding dialog open to excel 2
      JavaFxObservable
         .actionEventsOf(importExcel2)
         .map(actionEvent -> chooseFile())
         .filter(Objects::nonNull)
         .observeOn(Schedulers.single())
         .subscribe(file -> {
            sourceFile.setFile(file);
            opRunner.runOp(ImportExcelTable2Op.class);
         });

The fact is chooseFile() returns null when file select is cancelled and streams do not allow nulls.

What to do then?


Answer:

There are two possible solutions:

  1. Turn map into a call to flatMap which returns Observable.just(chooseFile()) or Observable.empty() based on whether the return value is null.
  2. Encode the lack of a result and have chooseFile() return Maybe<File> and turn map into flatMapMaybe. This more accurately encodes what semantics should be.

Question:

I have 2 data sources: DB and server. When I start the application, I call the method from the repository (MyRepository):

public Observable<List<MyObj>> fetchMyObjs() {
    Observable<List<MyObj>> localData = mLocalDataSource.fetchMyObjs();
    Observable<List<MyObj>> remoteData = mRemoteDataSource.fetchMyObjs();
    return Observable.concat(localData, remoteData);
}

I subscribe to it as follows:

mMyRepository.fetchMyObjs()         
            .compose(applySchedulers())
            .subscribe(
                    myObjs -> {
                        //do somthing
                    },
                    throwable ->  {
                        //handle error
                    }
            );

I expect that the data from the database will be loaded faster, and when the download of data from the network is completed, I will simply update the data in Activity.

When the Internet is connected, everything works well. But when we open the application without connecting to the network, then mRemoteDataSource.fetchMyObjs(); throws UnknownHostException and on this all Observable ends (the subscriber for localData does not work (although logs tell that the data from the database was taken)). And when I try to call the fetchMyObjs() method again from the MyRepository class (via SwipeRefresh), the subscriber to localData is triggered.

How can I get rid of the fact that when the network is off, when the application starts, does the subscriber work for localData?


Answer:

Try some of error handling operators:

https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

I'd guess onErrorResumeNext( ) will be fine but you have to test it by yourself. Maybe something like this would work for you:

Observable<List<MyObj>> remoteData = mRemoteDataSource.fetchMyObjs()
      .onErrorResumeNext()

Addidtionally I am not in position to judge if your idea is right or not but maybe it's worth to think about rebuilding this flow. It is not the right thing to ignore errors - that's for sure ;)

Question:

I trying to convert my Asynch task to JavaRx 2. I use google sheets api to download data from spreadsheets. (here is a link how this happens)

Here is a part of my code:

OnCreate:

/**
 * JavaRx
 */

//Observable
Observable<String> observable
        = Observable.create(
        new ObservableOnSubscribe<String>() {
            @Override

            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //Use onNext to emit each item in the stream//
                e.onNext("https://docs.google.com/spreadsheets/d/1W5S5W2QH6WHjUcL1VMwqIqOdFYVleTopJNryQJGw568/gviz/tq?tqx=out:QUERY&tq=select+B,X,Y,Z");

                    //Once the Observable has emitted all items in the sequence, call onComplete//
                e.onComplete();
            }
        }
    ).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

//Create our subscription
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "onSubscribe " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(String value) {
        try {
            String data = getLeagueData(value);
            mLeagues.add(autoProcessJsonLeague("Argentina Primera Division", returnJSON(data)));
        } catch (IOException e) {
            e.printStackTrace();
        }

        Log.e(TAG, "onNext: " + value + Thread.currentThread().getName());

    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError: ");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete: All Done! " + Thread.currentThread().getName());
    }
};
observable.subscribe(observer);

Other Methods:

private String getLeagueData(String urlString) throws IOException {
//Download JSON file
    InputStream is = null;

    try {
        URL url = new URL(urlString);
        HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(true);  //you still need to handle redirect manually.
        HttpsURLConnection.setFollowRedirects(true);
        conn.setReadTimeout(10000 /* milliseconds */);
        conn.setConnectTimeout(15000 /* milliseconds */);
        conn.setInstanceFollowRedirects(true);
        conn.setRequestMethod("GET");
        conn.setDoInput(true);
        // Starts the query
        conn.connect(); //ERROR HAPPENS HERE!
        int responseCode = conn.getResponseCode();
        is = conn.getInputStream();

        String contentAsString = convertStreamToString(is);
        //Log.d("contentAsString", contentAsString);
        return contentAsString;
    } catch (ProtocolException e) {
        e.printStackTrace();
    } catch (MalformedURLException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (is != null) {
            is.close();
        }
    }

    return null;
}

private String convertStreamToString(InputStream is) {
    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
    StringBuilder sb = new StringBuilder();

    String line = null;
    try {
        while ((line = reader.readLine()) != null) {
            sb.append(line + "\n");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        try {
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return sb.toString();
}


private League autoProcessJsonLeague(String leagueName, JSONObject object) {
    //Get the data from the JSON string
    ArrayList<Team> teams = new ArrayList<>();
    try {
        JSONArray rows = object.getJSONArray("rows");
        for (int r = 0; r < rows.length(); ++r) {
            JSONObject row = rows.getJSONObject(r);
            JSONArray columns = row.getJSONArray("c");
            String name = columns.getJSONObject(0).getString("v");
            int points = columns.getJSONObject(1).getInt("v");
            double hGoalAv = columns.getJSONObject(2).getDouble("v");
            double aGoalAv = columns.getJSONObject(3).getDouble("v");
            hGoalAv = Utilities.round(hGoalAv, 2);
            aGoalAv = Utilities.round(aGoalAv, 2);
            teams.add(new Team(name, points, hGoalAv, aGoalAv));
            //Log.d("Team", name + " " + hGoalAv + " " + aGoalAv);
        }
    } catch (JSONException e) {
        e.printStackTrace();
        e.printStackTrace();
    }
    return new League(leagueName, teams);
}

So I create an observable, I subscribe on IO thread and observeOn the main thread. With onNext I send the url link to the observer and then I try to connect to the server to download the json string file.

Error happens on method getLeagueData() on the line conn.connect(); It says java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.

FULL STACK TRACE ERROR:

08-16 08:53:09.934 29841-29841/com.aresproductions.bettingtools E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.aresproductions.bettingtools, PID: 29841
java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
  at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
  at android.os.Handler.handleCallback(Handler.java:751)
  at android.os.Handler.dispatchMessage(Handler.java:95)
  at android.os.Looper.loop(Looper.java:154)
  at android.app.ActivityThread.main(ActivityThread.java:6195)
  at java.lang.reflect.Method.invoke(Native Method)
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:874)
  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:764)
 Caused by: android.os.NetworkOnMainThreadException
  at android.os.StrictMode$AndroidBlockGuardPolicy.onNetwork(StrictMode.java:1303)
  at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:86)
  at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:74)
  at java.net.InetAddress.getAllByName(InetAddress.java:752)
  at com.android.okhttp.internal.Network$1.resolveInetAddresses(Network.java:29)
  at com.android.okhttp.internal.http.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:187)
  at com.android.okhttp.internal.http.RouteSelector.nextProxy(RouteSelector.java:156)
  at com.android.okhttp.internal.http.RouteSelector.next(RouteSelector.java:98)
  at com.android.okhttp.internal.http.HttpEngine.createNextConnection(HttpEngine.java:346)
  at com.android.okhttp.internal.http.HttpEngine.connect(HttpEngine.java:329)
  at com.android.okhttp.internal.http.HttpEngine.sendRequest(HttpEngine.java:247)
  at com.android.okhttp.internal.huc.HttpURLConnectionImpl.execute(HttpURLConnectionImpl.java:457)
  at com.android.okhttp.internal.huc.HttpURLConnectionImpl.connect(HttpURLConnectionImpl.java:126)
  at com.android.okhttp.internal.huc.DelegatingHttpsURLConnection.connect(DelegatingHttpsURLConnection.java:89)
  at com.android.okhttp.internal.huc.HttpsURLConnectionImpl.connect(HttpsURLConnectionImpl.java)
  at com.aresproductions.bettingtools.MainActivity.getLeagueData(MainActivity.java:307)
  at com.aresproductions.bettingtools.MainActivity.access$000(MainActivity.java:80)
  at com.aresproductions.bettingtools.MainActivity$2.onNext(MainActivity.java:180)
  at com.aresproductions.bettingtools.MainActivity$2.onNext(MainActivity.java:171)
  at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:198)
  at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250)
  at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109)
  at android.os.Handler.handleCallback(Handler.java:751) 
  at android.os.Handler.dispatchMessage(Handler.java:95) 
  at android.os.Looper.loop(Looper.java:154) 
  at android.app.ActivityThread.main(ActivityThread.java:6195) 
  at java.lang.reflect.Method.invoke(Native Method) 
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:874) 
      at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:764) 
08-16 08:53:09.936 29841-29841/com.aresproductions.bettingtools E/MQSEventManagerDelegate: failed to get MQSService.

Thanks in advance!


Answer:

The problem is that you are doing the network call on the main thread. Though you have subscribed on Schedulers.io(), the onNext() method where you are doing the network call will be called on main thread since you are observing on main thread observeOn(AndroidSchedulers.mainThread()).

The solution will be to call the getLeagueData(String urlString) inside subscribe() method of observable and call e.onNext(result) with the result of the network call.

Question:

I have the following RxJava 2 code (in Kotlin), which have an Observable that

disposable = Observable.create<String>({
    subscriber ->
            try {
                Thread.sleep(2000)
                subscriber.onNext("Test")
                subscriber.onComplete()
            } catch (exception: Exception) {
                subscriber.onError(exception)
            }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ result -> Log.d("Test", "Completed $result") },
             { error -> Log.e("Test", "Completed ${error.message}") })

While it is still Thread.sleep(2000), I perform disposable?.dispose() call, it will error out

FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.elyeproj.rxstate, PID: 10202
java.lang.InterruptedException 
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:371)
    at java.lang.Thread.sleep(Thread.java:313)
    at presenter.MainPresenter$loadData$1.subscribe(MainPresenter.kt:41)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)

I expect the dispose would help to cancel the operation silently, or at most, have the error catch with with the Log.e on the subscribe. However, it just crash as per the error message above.

Why did the Exception escape? Isn't dispose suppose to cancel the entire operation silently without crashing?


Answer:

There is a combination of factors here:

  1. dispose of a stream that uses subscribeOn also disposes of the thread used. This also involves calling Thread.interrupt() when using Schedulers.io(). This causes the exception in your case.
  2. InterruptedException is an Exception thrown by Thread.sleep, so it is caught by your code and passed to onError like any other exception.
  3. Calling onError after dispose redirects the error to the global error handler due to RxJava2's policy of NEVER throwing away errors. To work around this check subscriber.isDisposed() before calling onError or use RxJava 2.1.1's new subscriber.tryOnError.

    if (!subscriber.isDisposed()) {
      subscriber.onError(exception)
    }
    

Question:

I have a Spring-boot app that saves data to a Couchbase server.

I like to save about 500k Students using this code:

for (String student: students) {
  ...
  bucket.upsert(JsonDocument.create(<unique key here>, TTL, JsonObject.fromJson(studentAsJson)));                
}

This works fine for small amount of Students (up to 100K +-).

But whenever app reaches roughly the 110K-140K Student, I get this error:

2018-12-23 16:20:57.804 ERROR 17468 --- [nio-8989-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.couchbase.client.java.error.TemporaryFailureException] with root cause

rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers$5.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.setProducer(OnSubscribeTimeoutTimedWithFallback.java:155) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:86) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:526) ~[core-io-1.7.1.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

My Couchbase is running in Docker on the localhost and I wonder if this is some real problem or just limitation due to docker & localhost.


Answer:

Never worked with CouchBase but it's seems like it's a problem with backpressure(https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0)).

So basically your consumer(in this case CouchBase) can't consume the data as fast as you give to it. From what can be seen from the logs you are using RxJava1. I would suggest upgrading to RxJava2. It will need some time because of the changes but you will have access to Flowable(http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html) which offers BackPressure out of the box.

By setting up a Flowable with backpressure you will be able to take care of the problem for slow consumers as it allows you to wait for a consumer to consume the data then send to it the new ones.

If you can't update the version of RxJava this article from David Karnok is worth taking a look as it show ways to deal with this chains without the need for backpresssure: https://github.com/ReactiveX/RxJava/wiki/Backpressure

Hope this helps.

Question:

I recently started using RxJava 2 and am wondering if its possible to catch more specific Exceptions when calling Completable.blockingAwait().

This is what I am doing:

try {

  Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(@NonNull CompletableEmitter e) throws Exception {
        e.onError(new IOException());
    }
  }).blockingAwait();

} catch (RuntimeException e) {
  Throwable cause = e.getCause();

  if(cause instanceof IOException) {
      // Handle IOException
  }
}

And this is what I would like to do:

try {

  Completable.create(new CompletableOnSubscribe() {
    @Override
    public void subscribe(@NonNull CompletableEmitter e) throws Exception {
        e.onError(new IOException());
    }
  }).blockingAwait();

} catch (IOException e) {
  // Handle IOException
}

Is there any possibility to make this work?


Answer:

A couple of lines down in the linked Javadoc there is the blockingGet operator that returns a Throwable which you can do instanceof checks with:

 Throwable err = Completable.error(new IOException()).blockingGet();
 if (err instanceof IOException) {
     // ...
 }

Question:

So I am trying to implement instant search using rxjava2 and retrofit, the process is simple, as soon as the user changes the text publish.onNext() is called (publish is a PublishSubject object). I have added filter and debounce and switch map operators to facilitate search from the server when text's length is greater than a threshold and call is not made with successive input simultaneously.

This is the code :

subject = PublishSubject.create();
    getCompositeDisposable().add(subject
            .filter(s -> s.length() >= 3)
            .debounce(300,
                    TimeUnit.MILLISECONDS)
            .switchMap(s -> getDataManager().getHosts(
                    getDataManager().getDeviceToken(),
                    s).observeOn(Schedulers.io()))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(hostResponses -> {
                getMvpView().hideEditLoading();
                if (hostResponses.size() != 0) {
                    if (this.hostResponses != null)
                        this.hostResponses.clear();
                    this.hostResponses = hostResponses;
                    getMvpView().setHostView(getHosts(hostResponses));
                } else {
                    getMvpView().onFieldError("No host found");
                }

            }, throwable -> {
                getMvpView().hideEditLoading();
                if (throwable instanceof HttpException) {
                   HttpException exception = (HttpException)throwable;
                    if (exception.code() == 401) {
                        getMvpView().onError(R.string.code_expired,
                                BaseUtils.TOKEN_EXPIRY_TAG);
                    }
                }

            })
    );

Now my code is working fine, I am achieving what I need but I am getting a bug when I enter a long string and press backspace button, what happens is that when my AutoCompleteTextView's text is cleared, an exception is thrown

Here is the stacktrace of the exception :

java.io.InterruptedIOException: thread interrupted  
at okio.Timeout.throwIfReached(Timeout.java:145)  
at okio.Okio$1.write(Okio.java:76)  
at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)  
at okio.RealBufferedSink.flush(RealBufferedSink.java:216)  
at okhttp3.internal.http1.Http1Codec.finishRequest(Http1Codec.java:166)  
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:84) 
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at com.facebook.stetho.okhttp3.StethoInterceptor.intercept(StethoInterceptor.java:56)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)  
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)  
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)  
at okhttp3.RealCall.execute(RealCall.java:77)  
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)  
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)  
at io.reactivex.Observable.subscribe(Observable.java:10700)  
at io.reactivex.internal.operators.observable.ObservableSwitchMap$SwitchMapObserver.onNext(ObservableSwitchMap.java:126)  
at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111)  
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceTimedObserver.emit(ObservableDebounceTimed.java:140)  
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceEmitter.run(ObservableDebounceTimed.java:165)  
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)  
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)  
at java.util.concurrent.FutureTask.run(FutureTask.java:237)  
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)  
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)  
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)  
at java.lang.Thread.run(Thread.java:762

Answer:

That inner observeOn(Schedulers.io()) doesn't look right, given that you immediately move the element back to the main thread after that. It should be subscribeOn(Schedulers.io()) there instead.

Also remove the subscribeOn() call just before your subscribe call as it should have no practical effect given that the chain is subscribed to a PublishSubject at the top.

.switchMap(s -> getDataManager()
               .getHosts(getDataManager().getDeviceToken(), s)
//             .observeOn(Schedulers.io())
               .subscribeOn(Schedulers.io())   // <-------------------------
)
.observeOn(AndroidSchedulers.mainThread())
//.subscribeOn(Schedulers.io())   // <--------------------------------------
.subscribe(hostResponses -> {

Question:

Although one of the rail is emitted onError(), isCancelled() in another rails is still return false, that lead to the UndeliverableException. How to check downstream is cancelled in parallel rails?

Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        System.out.println("Flowable.create-emitter.isCancelled:" + emitter.isCancelled());
        for (int i = 1; i < 10; i++) {
            emitter.onNext(i);
        }
        emitter.onComplete();
    }

}, BackpressureStrategy.BUFFER).parallel(6).runOn(Schedulers.io())
        .flatMap(new Function<Integer, Publisher<String>>() {
            @Override
            public Publisher<String> apply(Integer t) throws Exception {
                // TODO Auto-generated method stub
                return Flowable.create(new FlowableOnSubscribe<String>() {

                    @Override
                    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                            System.out.println("flatMap-before onError-isCancelled:" + emitter.isCancelled());
                            try {
                                if (true) { // assume trigger the error
                                    throw new Exception("Test");
                                }
                                if (!emitter.isCancelled()) {
                                    emitter.onNext(String.valueOf((t + 1)));
                                    emitter.onComplete();
                                }
                            } catch (Exception ex) {
                                if (!emitter.isCancelled()) {
                                    emitter.onError(ex);
                                }
                            }
                            System.out.println("flatMap-after onError-isCancelled:" + emitter.isCancelled());
                    }
                }, BackpressureStrategy.BUFFER);
            }

        }).sequential().subscribeOn(scheduler).observeOn(Schedulers.single())
        .subscribeWith(new ResourceSubscriber<String>() {

            public void onComplete() {
                System.out.println("onComplete");

            }

            public void onError(Throwable arg0) {
                System.out.println("onError:" + arg0.toString());

            }

            public void onNext(String arg0) {
                System.out.println("onNext:" + arg0);
            }

        });

Answer:

I found the solution. I need add a global error consumer to solve the problem. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

Question:

We need a global handler for rxJava2 exceptions passed to onError of any subscriber to just log the stacktrace happened in any part of the system. In the first version of rxJava is a RxJavaHooks.setOnError method which does exactly this thing.

RxJavaHooks.setOnError(throwable -> Log.w(TAG, "rx error", throwable));

But in the rxJava2 that class was removed and as they noticed the logic have been moved to RxJavaPlugins class. It has some similar named method RxJavaPlugins.setErrorHandler, but actually the invocation purpose is completely different. It handles only undeliverable exceptions. How can i log all exceptions either it is catched by subscriber or not? I don't really want to attach logger to every particular observable in the system Upd: maybe you have some advice only for android system


Answer:

There is no simple way of doing it; you have to install onSubscribe or onAssembly hooks to all the main reactive types which then wrap the Subscribers in a way that doesn't interfere with internal optimizations unexpectedly.

The extensions project has an assembly tracking debug feature which captures the stacktrace of the current thread when flows are assembled and attaches that information in the onError exception passing through it (which you actually need).

Question:

I tried to use RxJava with Retrofit to chain network requests in Android. But as I stated in the title, it causes an IllegalArgumentException. In the following, you can see my code I have written so far(I also included the imports and the gradle file with the dependencies I use for my project):

ApiClient.java :

import retrofit2.Retrofit;

import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
import retrofit2.http.GET;
import retrofit2.http.Header;
import retrofit2.http.Path;
import rx.Observable;


public class ApiClient {

    // trailing slash is needed
    public static final String BASE_URL = "....";

    private static CliqueDBApiInterface sCliqueDBApiInterface;

    public static CliqueDBApiInterface getCliqueDBApiInterface(){

        if(sCliqueDBApiInterface == null){
            Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();

            sCliqueDBApiInterface = retrofit.create(CliqueDBApiInterface.class);
        }

        return sCliqueDBApiInterface;
    }

    public interface CliqueDBApiInterface{
        @GET("statementsOfClique/{idOfClique}")
        //Call<ListOfStatements> getStatementsOfTheClique(@Path("idOfClique") int id, @Header("Authorization") String authHeader );
        Observable<ListOfStatements> getStatementsOfTheClique(@Path("idOfClique") int id, @Header("Authorization") String authHeader );
    }

}

MainActivity.java:

import retrofit2.HttpException;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

// ... some other code

Observable<ListOfStatements> call = service.getStatementsOfTheClique(clique.getId(), authenticationToken);

call.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ListOfStatements>() {
          @Override
          public void onNext(ListOfStatements listOfStatements) {
              // Called once the `ListOfStatements` object is available
              List<Statement> statementList = listOfStatements.getStatementsOfClique();
              // do something with statementList


          }

          @Override
          public void onCompleted() {
             // Nothing to do here
          }

          @Override
          public void onError(Throwable e) {
             if (e instanceof HttpException) {
                int code = ((HttpException) e).code();
             }

          }
});

Now the gradle file:

apply plugin: 'com.android.application'

android {
    compileSdkVersion 28
    defaultConfig {
        applicationId "com.celik.abdullah.hefuxi11"
        minSdkVersion 19
        targetSdkVersion 28
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
    packagingOptions{
        exclude 'META-INF/rxjava.properties'
    }
}

dependencies {
    implementation fileTree(include: ['*.jar'], dir: 'libs')
    implementation 'com.android.support:appcompat-v7:28.0.0'
    implementation 'com.android.support.constraint:constraint-layout:1.1.3'
    implementation 'com.android.support:recyclerview-v7:28.0.0'
    implementation 'com.android.volley:volley:1.1.1'
    implementation 'com.google.code.gson:gson:2.8.5'
    implementation 'com.android.support:cardview-v7:28.0.0'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'com.android.support.test:runner:1.0.2'
    androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
    implementation 'com.squareup.retrofit2:retrofit:2.5.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
    implementation 'io.reactivex:rxandroid:1.2.0'
    implementation 'io.reactivex:rxjava:1.1.4'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'

}

But the app crashes and I get the following error/exception in the LogCat:

03-13 18:36:29.190 4128-4128/com.celik.abdullah.hefuxi11 E/AndroidRuntime: FATAL EXCEPTION: main Process: com.celik.abdullah.hefuxi11, PID: 4128 java.lang.IllegalArgumentException: Unable to create call adapter for rx.Observable for method CliqueDBApiInterface.getStatementsOfTheClique at retrofit2.Utils.methodError(Utils.java:52) at retrofit2.HttpServiceMethod.createCallAdapter(HttpServiceMethod.java:60) at retrofit2.HttpServiceMethod.parseAnnotations(HttpServiceMethod.java:34) at retrofit2.ServiceMethod.parseAnnotations(ServiceMethod.java:36) at retrofit2.Retrofit.loadServiceMethod(Retrofit.java:168) at retrofit2.Retrofit$1.invoke(Retrofit.java:147) at java.lang.reflect.Proxy.invoke(Proxy.java:913) at $Proxy1.getStatementsOfTheClique(Unknown Source) at com.celik.abdullah.hefuxi11.adapters.CliqueAdapter$1.run(CliqueAdapter.java:124) at android.os.Handler.handleCallback(Handler.java:790) at android.os.Handler.dispatchMessage(Handler.java:99) at android.os.Looper.loop(Looper.java:164) at android.app.ActivityThread.main(ActivityThread.java:6494) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807) Caused by: java.lang.IllegalArgumentException: Could not locate call adapter for rx.Observable. Tried: * retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory * retrofit2.CompletableFutureCallAdapterFactory * retrofit2.ExecutorCallAdapterFactory at retrofit2.Retrofit.nextCallAdapter(Retrofit.java:239) at retrofit2.Retrofit.callAdapter(Retrofit.java:203) at retrofit2.HttpServiceMethod.createCallAdapter(HttpServiceMethod.java:58) ... 14 more

Do somebody know how to resolve that problem? I have found some other SO questions about this exception and tried all answers by playing/changing the import packages in the gradle file but nothing has worked. I also add the addCallAdapterFactory(RxJava2CallAdapterFactory.create()) when I build the retrofit instance but I got the same error.

Thanks in advance,


Answer:

You are using RxJava 1.x observable and subscribers. You have added adapter factory of RxJava 2.x at .addCallAdapterFactory(RxJava2CallAdapterFactory.create())

you should import from RxJava 2.x

accordingly, you need to update dependency as well

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'




import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;

Question:

In the below code the subscriber stops recieving data whenever there is a timeout exception. How can I make sure that the subscriber does not stop when there is exception.

public class ReactiveDataService
{
    private static String[] quotes = {"ITEM1", "ITEM2", "ITEM3"};
    public Observable<Notification<String>> getStreamData()
    {

    return Observable.create(subscriber -> {
        if(!subscriber.isUnsubscribed())
        {

            Stream<String> streams = Arrays.stream(quotes);
            streams.map(quote -> quote.toString()).filter(quote -> quote!=null)
            .forEach(q -> { 
                subscriber.onNext(Notification.createOnNext(q));
                try
                {
                    Random rand = new Random();
                    Integer i = (rand.nextInt(5)+1)*1000;
                    Thread.sleep(i);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            });
        }
        subscriber.onCompleted();
    });

    }
}    


public class ReactiveResource
{
    public static void main(String args[])
    {
    Observable<Notification<String>> watcher = new ReactiveResource().getData()
    .timeout(4, TimeUnit.SECONDS)
    .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
    .onErrorResumeNext(th -> {
        return Observable.just(Notification.createOnError(new TimeoutException("Timed Out!")));
    });

    watcher.subscribe(
            ReactiveResource::callBack, 
            ReactiveResource::errorCallBack,
            ReactiveResource::completeCallBack
    );
}

public static Action1 callBack(Notification<String> data)
{
    System.out.println(data.getValue());
    return null;
}

public static void errorCallBack(Throwable throwable)
{
    System.out.println(throwable instanceof TimeoutException);
    System.out.println(throwable);      
}

public static void completeCallBack()
{
    System.out.println("On completed successfully");
}


private Observable<Notification<String>> getData()
{
    return new ReactiveDataService().getStreamData();
}

Answer:

You can combine publish, mergeWith and timer to achieve this effect:

static <T> ObservableTransformer<T, T> onTimeoutKeepAlive(
        long timeout, TimeUnit unit, Scheduler scheduler, T keepAliveItem) {
    return upstream -> 
        upstream.publish(o ->
            o.mergeWith(
                Observable.timer(timeout, unit, scheduler)
                .map(t -> keepAliveItem)
                .takeUntil(o)
                .repeat()
                .takeUntil(o.ignoreElements().toObservable())
            )
        );
}

usage:

source
    .compose(onTimeoutKeepAlive(
         10, TimeUnit.SECONDS, Schedulers.computation(),
         Notification.createOnError(new TimeoutException())
    ))
    .subscribe(/* ... */);

Question:

I've code like this in a repository:

return Completable.fromAction {
    // Some code
    loginService.login(id)
        .subscribe(
            { response ->
                if(response.isNotSuccessful()) {
                    throw Exception()
                }
                // Some code
            },
            { e ->
                throw e
            }
    )
}

I've code like this in a ViewModel:

fun onLoginAction(id) {
    repository.login(id)
        .subscribe(
            {
                showSuccess()
            },
            {
                showFailure()
            }
    )
}

Basically, the ViewModel calls the login method in the repository which returns the Completable.

This results in an UndeliverableException when the response is not successful. I want the Completable's subscriber's onError() method to be called. How do I do this?


Answer:

I don't have enough knowledge to actually say this with certainty, but I still think this has some value to you and it's too big for a comment.

Here's what I think it's happening. When onError fails rx won't run this through the same observable stream. Instead, it will propagate this to the RxPlugins error handler and eventually to the default exception handler in your system. You can find this here.

This is to say that when loginService.login(id) throws the exception in the onError, the Completable stream won't have a chance to catch it and forward it to the onError of the outer subscribe. In other words, the completable stream is independent of the login service one.

Usually, you'd want to create one single stream and let the view model subscribe to it. If you have more than one stream, rx has loads of operators to help you chain these. Try and make the repository return one stream from the service. Something like this:

fun login(id) = loginService.login(id)

And now on the view model, you can check if the call was or not successful using the same method - response.isNotSuccessful()

Question:

I have few report's from different users they have problem with crashing application.

To be honest I already don't have to much ideas what can happen.

Device is no active for some times for ex. 40 - 60 minutes and then this error.

Thanks for any advice or suggestions.

Error logs:

07-27 18:08:12.558 1883-5804/? E/AndroidRuntime: FATAL EXCEPTION: RxNewThreadScheduler-42\
io.reactivex.exceptions.UndeliverableException: java.io.IOException\
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)\
    at io.reactivex.internal.operators.single.SingleCreate$Emitter.onError(SingleCreate.java:82)\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl.lambda$getChannel$5$RabbitMQConnectionImpl(RabbitMQConnectionImpl.java:141)\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl$$Lambda$3.subscribe(Unknown Source)\
    at io.reactivex.internal.operators.single.SingleCreate.subscribeActual(SingleCreate.java:39)\
    at io.reactivex.Single.subscribe(Single.java:3310)\
    at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)\
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:579)\
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)\
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)\
    at java.util.concurrent.FutureTask.run(FutureTask.java:234)\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)\
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)\
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)\
    at java.lang.Thread.run(Thread.java:841)\
 Caused by: java.io.IOException\
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124)\
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120)\
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142)\
    at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:136)\
    at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)\
    at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:559)\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl.lambda$getChannel$5$RabbitMQConnectionImpl(RabbitMQConnectionImpl.java:132)\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl$$Lambda$3.subscribe(Unknown Source)\'a0\
    at io.reactivex.internal.operators.single.SingleCreate.subscribeActual(SingleCreate.java:39)\'a0\
    at io.reactivex.Single.subscribe(Single.java:3310)\'a0\
    at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)\'a0\
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:579)\'a0\
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)\'a0\
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)\'a0\
    at java.util.concurrent.FutureTask.run(FutureTask.java:234)\'a0\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)\'a0\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)\'a0\
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)\'a0\
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)\'a0\
    at java.lang.Thread.run(Thread.java:841)\'a0\
 Caused by: com.rabbitmq.client.ShutdownSignalException: connection error\
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)\
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)\
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443)\
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263)\
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136)\
    at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:136)\'a0\
    at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)\'a0\
    at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:559)\'a0\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl.lambda$getChannel$5$RabbitMQConnectionImpl(RabbitMQConnectionImpl.java:132)\'a0\
    at com.framelogic.rabbitmq.connection.RabbitMQConnectionImpl$$Lambda$3.subscribe(Unknown Source)\'a0\
    at io.reactivex.internal.operators.single.SingleCreate.subscribeActual(SingleCreate.java:39)\'a0\
    at io.reactivex.Single.subscribe(Single.java:3310)\'a0\
    at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)\'a0\
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:579)\'a0\
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)\'a0\
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)\'a0\
    at java.util.concurrent.FutureTask.run(FutureTask.java:234)\'a0\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)\'a0\
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)\'a0\
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)\'a0\
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)\'a0\
    at java.lang.Thread.run(Thread.java:841)\'a0\
 Caused by: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds\
    at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:784)\
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:684)\
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)\
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:603)\
    at java.lang.Thread.run(Thread.java:841)\'a0\

Answer:

Solution:

Implement global rx error handler ( https://github.com/ReactiveX/RxJava/blob/2.x/docs/What's-different-in-2.0.md#error-handling )

 // If Java 8 lambdas are supported
    RxJavaPlugins.setErrorHandler(e -> { });

    // If no Retrolambda or Jack 
    RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer());

Of course this will not fix the problem. This will only prevent application crash if we forget to implement onError() method somewhere.

However everything is better than application crash : )

Question:

I have a problem with throwing custom exceptions when using Retrofit2 and RxJava2.

Retrofit Api definition

public interface BackendInterface {

    @Headers({"......"})
    @POST("international-services/")
    Observable<BackendResponse> post(@Body BackendRequest request, @Header("sessionId") String sessionId);
}

Usage of Retrofit Api

@Override
public Observable<SomeBusinessObject> getSomething(@NonNull String sessionId, @NonNull SomeOtherBusinessObject something) {
    BackendRequest br = createBackendRequest(something);
    return backendInterface.post(br, sessionId).map(backendResponse -> {

        if (backendResponse.getResult().getCode() != 0) {
            return Observable.error(new ServiceException("my custom exception"));
        }

        SomeBusinessObject result = mapBusinessObject(br);
        return result;
    });
}

The problem is the following line

return Observerable.error(new ServiceException("my custom exception"));

Android Studio marks it as error and says "no instance(s) of type variable(y) T exist so that Observable conforms to SomeBusinessObject inference variable R has incompatible bounds: equality contraints: SomeBusinessObject lower bounds: Observable

The IDE's quick fix suggests to make my method "getSomething" return Observable.

I have no idea how to forward custom exceptions to the subscriber of my observable.


Answer:

Your .map is trying to return both SomeBusinessObject as well as Observable<"SomeType">. Since they don't have a common parent object it cannot resolve what Observable.error() should return.

The solution it to use .flatmap instead and replacing the return with return Observable.just(result);

Question:

I'm introducing myself to rxJava2 in Android with MVP.

For initial steps I have created an Activity, a presenter and a Model objects (every of them with an interface class).

In my activity, I create the presenter, and try to load data, notifying the view when it is ready:

protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.my_activity);

    MvpPresenter presenter = new Presenter(this,
            AndroidSchedulers.mainThread());

    presenter.loadData();

Inside the presenter, on loadData method, just calling back to view

@Override
public void loadData() {

    compositeDisposable.add(dataRepository.getDataList().
            subscribeOn(Schedulers.io()).
            observeOn(mainScheduler).
            subscribeWith(new DisposableSingleObserver<List<Book>>() {
                @Override
                public void onSuccess(@NonNull List<Book> bookList) {
                    if (!bookList.isEmpty())
                        view.displayBooks(bookList);
                    else
                        view.displayEmpty();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    view.displayError("boooom");
                }
            }));
    }

Method getBooks() is as follows (I have in mind to use an API, but so far, for simplicity purposes, I just have the data hardcoded)

public Single<List<Shelf>> getShelfList() {
    return Single.fromCallable(new Callable<List<Book>>() {
        @Override
        public List<Book> call() throws Exception {
            Book b1 = new Book();
            Book b2 = new Book();
            Book b3 = new Book();

            List<Book> bookList = Arrays.asList(b1,b2,b3);
            return bookList;
        }
    });
}

The method view.displayBooks(bookList) just have a Toast message (again, for simplicity purposes)

public void displayBooksInShelf(List<Shelf> shelfList) {
        Toast.makeText(this, shelfList.size(), Toast.LENGTH_LONG).show();
    }

Seems very simple and it should the Toast as three books are returned correctly... but when executing, it throws an exception:

java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.
 at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111)
 at android.os.Handler.handleCallback(Handler.java:751)
 at android.os.Handler.dispatchMessage(Handler.java:95)
 at android.os.Looper.loop(Looper.java:154)
 at android.app.ActivityThread.main(ActivityThread.java:6077)
 at java.lang.reflect.Method.invoke(Native Method)
 at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:866)
 at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:756)
Caused by: android.content.res.Resources$NotFoundException: String resource ID #0x2
 at android.content.res.Resources.getText(Resources.java:331)
 at android.widget.Toast.makeText(Toast.java:287)

In Unitary Test (Mockito) everything seems to work fine, but together it fails. My suspects are that there is some kind of thread issue. Anybody can help my with this problem?

Also, knowing that I'd like to use retrofit, is the approach I'm using the correct one? Or maybe the thread should be managed in the Model/Repository class?

Hope the information I've provided is enough to clarify where the problem is

Thanks in advance!


Answer:

Notice this line in the stacktrace:

Caused by: android.content.res.Resources$NotFoundException: String resource ID #0x2

You are trying to access String resource with invalid resource id. Here where it happens:

Toast.makeText(this, shelfList.size(), Toast.LENGTH_LONG).show();

The second argument must be String message or int resource id. You are passing int here, so compiler chooses overload with a resource id (obviously invalid).

If you want to just print int you have to pass String representation of it:

Toast.makeText(this, Integer.toString(shelfList.size()), Toast.LENGTH_LONG).show();

Question:

I have a vertx service for all message-broker related operations. For e.g. an exchange creation function looks like this

@Override
    public BrokerService createExchange(String exchange, 
    Handler<AsyncResult<JsonArray>> resultHandler) {
        try {
            getAdminChannel(exchange).exchangeDeclare(exchange, "topic", true);
            resultHandler.handle(Future.succeededFuture());
        } catch(Exception e) {
            e.printStackTrace();
            resultHandler.handle(Future.failedFuture(e.getCause()));
        }
        return this;
    }

I am in the process of converting my entire codebase into rxjava and I would like to convert functions like these into completables. Something like:

try {
    getAdminChannel(exchange).exchangeDeclare(exchange, "topic", true);
    Completable.complete();
} catch(Exception e) {
    Completable.error(new BrokerErrorThrowable("Exchange creation failed"));
}

Furthermore, I would also like to be able to throw custom errors like Completable.error(new BrokerErrorThrowable("Exchange creation failed")) when things go wrong. This is so that I'll be able to catch these errors and respond with appropriate HTTP responses.

I saw that Completable.fromCallable() is one way to do it, but I haven't found a way to throw these custom exceptions. How do I go about this? Thanks in advance!


Answer:

I was able to figure it out. All I had to do was this:

@Override
  public BrokerService createExchange(String exchange, Handler<AsyncResult<Void>> resultHandler) {
    Completable.fromCallable(
            () -> {
              try {
                getAdminChannel(exchange).exchangeDeclare(exchange, "topic", true);
                return Completable.complete();
              } catch (Exception e) {
                return Completable.error(new InternalErrorThrowable("Create exchange failed"));
              }
            })
        .subscribe(CompletableHelper.toObserver(resultHandler));

    return this;
  }

Question:

I'm trying to use RXJava2 with Room database . I use Single Observable to get a list of my object . But every time I get this error

cannot find symbol class EmptyResultSetException

I'm using this tutorial ANDROID ROOM PERSISTENT AND RXJAVA2

I used it before with AsyncTask without RXJava and it worked so I wanted to use it with RXJava

Here is my build.gradle

apply plugin: 'com.android.application'

  android {
compileSdkVersion 28
defaultConfig {
    applicationId "apit.net.sa.simpleusingroom"
    minSdkVersion 16
    targetSdkVersion 28
    versionCode 1
    versionName "1.0"
    testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
}
buildTypes {
    release {
        minifyEnabled false
        proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
    }
}
}

    dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'com.android.support:appcompat-v7:28.0.0'
implementation 'com.android.support.constraint:constraint-layout:1.1.3'




implementation 'com.android.support:support-v13:28.0.0'
implementation 'com.android.support:recyclerview-v7:28.0.0'
implementation 'com.android.support:design:28.0.0'
implementation 'com.jakewharton:butterknife:8.5.1'
    //ROOM database
implementation 'android.arch.persistence.room:runtime:1.0.0'
annotationProcessor 'android.arch.persistence.room:compiler:1.0.0'

//RXJava2
implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

//RXjava with room
implementation 'android.arch.persistence.room:rxjava2:1.0.0-alpha1'

annotationProcessor 'com.jakewharton:butterknife-compiler:8.5.1'
testImplementation 'junit:junit:4.12'
androidTestImplementation 'com.android.support.test:runner:1.0.2'
androidTestImplementation 'com.android.support.test.espresso:espresso-core:3.0.2'
 }

Dao Class

 @Query("SELECT * FROM UserEntity")
Single<List<UserEntity>> getUsersSingle();

MyActivity

DatabaseSingInstance.getInstance(this)
            .getOurDatabase()
            .userDAO()
            .getUsersSingle()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableSingleObserver<List<UserEntity>>() {


                @Override
                public void onSuccess(final List<UserEntity> userEntities) {
                    adapter = new UserAdapter(userEntities, MainActivity.this, new UserAdapter.OnLongClickListner() {
                        @Override
                        public void onClick(int position) {
                            showPopupWindow(userEntities,position);
                        }
                    });
                    myRec.setAdapter(adapter);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e("error"," "+e.getMessage());
                }


            });

Answer:

I solved the problem according to this Answer

I just changed the room_version to 1.1.0 . By the way I'm waiting for any one who can explain this error or just explain why it happened .

Question:

I have a simple code below

    compositeDisposable.add(Observable.create<Int> { Thread.sleep(1000) }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({}, {Log.d("Track", it.localizedMessage)}, {}))
    Handler().postDelayed({compositeDisposable.clear()}, 100)

It purposely use Thread.sleep(1000), just to trigger the InterruptedException. I purposely delay 100 milliseconds, so that ensure the sleep in the chain has started, and dispose it.

(Note, I know using of Thread.sleep is not preferred. I'm just writing this code to test and understand why onError is not called on this scenario, and how to prevent the crash elegantly without need to use try-catch in the RxJava chain)

At that time, when it is triggered, the error is not cause onError (i.e. doesn't reach the Log. But instead it throws the below error and crash the App.

io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:74)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:43)
    at io.reactivex.Observable.subscribe(Observable.java:11194)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:764)
 Caused by: java.lang.InterruptedException
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:373)
    at java.lang.Thread.sleep(Thread.java:314)
    at com.elyeproj.porterduff.AnimateDrawPorterDuffView$startAnimate$1.subscribe(AnimateDrawPorterDuffView.kt:45)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:11194) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) 
    at java.lang.Thread.run(Thread.java:764) 

Why was't the InterruptedException caught by the onError in RxJava?


Answer:

As per pointed by @akarnokd, in https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling, looks like the RxJava has been disposed before the throw, hence the error thrown later got through. To address the issue just need to register

 RxJavaPlugins.setErrorHandler { e -> /*Do whatever we need with the error*/ }

Question:

I'm working on some concurrent io operation. The code below is a test code for my architecture. I want to give timeout to async io operations but the flow stucks after some steps. When i discard the timeout func code works well.

What i'm missing ? Where is the problem ?

    Scheduler mainObserveOn = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler mainSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(1));
    Scheduler flatmapObserveOn = Schedulers.from(Executors.newFixedThreadPool(2));
    Scheduler flatmapSubscribeOn = Schedulers.from(Executors.newFixedThreadPool(2));

    AtomicLong a = new AtomicLong();
    Flowable.create(emitter -> {

        while(true)
        {
            for (int i = 0; i < 100; i++)
            {
                emitter.onNext(a.incrementAndGet());
                System.out.println("Emit : "+a.get()+" | Thread : "+Thread.currentThread().getName());
            }

            Thread.sleep(100);
        }

    }, BackpressureStrategy.BUFFER)
    .observeOn(mainObserveOn)
    .flatMap(
            o -> Flowable.just(o)
                    .observeOn(flatmapObserveOn)
                    .doOnNext(o1 -> {
                        System.out.println(o1+" | Thread : "+Thread.currentThread().getName());

                        if( (long)o1 % 10 == 0 )
                        {
                            System.out.println("SLEEP");
                            Thread.sleep(10);
                        }
                    })
                    .doOnError(throwable -> throwable.printStackTrace())
                    .onExceptionResumeNext(s -> {})
                    .timeout(2, TimeUnit.MILLISECONDS)
                    .subscribeOn(flatmapSubscribeOn)
    )
    .subscribeOn(mainSubscribeOn)
    //.blockingSubscribe(System.out::println, System.err::println);
    .subscribe(System.out::println, System.err::println);


    Thread.sleep(1000000000000000000L);

By the way i want to skip the error operations so using empty onExceptionResumeNext. Is there better way for this ?


Answer:

The timeout code inside doOnNext() will cause the observer chain to sleep for 10 seconds every 10 emissions. Later, you have a timeout() for 2 seconds; this will trigger on the tenth emission, causing onError() to be emitted. You have no error handling code after this, so it will likely cause the entire observer chain to quit.

The result will be 101 log lines for the initial emissions, followed by 10 log lines for the first 10 emissions followed by an error message, likely Timeout.

It isn't clear what you want to happen, but this is what you get.

Question:

Following is a simplified example of a problem that I'm trying to tackle. In my code base, the problem is occurring with a PublishSubject but Observable can be used to reproduce this problem.

Problem:

I've an uncaught exception in onNext() of the subscriber that propagates the exception to the RxJava's global exception handler which results into a crash. However this is not the desired behavior. The objective is to handle uncaught exceptions gracefully.

Question:

Why is onError() not being called for the uncaught exception? What method(s) can I use to catch such uncaught exceptions?

Quick solution:

I can use RxJavaPlugins.setErrorHandler to cope with this situation but I'd rather keep this solution as the last resort if we don't have any other solution to deal with the problem at hand.

Code:

Observable.just(true)
                .subscribeWith(new DisposableObserver<Boolean>() {
                    @Override
                    public void onNext(Boolean aBoolean) {
                        String x = null;
                        x.getBytes();
                    }

                    @Override
                    public void onError(Throwable e) { <--- NOT CALLED UPON EXCEPTION
                        System.out.println("inside test observable onError");
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {

                    }
                });

Stacktrace:

12-03 11:09:01.454 4750-4750/? E/MDMApplication: UNCAUGHT EXCEPTION IN APP: java.lang.RuntimeException: Unable to start activity ComponentInfo{com.test.mdm.app/mdm.connector.app.MainActivity}: java.lang.NullPointerException
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2195)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2245)
        at android.app.ActivityThread.access$800(ActivityThread.java:135)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1196)
        at android.os.Handler.dispatchMessage(Handler.java:102)
        at android.os.Looper.loop(Looper.java:136)
        at android.app.ActivityThread.main(ActivityThread.java:5017)
        at java.lang.reflect.Method.invokeNative(Native Method)
        at java.lang.reflect.Method.invoke(Method.java:515)
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:779)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:595)
        at dalvik.system.NativeStart.main(Native Method)
     Caused by: java.lang.NullPointerException
        at mdm.connector.app.MainActivity$1.onNext(MainActivity.java:63)
        at mdm.connector.app.MainActivity$1.onNext(MainActivity.java:59)
        at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
        at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:64)
        at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
        at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12036)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
        at io.reactivex.Observable.subscribe(Observable.java:12036)
        at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
        at io.reactivex.Observable.subscribe(Observable.java:12036)
        at io.reactivex.Observable.subscribeWith(Observable.java:12088)
        at mdm.connector.app.MainActivity.onCreate(MainActivity.java:59)
        at android.app.Activity.performCreate(Activity.java:5231)
        at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1087)
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2159)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2245) 
        at android.app.ActivityThread.access$800(ActivityThread.java:135) 
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1196) 
        at android.os.Handler.dispatchMessage(Handler.java:102) 
        at android.os.Looper.loop(Looper.java:136) 
        at android.app.ActivityThread.main(ActivityThread.java:5017) 
        at java.lang.reflect.Method.invokeNative(Native Method) 
        at java.lang.reflect.Method.invoke(Method.java:515) 
        at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:779) 
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:595) 
        at dalvik.system.NativeStart.main(Native Method) 

Answer:

The solution to the problem above is, instead of using DisposableObserver we should use Consumer & Consumer in the subscribe method. This ensures that if there are any uncaught exceptions in the Consumer accept method, they will be caught in Consumer accept method instead of getting propagated to RxJava's global exception handler and resulting into a crash.

Example:

Observable.just(true)
.subscribe(new Consumer<Boolean> {
  public void accept(Boolean aboolean) {
      String x = null;
      x.getBytes();
  }
}, new Consumer<Throwable> {

  public void accept(Throwable e) {
      e.printStackTrace()
  }
});

If there is another better solution to cope with unhandled exceptions, please suggest that in comments

Question:

I'm using the RxJava and running into the following issue.

    threw exception [Request processing failed; nested exception is java.lang.IndexOutOfBoundsException: Index: 0, Size: 0] with root cause
rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: [Ljava.lang.Object;.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:257) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext(OperatorOnErrorResumeNextViaFunction.java:154) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:131) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.Observable.unsafeSubscribe(Observable.java:10151) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.2.0.jar!/:1.2.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

The following is the snippet of what I'm trying to do with RxJava

    Observable<A> AObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> {
                LOGGER.warn(format("Server did not respond within %s ms for id=%s", 800, id));
                return null;
            });

    Observable<B> BObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn( throwable -> {
                LOGGER.warn(format("Service did not respond within %s ms for id=%s", 800, Id));
                return null;
            });

    // Build Default response
    Observable<C> CObservable = Observable.fromCallable(() ->
            // Build Default one
    ).subscribeOn(Schedulers.io());


    return Observable.zip(AObservable, BObservable,CObservable,
            (AResponse, BResponse, CResponse) -> {

        // Handle response and combine them

    }).toBlocking().first();

I'm testing this locally and its working well but when I deploy it on the aws I run into the above-said issue. Also Please note that I'm not running into an issue for all the ids but only for few ids. I'm fairly new to RxJava could someone point out is there a potential issue with the async code.


Answer:

You can use switchIfEmpty operator to return default value when either of requests timeouts (thus returning empty observable).

Observable<String> AObservable = Observable.fromCallable(() -> {
    Thread.sleep(100);
    return "response A";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> BObservable = Observable.fromCallable(() -> {
    Thread.sleep(1500);
    return "response B";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> CObservable = Observable.fromCallable(() -> "default response")
        .subscribeOn(Schedulers.io());

String result = Observable.zip(AObservable, BObservable,
        (AResponse, BResponse) -> AResponse + " and " + BResponse)
        .switchIfEmpty(CObservable)
        .singleElement()
        .blockingGet();

System.out.println(result);

You can change Thread.sleep arguments to get different results.

Question:

I'm using the RxJava and running into the following issue.

    threw exception [Request processing failed; nested exception is java.lang.IndexOutOfBoundsException: Index: 0, Size: 0] with root cause
rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: [Ljava.lang.Object;.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:190) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:257) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:323) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onNext(OperatorOnErrorResumeNextViaFunction.java:154) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1$1.onNext(OperatorSubscribeOn.java:53) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:131) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeFromCallable.call(OnSubscribeFromCallable.java:33) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.Observable.unsafeSubscribe(Observable.java:10151) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228) ~[rxjava-1.2.0.jar!/:1.2.0]
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.2.0.jar!/:1.2.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_111]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

The following is the snippet of what I'm trying to do with RxJava

    Observable<A> AObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> {
                LOGGER.warn(format("Server did not respond within %s ms for id=%s", 800, id));
                return null;
            });

    Observable<B> BObservable = Observable.fromCallable(() ->
            //External Service Call
    ).timeout(800, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .onErrorReturn( throwable -> {
                LOGGER.warn(format("Service did not respond within %s ms for id=%s", 800, Id));
                return null;
            });

    // Build Default response
    Observable<C> CObservable = Observable.fromCallable(() ->
            // Build Default one
    ).subscribeOn(Schedulers.io());


    return Observable.zip(AObservable, BObservable,CObservable,
            (AResponse, BResponse, CResponse) -> {

        // Handle response and combine them

    }).toBlocking().first();

I'm testing this locally and its working well but when I deploy it on the aws I run into the above-said issue. Also Please note that I'm not running into an issue for all the ids but only for few ids. I'm fairly new to RxJava could someone point out is there a potential issue with the async code.


Answer:

You can use switchIfEmpty operator to return default value when either of requests timeouts (thus returning empty observable).

Observable<String> AObservable = Observable.fromCallable(() -> {
    Thread.sleep(100);
    return "response A";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> BObservable = Observable.fromCallable(() -> {
    Thread.sleep(1500);
    return "response B";
}).timeout(800, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .onErrorResumeNext((value) -> {
            return Observable.empty();
        });

Observable<String> CObservable = Observable.fromCallable(() -> "default response")
        .subscribeOn(Schedulers.io());

String result = Observable.zip(AObservable, BObservable,
        (AResponse, BResponse) -> AResponse + " and " + BResponse)
        .switchIfEmpty(CObservable)
        .singleElement()
        .blockingGet();

System.out.println(result);

You can change Thread.sleep arguments to get different results.

Question:

I encountered to OnErrorNotImplementedException when I'm learning RxJava2. I learned that this exception occur due to isn't implemented error handler to subscriber. So, I thought why don't it exist except onError method. Why?


Answer:

doOnError method doesn't consume the error and eventually it will be thrown in subscriber. In this case if you are not handling in subscriber then OnErrorNotImplementedException will be thrown.

below approaches will consume error as mentioned in subscribe method.

Approach 1:

Observable.just(new Object())
            .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
            .doOnComplete(() -> { /* test */})
            .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
            .subscribe(o ->
                    {/* success */}
                    ,
                    throwable -> {/* error */} // here error will be consumed at the end
            );

Approach 2:

Observable.just(new Object())
            .doOnNext(o -> { /* /* This method catches the on next but doesn't consume it. */})
            .doOnComplete(() -> { /* test */})
            .doOnError(throwable -> {/* This method catches the error but doesn't consume it. */})
            .subscribe(
                    new DefaultObserver<Object>() {
                        @Override
                        public void onNext(Object o) {

                        }

                        @Override
                        public void onError(Throwable e) {
                          // here error will be consumed at the end
                        }

                        @Override
                        public void onComplete() {

                        }
                    }
            );

Overall, You have to consume error otherwise you will face the same error which you've mentioned.

Question:

I'm working with Rxjava2,

I'm using flatmap in RxJava with below structure:

Observable1.flatmap() and return to Observable 2.

as below codes :

getApi().createUser(os, deviceToken)
            .compose(view.regisObserver())
            .subscribeOn(Schedulers.io())
            .flatMap(result -> {
                String user_token = result.data.user_token;
                getPreferenceStore().setAuthToken(user_token);

                setReadPolicy();

                Observable<ObjectDto<UserProfile>> obs = getApi().updateProfile(null, null, null);
                        obs.compose(view.regisObserver());

                return obs;
            })
            .subscribe(result-> {
                getPreferenceStore().setUserId(result.data.user_id);

            //                    view.onUpdateProfile();
            }, Throwable::printStackTrace);

@Override
public <T> ObservableTransformer<T, T> regisObserver() {
    return observable -> observable.compose(prepare())
            .doOnSubscribe(disposable -> showProgressDialog())
            .doOnComplete(this::closeProgressDialog)
            .doOnError(throwable -> {
                if (BuildConfig.DEBUG) {
                    throwable.printStackTrace();
                }
                showProgressDialog();
                closeProgressDialog();
            });

}

The code compiles without error. It got error in run-time NetworkErrorOnMainThread. I don't know how to fix it.


Answer:

You need to specify that you want to subscribe on non-UI thread...for example

.subscribeOn(Schedulers.io())

Actually, looks like you are doing that for updateProfile....just need to also do it for createUser