V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
tuoov
V2EX  ›  数据库

请教一个关于并发控制的问题

  •  
  •   tuoov · 2025 年 4 月 23 日 · 4125 次点击
    这是一个创建于 279 天前的主题,其中的信息可能已经有所发展或是发生改变。

    现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:

    function processBatch():
        tx = db.beginTransaction()
        // 1. 批量读取:取出最多 N 条“待处理”数据
        items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N")
        
        for item in items:
            // 2. 业务处理
            doBusinessLogic(item)
            // 3. 更新状态
            tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
        tx.commit()
    
    // 线程 A
    spawn threadA:
        processBatch()
    
    // 线程 B (几乎同时执行)
    spawn threadB:
        processBatch()
    

    但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:

    • 方案 A:在 processBatch 的逻辑中增加锁,这样在任意时刻,该函数都不会并发执行
    • 方案 B:调整数据库事务的隔离级别或锁表,即使 processBatch 并发执行了,底层的数据操作不会出现并发的情况

    我的问题是:

    1. 哪个方案更符合最佳实践?原因是什么
    2. 在保持 processBatch 会被多个地方调用不变的前提下,有没有更好的方案?
    3. 如果想学习这类并发相关的问题和解决方案,应该搜索什么关键词

    感谢各位赐教

    第 1 条附言  ·  2025 年 4 月 24 日

    感谢各位的热情回复,但我想问的不是“如何在应用层实现锁来解决这个问题”,而是“为什么”的问题。大多数回复都给出了在应用层解决的方案,所以我再重新描述下我的问题:

    1. 为什么选择在应用层解决这个问题(即方案 A 或它的变种),而不是在数据库层面解决 (即方案B)
    2. 基于什么样的原则或考虑,让 方案 A 成大多数人的选择,而非方案 B?
    34 条回复    2025-04-25 09:50:44 +08:00
    wxyz
        1
    wxyz  
       2025 年 4 月 23 日
    感觉是外部事务的颗粒度太大了,
    一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的;
    建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。
    wyntalgeer
        2
    wyntalgeer  
       2025 年 4 月 23 日
    噗……不会并发执行的多线程?
    luckyrayyy
        3
    luckyrayyy  
       2025 年 4 月 23 日   ❤️ 1
    互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。
    kanepan19
        4
    kanepan19  
       2025 年 4 月 23 日
    简单的很

    for item in items:
    //乐观锁
    boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
    if(!flag){
    // 抛异常
    }
    // 2. 业务处理
    doBusinessLogic(item)
    // 3. 更新状态
    tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
    tx.commit()
    kanepan19
        5
    kanepan19  
       2025 年 4 月 23 日
    接上面的 加一个状态,处理中
    lvlongxiang199
        6
    lvlongxiang199  
       2025 年 4 月 23 日
    ` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)` -> ` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ? and status = 'PENDING' ", item.id)`

    `processBatch` 也可以想办法做成串行的
    kanepan19
        7
    kanepan19  
       2025 年 4 月 23 日
    boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)

    多线程 只有一个成功。
    是否需要抛异常,看业务决定
    Georgedoe
        8
    Georgedoe  
       2025 年 4 月 23 日
    3. 哥们不会还没用过 deepseek 和 chatgpt 吧
    siweipancc
        9
    siweipancc  
       2025 年 4 月 23 日 via iPhone
    不要在数据库玩这个……
    hwdq0012
        10
    hwdq0012  
       2025 年 4 月 23 日
    1.读写分离,写放队列执行
    2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的
    3.没写过后端,轻喷
    rekulas
        11
    rekulas  
       2025 年 4 月 23 日
    加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了
    vikaptain
        12
    vikaptain  
       2025 年 4 月 23 日
    悲观锁、乐观锁、队列
    unused
        13
    unused  
       2025 年 4 月 23 日
    除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据
    yinmin
        14
    yinmin  
       2025 年 4 月 23 日
    方案 A 和方案 B 都有问题。

    如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下:

    1. 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理;
    2. 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里;
    3. ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=?

    好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。
    Ayanokouji
        15
    Ayanokouji  
       2025 年 4 月 23 日
    这不是并发的问题,是设计的问题。
    简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。
    mooyo
        16
    mooyo  
       2025 年 4 月 23 日
    两阶段乐观锁吧,

    第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。
    sagaxu
        17
    sagaxu  
       2025 年 4 月 23 日
    SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。

    如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。
    encounter2017
        18
    encounter2017  
       2025 年 4 月 23 日
    这完全没必要用锁,是设计的问题,流程调整下就好了。
    我理解你这块是离线的业务对吧。

    首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定)
    接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据

    然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。

    这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。

    我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样

    ```
    ids.toStream.buffer(16).mapPar(4)(row => processData(row))
    ```
    EMMMMMMMMM
        19
    EMMMMMMMMM  
       2025 年 4 月 23 日
    你就不能给任务分一下片吗?
    线程 1:WHERE status = 'PENDING' and id % 2 = 0
    线程 2:WHERE status = 'PENDING' and id % 2 = 1

    说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程
    Romic
        20
    Romic  
       2025 年 4 月 23 日
    1. 使用分布式锁 性能一般
    2. 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段
    3. 推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。
    话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。
    gg 思密达。
    netnr
        21
    netnr  
       2025 年 4 月 23 日
    这类似发短信的系统,很多地方调用发短信接口,都先写入发送记录表,状态为待发送,然后起一个任务循环执行发送并改状态;

    如果是在一个进程的前提下,可以用线程安全的先进先出队列,把 processBatch 添加到队列,另起一个线程来消费队列
    netnr
        22
    netnr  
       2025 年 4 月 23 日
    贴一个 C# 实现的类

    ThreeK
        23
    ThreeK  
       2025 年 4 月 23 日
    1 、要是不能控制调用方并发就推介方案 B ,方案 A 容易等不到锁。
    2 、2.1 doBusinessLogic 如果 IO 多计算少可以考虑并发执行。2.2 processBatch 可以写成幂等的,最终一致就行,多执行几遍还能申请加资源。
    3 、你这不算高并发,高并发一般不存在同一数据并发处理。高并发并发大但调用都带着唯一 id ,直接分布式锁解决同一 id 并发问题,你这种同一数据多处调用应该是锁等待/唤醒问题。
    我认为你们这业务槽点太多
    1) 事务太大不能拆就只能等报错了。
    2 )改 status 像事件驱动(消息通知),又不知你们写的什么。
    3 ) limit N 像是要批处理又好多处调用,调用的地方还不加条件,光用 limit N 来确定数据属于一个事务。。。。
    geebos
        24
    geebos  
    PRO
       2025 年 4 月 23 日
    这种场景一般用生产-消费者模型,一个线程查,多个线程处理
    thevita
        25
    thevita  
       2025 年 4 月 24 日   ❤️ 1
    没说清楚啊,与你的事务会会发生冲突的都有啥啊,仅仅同一个逻辑的不同任务吗?有没有 读-写冲突?有没有其他不同粒度、不同逻辑的写-写冲突,doBusinessLogic 里面有不有 外部一致性要求?

    超大事务呗,某些系统很常见,并不是所有业务都是互联网,上面的不要看到这种就报警

    锁放外部(方案 A )正如你所说,只解决了 processBatch 的并发问题,但是不能避免其他事物的更新,依然可能导致 write-skew ,除非你保证只要该这个表,都拿锁,那和表锁其实也没太大差别,就看你们的数据库实现得整么样了

    锁表(方案 B )通过合理的加锁,能避免 write-skew, 但是冲突域会变大,影响系统吞吐,甚至某些 db 可能会阻塞读,但是话又说回来,如果你的场景类似,半夜批量计算,冲突可能低那种,耶完全可以接受

    其他方案:
    其实具体看你能接受 哪部分 可以被适当取舍,比如上面只讨论了锁的情况,取舍的就是与其他事物的冲突

    如果你能接受适当若化 这个超大事务的原子性的话还可以: processBatch 内加锁,这个锁止解决 不同 processBatch 任务间的冲突(更好的办法可能是引入一个协调者来保证 不同 processBatch 尽量不冲突),然后更新使用乐观锁+重试,让 这个 batch 实现最终一致,也不失为一种办法(当然,这里没讨论你的 doBusinessLogic 有不有外部一致性的情况)
    prosgtsr
        26
    prosgtsr  
       2025 年 4 月 24 日 via iPhone
    如果是我的话,我会改成一个线程查,然后多线程领取任务再处理。
    要问怎么学,我也不知道,我也是草台班子
    listenerri
        27
    listenerri  
       2025 年 4 月 24 日 via Android
    问题在哪里发生的,就尽可能在那里解决
    NoDataNoBB
        28
    NoDataNoBB  
       2025 年 4 月 24 日
    select for update
    shangfabao
        29
    shangfabao  
       2025 年 4 月 24 日
    上边写的是对的,先 update,毕竟看你的逻辑,是否 update 是没有看执行逻辑的返回结果的
    kai1412
        30
    kai1412  
       2025 年 4 月 24 日
    多线程分页取 分配好每页的数量 线程取完各自根据主键 id 更新也不会有冲突
    kai1412
        31
    kai1412  
       2025 年 4 月 24 日
    @kai1412 分页查的时候记得根据主键 id 排序
    heiya
        32
    heiya  
       2025 年 4 月 24 日
    实现上来说 A 和 B 都行,不过锁粒度都很大。根据你对 processBatch()发生并发可能性的预测,还有一些别的方案:1.并发频繁发生,可以用#28 的 select for update 2.并发偶尔发生,在表里边加一列版本号,每次更新时对比这个值是否和取出时的值一致,不一致就是有并发。这样的锁粒度控制在行上。不过假如 10 条数据 2 条有并发问题,这种情况又得额外处理。
    chiaoyuja
        33
    chiaoyuja  
       2025 年 4 月 24 日   ❤️ 1
    应用层加锁
    加锁可以做到显式控制并发,你能清楚知道「谁在处理」;
    不依赖数据库细节,业务逻辑统一,不受数据库种类、配置影响;
    代码层面的锁(如 Redis 分布式锁)支持跨进程、跨服务的同步;
    开发和调试更直观,出问题也容易定位。
    数据库加锁
    • 数据库事务的隔离级别越高,性能越差(如 SERIALIZABLE 会阻塞读取);
    • 很难用 SQL 实现「原子地读取并更新任务状态」这种逻辑,语法不友好;
    • 不同数据库实现不同,如 PostgreSQL 支持 SELECT ... FOR UPDATE SKIP LOCKED ,但 MySQL 实现不同,兼容性差;
    • 数据库锁只在事务内有效,不能用于跨服务同步控制;
    • 如果多个服务部署在不同机器上,仅靠 DB 锁,还是可能出现争抢/重入的问题。
    testliyu
        34
    testliyu  
       2025 年 4 月 25 日
    我们之前是直接加分布式锁
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   Solana   ·   3075 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 13:50 · PVG 21:50 · LAX 05:50 · JFK 08:50
    ♥ Do have faith in what you're doing.