kotlin 将callback转换为co-routine,并对初始调用的主线程进行约束

zi8p0yeb  于 2023-04-07  发布在  Kotlin
关注(0)|答案(1)|浏览(136)

我想把一个回调转换成co-routine,SDK说明API调用需要在主线程上进行,这个方案可行,但我不确定原则上是否正确。
它看起来大致是这样的:

override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> {

        return suspendCancellableCoroutine { continuation ->

            val observer =
                object : Observer<RegisterResponse<Profile?>> {
               fun onNext(t: Profile) {
                 continuation.resume(Success(t))
           }

     CoroutineScope(Dispatchers.Main).launch {
         userManager.register(email, observer)
       }
}
}

在我看来,SDK希望在Main线程上调用观察者回调,但我的进程是在IO线程上的视图模型范围内触发的(以避免阻塞main)。因此,我猜观察者实际上是在IO线程上运行的。
思考如何处理这一问题?

nhaq1z21

nhaq1z211#

为了解决这个问题,如果这个库提供了一个ObservableSource引用,而不是让你传递一个Observer,你可以在上面使用awaitFirst(),这当然比你自己实现要简单。
避免这样做:CoroutineScope(Dispatchers.Main).launch,本质上与使用GlobalScope.launch(Dispatchers.Main)没有什么不同。它创建了一个未绑定(从未取消)的作用域,这是内存泄漏的常见来源。如果调用此suspend函数的协程被取消,您启动的其他协程将不会被通知和取消,因为它不是子级。
其次,另一个协程不会等待它--内部协程的事件可能在未来某个时间到来。
为了确保在主线程上注册API,请使用withContext(Dispatchers.Main)调用整个函数。然后,suspendCancellableCoroutine lambda块将在主线程上运行,因此您将在主线程上调用API注册函数。
关于实现这一点的一些其他要点:

  • Observer有一个onSubscribe函数,它会给你一个Disposable,你可以用它来提前取消。你需要这样做才能支持取消。
  • 多次调用continuation.resume()会使协程崩溃,因此您需要一些保护措施,以防API意外地发出多个项。
  • 我添加了另一个安全措施,以防止订阅结束时没有发出任何内容。
  • onError中,我还检查了continuation.isActive,以避免在订阅结束之前发出单个项并随后发生错误的情况下发生多恢复崩溃。

由于Kotlin协程库是开源的,您可以在这里看到他们如何实现Observable.await,以了解如何正确地完成这种事情。
解决方案应类似于:

override suspend fun registerUser(email: String): ResultHandler<Profile, Exception> = withContext(Dispatchers.Main) {
    suspendCancellableCoroutine { continuation ->
        val observer = object : Observer<RegisterResponse<Profile?>> {
            lateinit var subscription: Disposable
            var seenValue = false

            override fun onSubscribe(disposable: Disposable) {
                subscription = disposable
                continuation.invokeOnCancellation { subscription.dispose() }
            }

            override fun onNext(t: Profile) {
                 if (!seenValue) {
                     seenValue = true
                     continuation.resume(Success(t))
                     subscription.dispose()
                 }
            }

            override fun onComplete() {
                if (continuation.isActive && !seenValue) {
                    continuation.resume(Error(NoSuchElementException("Observer completed without emitting any value.")))
                }
            }

            override fun onError(throwable: Throwable) {
                if (continuation.isActive) continuation.resume(Error(throwable))
            }
        }
        userManager.register(email, observer)
    }
}

相关问题