V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
qazwsxkevin
V2EX  ›  Python

multiprocessing 的进程池,能否做到监控一些状态?

  •  
  •   qazwsxkevin · 2019-10-24 00:17:21 +08:00 · 3163 次点击
    这是一个创建于 1859 天前的主题,其中的信息可能已经有所发展或是发生改变。
    from multiprocessing import Pool
    
    def doSomething(caseNumber):
        # do xxx use caseNumber.
        return Value
    
    def SomeFunc(poolpool):
        #do xxx
        if xxx > 10:
            return True
        else:
            return False
        pass
    
    if __name__ == '__main__':
        #建池
        Apool = Pool(10)
    
        # 向 Apool 加异步
        Apool.apply_async(doSomething, someArgs)
        Apool.apply_async(Afunc, someArgs)
        Apool.apply_async(Bfunc, someArgs)
        Apool.apply_async(Cfunc, someArgs)
        # Cret = Apool.apply_async(Cfunc, someArgs)
        #省略...
    
        #测试用,已注释,留着
        # Bret = Apool.apply_async(Bfunc, someArgs)
        
        # while True:
        for i in SomeSuit:
            #1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
            #2、在这里能否做到判断异步的 Bfunc 执行完毕没?
            #3、假设 2 的想法可以做到,并给出判断结果(True or False),
            #   在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?
            pass
    
        print("结束 for SomeSuit.")
    
        Apool.close()
        Apool.join()
    

    1、执行到此时,在这里能否判断出 Apool 进程池,此时有多少个进程(求数量)在跑?
    2、在这里能否做到判断异步的 Bfunc 执行完毕没?
    3、假设 2 的想法可以做到,并给出判断结果(True or False),
    在这里能否能马上拿到异步 Cfun 执行完毕后的返回值?我理解的线程池,是必须 close 和 join 完成后,统一出结果?

    4 条回复    2019-10-26 18:26:31 +08:00
    ClericPy
        1
    ClericPy  
       2019-10-24 00:41:25 +08:00
    def apply_async(self, func, args=(), kwds={}, callback=None,
    error_callback=None):
    '''
    Asynchronous version of `apply()` method.
    '''
    if self._state != RUN:
    raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result


    1 2 3 基本都有办法的, 在一切皆对象的 Python 里, 几乎所有玩意都能自省
    看看源码去吧, 一点点说太麻烦了
    比如 apply_async 方法的返回值就是 ApplyResult 对象, ApplyResult 对象里可以判断是否完成以及立刻取得结果
    Apool 的 self._pool = [] 这里也可以看有多少
    多看源码吧


    友情提醒, 你这个用法已经过时了, 现在多进程多线程的池都建议使用 concurrent.futures 里面那俩, 借助很多语言都在流行的 Future 概念, 可以在同步代码里面把异步操作简化. 尤其是借助 callback 方式(虽然你上面的代码也可以用回调)也算不难理解
    ClericPy
        2
    ClericPy  
       2019-10-24 00:43:02 +08:00
    然后还有 close 和 join 忘了说
    前者的意思是进程池已经关闭, 如果再添加新任务, 会直接抛错, 而不是真正关闭了所有进程
    后者意思是, 主线程 /主进程 整个阻塞住, 直到进程池里的任务全都完成

    你想直接拿那个结果, 别 join, 直接对那个提交后得到的对象使用 get 方法
    qazwsxkevin
        3
    qazwsxkevin  
    OP
       2019-10-26 17:25:50 +08:00
    @ClericPy,有不明白的地方,concurrent.futures,比如:

    ```
    eStatusSuit = []
    e = futures.ProcessPoolExecutor(max_workers=5)
    eStatus = e.submit(ProcessCaseID,someVarA ,someVarB)
    eStatusSuit.append(eStatus)
    #
    eStatus = e.submit(ProcessCaseID,someVarC ,someVarD)
    eStatusSuit.append(eStatus)
    #
    eStatus = e.submit(ProcessCaseID,someVarE ,someVarF)
    eStatusSuit.append(eStatus)

    #此时是向 e 提交了 3 个任务
    #eStatus 对象,我看了一下,似乎是无法查看到 33 个任务具体状态,只能等待 eStatus 全体执行完毕,全部返回 eStatus.result()?
    #eStatus.result()是个阻塞式,想不到怎么用。。。

    #我是想建立能跑 5 个进程的可控队列,不知道这么干是否合适,还是有更方便的方式?

    aExecutor = futures.ProcessPoolExecutor(max_workers=1)
    bExecutor = futures.ProcessPoolExecutor(max_workers=1)
    cExecutor = futures.ProcessPoolExecutor(max_workers=1)
    dExecutor = futures.ProcessPoolExecutor(max_workers=1)
    eExecutor = futures.ProcessPoolExecutor(max_workers=1)

    然后做个
    aExecutorStatus = aExecutor.submit(ProcessCaseID,someVarA ,someVarB)
    bExecutorStatus = bExecutor.submit(ProcessCaseID,someVarC ,someVarD)
    #省略...

    #对各个 ExecutorStatus 的 running(),done()进行循环判断,哪个 False/True 了,就从 queue 里取任务提交过去,哪个失败了,再调度一下优先权
    if aExecutorStatus.running():
    xxx
    #省略...

    不知道是不是这样乱来的?
    ```
    ClericPy
        4
    ClericPy  
       2019-10-26 18:26:31 +08:00
    ProcessPoolExecutor 可以看做一个进程池执行器, 朝里面提交函数和参数以后, 会返回一个 Future, 这时候任务就开始执行了, 所以常见的用法就是:
    1. 新建一个进程池执行器, 设置好并发数 pool = ProcessPoolExecutor(5)
    2. futures = [pool.submit(func, var[0], var[1]) for var in var_list]
    这时候任务都在后台派出的线程执行中
    3. 然后就该等待任务完成了, 如果想要按执行结束的顺序来处理, 就
    from concurrent.futures import as_completed
    for future in as_completed(futures):
    result = future.result(timeout=None)
    如果无所谓完成顺序, 但是在意任务匹配顺序, 就
    for future in futures:
    result = future.result(timeout=None)
    这里 timeout 可以配置成一个 float, 然后 try catch 住 timeouterror, 不过不确定多进程会不会杀死超时任务, 因为平时我大都用线程, 线程是肯定杀不死的...

    如上, 并发的好处就体现出来了, 也就是说, 在没达到并发限制的情况下, 整个任务理论上完成耗时不会超过最慢任务的耗时, 虽然实际上会受并发限制和 CPU 数量影响
    @qazwsxkevin
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1043 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 18:37 · PVG 02:37 · LAX 10:37 · JFK 13:37
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.