V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
drymonfidelia
V2EX  ›  .NET

求助 C#有办法在自己程序内部实现一个跨线程的简易消息队列吗?现在这种轮询数据库的实现 CPU 占用率很高, 4C8G 阿 里 云占用一直在 100%

  •  
  •   drymonfidelia · 352 天前 · 2953 次点击
    这是一个创建于 352 天前的主题,其中的信息可能已经有所发展或是发生改变。

    需求是几百个客户端不断给我们上报一些数据(加起来每分钟 2000 条左右)我们分类后上报给不同上游。上游的接口设计非常差劲(按照同一份文档),一次只能接受一条数据,有的上游一个请求 3 分钟才响应。上游的程序不是我们能控制的,我们也没权利要求他们修改。

    目前我的设计是一个独立 ASP.Net Core 程序接受数据存入数据库(因为有在线率要求,处理任务的程序需要经常重启更新,有的时候会更新坏掉),另一个程序每 2 秒查询一次数据库的新数据,按需要上报的上游分类好进入 5 个不同队列(不能接受数据的时候就分类,因为分类的逻辑也要经常改),另外启动的时候开 5 个线程在数据库里扫描这些队列,发现新的任务就开一个异步 Task 上报。不同上游能接受的并发不一样,Task 外面有个 semaphoreSlim.WaitAsync();防止把上游服务弄炸。这种实现 CPU 占用率很高,4C8G 阿 里 云占用一直在 100%,有没有人知道最佳实现是什么?

    27 条回复    2023-12-07 17:16:24 +08:00
    thinkershare
        1
    thinkershare  
       352 天前
    跨线程? 线程不是本身就可以直接共享数据吗? 你需要的是一个多线程安全的进程内消息队列吧?
    感觉是你的实现不对。开 5 个线程扫描干啥? 后台任务 1 个线程就够了,你又不是 CPU 限制的计算类任务。
    encro
        2
    encro  
       352 天前
    问题:

    1 ,你 1 分钟接收 2000 条;
    2 ,上游 3 分钟才能处理一条;
    3 ,你得开几千个并发才能不阻塞。。。

    即使你用了队列,也会造成队列阻塞,内存爆炸。。。。

    4C8G ,开几千个并发,应该是不够的。。。


    至于 cpu ,需要 cpu 干啥呢。。。

    直接存队列就好了,最简单的队列就是 redis 。还有 ZeroMQ
    RedBeanIce
        3
    RedBeanIce  
       352 天前
    1 ,你 1 分钟收到 2000 条数据
    2 ,你需要进行分类,每个分类有不同的处理方式,转成不同的报文
    3 ,上游一次只能接收一条数据
    4 ,上游处理数据很慢

    问题,
    1 ,在线率是什么
    2 ,处理任务的程序是指你写的这个程序么,为什么要一直重启

    根据现有信息,整理的方案如下
    1 ,下游客户端给你提供数据,你直接入库
    2 ,你的程序直接去数据库获取,一次性获取一条
    3 ,按照分类业务,处理成固定的报文,推送给上游,
    4 ,如果上游上次未处理完成,你的程序不要做新的推送数据处理,继续等待。
    ragnaroks
        4
    ragnaroks  
       352 天前
    你的上报,开几个 singleton service 不就好了么,里面用自锁 timer 幂等。看你的描述和我以前做支付接口差不多,但是那会用 core 3.1 跑 200 多并发(每分钟 12000 请求),只用了几个 1t2g 的轻量云做高可用,外加一个 2t4g80g 的轻量数据库。
    009694
        5
    009694  
       352 天前 via iPhone
    如果你的上报数据平均速率大于你的上游处理速率的话 你就是想破天也没用 再大的消息队列都承受不住这个注定要爆炸的业务
    pming1
        6
    pming1  
       352 天前
    为什么要接这样的需求呢?生产效率远大于消费,不管用什么消息队列还是数据库,上游按这个效率,永远都接收不完。
    drymonfidelia
        7
    drymonfidelia  
    OP
       352 天前
    @009694
    @pming1
    @RedBeanIce 可能是我没描述清楚,是一个请求只能带一条数据,但是三分钟响应的上游也是可以并发的
    drymonfidelia
        8
    drymonfidelia  
    OP
       352 天前
    @drymonfidelia 并且并发数量不会导致响应时间叠加
    DTCPSS
        9
    DTCPSS  
       352 天前
    justFxxk2060
        10
    justFxxk2060  
       352 天前
    削峰填谷,要啥数据库?
    瓶颈是在数据库上,把数据库取消就好了,方案随意选
    justFxxk2060
        11
    justFxxk2060  
       352 天前
    哎,一想着这种问题丢到 ai 里面就能得到正确答案,就觉得程序员前途真是渺茫
    justFxxk2060
        12
    justFxxk2060  
       352 天前
    而多年的研发经验也就是帮着具体分析下,哪儿导致 CPU 占用率最高?
    1 、数据库轮询?
    2 、频繁写入导致 IO CPU 高?
    3 、SemaphoreSlim 和异步 Task 控制并发让线程管理不当,增加 CPU 使用率?

    完全没啥用,按照 AI 的答案直接取消数据库 拿 redis 或者第三方消息队列 梭哈就好了
    popvlovs
        13
    popvlovs  
       352 天前
    disruptor 有.net 版吧,对应 N producer M consumer 这个模式,M 看样子可以大一点
    drymonfidelia
        14
    drymonfidelia  
    OP
       352 天前
    @RedBeanIce
    在线率是指我接收客户端数据的这个接口,不能动不动挂掉
    经常重启是因为经常要改分类逻辑
    xiangyuecn
        15
    xiangyuecn  
       352 天前
    “开 5 个线程在数据库里扫描这些队列,发现新的任务就开一个异步 Task 上报”

    神奇的逻辑。直接给结论:1 个线程足够,立即释放出 80% cpu 。
    Kinnice
        16
    Kinnice  
       352 天前
    为啥使用数据库?增加个队列中间件,例如 RabbitMQ ,或直接买队列云服务
    1. A 程序为 提供接受客户端数据的接口:接受请求 => 校验 => 入队列
    2. B 程序为 从队列取数据,并根据分类发送请求

    按这个,B 更新不需要 A 不用停机,且热更新 B 都可以
    drymonfidelia
        17
    drymonfidelia  
    OP
       352 天前
    @Kinnice 如果 B 取了数据还没上报就被我关掉了,怎么防止这条数据丢掉?
    Kinnice
        18
    Kinnice  
       352 天前
    @drymonfidelia #17 手动 ack 呀
    接受到消息 => 处理(推荐的实践是开始发消息了就认为已经处理,而不是发成功后再 ack[太久了],如果没有发成功,可以重新发条消息到 mq 中,再次消费) => ack
    hez2010
        19
    hez2010  
       351 天前
    其实最简单的方法直接弄个 ConcurrentQueue 就行了,没必要从数据库轮询。在存数据库的时候顺便往 ConcurrentQueue 里面塞一份直接用就行了。

    ```cs
    class Worker
    {
    public static readonly ConcurrentQueue<T> Queue = new();
    public static readonly SemaphoreSlim Semaphore = new(...);

    async Task ProcessAsync(CancellationToken token)
    {
    while (!token.IsCancellationRequested)
    {
    await Semaphore.WaitAsync(token);
    while (Queue.TryDequeue(out var entry))
    {
    // ...
    }
    }
    }
    }
    ```
    kokutou
        20
    kokutou  
       351 天前 via Android
    在线 debug 看看呢,说不定 CPU 时间是在莫名其妙的地方
    gitdoit
        21
    gitdoit  
       351 天前 via iPhone
    看你这描述,为啥 CPU 会 100%啊?另外,你说的是跨进程消息队列吗
    liuhan907
        22
    liuhan907  
       351 天前
    @drymonfidelia
    你不应该用数据库做这个工作,这种事情很适合用本地 Log 存储。例如说微软自家的 https://github.com/microsoft/FASTER 。每次收到消息就写入提交,然后你本地多开几组 Task 去处理本地日志就行了。
    jiangzm
        23
    jiangzm  
       351 天前
    用数据库做统计干嘛, 单机直接用线程安全队列即可,同时把队列也持久化到数据库,启动的时候扫一下数据库运行的时候只做更新
    多台机器/多服务,就用分布式缓存 redis 做队列+分布式锁即可,redis 自带持久化甚至数据库都不用
    night98
        24
    night98  
       351 天前
    这不纯纯典型的:生产-》消费场景吗,不过你这个是不是得聚合数据后去上报?不是的话就很简单,直接弄个 mq 或者 redis 消息,生产端拿到客户端数据后塞 db ,然后你收到消息后去处理 db 里对应的数据。

    你要是聚合数据后去上报,就稍微麻烦一点,不过也都差不多,既可以用批量消息的功能去进行批量消费,也可以记一个 lastid 然后按区间进行消费,再稍微改进一下就是你接受到消息之后根据业务类型放到不同的线程池里面,根据上游的消费能力去调整不同线程池的大小来控制流速。
    lesismal
        25
    lesismal  
       351 天前
    目测队列不适合解决你的问题:
    1. 队列适合解耦、削峰,适合不需要响应队列处理后的结果的请求
    2. 队列不能减少处理数量,如果请求需要响应、积压后响应延迟更高甚至超时

    要提高性能又不至于让大量请求超时:
    1. 数据库前面加缓存
    2. 操作缓存和数据库的前面,加 singleflight 、合并同类请求、减少不必要的数据层操作
    3. 如果真的量大,仍然需要软、硬件扩容

    缺少业务的实际细节,先列这几条吧
    yicong135
        26
    yicong135  
       350 天前
    数据存一份到数据库,一份存消息队列;
    你读消息队列数据发送,发送成功后更新数据库
    qx4235
        27
    qx4235  
       337 天前
    简单点的你用 Channel 就可以.net3 以上,进程内消息队列。
    如果框架不支持,简单点的用 redis stream 来做,redis 版本低了就用 pub/sub,list
    复杂或者足够成熟的用 rabbitmq 这些,一般小功能我都用 redis stream 来做
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2835 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 37ms · UTC 11:25 · PVG 19:25 · LAX 03:25 · JFK 06:25
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.