v2 的跟帖似乎无法用 markdown 语法? 所以就重新开个帖子请教这个问题。。。
# coding=utf-8
import codecs
import difflib
import os.path
import re
import time
import string
import chardet
import shutil
import copy
from concurrent.futures import ProcessPoolExecutor
import datetime
def PrintCount(PName):
if PName == '甲':
DelayTime = 0.9
if PName == '乙':
DelayTime = 1.2
if PName == '丙':
DelayTime = 1.7
if PName == '丁':
DelayTime = 2
if PName == '戊':
DelayTime = 2.5
countt = 0
while True:
countt += 1
time.sleep(DelayTime)
print(f'{PName}池:->',f'{countt}')
if __name__ == '__main__':
StartTime = time.clock()
FutureDict = {}
FutureRetDict = {}
FutureTimeRecoderDict = {}
PoolNameList = ["甲", "乙", "丙", "丁", "戊"]
# 初始化进程池
for i in range(len(PoolNameList)):
# 进程
FutureDict.update({PoolNameList[i]: ProcessPoolExecutor(max_workers=1)})
# 进程 Ret
FutureRetDict.update({PoolNameList[i]: futures.Future()})
# 进程启动时间
FutureTimeRecoderDict.update({PoolNameList[i]: None})
# 模拟测试清空进程池的信号
closeflag = True
# 开始工作
while True:
ProcessNum = 0
# 增加任务
for ProcessName,FutureRet in FutureRetDict.items():
# 模拟 40 秒后终结 [乙] 进程池
if closeflag == True:
if time.clock() - StartTime >= 40: # 在启动 40 秒后触发
print(f"开始强制结束 [乙] 进程池")
for pid, process in FutureDict['乙']._processes.items():
process.terminate()
FutureDict['乙'].shutdown()
closeflag = False
time.sleep(15)
# 如果进程池在运行
if FutureRet.running() == True:
pass
else:
# 增加任务
FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
print(f'{ProcessName} 进程池提交了开始.')
time.sleep(2)
break
time.sleep(3)
40 秒之后,乙进程池的确被停了,但是再向乙进程池提交任务的时候,会提示:
甲池:-> 57
甲池:-> 58
丙池:-> 25
戊池:-> 13
甲池:-> 59
丁池:-> 19
甲池:-> 60
丙池:-> 26
Traceback (most recent call last):
File "D:/TestForMu.py", line 80, in <module>
FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
raise BrokenProcessPool('A child process terminated '
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
甲池:-> 61
戊池:-> 14
丁池:-> 20
丙池:-> 27
应该要做些什么事情,再次启用?
原谅我这样设计流程,也许不是科学的。。。^_^
谢谢大家解答!
1
Latin 2020-07-08 11:05:08 +08:00
标题不严谨
已经停止少了一个,肯定有一个提交不了 或者重新实例化一个进程池提交一个乙进程 |
2
linw1995 2020-07-08 11:05:11 +08:00
pool 就是为了复用多个进程,创建那么多个 pool 干嘛……只要一个就好了
|
3
uti6770werty OP |
4
uti6770werty OP @linw1995 还有一个就是,Pool 实例自带了 Pool 中止的方法,如果一个 Pool 有 N 个进程,就放飞了。。。
|
5
uti6770werty OP 报告一下,下午做了个测试,在 process.terminate()后,对 Pool.shutdown(),系统看到进程池开的进程消失了,重新开实例继续也没问题,系统资源应该是被回收了。
|
6
oahebky 2020-07-16 09:31:51 +08:00
看了这个帖子楼主的逻辑。
楼主自己已经有答案了。对于怎么解决问题我就不多说了。 ====== 说一个我个人看这个贴子的程序设计的看法。 我觉得用 concurrent.futures.ProcessPoolExecutor 来达到楼主的目的,有种拿着锤子看什么问题都是钉子的感觉。 其实担心子进程异常死掉(比如 hang 死)之类的,不如直接使用 multiprocessing.Process 来创建子进程执行 work 。 既然是 concurrent.futures 包里面的类,其实已经说明了,这个类是为了 future 思想设计的。 说得简单点就是即提供高度封装的多进程 API,更重要的是为了兼容异步编程,和其它异步库同用。 楼主的程序设计其实重点不在异步思想上面( future ),所以楼主的这种并发编程其实可以考虑其它库(非 concurrent.futures ),因为在考虑到对子进程的“精细控制”上,concurrent.futures.ProcessPoolExecutor 提供的 API 实质上并不是很洽合这种场景。 当然,这是我个人的一点看法,仅供参考。 |