Hot questions for using RxJava 2 in RxBinding

Question:

When I turn off the Internet and swipe to refresh for the first time, everything works fine (the refreshing indicator is dismissed and NetworkErrorView is shown). But when I swipe to refresh a second time, the refreshing indicator freezes. As far as I can tell, the SingleObserver receives neither onSuccess (expected, since the Internet is off) nor onError (which should be called precisely because the Internet is off), and as a result doAfterTerminate is not called either.

By the way, dataManager.getCitiesFromDb() returns Observable<City> and dataManager.getCityConditionsResponse() returns Single<List<City>>.

MainActivity (onCreate)

presenter.setRefreshObservable(RxSwipeRefreshLayout.refreshes(swipeRefreshLayout));

Presenter

@Override
public void setRefreshObservable(Observable<Object> observable) {
    observable
            .flatMapSingle(l -> getCitiesListObservable()
                    .flatMap(list -> Single.fromObservable(Observable.fromIterable(list)))
                    .map(city -> city))
            .toList()
            .subscribe(new SingleObserver<List<City>>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposables.add(d);
                }

                @Override
                public void onSuccess(List<City> list) {
                    view.showCitiesList(list);
                }

                @Override
                public void onError(Throwable e) {
                    view.showNetworkErrorView();
                }
            });
}

private Single<List<City>> getCitiesListObservable() {
    return dataManager.getCitiesFromDb()
            .flatMapSingle(city ->
                    dataManager.getCityConditionsResponse(city.getQuery())
                            .map(response -> {
                                city.setTemp(response.getTemp());
                                city.setIcon(response.getIcon());
                                return city;
                            })
            )
            .toList()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doAfterTerminate(() -> view.hideRefreshingStatus());
}

Answer:

When an observable emits an error in RxJava, it terminates, so you cannot reuse that stream. In your case, when the network request (probably dataManager.getCityConditionsResponse) fails because there is no Internet connection, the error propagates downstream and terminates the refresh stream as well. To handle this, add RxJava's onErrorReturn to the network request so that it emits a fallback value instead of an error. Because no error reaches the outer stream, that stream is not terminated and can keep emitting items on later refreshes.
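
To illustrate the idea, here is a minimal sketch, assuming RxJava 2 (io.reactivex) is on the classpath. The PublishSubject stands in for RxSwipeRefreshLayout.refreshes(...), and loadCities() and the offline flag are hypothetical placeholders for the real network call, not part of the question's code. The error is handled inside the inner Single, so the outer refresh stream never sees it:

```java
import io.reactivex.Single;
import io.reactivex.subjects.PublishSubject;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RefreshErrorDemo {

    // Hypothetical switch that simulates the device being offline.
    static boolean offline = true;

    // Hypothetical stand-in for the network request; fails while offline.
    static Single<List<String>> loadCities() {
        return Single.fromCallable(() -> {
            if (offline) {
                throw new IOException("no network");
            }
            return Collections.singletonList("London");
        });
    }

    static List<String> run() {
        offline = true;
        List<String> log = new ArrayList<>();
        // Stands in for the stream of swipe-to-refresh events.
        PublishSubject<Object> refreshes = PublishSubject.create();

        refreshes
                .flatMapSingle(ignored -> loadCities()
                        // React to the failure here (e.g. show an error view)...
                        .doOnError(e -> log.add("error: " + e.getMessage()))
                        // ...then swap the error for a fallback value so the
                        // outer stream is not terminated.
                        .onErrorReturn(e -> Collections.<String>emptyList()))
                .subscribe(
                        list -> log.add("result: " + list),
                        e -> log.add("outer stream terminated"));

        refreshes.onNext(new Object()); // first swipe, offline
        refreshes.onNext(new Object()); // second swipe, still offline
        offline = false;
        refreshes.onNext(new Object()); // third swipe, back online
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch every swipe produces a result, including the swipes made while offline, because the failure is converted to an empty list before it can reach the outer subscriber. In the question's code the same operators would go on the Single returned by getCitiesListObservable(), inside flatMapSingle.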

Question:

I am using RxBinding for Android widgets. I would like to do the following: Observable<java.lang.Void> obs = RxView.clicks(button); But I get a compile-time error saying that the expected type is kotlin.Unit. RxView.clicks(button) returns an Observable<Unit>, but I don't think Java has a Unit data type. How do I get an Observable<Void> in Java?


Answer:

You can live with Observable<Unit> in Java. kotlin.Unit is an ordinary class that is available to Java programs as soon as kotlin-stdlib-<some version>.jar is on your classpath, and it is there already because RxBinding requires it.

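If, however, some other part of your program requires a different element type, be aware that RxJava 2 does not allow null items, so an Observable<Void> could never emit a click. Note also that ou.as(unit -> null) compiles but evaluates to null rather than to an observable, because as() applies the function to the observable itself, not to each item. Map each Unit to a non-null value instead, for example:

 Observable<Unit> ou = RxView.clicks(button);
 Observable<Object> oo = ou.map(unit -> new Object());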