Hot questions for using RxJava 2 with lambdas

Question:

I have some Java code that uses a method reference, and I want to rewrite it in Kotlin. The Java version is short and clear, but I cannot get a method reference to work in Kotlin. The only version I have managed to write is the one presented below. It seems that Function3 { s: String, b: Boolean, i: Int -> combine(s, b, i) } could be written more cleanly (ideally as a method reference).

I'm new to Kotlin, so I'll be grateful for any clues.

Java

import io.reactivex.Observable;

public class TestJava {

    Observable<String> strings() {
        return Observable.just("test");
    }

    Observable<Boolean> booleans() {
        return Observable.just(true);
    }

    Observable<Integer> integers() {
        return Observable.just(1);
    }

    void test() {
        Observable.combineLatest(strings(), booleans(), integers(),
                this::combine);
    }

    double combine(String s, boolean b, int i) {
        return 1.0;
    }
}

Kotlin

import io.reactivex.Observable
import io.reactivex.functions.Function3

class TestKotlin {

    fun strings(): Observable<String> {
        return Observable.just("test")
    }

    fun booleans(): Observable<Boolean> {
        return Observable.just(true)
    }

    fun integers(): Observable<Int> {
        return Observable.just(1)
    }

    fun test() {
        Observable.combineLatest(strings(), booleans(), integers(),
                Function3 { s: String, b: Boolean, i: Int -> combine(s, b, i) })
    }

    fun combine(s: String, b: Boolean, i: Int): Double {
        return 1.0
    }
}

EDIT

Both of the following method references give an error in Kotlin.

fun test() {
    Observable.combineLatest(strings(), booleans(), integers(), this::combine)
}

fun test() {
    Observable.combineLatest(strings(), booleans(), integers(), TestKotlin::combine)
}

None of the following functions can be called with the arguments supplied:
@CheckReturnValue @SchedulerSupport public final fun combineLatest(p0: ((Observer) -> Unit)!, p1: ((Observer) -> Unit)!, p2: ((Observer) -> Unit)!, p3: ((???, ???, ???) -> ???)!): Observable<(???..???)>! defined in io.reactivex.Observable
@CheckReturnValue @SchedulerSupport public open fun combineLatest(p0: ObservableSource!, p1: ObservableSource!, p2: ObservableSource!, p3: io.reactivex.functions.Function3!): Observable<(???..???)>! defined in io.reactivex.Observable
@CheckReturnValue @SchedulerSupport public open fun combineLatest(p0: Function!, out (???..???)>!, p1: Int, vararg p2: ObservableSource!): Observable<(???..???)>! defined in io.reactivex.Observable

EDIT 2

RxKotlin solves the issue described in the answer.

import io.reactivex.rxkotlin.Observables

class TestKotlin {

    fun test() {
        Observables.combineLatest(strings(), booleans(), integers(), this::combine)
    }

}

Answer:

You can use a function reference expression in Kotlin just like a method reference expression in Java. For example, a reference to the combine function has the type KFunction3 in Kotlin:

val f: kotlin.reflect.KFunction3<String, Boolean, Int, Double> = this::combine

In Java, a method reference expression can be assigned to any compatible functional interface. In Kotlin, however, you cannot assign a function reference expression to a Java functional interface type, even when the signatures are compatible. For example:

val f:io.reactivex.functions.Function3<String,Boolean,Int,Double> =this::combine
//                                                type mismatch error     ---^
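For comparison, the Java side of that statement can be sketched without RxJava at all. The Function3 interface below is a hypothetical stand-in that only mimics the three-arguments-one-result shape of io.reactivex.functions.Function3, and the body of combine is invented so that there is something to observe:

```java
class Function3Demo {

    // Hypothetical stand-in: a functional interface taking three arguments.
    // It is NOT the RxJava type, only something with a similar shape.
    @FunctionalInterface
    interface Function3<T1, T2, T3, R> {
        R apply(T1 t1, T2 t2, T3 t3);
    }

    // Same parameter list as combine in the question; the body is illustrative.
    static double combine(String s, boolean b, int i) {
        return b ? s.length() + i : 0.0;
    }

    static double run() {
        // Java accepts the method reference directly because the target type
        // is known; boxing and unboxing of Boolean/Integer/Double is implicit.
        Function3<String, Boolean, Integer, Double> f = Function3Demo::combine;
        return f.apply("test", true, 1);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The assignment compiles in Java because the declared variable type supplies the target functional interface for the method reference.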

Instead, Kotlin uses SAM conversions to turn a lambda or function reference expression into an implementation of a Java functional interface, which means Kotlin does something like the following:

fun test() = TODO()

val executor: Executor = TODO()

executor.execute(::test)

// ::test compiles to roughly the following Java code:
Runnable task = new Runnable(){
   public void run(){
      test();
   }
};

Why does a method reference expression work in Java while a function reference expression fails in Kotlin?

Building on the above, note that rx-java2 has many overloaded combineLatest methods. For example, the following two prevent Kotlin from resolving a function reference expression correctly:

combineLatest(
       ObservableSource<out T1>,
       ObservableSource<out T2>,
       ObservableSource<out T3>, 
       Function3<T1, T2, T3, out R>
)

combineLatest(
       ObservableSource<out T1>,
       ObservableSource<out T2>,
       ObservableSource<out T3>, 
       ObservableSource<out T4>, 
       Function4<T1, T2, T3, T4, out R>
)

As I said, Kotlin uses SAM conversions to convert a lambda or function reference expression to a Java functional interface, but such an expression cannot be assigned to a Java functional interface directly.

For the 4th parameter of combineLatest, the Kotlin compiler cannot infer the type at all, since it could be either io.reactivex.functions.Function3 or ObservableSource: ObservableSource is also a Java functional interface.

When you hit a SAM conversion conflict like this, you can use an adapter function that converts a lambda to a specific SAM type, for example:

typealias RxFunction3<T1, T2, T3, R> = io.reactivex.functions.Function3<T1,T2,T3,R>

val f: RxFunction3<String, Boolean, Int, Double> = RxFunction3{ s, b, i-> 1.0}

So you must adapt the function reference before passing it, for example:

import kotlin.reflect.KFunction3

typealias RxFunction3<T1, T2, T3, R> = io.reactivex.functions.Function3<T1,T2,T3,R>

fun <T1,T2,T3,R> KFunction3<T1,T2,T3,R>.toFunction3(): RxFunction3<T1, T2,T3,R> {
    return RxFunction3 { t1, t2, t3 -> invoke(t1, t2, t3) }
}

Then you can use a function reference expression as shown below:

Observable.combineLatest(
        strings(),
        booleans(), 
        integers(), 
        //             v--- convert KFunction3 to RxFunction3 explicitly
        this::combine.toFunction3()
)

Question:

I am trying to subscribe to an observable like this:

List<String> colors = Arrays.asList("RED", "BLACK", "WHITE", "GREEN", "YELLOW", "BROWN", "PURPUL", "BLUE");
Observable.just(colors).subscribe(s -> System.out.println(s));

It works fine, but if I use a method reference the compiler gives the error "void is not a functional interface".

Can anyone explain this in more depth? As I understand it, subscribe accepts a Consumer functional interface, which doesn't return anything, but we should still be able to print the stream data like this:

Observable.just(colors).subscribe(s -> System.out::println); // does NOT compile

Answer:

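Your method reference syntax is wrong. s -> System.out::println is a lambda whose body is itself a method reference, so the compiler treats that method reference as the value the lambda returns. Consumer's accept method returns void, which is why the compiler complains that void is not a functional interface. Pass the method reference directly instead: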

Observable.just(colors).subscribe(System.out::println);
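The difference between the two spellings can be reproduced with plain java.util.function.Consumer. In the sketch below, emit is a hypothetical stand-in for Observable.just(value).subscribe(consumer), not an RxJava API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MethodReferenceDemo {

    // Hypothetical stand-in for subscribe: hands one value to the consumer.
    static <T> void emit(T value, Consumer<? super T> consumer) {
        consumer.accept(value);
    }

    // Uses both valid spellings and records what each consumer received.
    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // Lambda form: names the parameter and forwards it explicitly.
        emit("RED", s -> seen.add(s));

        // Method reference form: no parameter name and no arrow.
        emit("BLACK", seen::add);

        // Mixing the two does not compile, because the lambda body would
        // have to be a value returned from a void method:
        // emit("WHITE", s -> seen::add);

        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A method reference replaces the whole lambda, including the parameter and the arrow; it is not something to put on the right-hand side of one.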

Question:

Code:

source.subscribe(
        string -> {
            if( observed ) destination.onNext( reverse( string ) );
        }
);

private String reverse( String forward ) {
    return new StringBuilder( forward ).reverse().toString();
}

source is declared ObservableSource<String>.

destination is Observer<String>.

Android Studio underlines string in the call to reverse and tells me "reverse(java.lang.String) in StringReverser (my enclosing class) cannot be applied to (<lambda parameter>)".

I don't get it. source is delivering Strings, destination consumes Strings, and the reverse method takes a String parameter. What's the problem?


Answer:

As @Andreas pointed out in the comments, I was treating an RxJava 2 Observer like an old-style RxJava 1 Subscriber. Doesn't work! Once I implemented all 4 Observer methods (onSubscribe(), onNext(), onError() and onComplete()) the error went away.
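The underlying reason can be sketched without RxJava: a lambda can only target an interface with exactly one abstract method. MiniObserver and MiniSource below are hypothetical stand-ins that mimic the shape of Observer and ObservableSource (the real onSubscribe also receives a Disposable, which is omitted here):

```java
import java.util.ArrayList;
import java.util.List;

class ObserverShapeDemo {

    // Hypothetical stand-in for Observer: four abstract methods, so it is
    // not a functional interface and a lambda cannot implement it.
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Hypothetical stand-in for ObservableSource: it only accepts a full observer.
    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    static String reverse(String forward) {
        return new StringBuilder(forward).reverse().toString();
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();

        // MiniSource has one abstract method, so a lambda is fine here.
        MiniSource<String> source = observer -> {
            observer.onSubscribe();
            observer.onNext("abc");
            observer.onNext("hello");
            observer.onComplete();
        };

        // source.subscribe(s -> out.add(reverse(s)));  // does not compile

        // All four methods have to be implemented explicitly.
        source.subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe() { }
            @Override public void onNext(String value) { out.add(reverse(value)); }
            @Override public void onError(Throwable error) { out.add("error: " + error); }
            @Override public void onComplete() { out.add("done"); }
        });
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With no single-method target type available, the compiler cannot work out the type of the lambda parameter, which is consistent with the "<lambda parameter>" complaint on the call to reverse.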

Question:

I'm making an Android app and trying to either add a map marker to a map if it's the first API call or just reset the position of the marker. I'm using RxJava2 for repeated calls to the API. The problem is that I cannot check whether it's the first API call, because the lambda is not allowed to access the non-final boolean.

boolean firstReposition = true;
        //Create position call
        ISSPositionService service = ServiceGenerator.createService(ISSPositionService.class);

        //create observable
        Observable<ISSPositionData> issPositionCall = service.getPosition();

        Disposable disposable = issPositionCall.subscribeOn(Schedulers.io())
                .repeatWhen(completed -> completed.delay(30, java.util.concurrent.TimeUnit.SECONDS))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(positionData -> {
                    LatLng currentIssPosition = new LatLng(positionData.getIssPosition().getLatitude(), positionData.getIssPosition().getLongitude());
                    if (firstReposition) {
                        issMarkerOptions.position(currentIssPosition);
                        map.addMarker(issMarkerOptions);
                        firstReposition = false;
                    }
                    else {
                        issMarker.setPosition(currentIssPosition);
                    }

                    //animate camera so it shows current position
                    map.animateCamera(CameraUpdateFactory.newLatLng(currentIssPosition));
                });

How would I rewrite the code so I'm able to check and set the boolean?


Answer:

Use AtomicBoolean; you can read and update it with its get() and set() methods (see the Java docs).

AtomicBoolean firstReposition = new AtomicBoolean(true);
    //Create position call
    ISSPositionService service = ServiceGenerator.createService(ISSPositionService.class);

    //create observable
    Observable<ISSPositionData> issPositionCall = service.getPosition();

    Disposable disposable = issPositionCall.subscribeOn(Schedulers.io())
            .repeatWhen(completed -> completed.delay(30, java.util.concurrent.TimeUnit.SECONDS))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(positionData -> {
                LatLng currentIssPosition = new LatLng(positionData.getIssPosition().getLatitude(), positionData.getIssPosition().getLongitude());
                if (firstReposition.get()) {
                    issMarkerOptions.position(currentIssPosition);
                    map.addMarker(issMarkerOptions);
                    firstReposition.set(false);
                }
                else {
                    issMarker.setPosition(currentIssPosition);
                }

                //animate camera so it shows current position
                map.animateCamera(CameraUpdateFactory.newLatLng(currentIssPosition));
            });
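The same pattern can be tried in isolation with only the JDK. In this sketch the Consumer stands in for the subscribe callback and the string "positions" are made up; getAndSet(false) is used as one possible way to read and clear the flag in a single call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class FirstCallDemo {

    // Feeds three hypothetical position updates to a callback and records
    // which branch handled each one.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The reference is effectively final, so the lambda may capture it;
        // the boolean it holds can still change.
        AtomicBoolean firstReposition = new AtomicBoolean(true);

        Consumer<String> onPosition = position -> {
            // getAndSet returns the previous value and stores false.
            if (firstReposition.getAndSet(false)) {
                log.add("add marker at " + position);
            } else {
                log.add("move marker to " + position);
            }
        };

        onPosition.accept("A");
        onPosition.accept("B");
        onPosition.accept("C");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Only the first callback takes the "add marker" branch; every later one takes the "move marker" branch.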