V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
qazwsxkevin
V2EX  ›  Python

请教如何在 concurrent.futures 子进程里,无冲突地使用整个程序的全局队列?

  •  
  •   qazwsxkevin · 2020-10-23 01:40:43 +08:00 · 465 次点击
    这是一个创建于 1494 天前的主题,其中的信息可能已经有所发展或是发生改变。
    from concurrent import futures
    from multiprocessing import Manager
    
    
    # 此函数无限循环,消费掉 sqltextqueen
    def procSQL(sqltextqueen):
        while True
            while not sqltextqueen:
                inserttext = sqltextqueen.get()
                mysqldb =xxxx
                mysqldb.insert(inserttext)
                #略
            time.sleep(3)
    
    
    if __name__ == '__main__':
    
    	MainSQLProcess = futures.ProcessPoolExecutor(max_workers=1)
      
           # 全局 SQL 队列
            manager = Manager()
    	SQLQueen = manager.queue
    	MainSQLProcessRet = MainSQLProcess.sumit(procSQL,SQLQueen)
    	
    

    ################ another.py ##################

    	TaskProcess = {}
    	TaskProcessRet = {}
    	
    	# 提交任务
    	TaskinfoA = {
    	    'TYPE': 'CPS'
    	    'countt': countt,
    	    'Rget': stRget,
    	    'DLoption': DLoption,
    	    'DTO': mtinfo.get('DTO'),
    	    'errFlag': errFlag
    	    # 字典内容引用的一些值,有些是从函数外几层的函数传过来的,距离 main()已经好 N 层了
    	}
    	
    	
    	TaskProcess['A'] = futures.ProcessPoolExecutor(max_workers=1)
    	TaskProcess['B'] = futures.ProcessPoolExecutor(max_workers=1)
    	# 交给进程池
    	TaskRet['A'] = TaskProcess['A'].submit(ProcessSuit,TaskinfoA)
    	TaskRet['B'] = TaskProcess['B'].submit(ProcessSuit,TaskinfoB)
    

    ProcessSuit 函数里,产生的一些 SQL 语句,希望能及时送到全局队列里去消费,而不是通过 concurrent.futures 的回调 函数一层一层地往回 main 送到才处理...

    我尝试了把 SQLQueen 引用在

    	TaskinfoA = {
    	    'TYPE': 'CPS'
    	    'countt': countt,
    	    'Rget': stRget,
    	    'DLoption': DLoption,
    	    'DTO': mtinfo.get('DTO'),
    	    'errFlag': errFlag,
    	    'SQLQueen' : SQLQueen    # <-----
    	}
    

    是不行的,貌似是有个 concurrent.futures pick 锁什么的 请教如何可以让各个子进程,都能不冲突地,实时送到全局的队列里,各自进程都可以 put,get,但是又不冲突?

    1 条回复    2020-10-25 00:25:56 +08:00
    qazwsxkevin
        1
    qazwsxkevin  
    OP
       2020-10-25 00:25:56 +08:00
    最后重新改写了大部分函数
    再用了 Manager().Queue() 作为队列
    事件算是解决了。。。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4760 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 01:13 · PVG 09:13 · LAX 17:13 · JFK 20:13
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.