现在在写一个服务端的程序,主要有三个操作,1.是等待网络请求接收数据(使用 socket 而不是 http ),2.是对数据转格式,第三个把整理好格式的数据写入数据库中。现在是想把 1,2,3 都写到一个协程中,然后开多个 worker 来进行处理,怎样可以使协程开出多个 worker 来应对多并发呢?目前打算使用 asyncio,如果不支持可以考虑 tornado,求各位看看有什么解决方法
1
hcnhcn012 2017-10-24 11:43:36 +08:00 via iPhone
Twisted
|
2
hook923 2017-10-28 00:28:59 +08:00
我有个类似的做法,我是这样解决的。
因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁 import threading lock = threading.Lock() from concurrent.futures import ThreadPoolExecutor max_workers=64 sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的 chgdata_pool = ThreadPoolExecutor(max_workers=max_workers) chgdata_future = [] savedata_pool = ThreadPoolExecutor(max_workers=200) savedata_future = [] def sock(参数): 接收 shock 数据的代码 chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata 其它代码 def chgdata(参数): 清洗数据的代码 savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata 其它代码 def savedata(参数): 保存语句生成 lock.acquire() #加个互斥锁 保存到数据库 lock.release() #释放锁 其它代码 if __name__ == '__main__': 执行 sock()之前的代码 sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据 for f in sock_future: f.result() for f in chgdata_future: f.result() for f in saveda'ta_future: f.result() conn.commit() #。这步你可以视你的实际需求放在 savadata 中。 每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。 这应该是楼主想要的效果 |