Hot questions for using RxJava 2 in RxKotlin

Question:

I have an Observable which emits some numbers, and I simply want to take the last N elements.

I have the following code (I'm using RxKotlin, which is simply a wrapper around RxJava):

val list = listOf(1,2,3,4,5,6,7,8,9,10)
Observable.fromIterable(list)
          .buffer(3, 1)
          .lastOrError()
          .subscribe{value -> println(value)}

Unfortunately the result is [10]. When I looked more closely at what the buffer operator emits, I saw this:

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
[7, 8, 9]
[8, 9, 10]
[9, 10]
[10] 

Is there a way to get the last "full" buffer, i.e. [8, 9, 10]?


Answer:

In RxJava, many operators have a name that matches the everyday expression for the same operation: take + last N -> takeLast(int n):

Observable.range(1, 10)
   .takeLast(3)
   .toList() // <--  in case you want it as a list
   .subscribe(System.out::println);
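
As a rough illustration of the idea behind takeLast (not RxJava's actual implementation), the plain-Java sketch below keeps a bounded window of the most recent items and drops the oldest one whenever the window overflows. TakeLastSketch and its takeLast helper are hypothetical names used only for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TakeLastSketch {
    // Keeps at most n items; the oldest one is dropped whenever a newer one arrives.
    static <T> List<T> takeLast(Iterable<T> source, int n) {
        Deque<T> window = new ArrayDeque<>(n + 1);
        for (T item : source) {
            window.addLast(item);
            if (window.size() > n) {
                window.removeFirst();
            }
        }
        // Only once the source is exhausted do we know which items were "last".
        return new ArrayList<>(window);
    }

    public static void main(String[] args) {
        // prints [8, 9, 10]
        System.out.println(takeLast(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3));
    }
}
```

Unlike buffer(3, 1), this never produces the trailing partial windows [9, 10] and [10], because only one window exists and it is read once, at the end.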

Question:

I have a simple setup to a problem but the solution seems to be more complicated.

Setup: I have a hot observable which originates from a scanner. It emits every number as a separate element, followed by an R when a code is complete.

Problem: From this I want a hot observable that emits every full code as a single element.

I tried playing around with different flatMap, takeUntil and groupBy operators but haven't been able to come up with a solution.


Answer:

You can use the buffer operator.

PublishSubject<Token<Integer>> s = PublishSubject.create();

Observable<Token<Integer>> markers = s.filter(x->x.isMarker());

s.buffer(markers).subscribe(
    v->{
        Optional<Integer> reduce = v.stream()
            .filter(t->!t.isMarker())
            .map(t->(ValueToken<Integer>)t)
            .map(ValueToken::get)
            .reduce((a,b)->a+b);
        reduce.ifPresent(System.out::println);
    }
);

s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25

s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17

s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet

I made a class to wrap both values and markers in the flow.

public abstract class Token<T> {
    private static final MarkerToken MARKER = new MarkerToken<>();

    public boolean isMarker() {
        return false;
    }

    public static <T> MarkerToken<T> marker() {
        return MARKER;
    }

    public static <T> ValueToken<T> value(T o) {
        return new ValueToken<>(o);
    }

    public static class ValueToken<T> extends Token<T> {
        T value;

        public ValueToken(T value) {
            this.value = value;
        }

        public T get() {
            return value;
        }
    }

    public static class MarkerToken<T> extends Token<T> {
        public boolean isMarker() {
            return true;
        }
    }

}

Update (using scan)

The previous method also emits when the stream closes; with this solution you emit only complete buffers.

The Message class functions as an accumulator: it collects tokens until a closing marker is appended.

When this happens, the next message starts from scratch.

The presence of the closing marker as the last element marks the message as complete.

public static class Message<T> {
    List<Token<T>> tokens = new ArrayList<>();

    public Message<T> append(Token<T> t) {

        Message<T> mx = new Message<T>();
        if(!isComplete()) {
            mx.tokens.addAll(tokens);
        }
        mx.tokens.add(t);
        return mx;
    }

    public boolean isComplete() {
        int n = tokens.size();
        return n>0 && tokens.get(n-1).isMarker();
    }

    public Optional<List<Token<T>>> fullMessage(){
        return isComplete() ? Optional.of(tokens):Optional.empty(); 
    }
}

Scanning the source emits a message for each token; you then filter out the incomplete messages and emit only those marked as complete.

    s.scan(new Message<Integer>(), (a, b) -> a.append(b))
        .filter(Message::isComplete)
        .map(Message::fullMessage)
        .map(Optional::get).subscribe(v -> {
            System.out.println(v);
        });

    s.onNext(value(12));
    s.onNext(value(13));
    s.onNext(marker());// [V(12), V(13), MARKER]

    s.onNext(value(10));
    s.onNext(value(7));
    s.onNext(marker()); // [V(10), V(7), MARKER]



    s.onNext(value(10));
    s.onNext(value(127));

    s.onComplete(); // Not emitting incomplete messages on the closing of the subject.
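
To make the accumulate-then-filter logic easier to follow outside of Rx, here is a minimal plain-Java sketch of the same fold over a finished list of tokens. It assumes tokens arrive as strings and that "R" is the closing marker, as in the question; CodeAssemblerSketch and completeCodes are hypothetical names, not part of the answer above.

```java
import java.util.ArrayList;
import java.util.List;

class CodeAssemblerSketch {
    // Folds individual tokens into complete codes; the token "R" closes a code.
    static List<String> completeCodes(List<String> tokens) {
        List<String> codes = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String token : tokens) {
            if (token.equals("R")) {
                // Marker seen: the accumulated code is complete, so emit it and start over.
                codes.add(current.toString());
                current.setLength(0);
            } else {
                current.append(token);
            }
        }
        // Whatever is left in 'current' was never terminated and is dropped,
        // mirroring the scan variant, which does not emit incomplete messages.
        return codes;
    }

    public static void main(String[] args) {
        // prints [12, 7]
        System.out.println(completeCodes(List.of("1", "2", "R", "7", "R", "4", "2")));
    }
}
```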

Question:

I've got an observable: Observable.create<Boolean> {emitter = it}, to which I push some values. I want it to publish a 'false' value, as soon as some specific time period has passed without any value being pushed to that emitter.

How is that possible with RxJava/Kotlin 2?


Answer:

You can try combining two operators: timeout and onErrorReturn.

Observable.create<Boolean> { 
    // use emitter here
}
     .timeout(3, TimeUnit.SECONDS)
     .onErrorReturn { if (it is TimeoutException) false else throw it } 
     .subscribe { println("onNext $it") }

Updated snippet after clarification. onErrorReturn emits false and then completes the stream, so repeatWhen is added to resubscribe to the Observable each time that happens.

Observable.create<Boolean> { 
    // use emitter here
}
    .timeout(3, TimeUnit.SECONDS)
    .onErrorReturn { if (it is TimeoutException) false else throw it }
    .repeatWhen { it }
    .subscribe { println("onNext $it") }

Question:

I've got a buffered stream, waiting for a predetermined amount of silence time, before publishing a list of elements that have been buffered:

INTEGERS
     .share()
     .buffer(INTEGERS.debounce(DEBOUNCE_TIME,TimeUnit.MILLISECONDS,scheduler))
     .map { durations ->
       ... 
     }

I'd like to make DEBOUNCE_TIME dynamically adjust depending on the average of the buffered items, but I'm having a hard time figuring out how to achieve this.


Answer:

You could defer the debounce, take one item from it, and trigger a repeat once the new debounce value has been determined:

int DEBOUNCE_TIME = 100;
AtomicInteger debounceTime = new AtomicInteger(DEBOUNCE_TIME);
PublishSubject<Integer> mayRepeat = PublishSubject.create();

AtomicInteger counter = new AtomicInteger();

Observable<Integer> INTEGERS =
        Observable.fromArray(10, 20, 200, 250, 300, 550, 600, 650, 700, 1200)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS)
                .map(w -> counter.incrementAndGet()));

INTEGERS.publish(o ->
        o.buffer(
            Observable.defer(() ->
                o.debounce(
                    debounceTime.get(), TimeUnit.MILLISECONDS)
            )
            .take(1)
            .repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b))
        )
    )
    .map(list -> {
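        // Placeholder rule: for any non-empty list this evaluates to 100.
        // Substitute your own calculation here.
        int nextDebounce = Math.min(100, list.size() * 100);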
        debounceTime.set(nextDebounce);
        mayRepeat.onNext(1);
        return list;
    })
    .blockingSubscribe(System.out::println);

This prints:

[1, 2]
[3, 4, 5]
[6, 7, 8, 9]
[10]
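
The map step in the answer is where the next debounce value is chosen. Since the question asks for a value derived from the average of the buffered items, one possible rule is sketched below in plain Java. It is only an assumption about what such a rule might look like: DebounceRuleSketch, nextDebounceMillis and the clamping bounds are hypothetical and not part of the answer.

```java
import java.util.List;

class DebounceRuleSketch {
    // Returns the average of the buffered values, rounded and clamped to
    // [minMillis, maxMillis]. An empty buffer falls back to minMillis.
    static int nextDebounceMillis(List<Integer> buffered, int minMillis, int maxMillis) {
        double average = buffered.stream()
                .mapToInt(Integer::intValue)
                .average()
                .orElse(minMillis);
        long rounded = Math.round(average);
        return (int) Math.max(minMillis, Math.min(maxMillis, rounded));
    }

    public static void main(String[] args) {
        // Average of 50, 150 and 250 is 150, which lies inside the bounds: prints 150
        System.out.println(nextDebounceMillis(List.of(50, 150, 250), 100, 500));
    }
}
```

Clamping keeps a burst of very small or very large values from driving the debounce window to an unusable extreme.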

Question:

I'm trying to write a transformer function to use with compose() in order to reduce boilerplate code. It's pretty simple:

    fun <R> withSchedulers(): ObservableTransformer<R, R> {
        return ObservableTransformer {
            it.subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
        }
    }

So every time I want to subscribe on the IO thread and observe the result on the main thread, it's just a few lines of code:

    Observable.just(1)
        .compose(MyUtilClass.withSchedulers())
        .subscribe()

But Observable isn't the only type; there are also Single, Completable, Maybe and Flowable. So every time I want to combine one of them with my withSchedulers() function, I have to convert it to an Observable first (which I'd rather not do).

For example,

Completable.fromAction { 
        Log.d("nhp", "hello world")
    }//.compose(MyUtilClass.withSchedulers()) <-- This does not compile
            .toObservable() // <-- I have to convert it to an Observable first
            .compose(MyUtilClass.withSchedulers())
            .subscribe()

So my question is: is there any way to write the above function so it can be used with compose() for any of the reactive types (Single, Completable, ...)? Or do we have to write separate functions which use ObservableTransformer, SingleTransformer, and so on?


Answer:

I created a helper method using a reified type parameter:

inline fun <reified T> withSchedulers(): T {
    when (T::class) {
        ObservableTransformer::class  -> return ObservableTransformer<Unit, Unit> {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
        SingleTransformer::class      -> return SingleTransformer<Unit, Unit> {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
        CompletableTransformer::class -> return CompletableTransformer {
            it.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
        } as T
    }
    throw IllegalArgumentException("not a valid Transformer type")
}

Examples:

    Observable.just("1", "2")
            .compose(withSchedulers<ObservableTransformer<String, String>>())
            .subscribe(System.out::println)

    Single.just(3)
            .compose(withSchedulers<SingleTransformer<Int, Int>>())
            .subscribe(Consumer { println(it) })

    Completable.defer { Completable.complete()  }
            .compose(withSchedulers<CompletableTransformer>())
            .subscribe { println("completed") }

Output:

1
2
3
completed

There are probably other ways of doing this, but this is what came to mind.

Question:

Is there any way to create an RxJava 2 Observable that notifies subscribers of state changes? Something like:

private var internalState = "state1"
val state: Observable<String> = Observable(...)
...
fun updateState(newState: String) { ... } // !!! attention: I need to notify all subscribers about this new state

Then use it everywhere like:

// observe on state
state.subscribe(...)

This subscription must be called on every state update.


Answer:

After some research:

val source = PublishSubject.create<String>()

var state = "state1"
    get() = field // redundant: only to show it's possible to get state directly: maybe for class internal usages
    set(value) {
        field = value
        source.onNext(field)
    }

Now subscribe to source as usual, or derive a new Observable from it:

source.subscribe(...)
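
One thing to keep in mind: a PublishSubject only delivers values emitted after a subscriber has subscribed, so a late subscriber does not see the current state until the next update. RxJava's BehaviorSubject replays the latest value to new subscribers instead. The plain-Java sketch below models that "replay the current value" behaviour without RxJava, purely to illustrate the difference. StateHolderSketch and its methods are hypothetical names, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class StateHolderSketch {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private String state;

    StateHolderSketch(String initialState) {
        this.state = initialState;
    }

    // A new listener receives the current state immediately, then every later update.
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
        listener.accept(state);
    }

    // Stores the new state and notifies every registered listener.
    void updateState(String newState) {
        state = newState;
        for (Consumer<String> listener : listeners) {
            listener.accept(newState);
        }
    }

    public static void main(String[] args) {
        StateHolderSketch holder = new StateHolderSketch("state1");
        holder.updateState("state2");
        // The late subscriber still sees "state2" right away, then "state3".
        holder.subscribe(s -> System.out.println("late subscriber: " + s));
        holder.updateState("state3");
    }
}
```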