How To Respond To Ble Characteristic Notifications By Sending A New Write Command
I'm updating an app to use RxAndroidBLE, and struggling with how to translate my existing callback pattern into an Rx pattern. In particular, I need to respond to characteristic no
Solution 1:
The best approach, according to this talk by Jake Wharton, is to construct an Observable that emits only the values needed to update your model.
(example in Kotlin)
We could have these outputs of the Observable:
/**
 * Events emitted by the connection flow. Each data-carrying event wraps the raw bytes
 * received from the device.
 *
 * The data classes override `equals`/`hashCode` because a generated data-class `equals`
 * compares a [ByteArray] property by reference, so two events holding the same bytes
 * would otherwise never be equal.
 */
sealed class ConnectionEvent {
    /** Dummy event to notify when the connection can be closed. */
    object CloseConnection : ConnectionEvent()

    /** Serial number bytes read from the device. */
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is SerialNumber && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }

    /** Battery level bytes read from the device. */
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is BatteryLevel && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }

    /** Payload bytes of an "answer 4" notification. */
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is Answer4 && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }
}
And the whole flow could look like this:
// Whole connection flow: read the battery level, read the serial number and, only when the
// serial number matches, continue with the write/notification exchange. A CloseConnection
// event ends the stream, which unsubscribes from establishConnection().
// NOTE(review): the value returned by subscribe() is dropped here; keep it if the flow must be
// cancellable from outside.
bleDevice.establishConnection(false)
        .flatMap { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid)
                    .map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            // cache as the output will be used by the continuation observable as well
            // and we do not want to re-read the serial number
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid)
                    .map { ConnectionEvent.SerialNumber(it) }
                    .cache()
            // continuation observable will work if the serial number matches
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle
                    .flatMapObservable {
                        when {
                            // close connection if serial does not match
                            it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent)
                            // create flow for getting more data via additional writes and notifications
                            else -> createContinuationObservable(connection)
                        }
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
        .subscribe(
                { connectionEvent ->
                    when (connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                        // never delivered (takeWhile above stops the stream first); listed so
                        // the `when` covers every subclass of the sealed type
                        is ConnectionEvent.CloseConnection -> { /* nothing to do */ }
                    }
                },
                { /* handle errors */ }
        )
where the write/notification dance is:
/**
 * Builds the write/notification exchange on [customCharUuid].
 *
 * Notifications are set up on the characteristic, `command1` is written to start the
 * exchange, and every received notification is answered according to the value
 * `answerFromBytes` yields for it.
 *
 * @param connection the connection used for the notification setup and all writes
 * @return a stream that emits [ConnectionEvent.Answer4] for each "answer 4" notification,
 *   [ConnectionEvent.CloseConnection] on an empty "answer 3", and fails on an unexpected answer
 */
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    // Reaction to a single notification payload.
    fun reactTo(bytes: ByteArray): Observable<ConnectionEvent> = when (answerFromBytes(bytes)) {
        answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
        answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
        answer3 ->
            if (bytes.isEmpty()) {
                // an empty answer3 marks the end of the exchange
                Observable.just(ConnectionEvent.CloseConnection)
            } else {
                connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
            }
        answer4 -> {
            // emit the payload first, then send the follow-up command
            val followUpWrite = connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
            followUpWrite.startWith(Observable.just(ConnectionEvent.Answer4(bytes)))
        }
        else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(bytes)}"))
    }

    return connection.setupNotification(customCharUuid)
            .flatMap { notifications ->
                // the command1 write is prepended so it kicks off the exchange
                val kickOff = connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()
                notifications
                        .flatMap { payload -> reactTo(payload) }
                        .startWith(kickOff)
            }
}
I have used an extension function for more clarity:
/** Drops the value of this write result and exposes only its completion or error as an `Observable<ConnectionEvent>`. */
fun Single<ByteArray>.ignoreEmissions(): Observable<ConnectionEvent> = toCompletable().toObservable()
Edit:
I have changed the code a bit to get rid of CloseConnection event and leverage the completions of the observables. So now the outputs look like this:
/**
 * Events emitted by the connection flow. Each event wraps the raw bytes received from
 * the device.
 *
 * The data classes override `equals`/`hashCode` because a generated data-class `equals`
 * compares a [ByteArray] property by reference, so two events holding the same bytes
 * would otherwise never be equal.
 */
