影分身术之-asyncio_part2

异步实现

要想使用 asyncio 实现异步,和 gevent 差不多,都需要相应的代码支持。gevent 是通过打上“猴子补丁”来实现的,而 asyncioyield from 有着千丝万缕的关系,所以先实现就比较简单了。

你可以自己用 yield from 写个协程方法,要么就只能用用别人实现的协程模块。

1
2
3
4
import asyncio
import time

now = lambda: time.time()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'work {0} Done after {1}s'.format(x, x)

def done_callback(future):
print('Callback: ', future.result())

start = now()

coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]

# 完成一个任务打印一个
for task in tasks:
task.add_done_callback(done_callback)

loop = asyncio.get_event_loop()

# 将协程 task 任务列表添加到事件循环中
# 这两种方法都能实现添加事件循环
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks))

# 最后一个任务完成的时候一起打印
# for task in tasks:
# print("Task result: ", task.result())

print('TIME: ', now() - start)
Waiting:  2
Waiting:  3
Waiting:  4
Callback:  work 2 Done after 2s
Callback:  work 3 Done after 3s
Callback:  work 4 Done after 4s
TIME:  4.004777193069458

总计花了 4s 多点的时间,如果是同步的话,至少 9s。可见,我们通过 asyncio 实现了并发功能。

协程嵌套

在协程的嵌套中,如何处理协程返回的数据是件很灵活的事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'work {0} Done after {1}s'.format(x, x)

async def main():
coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]

# wait 返回 task 对象
# dones, pendings = await asyncio.wait(tasks)
# print(dones)
# print(pendings)
# for task in dones:
# print('Task result: ', task.result())

# gather 直接返回的是协程运行结果
results = await asyncio.gather(*tasks)
for result in results:
print("Task result: ", result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
Waiting:  2
Waiting:  3
Waiting:  4
Task result:  work 2 Done after 2s
Task result:  work 3 Done after 3s
Task result:  work 4 Done after 4s
TIME:  4.0052490234375

main() 协程函数中,可以不直接处理结果,直接返回 await 内容,那么,run_until_complete() 将会接受返回的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def main():
coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]

# wait 返回 task 对象
return await asyncio.wait(tasks)


start = now()

loop = asyncio.get_event_loop()
dones, pendings = loop.run_until_complete(main())
for done in dones:
print("Task result: ", done.result())

print('TIME: ', now() - start)
Waiting:  2
Waiting:  3
Waiting:  4
Task result:  work 4 Done after 4s
Task result:  work 2 Done after 2s
Task result:  work 3 Done after 3s
TIME:  4.0036780834198
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def main():
coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]

# gather 直接返回的是协程运行结果
return await asyncio.gather(*tasks)


start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print("Task result: ", result)

print('TIME: ', now() - start)
Waiting:  2
Waiting:  3
Waiting:  4
Task result:  work 2 Done after 2s
Task result:  work 3 Done after 3s
Task result:  work 4 Done after 4s
TIME:  4.003807544708252

除此之外,还可以使用 asyncio 的 as_completed() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
async def main():
coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),
]

for task in asyncio.as_completed(tasks):
result = await task
print("Task result: ", result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print('TIME: ', now() - start)
Waiting:  2
Waiting:  3
Waiting:  4
Task result:  work 2 Done after 2s
Task result:  work 3 Done after 3s
Task result:  work 4 Done after 4s
TIME:  4.002615690231323