Hot questions for using RxJava 2 in Micronaut

Question:

I've been out of the Java game for ~8 years and a lot has changed since then. The biggest challenge for me has been RxJava / reactive. I'm looking for rough guidance on how to do the equivalent below in a fully reactive way.

The basic requirement, implemented below using ThreadPoolExecutor, is to process a large amount of Stuff by calling a remote web service, which has a documented rate limit of 100 requests/minute. My goal is to process as much as possible as fast as possible, without dropping any Stuff but still honoring the downstream rate limit. This code has been simplified and leaves out error handling, bulkheads, circuit breakers, retry logic, etc.

This code currently works fine but it results in what feels like a lot of wasted threads given all the non-blocking reactive options. Even the HTTP client I'm using to call my service offers back a Flowable, which I'm simply blocking on in each of the executor's 20 threads.

I'd love to understand what the reactive equivalent should be. Where I've struggled is that almost all the docs I find use static sources for the Observable (ex: Observable.fromArray(1,2,3,4,5)). I know the solution likely involves IoScheduler and maybe groupBy, but I have yet to figure out how to merge the Flowables coming from my HTTP client into one complete chain that does parallelization (up to a limit, such as 20) and rate limiting.

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

        final Runnable rateLimitedTask = 
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}

Thank you!


Answer:

First, to build this in a completely non-blocking manner, you need to use a non-blocking, asynchronous HTTP client library like Netty. I am not sure about how RxHttpClient works.

Say you have a list called stuffs. This is how I would do it:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap merges the responses as they come.

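To limit concurrency, use flatMap's second parameter (maxConcurrency), which caps the number of inner streams it subscribes to in parallel. Note that this bounds the number of in-flight calls, not the number of requests per minute. Say you want to make no more than 10 calls at once. Do this: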

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();
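
The concurrency cap on its own does not enforce the documented limit of 100 requests/minute, which works out to one request every 600 ms on average. As a rough, JDK-only sketch of that pacing arithmetic (the class name Pacer and its methods are hypothetical, and are not part of RxJava, Micronaut, or resilience4j), each call can be handed a delay so that it starts no earlier than 600 ms after the previous one:

```java
import java.time.Duration;

/**
 * Hypothetical helper (not a library class): spaces calls evenly so that
 * at most `limit` of them start within any `period`.
 */
final class Pacer {
    private final long spacingNanos;
    private long nextFreeNanos; // earliest time the next call may start

    Pacer(int limit, Duration period, long nowNanos) {
        this.spacingNanos = period.toNanos() / limit; // 60 s / 100 = 600 ms
        this.nextFreeNanos = nowNanos;
    }

    /** Minimum gap between two consecutive call starts. */
    Duration spacing() {
        return Duration.ofNanos(spacingNanos);
    }

    /**
     * Reserves the next slot and returns how many nanoseconds the caller
     * should wait before starting, given the current time.
     */
    synchronized long reserve(long nowNanos) {
        long start = Math.max(nowNanos, nextFreeNanos);
        nextFreeNanos = start + spacingNanos;
        return start - nowNanos;
    }
}
```

In an Rx chain, the returned delay could presumably be applied to each inner stream (for example with delaySubscription) before it reaches the flatMap with maxConcurrency. That wiring is left out here, and the resilience4j RateLimiter already used in the question may be the more practical choice.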

Question:

The actual code is here

private Function<String, Mono<? extends Offer>> keyToOffer(RedisReactiveCommands<String, String> commands) {
        return key -> {
            Flux<KeyValue<String, String>> values = commands.hmget(key, "price", "description");
            Map<String, String> map = new HashMap<>(3);
            return values.reduce(map, (all, keyValue) -> {
                all.put(keyValue.getKey(), keyValue.getValue());
                return all;
            })
                    .map(ConvertibleValues::of)
                    .flatMap(entries -> {
                        String description = entries.get("description", String.class).orElseThrow(() -> new IllegalStateException("No description"));
                        BigDecimal price = entries.get("price", BigDecimal.class).orElseThrow(() -> new IllegalStateException("No price"));
                        Flowable<Pet> findPetFlowable = petClient.find(key).toFlowable();
                        return Mono.from(findPetFlowable).map(pet -> new Offer(pet, description, price));
                    });
        };
    }

I have tried in a variety of ways to convert the above into Groovy, and none of my attempts so far have worked out well. I wondered if anyone better with Groovy could help.

I didn't post my attempt at first, since the code firstly produces an "Ambiguous code block" error in IntelliJ and secondly looks totally wrong.

 private Function<String, Mono<? extends Orders>> keyToOrder(RedisReactiveCommands<String, String> commands) {
    return { key -> {
        Flux<KeyValue<String, String>> values = commands.hmget(key, "price", "description");
        Map map = [:]
        return values.reduce(map, (all, keyValue)= {all.put(keyValue.getKey(), keyValue.getValue());
            return all
        }).map({entries -> ConvertibleValues.of(entries)})
                .flatMap({entries -> {
            String description = entries.get("description", String.class).orElseThrow({ new IllegalStateException("No description")});
            BigDecimal price = entries.get("price", BigDecimal.class).orElseThrow({new IllegalStateException("No price")});
            Flowable<Item> findItemFlowable = itemClient.find(key).toFlowable();
            return Mono.from(findItemFlowable).map({item -> new Orders(item, description, price)});
        }});
    }}
}

When attempting to convert to Groovy, the biggest struggle was around:

return values.reduce(map, (all, keyValue)= {all.put(keyValue.getKey(), keyValue.getValue());
                    return all

This is nothing like what the original Java code looked like, and I'm really unsure whether it would even behave as it should. My problem was that I couldn't find any examples of Flux .reduce written in Groovy.

The Ambiguous code block is around this entire flatMap segment at the very bottom

 .flatMap({entries -> {

I haven't checked in this change nor did I post it since frankly it was embarrassing.

I have also come across: http://reactivex.io/documentation/operators/reduce.html#collapseRxGroovy

numbers.reduce({ a, b -> a+b }).

and ended up with:

Map<String, String> map = new HashMap<>(3);
            return values.reduce({all, keyValue->
                all.put(keyValue.getKey(), keyValue.getValue());
                return all
        }).map({entries -> ConvertibleValues.of(entries)})

But this again looks wrong and doesn't really match what the java code was doing.

Final edit: I have got IntelliJ to accept the code as Groovy, but I'm not sure it does what the Java code was actually doing, since the declared map is not even used:

private Function<String, Mono<? extends Orders>> keyToOrder(RedisReactiveCommands<String, String> commands) {
    Flux<KeyValue<String, String>> values = commands.hmget(key, "price", "description");
        Map<String, String> map = new HashMap<>(3);
        values.reduce({all, keyValue->
            all.put(keyValue.getKey(), keyValue.getValue());
            return all
    }).map({entries -> ConvertibleValues.of(entries)})
                .flatMap({entries ->  bindEntry(entries)});
    return values.key
}

private Mono<Orders> bindEntry(entries) {
    String description = entries.get("description", String.class).orElseThrow({ new IllegalStateException("No description")});
    BigDecimal price = entries.get("price", BigDecimal.class).orElseThrow({new IllegalStateException("No price")});
    Flowable<Item> findItemFlowable = itemClient.find(key).toFlowable();
    return Mono.from(findItemFlowable).map({item -> new Orders(item, description, price)});

}

Answer:

The likely issue you're facing is that Groovy (before 3.0) doesn't support Java's lambda or method reference syntax, so each one has to be rewritten as a closure.

The first line is returning a lambda

Java: return key -> {

Groovy: return { key ->

That is using a Groovy closure that takes the key as the argument.

Other places that use method references need to be converted

Java: .map(ConvertibleValues::of)

Groovy: .map({ values -> ConvertibleValues.of(values) })

It seems like you have most of that worked out, however you specifically asked about the map not being used. That is because you simply aren't passing it to the method.

values.reduce({all, keyValue-> vs values.reduce(map, {all, keyValue ->
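
To make the difference concrete, here is a small plain-Java sketch of what a seeded reduce does, written without Reactor on the classpath. The fold helper and the use of Map.Entry in place of the Redis client's KeyValue are stand-ins chosen for illustration, not the real API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class SeededReduceSketch {
    // Hypothetical stand-in for reduce(seed, accumulator): the seed is the
    // first "all" value, and every element is folded into it in order.
    static <A, T> A fold(Iterable<T> items, A seed, BiFunction<A, T, A> accumulator) {
        A all = seed;
        for (T item : items) {
            all = accumulator.apply(all, item);
        }
        return all;
    }

    // Folds key/value pairs into the map passed as the seed, mirroring the
    // lambda in the original Java code.
    static Map<String, String> toMap(List<Map.Entry<String, String>> pairs) {
        Map<String, String> map = new HashMap<>(3);
        return fold(pairs, map, (all, keyValue) -> {
            all.put(keyValue.getKey(), keyValue.getValue());
            return all;
        });
    }

    // Sample input shaped like the hmget result for "price" and "description".
    static Map<String, String> demo() {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<String, String>("price", "9.99"));
        pairs.add(new SimpleEntry<String, String>("description", "A pet"));
        return toMap(pairs);
    }
}
```

Without the seed, the accumulator's first argument would be the first emitted element rather than a map, which is why the map has to be passed as the first argument to reduce.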