sealed class ConnectionEvent {
    /** Serial number bytes read from the device. */
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is SerialNumber && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }

    /** Battery level bytes read from the device. */
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is BatteryLevel && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }

    /** Payload bytes of an "answer 4" notification. */
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent() {
        override fun equals(other: Any?): Boolean =
            this === other || (other is Answer4 && byteArray.contentEquals(other.byteArray))

        override fun hashCode(): Int = byteArray.contentHashCode()
    }
}
The main flow:
// Whole connection flow without a CloseConnection event: the inner flow simply completes
// when the data exchange is over, and that completion is used to unsubscribe from
// establishConnection().
// NOTE(review): the value returned by subscribe() is dropped here; keep it if the flow must be
// cancellable from outside.
bleDevice.establishConnection(false)
        .map { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid)
                    .map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            // cache as the output will be used by the continuation observable as well
            // and we do not want to re-read the serial number
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid)
                    .map { ConnectionEvent.SerialNumber(it) }
                    .cache()
            // continuation observable will work if the serial number matches
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle
                    .flatMapObservable {
                        if (it == matchingSerialNumber) {
                            // create flow for getting more data via additional writes and notifications
                            createContinuationObservable(connection)
                        } else {
                            // do not continue if serial number does not match
                            Observable.empty()
                        }
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise the flow just completes
            )
        }
        .publish { connectionFlows ->
            // create a Completable from the above Observable.concat()
            val dataDownloadCompletable = connectionFlows.take(1) // take the first emission (there will be only one)
                    .flatMapCompletable { flow -> flow.ignoreElements() } // and wait until the first emission completes
            // when dataDownloadCompletable completes -> unsubscribe from the upstream, mainly .establishConnection() to close it
            connectionFlows.takeUntil(dataDownloadCompletable.toObservable<Any>())
        }
        .flatMap { it } // unwrap the above flow
        .subscribe(
                { connectionEvent ->
                    when (connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )
Write/notification part:
/**
 * Builds the write/notification exchange on [customCharUuid].
 *
 * Notifications are set up on the characteristic, `command1` is written to start the
 * exchange, and each notification is answered with the next command. The returned stream
 * completes on the first `answer3` that carries no bytes.
 *
 * @param connection the connection used for the notification setup and all writes
 * @return a stream that emits [ConnectionEvent.Answer4] for each "answer 4" notification
 *   and fails with an [Exception] on an unexpected answer
 */
private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                // map every response to a pair of <answer, bytes>
                ccNotifications.map { Pair(answerFromBytes(it), it) }
                        // and start with writing command1 to initiate the data exchange
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions())
            }
            // end the createContinuationObservable on the first answer3 with an empty bytes
            .takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) }
            .flatMap<ConnectionEvent> { (answer, bytes) ->
                when (answer) {
                    answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
                    answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                    answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                    // when answer4 is received emit actionable item to update the model
                    answer4 -> Observable.just<ConnectionEvent>(ConnectionEvent.Answer4(bytes))
                            // and send the next command5
                            .concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions())
                    else -> Observable.error(Exception("Unexpected answer! => $answer"))
                }
            }
}
And the extension:
/** Drops the value of this write result and exposes only its completion or error as an `Observable` of any element type [T]. */
fun <T> Single<ByteArray>.ignoreEmissions(): Observable<T> = toCompletable().toObservable()
Post a Comment for "How To Respond To Ble Characteristic Notifications By Sending A New Write Command"