xDocxDoc
AI
前端
后端
iOS
Android
Flutter
AI
前端
后端
iOS
Android
Flutter
  • Kotlin 多文件异步分片断点续传

🚀 Kotlin+Compose+协程+Channel 多文件异步分片断点续传

1. 需求背景与技术选型

1.1 大文件上传的痛点

  • 网络不稳定:移动端弱网环境下易中断
  • 内存压力:单文件一次性加载导致OOM风险
  • 体验优化:上传进度可视化与暂停/恢复能力

1.2 技术栈优势

技术作用优势特性
Kotlin协程异步任务管理结构化并发、轻量级线程
Channel分片数据传输生产者-消费者模型
Jetpack ComposeUI构建声明式UI、状态驱动
OkHttp网络请求分片请求支持、拦截器扩展

2. 核心架构设计

3. 关键实现模块

3.1 文件分片与状态管理

class UploadTask(
    val file: File,
    val chunkSize: Int = 1024 * 1024 // 默认1MB分片
) {
    // 文件分片信息存储
    data class Chunk(
        val index: Int,
        val start: Long,
        val end: Long,
        var status: Status = Status.PENDING
    )
    
    enum class Status { PENDING, UPLOADING, COMPLETED, FAILED }
    
    // 分片计算逻辑
    fun calculateChunks(): List<Chunk> {
        val chunks = mutableListOf<Chunk>()
        val fileSize = file.length()
        var offset = 0L
        
        for (i in 0 until ceil(fileSize / chunkSize.toDouble()).toInt()) {
            val end = min(offset + chunkSize, fileSize)
            chunks.add(Chunk(i, offset, end))
            offset = end
        }
        return chunks
    }
    
    // 续传恢复逻辑
    fun restoreFromCache(cache: List<Chunk>) {
        // 实现省略:比对缓存恢复进度状态
    }
}

3.2 协程与Channel实现异步管道

class UploadManager(
    private val scope: CoroutineScope,
    private val maxConcurrency: Int = 3 // 并发上传数
) {
    // 状态Channel(用于UI更新)
    private val _progressState = Channel<UploadProgress>()
    val progressState = _progressState.receiveAsFlow()
    
    // 分片上传协程
    suspend fun uploadTask(task: UploadTask) {
        val chunks = task.calculateChunks()
        val channel = produce {
            chunks.filter { it.status != UploadTask.Status.COMPLETED }
                .forEach { send(it) }
        }
        
        // 启动并发协程组
        (1..maxConcurrency).map { workerId ->
            scope.launch(Dispatchers.IO) {
                for (chunk in channel) {
                    try {
                        uploadChunk(task.file, chunk).let {
                            _progressState.send(
                                UploadProgress(task.id, chunk.index)
                            )
                        }
                    } catch (e: Exception) {
                        // 错误处理
                    }
                }
            }
        }.joinAll() // 等待所有分片完成
    }
    
    // 单个分片上传逻辑
    private suspend fun uploadChunk(file: File, chunk: UploadTask.Chunk) {
        return withContext(Dispatchers.IO) {
            val inputStream = RandomAccessFile(file, "r").use { raf ->
                raf.seek(chunk.start)
                ByteArray((chunk.end - chunk.start).toInt()).apply {
                    raf.read(this)
                }
            }
            
            // 使用OkHttp上传分片
            OkHttpClient().newCall(
                Request.Builder()
                    .url("https://api.yourserver.com/upload")
                    .addHeader("Content-Range", "bytes ${chunk.start}-${chunk.end}/${file.length()}")
                    .post(inputStream.toRequestBody())
                    .build()
            ).execute().use { response ->
                if (!response.isSuccessful) throw IOException("Upload failed")
            }
        }
    }
}

3.3 Compose UI状态管理

@Composable
fun FileUploadScreen(viewModel: UploadViewModel = viewModel()) {
    val tasks by viewModel.tasks.collectAsState()
    
    LazyColumn {
        items(tasks) { task ->
            FileUploadItem(
                task = task,
                onPause = { viewModel.pauseUpload(task.id) },
                onResume = { viewModel.resumeUpload(task.id) }
            )
        }
    }
}

@Composable
fun FileUploadItem(task: UploadTask) {
    var progress by remember(task.id) { mutableStateOf(0f) }
    
    // 监听进度更新
    LaunchedEffect(task.id) {
        uploadManager.progressState.filter { it.taskId == task.id }
            .collect { progress = it.calculateProgress() }
    }
    
    Row {
        // 文件名显示
        Text(task.file.name)
        // 进度条
        LinearProgressIndicator(progress = progress)
        // 暂停/继续按钮
        Button(onClick = { if (task.isPaused) onResume() else onPause() }) {
            Text(if (task.isPaused) "▶" else "⏸")
        }
    }
}

