Hot questions for Using RxJava 2 in stream

Question:

How to convert Observable to Publisher in RxJava version 2?

In the first version we have the https://github.com/ReactiveX/RxJavaReactiveStreams project, which does exactly what I need. But how can I do it in RxJava 2?


Answer:

Use the following code:

Publisher publisher = observable.toFlowable(BackpressureStrategy.XXX);

As Observable does not implement backpressure, you need to select a backpressure strategy when converting. See available choices here.

Or use Flowable instead of Observable in the first place. See here for details.
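
As a minimal sketch of the conversion, assuming RxJava 2 is on the classpath: the class name and the choice of BackpressureStrategy.BUFFER are illustrative only, not a recommendation for every source.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import org.reactivestreams.Publisher;

import java.util.List;

final class ObservableToPublisherDemo {

    // Flowable implements Publisher, so toFlowable(...) is the whole conversion.
    // BUFFER is an assumption made for this sketch; pick the strategy that fits your source.
    static <T> Publisher<T> toPublisher(Observable<T> observable) {
        return observable.toFlowable(BackpressureStrategy.BUFFER);
    }

    // Consumes the Publisher again to show that all items pass through.
    static List<Integer> collect() {
        Publisher<Integer> publisher = toPublisher(Observable.just(1, 2, 3));
        return Flowable.fromPublisher(publisher).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [1, 2, 3]
    }
}
```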

Question:

I am very excited about the new RxJava sources such as Single, Maybe and Completable, which make your interface classes cleaner and prevent a lot of mistakes when creating your 'Source' (e.g. forgetting to call onComplete()).

But it requires lots of boilerplate to combine them into a complex stream.

E.g. we have the common Android situation of loading and caching data. Let's assume we have two sources, api and cache, and we would like to combine them:

public interface Api {
    Single<Integer> loadFromNetwork();
}

public interface Cache {
    Maybe<Integer> loadFromCache(); //maybe because cache might not have item.
}

let's try to combine it:

final Single<Integer> result = cache.loadFromCache()
        .switchIfEmpty(api.loadFromNetwork());

This will not compile, because Maybe doesn't have the overload Maybe.switchIfEmpty(Single):Single

so we have to convert everything:

final Single<Integer> result = cache.loadFromCache()
        .switchIfEmpty(api.loadFromNetwork().toMaybe())
        .toSingle();

Another possible way to combine them also requires conversion:

final Single<Integer> result = Observable.concat(
            cache.loadFromCache().toObservable(),
            api.loadFromNetwork().toObservable()
        ).firstOrError();

So I don’t see any way to use the new sources without many transformations that add code noise and create a lot of extra objects.

Due to such issues, I can't use Single, Maybe, Completable and continue to use Observable everywhere.

So my question is:

  • What are the best practices for combining Single, Maybe and Completable?

  • Why don't these sources have overloads to make combining easier?

  • Why don't these sources have a common ancestor that could be used as the parameter of switchIfEmpty and other methods?


P.S. Does anybody know why these classes don't have any common hierarchy? From my perspective, if some code can work with, for example, Completable, it will also work fine with Single and Maybe.


Answer:

RxJava 2.1.4, released on Sep 22, 2017, adds the needed overload Maybe.switchIfEmpty(Single):Single.

So when we would like to combine the following classes:

public interface Api {
    Single<Integer> loadFromNetwork();
}

public interface Cache {
    Maybe<Integer> loadFromCache(); //maybe because cache might not have item.
}

We can finally do:

final Single<Integer> result = cache.loadFromCache()
        .switchIfEmpty(api.loadFromNetwork());

The Rx team has done a great job by adding extra overloads to Maybe, Single and Observable that simplify combining them.

As of release 2.1.16 we have the following methods for combining Maybe, Single and Completable:

Maybe: flatMapSingleElement(Single):Maybe, flatMapSingle(Single):Single, switchIfEmpty(Single):Single, flatMapCompletable(Completable):Completable

Single: flatMapMaybe(Maybe):Maybe, flatMapCompletable(Completable):Completable

Completable: andThen(Single):Single, andThen(Maybe):Maybe
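
A few of these combinations can be sketched as follows. This assumes RxJava 2.1.4 or later on the classpath; the class and method names are made up for the illustration, and the constant values stand in for real cache and network sources.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Single;

final class CombineSourcesDemo {

    // Cache hit: the Maybe emits, so the fallback Single is never subscribed.
    static int cacheHit() {
        return Maybe.just(1).switchIfEmpty(Single.just(2)).blockingGet();
    }

    // Cache miss: the empty Maybe falls back to the "network" Single.
    static int cacheMiss() {
        return Maybe.<Integer>empty().switchIfEmpty(Single.just(2)).blockingGet();
    }

    // Run a Completable first, then continue with a Single.
    static int saveThenLoad() {
        return Completable.complete().andThen(Single.just(3)).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(cacheHit());     // 1
        System.out.println(cacheMiss());    // 2
        System.out.println(saveThenLoad()); // 3
    }
}
```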

Question:

I am trying to use the new Android Architecture Components. I'd like to convert an Rx Flowable to LiveData with LiveDataReactiveStreams, but the class is missing.

Here are my dependencies:

compile "android.arch.lifecycle:runtime:1.0.0-alpha1"
compile "android.arch.lifecycle:extensions:1.0.0-alpha1"
kapt "android.arch.lifecycle:compiler:1.0.0-alpha1"

Any ideas?


Answer:

You're missing this dependency:

implementation "android.arch.lifecycle:reactivestreams:1.0.0-beta2"

Question:

I have a lot of Singles in my code, such as

Disposable disp = Single.fromCallable(()-> loadData())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(res -> showInUI(res),
                    throwable -> Log.e(TAG, throwable.getMessage()));

As I understood from the documentation, the difference between Observable and Single is that a Single can respond with an error, never respond, or respond with a success, and it emits only once. I do not dispose anywhere right now, and everything works fine.

So do I need to execute disp.dispose() at all?


Answer:

Yes, indeed; it doesn't matter whether it is a Single, Observable or Completable. It matters because you don't want to keep your UI bound to some background work, which would leak your Activity. It also matters because you don't want to receive notifications in the UI beyond some point (after your Activity is destroyed, for instance), which can cause NPEs or other problems. Besides that, it's good practice to cancel expensive background work when the user leaves or closes the Activity/screen it belongs to, in order to free resources. All of these considerations are common to all Observable types.
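
One common way to do this is to collect the Disposables in a CompositeDisposable and clear it in the teardown callback. The sketch below is plain Java rather than Android code: Screen is a hypothetical stand-in for an Activity or presenter, and Single.never() models background work that has not finished yet.

```java
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

final class DisposeDemo {

    // Hypothetical owner of subscriptions, playing the role of an Activity.
    static final class Screen {
        private final CompositeDisposable disposables = new CompositeDisposable();

        void onStart() {
            // Single.never() never signals, like a long-running load.
            Disposable d = Single.<String>never()
                    .subscribe(result -> { /* update the UI */ },
                               error -> { /* log the error */ });
            disposables.add(d);
        }

        void onDestroy() {
            // clear() disposes everything added so far and keeps the container reusable.
            disposables.clear();
        }

        int trackedSubscriptions() {
            return disposables.size();
        }
    }

    public static void main(String[] args) {
        Screen screen = new Screen();
        screen.onStart();
        System.out.println(screen.trackedSubscriptions()); // 1
        screen.onDestroy();
        System.out.println(screen.trackedSubscriptions()); // 0
    }
}
```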

Question:

I have an Observable that never finishes. It emits a List<Item>. I need to filter out some of those items every time it emits that list. Currently I have this as a solution:

mData.getItemsObservable() // Observable<List<Item>>
        .compose(...)
        .flatMapSingle(items -> Observable.fromIterable(items)
                .filter(item -> item.someCondition())
                .toList())
        .subscribe(items -> {
            // ...
        }, error -> {
            // ...
        });

Is this the best way to filter out some items? Is there a simpler (more readable) way to do the same?

I've tried this too, but it didn't emit anything:

mData.getItemsObservable() // Observable<List<Item>>
        .compose(...)
        .flatMap(Observable::fromIterable) // or like this: flatMapIterable(items -> items)
        .filter(item -> item.someCondition())
        .toList()
        .subscribe(items -> {
            // ...
        }, error -> {
            // ...
        });

Answer:

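The second attempt emits nothing because toList() only emits once its source completes, and this Observable never does. The first approach is okay if you want to stick to RxJava. Otherwise, you could use IxJava and perform the filtering directly in a map operation: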

mData.getItemsObservable() // Observable<List<Item>>
    .compose(...)
    .map(v -> Ix.from(v).filter(w -> w.someCondition()).toList())
    .subscribe(items -> {
        // ...
    }, error -> {
        // ...
    });
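
If adding IxJava is not an option, the same per-list filtering inside map can be sketched with Java 8 streams, assuming they are available on your platform. The class name, the Integer items and the even-number condition are placeholders for Item and someCondition().

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FilterListDemo {

    // Plain Java filtering of one emitted list; the condition is a placeholder.
    static List<Integer> keepEven(List<Integer> items) {
        return items.stream()
                .filter(item -> item % 2 == 0)
                .collect(Collectors.toList());
    }

    // map transforms each emitted list as it arrives, so it also works on
    // a source that never completes. toList() here only gathers the output
    // of this finite demo source.
    static List<List<Integer>> run() {
        return Observable.just(Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6))
                .map(FilterListDemo::keepEven)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[2, 4], [6]]
    }
}
```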

Question:

I'm looking for an RxJava operator to merge sources into one stream. Currently I have this:

  Disposable observable = Observable.concat(
                service.loadPopCells().toObservable(),
                service.loadChallangeData().toObservable(),
                service.loadUserCell().toObservable()
        )              
  .subscribe(data->sendtoViewmodel(data)); // called 3 times

I have 3 streams, so the subscribe callback is called three times, but I want it to be called once with all the data.

I'm looking to get something like this:

    Disposable observable = Observable.concat(
                service.loadPopCells().toObservable(),
                service.loadChallangeData().toObservable(),
                service.loadUserCell().toObservable()
        )
        // i want to achieve something like this 
        .mapallresult(data,data2,data3){ 
         private List<SimpleCell> shots = new ArrayList<>();
         shots.add(data);
         shots.add(data2);
         shots.add(data3);
         return shots;  }
         ///
         .subscribe(dataList->sendtoViewmodel(dataList); // called once 

Answer:

The zip operator will help you:

Observable.zip(
        service.loadPopCells().toObservable(),
        service.loadChallangeData().toObservable(),
        service.loadUserCell().toObservable(),
        (data1, data2, data3) -> Arrays.asList(data1, data2, data3))
        .subscribe(dataList -> sendtoViewmodel(dataList));

Or even shorter:

Observable.zip(
    service.loadPopCells().toObservable(),
    service.loadChallangeData().toObservable(),
    service.loadUserCell().toObservable(),
    Arrays::asList)
    .subscribe(this::sendtoViewmodel);

Question:

There are two issues which I am currently facing.

1) As soon as the line RetrofitProvider.getInstance().getCurrentWeather(.....) is called, the network call is made. How can it be deferred until the observer is connected to it?

2) Once weatherInfoPublisher.onComplete() is called, the next time I call onComplete on this object the new observer's onNext is not getting called.

public Observable<LinkedList<WeatherInfo>> getWeatherData(final String payload, final TempUnit tempUnit) {

        PublishSubject weatherInfoPublisher = PublishSubject.create();

        RetrofitProvider.getInstance().getCurrentWeather(payload + ",us", translateTempUnit(tempUnit))
            .flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String todayResponse) throws Exception {

                    Log.d(TAG, "Received today weather: " + todayResponse);

                    parseTodayData(todayResponse, weatherDataList);
                    return RetrofitProvider.getInstance().getForecastWeather(
                            payload + ",us", translateTempUnit(tempUnit), FORECAST_DAYS);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<String>() {
                @Override
                public void onNext(String futureResponse) {

                    Log.d(TAG, "Received future weather: " + futureResponse);
                    parseFutureData(futureResponse, weatherDataList);

                    weatherInfoPublisher.onNext(weatherDataList);
                    weatherInfoPublisher.onComplete();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "The error is, " + e.getMessage());
                }

                @Override
                public void onComplete() {

                }
            });

        return weatherInfoPublisher;
    }

This is a singleton class; the entire implementation is available on GitHub.


Answer:

How can it be deferred till the observer is connected to it.

Do not subscribe to that observable in this method. Instead return that observable to the client. As soon as the observable is subscribed - a request would be performed.

the next time I call onComplete on this object the new observer's onNext is not getting called.

See the Reactive Streams specification: once a stream completes, it can never be continued; onComplete is a terminal event.
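
Both points can be illustrated with a small sketch. The names below are hypothetical stand-ins (currentWeather replaces the Retrofit call, and a counter replaces the real network); the method only builds the chain and returns it, so the work starts on subscription and every new subscriber gets a fresh run instead of a completed subject.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

final class DeferredCallDemo {

    // Counts how often the simulated network call actually runs.
    static final AtomicInteger networkCalls = new AtomicInteger();

    // Hypothetical stand-in for the Retrofit call; fromCallable runs its body per subscription.
    static Observable<String> currentWeather(String city) {
        return Observable.fromCallable(() -> {
            networkCalls.incrementAndGet();
            return "weather for " + city;
        });
    }

    // Only assembles the chain. Nothing is subscribed here, and no subject is needed.
    static Observable<String> getWeatherData(String city) {
        return currentWeather(city).map(String::toUpperCase);
    }

    public static void main(String[] args) {
        Observable<String> weather = getWeatherData("austin");
        System.out.println(networkCalls.get());    // 0, nothing has run yet
        System.out.println(weather.blockingFirst()); // WEATHER FOR AUSTIN
        weather.blockingFirst();                     // a second subscriber triggers a second call
        System.out.println(networkCalls.get());    // 2
    }
}
```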

Question:

Basically, I have a stream of words Observable<String>

What I want is to accumulate them, emitting nothing until I have a ".". When I have a "." I want to emit the whole sentence. And so on until the stream is completed.

1) The -> nothing, accumulated
2) quick -> nothing, accumulated
3) brown -> nothing, accumulated
4) fox -> nothing, accumulated
5) jumps -> nothing, accumulated
6) over -> nothing, accumulated
7) the -> nothing, accumulated
8) lazy -> nothing, accumulated
9) dog. -> emit the whole sentence, clear accumulator

scan and reduce look similar, but they are not exactly what I need; I'm not sure.


Answer:

You can use the bufferUntil extension operator to collect items until a string ends with the punctuation, then join the list of strings into one string:

Flowable.fromArray("The", "quick", "brown", "fox", "jumps",
    "over", "the", "lazy", "dog.",
    "This", "sentence", "is", "false.")
.compose(FlowableTransformers.bufferUntil(v -> v.endsWith(".")))
.map(list -> String.join(" ", list))
.test()
.assertResult(
    "The quick brown fox jumps over the lazy dog.",
    "This sentence is false."
);

Question:

I am trying to download a PDF and send it via an Intent to a PDF app to display the file, as seen here in the answer of JDenais. This is the code to download the PDF and pass it via Intent:

public class PdfOpenHelper {

public static void openPdfFromUrl(final String pdfUrl, final Activity activity) {
    Observable.fromCallable(new Callable<File>() {
        @Override
        public File call() throws Exception {
            try {
                URL url = new URL(pdfUrl);
                URLConnection connection = url.openConnection();
                connection.connect();

                // download the file
                InputStream input = new BufferedInputStream(connection.getInputStream());
                File dir = new File(activity.getFilesDir(), "/shared_pdf");
                dir.mkdir();
                File file = new File(dir, "temp.pdf");
                OutputStream output = new FileOutputStream(file);

                byte data[] = new byte[1024];
                long total = 0;
                int count;
                while ((count = input.read(data)) != -1) {
                    total += count;
                    output.write(data, 0, count);
                }

                output.flush();
                output.close();
                input.close();
                return file;
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<File>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(File file) {
                    String authority = activity.getApplicationContext().getPackageName() + ".fileprovider";
                    Uri uriToFile = FileProvider.getUriForFile(activity, authority, file);

                    Intent shareIntent = new Intent(Intent.ACTION_VIEW);
                    shareIntent.setDataAndType(uriToFile, "application/pdf");
                    shareIntent.addFlags(Intent.FLAG_GRANT_READ_URI_PERMISSION);
                    if (shareIntent.resolveActivity(activity.getPackageManager()) != null) {
                        activity.startActivity(shareIntent);
                    }
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });
}
}

But I am getting the error

'Cannot resolve method subscribe(anonymous org.reactivestreams.Subscriber<java.io.File>) on .subscribe(new Subscriber<File>()

I am new to RxJava and I don't know what is wrong with the code.

Thanks in advance


Answer:

In RxJava 2 the consumer types changed. Use io.reactivex.Observer to subscribe to io.reactivex.Observable; its onSubscribe receives a Disposable. org.reactivestreams.Subscriber is used only for io.reactivex.Flowable subscriptions.

.subscribe(new Observer<File>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(File file) { 

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    });

Question:

I have chained a few rx operators together to do multiple tasks. I need to access a field from an object that is in the parent stream, downstream.

i.e. how can I access channel.uid downstream?

createThing(panel) // Observable<~>
    .flatMapSingle(
            channel -> {
        return createOrUpdateItem(channel);
    })
    .flatMapCompletable(
            item -> {
        return linkItemToChannel(item.name, /* need access to channel.uid here */ channel.uid);
    });

Answer:

Use Observable.flatMap(Function mapper, BiFunction resultSelector) (or the Flowable version). E.g.:

createThing(panel) // I assume that this method returns Observable
    .flatMap(channel -> createOrUpdateItem(channel).toObservable(),
            // the result selector sees both the channel and the item
            (channel, item) -> linkItemToChannel(item.name, channel.uid))
    // the stream now carries Completables; subscribe to each of them
    .flatMapCompletable(completable -> completable);

There are no similar overloads of flatMapSingle or flatMapCompletable, so you have to convert your Single to Observable (or Flowable). Or you can write your own operator ;)
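
Another way to keep channel in scope is to nest the second step inside the lambda of the first, so no conversion to Observable is needed. This is only a sketch: Channel, Item and the bodies of createOrUpdateItem and linkItemToChannel are hypothetical stand-ins for the types in the question.

```java
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

final class NestedScopeDemo {

    static final class Channel {
        final String uid;
        Channel(String uid) { this.uid = uid; }
    }

    static final class Item {
        final String name;
        Item(String name) { this.name = name; }
    }

    // Records the links so the effect of the chain is visible.
    static final List<String> links = new ArrayList<>();

    static Single<Item> createOrUpdateItem(Channel channel) {
        return Single.just(new Item("item-for-" + channel.uid));
    }

    static Completable linkItemToChannel(String itemName, String channelUid) {
        return Completable.fromAction(() -> links.add(itemName + "->" + channelUid));
    }

    // The inner flatMapCompletable is inside the outer lambda, so channel is still visible.
    static Completable linkAll(Observable<Channel> channels) {
        return channels.flatMapCompletable(channel ->
                createOrUpdateItem(channel)
                        .flatMapCompletable(item -> linkItemToChannel(item.name, channel.uid)));
    }

    public static void main(String[] args) {
        linkAll(Observable.just(new Channel("c1"))).blockingAwait();
        System.out.println(links); // [item-for-c1->c1]
    }
}
```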

Question:

Below is my code snippet.

I know you are not supposed to block cachedFlowable like this, but this is just an example.

It gets stuck at the blockingGet line.

If I replace singleOrError with singleElement, the code will still get stuck. If I replace singleOrError with firstElement, the code will no longer get stuck.

Can someone please explain to me why this is the case?

    public static void main(String[] args) {
        final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
        cachedFlowable
                .doOnNext(i -> {
                    System.out.println("doOnNext " + i);
                    final Integer j = cachedFlowable.singleOrError().blockingGet();
                    System.out.println("after blockingGet " + j);
                })
                .blockingSubscribe();
    }

Answer:

The reason it deadlocks with the singleX operators is that they wait for a possible second item, but since you are blocking, neither a second item nor the completion from the main source can be delivered. The firstX operators only care about the very first item, so they unblock almost immediately, which allows the source to complete.

So yes, you should not use blocking methods in flows like that but instead use flatMap or concatMap to do a per item subflow:

var cache = Flowable.just(1).cache();

cache
.doOnNext(i -> System.out.println("doOnNext " + i))
.concatMapSingle(item -> cache.firstOrError())
.doOnNext(j -> System.out.println("after " + j))
.blockingSubscribe();

Question:

I'm trying to achieve the following behavior:

  • Have a stream of events periodically polled/generated (short duration, say 1 second)
  • Events are then grouped according to some internal trait.
  • Each group of events is written to a matching file immediately (this is a crucial part of the behavior I want to maintain)
  • Files are expected to be reused for matching groups (have the same key) on subsequent events until they are sealed/rotated
  • After a longer duration (say, 5 seconds) files are sealed/rotated and acted upon using additional subscribers

I wrote the following sample code to achieve the above behavior:

    private static final Integer EVENTS = 3;
    private static final Long SHORTER = 1L;
    private static final Long LONGER = 5L;
    private static final Long SLEEP = 100000L;

    public static void main(final String[] args) throws Exception {

        val files = new DualHashBidiMap<Integer, File>();

        Observable.just(EVENTS)
                .flatMap(num -> Observable.fromIterable(ThreadLocalRandom.current().ints(num).boxed().collect(Collectors.toList())))
                .groupBy(num -> Math.abs(num % 2))
                .repeatWhen(completed -> completed.delay(SHORTER, TimeUnit.SECONDS))
                .map(group -> {
                    val file = files.computeIfAbsent(group.getKey(), Unchecked.function(key -> File.createTempFile(String.format("%03d-", key), ".txt")));
                    group.map(Object::toString).toList().subscribe(lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true));
                    return file;
                })
                .buffer(LONGER, TimeUnit.SECONDS)
                .flatMap(Observable::fromIterable)
                .distinct(File::getName)
                .doOnNext(files::removeValue)
                .doOnNext(file -> System.out.println("File - '" + file + "', Lines - " + FileUtils.readLines(file, StandardCharsets.UTF_8)))
                .subscribe();
        Thread.sleep(SLEEP);
    }

While it works as expected (set aside the thread-safety issue of the map access for now; I'm using the bidi-map from commons-collections4 just for the sake of demonstration), I was wondering if there's a way to implement the above functionality in a pure Rx form, without relying on external map access?

Note that it's crucial for the files to be written immediately upon group creation, meaning the file must live beyond the scope of the generated event groups.

Thanks in advance.


Answer:

Interesting question. I could be wrong, but I don't think you can avoid having a Map of Files somewhere in the pipeline.

I think my solution could be further cleaned up, but it seems to accomplish the following:

  • Removes the need for bidirectional mapping
  • Avoids the need to call Map.remove(...)

I'm proposing you treat the Map of Files being written as a distinct Observable, emitting a brand new Map at the slower interval:

    Observable<HashMap<Integer, File>> fileObservable = Observable.fromCallable(
                () -> new HashMap<Integer, File>() )
            .repeatWhen( completed -> completed.delay( LONGER, TimeUnit.SECONDS ));

Then in your event Observable, you can call .withLatestFrom( fileObservable, ( group, files ) -> {...} ) (note: this block is incomplete still):

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )

So far so good, you're getting your latest set of Files supplied alongside your events. Next you've got to process the Files. I think you can do that using distinctUntilChanged(). It should be pretty efficient since it will call HashMap.equals(...) under the covers and the Map object's identity isn't changing most of the time. HashMap.equals(...) first checks for same identity.

Since at this point you're really interested in processing the previous set of emitted Files rather than the current, you could use the .scan(( prev, current ) -> {...} ) operator. With that, here's the completed code block from above:

    Observable.just( EVENTS )
        .flatMap( num -> Observable.fromIterable(
                ThreadLocalRandom.current().ints( num ).boxed().collect( Collectors.toList() )))
        .groupBy( num -> Math.abs( num % 2 ))
        .repeatWhen( completed -> completed.delay( SHORTER, TimeUnit.SECONDS ))
        .withLatestFrom( fileObservable, ( group, files ) -> {

            File file = files.computeIfAbsent(
                    group.getKey(),
                    Unchecked.function( key -> File.createTempFile( String.format( "%03d-", key ), ".txt" )));

            group.map( Object::toString ).toList()
                .subscribe( lines -> FileUtils.writeLines(file, StandardCharsets.UTF_8.name(), lines, true ));

            return files;
        } )
        .distinctUntilChanged()
        .scan(( prev, current ) -> {

            Observable.fromIterable( prev.entrySet() )
                .map( Entry::getValue )
                .subscribe( file -> System.out.println( "File - '" + file + "', Lines - " +
                                FileUtils.readLines( file, StandardCharsets.UTF_8 )));

            return current;
        } )
        .subscribe();

    Thread.sleep( SLEEP );

A little lengthier than your original solution, but might solve a couple of issues.

Question:

I have a Flowable stream that concatenates multiple streams together:

  Flowable
    .empty()
    .concatWith(longOperationA())
    .concatWith(longOperationB())
    .onErrorResumeNext(throwable -> {
      // some cleanup tasks
      return Flowable.error(throwable);
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(mySubscriber);

Both longOperationA() and longOperationB() emit items. Depending on which circumstance arises (an error occurs or mySubscriber gets disposed), I want my stream to act differently. The error case is covered by the onErrorResumeNext() callback, but the case where mySubscriber is disposed is not.

How can I change my stream to do another task when the subscriber is disposed of?

To give more context about this, I have tried doOnCancel():

Flowable
  .concatWith(longOperationA())
  .concatWith(longOperationB())
  .doOnCancel(() -> {
    // some cleanup tasks
  })
  .onErrorResumeNext(throwable -> { ...

However, doOnCancel() is called not only when mySubscriber is disposed, but also when longOperationA() (and longOperationB(), respectively) finishes.

Is there any other way to let my stream react to the dispose event?


Answer:

You can use doOnDispose for this:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#doOnDispose-io.reactivex.functions.Action-

Question:

I have the following snippet, which generates a Flowable<String>. I'm not sure how I can make Files.lines AutoCloseable. I needed to pass in the iterator as the second argument to read the lines one by one as they are consumed.

Please note that I have not used the third argument (disposeState) as in generate(Callable<S> initialState, BiConsumer<S,Emitter<T>> generator, Consumer<? super S> disposeState) because there is no point in passing iterator as disposeState.

private Flowable<String> generateFlowable(File file) {
    return Flowable.generate(
            () -> Files.lines(Paths.get(file.toURI()), StandardCharsets.UTF_8).iterator(),
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            }
    );
}

The lines are consumed and parsed one by one in another method. When I ran lsof I found that the file handle was not closed. Can someone suggest how to do that?


Answer:

There are two possible ways to close the file automatically. The first one uses Flowable::using:

private Flowable<String> generateFlowable(File file) {
  return Flowable.using(
          () -> Files.lines(file.toPath(), StandardCharsets.UTF_8),
          stream -> Flowable.fromIterable(stream::iterator),
          Stream::close
  );
}

The second one uses Flowable::generate with a BufferedReader instead of the Stream iterator:

private Flowable<String> generateFlowable(File file) {
  return Flowable.generate(
          () -> Files.newBufferedReader(Paths.get(file.toURI()), StandardCharsets.UTF_8),
          (reader, emitter) -> {
            String line = reader.readLine();
            if (line != null) {
              emitter.onNext(line);
            } else {
              emitter.onComplete();
            }
          }, BufferedReader::close);
}

Question:

I want to implement rather simple DAG in RxJava.

We have a source of items:

Observable<String> itemsObservable = Observable.fromIterable(items)

Next, I'd like to have a processor that subscribes to itemsObservable and allows multiple subscribers to subscribe to it.

So I created: PublishProcessor<String> itemsProccessor = PublishProcessor.create();

Unfortunately, this isn't possible: itemsObservable.subscribe(itemsProccessor);

Why? What's the proper API to implement this kind of DAG?

Here's my (failed) try to implement this kind of DAG:

List<String> items = Arrays.asList("a", "b", "c");
Flowable<String> source = Flowable.fromIterable(items);

PublishProcessor<String> processor = PublishProcessor.create();
processor.doOnNext(s -> s.toUpperCase());

processor.subscribe(System.out::println);
processor.subscribe(System.out::println);
source.subscribe(processor); 

Answer:

It's because PublishProcessor implements Subscriber, while Observable's subscribe method accepts an Observer. You can convert your itemsObservable to a Flowable and it will do the job.

    Observable<String> items = Observable.fromIterable(Arrays.asList("a","b"));
    PublishProcessor<String> processor = PublishProcessor.create();
    items.toFlowable(BackpressureStrategy.BUFFER)
            .subscribe(processor);

Question:

I've got the following infinite stream which does something every second. What I want is to stop the stream upon error and handle it. How can I achieve that?

void doSomething() {
        Disposable disposable = execute(doSomethingInner(), 0L, TimeUnit.SECONDS, schedulerProvider.io(), someClass -> 1L).doOnError
                (throwable -> {
            Timber.e(throwable, "error happened");// Never triggered
        })
                .doOnNext(someClass -> Timber.i("doing the infinite stuff"))
                .subscribe(Functions.emptyConsumer(), throwable -> {
                    Timber.e(throwable, "stop doing the infinite stuff");// Never triggered
                });
    }

    Observable<SomeClass> doSomethingInner() {
        return Observable.error(new Exception("something went wrong"));
    }

    Observable<SomeClass> execute(Observable<SomeClass> source,
                                  long delayInterval,
                                  TimeUnit timeUnit,
                                  Scheduler scheduler,
                                  Function<SomeClass, Long> interval) {
        return Observable.defer(new Callable<ObservableSource<SomeClass>>() {
            long currentInterval = delayInterval;

            @Override
            public ObservableSource<SomeClass> call() {
                return Single.timer(currentInterval, timeUnit, scheduler)
                        .flatMapObservable(o -> source)
                        .doOnNext(t -> currentInterval = interval.apply(t));
            }
        })
                .repeat()
                .retry();
    }

Answer:

I think retry() is consuming your error. Try to either:

  • Remove this retry() completely
  • or change it to retry(Predicate<Throwable>) to decide whether to repeat.

By default, the stream is cancelled on error if nothing upstream consumes the error first, so you should then receive the callback in the onError() consumer passed to subscribe().
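
The predicate variant can be sketched like this. The rule "retry only on IOException" is an assumption chosen for the illustration, and the class and method names are made up; since the failure here is an IllegalStateException, the predicate returns false and the error reaches the subscriber after a single attempt.

```java
import io.reactivex.Observable;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class RetryDemo {

    // Counts how many times the source was subscribed (first try plus retries).
    static final AtomicInteger attempts = new AtomicInteger();

    // A source that always fails, like doSomethingInner() in the question.
    static Observable<String> failing() {
        return Observable.<String>fromCallable(() -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("something went wrong");
        });
    }

    // Returns the message that reached the onError consumer, or null if none did.
    static String run() {
        AtomicReference<String> seen = new AtomicReference<>();
        failing()
                // Resubscribe only for errors considered transient (an assumption of this sketch).
                .retry(error -> error instanceof IOException)
                .subscribe(item -> { }, error -> seen.set(error.getMessage()));
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(run());          // something went wrong
        System.out.println(attempts.get()); // 1
    }
}
```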

Question:

I have a (Flowable) stream of items to be processed in parallel using a single common resource, and the resource must be disposed afterwards. I tried to use Single.using() operator, but it disposes the resource before even the first item in the stream is processed.

Sample program (in Kotlin):

package my.test.rx_task_queue

import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

object TestCommonResource {
    private val logger = LoggerFactory.getLogger(TestCommonResource::class.java)
    @JvmStatic
    fun main(args: Array<String>) {
        val queue = Flowable.fromIterable(1..5)
        val resIdx = AtomicInteger(0)
        val resource = Single.using({
            val res = "resource-${resIdx.incrementAndGet()}"
            logger.info("Init resource $res")
            res
        }, { res ->
            Single.just(res)
        }, { res ->
            logger.info("Dispose resource $res")
        }, false)

        val result = resource.flatMap { res ->
            queue.flatMapSingle({ item ->
                Single.fromCallable {
                    logger.info("Process $item with $res")
                    "$item @ $res"
                }
                        .subscribeOn(Schedulers.io())
            }, false, 2)
                    .toList()
        }
                .blockingGet()
        logger.info("Result: $result")
    }
}

Sample log output:

14:30:27.721 [main] INFO my.test.rx_task_queue.TestCommonResource - Init resource resource-1
14:30:27.744 [main] INFO my.test.rx_task_queue.TestCommonResource - Dispose resource resource-1
14:30:27.747 [RxCachedThreadScheduler-1] INFO my.test.rx_task_queue.TestCommonResource - Process 1 with resource-1
14:30:27.747 [RxCachedThreadScheduler-2] INFO my.test.rx_task_queue.TestCommonResource - Process 2 with resource-1
14:30:27.748 [RxCachedThreadScheduler-3] INFO my.test.rx_task_queue.TestCommonResource - Process 3 with resource-1
14:30:27.749 [RxCachedThreadScheduler-4] INFO my.test.rx_task_queue.TestCommonResource - Process 4 with resource-1
14:30:27.749 [RxCachedThreadScheduler-1] INFO my.test.rx_task_queue.TestCommonResource - Process 5 with resource-1
14:30:27.750 [main] INFO my.test.rx_task_queue.TestCommonResource - Result: [1 @ resource-1, 2 @ resource-1, 3 @ resource-1, 4 @ resource-1, 5 @ resource-1]

Using Flowable.parallel() instead of flatMap() leads to the same result.


Answer:

The resource is disposed when the source returned by singleFunction is disposed, so if you want to dispose it after everything is done, you just need to have singleFunction return the whole stream:

object TestCommonResource {
    private val logger = LoggerFactory.getLogger(TestCommonResource::class.java)
    @JvmStatic
    fun main(args: Array<String>) {
        val queue = Flowable.fromIterable(1..5)
        val resIdx = AtomicInteger(0)
        val result = Single.using({
            val res = "resource-${resIdx.incrementAndGet()}"
            logger.info("Init resource $res")
            res
        }, { res ->
            queue.flatMapSingle({ item ->
                Single.fromCallable {
                    logger.info("Process $item with $res")
                    "$item @ $res"
                }
                        .subscribeOn(Schedulers.io())
            }, false, 2)
                    .toList()
        }, { res ->
            logger.info("Dispose resource $res")
        }, false)
                .blockingGet()
        logger.info("Result: $result")
    }
}

Question:

Suppose I have 3 publishers and 1 processor. The publishers emit items in the form {key: <integer>, value: <object>, publisher_id: <string>}.

The publishers perform IO operations, so:

  • On the one hand, I'd like the publishers to work on (roughly) N items at a given moment.
  • On the other hand, I'd like the consumer to merge the items to one single record (i.e. {key: <integer>, values: <list>})

I've actually already implemented a FluxProcessor that has an internal storage (ConcurrentHashMap) to keep all the items. It manually calls request() for new items whenever CAPACITY hasn't been reached.

Is there built-in functionality to do that with the RxJava 2 or Spring Reactor API?


Answer:

Use merge, rebatchRequests and toMultimap with RxJava 2:

Flowable<KeyValuePublisher> source1 = ...
Flowable<KeyValuePublisher> source2 = ...
Flowable<KeyValuePublisher> source3 = ...

Flowable.merge(
    source1.rebatchRequests(N),
    source2.rebatchRequests(N),
    source3.rebatchRequests(N)
)
.toMultimap(kvp -> kvp.key, kvp -> kvp.value)
.subscribe(map -> System.out.println(map));
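To make the shape of the merged result concrete: toMultimap yields a single map from each key to the collection of values seen for that key. The sketch below reproduces that grouping with java.util.stream only, not RxJava, so it shows the result shape but not the request batching. The Item class and mergeByKey method are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogue of merge + toMultimap; not RxJava code.
final class MultimapShapeSketch {

    /** Hypothetical item type mirroring {key, value, publisher_id}. */
    static final class Item {
        final int key;
        final String value;
        final String publisherId;

        Item(int key, String value, String publisherId) {
            this.key = key;
            this.value = value;
            this.publisherId = publisherId;
        }
    }

    /** Flattens all sources and groups the values by key, like {key, values: <list>}. */
    static Map<Integer, List<String>> mergeByKey(List<List<Item>> sources) {
        return sources.stream()
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(
                        item -> item.key,
                        TreeMap::new, // sorted keys, only to make the printed output stable
                        Collectors.mapping(item -> item.value, Collectors.toList())));
    }

    /** Three small in-memory "publishers" standing in for the real sources. */
    static Map<Integer, List<String>> demo() {
        List<Item> a = Arrays.asList(new Item(1, "a1", "A"), new Item(2, "a2", "A"));
        List<Item> b = Arrays.asList(new Item(1, "b1", "B"));
        List<Item> c = Arrays.asList(new Item(2, "c2", "C"), new Item(3, "c3", "C"));
        return mergeByKey(Arrays.asList(a, b, c));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {1=[a1, b1], 2=[a2, c2], 3=[c3]}
    }
}
```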

Question:

I want to filter a stream(), and the filter must be supplied with a predicate. How can I achieve this using RxJava? I am using the Room persistence library, and I want to add new songs only if the database does not already contain them. My problem is with the doesDatabaseContainSong() method.

I'm new to RxJava; the code is a mix of Kotlin and Java.

    // 1. ADD NEW SONGS TO DATABASE
songs.stream()
        .filter(song -> !doesDatabaseContainSong(song, mViewModel))
        .forEach(this::addSongToDatabase);

I want the following function (doesDatabaseContainSong) to return a boolean, but the view model method it calls returns Single<Int>:

public static boolean doesDatabaseContainSong(Song song, SongViewModel model) {
    int result = model.doesDatabaseContainSong(song.getId(), Type.GOOGLE_DRIVE.name());
    return result != 0;
}

The View Model

fun doesDatabaseContainSong(mId: String): Single<Int> {
    return repository.doesDatabaseContainSong(mId)
}

The Repository

fun doesDatabaseContainSong(mId: String): Single<Int> {
    return songDao.doesDatabaseContainSong(mId)
}

The DAO

@Query("SELECT COUNT(id) from song_table WHERE id = :mId")
fun doesDatabaseContainSong(mId: String): Single<Int>

Answer:

The DAO (changed return type to Boolean)

@Query("SELECT COUNT(id) from song_table WHERE id = :mId")
fun doesDatabaseContainSong(mId: String): Single<Boolean>

The Repository (changed return type to Boolean)

fun doesDatabaseContainSong(mId: String): Single<Boolean> {
    return songDao.doesDatabaseContainSong(mId)
}

The View Model (changed return type to Boolean)

fun doesDatabaseContainSong(mId: String): Single<Boolean> {
    return repository.doesDatabaseContainSong(mId)
}

(changed return type to Boolean)

public static Single<Boolean> doesDatabaseContainSong(Song song, SongViewModel model) {
    return model.doesDatabaseContainSong(song.getId(), Type.GOOGLE_DRIVE.name());
}

Now final code:

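// A Single cannot be used inside java.util.stream, so iterate with an Observable instead
Observable.fromIterable(songs)
    .flatMapMaybe(song -> doesDatabaseContainSong(song, mViewModel)
            .filter(doesContainSong -> !doesContainSong) // keep only songs not yet stored
            .map(doesContainSong -> song))               // pass the song itself downstream
    .subscribe(this::addSongToDatabase);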

Hope this helps.

Question:

The various RxJava2 streams (Single, Maybe etc.--I'll refer to them as Xxx) have a static method .create( XxxOnSubscribe onSubscribe ). The XxxOnSubscribe object passed is supposed to implement the method void subscribe( XxxEmitter emitter ).

An Emitter seems to be like an Observer but with a few differences (it's not exactly an extension of Observer):

  • its .subscribe() method returns void instead of a Disposable
  • it lacks an .onSubscribe() method (which returns a Disposable)
  • it adds methods .setCancellable(), .setDisposable(), .isDisposed(), and .tryOnError().

The docs say "The emitter implementations will dispose/cancel this instance (what instance?) when the downstream cancels the flow or after the event generator logic calls onSuccess(Object), onError(Throwable), onComplete() or when tryOnError(Throwable) succeeds."

Among my questions about this:

  • Why can't an Xxx be created with a normal Observer with a .subscribe() returning a Disposable?
  • Why was .onSubscribe() eliminated from Emitter?
  • What is the rationale for requiring an Emitter?
  • When would you use a Cancellable vs. a Disposable?
  • How does the addition of .tryOnError() help things?

Is there a fuller explanation of this scheme available?

(Possibly related: my Android app seems to be experiencing some odd behavior of the .retry() operator. Does .retry() call .dispose() upstream before attempting to resubscribe?)


Answer:

what instance?

Should be clear from the context of the preceding sentence in the docs:

The emitter allows the registration of a single resource, in the form of a Disposable or Cancellable via setDisposable(Disposable) or setCancellable(Cancellable) respectively. The emitter implementations will dispose/cancel this instance when the downstream cancels the flow or after the event generator logic calls onSuccess(Object), Emitter.onError(Throwable), Emitter.onComplete() or when tryOnError(Throwable) succeeds.

The instance of a Disposable or Cancellable.

Why can't an Xxx be created with a normal Observer

We don't want the user to call the onSubscribe method as it has no use in the operators. We can't hide methods so we designed a separate interface with only the supported methods.

with a .subscribe() returning a Disposable?

Because of the so-called synchronous cancellation problem. If your method never returns for some reason, it can't return a Disposable and the downstream has no means to cancel the flow.
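The problem can be modelled in a few lines of plain Java. This is only a sketch: Handle stands in for a Disposable and the two subscribe-style methods are hypothetical, not RxJava API. In style A the handle is returned after the synchronous loop, so the consumer cannot stop early; in style B the handle exists before emission starts, which is roughly what passing an emitter into create() achieves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

// Plain-Java model of the synchronous cancellation problem; not RxJava code.
final class SyncCancellationSketch {

    /** Hypothetical cancellation handle, standing in for a Disposable. */
    static final class Handle {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        void cancel() {
            cancelled.set(true);
        }

        boolean isCancelled() {
            return cancelled.get();
        }
    }

    /** Style A: the handle is returned only after the synchronous loop has finished. */
    static Handle subscribeReturningHandle(int count, IntConsumer downstream) {
        Handle handle = new Handle();
        for (int i = 1; i <= count && !handle.isCancelled(); i++) {
            downstream.accept(i); // the downstream has no reference to 'handle' yet
        }
        return handle;
    }

    /** Style B: the handle is supplied up front, so the downstream can cancel mid-loop. */
    static void subscribeWithHandle(int count, Handle handle, IntConsumer downstream) {
        for (int i = 1; i <= count && !handle.isCancelled(); i++) {
            downstream.accept(i);
        }
    }

    /** The consumer would like to stop after two items but receives everything. */
    static List<Integer> runStyleA() {
        List<Integer> received = new ArrayList<>();
        Handle late = subscribeReturningHandle(5, item -> received.add(item));
        late.cancel(); // too late: every item was already delivered
        return received;
    }

    /** The consumer cancels after two items and the loop stops. */
    static List<Integer> runStyleB() {
        List<Integer> received = new ArrayList<>();
        Handle handle = new Handle();
        subscribeWithHandle(5, handle, item -> {
            received.add(item);
            if (received.size() == 2) {
                handle.cancel();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println("style A received " + runStyleA()); // [1, 2, 3, 4, 5]
        System.out.println("style B received " + runStyleB()); // [1, 2]
    }
}
```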

Why was .onSubscribe() eliminated from Emitter

There is no reason to call it as the operator will handle the call to Observer.onSubscribe for the downstream for you before the subscribe method of the emitter implementation gets invoked.

What is the rationale for requiring an Emitter?

API design. We provide a clear set of methods that can be called without exposing methods that should not or would not work anyway.

When would you use a Cancellable vs. a Disposable?

Please read the javadoc and signature. Use whichever is convenient for your resource cancellation needs.

How does the addition of .tryOnError() help things?

Please read the javadoc.

Is there a fuller explanation of this scheme available?

Please see each example in the various method javadocs.

Does .retry() call .dispose() upstream before attempting to resubscribe?

In theory, there is no need to call dispose when an error has been received but the current implementation may do it regardless. I have to think about if this needs to change in RxJava or not.

Question:

I have a list of ids and a method that takes an id and returns Observable<Boolean>. It performs an operation using the id and returns true on success.

Let's say that I have user id (1), and I need to update his profile with this method: Observable<Boolean> updateProfile(int id). That's OK and working fine.

What I need now is a method for multiple ids that returns true if all profiles are updated. It may have this signature: Observable<Boolean> updateAllProfiles(int[] ids)

How to achieve something like this?


Answer:

Assuming you want to update each profile separately and return true after all updates end, you can use a combination of flatMap and reduce:

Observable<Boolean> updateAllProfiles(Integer[] ids) {
    return Observable.from(ids)
            .flatMap(id -> updateProfile(id))
            .reduce((prevResult, currResult) -> prevResult && currResult);
}

and its use:

updateAllProfiles(new Integer[]{0, 1, 2, 3, 4})
            .subscribe(updateAllSucceed -> {
                // do something with result
            });

This fires all updates in parallel (assuming each updateProfile call runs on Schedulers.io() or a similar scheduler that uses separate threads), accumulates the individual results, and returns true or false accordingly. BTW, you might want to consider Completable instead of Observable<Boolean>, which is well suited to 'void' update methods (you can read my explanation here).

Question:

I am trying to use Retrofit and RxJava to make an API call within a custom view in an app that I am working on, but I encounter an incompatible type error when trying to subscribe to the Observable from my Retrofit API call.

ApiService

public interface ApiService {
@GET("airline-tickets.php")
Observable<List<Ticket>> getStarredRepositories();
}

ApiClient

public class ApiClient {
private static final String GITHUB_BASE_URL = "https://api.androidhive.info/json/";
private ApiService apiService;
private static ApiClient instance;

public ApiClient() {
    final Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(GITHUB_BASE_URL)
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build();
    apiService = retrofit.create(ApiService.class);
}

public static ApiClient getInstance() {
    if (instance == null) {
        instance = new ApiClient();
    }
    return instance;
}

public Observable<List<Ticket>> getStarredRepos() {
    return apiService.getStarredRepositories();
}
}

MainActivity

public class MainActivity extends AppCompatActivity {

private static final String TAG = MainActivity.class.getName();

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    Subscription subscription= (Subscription) ApiClient.getInstance()
            .getStarredRepos()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<Ticket>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<Ticket> tickets) {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}
}

This part of the code shows the error: Inconvertible types; cannot cast 'void' to 'org.reactivestreams.Subscription'

Subscription subscription= (Subscription) ApiClient.getInstance()
            .getStarredRepos()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<Ticket>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<Ticket> tickets) {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });

Answer:

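Use subscribeWith, which returns the Observer that you passed to it. You are probably using old RxJava 1 code; the RxJava 2 API changed a bit. Observable.subscribe(Observer) now returns void, so there is no Subscription to cast to: change the Subscription variable to the Observer type you pass in (for example a DisposableObserver, which you can dispose later).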