Hot questions for using RxJava 2 in Realm

Question:

I use Realm's own RxJava wrapper, like this:

Flowable<RealmResults<RealmUser>> result =
    realm.where(RealmUser.class)
         .findAllAsync()
         .asFlowable();

Data is loaded on Realm's background thread and emitted on the main thread. My problem is that I need to map a large number of Realm objects to my own POJOs (an independent business layer). Realm objects cannot be passed between threads, which means I must map the objects on the main thread. That is undesirable because it blocks the main thread. Do you know how to perform the mapping on a background thread in a similar way? Is it possible to emit an unmanaged copy of the RealmResults (on a background thread, directly from Realm)? I cannot find a way to do this.

Using copyFromRealm is not a good choice, because the result is the same as with my custom mapping.

At the moment I use a new model class for users that contains only the attributes I need. This reduces the mapping time, but the mapping still runs on the main thread, which causes UI artifacts.

Thanks


Answer:

One option is to execute the query on a background looper thread and do the mapping there (looperScheduler is assumed here to provide a scheduler backed by such a thread):

private Observable<List<User>> createResults() {
    return Observable.create((ObservableOnSubscribe<List<User>>) emitter -> {
        Realm realm = Realm.getDefaultInstance();
        final RealmResults<RealmUser> results = realm.where(RealmUser.class).findAllAsync();

        final RealmChangeListener<RealmResults<RealmUser>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<User> users = mapFrom(element);
                if(!emitter.isDisposed()) {
                    emitter.onNext(users);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(results.isValid()) {
                results.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        results.addChangeListener(realmChangeListener);
    }).subscribeOn(looperScheduler.getScheduler()).unsubscribeOn(looperScheduler.getScheduler());
}

Alternatively, you can use the following shortcut, which needs no dedicated looper thread but evaluates your query twice:

// Capture the configuration up front so the background thread never touches the UI-thread Realm.
RealmConfiguration configuration = realm.getConfiguration();
Disposable subscription =
    realm.where(RealmUser.class)
         .findAllAsync()
         .asFlowable()
         .subscribeOn(AndroidSchedulers.mainThread())
         .unsubscribeOn(AndroidSchedulers.mainThread())
         .filter(RealmResults::isLoaded)
         .observeOn(Schedulers.from(singleThreadedExecutor))
         .map((ignored) -> {
             // Re-run the query on the background thread with its own Realm instance.
             try (Realm backgroundRealm = Realm.getInstance(configuration)) {
                 RealmResults<RealmUser> results = backgroundRealm.where(RealmUser.class).findAll();
                 return mapToUsersList(results);
             }
         })
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe((users) -> { ...
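
Both variants rest on the same idea: touch thread-confined objects only on the thread that owns them, and let only plain POJOs cross to other threads. That idea can be sketched without Realm or RxJava. In this minimal sketch, ManagedUser, User, BackgroundMapping, and loadUsers are hypothetical names, and the owner-thread check only imitates Realm's thread confinement; it is not Realm's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a thread-confined object such as a managed RealmObject. */
final class ManagedUser {
    private final Thread owner = Thread.currentThread();
    private final String name;

    ManagedUser(String name) {
        this.name = name;
    }

    String getName() {
        // Imitates thread confinement: reading from any other thread fails.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Accessed from the wrong thread");
        }
        return name;
    }
}

/** Plain immutable business-layer object; safe to hand to any thread. */
final class User {
    final String name;

    User(String name) {
        this.name = name;
    }
}

final class BackgroundMapping {
    /** "Queries" and maps on the given executor; only POJOs leave that thread. */
    static CompletableFuture<List<User>> loadUsers(Executor background, List<String> storedNames) {
        return CompletableFuture.supplyAsync(() -> {
            // Managed objects are created on the background thread...
            List<ManagedUser> managed = new ArrayList<>();
            for (String storedName : storedNames) {
                managed.add(new ManagedUser(storedName));
            }
            // ...and mapped on that same thread, so the confinement check passes.
            List<User> users = new ArrayList<>();
            for (ManagedUser managedUser : managed) {
                users.add(new User(managedUser.getName()));
            }
            return users;
        }, background);
    }
}
```

A caller would pass a single-threaded executor and consume the returned future on whatever thread it likes. In the RxJava version above, the same hand-off is done by observeOn(AndroidSchedulers.mainThread()).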

Question:

The function fetchData is in a fragment. The DataRepository class contains the function that I use to fetch data from the disk:

@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
    final View view = inflater.inflate(R.layout.fragment_user, container, false);

    realm = Realm.getDefaultInstance();

    Log.i("realm", "oncreateView");

    items = new ArrayList<>();

    try {
        Log.i("realm", "check" + (realm == null));
    } catch (Exception e) {
        e.printStackTrace();
    }

    fetchData();

    return view;
}

@Override
public void onDestroy() {
    super.onDestroy();
    if (realm != null && !realm.isClosed())
        realm.close();
    Log.i("realm", "ondestroy");
}

private void fetchData() {

    abcDataRepository = DataRepository.getInstance(realm);

    Observable<List<User>> observable = abcDataRepository.getUser_List();

    observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(Users-> {
                Log.i("data", Users.size() + "");

                for (int i = 0; i < Users.size(); i++) {
                    items.add(Users.get(i));
                }
            });
}

The DataRepository class:

public class DataRepository {

    private Realm realm;

    private static PoliticoDataRepository dataRepository = null;

    private PoliticoDataRepository(Realm realm) {
        this.realm = realm;
    }

    public static PoliticoDataRepository getInstance(Realm realm) {
        if (dataRepository == null)
            dataRepository = new PoliticoDataRepository(realm);

        return dataRepository;
    }

    @Override
    public Observable<List<User>> getUser_List() {

        final UserDiskData diskData = new UserDiskData(realm);

        if (diskData.User_Size() > 0) {
            return diskData.getUser_List();
        } else {
            final UserCloudData cloudData = new UserCloudData(realm);
            return cloudData.getUser_List();
        }
    }
}

The UserDiskData class:

class UserDiskData {

    private Realm realm;

    UserDiskData(Realm realm) {
        this.realm = realm;
    }

    int User_Size() {
        final RealmResults<User> results = realm.where(User.class).findAll();
        Log.i("disk", "pulledData" + results.size());
        return results.size();
    }

    @Override
    public Observable<List<User>> getUser_List() {

        Log.i("disk", "called");

        final RealmResults<User> results = realm.where(User.class).findAll();
        ArrayList<User> list = new ArrayList<>();
        for (int i = 0; i < results.size(); i++)
            list.add(results.get(i));

        return Observable.just(list);
    }

}

The first time, the app runs fine, but when I exit the app and open it again, it crashes with the error "This Realm instance has already been closed, making it unusable." It works perfectly fine if I leave out the realm.close() call at the end. Should I close the Realm object manually, and if so, how? For some reason realm == null returns true when I close the app and open it again. Why does it do that, especially considering that Realm.getDefaultInstance() is called right before it?


Answer:

My colleague @zaki50 points out:

DataRepository keeps a static reference to PoliticoDataRepository. That means that DataRepository.getInstance(Realm realm) can return an old instance of PoliticoDataRepository which contains the closed Realm instance.

You must remove the static field and make sure that DataRepository.getInstance(Realm realm) always returns an instance that refers to the passed Realm instance.
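
The stale-reference problem described above can be reproduced without Realm. The following is a minimal sketch under stated assumptions: FakeRealm, CachingRepository, FreshRepository, and StaleSingletonDemo are hypothetical names, and FakeRealm only imitates an instance that can be closed. It is not the asker's actual code.

```java
/** Hypothetical stand-in for a closable resource such as a Realm instance. */
final class FakeRealm {
    private boolean closed;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Mirrors the problem: the first resource passed in is cached in a static field. */
final class CachingRepository {
    private static CachingRepository instance;
    final FakeRealm realm;

    private CachingRepository(FakeRealm realm) {
        this.realm = realm;
    }

    static CachingRepository getInstance(FakeRealm realm) {
        if (instance == null) {
            instance = new CachingRepository(realm);
        }
        return instance; // later calls silently ignore the argument
    }
}

/** One possible fix: no static field, every call wraps the resource it was given. */
final class FreshRepository {
    final FakeRealm realm;

    private FreshRepository(FakeRealm realm) {
        this.realm = realm;
    }

    static FreshRepository getInstance(FakeRealm realm) {
        return new FreshRepository(realm);
    }
}

final class StaleSingletonDemo {
    public static void main(String[] args) {
        FakeRealm first = new FakeRealm();
        CachingRepository.getInstance(first);
        first.close(); // comparable to closing the Realm in onDestroy()

        FakeRealm second = new FakeRealm(); // comparable to the next onCreateView()
        System.out.println(CachingRepository.getInstance(second).realm.isClosed()); // prints true
        System.out.println(FreshRepository.getInstance(second).realm.isClosed());   // prints false
    }
}
```

The cached variant keeps returning the repository built around the first, already closed resource for as long as the static field survives, which matches the crash on the second launch when the app process has stayed alive.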

Question:

I want to merge Realm results with a network search and observe the result in the UI.

The code below works well offline, but when data is updated from the network, Realm's auto-refresh does not respect the last search query and just returns all results.

What is a better pattern to achieve this?

rx_search view:

@Override
public void search(Observable<CharSequence> searchObservable) {
    isSearching = true;
    Disposable subscribe = searchObservable
            .map(CharSequence::toString)
            .filter(query -> query.length() > 1)
            .distinctUntilChanged()
            // Turn "foo bar" into the wildcard pattern "foo*bar*"
            .map(s -> {
                String[] words = s.trim().split(" ");
                String pattern = "";
                for (String word : words) {
                    pattern = pattern + word + "*";
                }
                return pattern;
            })
            // switchMap to get the result from the repository
            .switchMap(s -> mInteractor.searchStores(s).onBackpressureLatest().toObservable())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(stores -> {
                if (mView != null) {
                    mView.setList(stores);
                }
            }, Throwable::printStackTrace);
    disposable.add(subscribe);
}

Get the result from Realm and update the database from the network:

@Override
public Flowable<RealmResults<Stores>> searchStores(String searchString) {
    searchNetwork(searchString); // after the network update, Realm emits the whole table
    try (Realm realm = Realm.getDefaultInstance()) {
        // The query works the first time, but after the data is updated the Flowable
        // emits all items and does not respect the query
        RealmQuery<Stores> query = realm.where(Stores.class).like("name", searchString, Case.INSENSITIVE);
        return query
                .findAll()
                .asFlowable();

    }
}

private void searchNetwork(CharSequence searchString) {
    api.searchList(searchString)
            .doOnError(Throwable::printStackTrace)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .filter(statusPojo::getStatus)
            .switchMap(data -> Observable.just(data.getData()))
            .subscribe(new DisposableObserver<List<Stores>>(){

                @Override
                public void onNext(List<Stores> stores) {
                    try (Realm realm = Realm.getDefaultInstance()) {
                        realm.executeTransaction(r -> r.insertOrUpdate(stores));
                    }
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            })
    ;
}

Answer:

I used switchMap to re-run the query every time the Realm Flowable emits a changed result, and it works:

@Override
public Flowable<RealmResults<Stores>> searchStores(String searchString) {
    searchNetwork(searchString);

    try (Realm realm = Realm.getDefaultInstance()) {
        RealmQuery<Stores> query = realm.where(Stores.class)
                .like("name", searchString, Case.INSENSITIVE);
        return query
                .findAll()
                .asFlowable()
                .onBackpressureLatest()
                // Re-run the query whenever Realm emits a change
                .switchMap(realmResultsCollectionChange ->
                        query.findAll().asFlowable());
    }
}
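
The wildcard pattern that the question builds before calling like(...) can be isolated in a small helper, which makes it easy to check on its own. This is a minimal sketch: LikePatterns and toLikePattern are hypothetical names, and unlike the loop in the question it splits on any run of whitespace and skips empty words. It relies on `*` in Realm's like(...) matching zero or more characters.

```java
final class LikePatterns {
    private LikePatterns() {
    }

    /** Builds "foo*bar*" from "foo bar"; blank input yields an empty pattern. */
    static String toLikePattern(String rawQuery) {
        StringBuilder pattern = new StringBuilder();
        for (String word : rawQuery.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pattern.append(word).append('*');
            }
        }
        return pattern.toString();
    }
}
```

In the search chain, the inline loop could then be replaced by .map(LikePatterns::toLikePattern), assuming the helper is on the classpath.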