Hot questions for using RxJava 2 with Retrofit

Question:

I am relatively new to RxJava in general (I really only started using it with RxJava 2), and most documentation I can find tends to cover RxJava 1. I can usually translate between the two now, but the whole Reactive ecosystem is so big that the API is overwhelming, even though the documentation is good (when you can find it). I'm trying to streamline my code, and I want to do it in baby steps. The first problem I want to solve is a common pattern that I use a lot in my current project:

You have a Request that, if successful, you will use to make a second request.

If either fails, you need to be able to identify which one failed. (mostly to display custom UI alerts).

This is how I usually do it right now:

(omitted the .subscribeOn/observeOn for simplicity)

Single<FirstResponse> first = retrofitService.getSomething();

first
    .subscribeWith(
        new DisposableSingleObserver<FirstResponse>() {
            @Override
            public void onSuccess(final FirstResponse firstResponse) {
                // If FirstResponse is OK...
                // (subscribeWith returns the observer, so there is no Single to assign here)
                retrofitService
                    .getSecondResponse(firstResponse.id) // value from 1st
                    .subscribeWith(
                        new DisposableSingleObserver<SecondResponse>() {
                            @Override
                            public void onSuccess(final SecondResponse secondResponse) {
                                // we're done with both!
                            }

                            @Override
                            public void onError(final Throwable error) {
                                // 2nd request failed
                            }
                        });
            }

            @Override
            public void onError(final Throwable error) {
                // first request failed
            }
        });

Is there a better way to deal with this in RXJava2?

I've tried flatMap and variations and even a Single.zip or similar, but I'm not sure what the easiest and most common pattern is to deal with this.

In case you're wondering FirstRequest will fetch an actual Token I need in the SecondRequest. Can't make second request without the token.


Answer:

I would suggest using flatMap (and Retrolambda, if that is an option). Also, you do not need to keep the return value (e.g. Single<FirstResponse> first) if you are not doing anything with it.

retrofitService.getSomething()
    .flatMap(firstResponse -> retrofitService.getSecondResponse(firstResponse.id))
    .subscribeWith(new DisposableSingleObserver<SecondResponse>() {
        @Override
        public void onSuccess(final SecondResponse secondResponse) {
            // we're done with both!
        }

        @Override
        public void onError(final Throwable error) {
            // one of the requests failed
        }
    });

This article helped me think through styles in how I structure RxJava in general. You want your chain to be a list of high level actions if possible so it can be read as a sequence of actions/transformations.

EDIT: Without lambdas you can pass an anonymous Function to flatMap (this was Func1 in RxJava 1). It does the same thing, just with a lot more boilerplate code.

retrofitService.getSomething()
    // Function here is io.reactivex.functions.Function
    .flatMap(new Function<FirstResponse, SingleSource<SecondResponse>>() {
        @Override
        public SingleSource<SecondResponse> apply(FirstResponse firstResponse) {
            return retrofitService.getSecondResponse(firstResponse.id);
        }
    })
    .subscribeWith(new DisposableSingleObserver<SecondResponse>() {
        @Override
        public void onSuccess(final SecondResponse secondResponse) {
            // we're done with both!
        }

        @Override
        public void onError(final Throwable error) {
            // one of the requests failed
        }
    });
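
The question also asks how to tell which request failed, which a single shared onError does not reveal by itself. One possible approach is to wrap each request's error in a type that names the stage before chaining. The sketch below models that idea with the JDK's CompletableFuture (thenCompose plays the role of flatMap) so that it runs without RxJava or Retrofit. StageFailure, ChainedRequests, tagged, chain and failedStage are hypothetical names, not part of either library. In RxJava 2 the same wrapping could probably be done with onErrorResumeNext on each Single.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical error type that records which request failed.
class StageFailure extends RuntimeException {
    final String stage;

    StageFailure(String stage, Throwable cause) {
        super(stage + " request failed", cause);
        this.stage = stage;
    }
}

class ChainedRequests {
    // Passes the future's value through, or wraps its error with the stage name.
    static <T> CompletableFuture<T> tagged(CompletableFuture<T> source, String stage) {
        CompletableFuture<T> out = new CompletableFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null) {
                out.completeExceptionally(new StageFailure(stage, error));
            } else {
                out.complete(value);
            }
        });
        return out;
    }

    // Runs the second request only if the first succeeds (the flatMap step).
    static CompletableFuture<String> chain(
            CompletableFuture<String> first,
            Function<String, CompletableFuture<String>> second) {
        return tagged(first, "first")
                .thenCompose(token -> tagged(second.apply(token), "second"));
    }

    // Returns the name of the failed stage, or "none" if both succeeded.
    static String failedStage(CompletableFuture<String> result) {
        try {
            result.join();
            return "none";
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause instanceof StageFailure ? ((StageFailure) cause).stage : "unknown";
        }
    }
}
```

With this shape, the single error callback can check the stage name and show a different alert for each request.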

Question:

I am getting the error message below when I try to create a Subscriber and subscribe.

can not resolve method 'subscribe(anonymous rx.Subscriber<GooglePlacesResponse>)'

build.gradle

// JSON Parsing
compile 'com.google.code.gson:gson:2.6.1'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxjava:2.0.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

GooglePlaceService.java

public interface GooglePlaceService {

    public static final String GOOGLE_API_KEY = "google_api_key";

    @GET("maps/api/place/nearbysearch/json?radius=2000&key="+GOOGLE_API_KEY)
    Observable<GooglePlacesResponse> getNearbyPlaces(@Query("location") String location);
}

ApiUtils.java

public class ApiUtils {    

    public static final String GOOGLE_PLACE_BASE_URL = "https://maps.googleapis.com/";

    public static GooglePlaceService getGooglePlaceService() {
        return getClient(GOOGLE_PLACE_BASE_URL).create(GooglePlaceService.class);
    }

    public static Retrofit getClient(String baseUrl) {

        Retrofit retrofit = new Retrofit.Builder()
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl(baseUrl)
                .build();

        return retrofit;
    }
}

The Observable<GooglePlacesResponse> is created and subscribed to as below.

Observable<GooglePlacesResponse> mGooglePlacesResponseObervable = ApiUtils.getGooglePlaceService().getNearbyPlaces(latitude + "," + longitude);

mGooglePlacesResponseObervable
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new Subscriber<GooglePlacesResponse>() { // <-- Error here: can not resolve method 'subscribe(anonymous rx.Subscriber<GooglePlacesResponse>)'

            @Override
            public void onNext(GooglePlacesResponse googlePlacesResponse) {

                }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }
     });

Answer:

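The fact that the adapter has version 2.*.* does not mean that it is intended for use with RxJava 2. The adapter-rxjava artifact is Retrofit 2's adapter for RxJava 1, so it does not match the io.reactivex.rxjava2 dependencies in your build file.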

You should use the official adapter for the second version of RxJava:

implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' // works with RxJava 2

Then you can add the factory:

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("https://api.example.com")
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .build();

Here is the full answer.

Question:

I wanted to integrate retrofit with RxJava, but it cannot resolve symbol 'RxJavaCallAdapterFactory'.

Here is my code:

RetrofitService.java

import io.reactivex.Observable;
import retrofit2.http.Field;
import retrofit2.http.FormUrlEncoded;
import retrofit2.http.POST;

public interface RetrofitService {

    @FormUrlEncoded
    @POST("api/do.php")
    Observable<String> getUserToken(@Field("action") String action,
                                    @Field("name") String name,
                                    @Field("password") String password);
}

MainActivity.java

import android.leedev.cn.readertool.R;
import android.leedev.cn.readertool.model.http.service.RetrofitService;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.converter.scalars.ScalarsConverterFactory;

public class MainActivity extends AppCompatActivity {

    private static final String TAG = MainActivity.class.getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        login();
    }

    private void login() {
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://api.xingjk.cn/")
                .addConverterFactory(ScalarsConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();

        RetrofitService service = retrofit.create(RetrofitService.class);
        Observable<String> observable = service.getUserToken("loginIn", "******", "6205240");

        observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.i(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, "onNext: " + s.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG, "onError: ");
                    }

                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onComplete: ");
                    }
                });
    }

}

Looking forward to your help, thank you!


Answer:

Add:

implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

to your dependencies.

See the documentation.

Also replace RxJavaCallAdapterFactory with RxJava2CallAdapterFactory (from the retrofit2.adapter.rxjava2 package) in your Retrofit builder.

Question:

I am trying to understand how to use Retrofit with RxJava. I have seen many different examples of the subscribe method and couldn't find a proper explanation of them.

1st One

 Observable<PostMessage> call =  service.callAPI(data);
        call.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<PostMessage>(

                ));

2nd One

Observable<PostMessage> call =   service.callAPI(data);
call.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<PostMessage>(

        ) {
            @Override
            public void accept(PostMessage postMessage) throws Exception {

            }
        });

}

3rd One

Observable<PostMessage> call =   service.callAPI(data);
call.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new DisposableObserver<PostMessage>() {
            @Override
            public void onNext(PostMessage postMessage) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

}

4th One

Observable<PostMessage> call =  service.callAPI(data);
        call.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<PostMessage>(

                ) {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(PostMessage postMessage) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

Could anyone explain these different ways of doing it? Does each have a different meaning, or do they all do the same thing?


Answer:

1:a Schedulers.io() is intended for I/O-bound work. The work runs on a separate thread taken from a pool whose threads are cached and reused for later jobs.

1:b AndroidSchedulers.mainThread() is used because you want to receive the results back on the main thread.

1:c new Subscriber: in RxJava 2, a Subscriber (org.reactivestreams.Subscriber) subscribes to a Flowable. In RxJava 1, rx.Subscriber was an abstract class that implemented Observer and was used with Observable, which is why this form appears in older examples.

2: new Consumer: a functional interface (callback) that accepts a single value. Here it receives only the onNext items.

3: new DisposableObserver: also an Observer, but an abstract class that implements Disposable and therefore allows asynchronous cancellation.

4: new Observer: an Observer subscribes to an Observable and provides a mechanism for receiving push-based notifications. The Observable calls onNext() for each item, and then calls either onComplete() or onError() at most once when it finishes.

The main difference between the two is that a Subscriber (with Flowable) supports backpressure, while an Observer (with Observable) does not; otherwise they work almost the same way.

The general difference between the Observer/Observable and Producer/Consumer patterns is as follows (see the first source below):

Observer/Observable: The watching thread is observed by the controller. In case of an event happening, the controller is then notified and can assign the new task to a free thread from a reusable cached thread pool (or wait and cache the tasks in a FIFO queue if all threads are currently busy). The worker threads implement Callable and either return successfully with the result (or a boolean value), or return with an error, in which case the controller may decide what to do (depending on the nature of the error that has happened).

Producer/Consumer: The watching thread shares a BlockingQueue with the controller (event-queue) and the controller shares two with all workers (task-queue and result-queue). In case of an event, the watching thread puts a task object in the event-queue. The controller takes new tasks from the event-queue, reviews them and puts them in the task-queue. Each worker waits for new tasks and takes/consumes them from the task-queue (first come first served, managed by the queue itself), putting the results or errors back into the result-queue. Finally, the controller can retrieve the results from the result-queue and take according steps in case of errors.
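
As a rough illustration of the producer/consumer description above, here is a minimal plain-Java sketch built on the JDK's BlockingQueue. It is not RxJava code. The names QueueWorkers and runWorkers, the stop marker, and the use of squaring as the "task" are all made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorkers {
    // Marker telling a worker to exit; real tasks must not use this value.
    private static final int STOP = Integer.MIN_VALUE;

    // Squares every task on `workers` threads and returns the sorted results.
    static List<Integer> runWorkers(List<Integer> tasks, int workers) {
        BlockingQueue<Integer> taskQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> resultQueue = new LinkedBlockingQueue<>();
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        int task = taskQueue.take(); // blocks until a task is available
                        if (task == STOP) {
                            return;
                        }
                        resultQueue.put(task * task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }

        try {
            // Controller side: hand out the tasks, then one stop marker per worker.
            for (int task : tasks) {
                taskQueue.put(task);
            }
            for (int i = 0; i < workers; i++) {
                taskQueue.put(STOP);
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }

        List<Integer> results = new ArrayList<>();
        resultQueue.drainTo(results);
        Collections.sort(results); // worker order is not deterministic, so sort
        return results;
    }
}
```

The queue itself decides which worker gets which task, which is the "first come, first served" behaviour mentioned in the description.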

sources:

https://softwareengineering.stackexchange.com/questions/286763/difference-between-consumer-producer-and-observer-observable

What is the difference between an Observer and a Subscriber?

http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html

http://reactivex.io/RxJava/javadoc/io/reactivex/functions/Consumer.html

http://reactivex.io/RxJava/javadoc/io/reactivex/observers/DisposableObserver.html

Question:

I am trying to make a call to API using retrofit and rxJava. The code below seems to work well when using RxJava 1, but once I updated to RxJava 2 I am getting this error:

Error :

No Instance of type variable R exist so that Observable conforms to Observable

Api

Observable<HttpResult<List<Article>>> getList(@Query("key")String key);

Api request done here, and this is where I get this error inside .map operator

Observable cache=providers.getList().map(new HttpRsltFunc<List<Article>>());

Result class model :

private  class HttpRsltFunc<T> implements Func1<HttpResult<T>, T> {
       @Override
       public T call(HttpResult<T> httpResult) {   
           return httpResult.getData();
       }
   }

Edit :

When importing rx.Observable instead of io.reactivex.Observable the code works just fine.

Thank you


Answer:

This happens because RxJava 2 moved to its own functional interfaces, so you need to implement io.reactivex.functions.Function (whose method is apply) instead of Func1 (whose method is call).

Also, don't forget to update your Retrofit adapter to the RxJava 2 version: compile 'com.squareup.retrofit2:adapter-rxjava2:latest.version'

Question:

In my Presenter I have a method which gets a list from DataHolder:

disposable.add(dataHolder.getMonthOfAttractions(monthInAdvance)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSingleObserver<Map<String, List<Attraction>>>() {
                @Override
                public void onSuccess(Map<String, List<Attraction>> stringListMap) {

                }

                @Override
                public void onError(Throwable e) {

                }
            }));

Then, in my DataHolder, I check whether my list is null. If it isn't, the method returns the list; otherwise it downloads the list from the server:

public Single<Map<String, List<Attraction>>> getMonthOfAttractions(int monthInAdvance) {
    Map<String, List<Attraction>> monthOfAttractions = monthlyAttractionsMap.get(monthInAdvance);
    if (monthOfAttractions != null)
        return Single.fromCallable(() -> monthOfAttractions);
    else
        return apiGetMonthOfAttractions(monthInAdvance);
}

The problem is with the apiGetMonthOfAttractions method. I don't know how to implement this method correctly so that it returns the value to my Presenter.

I've tried something like:

private Single<Map<String, List<Attraction>>> apiGetMonthOfAttractions(int monthInAdvance) {
    cnkRetrofitProvider.getApiInterface().getAttractions(monthInAdvance)
            .subscribeWith(new CnkApiObserver<AttractionListResponse>() {
                @Override
                public void onSucceeded(AttractionListResponse result) {
                    result.getMonthOfAttractions();
                }

                @Override
                public void onFailed(Error error) {
                }
            });
}

But in this case I get "missing return statement" and I'm out of ideas on how to implement it. I'm beginning to learn RxJava, so please be understanding.

Please help :)

EDIT: This is how my Retrofit getAttractions() method looks:

public interface CnkApiInterface {

    @GET("pl/front-api/{dateFrom}/{dateTo}")
    Single<AttractionListResponse> getAttractions(@Path("dateFrom") String dateFrom, @Path("dateTo") String dateTo);
}

Answer:

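This is what you are after. Return the Single from the method instead of subscribing inside it, and let the Presenter subscribe:

private Single<Map<String, List<Attraction>>> apiGetMonthOfAttractions(int monthInAdvance) {
    return cnkRetrofitProvider.getApiInterface()
            .getAttractions(monthInAdvance)
            // map() is enough for a synchronous transformation of the response
            .map(attractionListResponse -> attractionListResponse.getMonthOfAttractions());
}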

Question:

I am trying to use zip operator in RxJava in android, where I am trying to execute 3 parallel API Calls to get their result together. But my zip operator is not producing result. The code for my sample problem is as follows:

Code for my gradle file

compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:converter-gson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'

I have also included this in my gradle file

exclude 'META-INF/rxjava.properties'

Code for my Retrofit Client

retrofit = new Retrofit.Builder().baseUrl(BASE_URL)
    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
    .addConverterFactory(GsonConverterFactory.create())
    .client(client).build();

Code for my API Interface

public interface ApiInterface {
    @GET("/users/{UUID}/count.json")
    Observable<Count> getCountInfo(@Path("UUID") String UUID, @Query("store_id") String sort);
    @GET("v1/users/{UUID}.json")
    Observable<GetStatus> getState(@Path("UUID") String UUID);
    @GET("v1/user/{UUID}/points.json")
    Observable<Response> getResponse(@Path("UUID") String UUID);
}

Code for my Observables is

Retrofit repo = APIClient.getClient(baseUrl);
Observable<Count> userObservable = repo.create(ApiInterface.class)
    .getCountInfo(userid,"1")
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());
Observable<GetStatus> eventsObservable = APIClient.getClient(baseUrl)
    .create(ApiInterface.class)
    .getState(userid)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());
Observable<Response> eventsObservable1 = APIClient
    .getClient(baseUrl)
    .create(ApiInterface.class)
    .getResponse(userid)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());

Code for my combined observable and zip operator is:

Observable<CommonResponse> combined = Observable.zip(userObservable, eventsObservable, eventsObservable1,
new Func3<Count, GetStatus, Response, CommonResponse>() {
    @Override
    public CommonResponse call(Count count, GetStatus uStatus, 
        Response lResponse) {
        return new CommonResponse(count, uStatus, lResponse);
    }
});
combined.subscribe(new Subscriber<CommonResponse>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(CommonResponse o) {
        LOG.info("Count Value is " + o.getCount());
        /**
        ***
        */
    }
});

The problem I am facing is that, the statements inside the onNext of the Combined Observable are not being executed. What could be the reason for glitch in execution? So I wanted to ask:

  1. Is there any issue in my dependencies?
  2. Should AndroidSchedulers.mainThread() be used instead of Schedulers.io()

Answer:

For the .zip() operator to emit anything, all zipped observables have to emit at least once. If one of your observables emits an error, or does not emit at all, you will never receive an onNext event.

  • To check for error emissions, add logging or breakpoints to the onError in your subscribe call (in the question it is empty, so any error is silently dropped)
  • To check for missing emissions, add doOnNext and doOnCompleted calls with logging after each of your zipped Observables and see which one does not emit

Cheers!
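
To make the pairing rule concrete, here is a small plain-Java model of how zip combines its sources: it produces one output per complete set of inputs, so a source with no items means no output at all. This is only an illustration with lists, not RxJava's implementation, and ZipModel and zip3 are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class ZipModel {
    // Combines the i-th items of three lists and stops at the shortest list.
    static List<String> zip3(List<String> a, List<String> b, List<String> c) {
        int n = Math.min(a.size(), Math.min(b.size(), c.size()));
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(a.get(i) + "+" + b.get(i) + "+" + c.get(i));
        }
        return out;
    }
}
```

Calling zip3 with three one-element lists gives one combined element. If any one of the lists is empty, the result is empty, which corresponds to onNext never being called.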

Question:

Please look at this code:

Main activity:

    @Override
    protected void onCreate(Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);

        MutableLiveData<User> mutableUser = new MutableLiveData<>();
        mutableUser.setValue(new User("John","Gordon","Homeless"));

        activityMainBinding = DataBindingUtil.setContentView(this, R.layout.activity_main);
        activityMainBinding.setHandler(new MainActivityHandler());

        activityMainBinding.setLifecycleOwner(this);
        activityMainBinding.setUser(mutableUser);

        setSupportActionBar(activityMainBinding.toolbar);
    }

MutableLiveData is defined in xml layout:

<variable name="user" type="android.arch.lifecycle.MutableLiveData&lt;test.databindingtemplate.ViewModels.User&gt;"/>

Each field from user is bound to control with data binding:

<EditText
                android:id="@+id/editTextName"
                android:layout_width="0dp"
                android:layout_height="wrap_content"
                android:layout_marginBottom="8dp"
                android:layout_marginEnd="8dp"
                android:layout_marginStart="8dp"
                android:layout_marginTop="8dp"
                android:ems="10"
                android:inputType="textPersonName"
                android:text="@={user.firstName}"
                app:layout_constraintBottom_toTopOf="@+id/textViewSecondName"
                app:layout_constraintEnd_toEndOf="parent"
                app:layout_constraintStart_toEndOf="@+id/textViewSecondName"
                app:layout_constraintTop_toTopOf="parent"/>

This works fine, configuration changes are tracked and populated to gui (for example, when I type something in "Name" field, value persists when screen rotates).

Next, I want to implement refreshing user details from rest webservice - Retrofit/RxJava and I want to show busy indicator while data loading is in progress, for example, in onCreate method:

showBusyIndicator(); 
testService.getUserDetails(headers)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(user ->
                {
                  hideBusyIndicator();
                  activityMainBinding.getUser().setValue(user);
                }, throwable -> 
                {
                    Log.d(TAG, "ERROR loading user");
                    hideBusyIndicator();
                });

Now I'm not sure how to handle configuration changes properly - when busy indicator is shown, it should be recreated after screen rotation changes. As MutableLiveData handles configuration changes nicely, I am unable to find any solution (other than "classic" way) to solve my problem with loading indicator and handle result in this case.

Can you point me right direction?

[edit] getUserDetails declaration:

@NonNull
@GET("/user")
Observable<User> getUserDetails(@HeaderMap Map<String, String> headers);

@user8035311

I modified your github sample to trigger simulated data loading on button click, this way (button has been added to layout):

@Override
    protected void onCreate(@Nullable Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);
        this.progressDialog = new ProgressDialog(this);
        this.progressDialog.setMessage("Loading...");

        final ActivityMainBinding mainBinding = DataBindingUtil.setContentView(this, R.layout.activity_main);
        mainBinding.setLifecycleOwner(this);

        mainBinding.button2.setOnClickListener(view ->
        {
            RxViewModel.RxViewModelFactory factory = new RxViewModel.RxViewModelFactory();
            final RxViewModel model = ViewModelProviders.of(MainActivity.this, factory).get(RxViewModel.class);
            LiveData<User> liveData = model.getLiveData();
            progressDialog.show();
            liveData.observe(MainActivity.this, s ->
            {
                progressDialog.dismiss();
                mainBinding.setUser(s);
            });

        });
    }

But this does not work. Task is interrupted on screen rotation, with exception:

06-27 09:05:18.737 11829-11829/app.rxrotation.com.architecturecomponentsrxrotation E/WindowManager: android.view.WindowLeaked: Activity app.rxrotation.com.architecturecomponentsrxrotation.MainActivity has leaked window DecorView@5b16e38[] that was originally added here
    at android.view.ViewRootImpl.<init>(ViewRootImpl.java:485)
    at android.view.WindowManagerGlobal.addView(WindowManagerGlobal.java:346)
    at android.view.WindowManagerImpl.addView(WindowManagerImpl.java:93)
    at android.app.Dialog.show(Dialog.java:330)
    at app.rxrotation.com.architecturecomponentsrxrotation.MainActivity.lambda$onCreate$1$MainActivity(MainActivity.java:35)
    at app.rxrotation.com.architecturecomponentsrxrotation.MainActivity$$Lambda$0.onClick(Unknown Source:4)
    at android.view.View.performClick(View.java:6294)
    at android.view.View$PerformClick.run(View.java:24770)
    at android.os.Handler.handleCallback(Handler.java:790)
    at android.os.Handler.dispatchMessage(Handler.java:99)
    at android.os.Looper.loop(Looper.java:164)
    at android.app.ActivityThread.main(ActivityThread.java:6494)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:438)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:807)

After I click the button a second time, the data loading is not even executed.


Answer:

You could use a ViewModel with LiveData to preserve your async tasks after config change. So, your LiveData could look like this:

public class UserLiveData extends LiveData<User> {
    public UserLiveData() {
        setValue(new User());
    }

    public UserLiveData(Headers headers) {
        Observable<User> observable = Observable.defer(new Callable<ObservableSource<? extends User>>() {
            @Override
            public ObservableSource<? extends User> call() throws Exception {
                // getUserDetails() already returns an Observable<User>, so return it directly
                return testService.getUserDetails(headers);
            }
        });

        observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DefaultObserver<User>() {
                @Override
                public void onNext(User value) {
                    setValue(value);
                }

                @Override
                public void onError(Throwable e) {
                    throw new RuntimeException(e);
                }

                @Override
                public void onComplete() {

                }
        });
    }
}

Then, your UserDetailsViewModel is the following:

public class UserDetailsViewModel extends ViewModel {
    private LiveData<User> userLiveData;

    public UserDetailsViewModel() {
        userLiveData = new UserLiveData();
    }

    public void observeLiveData(LifecycleOwner owner,
                                Observer<User> observer) {
        userLiveData.observe(owner, observer);
    }

    public void loadLiveData(Headers headers) {
        userLiveData = new UserLiveData(headers);
    }
}

And then your MainActivity could be as follows:

public class MainActivity extends AppCompatActivity {
    private ProgressDialog progressDialog;
    private UserDetailsViewModel viewModel;
    private ActivityMainBinding mainBinding;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        this.progressDialog = new ProgressDialog(this);
        this.progressDialog.setMessage("Loading...");

        mainBinding = DataBindingUtil.setContentView(this,
            R.layout.activity_main);
        mainBinding.setLifecycleOwner(this);

        // The ViewModel survives configuration changes, and so does its LiveData
        viewModel = ViewModelProviders.of(this).get(UserDetailsViewModel.class);

        observeLiveData();
        mainBinding.button2.setOnClickListener(v -> {
            viewModel.loadLiveData(<your headers>);
            // loadLiveData() creates a new LiveData instance, so observe again
            observeLiveData();
        });
    }

    private void observeLiveData() {
        progressDialog.show();
        progressDialog.setCanceledOnTouchOutside(false);
        viewModel.observeLiveData(this, user -> {
            progressDialog.dismiss();
            mainBinding.setUser(user);
        });
    }
}

I created a very simple example here

Question:

I'm using RxJava 2 to make API calls. I have to make a call to cancel the booking of a class.

But depending on the situation, I may first have to make another call to get missing information.

It's something like this:

if classId not exists
   get classId
   then unbook class
else
   unbook class

I don't want to repeat the unbook class code.

Here is the simplified code:

FitnessDataService service = RetrofitInstance.getRetrofitInstance().create(FitnessDataService.class);
        // if we don't have the aid of class (reserved), we get it from the reserved classes


        if (fitClass.getAid() == null) {
            service.getReservedClasses(FitHelper.clientId)
                    .flatMap(reservedClasses ->
                    {
                        // get the class ID from reserved classes
                         ...
                        return service.unbookClass(fitClass.getAid());
                    }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(response -> {
                                // success
                            }, err -> 
                                // error
        } else {
            service.unbookClass(fitClass.getAid())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(response -> {
                                // success
                            }, err -> 
                                // error
        }

As you can see, service.unbookClass is repeated. How can I always call service.unbookClass, and call service.getReservedClasses only if I don't have the class id (fitClass.getAid() == null), without repeating the code for the second call?


Answer:

I would suggest separating the actual source of the id into its own separate observable. Maybe something like:

Observable<Long> idObservable;
if (fitClass.getAid() == null) {
    idObservable = service.getReservedClasses(FitHelper.clientId)
        .map(reservedClasses -> {
            /* Get the Id and do stuff with it */
            return fitClass.getAid();
        });
} else {
    idObservable = Observable.just(fitClass.getAid());
}

// unbookClass is now written only once, whichever branch supplied the id
idObservable.flatMap(aid -> service.unbookClass(aid))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(/* TODO */);
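
The same shape (choose the source of the id first, then write the dependent call once) can be tried out without Retrofit. The sketch below is a plain-Java model using CompletableFuture, where thenCompose stands in for flatMap. UnbookFlow, fetchId and unbookCall are hypothetical names, and the Long id type is an assumption carried over from the answer above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

class UnbookFlow {
    // Uses the known id if present and fetches it otherwise.
    // The unbook step appears only once.
    static CompletableFuture<String> unbook(
            Long knownId,
            Supplier<CompletableFuture<Long>> fetchId,
            Function<Long, CompletableFuture<String>> unbookCall) {
        CompletableFuture<Long> idSource = (knownId != null)
                ? CompletableFuture.completedFuture(knownId)
                : fetchId.get(); // only invoked when the id is missing
        return idSource.thenCompose(unbookCall);
    }
}
```

Because the fetch is passed as a Supplier, it is never triggered when the id is already known.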

Question:

I implemented a call using Retrofit with RxJava 2 with retries; however, I need it to retry only when the response code is something other than 404. There's no point in retrying a 404. This is what I'm using:

new RequestFactory()
    .requestBuilder
    .create(Service.class)
    .getData(id)
    .map(response -> response.object)
    .doOnError(t -> Log.e(NET, "Error fetching data id '" + id + "': " + t))
    .retry(3)
    .onErrorResumeNext(Observable.empty())
    .subscribeOn(Schedulers.io())

Answer:

You can use retryWhen(), the conditional counterpart of retry(), to decide per error whether to retry.

...
.retryWhen( error -> error.flatMap( responseType -> checkResponseType( responseType ) ) )
...

and then

Observable<Boolean> checkResponseType( ResponseException response ) {
  if ( response.getCode() == 404 ) {
    return Observable.error( response );
  }
  return Observable.just( Boolean.TRUE );
}

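This inspects every error the source signals. A 404 is passed on as an error, which terminates the stream without retrying; any other error emits a value, which triggers a resubscription. Note that this variant puts no upper limit on the number of retries.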
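
The decision rule itself (retry a bounded number of times, but never on a 404) can be sketched in plain Java without RxJava. StatusException and ConditionalRetry are hypothetical names used only for this illustration; they are not part of Retrofit or RxJava.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

/** Hypothetical stand-in for an HTTP error that carries a status code. */
class StatusException extends Exception {
    final int code;

    StatusException(int code) {
        super("HTTP " + code);
        this.code = code;
    }
}

final class ConditionalRetry {
    /** Runs the task, retrying up to maxRetries times while shouldRetry accepts the failure. */
    static <T> T call(Callable<T> task, int maxRetries, Predicate<Exception> shouldRetry)
            throws Exception {
        int retries = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (retries >= maxRetries || !shouldRetry.test(e)) {
                    throw e; // give up: retries exhausted or the error is not retryable
                }
                retries++;
            }
        }
    }

    /** Retry everything except a 404. */
    static boolean isRetryable(Exception e) {
        return !(e instanceof StatusException && ((StatusException) e).code == 404);
    }
}
```

In RxJava 2 the same rule can also be expressed with the retry(long times, Predicate) overload, which combines the retry limit of retry(3) with a per-error check.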

Question:

I have these two services:

private Observable<UMapsResponse> requestToServiceGetMaps(int idVenue, String accessToken) {
    return mService.getUMaps(idVenue, accessToken);
}

 private Observable<UMap> requestToServiceGetMapImageUrl(int idVenue, int idMap, String accessToken) {
    return mService.getUMap(idVenue, idMap, accessToken);
}

So I combined my two services like this:

    requestToServiceGetMaps(idVenue, accessToken)
            .flatMap(uMapsResponse -> Observable.just(uMapsResponse.getIndoorMaps()))
            .flatMapIterable(indoorMap -> indoorMap)
            .flatMap(
                    indoorMap -> requestToServiceGetMapImageUrl(idVenue, indoorMap.getId(), accessToken),
                    (indoorMap, uMap) -> Log.i(TAG, "MapsItem: " + new MapsItem(indoorMap, uMap))
            ).toList()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<List<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onSuccess(List<Integer> integers) {

                }

                @Override
                public void onError(Throwable e) {

                }
            });

So I have two questions:

  1. Why do I get a List of Integer in the onSuccess method?
  2. How do I get a list of MapsItem (indoorMap, uMap) instead?

Answer:

You get a list of Integer because the final step in your chain emits Integers: the combiner lambda returns the result of Log.i(), which is an int. Return the MapsItem from the combiner and do the logging in doOnNext() instead:

            ...
            .flatMap(
                    indoorMap -> requestToServiceGetMapImageUrl(idVenue, indoorMap.getId(), accessToken),
                    (indoorMap, uMap) -> new MapsItem(indoorMap, uMap)
            )
            .doOnNext( v -> Log.i(TAG, "MapsItem: " + v) )
            .toList()
            ...

Note that the result of the final flatMap() is the new MapsItem you were looking for.
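
The same effect can be reproduced with plain Java streams, as a rough analogy: the element type of the resulting list is whatever the mapping function returns. CombinerTypes and its log() method are hypothetical stand-ins; log() only mimics the fact that Log.i() returns an int.

```java
import java.util.List;
import java.util.stream.Collectors;

final class CombinerTypes {
    /** Hypothetical stand-in for a logger that, like Log.i(), returns an int. */
    static int log(String message) {
        System.out.println(message);
        return message.length();
    }

    /** Mapping to the logger's return value yields a list of Integer. */
    static List<Integer> mapToLogResult(List<String> items) {
        return items.stream()
                .map(item -> log("item: " + item))
                .collect(Collectors.toList());
    }

    /** Mapping to the value and logging as a side effect keeps the element type. */
    static List<String> mapThenLog(List<String> items) {
        return items.stream()
                .map(item -> "item: " + item)
                .peek(CombinerTypes::log)
                .collect(Collectors.toList());
    }
}
```

Here peek() plays the role that doOnNext() plays in the RxJava chain: it observes each element without changing what flows downstream.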

Question:

I am trying to get some data from the server.

Observable<List<Countries>> backendObservable = mCountriesApi.getCountries()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());

backendObservable.doOnNext(list -> Log.d(TAG, "doOnNext"));
backendObservable.doAfterNext(list -> Log.d(TAG, "doAfterNext"));
backendObservable.doOnComplete(() -> Log.d(TAG, "doOnComplete"));

backendObservable.subscribe(new Observer<List<Country>>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(List<Country> value) {
            Log.d(TAG, "doOnNext");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

It works, and I receive data in onNext. But the doOnNext, doAfterNext and doOnComplete callbacks are never called. What am I doing wrong?


Answer:

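You got it right up until observeOn. Operators do not modify the Observable they are called on; each one returns a new Observable, because Observables are immutable dataflow plans. You therefore have to keep the returned instance, either by reassigning it: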

backendObservable = backendObservable.doOnNext(list -> Log.d(TAG, "doOnNext"));
backendObservable = backendObservable.doAfterNext(list -> Log.d(TAG, "doAfterNext"));
backendObservable = backendObservable.doOnComplete(() -> Log.d(TAG, "doOnComplete"));

or more concisely

mCountriesApi.getCountries()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(list -> Log.d(TAG, "doOnNext"))
        .doAfterNext(list -> Log.d(TAG, "doAfterNext"))
        .doOnComplete(() -> Log.d(TAG, "doOnComplete"))
        .subscribe(...);
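
Why a discarded operator call has no effect can be illustrated with a tiny immutable class in plain Java. Plan is a hypothetical stand-in for an Observable, not an RxJava type; its with() method mimics an operator that returns a new instance and leaves the receiver untouched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable pipeline: every "operator" returns a new instance. */
final class Plan {
    private final List<String> steps;

    private Plan(List<String> steps) {
        this.steps = Collections.unmodifiableList(new ArrayList<>(steps));
    }

    static Plan source() {
        return new Plan(Collections.singletonList("source"));
    }

    /** Returns a new Plan with the extra step; the receiver is left unchanged. */
    Plan with(String step) {
        List<String> copy = new ArrayList<>(steps);
        copy.add(step);
        return new Plan(copy);
    }

    List<String> steps() {
        return steps;
    }
}

final class PlanDemo {
    public static void main(String[] args) {
        Plan plan = Plan.source();
        plan.with("doOnNext");            // result discarded: plan is unchanged
        System.out.println(plan.steps()); // [source]
        plan = plan.with("doOnNext");     // result kept: the step is part of the plan
        System.out.println(plan.steps()); // [source, doOnNext]
    }
}
```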

Question:

I have a weird issue. When I first subscribe, it makes the network call and also saves the data to the database, but loadFromDb() never gets executed, and no error is thrown.

Why does this happen?

    Flowable<Resource<List<List<DataSource>>>> getBoundResource(List<String> parentId) {
    return new RxNetworkBoundResource<List<List<DataSource>>,
            ContainerResponse>() {
        @Override
        void saveCallResult(@NonNull List<ContainerResponse> data) {
            for (ContainerResponse item : data) {
                // Saves data to database
                List<DataSource> items = item.items;
                containerDao.insert(items);
            }
        }

        @Override
        protected Flowable<List<List<DataSource>>> loadFromDb() {
            return Flowable.just(parentId).flatMapIterable(d -> d)
                    .flatMap(s -> containerDao.loadContainerByParentIdRx(s))
                    .distinct()
                    .doOnNext(data -> {
                        // I am able to get data here
                    })
                    .toList() // I'm not able to get data after toList()
                    .toFlowable()
                    .doOnNext(data -> {
                        // Nothing here
                    });
        }

        @Override
        protected Flowable<List<Response<ContainerResponse>>> createCall() {
            String baseUrl =
                    MyApp.getApplication().getSharedConfig().getBaseUrl();
            return Flowable.just(parentId).flatMapIterable(data -> data).flatMap(s -> {
                String url = baseUrl + "?limit=30&offset=0&parent=" + s;
                return Flowable.zip(Flowable.just(s),webservice.getContainersBoundRx(url),
                        (s1, response) -> {
                            if (response.body() == null) {
                                return response;
                            }
                            for (DataSource container : response.body().items) {
                                container.parentId = s1;
                            }
                            return response;
                        }).toList().toFlowable();
            });
        }

        @Override
        protected boolean shouldFetch() {
            return false;
        }
    }.asFlowable();

I am not able to get anything after subscribe().

    containerRepo.getBoundResource(parentId)
            .subscribe(new Subscriber<Resource<List<List<DataSource>>>>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(Resource<List<List<DataSource>>> listResource) {
                   // No data
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {
                    // This is never called
                }
            });

NetworkboundResource class:

public abstract class RxNetworkBoundResource<ResultType, RequestType> {

private final String TAG = RxNetworkBoundResource.class.getSimpleName();

private Flowable<Resource<ResultType>> result;

RxNetworkBoundResource() {
    // Lazy db observable.
    Flowable<ResultType> dbObservable =
            Flowable.defer(() -> loadFromDb().subscribeOn(Schedulers.computation()));

    // Lazy network observable.
    Flowable<ResultType> networkObservable = Flowable.defer(() ->
            createCall()
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation())
                    .doOnNext(request -> {
                        if (request.get(0).isSuccessful()) {
                            saveCallResult(processResponse(request));
                        } else {
                            processInternalError(request);
                        }
                    })
                    .onErrorReturn(throwable -> {
                        throw Exceptions.propagate(throwable);
                    })
                    .flatMap(__ -> loadFromDb())
    );

    result = shouldFetch()
            ? networkObservable
            .map(Resource::success)
            .onErrorReturn(t -> Resource.error(t.getMessage(), null))
            .observeOn(AndroidSchedulers.mainThread())
            : dbObservable
            .map(Resource::success)
            .onErrorReturn(t -> Resource.error(t.getMessage(), null))
            .observeOn(AndroidSchedulers.mainThread())
    ;
}

Flowable<Resource<ResultType>> asFlowable() {
    return result;
}

private List<RequestType> processResponse(List<Response<RequestType>> response) {
    List<RequestType> list = new ArrayList<>();
    for (Response<RequestType> data : response) {
        list.add(data.body());
    }
    return list;
}

private void processInternalError(List<Response<RequestType>> response) throws java.io.IOException {
    for (Response<RequestType> data : response) {
        if (data.errorBody() != null) {
            String error = data.errorBody().string();
            throw Exceptions.propagate(new Throwable(data.code() + ": " + error));
        }
    }
}

abstract void saveCallResult(@NonNull List<RequestType> item);

abstract Flowable<ResultType> loadFromDb();

abstract Flowable<List<Response<RequestType>>> createCall();

abstract boolean shouldFetch();

}

Answer:

Note that .toList() emits only after its upstream completes (see the toList() documentation).

The problem here is most likely because of this code returning a Flowable that does not complete:

containerDao.loadContainerByParentIdRx(s)

If this Flowable never completes, then the resulting flatMap will also not complete and toList() won't emit anything.

If you are just looking up the database only once, then one option is to change the return type to either Single or Maybe. If you switch to Maybe, for example, you can do:

    @Override
    protected Flowable<List<List<DataSource>>> loadFromDb() {
        return Flowable.just(parentId).flatMapIterable(d -> d)
                .flatMapMaybe(s -> containerDao.loadContainerByParentIdRx(s))
                .distinct()
                .doOnNext(data -> {
                    // I am able to get data here
                })
                .toList() // You should now get this as well.
                .toFlowable()
                .doOnNext(data -> {
                    // The collected list should arrive here once every Maybe has completed
                });
    }
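
The completion requirement can be made concrete with a small plain-Java sketch. ListCollector is a hypothetical class that only mimics the buffering behaviour described above; it is not RxJava's implementation of toList().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical mini-collector mimicking how a toList()-style operator behaves. */
final class ListCollector<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> downstream;

    ListCollector(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    /** Items are only buffered; nothing is passed downstream yet. */
    void onNext(T item) {
        buffer.add(item);
    }

    /** The single emission happens here, so a source that never completes never emits a list. */
    void onComplete() {
        downstream.accept(new ArrayList<>(buffer));
    }
}
```

If onComplete() is never called, the downstream consumer never sees anything, which matches the symptom in the question: data is visible before toList() but not after it.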

Question:

I am currently facing an issue when using retrofit 2 in my application.

public static Retrofit getClient(Context context)
{

    if (okHttpClient == null)
        initOkHttp(context);

    if (retrofit == null)
    {
        retrofit = new Retrofit.Builder()
                .baseUrl(CaselotREST.CASELOT_BASEURL)
                .client(okHttpClient)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();
    }
    return retrofit;
}

I get the following error when I try and compile my project.

error: cannot access Retrofit class file for retrofit2.Retrofit not found


Answer:

I have figured it out. The issue stems from the modularity of the code base in our Android application. The application has many modules, and each module has its own dependencies. The Retrofit dependencies were declared in the Webapi module but not in the main app module. The fix was to add the following to the main Gradle file:

implementation 'com.squareup.retrofit2:converter-gson:2.2.0'
implementation 'com.squareup.retrofit2:retrofit:2.2.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'

Question:

When I call the Retrofit method GetTodoRepository.fetchTodo() from MainViewModel and the call ends in a failure or any non-success result, I would like RxJava to trigger both onErrorReturn() and onError(), so that I can return a cached object but still notify MainViewModel that an error happened and show error-related UI views. How do I achieve this?

The current code shows how I intended to handle it.

MainViewModel

public class MainViewModel extends ViewModel {

    public LiveData<String> getTodo() {
    getTodoRepository.fetchTodo().subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(String s) {
            showProgressAnim.setValue(false);
            todo.setValue(s);
        }

        @Override
        public void onError(Throwable e) {
            showProgressAnim.setValue(false);
            errorMsg.setValue(e.getMessage());
        }
     });
        return todo;
    }
}

GetTodoRepository

public class GetTodoRepository {

    public Single<String> fetchTodo() {
        return retrofit.create(TodoApi.class)
            .getTodo()
            .doOnSuccess(s -> cacheManager.saveTodo(s))
            .onErrorReturn(throwable -> cacheManager.getTodo())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
    }
}

Answer:

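You can't have both signal types with a Single, but you can turn fetchTodo() into an Observable and emit the cached item followed by the error. For the error to reach this point, the onErrorReturn() inside fetchTodo() has to be removed, since it replaces the error with a success value: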

fetchTodo()
.toObservable()
.onErrorResumeNext(error -> 
     Observable.just(cached)
     .concatWith(Observable.error(error))
)
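
The order of signals the subscriber then sees (cached value first, error second) can be sketched in plain Java. TodoListener and CachedThenError are hypothetical names for this illustration only; the remote call and the cache are passed in as simple functions.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

/** Hypothetical listener with the two signals an Observable-style consumer receives. */
interface TodoListener {
    void onNext(String todo);

    void onError(Throwable error);
}

final class CachedThenError {
    /** On success delivers the fresh value; on failure delivers the cached value, then the error. */
    static void fetch(Callable<String> remote, Supplier<String> cache, TodoListener listener) {
        String fresh;
        try {
            fresh = remote.call();
        } catch (Exception e) {
            listener.onNext(cache.get()); // show the cached item first
            listener.onError(e);          // then report the failure
            return;
        }
        listener.onNext(fresh);
    }
}
```

In the view model this means replacing the SingleObserver with an Observer, so that onNext() can update the todo and onError() can still show the error message.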

Question:

The use case: I need to send a request to the server from an Android client (Retrofit). After I get the first response, I need to update the values of the request object (they depend on the last item I received) and send it again, until all data has been downloaded. I want to know how I can achieve this with Retrofit and RxJava (I don't want to use while loops etc.).

EDIT: The thing is, I don't know the exact number of flatMaps in advance, because the data set may grow or shrink. Say I have 420000 records and each request downloads 1000 of them.


Answer:

You could chain the calls with flatMap, using each response in the next call via the it parameter, which holds the previous response.

mathApi.multiplyByTwo(1)
    .flatMap {
        mathApi.multiplyByTwo(it)
    }.flatMap {
        mathApi.multiplyByTwo(it)
    }.subscribe {
        // here "it" will be 8 (1*2*2*2)
    }

And in case you don't know how many flatMaps you will end up having, you could, for instance, do it with a recursive function.

private fun multiplyByTwo(number: Int) {
    mathApi.multiplyByTwo(number).subscribe {
        if (it < Integer.MAX_VALUE) { // When you run out of data.
            multiplyByTwo(it)
        }
    }
}
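
Applied to the paging use case from the question, the recursive idea might look like the following plain-Java sketch. Pager and the fetchPage function are hypothetical; fetchPage stands in for the remote call, and the sketch assumes the next request can be derived from an offset and that a page shorter than the page size marks the end of the data.

```java
import java.util.List;
import java.util.function.BiFunction;

final class Pager {
    /**
     * Fetches pages recursively until a page comes back shorter than pageSize.
     * fetchPage takes (offset, pageSize) and is a hypothetical stand-in for the remote call.
     */
    static List<Integer> fetchAll(BiFunction<Integer, Integer, List<Integer>> fetchPage,
                                  int offset, int pageSize, List<Integer> accumulator) {
        List<Integer> page = fetchPage.apply(offset, pageSize);
        accumulator.addAll(page);
        if (page.size() < pageSize) {
            return accumulator; // last page reached
        }
        // The next request depends on what the previous one returned.
        return fetchAll(fetchPage, offset + page.size(), pageSize, accumulator);
    }
}
```

With 420000 records and 1000 records per request this recurses roughly 420 times, which is well within normal stack limits for a synchronous version like this one.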

Question:

I am developing an Android application in Android Studio, using Java.

I want to use this api from open food facts : https://fr.openfoodfacts.org/api/v0/produit/3029330003533.json

But I only know how to use Retrofit and RxJava with a single POJO class.

I used this website to create the POJO classes: http://pojo.sodhanalibrary.com. However, it creates a lot of POJO classes, and I don't know whether that is correct or how to use them.

Below you can see that I have a lot of POJO classes.

POJO class


Answer:

Use JsonSchema to generate the POJOs for the parsing library you are using (Gson, Jackson, etc.), and use RxJava and Retrofit for the API call, like this:

Create Pojo

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.foodit.data.remote.wrapper.SignupDetailsWrapper;

@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
        "code",
        "msg",
        "details"
})
public class LoginResponse {

    @JsonProperty("code")
    private int code;
    @JsonProperty("msg")
    private String msg;
    @JsonProperty("details")
    private List<LoginDetailsWrapper> details = new ArrayList<LoginDetailsWrapper>();

    @JsonProperty("code")
    public int getCode() {
        return code;
    }

    @JsonProperty("code")
    public void setCode(int code) {
        this.code = code;
    }

    @JsonProperty("msg")
    public String getMsg() {
        return msg;
    }

    @JsonProperty("msg")
    public void setMsg(String msg) {
        this.msg = msg;
    }

    @JsonProperty("details")
    public List<LoginDetailsWrapper> getDetails() {
        return details;
    }

    @JsonProperty("details")
    public void setDetails(List<LoginDetailsWrapper> details) {
        this.details = details;
    }

}

Define Api in ApiInterface like this

    @FormUrlEncoded
    @POST("login")
    Observable<LoginResponse> userLogin(@Field("device_id") String device_id, @Field("device_type") String device_type,
                                        @Field("username") String username, @Field("password") String password
    );

and Call api like this

    @Override
    public void userLogin(String device_id, String device_type, String username, String password) {
        getCompositeDisposable().add(loginActivtiyInteractor.userLogin(device_id, device_type, username, password)
                .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
                .subscribe(loginResponse -> {
                    if (loginResponse != null) {
                        if (loginResponse.getCode() == 1) {
                            getMvpView().hideLoading();
                            getMvpView().updateView(loginResponse);
                        } else {
                            getMvpView().hideLoading();
                            getMvpView().onError(loginResponse.getMsg());
                        }
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    getMvpView().onError(throwable.getLocalizedMessage());
                    getMvpView().hideLoading();
                }));
    }

I hope it helps.

Question:

I'm trying to migrate from plain Retrofit to the RxJava extension for Retrofit in order to make a chain of API calls on a background thread.

For example, I have an object called ModelGroup which has a list of ModelPerson objects. My goal is to do the following.

  1. Send ModelGroup to the server and receive a response, which is an integer, representing the newly inserted ID, let's call it newGroupId.
  2. For each ModelPerson in ModelGroup, set Person.groupId to newGroupId.
  3. Send each person to the server.
  4. If all ModelPerson objects from the ModelGroup were successfully updated with newGroupId then respond with onSuccess, otherwise onError.

My current solution can be seen below.

private void makeGroupInsert(ModelGroup modelGroup) {

    int newGroupId = myApi.insertNewGroup(modelGroup.getName(), modelGroup.getRating())
            .execute()
            .body();

    for (ModelPerson person : modelGroup.getPersons()) {
        person.setGroupId(newGroupId);

        String response = myApi.insertNewPerson(
                person.getGroup_id(),
                person.getFirst_Name(),
                person.getLast_Name())
                .execute()
                .body();

        if (!response.equals("success")) {
            // One failed to update, send error to main thread.
        }
    }

    // All succeeded, send success back to main thread.
}

Question

How can I achieve the same (or better) functionality using a RxJava + Retrofit solution?

EDIT 1

MyApi is defined below.

public interface MyApi {

    @POST("insert_new_group")
    Call<Integer> insertNewGroup(@Query("group_name") String groupName,
                                   @Query("group_rating") int rating);

    @POST("insert_new_person")
    Call<String> insertNewPerson(@Query("group_id") int groupId,
                                   @Query("first_name") String firstName,
                                   @Query("last_name") String lastName);
}

Answer:

First of all, you need to change the Retrofit interface methods to return Observables instead of Calls. For example:

@POST("insert_new_group")
Observable<Integer> insertNewGroup(...

Then you can chain requests:

void updateData() {
    myApi.insertNewGroup(modelGroup.getName(), modelGroup.getRating()) // Create the new group and get its ID
            .switchMap(this::setGroupIdAll) // Set the new ID on every person and send each one to the server
            .subscribe(
                    (n) -> {/* called with the String response of every insertNewPerson call */},
                    (e) -> {/* error handling */}
            );

}

Observable<String> setGroupIdAll(Integer id) {
    return Observable.fromIterable(modelGroup.getPersons()) // all ModelPerson objects of the group
            .doOnNext(person -> person.setGroupId(id)) // assign the newly created group ID
            .flatMap(this::updatePerson); // send the updated person to the server
}

Observable<String> updatePerson(ModelPerson person) {
    // Assumes insertNewPerson was also changed to return Observable<String>
    return myApi.insertNewPerson(
            person.getGroup_id(),
            person.getFirst_Name(),
            person.getLast_Name());
}

Question:

I am creating an Android app and the objective is to display a list of Pokemon which can be found using the PokeApi at https://pokeapi.co/

I have two instances, one where it works without RxJava2 and one where it doesn't work with RxJava 2. For both instances I use Retrofit 2.

When I include RxJava2 and it does not work, the error I receive is:

D/thrown: java.lang.RuntimeException: Unable to invoke no-args constructor for retrofit2.Call<za.co.lbnkosi.discoveryassesment.domain.model.RemoteDataObjectModel>. Registering an InstanceCreator with Gson for this type may fix this problem.

At this point I have looked through a lot of Stack Overflow questions similar to this one, and most if not all of them mention deserialization, which has not worked for me so far.

I would like to know what the problem is or what I am doing wrong and how I can fix this issue. Below I have included the relevant code

public interface PokeApi {

    //Ignore
    @GET("pokemon")
    Call<RemoteDataObjectModel> getPokemonList(@Query("limit") int limit, @Query("offset") int offset);

    @GET("pokemon")
    Observable<Call<RemoteDataObjectModel>> getPokemonList2(@Query("limit") int limit, @Query("offset") int offset);

}

public class RemoteDataObjectModel {

    @SerializedName("results")
    private ArrayList<RemoteDataModel> results;

    public ArrayList<RemoteDataModel> getResults() {
        return results;
    }

    public void setResults(ArrayList<RemoteDataModel> results) {
        this.results = results;
    }
}

public class RemoteDataModel {

    @SerializedName("number")
    private int number;

    @SerializedName("name")
    private String name;

    @SerializedName("url")
    private String url;

    public int getNumber() {
        String[] urlItems = url.split("/");
        return Integer.parseInt(urlItems[urlItems.length -1]);
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }
}

public class RetrofitComponent {

    private static RetrofitComponent INSTANCE;
    private PokeApi pokeApi;


    private RetrofitComponent(){

        OkHttpClient okHttpClient = new OkHttpClient().newBuilder()
                .connectTimeout(60, TimeUnit.SECONDS)
                .readTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS)
                .build();


        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://pokeapi.co/api/v2/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .client(okHttpClient)
                .build();

         pokeApi = retrofit.create(PokeApi.class);

    }


    public static RetrofitComponent getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new RetrofitComponent();
        }
        return INSTANCE;
    }

    public Observable<Call<RemoteDataObjectModel>> getPokemonList(int limit, int offest) {
        return pokeApi.getPokemonList2(30,0);
    }


}

private void getPokemonList(PokeApiDataSource.PokemonListCallback callback) {
       RetrofitComponent.getInstance()
                .getPokemonList(100,0)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Call<RemoteDataObjectModel>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Call<RemoteDataObjectModel> remoteDataObjectModelCall) {
                        Log.d("","");
                        remoteDataObjectModelCall.enqueue(new Callback<RemoteDataObjectModel>() {
                            @Override
                            public void onResponse(@NotNull Call<RemoteDataObjectModel> call, @NotNull Response<RemoteDataObjectModel> response) {
                                loading = true;
                                RemoteDataObjectModel pokeApiObjects = response.body();
                                _arrayList = Objects.requireNonNull(pokeApiObjects).getResults();
                                callback.pokemonListSuccess();
                            }
                            @Override
                            public void onFailure(@NotNull Call<RemoteDataObjectModel> call, @NotNull Throwable t) {
                                loading = true;
                                Log.e(TAG, " onFailure: " + t.getMessage());
                            }
                        });
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d("thrown", e.toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

Answer:

Call is Retrofit's own wrapper type; it provides a callback to get the response asynchronously. With the RxJava adapter the Observable already plays that role, so Retrofit treats the type inside Observable<...> as the response body and asks Gson to deserialize a Call, which is what fails. You do not need Call here. Instead, try one of these:

public interface PokeApi {

    // Declare only ONE of the following variants; they are alternatives
    // and cannot coexist under the same name and parameter list.

    // If you need the response body + headers ...
    @GET("pokemon")
    Observable<Response<RemoteDataObjectModel>> getPokemonList2(@Query("limit") int limit, @Query("offset") int offset);

    // If you only need the body
    @GET("pokemon")
    Observable<RemoteDataObjectModel> getPokemonList2(@Query("limit") int limit, @Query("offset") int offset);

    // Or, better: the API returns its result only once, so Single is more suitable in this case
    @GET("pokemon")
    Single<RemoteDataObjectModel> getPokemonList2(@Query("limit") int limit, @Query("offset") int offset);

}

Question:

I get an HTTP 500 INTERNAL SERVER ERROR on a POST using RxJava and Retrofit, and I don't fully understand how this call works; the other call works fine with the same BASE_URL constant.

Here is my interface:

public interface AuthApi {

    @GET("user/{id}")  //users/id
    Flowable<User> getUser(
            @Path("id") int id
    );

    @POST("login")
    @FormUrlEncoded
    Flowable<User> login(
            @Field("username") String username,
            @Field("password") String password
    );
}

The @GET method works fine. The @POST method returns an error.

I believe this has something to do with how the string or POST request is structured, because Postman works perfectly with the following JSON:

{
    "username": "Test1",
    "password": "test1"
}

Here is the the rxjava call:

authApi.login("Test1","test1")
                .toObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(User user) {
                        Log.d(TAG,"onNext :"+ user.getEmail());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: ", e);
                    }

                    @Override
                    public void onComplete() {

                    }
                });

This returns the HTTP 500 INTERNAL SERVER ERROR

But to give you guys more details, here is the error log:

E/AuthViewModel: onError: 
    retrofit2.adapter.rxjava2.HttpException: HTTP 500 INTERNAL SERVER ERROR
        at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
        at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:47)
        at io.reactivex.Observable.subscribe(Observable.java:10838)
        at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
        at io.reactivex.Observable.subscribe(Observable.java:10838)
        at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:29)
        at io.reactivex.Flowable.subscribe(Flowable.java:12978)
        at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:32)
        at io.reactivex.Flowable.subscribe(Flowable.java:12978)
        at io.reactivex.Flowable.subscribe(Flowable.java:12924)
        at io.reactivex.internal.operators.observable.ObservableFromPublisher.subscribeActual(ObservableFromPublisher.java:31)
        at io.reactivex.Observable.subscribe(Observable.java:10838)
        at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
        at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:764)

This is the expected response:

{
    "email": "test1m@test1.ie",
    "id": 11,
    "username": "Test1"
}

Am I overlooking something? Do you have any tips for me? And is there a way to debug the POST request so that I can actually see what is sent?


Answer:

See the linked answer for how to inspect the network requests and responses of your app. Also, a POST request declared with @FormUrlEncoded sends its fields as a form-encoded body, which the server has to support in order to process it correctly. Since Postman works with a JSON body, you can try @Body instead of @FormUrlEncoded and @Field:

@POST("login")
Flowable<User> login(
     @Body LoginData data
);

Where LoginData is a class:

public class LoginData {

    private String username;
    private String password;

    public LoginData(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

}

Question:

I have the following endpoints in REST API:

public interface AutomoticzAPI {

    @POST("/api/beacon_auth/login")
    Single<LoginResponse> login(@Body LoginRequest request);

    @GET("/api/system/ws_devices")
    Single<WSDevicesResponse> wsDeviceList(@Header("Authorization") String tokenHeader);

}

When I call the login endpoint, I receive an access token in the response and save it in the ClientSession holder object. Later I can retrieve the token from ClientSession and use it to call the server's protected resources:

        api.login(ClientSession.getInstance().getLoginRequest(login, password))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(loginResponse -> {
                    String accessToken = loginResponse.getAccessToken();
                    ClientSession.getInstance().setAccessToken(accessToken);
                    view.onLoginSuccess();
                }, throwable -> {
                    RetrofitException exception = (RetrofitException) throwable;
                    if (exception.getKind().equals(RetrofitException.Kind.HTTP)){
                        view.onLoginFailed(exception.getMessage());
                    } else if(exception.getKind().equals(RetrofitException.Kind.NETWORK))
                    {
                        view.onLoginFailed("Network error...");
                    } else {
                        view.onLoginFailed("Unknown error occurred...");
                    }
                });

When I call the wsDeviceList endpoint, the server can return a 401 HTTP response code and a JSON body with an error code and message:

{
    "code": "EXPIRED-TOKEN",
    "message": "Token expired"
}

If that happens, I want to call the login endpoint again to get a new access token. Here is my code so far:

    ClientSession clientSession = ClientSession.getInstance();
    String token = "Bearer "+clientSession.getAccessToken();
    String url = ClientSession.getInstance().getUrl();
    AutomoticzAPI api = NetworkManager.getApiClient(url);
    api.wsDeviceList(token)
            .retryWhen(throwableFlowable -> throwableFlowable.flatMap(
                    new Function<Throwable, Publisher<?>>() {
                        @Override
                        public Publisher<?> apply(Throwable throwable) throws Exception {
                            RetrofitException exception = (RetrofitException) throwable;
                            if (exception.isUnauthorizedError()){
                                return relogin(api, clientSession.getLoginRequest());
                            }
                            return (Publisher<?>) throwable;
                        }
                    }
            ))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(wsDevicesResponse -> {
                view.onDeviceListLoaded(wsDevicesResponse.getWsdevices());
            }, throwable -> {
                RetrofitException exception = (RetrofitException) throwable;
                view.onError(exception);
            });
}

public Publisher<?> relogin(AutomoticzAPI api, LoginRequest loginRequest){
    return (Publisher<?>) api.login(loginRequest)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loginResponse -> {
                String accessToken = loginResponse.getAccessToken();
                ClientSession.getInstance().setAccessToken(accessToken);
            }, throwable -> {
                RetrofitException exception = (RetrofitException) throwable;
                view.onError(exception);
            });
}

But when the relogin method gets executed, my program crashes. I'm not proficient in RxJava and am probably doing this wrong. How can I call login again to refresh the access token and then call wsDeviceList once more?


Answer:

Use the Authenticator API of OkHttp (the HTTP client underneath Retrofit): inside it, call the login endpoint to get a new access token, then retry the failed API call with that token.
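The control flow behind this advice can be sketched without any library: make the call, and if the server answers 401, log in again and retry once with the fresh token. The code below is plain Java, not OkHttp, Retrofit or RxJava code. `Response`, `callWithRefresh` and the two function parameters are hypothetical names that stand in for wsDeviceList and login.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Library-free sketch of "retry once after refreshing the token".
// All names here are hypothetical; none of this is OkHttp or Retrofit API.
final class TokenRefreshSketch {

    /** Minimal stand-in for an HTTP response. */
    static final class Response {
        final int code;
        final String body;

        Response(int code, String body) {
            this.code = code;
            this.body = body;
        }
    }

    /**
     * Calls the protected endpoint with the current token. On a 401 it asks
     * for a new token and repeats the call exactly once, so credentials that
     * keep getting rejected cannot cause an endless loop.
     */
    static Response callWithRefresh(Function<String, Response> protectedCall,
                                    Supplier<String> login,
                                    String currentToken) {
        Response first = protectedCall.apply(currentToken);
        if (first.code != 401) {
            return first;
        }
        String freshToken = login.get();
        return protectedCall.apply(freshToken);
    }

    public static void main(String[] args) {
        // Fake server for the demo: only the token "new" is accepted.
        Function<String, Response> devices = token ->
                "new".equals(token) ? new Response(200, "devices")
                                    : new Response(401, "EXPIRED-TOKEN");
        Response r = callWithRefresh(devices, () -> "new", "old");
        System.out.println(r.code + " " + r.body);
    }
}
```

Limiting the retry to a single attempt is a deliberate choice in this sketch: if the refreshed token is rejected as well, the 401 is handed back to the caller instead of triggering another login.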

Question:

Context

Using Retrofit 2 on Android, I need to access deeply nested name strings to fetch and display their Details (where a Detail object holds references to the Group and User objects used to get it).

The JSON consists of a list of Groups that each contain a list of Users that each contain a list of name Strings which are captured in these models:

public class Group {
    @SerializedName("id")
    public String id;
    @SerializedName("users")
    public List<User> users;
}
public class User {
    @SerializedName("id")
    public String id;
    @SerializedName("detailNames")
    public List<String> detailNames;
}
public class Detail {
    // allow access to objects used to get detail
    public Group group;
    public User user;
    @SerializedName("name")
    public String name;
    @SerializedName("description")
    public String description;
}

The models are populated using the UserApi:

public interface UserApi {
    @GET("groups")
    Call<List<Group>> getGroups();

    @GET("groups/{group_id}/users/{user_id}/details/{detail_name}")
    Call<Detail> getDetail(
            @Path("group_id") String groupId,
            @Path("user_id") String userId,
            @Path("detail_name") String detailName
    );
}

Aim

The aim is to use a given UserApi to make and parse requests to display a Dialog in the format of:

Group1 (expandable heading)
    User1 (expandable heading)
        Detail1 (checkbox)
        Detail2 (checkbox)
        ...
Group2 (expandable heading)
    User2 (expandable heading)
        Detail1 (checkbox)
        ...
    ...
...

Problem

The problem is the current solution requests Groups and uses a triple nested for loop to access and fetch Details for each name:

private void fetchDetails(List<Group> groupList) {
    ArrayList<Group> groups = (ArrayList<Group>) groupList;
    if (groups != null && groups.size() > 0) {
        for (Group group : groups) {
            for (User user: group.users) {
                for (String detailName : user.detailNames) {
                    fetchDetail(group, user, detailName);
                }
            }
        }
    }
}

The problem is worsened since the triple loop makes a request for each name, and is done within the getGroups onResponse callback, which seems unreadable/unmaintainable:

protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    mUserApi = UserApiClient.getApi();
    fetchGroups();
}
private void fetchGroups() {
    Callback<List<Group>> groupsCall = new Callback<List<Group>>() {
        @Override
        public void onResponse(Call<List<Group>> call, Response<List<Group>> response) {
            int statusCode = response.code();
            switch (statusCode) {
                case HttpURLConnection.HTTP_OK:
                    List<Group> groups = response.body();
                    fetchDetails(groups);
                    break;
            }
        }
        @Override
        public void onFailure(Call<List<Group>> call, Throwable t) {}
    };
    mUserApi.getGroups().enqueue(groupsCall);
}
private void fetchDetail(final Group group, final User user, String detailName) {
    Callback<Detail> detailCallback= new Callback<Detail>() {
        @Override
        public void onResponse(Call<Detail> call, Response<Detail> response) {
            int statusCode = response.code();
            switch (statusCode) {
                case HttpURLConnection.HTTP_OK:
                    MainActivity.this.runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            // display details in ListView
                        }
                    });
                    break;
            }
        }
        @Override
        public void onFailure(Call<Detail> call, Throwable t) {}
    };
    mUserApi.getDetail(group.id, user.id, detailName).enqueue(detailCallback);
}

An RxJava2 solution was suggested to avoid nesting callbacks like the above implementation, but was unfinished due to confusion regarding managing 3 layers of nesting to access the names:

Observable<List<Group>> groupCall =  mUserApi.getGroups();
groupCall.flatMapIterable(x -> x)
   .flatMap(group -> {
       Observable.fromIterable(group.users)
           .flatMap(user -> {
               Observable.fromIterable(user.detailNames)
                   .map(detailName -> {
                        mUserApi.getDetail(group.id, user.id, detailName)
                            .flatMap(detail -> {
                                detail.group = group;
                                detail.user = user;
                                return Observable.just(detail)
                            })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new Observer<List<Group>>() {
                                @Override
                                public void onSubscribe(Disposable d) {}
                                @Override
                                public void onNext(List<Detail> value) {
                                    mDetails = (ArrayList<Detail>) value;
                                }
                                @Override
                                public void onError(Throwable e) {}
                                @Override
                                public void onComplete() {}
                            });
                   });
           }
   })

There have been some questions (e.g., RxJava multiple loop with condition) that deal with nesting in RxJava, but I am still unsure how to apply them to the deeply nested names.

Question

Is it possible to use RxJava2 to avoid callback hell and simplify the triple for loop, is there another way, or should the solution resort to synchronous requests within AsyncTask/AsyncTaskLoader?


Answer:

As I mentioned in the comment, I think what you already have is pretty much the simplest form you can get. But since you seem interested in doing this without the loop, here are a couple of suggestions (not necessarily better):

Method 1: Container Classes

If you are willing to create intermediate container classes that can hold group, user, detailname in a single object, you can do something like this:

First, create these container classes:

public class UserWithGroup {
    final Group group;
    final User user;

    public UserWithGroup(Group group, User user) {
        this.group = group;
        this.user = user;
    }
}

public class DetailWithUser {
    final Group group;
    final User user;
    final String detailName;

    public DetailWithUser(Group group, User user, String detailName) {
        this.group = group;
        this.user = user;
        this.detailName = detailName;
    }
}

Then your code with Java 8 Stream can be:

private void fetchDetails(List<Group> groupList) {
    groupList.stream()
            .flatMap(g -> g.users.stream().map(u -> new UserWithGroup(g, u)))
            .flatMap(ug -> ug.user.detailNames.stream().map(n -> new DetailWithUser(ug.group, ug.user, n)))
            .forEach(d -> fetchDetail(d.group, d.user, d.detailName));
}

Or with RxJava:

private void fetchDetails2(List<Group> groupList) {
    Observable.fromIterable(groupList)
            .flatMap(g -> Observable.fromIterable(g.users).map(u -> new UserWithGroup(g, u)))
            .flatMap(ug -> Observable.fromIterable(ug.user.detailNames).map(n -> new DetailWithUser(ug.group, ug.user, n)))
            // assumes getDetail is declared to return Observable<Detail> rather than Call<Detail>
            .flatMap(d -> mUserApi.getDetail(d.group.id, d.user.id, d.detailName)
                    .map(detail -> {
                        detail.group = d.group;
                        detail.user = d.user;
                        return detail;
                    }))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(detail -> {
                ...
            });
}

Method 2: Pair

android.util.Pair is a container class that can hold any two objects. If you are okay with using it instead of creating intermediate containers, the code can be simplified further.

Java 8 Stream and Pair:

private void fetchDetails3(List<Group> groupList) {
    groupList.stream()
            .flatMap(g -> g.users.stream().map(u -> Pair.create(g, u)))
            .flatMap(p -> p.second.detailNames.stream().map(n -> Pair.create(p, n)))
            .forEach(p -> fetchDetail(p.first.first, p.first.second, p.second));
}

RxJava and Pair:

private void fetchDetails4(List<Group> groupList) {
    Observable.fromIterable(groupList)
            .flatMap(g -> Observable.fromIterable(g.users).map(u -> Pair.create(g, u)))
            .flatMap(p -> Observable.fromIterable(p.second.detailNames).map(n -> Pair.create(p, n)))
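            // fetchDetail2 is assumed to return an Observable<Detail> for the given group, user and name
            .flatMap(p -> fetchDetail2(p.first.first, p.first.second, p.second)
                    .map(detail -> {
                        detail.group = p.first.first;
                        detail.user = p.first.second;
                        return detail;
                    }))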
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(detail -> {
                ...
            });
}

Question:

I have a loading page where I want to perform two network requests (retrofit2-rxjava) for unrelated information. I don't want to proceed to the next page until those two requests are finished, even if either one or both fail.

  1. Tied the requests together using zip. Is there a way to not be forced into using a BiFunction, and not have to return null?

  2. Requests A and B each have a .doOnNext and a .doOnError. If one of them returns an error, does the zip observable continue? And does the zip subscriber also receive the error?

  3. Is this the best way to do this?

private Disposable retrieveBothThings() {
    return Observable.zip(getThingA(), getThingB(),
            (A, B) -> {
                onAllCallsComplete();
                return null;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(o -> {}, Logger::e);
}

private Observable<...> getThingA() {
    return SessionManager.getInstance().getApi()
            .getA()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(this::onACompleted)
            .doOnError(this::onAFailed);
}

private Observable<...> getThingB() {
    return SessionManager.getInstance().getApi()
            .getB()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .toObservable()
            .doOnNext(this::onBSuccess)
            .doOnError(this::onBFailure);
}

private void onBSuccess(...) {
    ...        
}

private void onBFailure(final Throwable throwable) {
    Logger.e(throwable);
}

private void onACompleted(...) {
    ...        
}

private void onAFailed(final Throwable throwable) {
    Logger.e(throwable);
}

Answer:

You can combine two observables using the merge() operator. You can transform an error into onComplete() using onErrorResumeNext().

// observable1 and observable2 are the two request Observables (getA() and getB() in the question)
Completable.mergeArray(
  observable1
    .doOnNext(this::onACompleted)
    .doOnError(this::onAFailed)
    .onErrorResumeNext(Observable.empty()) // swallow the error, complete normally
    .ignoreElements(),                     // Observable -> Completable
  observable2
    .doOnNext(this::onBSuccess)
    .doOnError(this::onBFailure)
    .onErrorResumeNext(Observable.empty())
    .ignoreElements())
.subscribe(() -> processCompletion(), // a Completable has no onNext
           error -> {});
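The same idea, turning each failure into a normal completion so that the combined step still finishes, can be tried out without Android or RxJava. The sketch below is a JDK analogue built on CompletableFuture, not the RxJava code above. `awaitBoth` and the log list are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// JDK-only analogue of "wait for both requests, even if one of them fails".
final class WaitForBothSketch {

    /**
     * Returns a future that completes normally once both requests have
     * finished, whether they succeeded or failed. Each outcome is recorded
     * in the log, and failures are swallowed after being recorded.
     */
    static CompletableFuture<Void> awaitBoth(CompletableFuture<String> a,
                                             CompletableFuture<String> b,
                                             List<String> log) {
        CompletableFuture<Void> safeA = a.handle((value, error) -> {
            log.add(error == null ? "A ok" : "A failed");
            return null;
        });
        CompletableFuture<Void> safeB = b.handle((value, error) -> {
            log.add(error == null ? "B ok" : "B failed");
            return null;
        });
        return CompletableFuture.allOf(safeA, safeB);
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> a = CompletableFuture.completedFuture("thing A");
        CompletableFuture<String> b = new CompletableFuture<>();
        b.completeExceptionally(new IllegalStateException("B is down"));
        awaitBoth(a, b, log).join(); // does not throw although B failed
        System.out.println(log);
    }
}
```

Here handle() plays the role that doOnNext/doOnError followed by onErrorResumeNext play in the RxJava version: it observes the outcome and then replaces a failure with a normal result.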

Question:

This is my code (main file below); the output I am getting is shown in the image further down.

I want the output to show the GitHub repos of two users on one screen, with the repos of vikashumain (username) on the left side and the repos of naman14 (username) on the right side. Instead, only one user's repos are shown on both sides: the vikashumain repos appear on the left and on the right.

BASE_URL=https://api.github.com/

/**
 * A simple {@link Fragment} subclass.
 */
public class BlankFragment extends Fragment {


    public BlankFragment() {
        // Required empty public constructor
    }

    @BindView(R.id.re1)
    RecyclerView recyclerView;
    @BindView(R.id.re2)
    RecyclerView recyclerView2;

    View v;
    List<MainCategoryData> list,list2;
    private RepoAdapter adapter;
    Retrofit retrofit;
    @Override
    public View onCreateView(LayoutInflater inflater, ViewGroup container,
                             Bundle savedInstanceState) {
        // Inflate the layout for this fragment
        v= inflater.inflate(R.layout.fragment_blank, container, false);
        ButterKnife.bind(this,v);
        recyclerView.setHasFixedSize(true);
        RecyclerView.LayoutManager layoutManager = new LinearLayoutManager(getContext());
        recyclerView.setLayoutManager(layoutManager);
        recyclerView2.setHasFixedSize(true);
        RecyclerView.LayoutManager layoutManager2 = new LinearLayoutManager(getContext());
        recyclerView2.setLayoutManager(layoutManager2);
        retrofit = new Retrofit.Builder()
                .baseUrl(BASE_URL)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();
        retrofit.create(ApiServiceMain.class).maincategorya()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::handleRespons, this:: handleError);
        Observable.just(retrofit.create(ApiServiceMain.class)).subscribeOn(Schedulers.computation())
                .flatMap(s -> {
                    Observable<List<MainCategoryData>> couponsObservable
                            = s.maincategorya().subscribeOn(Schedulers.io());

                    Observable<List<MainCategoryData>> storeInfoObservable
                            = s.maincategoryap().subscribeOn(Schedulers.io());

                    return Observable.merge(couponsObservable,storeInfoObservable);
                }).observeOn(AndroidSchedulers.mainThread()).subscribe(this::handleRespons, this::handleError );

        return v;
    }




    private void handleRespons(List<MainCategoryData> storeCoupons) {
        list = new ArrayList<>(storeCoupons);
        adapter = new RepoAdapter(list);
        recyclerView.setAdapter(adapter);
        list2 = new ArrayList<>(storeCoupons);//error
        adapter = new RepoAdapter(list2);
        recyclerView2.setAdapter(adapter);


    }
    public void handleError(Throwable error) {
        Toast.makeText(getContext(), "Error "+error.getLocalizedMessage(), Toast.LENGTH_SHORT).show();
    }
    }

This is my service file, ApiService.java:

public interface ApiServiceMain
{
//we used post here becz here we have parameters .Get and Post works in the same way but post has parameters and get doesn't have.
    @GET("users/vikashumain/starred")
    Observable<List<MainCategoryData>> maincategoryapi();

    @GET("users/naman14/starred")
    Observable<List<MainCategoryData>> maincategoryap();

    @GET("users/vikashumain/starred")
    Observable<List<MainCategoryData>> maincategorya();



}

Model (POJO) class:

public class MainCategoryData {

    public  int id;
    public  String name;
    private String html_url;
    public  String description;
    public  String language;
    public  int stargazers_count;

    public MainCategoryData() {

    }

    public int getId() {
        return id;
    }
    List<Vikashumain> data2;
    public String getName() {
        return name;
    }

    public List<Vikashumain> getData2() {
        return data2;
    }

    public void setData2(List<Vikashumain> data2) {
        this.data2 = data2;
    }

    public void setId(int id) {
        this.id = id;

    }

    public void setName(String name) {
        this.name = name;
    }

    public void setHtmlUrl(String html_url) {
        this.html_url = html_url;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public void setLanguage(String language) {
        this.language = language;
    }

    public void setStargazersCount(int stargazers_count) {
        this.stargazers_count = stargazers_count;
    }

    public String getHtmlUrl() {
        return html_url;
    }

    public String getDescription() {
        return description;
    }

    public String getLanguage() {
        return language;
    }

    public int getStargazersCount() {
        return stargazers_count;
    }
    public MainCategoryData(int id, String name, String html_url, String description, String language, int stargazers_count) {
        this.id = id;
        this.name = name;
        this.html_url = html_url;
        this.description = description;
        this.language = language;
        this.stargazers_count = stargazers_count;
    }
}

https://i.stack.imgur.com/sga9A.jpg

First JSON file I am parsing: https://api.github.com/users/vikashumain/starred

Second JSON file I am parsing: https://api.github.com/users/naman14/starred


Answer:

It's not clear what you are trying to achieve here. You merge the two users' data into one stream, and every list that stream emits is set on both views.

This will update both panels to the same list because of handleRespons:

retrofit.create(ApiServiceMain.class).maincategorya()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::handleRespons, this::handleError);

Some time after this, the following call will produce two lists, one for each user, and handleRespons gets invoked twice. The slower user's data will again overwrite both panels' data.

Observable.just(retrofit.create(ApiServiceMain.class))
          .subscribeOn(Schedulers.computation())
          .flatMap(s -> {
              Observable<List<MainCategoryData>> couponsObservable
                        = s.maincategorya().subscribeOn(Schedulers.io());

              Observable<List<MainCategoryData>> storeInfoObservable
                        = s.maincategoryap().subscribeOn(Schedulers.io());

              return Observable.merge(couponsObservable,storeInfoObservable);
          })
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(this::handleRespons, this::handleError);

You should probably implement two separate response handlers unique to each panel:

private void handleResponseLeft(List<MainCategoryData> storeCoupons) {
    List<MainCategoryData> list = new ArrayList<>(storeCoupons);
    RepoAdapter adapter = new RepoAdapter(list);
    recyclerView.setAdapter(adapter);
}

private void handleResponseRight(List<MainCategoryData> storeCoupons) {
    List<MainCategoryData> list2 = new ArrayList<>(storeCoupons);
    RepoAdapter adapter2 = new RepoAdapter(list2);
    recyclerView2.setAdapter(adapter2);
}

ApiServiceMain s = retrofit.create(ApiServiceMain.class);

s.maincategorya()
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(this::handleResponseLeft, this::handleError);

s.maincategoryap()
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(this::handleResponseRight, this::handleError);

Question:

What I'd like to do is access data at a few different URLs, combine this data, and then work with it in my app. I'm having no trouble getting all of the data I need, but what I can't figure out is how to combine the data into a single list and know when all of the API calls are done (all of the data is in the list).

Here's the loop where I make my API calls:

for(String diningCourt: diningCourts){
        menuApi.getMenu(diningCourt, date)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(responseData -> {

                     //I want to add responseData to a list with 
                     //the results from the rest of the calls 

                });
    }

What's the correct way to collect the data? I've tried using a static class variable to store the results but I only end up with data from one of the API calls. Any suggestions would be greatly appreciated.


Answer:

Use combineLatest (see http://reactivex.io/documentation/operators/combinelatest.html). With this operator both calls are executed, and you get a combined result once each of them has emitted a value.

Observable.combineLatest(observable1, observable2,
        (result1, result2) -> {
            // use both results
        });
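The snippet above combines exactly two sources, whereas the question loops over a list of dining courts. The underlying requirement (start one call per item, gather every result in one list, and learn when the last call has finished) can be sketched with the JDK alone. The code below is a CompletableFuture analogue, not RxJava. `fetchAll`, the court names and the fake menu lookup are hypothetical stand-ins for menuApi.getMenu.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only analogue of "run N calls and collect all results in one list".
final class CollectAllSketch {

    /**
     * Starts one asynchronous call per key and returns a future holding all
     * results, in the same order as the keys, once every call has finished.
     */
    static <K, V> CompletableFuture<List<V>> fetchAll(List<K> keys, Function<K, V> fetchOne) {
        List<CompletableFuture<V>> calls = keys.stream()
                .map(key -> CompletableFuture.supplyAsync(() -> fetchOne.apply(key)))
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(calls.toArray(new CompletableFuture[0]))
                // Every future is complete at this point, so join() does not block.
                .thenApply(done -> calls.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Hypothetical court names and a fake lookup instead of a network call.
        List<String> diningCourts = Arrays.asList("courtA", "courtB", "courtC");
        List<String> menus = fetchAll(diningCourts, court -> "menu of " + court).join();
        System.out.println(menus);
    }
}
```

Collecting into a list owned by the combined future, instead of into a static field that every callback writes to, avoids the situation described in the question where only one call's data survives.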

Question:

I'm calling an API. When the app starts I need to get a token from the API to be able to do the other calls. The token is managed by a "token manager" inside the app. I'm using RxJava 2 and Retrofit 2 to manage the call.

When I launch the app, the Token Manager gets the token and the Fragment gets the data. I expected the calls to be executed sequentially, as both use the same Retrofit client object injected with Dagger 2. In fact, the call to get the data is executed before the call to get the token finishes, and because the data call needs the token, the request fails.

Some people suggest using flatMap, but I can't because the logic is managed in two different places in the app (the TokenManager class and the Fragment). How can I solve my issue?


Answer:

From your description, you can't use flatMap(), but according to your requirements one of the two calls depends on the other.

The possibilities are as follows:

  1. Dependent calls in the same place (e.g. an Activity): use flatMap(). For example, with calls A and B, call A must get the token before B can be executed.

  2. Dependent calls in different places (e.g. a Service and an Activity): the best fit for this situation is an event bus, and you can use PublishSubject from RxJava to implement one.

In your case the second solution is the way to go. You get the token in your TokenManager, then notify any screen that needs it. You could notify its expiration too, so the app doesn't hit the network unnecessarily.

As a simple example:

PublishSubject<Token> publishToken = PublishSubject.create();

//notify others that you got a Token
publishToken.onNext(myToken);
..
..
//in other place (eg fragment)
getTokenEventBus().subscribe(token -> {
  //do your other call
 }, throwable ->{ 
  //handle error
 }, () -> {
  //event complete
});

Question:

I am trying to sync all local data with the data on the server, so I am fetching all local data, and for anything other than local data I am making a call to the server to fetch the rest.

public Observable<List<Chat>> getChats(int userId) {

    Log.i("chatRequest", TAG);

    return Observable.merge(chatDiskData.getChats(), chatDiskData.getChats().flatMap(new Function<RealmResults<Chat>, ObservableSource<List<Chat>>>() {
        @Override
        public ObservableSource<List<Chat>> apply(@NonNull RealmResults<Chat> chats) throws Exception {

            Log.i("chatRequest", TAG + "  inside");

            ArrayList<String> chatIds = new ArrayList<>();
            HashMap<String, Long> chatsTimeStamp = new HashMap<>();

            for (int i = 0; i < chats.size(); i++) {
                String chatId = chats.get(i).getChat_id();
                chatIds.add(chatId);

                long timestamp = chatDiskData.lastMessage(chatId);
                chatsTimeStamp.put(chatId, timestamp);
            }

            return chatCloudData.getChats(userId, chatIds, chatsTimeStamp).subscribeOn(Schedulers.io())
                    .doOnNext(chats1 -> Log.i("chatRequest", TAG + "  inside  inside"));
        }
    }));
}

ChatCloudData

public Observable<List<Chat>> getChats(int userId, ArrayList<String> 
    chatIds, HashMap<String, Long> chats) {

    Log.i("chatRequest", TAG);

    ChatsRequest chatsRequest = new ChatsRequest.Builder(userId)
            .chatIDs(chatIds)
            .chats(chats)
            .build();

    return apiService.getChats(chatsRequest);
}

Endpoint

@POST("Chat/sync") Observable<List<Chat>> getChats(@Body ChatsRequest chatsRequest);

The output is as follows:

03-11 22:44:45.430 31540-31540/com.project I/chatRequest: com.project.ChatFragment
03-11 22:44:45.430 31540-31540/com.project I/chatRequest: com.project.Repository.ChatRepository
03-11 22:44:45.430 31540-31540/com.project I/chatRequest: com.project.Repository.ChatDiskData getChats
03-11 22:44:45.436 31540-31540/com.project I/chatRequest: com.project.Repository.ChatDiskData getChats
03-11 22:44:45.442 31540-31815/com.project I/chatRequest: com.project.Repository.ChatRepository inside

Why isn't the getChats method of cloudData being called?


Answer:

return Observable.merge(chatDiskData.getChats().observeOn(AndroidSchedulers.mainThread()),
            chatDiskData.getChats().observeOn(AndroidSchedulers.mainThread())
                    .flatMap(new Function<RealmResults<Chat>, ObservableSource<List<Chat>>>() {
                        @Override
                        public ObservableSource<List<Chat>> apply(@NonNull RealmResults<Chat> chats) throws Exception {

                            Log.i("chatRequest", TAG + "  inside");

                            ArrayList<String> chatIds = new ArrayList<>();
                            HashMap<String, Long> chatsTimeStamp = new HashMap<>();

                            for (int i = 0; i < chats.size(); i++) {
                                String chatId = chats.get(i).getChat_id();
                                chatIds.add(chatId);

                                long timestamp = chatDiskData.lastMessage(chatId);
                                chatsTimeStamp.put(chatId, timestamp);
                            }

                            return chatCloudData.getChats(userId, chatIds, chatsTimeStamp).subscribeOn(Schedulers.io())
                                    .doOnNext(chats1 -> Log.i("chatRequest", TAG + "  inside  inside"));
                        }
                    }));

The .observeOn(AndroidSchedulers.mainThread()) call was the important part, as I was using Realm and it was fetching data on a different thread.

Question:

Is it possible with rxjava2 to achieve sequence described below? I have classes like:

class Holder {
    List<Image> images;
}
class Image {
    String url;
    String localFileUrl;
}

I need a sequence like:

* Iterate over Holders;
  * For each Holder iterate over Images;
    * Work with each image (decode a Bitmap from localFileUrl and upload the
     bitmap to the network -> get the url for the image from the network ->
     write the url to the Image's url);
* After iterating over all Holders(on this step all images inside Holders
 should have filled url), make upload for Holders;

I've tried something like:

Observable.fromIterable(holders)
  .concatMap(holder -> {
    Observable.fromIterable(holder.images)
      .concatMap(image -> {
        // decode bitmap here
        uploadImage(bitmap)
      })
  })
// what should I do here to get observable which emits holders,
// not result from uploadImage(bitmap)?
  .toList()
  .flatMap(? -> uploadeHolders());

Is it possible to iterate over data, perform some action for each item on data, and do some work with prepared data (with performed action for each item)?


Answer:

Something like this:

Observable.fromIterable(holders)
  .concatMap(holder -> {
    return Observable.fromIterable(holder.images)
      .flatMap(image -> {
        // decode bitmap here
        return uploadImage(bitmap)
          .map(url -> {
            image.url = url;
            return image;
          });
      })
      .toList()
      .map(list -> {
        holder.images = list;
        return holder;
      })
      .toObservable(); // toList() yields a Single in RxJava 2
  })
  .toList(); // optional, if you need holders as a list
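The shape of this answer (process every image of a holder, write the result back, then hand the holder on) can be tried without RxJava or Android. The following is only a rough plain-JDK sketch that swaps in CompletableFuture for the Observable chain; it is not the answer's RxJava code. The uploadImage method here is a hypothetical stand-in that fabricates a URL from the local path, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class HolderUploadSketch {
    static class Image {
        String url;
        final String localFileUrl;
        Image(String localFileUrl) { this.localFileUrl = localFileUrl; }
    }

    static class Holder {
        final List<Image> images;
        Holder(List<Image> images) { this.images = images; }
    }

    // Hypothetical upload: pretends the server answers with a URL built from the local path.
    static CompletableFuture<String> uploadImage(String localFileUrl) {
        return CompletableFuture.supplyAsync(() -> "https://example.invalid/" + localFileUrl);
    }

    // Uploads every image of one holder, writes each returned URL back, then yields the holder.
    static CompletableFuture<Holder> fillUrls(Holder holder) {
        List<CompletableFuture<Void>> uploads = new ArrayList<>();
        for (Image image : holder.images) {
            uploads.add(uploadImage(image.localFileUrl).thenAccept(url -> image.url = url));
        }
        return CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0]))
                .thenApply(done -> holder);
    }

    // Handles holders one after another (similar in spirit to concatMap) and
    // completes with the full list once every holder has its URLs filled in.
    static CompletableFuture<List<Holder>> fillAll(List<Holder> holders) {
        List<Holder> collected = new ArrayList<>();
        CompletableFuture<List<Holder>> chain = CompletableFuture.completedFuture(collected);
        for (Holder holder : holders) {
            chain = chain.thenCompose(done ->
                    fillUrls(holder).thenApply(filled -> {
                        done.add(filled);
                        return done;
                    }));
        }
        return chain;
    }

    public static void main(String[] args) {
        Holder holder = new Holder(Arrays.asList(new Image("a.png"), new Image("b.png")));
        List<Holder> ready = fillAll(Arrays.asList(holder)).join();
        // At this point every image has a url, so the holders could be uploaded.
        for (Image image : ready.get(0).images) {
            System.out.println(image.localFileUrl + " -> " + image.url);
        }
    }
}
```

The point carried over from the answer is that the inner step returns the enclosing object (the holder) rather than the upload result, so the outer chain keeps emitting holders.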

Question:

I'm working on an MVVM app. I've been able to use AndroidViewModel, Activity and MutableLiveData together, but I don't know why I'm having a problem with this specific method.

For example, I have an AndroidViewModel called "IssuesViewModel". It has a private MutableLiveData<Boolean> mutable; and these two methods:

// Retrofit 2 call
public void reportIssue(Data data){
    mutable = new MutableLiveData<>();
    retrofit.reportIssue(data, new RestCallback<Void>(){
        @Override
        public void success(Void success){
            mutable.setValue(true);
        }
        @Override
        public void failure(){
            mutable.setValue(false);
        }
    });
}

// Getting mutableLiveData
public MutableLiveData<Boolean> getMutable(){
    return mutable;
}

Now, in my "IssuesActivity", which extends AppCompatActivity and implements LifecycleRegistryOwner, I have these methods:

//... Activity callbacks and other methods ...
private IssuesViewModel viewModel;
@Override
protected void onCreate(Bundle savedInstanceState){
    super.onCreate(savedInstanceState);
    // Toolbar code and other stuff...
    viewModel = ViewModelProviders.of(this).get(IssuesViewModel.class);
    // More code ...
}

@Override
public LifecycleRegistry getLifecycle() {
    return new LifecycleRegistry(this);
}

// Triggered after button click
private void reportIssue(Data data){
    viewModel.reportIssue(data);
    viewModel.getMutable().observe(this, new Observer<Boolean>(){
        @Override
        public void onChanged(@Nullable Boolean response){
            if (response != null && response){
                //Success, update UI
            } else {
                //Error, update UI
            }
        }
    });
}

I've been able to use MutableLiveData before (with a custom Object instead of Boolean). I did the same for all my previous calls... I don't know why this is not working.

Thanks!

Edit 1, Nov 21st:

I think the problem is that my activity extends AppCompatActivity and implements LifecycleRegistryOwner. I have an Activity with code similar to the one above where everything works, but that Activity extends LifecycleActivity. When I change this activity to extend AppCompatActivity and implement LifecycleRegistryOwner, the observer's onChanged never triggers. I found this bug that is related to my problem, but I think it only applies to Fragments...


Answer:

This problem was solved after updating android.arch.lifecycle:extensions to stable version 1.0.0. Now, everything works as expected.

Question:

The zip function doesn't seem to do anything. The log of requests tagged INSIDE prints the observables, but the one tagged OUTSIDE is empty, and the Log calls inside the zip function never run. getPosts returns a list of ids, which I should use to compose further requests with getStory. I'm a beginner in Android, so maybe I bit off more than I can chew with RxJava, but apparently this is the best solution. If there is anything simpler, I'm eager to hear it. Thanks.

MainActivity

package com.example.hackernews;

import androidx.appcompat.app.AppCompatActivity;
import androidx.recyclerview.widget.RecyclerView;

import android.annotation.SuppressLint;
import android.os.Bundle;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;

import static java.lang.Math.min;

public class MainActivity extends AppCompatActivity {
    private RecyclerView mRecyclerView;
    private RecyclerView.Adapter mAdapter;
    private RecyclerView.LayoutManager mLayoutManager;
    private  List<DataResponse> dataResponses;
    private Observable<List<Integer>> ids;
    private List<Observable<DataResponse>> requests = new ArrayList<>();

    @SuppressLint("CheckResult")
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);




//        mRecyclerView = findViewById(R.id.recyclerView);
//        mRecyclerView.setHasFixedSize(true);
//        mLayoutManager = new LinearLayoutManager(getApplicationContext());
//        mRecyclerView.setLayoutManager(mLayoutManager);



        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(interceptor)
                .build();

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://hacker-news.firebaseio.com/v0/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .client(client)
                .build();

        HackerNewsApi hackerNewsApi = retrofit.create(HackerNewsApi.class);

        ids = hackerNewsApi.getPosts();

        ids.subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(id -> {
                    for (Integer i : id) {
                        requests.add(hackerNewsApi.getStory(i));
                    }
                    Log.e("onSubscribe", "INSIDE " + requests);
                }, Throwable::printStackTrace);
        Log.e("onSubscribe", "OUTSIDE " + requests);

        Observable.zip(
                requests,
                new Function<Object[], Object>() {
                    @Override
                    public Object apply(Object[] objects) throws Exception {
                        // Objects[] is an array of combined results of completed requests

                        Log.e("onSubscribe", "YOUR OBJECTS ARE HERE: " + objects);
                        // do something with those results and emit new event
                        return new Object();
                    }
                })
                // After all requests had been performed the next observer will receive the Object, returned from Function
                .subscribe(
                        // Will be triggered if all requests will end successfully (4xx and 5xx also are successful requests too)
                        new Consumer<Object>() {
                            @Override
                            public void accept(Object o) throws Exception {
                                //Do something on successful completion of all requests
                                Log.e("onSubscribe", "YOUR OBJECTS ARE HERE: " + o);
                            }
                        },

                        // Will be triggered if any error during requests will happen
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable e) throws Exception {
                                //Do something on error completion of requests
                            }
                        }
                );


//            mRecyclerView = findViewById(R.id.recyclerView);
//            mRecyclerView.setHasFixedSize(true);
//            mLayoutManager = new LinearLayoutManager(this);
//            Log.e("onSubscribe", "YOUR DATA IS HERE: " + dataResponses);
//            mAdapter = new ExampleAdapter(dataResponses);
//
//
//            mRecyclerView.setLayoutManager(mLayoutManager);
//            mRecyclerView.setAdapter(mAdapter);


    }

}

HackerNewsApi

package com.example.hackernews;

import java.util.List;

import io.reactivex.Observable;
import retrofit2.Call;
import retrofit2.http.GET;
import retrofit2.http.Path;

public interface HackerNewsApi {

    @GET("askstories.json?print=pretty")
    Observable<List<Integer>> getPosts();

    @GET("item/{id}.json?print=pretty")
    Observable<DataResponse> getStory(@Path("id") Integer id);
}

Answer:

You build the list of story-retrieving observables on a background thread while concurrently trying to use that list on the main thread. When Observable.zip is called, the list is most likely still empty, so there is nothing to zip.

Why not simply compose over the getPosts() like this?

 hackerNewsApi.getPosts()
 .subscribeOn(Schedulers.io())
 .flatMapIterable(posts -> posts)
 .flatMap(post -> hackerNewsApi.getStory(post))
 .toList()
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(allStories -> { /* ... */ }, error -> { /* ... */ });

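flatMapIterable unrolls your initial list of posts, and toList recombines the stories into a single list. flatMap does not guarantee that the results keep the order of the ids; if the order of the stories matters, concatMap preserves it.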
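To see why the OUTSIDE log in the question comes up empty, and why composing the calls avoids that, here is a small plain-JDK sketch. It deliberately uses CompletableFuture rather than RxJava so that it runs without Retrofit or Android, so treat it as an analogy and not as the answer's code. fetchIds and fetchStory are hypothetical stand-ins for getPosts and getStory, and the latch exists only to make the timing deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

class AsyncListDemo {

    // Hypothetical stand-in for getPosts(): completes on a background thread
    // once the gate has been opened.
    static CompletableFuture<List<Integer>> fetchIds(CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await(); // simulate a response that has not arrived yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Arrays.asList(1, 2, 3);
        });
    }

    // Hypothetical stand-in for getStory(id).
    static CompletableFuture<String> fetchStory(int id) {
        return CompletableFuture.supplyAsync(() -> "story-" + id);
    }

    // Mirrors the question: a callback fills a shared list, and the caller
    // reads that list right after registering the callback.
    static int sizeSeenRightAfterSubscribing() {
        CountDownLatch gate = new CountDownLatch(1);
        List<String> requests = new ArrayList<>();
        CompletableFuture<Void> filling = fetchIds(gate)
                .thenAccept(ids -> ids.forEach(id -> requests.add("request-" + id)));
        int seen = requests.size(); // the callback has not run yet (the "OUTSIDE" log)
        gate.countDown();           // now let the response arrive
        filling.join();
        return seen;
    }

    // Starts one story request per id and completes with all stories, in id order.
    static CompletableFuture<List<String>> fetchStories(List<Integer> ids) {
        List<CompletableFuture<String>> stories = ids.stream()
                .map(AsyncListDemo::fetchStory)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(stories.toArray(new CompletableFuture[0]))
                .thenApply(done -> stories.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Mirrors the answer: the second step is chained onto the first,
    // so no shared list is read before it has been filled.
    static List<String> composed() {
        return fetchIds(new CountDownLatch(0))
                .thenCompose(AsyncListDemo::fetchStories)
                .join();
    }

    public static void main(String[] args) {
        System.out.println("seen right after subscribing: " + sizeSeenRightAfterSubscribing());
        System.out.println("composed result: " + composed());
    }
}
```

The first method reads the list before the asynchronous callback has had a chance to run, which is the situation the answer describes. The composed version expresses "ids first, then stories" as one chain, in the same spirit as flatMapIterable followed by flatMap and toList.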