V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
zhady009
V2EX  ›  程序员

关于 Reactor API,如何写异常后恢复

  •  
  •   zhady009 · 360 天前 · 657 次点击
    这是一个创建于 360 天前的主题,其中的信息可能已经有所发展或是发生改变。

    方法client.downloadFile会不断发射FilePart,需要collec写入到文件中即可。

    在接收FilePart期间会有网络等其他异常,现在直接用onErrorResume从 offset 开始请求返回新的 Flux 会有一个问题。 第一次异常会进入onErrorResume返回新的 Flux ,由于新的 Flux 没有声明onErrorResume就噶了

    我也不可能在新的 Flux 里声明onErrorResume,无限套娃了属于是。

    client.downloadFile(fileReferenceId)
        .publishOn(Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor()))
        .timeout(Duration.ofMinutes(3))
        .onErrorResume(RpcException::class.java) {
            if (it.error.errorCode() == TIMEOUT_CODE && monitoredChannel.isDone().not()) {
                log.warn("Download timeout, resuming: $fileDownloadPath")
                return@onErrorResume client.downloadFile(
                    fileReferenceId,
                    monitoredChannel.getDownloadedBytes(),
                    MAX_FILE_PART_SIZE,
                    true
                )
            }
            Flux.error(it)
        }
        .collect({ monitoredChannel }, { fc, filePart ->
            fc.write(filePart.bytes.nioBuffer())
        })
        .doOnSuccess {
            tempDownloadPath.moveTo(fileDownloadPath)
            downloadCounting.incrementAndGet()
            log.info("Downloaded file: $fileDownloadPath")
        }
        .doOnError {
            log.error("Error downloading file:$fileDownloadPath", it)
        }
        .onErrorMap {
            wrapRetryableExceptionIfNeeded(it)
        }
        .doFinally {
            runCatching {
                closePath(fileDownloadPath)
            }.onFailure {
                log.error("Error closing file channel", it)
            }
            hashingPathMapping.remove(hashing)
        }
        .block()
    
    3 条回复    2023-11-15 11:15:25 +08:00
    guyeu
        1
    guyeu  
       360 天前 via iPhone
    retry 操作符?
    zhady009
        2
    zhady009  
    OP
       360 天前 via iPhone
    @guyeu 一开始就用的 retry 不过流会重头开始
    yuhongtai114514
        3
    yuhongtai114514  
       359 天前
    把 flux 中的动作先用操作符转成 mono ,然后把 retry 挂在 mono 上试试?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1205 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 18:12 · PVG 02:12 · LAX 10:12 · JFK 13:12
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.