V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
holmes1412
V2EX  ›  C

分享新思路:一个逻辑完备的线程池

  •  4
     
  •   holmes1412 · 2022-04-26 15:41:21 +08:00 · 2780 次点击
    这是一个创建于 998 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个逻辑完备的线程池

    开源项目 Workflow 中有一个非常重要的基础模块:代码仅 300 行的 C 语言线程池

    观点写在前面,本文还是主要分享新思路,而线程池最基本的还是简单、易用、高效。新做法的三个特点在第 3 部分开始讲解,欢迎跳阅,或直接到 Github 主页上围观代码。

    https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c

    0 - Workflow 的 thrdpool

    Workflow 的特点是:计算通信融为一体的异步调度模式,而计算的核心:Executor 调度器,就是基于这个线程池实现的。可以说,一个通用而高效的线程池,是我们写 C/C++代码时离不开的基础模块。

    thrdpool代码位置在 src/kernel/,不仅可以直接拿来使用,同时也适合阅读学习。

    而更重要的,秉承 Workflow 项目本身一贯的严谨极简的作风,这个 thrdpool 代码极致简洁,实现逻辑上亦非常完备,结构精巧,处处严谨,不得不让我惊叹:妙啊!!!🤩

    我们可能会很好奇,线程池还能写出什么别致的新思路吗?以下列出一些特点:

    • 特点 1:创建完线程池后,无需记录任何线程 id 或对象,线程池可以通过一个等一个的方式优雅地去结束所有线程; 即:线程之间是对等的;

    • 特点 2:线程任务可以由另一个线程任务调起;甚至线程池正在被销毁时也可以提交下一个任务;(这很重要,因为线程本身很可能是不知道线程池的状态的;
      即:任务之间也是对等的,无论何时、由哪提交;

    • 特点 3:同理,线程任务也可以销毁这个线程池;(非常完整
      即:行为之间也是对等的,哪怕这个行为是 destroy ;

    我真的迫不及待为大家深层解读一下,这个我愿称之为“逻辑完备”的线程池

    1 - 前置知识

    第一部分我先从最基本的内容梳理一些个人理解,有基础的小伙伴可以直接跳过。如果有不准确的地方,欢迎大家指正交流~

    为什么需要线程池?(其实思路不仅对线程池,对任何有限资源的调度管理都是类似的)

    我们知道,通过系统提供的pthread或者std::thread创建线程,就可以实现多线程并发执行我们的代码。

    但是 CPU 的核数是固定的,所以真正并发执行的最大值也是固定的,过多的线程创建除了频繁产生创建的 overhead 以外,还会导致对系统资源进行争抢,这些都是不必要的浪费。

    因此我们可以管理有限个线程,循环且合理地利用它们。♻️

    那么线程池一般包含哪些内容呢?

    • 首先是管理若干个工具人线程;
    • 其次是管理交给线程去执行的任务,这个一般会有一个队列;
    • 再然后线程之间需要一些同步机制,比如 mutex 、condition 等;
    • 最后就是各线程池实现上自身需要的其他内容了;

    好了,接下来我们看看Workflowthrdpool是怎么做的。

    2 - 代码概览

    以下共 7 步常用思路,足以让我们把代码飞快过一遍。

    第 1 步:先看头文件,模块提供什么接口。

    我们打开thrdpool.h,可以只关注三个接口:

    // 创建线程池
    thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
    // 把任务交给线程池的入口
    int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); 
    // 销毁线程池
    void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                          thrdpool_t *pool);
    

    第 2 步:接口上有什么数据结构。

    也就是,我们如何描述一个交给线程池的任务。

    struct thrdpool_task                                                            
    {                                                                               
        void (*routine)(void *);  // 一个函数指针
        void *context;            // 一个上下文
    };  
    

    第 3 步:再看实现.c ,有什么内部数据结构。

    struct __thrdpool
    {
        struct list_head task_queue;   // 任务队列
        size_t nthreads;               // 线程个数
        size_t stacksize;              // 构造线程时的参数
        pthread_t tid;                 // 运行起来之后,pool 上记录的这个是 zero 值
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        pthread_key_t key;
        pthread_cond_t *terminate;
    };
    

    没有一个多余,每一个成员都很到位:

    1. tid:线程 id ,整个线程池只有一个,它不会奇怪地去记录任何一个线程的 id ,这样就不完美了,它平时运行的时候是空值,退出的时候,它是用来实现链式等待的关键。
    2. mutexcond是常见的线程间同步的工具,其中这个 cond 是用来给生产者和消费者去操作任务队列用的。
    3. key:是线程池的 key ,然后会赋予给每个由线程池创建的线程作为他们的 thread local ,用于区分这个线程是否是线程池创建的。
    4. 我们还看到一个pthread_cond_t *terminate,这有两个用途:不仅是退出时的标记位 ,而且还是调用退出的那个人要等待的 condition 。

    以上各个成员的用途,好像说了,又好像没说,🤔是因为几乎每一个成员都值得深挖一下,所以我们记住它们,后面看代码的时候就会豁然开朗!😃

    第 4 步:接口都调用了什么核心函数。

    thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
    {
        thrdpool_t *pool;
        ret = pthread_key_create(&pool->key, NULL);
        if (ret == 0)
        {
            ... // 去掉了其他代码,但是注意到刚才的 tid 和 terminate 的赋值
            memset(&pool->tid, 0, sizeof (pthread_t));
            pool->terminate = NULL;
            if (__thrdpool_create_threads(nthreads, pool) >= 0)
                return pool;
            ...
    

    这里可以看到__thrdpool_create_threads()里边最关键的就是循环创建nthreads个线程。

            while (pool->nthreads < nthreads)                                       
            {                                                                       
                ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
                ...
    

    第 5 步:略读核心函数的功能。

    所以我们在上一步知道了,每个线程执行的是__thrdpool_routine()。不难想象,它会不停从队列拿任务出来执行

    static void *__thrdpool_routine(void *arg)                                      
    {                                                                               
        ...                                   
        while (1)                                                                   
        {
            // 1. 从队列里拿一个任务出来,没有就等待
            pthread_mutex_lock(&pool->mutex);
            while (!pool->terminate && list_empty(&pool->task_queue))
                pthread_cond_wait(&pool->cond, &pool->mutex);
    
            if (pool->terminate) // 2. 线程池结束的标志位,记住它,先跳过
                break;
    
            // 3. 如果能走到这里,恭喜你,拿到了任务~                                                         
            entry = list_entry(*pos, struct __thrdpool_task_entry, list);
            list_del(*pos);
            pthread_mutex_unlock(&pool->mutex); // 4. 先解锁
                                                                                    
            task_routine = entry->task.routine;
            task_context = entry->task.context;
            free(entry);                                                            
            task_routine(task_context); // 5. 再执行
    
            // 6. 这里也先记住它,意思是线程池里的线程可以销毁线程池
            if (pool->nthreads == 0) 
            {                                                                       
                /* Thread pool was destroyed by the task. */
                free(pool);
                return NULL;
            }                                                                       
        }
        ... // 后面还有魔法,留下一章解读~~~
    

    第 6 步:把函数之间的关系联系起来。

    刚才看到的__thrdpool_routine()就是线程的核心函数了,它可以和谁关联起来呢?

    可以和接口thrdpool_schedule()关联上。

    我们说过,线程池上有个队列管理任务,

    • 所以,每个执行routine的线程,都是消费者;
    • 而每个发起schedule的线程,都是生产者;

    我们已经看过消费者了,来看看生产者的代码:

    inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                                    thrdpool_t *pool)
    {
        struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;  
    
        entry->task = *task;
        pthread_mutex_lock(&pool->mutex);
        list_add_tail(&entry->list, &pool->task_queue); // 添加到队列里
        pthread_cond_signal(&pool->cond);               // 叫醒在等待的线程
        pthread_mutex_unlock(&pool->mutex);
    }
    

    说到这里,特点 2就非常清晰了:

    开篇说的特点 2是说,”线程任务可以由另一个线程任务调起”。

    只要对队列的管理做得好,显然我们在消费者所执行的函数也可以做生产者。

    第 7 步:看其他情况的处理,对于线程池来说就是比如销毁的情况。

    只看我们接口 thrdpool_destroy()的实现是非常简单的:

    void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),            
                          thrdpool_t *pool)                                         
    {
        ...
        // 1. 内部会设置 pool->terminate ,并叫醒所有等在队列拿任务的线程
        __thrdpool_terminate(in_pool, pool);
    
        // 2. 把队列里还没有执行的任务都拿出来,通过 pending 返回给用户
        list_for_each_safe(pos, tmp, &pool->task_queue)                             
        {
            entry = list_entry(pos, struct __thrdpool_task_entry, list);            
            list_del(pos);                                                          
            if (pending)                                                            
                pending(&entry->task);                                              
            ... // 后面就是销毁各种内存,同样有魔法~
    

    在退出的时候,我们那些已经提交但是还没有被执行的任务是绝对不能就这么扔掉了的,于是我们可以传入一个pending()函数,上层可以做自己的回收、回调、任何保证上层逻辑完备的事情

    设计的完整性,无处不在。

    接下来我们就可以跟着我们的核心问题,针对性地看看每个特点都是怎么实现的。

    3 - 特点 1: 一个等待一个的优雅退出

    这里提出一个问题:线程池要退出,如何结束所有线程?

    一般线程池的实现都是需要记录下所有的线程 id ,或者 thread 对象,以便于我们去 jion 等待它们结束。

    而线性地退出,一环扣一环,长度本身不重要,让事情可以递归起来,是非常符合计算机世界的常规做法的。

    我们刚才看,pool 里并没有记录所有的 tid 呀?正如开篇说的,pool 上只有一个 tid ,而且还是个空的值

    所以特点 1给出了Workflowthrdpool的答案:

    无需记录所有线程,我可以让线程挨个自动退出、且一个等待一个,最终达到我调用完 thrdpool_destroy()后内存可以回收干净的目的。

    这里先给一个简单的图,假设发起 destroy 的人是 main 线程,我们如何做到一个等一个退出:

    最简单的:外部线程发起 destroy👇

    thrdpool_pic_01

    步骤如下:

    1. 线程的退出,由 thrdpool_destroy()设置pool->terminate开始。
    2. 我们每个线程,在 while(1)里会第一时间发现 terminate ,线程池要退出了,然后会 break 出这个 while 循环。
    3. 注意这个时候,还持有着 mutex 锁,我们拿出 pool 上唯一的那个 tid ,放到我的临时变量,我会根据拿出来的值做不同的处理。且我会把我自己的 tid 放上去,然后再解 mutex 锁。
    4. 那么很显然,第一个从 pool 上拿 tid 的人,会发现这是个 0 值,就可以直接结束了,不用负责等待任何其他人,但我在完全结束之前需要有人负责等待我的结束,所以我会把我的 id 放上去。
    5. 而如果发现自己从 pool 里拿到的 tid 不是 0 值,说明我要负责 join 上一个人,并且把我的 tid 放上去,让下一个人负责我
    6. 最后的那个人,是那个发现 pool->nthreads 为 0 的人,那么我就可以通过这个 terminate (它本身是个 condition )去通知发起 destroy 的人。
    7. 最后发起者就可以退了。🔚

    是不是非常有意思!!!非常优雅的做法!!!

    所以我们会发现,其实大家不太需要知道太多信息,只需要知道我要负责的上一个人

    当然每一步都是非常严谨的,我们结合刚才跳过的第一段魔法🔮感受一下:

    static void *__thrdpool_routine(void *arg)                                         
    {
        while (1)
        { 
            pthread_mutex_lock(&pool->mutex); // 1.注意这里还持有锁
            ... // 等着队列拿任务出来
            if (pool->terminate) // 2. 这既是标识位,也是发起销毁的那个人所等待的 condition
                break;
            ... // 执行拿到的任务
        }
    
        /* One thread joins another. Don't need to keep all thread IDs. */
        tid = pool->tid; // 3. 把线程池上记录的那个 tid 拿下来,我来负责上一人
        pool->tid = pthread_self(); // 4. 把我自己记录到线程池上,下一个人来负责我
        if (--pool->nthreads == 0) // 5. 每个人都减 1 ,最后一个人负责叫醒发起 detroy 的人
            pthread_cond_signal(pool->terminate);
    
        pthread_mutex_unlock(&pool->mutex); // 6. 这里可以解锁进行等待了
        if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到 0 值
            pthread_join(tid, NULL); // 8. 只要不 0 值,我就要负责等上一个结束才能退
                                                                                    
        return NULL; // 9. 退出,干干净净~
    }
    

    4 - 特点 2:线程任务可以由另一个线程任务调起

    在第二部分我们看过源码,只要队列管理得好,线程任务里提交下一个任务是完全 OK 的。

    这很合理。👌

    那么问题来了,特点 1又说,我们每个线程,是不太需要知道太多线程池的状态和信息的。而线程池的销毁是个过程,如果在这个过程间提交任务会怎么样呢?

    因此特点 2的一个重要解读是:线程池被销毁时也可以提交下一个任务,必须强调的是,是指线程任务里。而且刚才提过,还没有被执行的任务,可以通过我们传入的 pending()函数拿回来。

    简单看看销毁时的严谨做法:

    static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
    {                                                                               
        pthread_cond_t term = PTHREAD_COND_INITIALIZER;                             
                                                                                    
        pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位
        pool->terminate = &term;   // 2. 之后的添加任务不会被执行,但可以 pending 拿到
        pthread_cond_broadcast(&pool->cond); // 3. 广播所有等待的消费者
                                                                                
        if (in_pool) // 4. 这里的魔法等下讲>_<~
        {                                                                           
            /* Thread pool destroyed in a pool thread is legal. */                  
            pthread_detach(pthread_self());                                         
            pool->nthreads--;                                                       
        }                                                                           
    
        while (pool->nthreads > 0) // 5. 如果还有线程没有退完,我会等,注意这里是 while
            pthread_cond_wait(&term, &pool->mutex);                                 
    
        pthread_mutex_unlock(&pool->mutex);                                         
        if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)               
            pthread_join(pool->tid, NULL); // 6.同样地等待打算退出的上一个人
    }
    

    5 - 特点 3:同样可以在线程任务里销毁这个线程池

    既然线程任务可以做任何事情,理论上,线程任务也可以销毁线程池

    作为一个逻辑完备的线程池,大胆一点,我们把问号去掉。

    而且,销毁并不会结束当前任务,它会等这个任务执行完

    想象一下,刚才的__thrdpool_routine(),while 里拿出来的那个任务,做的事情竟然是发起thrdpool_destroy()...

    我们来把上面的图改一下:

    大胆点,我们让一个 routine 来 destroy 线程池👇

    thrdpool_pic_02

    如果发起销毁的人,是我们自己内部的线程,那么我们就不是等 n 个,而是等 n-1 ,少了一个外部线程等待我们。如何实现才能让这些逻辑都完美融合呢?我们把刚才跳过的三段魔法串起来看看。

    第一段魔法,销毁的发起者。

    如果发现发起销毁的人是线程池内部的线程,那么它具有较强的自我管理意识(因为前面说了,会等它这个任务执行完),而我们可以放心大胆地pthread_detach,无需任何人 join 它等待它结束。

    static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)                 
    { 
        …. 
        if (in_pool) // 每个由线程池创建的线程都设置了一个 key ,由此判断是否是 in_pool
        {                                                                           
            /* Thread pool destroyed in a pool thread is legal. */                  
            pthread_detach(pthread_self());
            pool->nthreads--;
        }        
    

    第二段魔法:线程池谁来 free ?

    一定是发起销毁的那个人。所以这里用in_pool来控制 main 线程的回收:

    void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                                           thrdpool_t *pool)
    {
        // 已经调用完第一段,且挨个 pending(未执行的 task)了
        ... // 销毁其他内部分配的内存
        if (!in_pool) // 如果不是内部线程发起的销毁,要负责回收线程池内存
            free(pool);
    }
    

    那现在不是 main 线程发起的销毁呢?发起的销毁的那个内部线程,怎么能保证我,这个正在 routine 里的销毁任务,可以在最后关头把所有资源回收干净、顺利 free(pool)、不会挂地功成身退呢?

    在前面阅读源码第 5 步,其实我们看过,__thrdpool_routine()里有 free 的地方。

    于是现在三段魔法终于串起来了。

    第三段魔法:严谨的并发。

    static void *__thrdpool_routine(void *arg)
    {
        while (1) 
        {
            ... // 前面执行完一个任务,如果任务里做的事情,是销毁线程池...
            // 注意这个时候,其他内存都已经被 destroy 的那个清掉了,万万不可以再用什么 mutex 、cond
            if (pool->nthreads == 0) 
            {
                /* Thread pool was destroyed by the task. */
                free(pool);
                return NULL;                                                        
            }
            ...
    

    非常重要的一点,由于并发,我们是不知道谁先操作的。假设我们稍微改一改这个顺序,就又是另一番逻辑

    比如我作为一个内部线程,在 routine 里调用 destroy 期间,发现还有线程没有执行完,我就要等在我的 terminate 上,待最后看到 nthreads==0 的那个人叫醒我。然后我的代码继续执行,函数栈就会从 destroy 回到 routine ,也就是上面那几行,然后,free(pool);,这时候我已经放飞自我 detach 了,直接退出即可。

    你看,无论如何,都可以完美地销毁线程池:

    thrdpool_pic_03

    是不是太妙了!分析到这里已经要被并发的世界感动了!😭

    6 - 简单的用法

    这个线程池只有两个文件: thrdpool.hthrdpool.c,而且只依赖内核的数据结构list.h。我们把它拿出来玩,自己写一段代码:

    void my_routine(void *context) // 我们要执行的函数                                                  
    {                                                                               
        printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
    }                                                                               
                                                                                    
    void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的任务会到这里
    {
        printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););                                    
    } 
    
    int main()                                                                         
    {
        thrdpool_t *thrd_pool = thrdpool_create(3, 1024);  // 创建                          
        struct thrdpool_task task;
        unsigned long long i;
                                   
        for (i = 0; i < 5; i++)
        {
            task.routine = &my_routine;                                             
            task.context = reinterpret_cast<void *>(i);                             
            thrdpool_schedule(&task, thrd_pool); // 调用
        }
        getchar(); // 卡住主线程,按回车继续
        thrdpool_destroy(&my_pending, thrd_pool); // 结束
        return 0;                                                                   
    } 
    

    我们再 printf 几行 log ,随便编译一下就可以跑起来:

    v2-20c8e3e655216139423a4f2dda37d789_1440w

    使用接口如此简单,也没有复杂的数据结构。

    7 - 并发与结构之美

    最后谈谈感受。

    作为并发架构领域的入门选手,看完之后很后悔为什么没有早点看、早点获取知识、打开视野,并且有一种,我肯定还没有完全理解到里边的精髓,毕竟我不能深刻地理解到设计者当时对并发的构思和模型上的选择

    只能说,没有十多年优秀的系统调用和并发编程的功底难以写出这样的代码,没有极致的审美与对品控的偏执也难以写出这样的代码。

    并发编程有很多说道,就正如退出这个这么简单的事情,想要做到退出时回收干净却很难。如果说你写业务逻辑自己管线程,退出什么的 sleep(1)都无所谓,但做框架的人如果不能把自己的框架做得完美无暇逻辑自洽,就难免让人感觉差点意思。

    而这个 thrdpool ,它作为一个线程池,是如此地逻辑完备。

    再次让我深深地感到震撼:我们身边那些原始的、底层的、基础的代码,还有很多新思路,还可以写得如此美。

    Workflow 项目 GiHut 源码地址:https://github.com/sogou/workflow

    13 条回复    2022-04-30 17:59:40 +08:00
    fawdlstty
        1
    fawdlstty  
       2022-04-26 15:52:34 +08:00   ❤️ 3
    勘误:“通过系统提供的 pthread 或者 std::thread 创建线程,就可以实现多线程并发执行我们的代码”
    首先 pthread 和 std::thread 都不是系统提供的,系统提供的函数比如 windows 平台,api 叫 CreateThread (或类似名称比如 NtCreateThread )。其次,“并发”应该改为“并行”(并发代表同时发生,并行代表同时运行)。
    “但是 CPU 的核数是固定的”这句联系上下文不太准确,cpu 并行线程数不等于核心数,所以这儿应该把核数改为线程数。
    ho121
        2
    ho121  
       2022-04-26 15:59:24 +08:00 via Android   ❤️ 1
    勘误 2:
    GiHut
    holmes1412
        3
    holmes1412  
    OP
       2022-04-26 16:54:09 +08:00
    @fawdlstty 对的,感谢~
    holmes1412
        4
    holmes1412  
    OP
       2022-04-26 16:54:19 +08:00
    @ho121 感谢!
    justou
        5
    justou  
       2022-04-26 18:38:33 +08:00
    UP 有没有好的流水线式处理实现思路?
    n_1 个生产者 -> n_2 个加工者 -> n_3 个加工者 -> ... ->n_k 个最终消费者
    设计可复用的 Pipe 来组成一个 Pipeline
    FrankHB
        6
    FrankHB  
       2022-04-26 18:46:56 +08:00
    销毁线程池是什么需求?没事制造循环所有权引用?正常不那么用的不吃你这 in_pool 亏?
    不考虑销毁这个不就是个 notify all+一轮 join 么,还要纠结什么 id 么……
    holmes1412
        7
    holmes1412  
    OP
       2022-04-26 22:01:50 +08:00   ❤️ 1
    @justou 这是一个超级好 + 超级实用的问题! Workflow 里是用了执行队列,在公司内部协助业务改造了以前的流水线 pipeline 模式,所以做法还蛮值得分享一下>_<

    计算这块是在线程池 thrdpool 之上做了一个 Executor 的模块:每个任务没有优先级的概念,但调度的时候需要带上一个队列名。像你这个例子,一般来说 2 ,3 ,4 ... 是加工者的话,就给每种 "加工操作" 起一个名字作为队列名,比如 "op2", "op3", "op4",然后任务带着对应的队列名直接扔给 Executor 模块调度就可以了。无需思考每种加工者的隔离,大家都是一个池子。

    因为直观来说,我们一件事情,是 "步骤 2" -> "步骤 3" -> "步骤 4" 这样,其实本来我们也不希望关心每个步骤要执行多久,只是因为资源调度和划分做得不好,所以才需要自己把 n_2, n_3, n_4 应该是多少给算出来。Workflow 的 Executor 不仅比较回归问题的本质,而且大家一个池子的话也没有资源浪费,东西来了也可以尽量都跑起来。欢迎到源码围观具体的做法: https://github.com/sogou/workflow/blob/master/src/kernel/Executor.h
    holmes1412
        8
    holmes1412  
    OP
       2022-04-26 22:12:20 +08:00
    @FrankHB 你说的是“为什么在线程任务里销毁线程”吗?那应该是判断 nthreads 而不是 in_pool 吧,在执行完一个 routine 之后才进行一个判断,没有多大的开销:
    ~~~cpp
    if (pool->nthreads == 0)
    {
    /* Thread pool was destroyed by the task. */
    free(pool);
    return NULL;
    }
    ~~~
    而 destroy 接口只调用一次,in_pool 只是全局一次的话就没有什么开销。
    holmes1412
        9
    holmes1412  
    OP
       2022-04-26 22:18:06 +08:00
    更正一下 pipeline 问题队列名的回复:Executor 模块本身是不涉及队列名的,接口上只有执行队列对象。Workflow 的计算任务是通过队列名来选择对应的队列~
    FrankHB
        10
    FrankHB  
       2022-04-27 12:49:00 +08:00
    @holmes1412 “销毁线程池”是你提出来的。
    正常来讲,语言中代表线程的一等对象(线程句柄之类)就是线程池所有的对象,作为线程池的负载的用户代码根本不关心这个对象背后蕴含的线程资源自身的生存期——因为说白了,线程池就是为了能放松对线程创建销毁的时机提出的优化。
    销毁线程池,正常理解就是把线程池作为对象销毁,这应该是在所有并发执行区间以外不和任何线程池并发的管理任务中集中处理的,而不需要在线程池内部考虑。这同时也是 fork-join 并行的标准处理方法(如果姑且不算纠结哪个算主线程的问题,但这种非对称模型你这里看来也不关心)。
    换句话说,创建和销毁线程池都不用关心和负载任务之间的并发。若硬是想套娃线程池,线程池自己就是普通的对象,是不同层次的东西。
    于是,涉及线程池中非负载特定的资源的销毁之间的同步也就两类:
    线程池负载的任务启动和终止需要同步,因为会共享线程对象本身作为临界资源;
    所有线程的生存期结束早于(happens before) 线程池的销毁结束。

    另一方面,线程池被销毁时不保证可以正常同步,也意味着不需要允许提交任务,这能简化实现减小开销。否则,逻辑上必须搬运出一个独立于线程池自身的并行的所有者代替线程池才能继续保存共享状态。
    在有全局 GC 的语言中,GC 就是默认的所有者,资源往 GC 上一扔就可以装作没区别(开销也是平摊到 GC 上了),但这在 C 和 C++行不通。
    在 C 和 C++这种一般的一等对象不可追溯所有者的语言里,destroy 这种行为原则上不可能对等;如果做到了表面上的对等,就说明必然存在一个其它对象来维护 destroy 不破坏资源对象生存的不变量。原则上只有平凡(trivial)的无状态的纯量(scalar)才可以无视不变量,你依赖的 pthread 就不行。
    这在 C 可能不太明显,在 C++更加强调:析构函数对每一个活的对象只能重入一次,对象生存期结束之后不能随意复活。
    你的实现中,pthread_setspecific 设置 key 依赖 TLS 就是隐含的每线程对象中的内部资源。而并行所有者体现在 thrdpool_destroy 中的 in_pool (然后在 pthread_key_delete 调用后最终可以根据仍然生存的 in_pool 来 free )。虽然只有一个 int 这么个纯量,看上去很聪明,但根本上相比上面说的正常设计仍然是冗余开销;而实际实现中,更该关心的问题是 pthread_key*往往很不便宜。
    在我说的正常的不需要考虑内部同步的实现中,用户程序保证 destroy 不可能在任务内部提交,所以整个 TLS 访问和 in_pool 的分支就是多余的。
    在 C++中,这种保证都不需要用户自己编码,析构函数直接就能隐含(正常用户不会没事自己显式调用析构); C 就更依赖用户自觉了,所以才容易更没自觉。
    holmes1412
        11
    holmes1412  
    OP
       2022-04-27 15:17:11 +08:00
    @FrankHB 首先说明,我们最重要的点是:“允许在线程任务里发起下一个任务”。

    而在发起任务时,我无法知道线程池的状况,但底层模块必须严格保证提交了的任务是必须有人管的,提交了就是正确的,不能说提交接口还要耦合线程池状态吧?这个特点对上层进行二次封装的生命周期管理非常重要。

    所以才会有“在线程池销毁期间也允许另一个线程任务里提交任务”,不允许发起肯定是不合适的。而更进一步,“在线程任务里也可以销毁”只是为了让逻辑完整的特点。

    最后强调一下,销毁本身并不是关键点,且 in_pool 这个域在别的地方有更重要的用途。

    可以看看这个二次封装的例子,会更加能理解里边的因果关系:
    https://github.com/sogou/workflow/blob/master/src/kernel/Executor.h
    https://github.com/sogou/workflow/blob/master/src/kernel/Executor.cc
    FrankHB
        12
    FrankHB  
       2022-04-30 17:11:10 +08:00
    @holmes1412 那么我认为有必要说明,就 C/C++的传统,“允许在线程任务里发起下一个任务”的所谓任务,是确切有限制的。
    只是让“逻辑完整”而放松接口限制并不是很有说服力的理由,反而容易隐藏使用错误的 bug 。

    C/C++没有 GC 这样的全局所有者,所以一切 housekeeping 根本上都需要调用侧的用户自觉配合。维护所有权,避免双重释放或者和引入环这样的逻辑错误(尽管语言层次上并不是同等未定义的),是下游开发者的分内事。(至少你救不了拿到一个自动对象强行 free(&xxx)的。)
    在“下一个任务”的负载(这里是线程函数)里,也不能违反这类假定。
    这类假定在接口意义上按 C++的说法叫 narrow contract ,原则上是可以静态确定的,但没有在类型系统上体现也没静态检查所以显得不安全罢了。而 C/C++用户日常工作之一就是对付这种不安全的接口使之符合业务目的。
    相对地,在接口内部添加的检查是 wide contract 。现代 C++明确拒绝随意 widen contract 替代不安全接口而实现所谓的防御性编程,因为这削减了下游用户正确使用接口的义务,反而让会遵守接口规范的用户受到了损害(调用侧一般无法消除实现内部的冗余检查)。

    有的时候,为了允许减少接口的不可用限制,在库的实现中添加(而不是替代)新的包含分支检查的接口使不同分支都合法(而不是 wide contract 这种内部处理逻辑错误)而适配不同情形(如果把 narrow contract 改成类型检查强制,这样修改本质上就是一种类型擦除),也是合理的设计。这种合理性主要体现在没有更好的替代做法实现需求。
    在并发同步的场景,这种做法的典例之一是在一般的非递归锁以外,并行地提供递归锁:因为确实存在设计时无法确保不重入临界区的合理需求,例如 API 设计者向其他 API 使用者暴露一段锁的保护范围且允许递归重入地占用临界资源。
    (当然,现实还可能是因为底层系统直接分别提供了递归锁和非递归锁的 API 所以就直接映射过去了,但它们自身内部的设计仍有这方面考虑。)

    只要不侵入调度逻辑,线程函数当然不应该知道线程池的内部状态,但为什么知道或者不知道线程池是否正在销毁会成为问题?
    根本上,“在线程池销毁期间也允许另一个线程任务里提交任务”是伪需求,或者至少是不符合常规 C/C++接口用户默认对资源管理职责的假定的需求。
    逻辑上,销毁一旦开始,对象的生存期就已经结束(特别地,参见 C++的生存期关于析构函数的定义),状态的完整性也不被确保。一整个线程池能用,不是整个的线程池可没义务保持能用。
    如果你要强行能用,那么你的设计实质上就不是个单纯的线程池,而是包含更复杂阶段的支持资源迁移的执行引擎。(从你命名上似乎也想这样做。)
    但真这样,你一个 in_pool 就显得很局促了,做得不到位。

    可以确定,这种支持一定会比线程池更复杂。至于原因……先回到“下一个任务”的话题。
    不限制下一个任务和 C/C++水土不服,因为 C/C++函数默认都是子例程而不支持捕获续延。
    子例程在 C/C++中的一个重要规则是其中创建的局部对象的生存期不超过调用的活动记录(对一般实现,就是栈帧)的生存期。
    线程函数也不例外。真要取消“下一个任务”中线程函数创建资源关于子例程机制的限制,至少还需要提供把资源搬运出来的保活机制——这需要魔改语言实现提供扩展,用库别指望有兼容实现。
    退而求其次,就是在外部提供跳板(trampoline)函数进行变通。但这部分传统上超出了线程池的范畴,我也没在你的设计中看到(根本上是异步框架,还可能涉及主动切换调度策略)。
    这种不限制下一个任务的模型是一种 CPS 的变体,它隐含了外部所有者的假定,其实是可以不违反任何 contract 而合理的,但是实现相对 C/C++子例程总是有附加开销,这种代价在单纯需要线程池的场合往往完全冗余乃至不可接受的。
    资源销毁在此还有另外的特殊性,就是生存期不能被任意延长,否则可能有非预期的泄露。
    当然你的设计只用了一个 in_pool ,这里倒不会有特别麻烦的问题。
    然而也就因为只是一个 in_pool ,能多做的也就比较鸡肋了,很难跳出为了把不经意的不合理设计洗白的窠臼。
    真正的并发执行引擎,其实逻辑上就不依赖线程池。用没用线程池对用户都是实现细节(要侵入调度改配置给参数就行,不用知道有个池),更不说暴露线程池的销毁状态了。
    holmes1412
        13
    holmes1412  
    OP
       2022-04-30 17:59:40 +08:00
    @FrankHB 项目作者想参与讨论,可以把讨论贴到 Github 上吗?非常欢迎继续交流想法:
    https://github.com/sogou/workflow/discussions
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2694 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 03:31 · PVG 11:31 · LAX 19:31 · JFK 22:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.