一个简单例子
import asyncio
import time
特殊函数
async def get_request(url):
print('正在下载',url)
# time.sleep(2) 不支持异步的模块 会中断整个的异步效果
await asyncio.sleep(2)
print('下载完成',url)
return 'page_text'
def parse(task):
print(task.result())
start = time.time()
urls = ['www.xxx1.com','www.xxx2.com','www.xxx3.com']
tasks = [] #存放多任务
for url in urls:
# 调用特殊函数
func = get_request(url)
# 创建任务对象
task = asyncio.ensure_future(func)
# 给任务对象绑定回调函数
task.add_done_callback(parse)
tasks.append(task)
创建事件循环对象
loop = asyncio.get_event_loop()
执行任务
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:',time.time()-start) #2.0015313625335693
可以在 get_request 判断任务是否成功,但是如何判断成功之后取消其他任务呢?
谢谢
1
chanchancl 2021-01-22 10:46:51 +08:00
将 func 包装成一个 function
在结尾的地方,将 tasks 中,除了自己的 task 都 cancle 掉 |
2
pursuer 2021-01-22 10:52:53 +08:00
如果想要让协程提前结束等待,可以使用 asyncio.wait(...,return_when='FIRST_COMPLETED')或者更灵活的方案 asyncio.Future,然后处理下资源释放
|
3
dongcheng OP @chanchancl 这个 func 能获取到 task 吗?
|
4
sujin190 2021-01-22 11:08:05 +08:00 4
你是不是想错了,对于协程来说,如果设计到 io,那么就算能取消协程其实并不能取消 io 操作
比如 http 请求,就算你要取消协程其实并不能取消已经发送出去的 http 请求,取消协程完全没有意义,如果你能取消 io,比如关闭 http 连接来取消 http 请求,那么你应该通过关闭 http 请求的方式来触发协程返回,即先取消 io 操作再由 io 完成来触发协程返回,而不能倒过来 对于非 io 请求,再整个协程完成前会独占线程,并不会调度到其他协程,所以你自然也不能用其他协程来取消当前正在运行的协程了 |
5
keepeye 2021-01-22 11:10:10 +08:00
不是有 tasks 吗?遍历一下都 cancel 掉
|
6
fiveelementgid 2021-01-22 11:10:13 +08:00 via Android
看见 Task 和 continuation 还激动了一下以为有. NET 玩家一起研究 TAP 了,一看是 Python,告辞
|
7
sujin190 2021-01-22 11:12:28 +08:00
对于非 io 请求协程再补充一点,似乎协程再创建的时候就会进行首次运行,没 io 操作,所以首次运行必然独占线程一直到运行完成,所以这种情况下完全做不了中途取消的操作
|
8
dongcheng OP |
9
sujin190 2021-01-22 11:26:56 +08:00
@dongcheng #8 但是没意义,不符合协程原理实现,从程序的可靠性严谨性来说也不应该出现这种设计,不关闭 io 而取消上层协程是很不科学的,很容易导致 io 、连接泄露啥的,而且还不容易排查是啥问题
|
10
xiaoHuang3 2021-01-22 11:48:58 +08:00
@sujin190 附议~,我觉得楼主这种需求根本不适合用 asyncio 来做- -
|
11
crclz 2021-01-22 12:10:44 +08:00
@fiveelementgid .NET 的设计就很清晰,TAP 、cancellation token
如果是.NET 的话,应该把一个 CancellationTokenSource ( cts )的 CancellationToken 传给所有异步方法,异步方法会返回 task,再等 Task.WhenAny 返回后,再取消 cts 。 由于 CancellationToken 是层层深入的( HTTP 、TCP ),所以底层负责 TCP 的代码也会取消正在进行的操作,然后会扔出一个取消异常,最后捕获这个异常就行了(在异步方法中)。 我想 python 的问题就是,底层的 api 都没有提供 cancellation token 的参数,导致无法让取消操作层层传播。 |
12
abersheeran 2021-01-22 12:21:18 +08:00 1
|
13
abersheeran 2021-01-22 12:32:01 +08:00
刚看到楼主在 #8 说有嵌套 Task 。很遗憾,asyncio 不支持嵌套取消。可以试试 trio,宣传上说是解决了这个问题。我也没试过。
|
14
optional 2021-01-22 12:33:41 +08:00 via Android
从设计上来说,这是不可取消的,如果有 transaction 对象之类,可以标记一下
|
15
fiveelementgid 2021-01-22 13:01:58 +08:00 via Android
@crclz Sure,我昨晚刚看完印象深刻,特地还去扒了一下源码。最底下就是 new 一个新的 Token 和上层传下来的 Token 然后 Link,整层整层地控制
|
16
dongcheng OP @abersheeran 感谢分享代码。研究了下 asyncio.as_completed(tasks)可以实现类似的功能
|
17
zhuangzhuang1988 2021-01-22 13:36:50 +08:00
@crclz 微软还是最牛的
cancal 进度报告是 api 中设计的 |
18
xuboying 2021-01-22 13:41:24 +08:00
@abersheeran #12 个人感觉这个代码片段用类来实现可读性可能会更好一点。loop = loop or asyncio.get_running_loop()学到了。之前写的时候还没这个函数。。。
|
19
imn1 2021-01-22 13:44:33 +08:00
刚好有类似需求
没找到满意的解决方案 目前是用异步队列,当满足一定条件就清空队列,但条件满足时,已经并行启动的不能取消,要继续工作,只是清空了未启动的队列 |
20
js8510 2021-01-22 14:19:27 +08:00
同 @sujin190 。 我猜测可能想的是 multiprocessing 而不是 asyncio.
如果 pre request/ post response 有 cpu heavy 的处理。并发倒是有意义的。 你可以 map_async 。如果 i/o 还没发生,或者已经发生,你可以 terminate 掉其他 process. 如果正在 i/o blocking 你还是要 wait 然后 kill 掉。 这样调度有开销,要具体分析。。简单的 post 请求就返回应该不值得。 |
21
zeroDev 2021-01-22 14:30:44 +08:00 via Android
用协程里的 active 来判断
|
22
zeroDev 2021-01-22 14:31:59 +08:00 via Android
另外,我感觉对于你这个需求,不应该进行并发请求。
|
23
muzuiget 2021-01-22 14:38:17 +08:00
老生常谈,从外部取消相当于强杀进程,好容易造成数据一致性问题。
正确姿势就是线程 /协程自己在某些关键点判断某个共享变量状态,然后自己优雅退出。 |