影分身术之-asyncio_part4

不同线程的事件循环

通过前面的介绍,我们现在能在单线程中实现多协程的工作。

这节就说说怎么在多线程中管理协程。

对于多线程中的协程操作,我们一般把一个线程当做管理线程,而另一个当做工作线程,其中有多个协程进行不同的 task 操作。

1
2
3
4
5
import asyncio
from threading import Thread
import time

now = lambda: time.time()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

def more_work(x):
print('More work start --> {}'.format(x))
time.sleep(x)
print('Finished more work {}'.format(x))

start = now()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)

print('TIME: {}'.format(now() - start))
TIME: 0.0023021697998046875
More work start --> 6
Finished more work 6
More work start --> 3
Finished more work 3

可见,当前线程不会阻塞。新的线程会按顺序执行 call_soon_threadsafe() 方法中注册的函数,因为注册的方法不是协程,所以新开的线程大概会执行 6+3 s。

那么,多线程实现协程的过程大致也是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

async def do_some_work(x):
print('More work --> {}'.format(x))
await asyncio.sleep(x)
print('{} Done after {}s'.format(x, x))

start = now()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(3), new_loop)

print('TIME: ', now() - start)
TIME:  0.0006730556488037109
More work --> 6
More work --> 3
3 Done after 3s
6 Done after 6s

由于我们在子线程开启的是协程任务,所以子线程大概耗时 6s。