🚀 Kotlin+Compose+协程+Channel 多文件异步分片断点续传
1. 需求背景与技术选型
1.1 大文件上传的痛点
- 网络不稳定:移动端弱网环境下易中断
- 内存压力:单文件一次性加载导致OOM风险
- 体验优化:上传进度可视化与暂停/恢复能力
1.2 技术栈优势
技术 | 作用 | 优势特性 |
---|---|---|
Kotlin协程 | 异步任务管理 | 结构化并发、轻量级线程 |
Channel | 分片数据传输 | 生产者-消费者模型 |
Jetpack Compose | UI构建 | 声明式UI、状态驱动 |
OkHttp | 网络请求 | 分片请求支持、拦截器扩展 |
2. 核心架构设计
3. 关键实现模块
3.1 文件分片与状态管理
class UploadTask(
val file: File,
val chunkSize: Int = 1024 * 1024 // 默认1MB分片
) {
// 文件分片信息存储
data class Chunk(
val index: Int,
val start: Long,
val end: Long,
var status: Status = Status.PENDING
)
enum class Status { PENDING, UPLOADING, COMPLETED, FAILED }
// 分片计算逻辑
fun calculateChunks(): List<Chunk> {
val chunks = mutableListOf<Chunk>()
val fileSize = file.length()
var offset = 0L
for (i in 0 until ceil(fileSize / chunkSize.toDouble()).toInt()) {
val end = min(offset + chunkSize, fileSize)
chunks.add(Chunk(i, offset, end))
offset = end
}
return chunks
}
// 续传恢复逻辑
fun restoreFromCache(cache: List<Chunk>) {
// 实现省略:比对缓存恢复进度状态
}
}
3.2 协程与Channel实现异步管道
class UploadManager(
private val scope: CoroutineScope,
private val maxConcurrency: Int = 3 // 并发上传数
) {
// 状态Channel(用于UI更新)
private val _progressState = Channel<UploadProgress>()
val progressState = _progressState.receiveAsFlow()
// 分片上传协程
suspend fun uploadTask(task: UploadTask) {
val chunks = task.calculateChunks()
val channel = produce {
chunks.filter { it.status != UploadTask.Status.COMPLETED }
.forEach { send(it) }
}
// 启动并发协程组
(1..maxConcurrency).map { workerId ->
scope.launch(Dispatchers.IO) {
for (chunk in channel) {
try {
uploadChunk(task.file, chunk).let {
_progressState.send(
UploadProgress(task.id, chunk.index)
)
}
} catch (e: Exception) {
// 错误处理
}
}
}
}.joinAll() // 等待所有分片完成
}
// 单个分片上传逻辑
private suspend fun uploadChunk(file: File, chunk: UploadTask.Chunk) {
return withContext(Dispatchers.IO) {
val inputStream = RandomAccessFile(file, "r").use { raf ->
raf.seek(chunk.start)
ByteArray((chunk.end - chunk.start).toInt()).apply {
raf.read(this)
}
}
// 使用OkHttp上传分片
OkHttpClient().newCall(
Request.Builder()
.url("https://api.yourserver.com/upload")
.addHeader("Content-Range", "bytes ${chunk.start}-${chunk.end}/${file.length()}")
.post(inputStream.toRequestBody())
.build()
).execute().use { response ->
if (!response.isSuccessful) throw IOException("Upload failed")
}
}
}
}
3.3 Compose UI状态管理
@Composable
fun FileUploadScreen(viewModel: UploadViewModel = viewModel()) {
val tasks by viewModel.tasks.collectAsState()
LazyColumn {
items(tasks) { task ->
FileUploadItem(
task = task,
onPause = { viewModel.pauseUpload(task.id) },
onResume = { viewModel.resumeUpload(task.id) }
)
}
}
}
@Composable
fun FileUploadItem(task: UploadTask) {
var progress by remember(task.id) { mutableStateOf(0f) }
// 监听进度更新
LaunchedEffect(task.id) {
uploadManager.progressState.filter { it.taskId == task.id }
.collect { progress = it.calculateProgress() }
}
Row {
// 文件名显示
Text(task.file.name)
// 进度条
LinearProgressIndicator(progress = progress)
// 暂停/继续按钮
Button(onClick = { if (task.isPaused) onResume() else onPause() }) {
Text(if (task.isPaused) "▶" else "⏸")
}
}
}
3.4 断点续传关键技术点
持久化存储设计
// 使用DataStore保存断点信息
object UploadCache {
private val Context.dataStore: DataStore<Preferences> by preferencesDataStore("upload_cache")
suspend fun saveChunkState(context: Context, taskId: String, chunks: List<UploadTask.Chunk>) {
context.dataStore.edit { preferences ->
preferences[stringSetPreferencesKey(taskId)] = chunks.map {
"${it.index}:${it.status}" // 存储分片状态
}.toSet()
}
}
suspend fun loadChunkState(context: Context, taskId: String): Map<Int, UploadTask.Status> {
return context.dataStore.data.first()[stringSetPreferencesKey(taskId)]
?.associate { record ->
record.split(":").let {
it[0].toInt() to UploadTask.Status.valueOf(it[1])
}
} ?: emptyMap()
}
}
分片续传流程图
4. 性能优化策略
4.1 智能分片策略
// 根据网络动态调整分片大小
fun calculateDynamicChunkSize(networkType: NetworkType): Int {
return when(networkType) {
NetworkType.WIFI -> 5 * 1024 * 1024 // 5MB
NetworkType.ETHERNET -> 10 * 1024 * 1024 // 10MB
else -> 512 * 1024 // 512KB
}
}
4.2 上传失败重试机制
// 带指数退避的重试机制
private suspend fun uploadChunkWithRetry(chunk: UploadTask.Chunk, maxRetries: Int = 3) {
var currentRetry = 0
var delayFactor = 1L
while (currentRetry < maxRetries) {
try {
uploadChunk(chunk)
return
} catch (e: Exception) {
if (++currentRetry == maxRetries) throw e
delay(1000 * delayFactor) // 指数等待
delayFactor *= 2
}
}
}
4.3 内存优化技术
// 使用内存映射减少内存占用
private suspend fun uploadChunkWithMappedByteBuffer(file: File, chunk: Chunk) {
val channel = FileInputStream(file).channel
val buffer = channel.map(
FileChannel.MapMode.READ_ONLY,
chunk.start,
chunk.end - chunk.start
)
// 上传buffer内容(避免创建临时字节数组)
uploadBuffer(buffer)
}
5. 方案适用场景
- 大文件上传:视频/设计稿等百MB以上文件
- 弱网环境:移动端4G/5G网络波动场景
- 批量上传:相册备份/日志文件上传
- 后台任务:需要保持上传状态的应用
6. 关键问题解答
Q: 如何保证分片上传后的文件完整性?
A: 通过两种机制确保:
- 服务器端合并时使用MD5校验合并后文件
fun verifyFile(file: File, expectedMd5: String): Boolean { val digest = MessageDigest.getInstance("MD5") file.forEachBlock { bytes, size -> digest.update(bytes, 0, size) } return Hex.encodeHexString(digest.digest()) == expectedMd5 }
- 分片上传时采用CRC校验头
POST /upload HTTP/1.1 Content-Range: bytes 0-102399/2048000 X-Checksum: 89f0e3e8
Q: 如何避免并行上传时的资源竞争?
A: 通过三层次控制:
- Channel提供线程安全的生产者-消费者队列
- 使用
Mutex
保护共享状态private val mutex = Mutex() suspend fun updateProgress(taskId: String, chunkId: Int) { mutex.withLock { // 更新状态 } }
- Compose状态管理基于快照系统保证线程安全
7. 完整实现流程图解
8. 方案总结与展望
✅ 当前方案优势
- ⏱ 高效并发:协程+Channel实现生产消费模型
- 📶 断点续传:DataStore持久化上传状态
- 📊 实时反馈:Compose状态流驱动UI更新
- 📉 内存优化:MappedByteBuffer避免大文件加载
🔮 后续优化方向
- 分片压缩:在上传前对分片进行ZSTD压缩
- P2P增强:结合WebRTC实现客户端间分片共享
- 智能预测:基于历史数据预测最佳分片大小
- 跨平台扩展:通过KMM实现iOS端复用核心逻辑
技术选型建议
本方案特别适合以下技术栈组合:
- Android客户端:Kotlin + Jetpack Compose
- 服务端:Ktor + Kotlin Coroutines
- 跨平台:KMM实现核心逻辑复用