kotlin 将RXJava Single转换为协同程序的Deferred?

c2e8gylq  于 2023-03-03  发布在  Kotlin
关注(0)|答案(5)|浏览(143)

我有一个来自RxJava的Single,想继续使用一个来自Kotlin协程的Deferred。如何实现?

fun convert(data: rx.Single<String>): kotlinx.coroutines.Deferred<String> = ...

我会对一些库感兴趣(如果有库的话?),也会对自己做这个感兴趣......到目前为止,我自己做了这个手工实现:

private fun waitForRxJavaResult(resultSingle: Single<String>): String? {
    var resultReceived = false
    var result: String? = null

    resultSingle.subscribe({
        result = it
        resultReceived = true
    }, {
        resultReceived = true
        if (!(it is NoSuchElementException))
            it.printStackTrace()
    })
    while (!resultReceived)
        Thread.sleep(20)

    return result
}
lrl1mhuk

lrl1mhuk1#

有一个集成了RxJava和协程的库:https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive/kotlinx-coroutines-rx2
但是在那个库中没有直接将single转换为Deferred的函数,原因可能是RxJava Single没有绑定到协程作用域,如果你想将它转换为Deferred,你需要提供一个CoroutineScope
您可能会像这样实现它:

fun <T> Single<T>.toDeferred(scope: CoroutineScope) = scope.async { await() }

Single.await函数(用于async块)来自kotlinx-coroutines-rx 2库。
您可以像这样调用函数:

coroutineScope {
    val mySingle = getSingle()
    val deferred = mySingle.toDeferred(this)
}
bsxbgnwa

bsxbgnwa2#

只需将您的单曲设置为暂停功能:

suspend fun waitForRxJavaResult(): String? = suspendCoroutine { cont ->
        try {
            val result = resultSingle.blockingGet()
            cont.resume(result)
        } catch (e: Exception){
            cont.resume(null)
        }
    }
u4dcyp6a

u4dcyp6a3#

implementation 'io.reactivex.rxjava2:rxandroid:+'
implementation 'io.reactivex.rxjava2:rxjava:+'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-rx2:+'

并在代码中添加:

fun <T> Maybe<T>.toDeferred(coroutineScope: CoroutineScope) = coroutineScope.async { awaitSingleOrNull() }

fun <T> Single<T>.toDeferred(coroutineScope: CoroutineScope) = coroutineScope.async { await() }

用法:

lifecycleScope.launch {
            val result = getItemMaybe().toDeferred(this).await()
            ...
        }


 fun getItemMaybe(): Maybe<Int> {
       
 }
xqkwcwgp

xqkwcwgp4#

这个怎么👇样

suspend fun <T> Observable<out T>.toCoroutine(): T {
  return suspendCancellableCoroutine { continuation ->
    val subscription = this.subscribe(
      { continuation.resume(it) },
      { continuation.resumeWithException(it) }
    )
    continuation.invokeOnCancellation {
      subscription.unsubscribe()
    }
  }
}

我尝试用suspendCancellableCoroutine块生成器将回调方面转换为协程!

rslzwgfq

rslzwgfq5#

这将RxJava2 Single转换为带取消的暂停乐趣。

suspend fun <T> Single<T>.await(): T {
    return suspendCancellableCoroutine { cont ->
        val disposable = subscribe({ cont.resume(it) }, { cont.resumeWithException(it) })
        cont.invokeOnCancellation { disposable.dispose() }
    }
}

相关问题