V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
tenserG
V2EX  ›  程序员

请教个 Webflux 的问题

  •  
  •   tenserG · 2022-07-05 21:32:49 +08:00 · 2126 次点击
    这是一个创建于 927 天前的主题,其中的信息可能已经有所发展或是发生改变。
    接手了一个 WebFlux 的项目,以前接触的都是传统 MVC,现在一个接口响应一般有 40ms,偶尔会超过 100ms,大佬让我结合代码慢慢看,找出可能阻塞的地方.

    伪代码大致是

    public Mono saveUsers(request){

    Flux result = Flux.fromIterable(Arrays.asList(request.getAge()).flatmap(x->{

    if(x>35){
    return Mono.empty();
    }
    saveCatch(request.getUserName);
    return Mono.just(true);
    });

    return kafkaStreamSend(result);
    }

    saveCatch 是本地存一份,redis 存一份
    kafkaStreamSend 是发送到下游 kafka 消费

    大佬建议我在日志打印耗时,但是本菜鸟感觉异步执行太难了

    request 中有 1000 个用户的信息,在①处记录将会输出每一次的耗时,一共 1000 次
    在②处记录会因为异步执行输出时间很短

    目前的想法是用 Flux.onNext()放在 flatmap 前后记录,但是对 WebFlux 没有很深的了解,尤其是执行顺序,中文互联网上资料也少的可怜.不知道有没有靠谱的教程学习下.
    9 条回复    2022-07-06 18:45:06 +08:00
    chawuchiren
        1
    chawuchiren  
       2022-07-05 23:51:53 +08:00
    建议第一个 if 没什么执行逻辑的话,可以改成 filter
    chihiro2014
        2
    chihiro2014  
       2022-07-06 02:03:54 +08:00   ❤️ 1
    可以看下龙之春的 Reactive Spring ,里面有类似的东西。

    不要看知秋的东西,误人子弟
    palfortime
        3
    palfortime  
       2022-07-06 07:20:38 +08:00 via Android
    saveCatch 就阻塞了吧,异步里调用同步了。
    yazinnnn
        4
    yazinnnn  
       2022-07-06 08:11:25 +08:00
    如果你的业务不关心顺序, 那么就将业务并行处理

    如果你的业务不关心返回, 那么这个方法直接返回 Mono.just

    至少看你的例子 1 是不关心 saveCatch 的结果的, 是不是这里用的阻塞代码? 换成 reactive redis client, 相关业务结果在订阅回调里处理,不要扔给 webflux 了

    不想换非阻塞的 redis 的话, 就 Mono.defer { }.emitOn(WorkerExecutor).subscribe{}

    如果你既需要业务串行,又需要返回结果, 这种情况是无解的
    superchrisliu
        5
    superchrisliu  
       2022-07-06 09:31:30 +08:00   ❤️ 1
    Flux result = Flux.fromIterable(Arrays.asList(request.getAge()).flatmap(x->{

    if(x>35){
    return Mono.empty();
    }
    saveCatch(request.getUserName);
    return Mono.just(true);
    });
    这里看不到 saveCatch 的实现,所以不知道是否有阻塞,除了这行 saveCatch ,其他全部都是无阻塞的,你可以认为耗时为 0 (实际不是这样的),你要看耗时,只要看 saveCatch 这个函数的耗时就行了,如果 saveCatch 是阻塞的,同时你又要保证顺序(即先保存,再发送到 Kafka ),那么可以批量调用,比如每 300 条调用一下 saveCatch ,然后结果用 CompletableFuture.supplyAsync 包装一下就行了,应该会有提升的
    superchrisliu
        6
    superchrisliu  
       2022-07-06 09:32:35 +08:00
    说错了一点,应该是 CompletableFuture.supplyAsync 包装一下 saveCatch
    lancelee01
        7
    lancelee01  
       2022-07-06 10:06:57 +08:00
    如果 saveCatch 和 kafkaStreamSend 涉及到第三方交互,同时交互的客户端不是响应式规范,那么需要利用 webflux API 把这两个方法的调用包装成响应式编程
    Vkery
        8
    Vkery  
       2022-07-06 17:27:22 +08:00
    感觉在强行响应式
    git00ll
        9
    git00ll  
       2022-07-06 18:45:06 +08:00
    这里代码与 reactor 没啥关系, 你直接重构成 for 循环 吧!可能还快点
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2944 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 06:55 · PVG 14:55 · LAX 22:55 · JFK 01:55
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.