The Shadow Clone Technique: asyncio, Part 3

Stopping coroutines

A Future object can be in one of several states:

  • pending
  • running
  • done
  • cancelled

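A newly created Future is pending; while it is being executed it is running; once it has finished it is done; and if it is cancelled during execution it becomes cancelled. Note that asyncio itself reports only pending, finished, and cancelled: in the output shown later, tasks that are in the middle of running still print as pending.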
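The state transitions described above can be observed directly through `done()` and `cancelled()`. The following is a minimal sketch, assuming Python 3.7+ (for `asyncio.run`); the name `demo_states` is made up for illustration and is not part of the original article.

```python
import asyncio


async def demo_states():
    loop = asyncio.get_running_loop()

    # A freshly created Future has no result yet, so it is pending.
    fut = loop.create_future()
    states = [("new", fut.done(), fut.cancelled())]

    # Setting a result moves it to the done state.
    fut.set_result("ok")
    states.append(("after set_result", fut.done(), fut.cancelled()))

    # A second Future that is cancelled before it ever gets a result.
    other = loop.create_future()
    other.cancel()
    states.append(("after cancel", other.done(), other.cancelled()))
    return states


states = asyncio.run(demo_states())
for label, done, cancelled in states:
    print(label, "done =", done, "cancelled =", cancelled)
```

Note that a cancelled Future also reports `done() == True`: "done" here means "no longer pending", whether the outcome is a result, an exception, or a cancellation.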

import asyncio
import time

now = lambda: time.time()

Cancelling tasks one by one

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'work {0} Done after {1}s'.format(x, x)

coroutine1 = do_some_work(2)
coroutine2 = do_some_work(3)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    # Python 3.5 API, matching the output below; on 3.7+ use
    # asyncio.all_tasks(loop) (Task.all_tasks() was removed in 3.9).
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    # Stop the loop, then run it once more so the cancellations are processed.
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

Interrupting the run with Ctrl+C before the first task finishes prints:

Waiting:  2
Waiting:  3
Waiting:  4
{<Task pending coro=<do_some_work() running at <ipython-input-2-d349a09958ba>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<wait() running at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:355> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at <ipython-input-2-d349a09958ba>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<do_some_work() running at <ipython-input-2-d349a09958ba>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>}
True
True
True
True
TIME:  1.8798322677612305

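A return value of True means the cancellation succeeded, that is, the task had not finished yet and the cancel request was accepted.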
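To make the meaning of that return value concrete, here is a small sketch, assuming Python 3.8+ and using made-up names (`demo_cancel`, `short_job`, `long_job`) that do not appear in the original article. It cancels one task that is still sleeping and one that has already finished, and compares what `cancel()` returns in each case.

```python
import asyncio


async def demo_cancel():
    async def short_job():
        await asyncio.sleep(0)
        return "finished"

    async def long_job():
        await asyncio.sleep(10)

    finished = asyncio.ensure_future(short_job())
    sleeping = asyncio.ensure_future(long_job())
    await finished  # short_job completes; long_job is now asleep

    first = sleeping.cancel()   # still pending, so the request is accepted
    second = finished.cancel()  # already done, so there is nothing to cancel

    # The cancellation only takes effect once the task runs again.
    try:
        await sleeping
    except asyncio.CancelledError:
        pass
    return first, second, sleeping.cancelled(), finished.cancelled()


result = asyncio.run(demo_cancel())
print(result)
```

The first element is True and the second is False: `cancel()` reports whether a cancellation was requested, and a task that has already completed cannot be cancelled any more.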

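The approach above cancels the tasks one at a time. Alternatively, as mentioned earlier, you can wrap all the tasks in a single coroutine function. That function then acts as the outermost task, and the cancellation can be handled through it: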

Cancelling as a group

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'work {0} Done after {1}s'.format(x, x)

async def main():
    coroutine1 = do_some_work(2)
    coroutine2 = do_some_work(3)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task result: ', task.result())

start = now()

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(main())

try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    # Python 3.5 API, matching the output below; on 3.7+ use
    # asyncio.all_tasks(loop) (Task.all_tasks() was removed in 3.9).
    print(asyncio.Task.all_tasks())
    # gather() wraps every task in one future; cancelling it cancels them all.
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    # Stop the loop, then run it once more so the cancellations are processed.
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)

Interrupting the run with Ctrl+C prints:

Waiting:  2
Waiting:  3
Waiting:  4
{<Task pending coro=<do_some_work() running at <ipython-input-2-daa7e16e8aa3>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<main() running at <ipython-input-2-daa7e16e8aa3>:17> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at <ipython-input-2-daa7e16e8aa3>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<do_some_work() running at <ipython-input-2-daa7e16e8aa3>:3> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /home/chy/.pyenv/versions/3.5.3/lib/python3.5/asyncio/tasks.py:422]>}
True
TIME:  0.649970531463623
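Only a single True is printed this time because one `cancel()` call on the future returned by `asyncio.gather()` is forwarded to every task it wraps. The sketch below isolates that behaviour. It assumes Python 3.8+ and uses `asyncio.run` instead of the manual loop handling above; `demo_group_cancel` is a hypothetical name, and this `do_some_work` omits the print call.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return x


async def demo_group_cancel():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (2, 3, 4)]
    group = asyncio.gather(*tasks)

    await asyncio.sleep(0.01)  # give the tasks a chance to start sleeping
    accepted = group.cancel()  # one call, forwarded to every child task

    # Awaiting the group lets the loop deliver the cancellations.
    try:
        await group
    except asyncio.CancelledError:
        pass
    return accepted, [t.cancelled() for t in tasks]


accepted, cancelled_flags = asyncio.run(demo_group_cancel())
print(accepted, cancelled_flags)
```

The whole run takes a few hundredths of a second rather than 4 seconds, because none of the sleeps is allowed to complete.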

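So far we can start and manage multiple coroutines with little effort. However, if a single coroutine performs a long blocking operation, the whole thread is still blocked. In that case the only option is to bring in multiple threads or multiple processes to run the work in parallel.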
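One way to combine threads with the event loop is `loop.run_in_executor`, which hands a blocking function to a thread pool and returns an awaitable. This is only a sketch under assumptions, not necessarily the approach the author has in mind: it assumes Python 3.7+, and `blocking_work` and `ticker` are hypothetical names introduced for illustration.

```python
import asyncio
import time


def blocking_work(x):
    # An ordinary blocking function: calling time.sleep() directly
    # inside a coroutine would stall the whole event loop.
    time.sleep(x)
    return 'blocking {0} done'.format(x)


async def ticker(n):
    # A normal coroutine that should keep running in the meantime.
    ticks = 0
    for _ in range(n):
        await asyncio.sleep(0.05)
        ticks += 1
    return ticks


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    blocking = loop.run_in_executor(None, blocking_work, 0.3)
    return await asyncio.gather(blocking, ticker(4))


results = asyncio.run(main())
print(results)
```

Because the blocking call runs in a worker thread, `ticker` keeps ticking while it sleeps, and both results arrive together from `gather`.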