Hot questions for Using RxJava 2 in repository pattern

Top Java Programmings / RxJava 2 / repository pattern

Question:

I am pretty new to RxJava and I need to create repository with several datasources. It is complex to me because there are several smaller subtasks which I don't know how to implement with RxJava.

First I have self written dao, which process InputStream, and provides Items in the specified range. Currently it simply collects data in a list, but I want to provide Items one by one using flowable; Currently it privides Maybe<List<Item>>. Also there several errors need to be transmitted to higher level (datasource). Such as EndOfFile, to notify DataSource that data fully cached;

Dao.class:

List<Item> loadRange(int start, int number) throws ... {
    ...
    while(...) {
        ...
        //TODO contribute item to flowable
        resultList.add(new Item(...)) 

    }
    return resultList;
}

Maybe<List<Item>> method just created Maybe.fromCallable();

Please help me!


Answer:

Something like this should work for this :

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(emitter -> {
            try {
                while (...){
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

I assume once the loop is done you want to complete, also send errors downstream, rather than handle on the method signature. Also you can change the BackPressureStrategy to suit your usecase i.e DROP, LATEST etc..

As you're new to RxJava, the anonymous class would be :

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(new FlowableOnSubscribe<Item>() {
            @Override public void subscribe(FlowableEmitter<Item> emitter) {
                try {
                    while (...){
                        emitter.onNext(new Item());
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }

Question:

I am pretty new to RxJava and I need to create repository with several datasources. It is complex to me because there are several smaller subtasks which I don't know how to implement with RxJava.

I have Dao which provides Flowable<Item> in some range to DataSource class. This data source has local cache which can be invalidated at any time. When repository asks DataSource for some range (which can be out of DataSourse bounds, bounds are unknown until fully cached) it must produce an error (or notify Repository in other way).

I want to create Flowable<Item> method for DataSource which will emit items from cache, and if needed concatenate them with Flowable<Item> dao.getRange(...), meanwhile caching new items, which come from dao. Also I need to do stuff with errors coming from dao, they must be handled or converted to higher level errors.

DataSource.class:

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    final int cacheSize = cache.size();
    final int canLoadFromCache = cacheSize - start;
    final int loadFromDao = amount - canLoadFromCache;

    if (isCorrupted) return Flowable.fromCallable(() -> {
        throw new Exception("CorruptedDatasource");
    });

    Flowable<Item> cacheFlow = null;
    Flowable<Item> daoFlow = null;

    if (canLoadFromCache > 0) {
        cacheFlow = Flowable.fromIterable(
                cache.subList(start, canLoadFromCache)
        );

        daoFlow = dao.getRange(
                uri, 
                cacheSize, //start
                loadFromDao //amount
        );
    } else {
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
    }
    // return concatenated flowable;
}

At the higher level repository concatenates data coming from several DataSources, so there must be a way to cancatinate ranges coming from several sources in a way if one haven't enought then range from next one should be added.

Please help me!


Answer:

Try concat or concatEager which concatenates two observables. Also doOnNext() or doOnError() can help you caching and error handlings

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    ...
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
            .doOnNext(result -> /* insert caching logic here */)
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
            .doOnError(error -> /* handle error here */)
            .onErrorReturn(/* and/or return some empty item */)
    }
    // return concatenated flowable;
    return cacheFlow.concat(daoFlow);
}