V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
withBruce
V2EX  ›  程序员

真诚请教原因, RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s)

  •  1
     
  •   withBruce · 2022-06-21 09:47:04 +08:00 · 1696 次点击
    这是一个创建于 885 天前的主题,其中的信息可能已经有所发展或是发生改变。

    RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s) 设备上传数据到系统 A(netty),系统 A 根据数据类型推送消息到不同的队列,因为设备量增多的原因,之前单消费者开始处理的不及时,就想着多增加个消费者(和之前的消费者代码一样),然后系统 A 推送消息开始出现卡顿,数据帧应答的很慢,感觉不像是流控的事,管理端看着也没问题 相关代码: 系统 A:

    channelRead(ChannelHandlerContext ctx,  Object msg){
        ....
        sendAck(ctx,ack);
        switch (data.getClass().getName()) {
            case "realTimeData":
                RabbitUtil.getInstance().publish(realTimeData);
        }
    }
    
    publish(RealTimeData realTimeData){
         .......
         Map<String, Object> header = new HashMap<String, Object>();
         header.put("DataType", "RealTimeData");
         BasicProperties props = new BasicProperties().builder().headers(header).build();
         channel.basicPublish(exchangeName, routeKey_CollectedData, props, CollectedRealTimeDataPackageTransform.toBytes(data));
    }
    
    channel init:
        private Channel channel;
        private ConnectionFactory factory = new ConnectionFactory();
        @PostConstruct
        public void init() {
            instance = this;
            factory.setUsername(mqUserName);
            factory.setPassword(mqPassword);
            factory.setHost(mqHost);
            factory.setVirtualHost(mqVirtualHost);
            factory.setPort(mqPort);
          }
         channel = factory.newConnection().createChannel();
    }
    

    消费者代码:

        @Autowired
        DataProcessor processor;
        @Autowired
        @Qualifier("threadpool")
        ThreadPoolExecutor threadPool;
    @RabbitListener(queues = "${mq.queue.Original.CollectedData}", ackMode = "MANUAL")
     public void process(Message msg, Channel channel) {
            MessageProperties mp = msg.getMessageProperties();
            Map<String, Object> headers = mp.getHeaders();
            String dataType = (String) headers.get("DataType");
     switch (dataType) {
                case "RealTimeData":
                    CompletableFuture.runAsync(() -> {
                        try {
                            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                            CollectedRealTimeData crtd = CollectedRealTimeDataPackageTransform.fromBytes(msg.getBody());
                            processor.process(crtd);
                        } catch (Exception e) {
                            try {
                                channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
                            } catch (IOException ioException) {
                                ioException.printStackTrace();
                            }
                            e.printStackTrace();
                        }
                    }, threadPool);
                    break;
            }
    
    
    
     }
    
    5 条回复    2022-06-21 12:09:45 +08:00
    qW7bo2FbzbC0
        1
    qW7bo2FbzbC0  
       2022-06-21 09:51:09 +08:00
    用 rabbit 经常会遇到 server no response 的情况,只能手动 kill 进程重新启动,改用 kafka 后未出现同样问题。我们的场景比较简单,切换起来很快。我们这边观测 rabbit client 很多或消息大的话,对 server 有压力。和 rabbit 的性能宣传语不同,可能使我们的使用方式有问题吧
    wupher
        2
    wupher  
       2022-06-21 10:27:15 +08:00
    没碰到类似的情况,可能是量级未到?

    之前写的一个系统,使用 RabbitMQ 进行多端通讯。日常大约在 5000 ~ 8000 个客户端进行数据交换,同步消息和异步消息都有。

    消费者一直都是多节点通过 RabbitListener 连接 RabbitMQ 。刚才又看了一下 application.yml
    批量获取一次 10 条,concurrency 5 max 10

    之前未碰到过 publish 超时的情况。

    你用的版本是?
    withBruce
        3
    withBruce  
    OP
       2022-06-21 12:09:20 +08:00
    concurrency 这个属性开多线程不是消费的同一批数据把
    concurrency 5 max 10 配置好这个问题解决了
    还是自己对于 mq 没弄明白
    谢谢了!
    withBruce
        4
    withBruce  
    OP
       2022-06-21 12:09:31 +08:00
    @wupher 谢谢
    withBruce
        5
    withBruce  
    OP
       2022-06-21 12:09:45 +08:00
    @qW7bo2FbzbC0 谢谢
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2812 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 11:29 · PVG 19:29 · LAX 03:29 · JFK 06:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.