V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
opengo
V2EX  ›  Python

[asyncio] 多线程中子线程使用主线程 event loop,这样是否与直接用主线程去进行事件循环无差别

  •  
  •   opengo · 2021-03-01 14:44:07 +08:00 · 1551 次点击
    这是一个创建于 1361 天前的主题,其中的信息可能已经有所发展或是发生改变。

    使用concurrent.futures创建线程池然后使用asynciorun_in_executor方法去利用线程池执行某异步任务,具体代码如下:

    # -*- coding: utf-8 -*-
    import asyncio
    import concurrent.futures
    import threading
    import time
    import logging
    
    from datetime import datetime
    
    
    logger = logging.getLogger(__file__)
    logger.setLevel('INFO')
    
    
    async def run_task(val):
        logger.info(f"run_task Owning thread - {threading.currentThread().native_id} {threading.currentThread().getName()}")
        await asyncio.sleep(val)
        return datetime.now()
    
    
    def thread_task(loop, val):
        logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")
    
        v = asyncio.run_coroutine_threadsafe(run_task(val), loop)
        result = v.result()
    
        logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")
    
    
    async def main(loop):
        result = [0.1, 0.2, 0.3, 0.4, 0.5]
        logger.info(f"main thread :: {threading.currentThread().ident} {threading.currentThread().getName()}")
    
        with concurrent.futures.ThreadPoolExecutor() as pool:
            blocking_tasks = [
                loop.run_in_executor(
                    pool, thread_task, loop, val
                ) for val in result
            ]
            await asyncio.gather(*blocking_tasks)
    
    
    if __name__ == '__main__':
    
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(main(loop))
        finally:
            loop.close()
    
    

    使用run_in_executor调用thread_task方法,再在子线程中使用 asyncio.run_coroutine_threadsafe 方法去调用真正的异步任务,而实际上源码中 asyncio.run_coroutine_threadsafe 方法通过ensure_future 创建了一个future 添加到主线程的 event loop 中并绑定当前线程 future,子线程的异步任务在主线程中被循环,循环完成后再从子线程中去获取结果


    感觉上不如直接在主线程中使用asyncio.gather 或者 asyncio.as_completed 去并发执行这些任务,另一种办法是每个子线程再设置一个 loop 去进行事件循环,但是实际测试中这几种方案性能相差并不多

    # 使用 asyncio.gather 或 asyncio.as_completed
    task_list = []
        
        for val in result:
            task_list.append(asyncio.create_task(run_task(val)))
            
        await asyncio.gather(*task_list)
    
        for f in asyncio.as_completed(task_list, loop=loop):
            results = await f
    
    
    # 设置新的 loop
    def thread_task(val):
    
        logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")
    
        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            v = loop.run_until_complete(asyncio.gather(run_task(val)))
            logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")
        finally:
            loop.close()
    
    

    大家有什么好的多线程+协程的实现方案吗,这里对应的场景是同时处理多个文件的 io 任务

    4 条回复    2021-03-02 23:06:19 +08:00
    liuxingdeyu
        1
    liuxingdeyu  
       2021-03-02 10:09:01 +08:00
    没太搞明白,线程加协程的意义
    opengo
        2
    opengo  
    OP
       2021-03-02 10:19:07 +08:00
    @liuxingdeyu 可以多线程去进行网络请求,文件 IO,这里我想利用多线程去处理文件 IO,利用协程去提高性能,多进程+协程的方式面对大量文件 IO 操作性能更好,但是资源开销也更大,所以想尝试多线程+协程的最优方案
    linw1995
        3
    linw1995  
       2021-03-02 11:37:47 +08:00
    应该等同,但会更慢
    opengo
        4
    opengo  
    OP
       2021-03-02 23:06:19 +08:00
    @linw1995 确实不如单线程+协程,应该是线程池化+上下文切换消耗的
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5191 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 05:49 · PVG 13:49 · LAX 21:49 · JFK 00:49
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.