Hot questions for Using RxJava 2 in multithreading

Top Java Programmings / RxJava 2 / multithreading

Question:

I have a question about the RxJava2. I want to run the consumer in different thread of a fixed thread pool to parallel execute the List result. Here is my code:

    List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g");
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size());
    Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() {
        @Override
        public void accept(String data) throws Exception {
            System.out.println(data + " forEach, thread is " + Thread.currentThread().getName());
        }
    });

I got the result is:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-1
c forEach, thread is pool-1-thread-1
d forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-1
f forEach, thread is pool-1-thread-1
g forEach, thread is pool-1-thread-1

But actually what I want is this result, each consumor parallel execute in different thread:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-2
c forEach, thread is pool-1-thread-3
d forEach, thread is pool-1-thread-4
e forEach, thread is pool-1-thread-5
f forEach, thread is pool-1-thread-6
g forEach, thread is pool-1-thread-7

Can somebody tell me how to make it happen?


Answer:

In order to read the items in paralell threads, use Flowable<> instead of Observable as it provides the parallel operator. For instance:

 Flowable.fromIterable(letters)
         .parallel(letters.size())
         .runOn(Schedulers.from(fixedThreadPool))
         .sequential()
         .forEach(data -> System.out.println(data + " forEach, thread is " + 
                          Thread.currentThread().getName()));

As you can not predict which one of the threads will be used for each invocation, the output may vary. In my testcase I got

c forEach, thread is pool-1-thread-3
g forEach, thread is pool-1-thread-7
a forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-5
d forEach, thread is pool-1-thread-4
b forEach, thread is pool-1-thread-2
f forEach, thread is pool-1-thread-6

For more information, please consult the parallel-flows section of the RxJava Wiki

Question:

I am passing two arguments in just() operator. Snippet of my code is :

Observable<Integer> observable = Observable.just(1,2);
observable.subscribeOn(Schedulers.newThread())
                        .subscribe(
                                new Observer<Integer>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {

                                    }

                                    @Override
                                    public void onError(Throwable e) {

                                    }

                                    @Override
                                    public void onComplete() {

                                    }

                                    @Override
                                    public void onNext(Integer e) {
                                        System.out.println(e);
                                        //request web service

                                });

What I observed is it is not making separate thread for each emitted item. Items appearing as just arguments are running sequentially. How to create separate thread for each emitted item?


Answer:

You can use flatMap and inside the flatMap create the new observable and use the subscribeOn

@Test
public void test() {
    Observable.just(1, 2)
            .flatMap(item -> Observable.just(item)
                    .subscribeOn(Schedulers.newThread())
                    .doOnNext(i -> System.out.println("Thread:" + Thread.currentThread())))
            .subscribe(System.out::println);
}

You can see more examples about async observable here https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Question:

I'm trying to implement an RXJava2 Observable<String> off a BufferedReader. So far so good:

public class Launcher {
    public static void main(String[] args) throws Exception {

        Process process = Runtime.getRuntime().exec("ls /tmp");
        BufferedReader reader = new BufferedReader (new InputStreamReader(process.getInputStream()));

        Observable source = Observable.create(emitter -> {
            String line = "";
            while (line != null) {
                line = reader.readLine();
                if (line == null)
                    break;
                System.out.println("Engine " + Thread.currentThread().getName() + " - " + line); //process line
                emitter.onNext(line);
            }
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread());

        source.subscribe(line -> System.out.println("UI 1   " + Thread.currentThread().getName() + " - " + line));
        source.subscribe(line -> System.out.println("UI 2   " + Thread.currentThread().getName() + " - " + line));

        TimeUnit.SECONDS.sleep(10);

    }
}

onSubscribe makes subscribers be notified in a parallel fashion. Which, if I'm not mistaken, means that the lambda in create() will be executed in parallel for each consumer.

As a consequence, if I have two subscribers, each of them gets half of the lines of the reader. Thread-1 calls readLine() that gets a line Thread-2 will not get, just the next one.

This all makes sense, still, I must be missing something, because I can't figure out how to:

