方法client.downloadFile
会不断发射FilePart
,需要collec
写入到文件中即可。
在接收FilePart
期间会有网络等其他异常,现在直接用onErrorResume
从 offset 开始请求返回新的 Flux 会有一个问题。
第一次异常会进入onErrorResume
返回新的 Flux ,由于新的 Flux 没有声明onErrorResume
就噶了
我也不可能在新的 Flux 里声明onErrorResume
,无限套娃了属于是。
client.downloadFile(fileReferenceId)
.publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
.timeout(Duration.ofMinutes(3))
.onErrorResume(RpcException::class.java) {
if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
log.warn("Download timeout, resuming: $fileDownloadPath")
return@onErrorResume client.downloadFile(
fileReferenceId,
monitoredChannel.getDownloadedBytes(),
MAX_FILE_PART_SIZE,
true
)
}
Flux.error(it)
}
.collect({ monitoredChannel }, { fc, filePart ->
fc.write(filePart.bytes.nioBuffer())
})
.doOnSuccess {
tempDownloadPath.moveTo(fileDownloadPath)
downloadCounting.incrementAndGet()
log.info("Downloaded file: $fileDownloadPath")
}
.doOnError {
log.error("Error downloading file:$fileDownloadPath", it)
}
.onErrorMap {
wrapRetryableExceptionIfNeeded(it)
}
.doFinally {
runCatching {
closePath(fileDownloadPath)
}.onFailure {
log.error("Error closing file channel", it)
}
hashingPathMapping.remove(hashing)
}
.block()
1
guyeu 360 天前 via iPhone
retry 操作符?
|
3
yuhongtai114514 359 天前
把 flux 中的动作先用操作符转成 mono ,然后把 retry 挂在 mono 上试试?
|