xDocxDoc
AI
前端
后端
iOS
Android
Flutter
AI
前端
后端
iOS
Android
Flutter
  • 将Kotlin协程的挂起函数和Flow转换为回调API

将挂起函数或流转换为回调

许多传统 API 使用回调报告异步操作结果。在 Kotlin 中,虽然挂起函数和 Flow 能以更可读的方式实现相同效果,但有时需要将其转换为基于回调的 API(尤其与遗留库/框架集成时)。本文详细探讨转换技术。

1️⃣ 将挂起函数转换为回调

挂起函数必须在协程中启动,最简单方式是使用协程作用域的 launch。转换要点:

  • 成功时通过回调返回结果
  • 异常时调用错误回调
  • 返回 Job 支持取消操作
// 原始挂起函数
suspend fun fetchData(): String {
    // 实际业务逻辑(网络请求/数据库操作等)
}

val scope = CoroutineScope(SupervisorJob())

// 转换为回调 API
fun fetchData(
    onSuccess: (String) -> Unit,
    onError: (Throwable) -> Unit,
): Job = scope.launch {  // 返回 Job 支持取消
    try {
        val result = fetchData()  // 执行挂起函数
        onSuccess(result)         // 成功回调
    } catch (e: Throwable) {
        onError(e)                // 失败回调
    }
}

🧩 生产环境增强实现

通过依赖注入提供作用域,封装可取消接口:

class AnkiConnectorCallback(
    private val connector: AnkiConnector, // 业务逻辑组件
    private val scope: CoroutineScope    // 依赖注入的作用域
) {
    // 封装可取消的操作
    fun checkConnection(
        onSuccess: (Boolean) -> Unit,
        onError: (Throwable) -> Unit
    ): Cancellable = scope.asyncWithCallback(onSuccess, onError) {
        connector.checkConnection()  // 实际挂起调用
    }
  
    // 扩展函数:标准化回调转换
    fun <T> CoroutineScope.asyncWithCallback(
        onSuccess: (T) -> Unit,
        onError: (Throwable) -> Unit,
        body: suspend () -> T        // 挂起函数体
    ): Cancellable {
        val job = launch {
            try {
                onSuccess(body())    // 执行并返回结果
            } catch (t: Throwable) {
                onError(t)           // 捕获异常
            }
        }
        return Cancellable(job)      // 返回可取消句柄
    }

    // 取消操作封装类
    class Cancellable(private val job: Job) {
        fun cancel() {
            job.cancel()  // 取消底层协程
        }
    }
}

2️⃣ 将 Flow 转换为回调函数

由于 Flow 的 collect 是挂起函数,需通过包装在协程中启动:

🎛️ Flow 回调订阅器

/**
 * Flow 回调订阅器
 * @param scope 协程作用域(Flow 随作用域取消而终止)
 * @param onEach 元素发射回调
 * @param onError 异常回调
 * @param onStart Flow 启动回调
 * @param onCompletion 完成回调(含异常/正常结束)
 */
fun <T> Flow<T>.subscribe(
    scope: CoroutineScope,
    onEach: ((T) -> Unit)? = null,
    onError: ((Throwable) -> Unit)? = null,
    onStart: (() -> Unit)? = null,
    onCompletion: (() -> Unit)? = null
): Job {
    return this
        .applyIfNotNull(onEach) { flow -> 
            flow.onEach { onEach?.invoke(it) }  // 元素处理
        }
        .applyIfNotNull(onStart) { flow -> 
            flow.onStart { onStart?.invoke() }  // 启动回调
        }
        .applyIfNotNull(onCompletion) { flow -> 
            flow.onCompletion { onCompletion?.invoke() }  // 完成回调
        }
        .applyIfNotNull(onError) { flow -> 
            flow.catch { onError?.invoke(it) }  // 异常捕获
        }
        .launchIn(scope)  // 在作用域中启动
}

// 工具函数:条件应用扩展
private inline fun <T> Flow<T>.applyIfNotNull(
    value: Any?, 
    block: (Flow<T>) -> Flow<T>
): Flow<T> = if (value != null) block(this) else this

📦 Flow 回调包装类

class FlowCallback<T>(
    private val flow: Flow<T>,
    private val scope: CoroutineScope
) {
    fun subscribe(
        onEach: ((T) -> Unit)? = null,
        onError: ((Throwable) -> Unit)? = null,
        onStart: (() -> Unit)? = null,
        onCompletion: (() -> Unit)? = null
    ): Job = flow.subscribe(  // 复用前述扩展函数
        scope = scope,
        onEach = onEach,
        onError = onError,
        onStart = onStart,
        onCompletion = onCompletion
    )
}

🎯 关键实现细节

  1. 取消传播
    当父作用域取消时,所有通过 subscribe() 或 asyncWithCallback() 启动的 Flow/协程会自动取消

  2. 异常隔离
    使用 SupervisorJob 防止单个回调失败影响其他操作:

  3. 资源清理
    onCompletion 回调确保在 Flow 终止时(无论成功/失败)执行清理逻辑

  4. 线程安全
    所有回调在协程的调度器线程执行,需注意线程切换需求


✨ 应用场景

  1. Android 原生 SDK 集成

    // 将 CameraX 的 Flow 转换为回调
    cameraProvider.bindToLifecycle(
         lifecycleOwner, 
         cameraSelector, 
         previewCase
    ).subscribe(
         scope = lifecycleScope,
         onEach = { frame -> processFrame(frame) },
         onError = { log("Camera error: $it") }
    )
  2. Node.js 遗留库集成

    // 将 KafkaJS 回调 API 转为 Flow
    val consumer = kafka.consumer { /* config */ }
    FlowCallback(consumer.flow(), coroutineScope).subscribe(
         onEach = { record -> handleMessage(record) },
         onCompletion = { consumer.close() }
    )

⚠️ 注意事项

  1. 回调线程控制
    使用 flowOn 或协程调度器控制回调执行线程:

    fetchUserData()
         .flowOn(Dispatchers.IO)  // 在IO线程执行
         .subscribe(scope, onEach = { updateUI(it) })  // UI线程回调
  2. 内存泄漏预防
    在 Android 等平台,作用域需绑定生命周期:

    class Activity : AppCompatActivity() {
         private val scope = lifecycleScope
         
         override fun onDestroy() {
             scope.cancel()  // 取消所有关联回调
             super.onDestroy()
         }
    }
  3. 背压处理
    对高频率 Flow 使用缓冲策略:

    sensorEvents
         .buffer(100)  // 设置100个元素的缓冲区
         .subscribe(/* ... */)

总结

🔍 核心要点回顾

  1. 挂起函数转回调

    • 通过 CoroutineScope.launch 启动协程
    • 使用 try/catch 分离成功/错误路径
    • 返回 Job 或封装 Cancellable 支持取消
  2. Flow 转回调

    • 通过 onEach/onStart/onCompletion 扩展函数挂载回调
    • 使用 catch 处理流异常
    • launchIn(scope) 绑定生命周期
  3. 生产级实践

    • 使用 SupervisorJob 实现错误隔离
    • 通过 DI 注入协程作用域
    • 绑定平台生命周期防止泄漏

💡 最佳实践:优先使用 FlowCallback 包装类,它提供类型安全的构建器模式,并集中处理取消和错误传播逻辑。对于高频场景,建议添加 buffer 和 flowOn 操作符优化性能。

最后更新: 2025/8/27 15:24