Python 版本 3.4.2 ,主进程内的定义了一个 RabbitMQ 连接,变量为 connection ,保存了该链接的句柄。
我需要在四个子进程内分别使用这个句柄的方法,但是报错:
Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Google 搜索结果都是建议使用 multiprocessing.Queue 来进行进程间通讯,但是看起来不能满足我的需求。请问大家对于这种子进程共享父进程内的变量(句柄)是怎么处理的呢?
代码如下
from multiprocessing import Pool
import os, time, random, sys, pika
def long_time_task(rabbitmq_handle):
def e_c(e):
print(e)
if __name__=='__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='172.17.0.9'))
p = Pool()
for i in range(5):
p.apply_async(long_time_task, args=( connection, ), error_callback=e_c)
print( 'Waiting for all subprocesses done...')
p.close()
p.join()
print( 'All subprocesses done.')
1
fffflyfish 2017-01-26 09:22:34 +08:00 via iPad
看起来加个进程锁可以解决
|
2
changwei 2017-01-26 09:25:58 +08:00 via Android
首先你要确定这个句柄是否为线程安全?比如说 mysqldb 连接就是非线程安全句柄。
一般内存比较大的情况下无论是不是线程安全句柄,我都是在 threading 的构造函数里面传入一个新的连接句柄,这样有多少个线程就会产生多少个连接句柄。好处是句柄其中一个句柄链接断了,不影响其他线程。 |
3
wwqgtxx 2017-01-26 09:46:24 +08:00 via iPhone
这种 socket 对象, lock 对象, thread 对象是不能跨进程传输的,你只能把主进程当做一个 ProxyServer 然后子进程调用主进程的方法,实际上还是由主进程去执行,不能把上面三种对象从一个进程传输到另一个进程上的
|
4
dangyuluo OP |
5
wwqgtxx 2017-01-26 09:53:29 +08:00 via iPhone
唯一一种传输 socket lock thread 的办法是利用 linux 的 fork 机制,因为那样可以完全复制主进程的文件句柄,但是也只是在启动的时候,后续依然无法传输,而且控制不好很容易死锁(比如 pymongo 的官方文档就明确说到不是 fork 安全的)
|
6
NoAnyLove 2017-01-30 07:13:19 +08:00
纠正一下, Lock 对象确实不能跨进程传输,不过 socket 对象是可以的。下面的代码中,`conn`变量可以直接传递给子进程:
```python from multiprocessing import Process import socket def echo_server(conn): while True: data=conn.recv(1024) conn.sendall(data) if __name__=="__main__": process_list=[] s=socket.socket() s.bind(('127.0.0.1', 5555)) s.listen(5) while True: conn, addr = s.accept() print('Connection from {addr[0]}:{addr[1]}'.format(addr=addr)) p=Process(target=echo_server, args=(conn,)) process_list.append(p) p.start() ``` 此外,`multiprocessing.Lock`是基于`SemLock`对象构造的。而`SemLock`又是`sem_open`/`CreateSemaphore`的封装。至少 Linux 下是使用命名信号量的实现的(但 Windows 下没用名字,不过要复制句柄还是可以的),所以理论上是可以跨进程复制的。 如果不嫌麻烦的话(我的意思是这样做很麻烦),可以参考`multiprocessing.reduction`,自己实现`multiprocessing.Lock`对象的可 pickle 化,以及传输之后重新打开命名的信号量 /或者复制句柄。然后`Lock`对象也就可以跨进程传输了。 不过考虑一下开发成本已经稳定性,用 4 个连接真的没什么不好的。。。。。 Orz |