  1. read the lines in one thread
  2. notify all subscribers concurrently - so each gets all lines

I looked into Subjects, tried to chaining Observables, still couldn't figure it out yet.

Edit: I updated the example to a full runnable class. From what I understand the issue is hot vs cold Observables. As if the docs said Observable.create(...) should create a cold one, whereas my code clearly behaves as hot.

Follow-up question: if I add the type parameter making it Observable<String> then the onSubscribe call breaks the code, and it won't compile, as that would return Observable<Object>. Why? Calling onSubscribe on an intermediate parameter oddly works:

Observable<String> source = Observable.create(emitter -> {...});
Observable<String> source2 = source.subscribeOn(Schedulers.newThread())

Answer:

Use publish:

ConnectableObservable<String> o = Observable.create(emitter -> {
    try (BufferedReader reader = ...) {
        while (!emitter.isDisposed()) {
            String line = reader.readLine();
            if (line == null || line.equals("end")) {
                emitter.onComplete();
                return;          
            }
            emitter.onNext(line);
        }
    }
}).subscribeOn(Schedulers.io())
  .publish();

o.subscribe(/* consumer 1 */);
o.subscribe(/* consumer 2 */);

o.connect();

Question:

I have the following unit test in which I try to send 10 Strings from different threads and test that I receive those Strings from a single thread. My problem is that this test flaps. Sometimes it succeeds but sometimes I only receive 8 or 9 items and after that the test hangs until the latch times out. Am I using the SingleScheduler in the wrong way? Did I miss something else?

val consumerCallerThreadNames = mutableSetOf<String>()
val messageCount = AtomicInteger(0)

val latch = CountDownLatch(MESSAGE_COUNT)

@Test
fun someTest() {
    val msg = "foo"

    val subject = PublishSubject.create<String>()
    subject
            .observeOn(SingleScheduler())
            .subscribe({ message ->
                consumerCallerThreadNames.add(Thread.currentThread().name)
                messageCount.incrementAndGet()
                latch.countDown()
            }, Throwable::printStackTrace)

    1.rangeTo(MESSAGE_COUNT).forEach {
        Thread({
            try {
                subject.onNext(msg)
            } catch (t: Throwable) {
                t.printStackTrace()
            }
        }).start()
    }
    latch.await(10, SECONDS)

    assertThat(consumerCallerThreadNames).hasSize(1)
    assertThat(messageCount.get()).isEqualTo(MESSAGE_COUNT)
}

companion object {
    val MESSAGE_COUNT = 10
}

If I rewrite this to use a single threaded ExecutorService the test no longer flaps so the problem is either with Rx or my lacking knowledge about Rx.


Answer:

RxJava has a requirement that calls to on* do not happen at the same time. This means that your code is not thread-safe.

Since only the subject itself is used in a concurrent fashion it should be fixable by serializing (essentially Java's "synchronized") the subject itself using the Subject<T>.toSerialized() method.

val subject = PublishSubject.create<String>() becomes val subject = PublishSubject.create<String>().toSerialized().

Question:

Using RXJava 2, I'm trying to create an asynchronous Event Bus.

I have a singleton object, with a PublishSubject property. Emitters can send an event to the bus using onNext on the subject.

If subscribers have a long task to execute, I want my bus to dispatch the tasks on multiple threads to execute concurrently the tasks. Which means I want the work to start on an item immediatly after the item is emitted, even if the work on the previous item is not completed.

However, even using observeOn with a scheduler, I cannnot run my tasks concurrently.

Sample code:

public void test() throws Exception {
    Subject<Integer> busSubject = PublishSubject.<Integer>create().toSerialized();

    busSubject.observeOn(Schedulers.computation())
            .subscribe(new LongTaskConsumer());

    for (int i = 1; i < 5; i++) {
        System.out.println(i + " - event");
        busSubject.onNext(i);
        Thread.sleep(1000);
    }
    Thread.sleep(1000);
}

private static class LongTaskConsumer implements Consumer<Integer> {
    @Override
    public void accept(Integer i) throws Exception {
        System.out.println(i + " -   start work");
        System.out.println(i + " -     computation on thread " + Thread.currentThread().getName());
        Thread.sleep(2000);
        System.out.println(i + " -   end work");
    }
}

Prints:

1 - event
1 -   start work
1 -     computation on thread RxComputationThreadPool-1
2 - event
3 - event
1 -   end work
2 -   start work
2 -     computation on thread RxComputationThreadPool-1
4 - event
2 -   end work
3 -   start work
3 -     computation on thread RxComputationThreadPool-1
3 -   end work
4 -   start work
4 -     computation on thread RxComputationThreadPool-1
4 -   end work

Which means that the work on item 2 waited for the end of work on item 1, even if the event 2 was already emitted.


Answer:

When the call below happens one worker is created from Schedulers.computation() and is used for the whole stream. That's why all the of the work you submitted is done on RxComputationThreadPool-1.

busSubject.observeOn(Schedulers.computation())
        .subscribe(new LongTaskConsumer());

To schedule work on multiple threads:

busSubject.flatMap(x ->
        Flowable.just(x)
            .subscribeOn(Schedulers.computation()
            .doOnNext(somethingIntensive))
    .subscribe(new LongTaskConsumer());

Note also that the intensive work is performed inside the flatMap rather than in the LongTaskConsumer because all items will arrive serially to LongTaskConsumer.

There are other approaches to doing work in parallel that you may want to investigate depending on how many events are hitting the PublishSubject.

Question:

In my Android Application I used Rxjava2,But some strange situation was appeared.

In my Disposable I print to log current thread name:

    //1
    Observable
            .create((ObservableOnSubscribe<UserModel>) e -> {
                //mock io
                if (phoneNumber.equals("HolyHigh") && password.equals("111111")) {
                    e.onNext(new UserModel());
                    e.onComplete();
                } else {
                    e.onError(new RuntimeException("Error."));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .delay(1, TimeUnit.SECONDS)
            .subscribe(
                    r -> {
                        view.onLoginSuccess(new UserModel());
                        //test
                        String name = Thread.currentThread().getName();
                        Log.e("Thread Name", " Success Current Thread Name: " + name);
                    }
                    , e -> {
                        e.printStackTrace();
                        view.onLoginFailed(e.getMessage());
                        //test
                        String name = Thread.currentThread().getName();
                        Log.e("Thread Name", " Error Current Thread Name: " + name);
                    }
            );

then logged:

Thread Name: Error Current Thread Name: RxComputationThreadPool-3

It looks like observeOn and subscribeOn not working... why not main thread?

However,I wrote some simple ...

//2
    Single.timer(1, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(r -> {
                Log.e("Single Thread Name", "Single Thread Name: " + Thread.currentThread().getName());
                CommonUtil.showToast(r + "~");
            });

and this logged:

Single Thread Name: Single Thread Name: main

where is my mistake?...


Answer:

delay() operator operates by default on the computation scheduler, so it is changed upstream events to get notify on computation thread.

You simply need to change it right before the subscribe, just move the observeOn .observeOn(AndroidSchedulers.mainThread()) after the delay() operator.

BTW, delay() also has overload that gets Scheduler param that lets you change the default Scheduler.

Question:

Im kind of beginner at rxjava and i'm facing a problem.

I have two network calls.

Observable<ClassA> getClassA(){
  return networkExecutor.getClassAFromApi();
}
Observable<ClassB> getClassB(){
  return networkExecutor.getClassBFromApi();
}

now I need both of these in order for code to proceed, so i do a zip on them. Also I want to save those values temporary in private Class scope variables lets say cA and cB;

 getClassA().zipWith(getClassB, (classA, classB) -> {
  cA = classA;
  cB = classB;
  return true;
}

the problem is, when later on i repeat the same step, cA and cB is the old value instead of the new one.

for example first time I call zip method hashes of cA and cB are xxx1, xxx2

when i call the same method second time, new objects are generated from network with hashes xxx3 and xxx4.

when i call this class getter i get cA xxx1 instead i should already get the xxx3.


Answer:

You can use some of the do operators for after the event to clear up hash fields, or clear them before you proceed with new request. Look up Do operators. I would probably use doAfterNext() operator and reset the fields as necessary.

Question:

I have a simple application using RxJava 2:

public static void main(final String[] args) {

    final Scheduler scheduler = Schedulers.from(Executors.newCachedThreadPool());

    final Observable<String> ticker = Observable.interval(1L, TimeUnit.SECONDS)
        .take(10)
        .subscribeOn(scheduler)
        .map(x -> x + "s");

    ticker.subscribe(x -> {
        System.out.println(x);
    });
}

It correctly prints the timer 10 times:

0s
1s
2s
3s
4s
5s
6s
7s
8s
9s

However, the application does no terminate after 9s. It seems that there is some thread keeping it alive.

How should I implement this so that the application terminates after ticker completes?


Answer:

The issue is not related to RxJava specifically , but to the Executors.newCachedThreadPool(), by JVM specifications, the console app wil not exit until all non daemon threads are finished executing. (or explicitly call to System.exit()). Now newCachedThreadPool executer keeps a threads in a cache for future use, and will close the threads only after 60 sec of no use (see docs), so what's happen here is that RxJava uses a thread from this cache and then after all items emitted, you need to wait 60 sec for the app to quit (juat run it and wait).

By the way, interval() acts by default on computation Scheduler, you can override it using overload that also gets Scheduler as a param.

Question:

Anonymous class hold a reference to the enclosing class.

In the following example, I created a small Activity. In the onCreate method, I just add a timer on another Thread, add a CompositeDisposable and clear it in the onDestroy.

Obviously without the CompositeDisposable, it will create a memory leak. With the CompositeDisposable it doesn't create any memory leak but how is it even working ?

RxJava just interrupt the Thread and put null on every callback ? Can you provide some line that do this work in RxJava source code, i suppose it's somewhere near the dispose method.

public class MainActivity extends AppCompatActivity {

private String TAG = "MainActivity";

private CompositeDisposable composite = new CompositeDisposable();

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    composite.add(Flowable
            .just(1)
            .timer(90, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .subscribeWith(new DisposableSubscriber<Long>() {

                @Override
                public void onNext(Long aLong) { sayHello(); }

                @Override
                public void onError(Throwable t) { sayHello(); }

                @Override
                public void onComplete() { sayHello(); }
            }));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    composite.clear();
}

public void sayHello () { Log.w(TAG, "Hello everyone"); }

Answer:

It is precisely in the source of the dispose method. You can probably jump into the source of methods in your libraries within your IDE as well, in IntelliJ it's Ctrl+B on Windows or ⌘B on Mac, and in Eclipse it's F3.

Anyhow, here's the source of the dispose method (comments mine):

@Override
public void dispose() {
    if (disposed) { // nothing to do
        return;
    }
    OpenHashSet<Disposable> set; // this is the same type as our field that holds the Disposables
    synchronized (this) {
        if (disposed) { 
            return; // another thread did it while we got our lock, so nothing to do
        }
        disposed = true; // setting this flag is safe now, we're the only ones disposing
        set = resources; // the references are now in this local variable
        resources = null; // our field no longer has the references
    }

    dispose(set); // from here on out, only this method has the references to the Disposables
}

And then the complete code of the dispose(OpenHashSet<Disposable>) method that we called above on the last line (mostly just error handling which I believe is self-explainatory):

/**
 * Dispose the contents of the OpenHashSet by suppressing non-fatal
 * Throwables till the end.
 * @param set the OpenHashSet to dispose elements of
 */
void dispose(OpenHashSet<Disposable> set) {
    if (set == null) {
        return;
    }
    List<Throwable> errors = null;
    Object[] array = set.keys();
    for (Object o : array) {
        if (o instanceof Disposable) {
            try {
                ((Disposable) o).dispose();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                if (errors == null) {
                    errors = new ArrayList<Throwable>();
                }
                errors.add(ex);
            }
        }
    }
    if (errors != null) {
        if (errors.size() == 1) {
            throw ExceptionHelper.wrapOrThrow(errors.get(0));
        }
        throw new CompositeException(errors);
    }
}

As you can see, at the end of that method, set can now be garbage collected, as nobody is holding a reference to it.

Question:

I've got an application that does a lot of communication between threads by having one thread set a volatile variable on some object which another thread checks. I find this to be very error prone, and I want to try and replace it using RxJava bu there are some cases I can't figure out how to convert.

The case I'm struggling with right now is where I have two threads, lets call one the controller and the other the measurer. The measurer's job is to record some quantity every 100ms. The controller does a lot of work talking to various pieces of the app and every so often it will tell the measurer to change what it's measuring. Right now it does that by setting a volatile variable, and every iteration of the measurer's loop it checks that variable to see what to measure.

The measurer can't be in the same thread as the controller as measuring takes time and the controller can't delay the other work it's doing.

It feels like the solution is something like making the controller an observable which will emit an item whenever the instructions to the measurer need updating but the only way I can see for the measurer to change it's behaviour when an event is received is to have a subscriber to these events setting the volatile variable like before, and then I haven't got anywhere.

I was wondering if I could somehow take the stream of items emitted by the controller and convert it into a stream that repeats each item over and over until the controller emits a different item, then I can subscribe to these repeated items in the measurer which would do a measurement every time it receives one. Is this the right approach, and if it is, how can I transform the items emitted by the controller into a repeated stream of items?


Answer:

I'm relatively new to Rx, but I would use a BehaviorSubject. You can use distinctUntilChanged(), or combine it with a timer Observable:

    public enum Stat { FOO, BAR }

    public class Controller
    {
        private Subject<Stat> statSubject;

        public Controller()
        {
            statSubject = BehaviorSubject.<Stat>create().toSerialized();
        }

        public Observable<Stat> getStatChange()
        {
            return statSubject.distinctUntilChanged();
        }

        public void setStat( Stat stat )
        {
            statSubject.onNext( stat );
        }
    }

    public class Measurer
    {
        public Measurer( Controller controller )
        {
            Observable.timer( 1, TimeUnit.SECONDS, Schedulers.newThread() )
                .repeat()
                .withLatestFrom(
                        controller.getStatChange(),
                        ( __, stat ) -> stat ) // ignore the Long emitted by timer
                .subscribe( this::measureStat );
        }

        private void measureStat( Stat stat )
        {
            switch( stat )
            {
            case FOO:
                measureFoo();
                break;

            default:
                measureBar();
                break;
            }
        }

        private void measureBar()
        {
            System.out.println( "Measuring Bar" );
        }

        private void measureFoo()
        {
            System.out.println( "Measuring Foo" );
        }
    }

    @Test
    public void testMeasureStats() throws InterruptedException
    {
        Controller controller = new Controller();
        controller.setStat( Stat.BAR );

        @SuppressWarnings( "unused" )
        Measurer measurer = new Measurer( controller );

        Thread.sleep( 5000 );

        controller.setStat( Stat.FOO );

        Thread.sleep( 5000 );

        controller.setStat( Stat.BAR );

        Thread.sleep( 5000 );
    }

Output:

Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Foo
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar
Measuring Bar

Question:

For example, I have the following code that uses RxJava library:

public class MultithreadingExample {
public static void main(String[] args) throws InterruptedException {
    Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
            .observeOn(Schedulers.computation())
            .map(numberToString())
            .subscribe(printResult());
    Thread.sleep(10000);
}

private static Func1<Integer, String> numberToString() {
    return number -> {
        System.out.println("Operator thread: " + Thread.currentThread().getName());
        return String.valueOf(number);
    };
}

private static Action1<String> printResult() {
    return result -> {
        System.out.println("Subscriber thread: " + Thread.currentThread().getName());
        System.out.println("Result: " + result);
    };
}

}

And I want events to be processed in the Observer by multiple threads, for example, item '1' by Thread-1, item '2' by Thread-2 and so on.

What is the best way to do it with RxJava?


Answer:

You can use theflatMap() operator.

Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
        .flatMap( number -> Observable.defer( numberToString() )
                              .subscribeOn( Schedulers.computation() ) )
        .observeOn(Schedulers.computation())
        .map(numberToString())
        .subscribe(printResult());

The flatMap() operator will subscribe to the new observable on a (likely new) thread, merging the results back on to the thread where the final observeOn() is done.

Question:

I'm trying to create a mechanism for limiting the number of concurrent network requests. The idea is that I want to have a fixed thread pool of, say, 20 threads, and use that pool to only allow a maximum of 20 outgoing HTTP requests.

What i've been trying to do is:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

But this is not working. I've tried setting the maxRequests to just 1, sending 5 requests, and then setting a breakpoint on the server that is receiving the requests on purpose to keep the first request "stuck" there, in order to see if the other 4 wait for an available thread. But all 5 of them execute, and after a while, I just get a timeout exception on all 5 requests.

I tried using observeOn too, but it didn't work either.

EDIT: I also tried to implement a Semaphore logic with the following code:

public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

Where Semaphore is the native Java implementation of a semaphore. This mechanic worked as expected where if the maxRequests was 2, and I sent 5 requests, 2 would go out and the other 3 would get stuck inside fromAction waiting. But this approach came with other unexpected behaviors, such as the fact that even after the 2 requests received a response, none of the other 3 were executed because the .doFinally(concurrentRequestsSemaphore::release) never got executed. I did some tests, and it only executed after X requests had received a response. And it was completely unpredictable what X was going to be. So there could be a semaphore of 20 permits, 20 requests would go out and get back a response, and no other would get executed because the semaphore was never released by any request.


Answer:

You did not show the body of private Single<...> httpRequest(). I assume you call there some asynchronous method. Asynchronous methods occupy threads only to handle response, and when the request itself is moving to the server and back, no one thread is used. This explains why you see all 5 requests arrived at the server. Usually, to limit the number of activities of some kind, java.util.concurrent.Semaphores are used, but they limit activities by blocking a thread. Logically, since your program is asynchronous, you need to use asynchronous semaphore, but it is a rare beast. So you have following options:

  • do not limit the number of asynchronous http requests at all, as they do not take much resources anyway
  • launch a special thread which aquires permits from ordinary synchronous Semaphore, and then launches asynchronous http requests. The semaphore is released when a request is fully completed
  • use synchronous launch of http requests with fixed thread pool
  • use asynchronous semaphore. The only implementations I know are in my library DF4J: AsyncSemaphore is an extension of the standard Semaphore and so has both synchronous and asynchronous interface, and InpSignal used only in asynchronous programs. An example of InpSignal usage is at AsyncServerSocketChannel.java, where it is used to limit the number of opened client connections in an Echo Server implementation.

Question:

My event emmiter class has code:

 private val socketListeners: ArrayList<SocketContentListener> = ArrayList()

  //add listener here
 override fun subscribe(socketListener: SocketContentListener) {
        socketListeners.add(socketListener)
    }

       private fun getSocketConnectListener()
                : SocketContentListener {
            /**
             * Post received messages to listeners via Handler
             * because handler helps to set all messages in order on main thread.
             */
            return object : SocketContentListener {

                override fun onUdpServerListenerCreated(inetAddress: InetAddress?, port: Int) {
                    val subscribers = ArrayList<SocketContentListener>(socketListeners)
                    for (listener in subscribers) {
                        Handler(Looper.getMainLooper()).post({ listener.onUdpServerListenerCreated(inetAddress, port) })
                    }
            }
        }

I try to crete Observable:

val udpObservable = Observable.create<Int> { emitter ->
        val listener = object : SocketListener() {
            override fun onUdpServerListenerCreated(inetAddress: InetAddress, port: Int) {
                emitter.onNext(port)
                emitter.onComplete()
            }

        }
        //add listener here
        socketSource.subscribe(listener)
        emitter.setCancellable { socketSource.unSubscribe(listener) }
    }.subscribeOn(Schedulers.io())
            .doOnNext { Log.d("123-thread", "current is: " + Thread.currentThread().name) }
            .onErrorReturn { throw ConnectionException(it) }
            .subscribe()

But during test instead of the expected RxCachedThreadScheduler-1 thread work i saw

  D/123-thread: current is:-> main

So can You help me? please. Where is my mistake? How do I achieve the desired RxCachedThreadScheduler thread for rx chains?


Answer:

The code for creating Observable will be executed on the scheduler, there is no implicit context change anywhere.

  Your events arrive from the listener on the main thread. Then you send them to the emitter on the same thread.

So, subscription go on Schedulers io Thread, but emitters go on Main Thread

 

So the solution is to add observerOn(Schedulers.newThread()) after Observable create. Like this

val udpObservable = Observable.create<Int> { emitter ->
        val listener = object : SocketListener() {
            override fun onUdpServerListenerCreated(inetAddress: InetAddress, port: Int) {
                emitter.onNext(port)
                emitter.onComplete()
            }

        }
        //add listener here
        socketSource.subscribe(listener)
        emitter.setCancellable { socketSource.unSubscribe(listener) }
    }.subscribeOn(Schedulers.io())

      //need add this for work with emmit data on background 
     .observerOn(Schedulers.newThread())
            .doOnNext { Log.d("123-thread", "current is: " + Thread.currentThread().name) }
            .onErrorReturn { throw ConnectionException(it) }
            .subscribe()

Question:

I want to hold my main thread until RxJava 2 Observable complete. My coding is as here. The case is when I request the method contains below code, method was just executing without waiting for observable

 Observable
                        .fromArray(observableList)
                        .subscribeOn(Schedulers.io())
                        .subscribe(new Consumer<List<Observable<List<String>>>>() {
                        @Override
                        public void accept(List<Observable<List<String>>> list) throws Exception {

                            for (Observable<List<String>> observable : list) {
                                System.out.println("Thread name " + Thread.currentThread().getName());

                                observable.subscribe(new Consumer<List<String>>() {
                                    @Override
                                    public void accept(List<String> t) throws Exception {

                                        Gson jsonBuilder = new Gson();

                                        Object obj = new Object();

                                        JsonElement element = jsonBuilder.toJsonTree(obj);

                                        element.getAsJsonObject().addProperty(t.get(0), t.get(1));

                                        Gson g = new Gson();
                                        Object out = g.fromJson(t.get(1), Object.class);

                                        microResponses.put(t.get(0), out);
                                    }
                                });

                            }

                        }


         });

Answer:

You don't need multi-level subscribe, and you shouldn't need to nest your Observables:

Oservable
.fromArray(observableList)
.flatMapIterale(i -> i)
.flatMap(i -> i)
.doOnNext(t -> {
   Gson jsonBuilder = new Gson();
   Object obj = new Object();
   JsonElement element = jsonBuilder.toJsonTree(obj);
   element.getAsJsonObject().addProperty(t.get(0), t.get(1));
   Gson g = new Gson();
   Object out = g.fromJson(t.get(1), Object.class);
   microResponses.put(t.get(0), out);
})
// .subscribe(...) or .doOnComplete(...).subscribe();
;

However, your code as written has several issues:

  • the first four lines are a no-op, as you're not using the JsonElement.
  • Why not use new JsonObject() directly?
  • the Object out = g.fromJson(t.get(1), Object.class); will not produce anything besides an empty JsonObjecy, as an Object does not have any properties.
  • Why are you using List, and not something that makes actual sense, like a Map.Entry<String,Object>, or even an Pair<String, Object>?