Hot questions for Using RxJava 2 in rxandroidble

Top Java Programmings / RxJava 2 / rxandroidble

Question:

I am trying to implement this example of write/notification handling (Using RxAndroidBle, how do I subscribe to responses from writing to a characteristic?).

connectionObservable
                    .flatMap((Function<RxBleConnection, Observable<Observable<byte[]>>>)
                            (RxBleConnection rxBleConnection) -> {
                                return rxBleConnection.setupNotification(TX_CHAR_UUID);
                            },
                            (BiFunction<RxBleConnection, Observable<byte[]>, Observable<byte[]>>)
                                    (rxBleConnection, apScanDataNotificationObservable) -> {
                                        return Observable.combineLatest(
                                                rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()),
                                                apScanDataNotificationObservable.first(),
                                                new BiFunction<byte[], byte[], byte[]>() {
                                                    @Override
                                                    public byte[] apply(byte[] writtenBytes, byte[] responseBytes) throws Exception {
                                                        return responseBytes;
                                                    }
                                                }
                                        );
                                    }
                    ).flatMap(new Function<Observable<byte[]>, Observable<byte[]>>() {
                @Override
                public Observable<byte[]> apply(Observable<byte[]> observable) throws Exception {
                    return observable;
                }

            })
                    .first()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<byte[]>() {
                        @Override
                        public void accept(byte[] bytes) throws Exception {
                            Log.i("Ivan1", "notification response...." + bytes.toString());
                        }

                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.i("Ivan", "notification response...." + throwable.toString());

                        }

                    });

I tried to write with rxjava1 and rxjava2, but in both cases I got a compile time error for apScanDataNotificationObservable.first(). It says "first(byte[]) in Observable cannot be applied to ()". So I dont know what argument I should pass to the first method.


Answer:

The reason why you get this error is because the original answer was provided for a RxJava1 version of the RxAndroidBle and you are using RxJava2 at least in the above example.

Between RxJava1 and RxJava2 the method Observable.first() has changed signature and implementation. Equivalent function for RxJava2 is Observable.take(int count)

You should change this line:

apScanDataNotificationObservable.first(),

To this:

apScanDataNotificationObservable.take(1),

Also, the Observable.combineLatest() accepts two Observable parameters where rxBleConnection.writeCharacteristic() is a Single. You should change this line:

rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()),

To this:

rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()).toObservable(),

Question:

I am just discovering rxandroidble and can reliably send a single command to the BLE device after connection

However I am struggling to find the best way to write a chain of commands, ie if I have a series of 3 commands that need to be sent

Of course this can be done by nesting the sends, but Im sure there is a better approach!!

Single command send code is

rxBleMainConection.writeCharacteristic(COMS_WRITE_CHAR_UUID,bytes).toObservable()
.subscribe(
                    characteristicValue -> {
                        // Written characteristic value.
                        Log.d(TAG,"Written command: " + Arrays.toString(characteristicValue));

                    },
                    throwable -> {
                        // Handle an error here.
                        Log.d(TAG,"Error writing command");
                        throwable.printStackTrace();
                    }
            );

What is the best way to send a series of say 5 commands?


Answer:

You could concatenate all the writes you want to make like this:

Single.concat(Arrays.asList(
        rxBleMainConnection.writeCharacteristic(COMS_WRITE_CHAR_UUID, bytes0),
        rxBleMainConnection.writeCharacteristic(COMS_WRITE_CHAR_UUID, bytes1),
        rxBleMainConnection.writeCharacteristic(COMS_WRITE_CHAR_UUID, bytes2),
        rxBleMainConnection.writeCharacteristic(COMS_WRITE_CHAR_UUID, bytes3),
        // ...
        rxBleMainConnection.writeCharacteristic(COMS_WRITE_CHAR_UUID, bytesn)
))
        .subscribe(
                characteristicValue -> {
                    // Written characteristic value.
                    Log.d(TAG, "Written command: " + Arrays.toString(characteristicValue));
                },
                throwable -> {
                    // Handle an error here.
                    Log.d(TAG, "Error writing command");
                    throwable.printStackTrace();
                },
                () -> {
                    Log.d(TAG, "All writes completed");
                }
        );

I would encourage you to take a look on other questions regarding "multiple writes" with RxAndroidBle that were already asked on this site. There are some posts that could give you hints/ideas.

As a side note: it is best to create code that uses only a single .subscribe() as then you have the least state you need to manage by yourself.

Question:

I'm writing an Android application that is using RxAndroidBle, to support my device I need a higher MTU

I followed the provided library example:https://github.com/Polidea/RxAndroidBle/wiki/Tutorial:-MTU-negotiation

But it is not compiling

private ObservableTransformer<RxBleConnection, RxBleConnection> mtuNegotiationObservableTransformer = upstream -> {
        return upstream.doOnSubscribe(ignoredDisposable -> Log.i("MTU", "MTU negotiation is supported")
                     .flatMapSingle(connection ->
                                connection.requestMtu(GATT_MTU_MAXIMUM)
                                        .doOnSubscribe(ignoredDisposable -> Log.i("MTU", "Negotiating MTU started"))
                                        .doOnSuccess(mtu -> Log.i("MTU", "Negotiated MTU: " + mtu))
                                        .ignoreElement()
                                        .andThen(Single.just(connection)));
    };

The compiler message is: cannot resolve method 'flatmapsingle'

Why is it that it is not working? In other parts of my code I´m using .flatMapSingle without a problem. Thanks for helping!


Answer:

There seem to be a mistake in number of closing brackets. Try the below code:

private ObservableTransformer<RxBleConnection, RxBleConnection> mtuNegotiationObservableTransformer = upstream -> {
    if (Build.VERSION.SDK_INT < Build.VERSION_CODES.LOLLIPOP) {
        return upstream.doOnSubscribe(ignoredDisposable -> Log.i("MTU", "MTU negotiation is not supported")); // added a closing bracket here
    }

    return upstream
            .doOnSubscribe(ignoredDisposable -> Log.i("MTU", "MTU negotiation is supported")) // and here
                    .flatMapSingle(connection ->
                            connection.requestMtu(GATT_MTU_MAXIMUM)
                                    .doOnSubscribe(ignoredDisposable -> Log.i("MTU", "Negotiating MTU started"))
                                    .doOnSuccess(mtu -> Log.i("MTU", "Negotiated MTU: " + mtu))
                                    .ignoreElement()
                                    .andThen(Single.just(connection)));
};