遇到一个问题,是关于多进程队列的问题。当队列放进去的数量不能被每次新建的进程数整除的情况下,如果把多余的进程关闭并且退出创建多进程的循环。这是我现在问题上面的一个简单抽象(本质一样,免除了需求环境),把场景抽象出来的代码如下: import multiprocessing
def printt(q): if q.empty(): pass else: data = q.get() print data
if name == "main": q = multiprocessing.JoinableQueue() for i in range(5): q.put() while 1: for i in range(3): process = multiprocessing.Process(target=printt, arg=(q,)) process.start() process.join() q.join()
上面代码如何才能把数字完全打印出来并且不报错
1
ray1888 OP import multiprocessing
def printt(q): if q.empty(): pass else: data = q.get() print data if __name__ == "__main__": q = multiprocessing.JoinableQueue() for i in range(5): q.put() while 1: for i in range(3): process = multiprocessing.Process(target=printt, arg=(q,)) process.start() process.join() q.join() 代码变形了,重新贴一遍 |
2
kingmo888 2017-06-20 16:59:07 +08:00
尝试给楼主美化一波
``` import multiprocessing def printt(q): if q.empty(): pass else: data = q.get() print data if __name__ == "__main__": q = multiprocessing.JoinableQueue() for i in range(5): q.put() while 1: for i in range(3): process = multiprocessing.Process(target=printt, arg=(q,)) process.start() process.join() q.join() ``` |
3
kingmo888 2017-06-20 17:00:05 +08:00
`
import xx if __name__ == '__main__': test() ` |
4
kingmo888 2017-06-20 17:05:57 +08:00
唉,水楼了成了。放弃。抱歉了。
|
5
EchoUtopia 2017-06-20 17:48:03 +08:00
import multiprocessing
def printt(q): while 1: if q.empty(): break else: data = q.get() print data if __name__ == "__main__": q = multiprocessing.JoinableQueue() for i in range(5): q.put(i) while 1: for i in range(3): process = multiprocessing.Process(target=printt, args=(q,)) process.start() q.join() |
6
EchoUtopia 2017-06-20 17:55:40 +08:00
@EchoUtopia #5 没仔细检查,没退出
import multiprocessing def printt(q): while 1: if q.empty(): break else: data = q.get() q.task_done() print data if __name__ == "__main__": q = multiprocessing.JoinableQueue() for i in range(5): q.put(i) for i in range(3): process = multiprocessing.Process(target=printt, args=(q,)) process.start() q.join() print "over" 不知道这个满足楼主需求不 |
7
ray1888 OP @EchoUtopia 不行,还是会阻塞
|
8
ray1888 OP @EchoUtopia 这个可以了,想请问一下,我看了文档还是不太懂那个 task_done 的函数作用
|
9
EchoUtopia 2017-06-20 18:12:49 +08:00 1
@ray1888 #8 其实就是一个 semaphores,它有个计数器,put 一次就加一,调用一次 task_done 表示当前对 queue 里面 get 到的东西处理已经完成,计数器减一,当queue里面的所有任务都完成后,锁就是释放了,join 就会取消阻塞,我自己是这样理解的,不知道会不会误导你。。
|
10
ray1888 OP @EchoUtopia 你的理解是对的。我刚刚认真看了官方文档也是类似这样描述的。翻译不太好,但是意思跟你的理解差不多。
task_done() Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. 表明之前入队的任务完成(即现在出栈的任务)。使用队列消费者线程。当每个 get 方法获取一个任务,随后调用 task_done 告诉队列 运行在那个线程上的任务已经完成 当线程当前拥塞,当所有任务(个体)完成处理后,会继续(意味着 task_done 回调 是接收每个个体(任务)被放置到队列里面) 当被调用的次数多余队列中的个体数,会提出 Raise ValueError 值错误。 join() Block until all items in the queue have been gotten and processed. The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. 阻塞直到队列里面所有的任务(个体)被获取并且被处理。 当队列里面被放入一个个体(任务)时,未完成任务数会+1,这个计数会随着一个消费进程调用 Task_done 去表示那个个体(任务)已经被取回,并且所有的队列里面的任务都已经完成。当未完成计数=0 时,队列不在阻塞 |
11
araraloren 2017-06-21 09:15:04 +08:00
@kingmo888 python 的缩进 + V2EX 的格式 = 让人绝望。。
|
12
ray1888 OP @araraloren 其实如果 V2 可以像 stackoverFlow 上面那样,一键把代码全部格式化,那挺好的
|
13
araraloren 2017-06-21 10:14:09 +08:00
@ray1888 ...不 我只求不要把原来 的格式破坏了 这样就谢天谢地了。。
|
14
atempcode 2017-06-21 10:47:33 +08:00
不应该判断 q.empty() 来决定是不是退出吧,应该是去 catch queue.empty() 这个 exception。
|
15
ray1888 OP Multiprocessing.JoinableQueue 的 empty 方法只返回 True 和 False,没有 exception 产生
|
16
atempcode 2017-06-21 22:18:39 +08:00
@ray1888
1. q.empty() 由於多綫程 /多進程的特性,是不可相信的 empty() Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable. 2. 所以判斷 queue 裏面有沒有元素不能用 q.empty()來判斷 3. 推薦的辦法是一直 get(False),直到抓住 queue.empty() 这个 exception。 |