3.4 断点续传关键技术点

持久化存储设计

// 使用DataStore保存断点信息
object UploadCache {
    private val Context.dataStore: DataStore<Preferences> by preferencesDataStore("upload_cache")
    
    suspend fun saveChunkState(context: Context, taskId: String, chunks: List<UploadTask.Chunk>) {
        context.dataStore.edit { preferences ->
            preferences[stringSetPreferencesKey(taskId)] = chunks.map {
                "${it.index}:${it.status}" // 存储分片状态
            }.toSet()
        }
    }
    
    suspend fun loadChunkState(context: Context, taskId: String): Map<Int, UploadTask.Status> {
        return context.dataStore.data.first()[stringSetPreferencesKey(taskId)]
            ?.associate { record ->
                record.split(":").let {
                    it[0].toInt() to UploadTask.Status.valueOf(it[1])
                }
            } ?: emptyMap()
    }
}

分片续传流程图

4. 性能优化策略

4.1 智能分片策略

// 根据网络动态调整分片大小
fun calculateDynamicChunkSize(networkType: NetworkType): Int {
    return when(networkType) {
        NetworkType.WIFI -> 5 * 1024 * 1024  // 5MB
        NetworkType.ETHERNET -> 10 * 1024 * 1024 // 10MB
        else -> 512 * 1024 // 512KB
    }
}

4.2 上传失败重试机制

// 带指数退避的重试机制
private suspend fun uploadChunkWithRetry(chunk: UploadTask.Chunk, maxRetries: Int = 3) {
    var currentRetry = 0
    var delayFactor = 1L
    
    while (currentRetry < maxRetries) {
        try {
            uploadChunk(chunk)
            return
        } catch (e: Exception) {
            if (++currentRetry == maxRetries) throw e
            delay(1000 * delayFactor) // 指数等待
            delayFactor *= 2
        }
    }
}

4.3 内存优化技术

// 使用内存映射减少内存占用
private suspend fun uploadChunkWithMappedByteBuffer(file: File, chunk: Chunk) {
    val channel = FileInputStream(file).channel
    val buffer = channel.map(
        FileChannel.MapMode.READ_ONLY, 
        chunk.start, 
        chunk.end - chunk.start
    )
    
    // 上传buffer内容(避免创建临时字节数组)
    uploadBuffer(buffer)
}

5. 方案适用场景

  1. 大文件上传:视频/设计稿等百MB以上文件
  2. 弱网环境:移动端4G/5G网络波动场景
  3. 批量上传:相册备份/日志文件上传
  4. 后台任务:需要保持上传状态的应用

6. 关键问题解答

Q: 如何保证分片上传后的文件完整性?
A: 通过两种机制确保:

  1. 服务器端合并时使用MD5校验合并后文件
    fun verifyFile(file: File, expectedMd5: String): Boolean {
        val digest = MessageDigest.getInstance("MD5")
        file.forEachBlock { bytes, size ->
            digest.update(bytes, 0, size)
        }
        return Hex.encodeHexString(digest.digest()) == expectedMd5
    }
  2. 分片上传时采用CRC校验头
    POST /upload HTTP/1.1
    Content-Range: bytes 0-102399/2048000
    X-Checksum: 89f0e3e8

Q: 如何避免并行上传时的资源竞争?
A: 通过三层次控制:

  1. Channel提供线程安全的生产者-消费者队列
  2. 使用Mutex保护共享状态
    private val mutex = Mutex()
    
    suspend fun updateProgress(taskId: String, chunkId: Int) {
        mutex.withLock {
            // 更新状态
        }
    }
  3. Compose状态管理基于快照系统保证线程安全

7. 完整实现流程图解

8. 方案总结与展望

✅ 当前方案优势

  1. ⏱ 高效并发:协程+Channel实现生产消费模型
  2. 📶 断点续传:DataStore持久化上传状态
  3. 📊 实时反馈:Compose状态流驱动UI更新
  4. 📉 内存优化:MappedByteBuffer避免大文件加载

🔮 后续优化方向

  1. 分片压缩:在上传前对分片进行ZSTD压缩
  2. P2P增强:结合WebRTC实现客户端间分片共享
  3. 智能预测:基于历史数据预测最佳分片大小
  4. 跨平台扩展:通过KMM实现iOS端复用核心逻辑

技术选型建议

本方案特别适合以下技术栈组合:

  • Android客户端:Kotlin + Jetpack Compose
  • 服务端:Ktor + Kotlin Coroutines
  • 跨平台:KMM实现核心逻辑复用
最后更新: 2025/8/26 10:07