Hot questions for Using RxJava 2 in testing

Question:

I have been doing TDD in Kotlin for these past few weeks now in Android using MVP. Things have been going well.

I use Mockito to mock classes, but I can't figure out how to implement one of the tests I want to run.

The following are my tests:

  1. Call api, receive list of data, then show list. loadAllPlacesTest()
  2. Call api, receive empty data, then show list. loadEmptyPlacesTest()
  3. Call api, an exception happens along the way, then show error message. loadExceptionPlacesTest()

The tests for #1 and #2 pass. The problem is with #3: I'm not sure how to approach the test in code.

RestApiInterface.kt

interface RestApiInterface {

@GET(RestApiManager.PLACES_URL)
fun getPlacesPagedObservable(
        @Header("header_access_token") accessToken: String?,
        @Query("page") page: Int?
): Observable<PlacesWrapper>
}

RestApiManager.kt the manager class implementing the interface looks like this:

open class RestApiManager: RestApiInterface{
var api: RestApiInterface
    internal set
internal var retrofit: Retrofit
init {
    val logging = HttpLoggingInterceptor()
    // set your desired log level
    logging.setLevel(HttpLoggingInterceptor.Level.BODY)

    val client = okhttp3.OkHttpClient().newBuilder()
            .readTimeout(60, TimeUnit.SECONDS)
            .connectTimeout(60, TimeUnit.SECONDS)
            .addInterceptor(LoggingInterceptor())  
            .build()


    retrofit = Retrofit.Builder()
            .baseUrl(BASE_URL)
            .client(client)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//very important for RXJAVA and retrofit
            .build()
    api = retrofit.create(RestApiInterface::class.java)
}
override fun getPlacesPagedObservable(accessToken: String?, page: Int?): Observable<PlacesWrapper> {
    //return throw Exception("sorry2")
    return api.getPlacesPagedObservable(
            accessToken,
            page)
}
}

Here is my unit test:

class PlacesPresenterImplTest : AndroidTest(){

lateinit var presenter:PlacesPresenterImpl
lateinit var view:PlacesView
lateinit var apiManager:RestApiManager
//lateinit var apiManager:RestApiManager

val EXCEPTION_MESSAGE1 = "SORRY"

val MANY_PLACES = Arrays.asList(PlaceItem(), PlaceItem());
var EXCEPTION_PLACES = Arrays.asList(PlaceItem(), PlaceItem());


val manyPlacesWrapper = PlacesWrapper(MANY_PLACES)
var exceptionPlacesWrapper = PlacesWrapper(EXCEPTION_PLACES)
val emptyPlacesWrapper = PlacesWrapper(Collections.emptyList())

@After
fun clear(){
    RxJavaPlugins.reset()
}
@Before
fun init(){
    //MOCKS THE subscribeOn(Schedulers.io()) to use the same thread the test is being run on
    //Schedulers.trampoline() runs the test in the same thread used by the test
    RxJavaPlugins.setIoSchedulerHandler { t -> Schedulers.trampoline() }

    view = Mockito.mock<PlacesView>(PlacesView::class.java)
    apiManager = Mockito.mock(RestApiManager::class.java)
    presenter = PlacesPresenterImpl(view,context(), Bundle(), Schedulers.trampoline())
    presenter.apiManager = apiManager

    //exceptionPlacesWrapper = throw Exception(EXCEPTION_MESSAGE1);
}


@Test
fun loadAllPlacesTest() {
    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenReturn(Observable.just(manyPlacesWrapper))

    presenter.__populate()
    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.atLeastOnce())._showList()
    Mockito.verify(view).__hideLoading()
    Mockito.verify(view).__showFullScreenMessage(Mockito.anyString())
}

@Test
fun loadEmptyPlacesTest() {

    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenReturn(Observable.just(emptyPlacesWrapper))
    presenter.__populate()
    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.atLeastOnce())._showList()
    Mockito.verify(view).__hideLoading()
    Mockito.verify(view).__showFullScreenMessage(Mockito.anyString())
}

@Test
fun loadExceptionPlacesTest() {
    Mockito.`when`(apiManager.getPlacesPagedObservable(Mockito.anyString(), Mockito.anyInt())).thenThrow(Exception(EXCEPTION_MESSAGE1))
    presenter.__populate()
    Mockito.verify(view, Mockito.atLeastOnce()).__showLoading()
    Mockito.verify(view, Mockito.never())._showList()
    Mockito.verify(view).__hideLoading()
    Mockito.verify(view).__showFullScreenMessage(EXCEPTION_MESSAGE1)
}
}

PlacesPresenterImpl.kt This is the presenter.

   class PlacesPresenterImpl
constructor(var view: PlacesView, var context: Context, var savedInstanceState:Bundle?, var mainThread: Scheduler)
: BasePresenter(), BasePresenterInterface, PlacesPresenterInterface {

lateinit var apiManager:RestApiInterface
var placeListRequest: Disposable? = null


override fun __firstInit() {
    apiManager = RestApiManager()
}

override fun __init(context: Context, savedInstanceState: Bundle, view: BaseView?) {
    this.view = view as PlacesView
    if (__isFirstTimeLoad())
        __firstInit()
}


override fun __destroy() {
    placeListRequest?.dispose()
}

override fun __populate() {
    _callPlacesApi()
}


override fun _callPlacesApi() {
    view.__showLoading()
    apiManager.getPlacesPagedObservable("", 0)
            .subscribeOn(Schedulers.io())
            .observeOn(mainThread)
            .subscribe (object : DisposableObserver<PlacesWrapper>() {
                override fun onNext(placesWrapper: PlacesWrapper) {
                    placesWrapper?.let {
                        val size = placesWrapper.place?.size
                        view.__hideLoading()
                        view._showList()
                        System.out.println("Great I found " + size + " records of places.")
                        view.__showFullScreenMessage("Great I found " + size + " records of places.")
                    }
                    System.out.println("onNext()")
                }

                override fun onError(e: Throwable) {
                    System.out.println("onError()")
                    //e.printStackTrace()
                    view.__hideLoading()
                    if (ExceptionsUtil.isNoNetworkException(e)){
                        view.__showFullScreenMessage("So sad, can not connect to network to get place list.")
                    }else{
                        view.__showFullScreenMessage("Oops, something went wrong. ["+e.localizedMessage+"]")
                    }

                    this.dispose()
                }

                override fun onComplete() {
                    this.dispose()
                    //System.out.printf("onComplete()")
                }
            })


}

private fun _getEventCompletionObserver(): DisposableObserver<String> {
    return object : DisposableObserver<String>() {
        override fun onNext(taskType: String) {
            //_log(String.format("onNext %s task", taskType))
        }

        override fun onError(e: Throwable) {
            //_log(String.format("Dang a task timeout"))
            //Timber.e(e, "Timeout Demo exception")
        }

        override fun onComplete() {
            //_log(String.format("task was completed"))
        }
    }
}}

Problem/Questions for the loadExceptionPlacesTest()

  1. I'm not sure why the code doesn't reach the presenter's onError(). Correct me if I'm wrong, but this is what I think:
     a. `apiManager.getPlacesPagedObservable("", 0)` itself throws an Exception, so `.subscribe()` never happens and the observer's methods are never called.
     b. It will only go to onError() when an operation inside the observable hits an Exception, such as a JSONException.
  2. For loadExceptionPlacesTest(), I think 1b above is the way to make the presenter's onError() get called and make the test pass. Is this correct? If so, how do I do it in the test? If not, can you point out what I am missing or doing wrong?

Answer:

I'll leave this here for future reference and to be able to elaborate a bit more, even though I've answered in the comments.

What you're trying to accomplish is to put the stream in the onError flow. Unfortunately, by mocking it like this:

Mockito.`when`(apiManager.getPlacesPagedObservable(
                   Mockito.anyString(), Mockito.anyInt()))
                           .thenThrow(Exception(EXCEPTION_MESSAGE1))

You're actually telling Mockito to set up your mock so that merely calling apiManager.getPlacesPagedObservable(anyString, anyInt) throws an exception.

It is indeed true that throwing an exception inside an Rx stream will cause the entire stream to stop and end up in the onError method. However, this is exactly the problem with the approach you're using. You're not inside the stream when the exception is thrown.

Instead, what you want to do is tell Mockito that calling apiManager.getPlacesPagedObservable(anyString, anyInt) should return a stream that ends up in onError. This can be easily achieved with Observable.error(), like so:

Mockito.`when`(apiManager.getPlacesPagedObservable(
               Mockito.anyString(), Mockito.anyInt()))
                    .thenReturn(Observable.error(
                                Exception(EXCEPTION_MESSAGE1)))

(You might need to add explicit type information to Observable.error(), for example Observable.error<PlacesWrapper>(...). If your API returns something other than an Observable, such as a Single or a Completable, use the matching error() factory instead.)

The mocking above will tell Mockito to setup your mock to return an observable that will error as soon as it's subscribed to. This will in turn put your subscriber directly in the onError stream with the specified exception.
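
The same distinction can be sketched without RxJava or Mockito. This is only an analogy: the JDK's CompletableFuture stands in for the stream, and every name below (ErrorChannelDemo, throwsOnCall, returnsFailedSource, errorCallbackRan) is made up for illustration. An exception thrown by the method call itself escapes before any callback is attached, while a failure carried inside the returned source reaches the error callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical names throughout; CompletableFuture is a stand-in for an Rx stream.
final class ErrorChannelDemo {

    /** Throws as soon as it is called, like a mock stubbed with thenThrow(...). */
    static CompletableFuture<String> throwsOnCall() {
        throw new IllegalStateException("SORRY");
    }

    /** Returns a source that carries the failure, like thenReturn(Observable.error(...)). */
    static CompletableFuture<String> returnsFailedSource() {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("SORRY"));
        return source;
    }

    /** Plays the presenter: calls the API, attaches a callback, reports whether the error branch ran. */
    static boolean errorCallbackRan(Supplier<CompletableFuture<String>> api) {
        boolean[] ran = {false};
        // If api.get() throws, this line is never reached and no callback exists yet.
        api.get().whenComplete((value, error) -> ran[0] = (error != null));
        return ran[0];
    }
}
```

Passing ErrorChannelDemo::returnsFailedSource to errorCallbackRan runs the error branch, whereas passing ErrorChannelDemo::throwsOnCall makes the exception propagate straight to the caller, which is what happened to the presenter in the failing test.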

Question:

I'm having trouble identifying if a Completable has been subscribed to in my tests, for example:

interface IWebApi {
    Observable<Data> download();
}

interface IDataRepository {
    Completable insert(Data data);
}

class SyncService {

    IWebApi webApi;
    IDataRepository repository;

    public SyncService(IWebApi webApi, IDataRepository repository) {
        this.webApi = webApi;
        this.repository = repository;
    }

    public Completable sync() {
        return webApi.download()
            .flatMapCompletable(data -> repository.insert(data));
    }
}

And then in my tests:

@Test
public void syncTest() {
    Data data = new Data();
    IDataRepository repository = mock (IDataRepository.class);
    IWebApi webApi = mock (IWebApi.class);

    given(webApi.download()).willReturn(Observable.just(data));
    given(repository.insert(data)).willReturn(Completable.complete());

    TestObserver<Void> observer = new TestObserver<Void>();
    SyncService service = new SyncService(webApi, repository);
    service.sync()
            .subscribe(observer);

    observer.assertComplete();
    verify(repository).insert(data);
}

This test will pass. But I could rewrite the sync method without using flatMapCompletable like this:

    public Completable sync() {
        return webApi.download()
            .doOnNext(data -> { repository.insert(data); })
            .ignoreElements();
    }

Then my tests would still pass, but the code would not work: even though I've called the insert method, I've never called subscribe() on the Completable it returns.

How should I proceed about this?

P.S I am new to RxJava so if I am not using the best practices I would love to know :)

Update

Fixed the mistake of not calling .ignoreElements(), pointed out by Maxim Ostrovidov


Answer:

You can use the test() operator for convenience:

SyncService service = new SyncService(webApi, repository);
TestObserver<Void> observer = service.sync().test();

But I could rewrite the sync method without using flatMapCompletable like this:

public Completable sync() {
    return webApi.download()
        .doOnNext(data -> { repository.insert(data); });
}

This won't compile because doOnNext only invokes an action for each item; it does not change the type of the stream. The method is declared to return Completable, but the expression is still an Observable<Data>.

Even if you force change the eventual stream type:

public Completable sync() {
    return webApi.download()
        .doOnNext(data -> { repository.insert(data); })
        .ignoreElements(); //converts to Completable
}

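The Completable returned by repository.insert(data) will never be subscribed to, so a lazily executed insert never runs. doOnNext only invokes the action for each item and discards whatever the action produces:

//under the hood of lambdas
.doOnNext(new Consumer<Data>() {
    @Override
    public void accept(Data data) throws Exception {
        repository.insert(data); // returned Completable is discarded, never subscribed
    }
})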

Your initial code is the best for what you want to achieve:

public Completable sync() {
    return webApi.download()
        .flatMapCompletable(data -> repository.insert(data));
}

flatMapCompletable maps each item emitted by the Observable to a Completable and subscribes to it:

.flatMapCompletable(new Function<Data, CompletableSource>() {
    @Override
    public CompletableSource apply(Data data) throws Exception {
        return repository.insert(data);
    }
})

Edit

To verify that the Completable returned by repository.insert is actually subscribed to, you can use another TestObserver and a Completable with doOnSubscribe applied:

TestObserver<Void> repositoryInsertObserver = TestObserver.create();
Completable insertCompletable = Completable.complete()
    .doOnSubscribe(d -> repositoryInsertObserver.onSubscribe(d));

//pass completable to your mock
given(repository.insert(data)).willReturn(insertCompletable);

//and verify that subscription took place
repositoryInsertObserver.assertSubscribed();

Question:

As you can see from the code below, I am attempting to test the behavior of the doOnComplete() call occurring within my repository. However, when I mock the injected dependency of my client where I return items using Observable.just(), the doOnComplete() is no longer invoked. I believe this is intentional as per RxJava2. I'm unsure as to how to work around this.

@Singleton
public class Repository {
    private SomeNetworkClient client;
    private SomeCache cache;

    @Inject
    public Repository(SomeNetworkClient client, SomeCache cache) {
        this.client = client;
        this.cache = cache;
    }

    public Observable<SomeItem> getSomeItem() {
        return client.getSomeItem()
                .doOnComplete(() -> cache.doThisOnComplete())
                .doOnError(throwable -> someError);
    }
}

public class RepositoryTest {
    private Repository testedRepository;
    @Mock
    SomeNetworkClient client;
    @Mock
    SomeCache cache;
    @Mock
    SomeItem someItem;

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        testedRepository = new Repository(client, cache);
        when(client.getSomeItem())
            .thenReturn(Observable.just(someItem));
    }

    @Test
    public void thisTestFails() {
        testedRepository.getSomeItem().blockingFirst();
        // This fails because it appears that mocking with
        // Observable.just() makes doOnComplete() not be invoked.
        verify(cache).doThisOnComplete();
    }
}

Answer:

The problem in your code is that blockingFirst() doesn't wait for the complete event. It returns the first item from the stream immediately and disposes the subscription.

Instead, you may perform assertion this way:


    testedRepository
            .getSomeItem()
            .test()
            .assertComplete()

Question:

In this test, those two lists of strings are the same, but the assertValue method is telling me they are not equal because they have different memory addresses. How can I make it test against the actual strings in the array, so the following will pass?

@Test
public void shouldReturnStringList() {
    String[] strings = {"Hello", "World"};
    String[] strings2 = {"Hello", "World"};
    Observable<String[]> stringsObservable = Observable.just(strings);

    TestObserver<String[]> testObserver = new TestObserver<>();
    stringsObservable.subscribe(testObserver);

    testObserver.assertValue(strings2);
}

The failed test message:

java.lang.AssertionError: Expected: [Ljava.lang.String;@5ebec15 (class: String[]), Actual: [Ljava.lang.String;@21bcffb5 (class: String[]) (latch = 0, values = 1, errors = 0, completions = 1)


Answer:

assertValue compares using Object.equals, and for arrays equals() compares references, not contents, so the assertion fails.

Use the assertValue(Predicate<T>) overload and do the actual comparison with Arrays.equals(Object[], Object[]):

testObserver.assertValue(arr -> Arrays.equals(arr, strings2));
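
The difference between the two comparisons can be seen with the JDK alone. A minimal sketch, where ArrayEqualityDemo and its two helper methods are hypothetical names used only for illustration:

```java
import java.util.Arrays;

// Hypothetical helper class; only java.util.Arrays is real API here.
final class ArrayEqualityDemo {

    /** What a plain equals-based assertion effectively does: arrays inherit Object.equals. */
    static boolean equalByObjectEquals(String[] a, String[] b) {
        return a.equals(b); // reference comparison for arrays
    }

    /** What the predicate version does: element-by-element comparison. */
    static boolean equalByContents(String[] a, String[] b) {
        return Arrays.equals(a, b);
    }
}
```

For two separately created arrays holding "Hello" and "World", equalByObjectEquals returns false and equalByContents returns true, which matches the failing and passing assertions above.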

Question:

Here is a Person class for holding a person object. It implements Comparable for comparison.

public class Person implements Comparable<Person> {

    private String firstName = "";
    private String lastName = "";

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    @Override
    public int compareTo(@NonNull Person person) {
        return this.firstName.compareTo(person.getFirstName());
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

Here is a factory class for creating the people list and people list Observables. It always creates the same set of people.

import java.util.ArrayList;
import io.reactivex.Observable;

public class PeopleFactory {

    public static ArrayList<Person> createPeople() {
        ArrayList<Person> people = new ArrayList<>();

        for (int i=0; i<10; i++) {
            people.add(new Person("Thanks" + i, "Giving" + i));
        }

        return people;
    }

    public static Observable<ArrayList<Person>> createPersonObservable() {
        return Observable.just(createPeople());
    }
}

Now I want to use the TestObserver in RxJava 2 to compare a people list against the list emitted by the people Observable.

import org.junit.Test;
import java.util.ArrayList;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;


public class PersonTest {
    @Test
    public void testPeople() {
        // Creating a people Observable
        Observable<ArrayList<Person>> peopleObservable = PeopleFactory.createPersonObservable();

        // Create a test observer and subscribe it to the peopleObservable above
        TestObserver<ArrayList<Person>> observer = new TestObserver<>();
        peopleObservable.subscribe(observer);

        // Creating the same people list and assert this people list with the people list from the peopleObservable
        ArrayList<Person> people = PeopleFactory.createPeople();
        observer.assertValue(people);
    }
}

I expect this test to pass, because PeopleFactory creates the same set of people for the plain list and for the list Observable, and the Person class implements Comparable, which says two objects are equal as long as their firstName values are equal.

But the test ignored this Comparable, compared it against the object addresses, and gave me the following test failure message:

java.lang.AssertionError: Expected: [com.example.myapplication.Person@7006c658, com.example.myapplication.Person@34033bd0, com.example.myapplication.Person@47fd17e3, com.example.myapplication.Person@7cdbc5d3, com.example.myapplication.Person@3aa9e816, com.example.myapplication.Person@17d99928, com.example.myapplication.Person@3834d63f, com.example.myapplication.Person@1ae369b7, com.example.myapplication.Person@6fffcba5, com.example.myapplication.Person@34340fab] (class: ArrayList), Actual: [com.example.myapplication.Person@2aafb23c, com.example.myapplication.Person@2b80d80f, com.example.myapplication.Person@3ab39c39, com.example.myapplication.Person@2eee9593, com.example.myapplication.Person@7907ec20, com.example.myapplication.Person@546a03af, com.example.myapplication.Person@721e0f4f, com.example.myapplication.Person@28864e92, com.example.myapplication.Person@6ea6d14e, com.example.myapplication.Person@6ad5c04e] (class: ArrayList) (latch = 0, values = 1, errors = 0, completions = 1)

    at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:163)
    at io.reactivex.observers.BaseTestConsumer.assertValue(BaseTestConsumer.java:332)
    at com.example.myapplication.PersonTest.testPeople(PersonTest.java:21)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)

So, what is the correct way or how can I unit test a list of objects using TestObserver in RxJava 2?

Note: All of the above code was written in Android Studio.


Answer:

From the javadoc of Comparable:

This interface imposes a total ordering on the objects of each class that implements it. ... It is strongly recommended, but not strictly required that (x.compareTo(y)==0) == (x.equals(y)).

The Comparable::compareTo method is only used when sorting or otherwise comparing the order of elements. Equality is entirely separate and as you noted, the object identities are being asserted.

You can quickly confirm what the framework is doing by looking at the source https://github.com/ReactiveX/RxJava/blob/v2.1.6/src/main/java/io/reactivex/observers/BaseTestConsumer.java#L331

if (!ObjectHelper.equals(value, v)) {
  throw fail("Expected: " + valueAndClass(value) + ", Actual: " + valueAndClass(v));
}

ObjectHelper::equals

public static boolean equals(Object o1, Object o2) { // NOPMD
  return o1 == o2 || (o1 != null && o1.equals(o2));
}

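So it calls Object::equals on the emitted value. ArrayList.equals compares the two lists element by element using equals, and since you have not overridden equals for the Person class, each element comparison falls back to object identity and the test fails as expected.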
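
One way to make the list comparison work is to override equals and hashCode. The sketch below uses a hypothetical EqualPerson class so it stands on its own, and it assumes equality should cover both names; the compareTo in the question only looks at firstName, so adjust that to whatever "equal" should mean for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical variant of the Person class with value-based equality.
final class EqualPerson {
    private final String firstName;
    private final String lastName;

    EqualPerson(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof EqualPerson)) return false;
        EqualPerson that = (EqualPerson) other;
        // Assumption: two people are equal when both names match.
        return firstName.equals(that.firstName) && lastName.equals(that.lastName);
    }

    @Override
    public int hashCode() {
        // Must agree with equals, so it hashes the same fields.
        return Objects.hash(firstName, lastName);
    }

    /** Mirrors PeopleFactory.createPeople() from the question. */
    static List<EqualPerson> createPeople() {
        List<EqualPerson> people = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            people.add(new EqualPerson("Thanks" + i, "Giving" + i));
        }
        return people;
    }
}
```

With this in place, two lists built by separate createPeople() calls are equal according to List.equals, so an equals-based assertion such as assertValue(people) has what it needs to pass.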

Question:

I'm trying to test that Single.zip(...) calls every argument even if one fails, however sometimes verify will fail because the test has finished executing before Single.zip has...

I feel like await should essentially be blocking but that doesn't always seem to be the case. Anything I'm missing?

Code:

public Completable execute(String id) {
    return doThing()
        .flatMap(result -> Single.zip(
            employeeService.getEmployee(id),
            databaseService.getData(id),
            (employee, data) -> ...
         ))
         .toCompletable();
}

Test:

@Test
public void test() throws InterruptedException {
    Exception ex = new Exception("err");

    when(employeeService.getEmployee(anyString())).thenReturn(Single.error(ex));

    myObject.execute("id")
     .test()
     .await()
     .assertFailure(e -> e.equals(ex));

    verify(employeeService).getEmployee(eq("id"));
    verify(databaseService).getData(eq("id"));
}

Possible Solution

  verify(employeeService, atLeast(0)).getEmployee(eq("id"));
  verify(employeeService, atMost(1)).getEmployee(eq("id"));
  verify(databaseService, atLeast(0)).getData(eq("id"));
  verify(databaseService, atMost(1)).getData(eq("id"));

Answer:

I'm trying to test that Single.zip(...) calls every argument even if one fails, however sometimes verify will fail because the test has finished executing before Single.zip has...

This is hard to believe. I think your test is failing for some other reason.

Single.zip(
        employeeService.getEmployee(id),
        databaseService.getData(id),
        (employee, data) -> ...

When Single.zip() is called here, employeeService.getEmployee(id) and databaseService.getData(id) are both called before control even enters Single.zip().

Java evaluates the arguments of a method call before control enters the method. So regardless of what Single.zip() does, both methods will be called.

I think your test is failing because there is some other error in doThing() perhaps.
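
The evaluation order is easy to check with plain Java. In this sketch all names are hypothetical, and ignoreBoth stands in for Single.zip as a method that never looks at its arguments. It also shows the one caveat that applies to the question's code: when the call sits inside a lambda (as it does inside flatMap), nothing is evaluated until that lambda actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins; no RxJava involved.
final class EagerArgumentsDemo {

    /** Records that a "service method" was called and returns a dummy result. */
    static String record(List<String> log, String name) {
        log.add(name);
        return name;
    }

    /** Stand-in for Single.zip: it ignores both arguments completely. */
    static String ignoreBoth(String first, String second) {
        return "ignored";
    }

    /** Both arguments are evaluated, left to right, before ignoreBoth is entered. */
    static List<String> callDirectly() {
        List<String> log = new ArrayList<>();
        ignoreBoth(record(log, "getEmployee"), record(log, "getData"));
        return log;
    }

    /** Wrapped in a lambda that never runs: neither argument is evaluated. */
    static List<String> wrapInLambdaWithoutRunning() {
        List<String> log = new ArrayList<>();
        Supplier<String> deferred =
                () -> ignoreBoth(record(log, "getEmployee"), record(log, "getData"));
        return log; // deferred.get() was never called
    }
}
```

callDirectly() logs both calls in order, while wrapInLambdaWithoutRunning() logs nothing. Applied to the question, both services are called whenever the flatMap lambda runs, so if verify fails, the likelier explanation is that doThing() never emitted a value.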

Question:

I have this method to fetch search result from api

public void fetchSearchResults(Observable<String> searchObservable) {
    searchObservable
        .filter(search -> !TextUtils.isEmpty(search))
        .debounce(700, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(search -> getView().showLoader())
        .switchMap(search -> apiService.fetchSearchResults(search)) //Api call is executed on an io scheduler
        .subscribe(consumer, errorConsumer);
}

and I wrote this JUnit test for this method:

@Test
public void fetchSearchResultsTest() {
    TestScheduler testScheduler = new TestScheduler();
    Observable<String> nameObservable = Observable.just("","FA")
            .concatMap(search -> Observable.just(search).delay(100,
                           TimeUnit.MILLISECONDS, testScheduler));
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    verify(view, never()).showLoader();

    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    verify(view, never()).showLoader();
}

But the test fails on the last verify statement with message

org.mockito.exceptions.verification.NeverWantedButInvoked

view.showLoader();

I have tried passing a TestScheduler to the debounce operator and setting the default computation scheduler as a TestScheduler through RxJavaPlugins but the result does not change, the test still fails.

If the test is failing, that would mean the debounce operator is passing the event straight through, ignoring the timeout passed in its arguments. I don't know if this is correct, but this is as far as I understand. So my question is: how can I fix this test and control the events from the debounce operator with a TestScheduler, as I'm doing with the source observable?


Answer:

Your test is failing because of the onComplete() that occurs right after the second item is emitted. The documentation says that debounce() will emit the final pending item as soon as it receives onComplete().

To make your test work, either concatenate an Observable.never() to the source, or add more items into the pipeline.

Here is an article on using debounce for doing auto-completion.

Question:

I am trying to write a unit test for my view model based on RxJava. In production it runs fine but in my unit test one of my subscriptions is lost without dispose or anything ever being called.

I am using the MVI pattern. All classes (Intent, Action, Result, ViewState) just have a text for simplicity reasons.

public class ViewModel {
    private static final String TAG = ViewModel.class.getSimpleName();
    private final ISchedulerProvider schedulerProvider;
    private Observable<ViewState> viewState;
    private PublishSubject<Intent> intentEmitter = PublishSubject.create();
    private IActionProcessHolder<Action, Result> actionProcessHolder;

    public ViewModel(ISchedulerProvider schedulerProvider, IActionProcessHolder actionProcessHolder){
        this.schedulerProvider = schedulerProvider;
        this.actionProcessHolder = actionProcessHolder;
        viewState = compose();
    }

    private Observable<ViewState> compose() {
        return intentEmitter
                .compose(getIntentToActionTransformer())
                .compose(actionProcessHolder.getActionProcessor())
                .map(result -> new ViewState(result.getText()));
    }

    public void processIntents(Observable<Intent> intent){
        intent
                .subscribeOn(schedulerProvider.io())
                .observeOn(schedulerProvider.io())
                .subscribe(
                        anIntent -> intentEmitter.onNext(anIntent),
                        error -> Log.e(TAG, error.getMessage())
                );
    }

    public Observable<ViewState> getViewState(){
        return viewState;
    }

    private ObservableTransformer<Intent, Action> getIntentToActionTransformer(){
        return intent ->
                intent.map(theIntent -> new Action(theIntent.getText()));
    }
}
public class ATestScheduler implements ISchedulerProvider {

    @Override
    public Scheduler computation() { return Schedulers.trampoline(); }

    @Override
    public Scheduler io() { return Schedulers.trampoline(); }

    @Override
    public Scheduler ui() { return Schedulers.trampoline(); }
}
public class Intent {
    String text;
    public Intent(String text){ this.text = text; }
    public String getText(){ return text; }
}
public class ActionProcessHolder implements IActionProcessHolder<Action,Result>{
    @Override
    public ObservableTransformer<Action,Result> getActionProcessor(){
        return action -> action.map(theAction -> new Result(theAction.getText()));
    }
}
public class ViewModelTest {

    @Mock
    private ObservableTransformer<Action, Result> actionProcessor;
    @Mock
    private IActionProcessHolder<Action, Result> actionProcessHolder;
    private TestObserver<ViewState> testObserver;
    private ViewModel viewModel;
    private CompositeDisposable subscriptions = new CompositeDisposable();

    @Before
    public void setup(){
        MockitoAnnotations.initMocks(this);
        ISchedulerProvider schedulerProvider = new ATestScheduler();
        testObserver = new TestObserver<>();

        when(actionProcessHolder.getActionProcessor()).thenReturn(actionProcessor);
        when(actionProcessor.apply(any())).thenReturn(Observable.just(new Result("mocked text")));

        viewModel = new ViewModel(schedulerProvider, actionProcessHolder);
        subscriptions.add(viewModel.getViewState()
            .observeOn(schedulerProvider.ui())
            .subscribe(
                    viewState -> testObserver.onNext(viewState)
                    , error -> Assert.fail(error.getMessage())));
    }

    @Test
    public void testProcessIntent(){
        viewModel.processIntents(Observable.just(new Intent("newText")));
        testObserver.assertValueAt(0, viewState -> viewState.getText().equals("mocked text"));
        verify(actionProcessor, times(2)).apply(any());
    }
}

I am expecting the actionProcessor to be called twice, once during initialization and once when I process another intent in the actual test. However when the intent is processed, the intentEmitter in the view model no longer has any subscribers, so onNext does nothing.

I have worked out that the line in the ViewModel class:

.compose(actionProcessHolder.getActionProcessor())

seems to break it. When I simply map my action to a result like this:

.map(action -> new Result(action.getText()))

(the same thing the actionProcessor should do), the subscription is not disposed. I have set breakpoints everywhere the subscriptions are disposed, but none of them is ever hit. Does anyone have any idea why this happens?


Answer:

The problem is actually not that the subscription is disposed, but that the chain of subscriptions is never established. During the setup, when subscribe is called, with this line

when(actionProcessor.apply(any())).thenReturn(Observable.just(new Result("mocked text")));

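the mocked transformer discards the upstream it is given and returns Observable.just(...), which emits its value as soon as it is subscribed. The intentEmitter upstream is therefore never subscribed, so the chain back to it is broken. I solved it by stubbing the actionProcessor without Mockito, simply replacing the line above with this: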

actionProcessor = action -> action.map(ignore -> new Result("mocked text"));
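The difference between the two stubs can be shown without RxJava or Mockito. The following is only a minimal sketch under stated assumptions: Source, Emitter, just, map and run are hypothetical stand-ins written for this illustration, not RxJava types, and they model nothing but the subscription wiring. A transformer that returns a fixed source never subscribes to its upstream, while one that maps the upstream does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of a push-based stream. NOT RxJava; it only models subscription wiring.
class SubscriptionChainSketch {

    /** A source that pushes values to whoever subscribes. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Plays the role of a PublishSubject: remembers observers and forwards onNext to them. */
    static final class Emitter<T> implements Source<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        @Override
        public void subscribe(Consumer<T> observer) { observers.add(observer); }

        void onNext(T value) {
            for (Consumer<T> observer : observers) observer.accept(value);
        }

        boolean hasObservers() { return !observers.isEmpty(); }
    }

    /** Like Observable.just: emits one fixed value on subscribe and has no upstream. */
    static <T> Source<T> just(T value) {
        return observer -> observer.accept(value);
    }

    /** Like map: subscribing to the result subscribes to the upstream. */
    static <T, R> Source<R> map(Source<T> upstream, Function<T, R> mapper) {
        return observer -> upstream.subscribe(value -> observer.accept(mapper.apply(value)));
    }

    /** Composes an emitter with the transformer, subscribes, pushes one value, and reports the outcome. */
    static String run(Function<Source<String>, Source<String>> transformer) {
        Emitter<String> emitter = new Emitter<>();
        List<String> received = new ArrayList<>();
        transformer.apply(emitter).subscribe(received::add);
        boolean wired = emitter.hasObservers();
        emitter.onNext("newText");
        return "wired=" + wired + " received=" + received;
    }

    public static void main(String[] args) {
        // Stub that discards its input, like thenReturn(Observable.just(...)).
        System.out.println(run(upstream -> just("mocked text")));
        // prints: wired=false received=[mocked text]

        // Stub that derives its output from its input, like action.map(...).
        System.out.println(run(upstream -> map(upstream, ignore -> "mocked text")));
        // prints: wired=true received=[mocked text]
    }
}
```

In the first case the only value received is the one emitted at subscription time, and the later onNext reaches nobody, which matches the symptom described in the question.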

Question:

My understanding is that I can use the merge operator with 2 other Observables, followed by the firstOrError operator, to return the first Observable that returns a result.

I'm trying to run a test case but it doesn't seem to do what I thought it would do, so I'm not sure if there's an issue with my test or if I'm using the operators incorrectly. I would appreciate it if anyone could help me with that. If anyone knows of a better way to achieve the same result, that would be great too.

Here's my test case:

@Test
public void testMergeOperator() {
    TestObserver<String> observer = new TestObserver<>();

    Observable<String> foo = Observable.just("FOO");
    Observable<String> bar = Observable.timer(2, SECONDS).just("BAR");

    foo.publish(first -> Observable.merge(first, bar.takeUntil(first))
            .firstOrError()
            .toObservable())
            .subscribe(observer);

    assertEquals(1, observer.valueCount());
    assertEquals("FOO", observer.values().get(0));
}

And I'm getting:

junit.framework.ComparisonFailure: Expected :FOO Actual :BAR


Answer:

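Replace just with map. Observable.just is a static method, so Observable.timer(2, SECONDS).just("BAR") is the same as Observable.just("BAR"): the timer is discarded and "BAR" is emitted as soon as bar is subscribed, ahead of "FOO". Mapping the timer's value keeps the delay: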

@Test
public void testMergeOperator() {
    TestObserver<String> observer = new TestObserver<>();

    Observable<String> foo = Observable.just("FOO");
    Observable<String> bar = Observable.timer(2, TimeUnit.SECONDS).map(v -> "BAR");

    foo.publish(first -> Observable.merge(first, bar.takeUntil(first))
            .firstOrError()
            .toObservable())
            .subscribe(observer);

    assertEquals(1, observer.valueCount());
    assertEquals("FOO", observer.values().get(0));
}
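The reason just compiles in that position at all is a Java language rule: a static method may be called through an instance expression, and the instance is then ignored. Here is a minimal sketch of that rule, assuming a hypothetical Source class with static timer and just factories and an instance map method (it only records a delay and is not the RxJava Observable):

```java
import java.util.function.Function;

// Hypothetical stand-in for a class with static factories. NOT the RxJava Observable.
class StaticFactorySketch {

    static final class Source {
        final String value;
        final long delaySeconds;

        private Source(String value, long delaySeconds) {
            this.value = value;
            this.delaySeconds = delaySeconds;
        }

        /** Static factory: a source that emits after a delay. */
        static Source timer(long delaySeconds) {
            return new Source("tick", delaySeconds);
        }

        /** Static factory: a source that emits immediately. It has no receiver, so it cannot see any delay. */
        static Source just(String value) {
            return new Source(value, 0);
        }

        /** Instance method: keeps the receiver's delay and only changes the value. */
        Source map(Function<String, String> mapper) {
            return new Source(mapper.apply(value), delaySeconds);
        }
    }

    public static void main(String[] args) {
        // Compiles (javac only offers a lint warning), but the timer instance is thrown away.
        Source viaInstance = Source.timer(2).just("BAR");
        // The instance method operates on the timer, so the delay survives.
        Source mapped = Source.timer(2).map(v -> "BAR");

        System.out.println(viaInstance.delaySeconds); // prints 0
        System.out.println(mapped.delaySeconds);      // prints 2
    }
}
```

Compiling with -Xlint:static makes javac flag calls like the first one, which can help catch this mistake early.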

Question:

I'm having some issues with combining both a BehaviorSubject and Observable.combineLatest. I was able to reproduce it in a (smaller) test. Here is a test which is currently failing:

import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import static java.util.Collections.singletonList;

public class MyTest {

    @Test
    public void test() {
        Subject<Integer> intSource = BehaviorSubject.createDefault(1);

        Subject<List<Observable<Integer>>> mainSubject = 
            BehaviorSubject.createDefault(singletonList(intSource));

        TestObserver<List<Integer>> testObserver = 
            mainSubject.flatMap(observables ->
                Observable.combineLatest(observables, this::castObjectsToInts)
            )
            .test();

        List<Observable<Integer>> newValue = new ArrayList<>();
        newValue.add(intSource); // same value as before
        newValue.add(Observable.just(2)); // add another value to this list.

        mainSubject.onNext(newValue);

        // intSource was already '1', but this is just to 'update' it.
        intSource.onNext(1); // COMMENT OUT THIS LINE

        testObserver.assertValueAt(0, singletonList(1));
        testObserver.assertValueAt(1, Arrays.asList(1, 2));
        testObserver.assertValueAt(2, Arrays.asList(1, 2)); // COMMENT OUT THIS LINE
        testObserver.assertValueCount(3); // REPLACE 3 WITH 2
    }

    private List<Integer> castObjectsToInts(Object[] objects) {
        List<Integer> ints = new ArrayList<>(objects.length);
        for (Object object : objects) {
            ints.add((Integer) object);
        }
        return ints;
    }
}

(If you comment out both lines with "COMMENT OUT THIS LINE", and replace the last 3 with a 2, the test succeeds.)

Why is this test failing? I don't see anything wrong with the code.


Answer:

It fails because the mainSubject.flatMap still has the first combineLatest (the one over intSource alone) active. intSource.onNext(1) therefore triggers that first sequence and then the second one. The value at index 2 is the single-element list [1], the value at index 3 is [1, 2], and the whole sequence has 4 items: [1], [1, 2], [1], [1, 2].

Question:

In order to have reusable and testable RxJava code, I have separated parts of my code by using ObservableTransformers. It works fine in production; however, testing it is not as easy as expected, since I seem to be unable to mock those ObservableTransformers.

This is the class to test:

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.subjects.PublishSubject;

public class Cut {

    private PublishSubject<String> emitter = PublishSubject.create();
    private ITransformerProvider transformerProvider;

    public Cut(ITransformerProvider transformerProvider) {
        this.transformerProvider = transformerProvider;
    }

    public void writeNewText(String text){
        emitter.onNext(text);
    }

    public Observable<String> getActualText(){
        return emitter
                .compose(transformerProvider.getObservableTransformer());
    }

    class MyActualObservableTransformer implements ITransformerProvider {
        @Override
        public ObservableTransformer<String, String> getObservableTransformer() {
            //does something I do not want to execute in unit test, e.g. REST call
            return null;
        }
    }
}

import io.reactivex.ObservableTransformer;

public interface ITransformerProvider {
    ObservableTransformer<String, String> getObservableTransformer();
}

Let's assume the production MyActualObservableTransformer needs mocking in order to have a plain unit test.

And this is a simple test attempting to test what happens when I hand over a new text. I do realize the test makes no sense, it is just condensed to show my problem:

public class TextTest {

    @Mock
    private ITransformerProvider transformerProvider;
    @Mock
    private ObservableTransformer<String, String> observableTransformer;

    private TestObserver<String> testObserver;
    private Cut cut;


    @Before
    public void setup(){
        MockitoAnnotations.initMocks(this);

        when(transformerProvider.getObservableTransformer()).thenReturn(observableTransformer);
        when(observableTransformer.apply(any())).thenReturn(Observable.just("mockedText"));

        testObserver = new TestObserver<>();
        cut = new Cut(transformerProvider);

        cut.getActualText()
            .observeOn(Schedulers.trampoline())
            .subscribe(
                    newText -> testObserver.onNext(newText)
                    , error -> Assert.fail(error.getMessage()));
    }

    @Test
    public void testGetActualText(){
        cut.writeNewText("ignoredText");
        testObserver.assertValueAt(0, text -> text.equals("mockedText"));
    }
}

The problem is that when running the test, the emitter has no subscribers.

Having analyzed the call stack, I think I know what the problem is. As far as I understand it, when subscribe is called, subscribe is called in turn on every observable in the chain. Without my ObservableTransformer mock, this is what happens, and this is why it works fine in production.

However, when I add my mock:

when(observableTransformer.apply(any())).thenReturn(Observable.just("mockedText"));

as soon as subscribe for it is called, it emits a new value and the subscription chain is never completed. So my question is, how can I mock an observable without it emitting a value during subscription? I guess Observable.just or a BehaviorSubject is not the way to go since it behaves as expected in this case, as far as my still limited RxJava knowledge goes. Any ideas?


Answer:

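The simplest solution I came up with is to not use Mockito for this and to write the stub myself (the field then no longer carries @Mock, and the when(observableTransformer.apply(any())) stubbing is dropped):

private ObservableTransformer<String, String> observableTransformer = text -> text.map(ignore -> "mockedText");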

Question:

I'm building a microservice using Micronaut. This particular service depends on an automatically generated client from another service, which I'd like to mock in my component tests.

However, I'm unable to mock one of the methods of the apiClient. Its signature looks like this:

    Single<? extends HttpResponse> patchProfile(String userId, UserprofileApiEntity userProfile);

where HttpResponse is from the package io.micronaut.http and Single comes from the package io.reactivex.

In my test, I'm trying to mock this endpoint like this:

        when(userProfileClientMock.patchProfile(userIdCaptor.capture(), profileCaptor.capture())).thenReturn(Single.just(HttpResponse.ok()));

However, I'm getting this error:

error: no suitable method found for thenReturn(Single<CAP#1>)
        when(userProfileClientMock.patchProfile(userIdCaptor.capture(), profileCaptor.capture())).thenReturn(response);
                                                                                                 ^
    method OngoingStubbing.thenReturn(Single<CAP#2>) is not applicable
      (argument mismatch; Single<CAP#1> cannot be converted to Single<CAP#2>)
    method OngoingStubbing.thenReturn(Single<CAP#2>,Single<CAP#2>...) is not applicable
      (argument mismatch; Single<CAP#1> cannot be converted to Single<CAP#2>)
  where CAP#1,CAP#2 are fresh type-variables:
    CAP#1 extends HttpResponse from capture of ? extends HttpResponse
    CAP#2 extends HttpResponse from capture of ? extends HttpResponse
Note: /home/anders/Documents/minecraft/minecraft.api.public.displaynames/minecraft.api.public.displaynames.server/src/test/java/net/minecraft/api/pub/displaynames/DisplaynamesApiTest.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error

From what I gather, it has something to do with the <? extends . . .> part of the signature. I've successfully mocked different methods from similar API clients returning types like Single<Foo> or Maybe<Bar>. Unfortunately, in this case it does not make sense to change the signature of the client.

I've tried declaring the response Single as a variable, explicitly typing it as Single<? extends HttpResponse> and as Single<MutableHttpResponse>, thinking Mockito might be confused by type erasure somehow, but to no avail.

Any ideas what might be the cause of this, and how to work around it?


Answer:

I was eventually able to solve this by using a different Mockito syntax, namely

doReturn(foo).when(bazMock).barMethod();

This style of stubbing does not provide type safety, which is a bit of a trade-off, but here it seems to be a necessary evil. I would guess this applies to any case where you want to mock a method returning a generic class typed as Foo<? extends Bar>.
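Why the typed form is rejected while the untyped one is accepted can be reproduced in plain Java. The following is only a sketch under stated assumptions: Response, Box, Client, Stubbing, when and doReturn are hypothetical miniatures written for this illustration, and they imitate only the generic shape of the real signatures, not Mockito, Micronaut or RxJava themselves.

```java
// Hypothetical miniature of a stubbing API. These are NOT the Mockito,
// Micronaut or RxJava types; only the generic signatures are imitated.
class WildcardStubbingSketch {

    static class Response {}

    /** Stands in for Single<T>. */
    static final class Box<T> {
        final T value;
        Box(T value) { this.value = value; }
    }

    /** Stands in for the mocked client with a wildcard return type. */
    static final class Client {
        Object stub; // whatever the "mock" was told to return

        @SuppressWarnings("unchecked")
        Box<? extends Response> patchProfile() {
            return (Box<? extends Response>) stub;
        }
    }

    /** Typed stubbing, shaped like when(call).thenReturn(value). */
    static final class Stubbing<T> {
        private final Client client;
        Stubbing(Client client) { this.client = client; }
        void thenReturn(T value) { client.stub = value; }
    }

    static <T> Stubbing<T> when(Client client, T call) {
        return new Stubbing<>(client);
    }

    /** Untyped stubbing, shaped like doReturn(value).when(mock). */
    static void doReturn(Object value, Client client) {
        client.stub = value;
    }

    static boolean returnsStub(Client client, Object expected) {
        return client.patchProfile() == expected;
    }

    public static void main(String[] args) {
        Client client = new Client();
        Box<Response> ok = new Box<>(new Response());

        // Rejected by the compiler: T is inferred as Box<capture of ? extends Response>,
        // and Box<Response> cannot be converted to that captured type.
        // when(client, client.patchProfile()).thenReturn(ok);

        // Accepted: the parameter is Object, so no captured type is involved.
        doReturn(ok, client);
        System.out.println(returnsStub(client, ok)); // prints true

        // Also accepted in this miniature: naming T explicitly keeps the wildcard uncaptured.
        Client other = new Client();
        WildcardStubbingSketch.<Box<? extends Response>>when(other, other.patchProfile()).thenReturn(ok);
        System.out.println(returnsStub(other, ok)); // prints true
    }
}
```

The explicit type argument in the last call is an observation about this miniature only. Whether it is preferable to doReturn in a real test is not something the answer above covers.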