關(guān)于我
一個(gè)有思想的程序猿,終身學(xué)習(xí)實(shí)踐者,目前在一個(gè)創(chuàng)業(yè)團(tuán)隊(duì)任team lead,技術(shù)棧涉及Android、Python、Java和Go,這個(gè)也是我們團(tuán)隊(duì)的主要技術(shù)棧。
Github:https://github.com/hylinux1024
微信公眾號(hào):終身開發(fā)者(angrycode)
在前一篇《一文徹底搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器(
Generator
)可由以下兩種方式定義:
- 列表生成器
-
使用
yield
定義的函數(shù)
在
Python
早期的版本中協(xié)程也是通過生成器來實(shí)現(xiàn)的,也就是基于生成器的協(xié)程(Generator-based Coroutines)。在前一篇介紹生成器的文章末尾舉了一個(gè)生產(chǎn)者-消費(fèi)者的例子,就是基于生成器的協(xié)程來實(shí)現(xiàn)的。
def producer(c):
n = 0
while n < 5:
n += 1
print('producer {}'.format(n))
r = c.send(n)
print('consumer return {}'.format(r))
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('consumer {} '.format(n))
r = 'ok'
if __name__ == '__main__':
c = consumer()
next(c) # 啟動(dòng)consumer
producer(c)
看了這段代碼,相信很多初學(xué)者和我一樣對(duì)基于生成器的協(xié)程實(shí)現(xiàn)其實(shí)很難馬上就能夠根據(jù)業(yè)務(wù)寫出自己的協(xié)程代碼。
Python
實(shí)現(xiàn)者們也注意到這個(gè)問題,因?yàn)樗?
Pythonic
了。而基于生成器的協(xié)程也將被廢棄,因此本文將重點(diǎn)介紹
asyncio
包的使用,以及涉及到的一些相關(guān)類概念。
注:我使用的
Python
環(huán)境是3.7。
0x00 何為協(xié)程(Coroutine)
協(xié)程(
Coroutine
)是在線程中執(zhí)行的,可理解為微線程,但協(xié)程的切換沒有上下文的消耗,它比線程更加輕量些。一個(gè)協(xié)程可以隨時(shí)中斷自己讓另一個(gè)協(xié)程開始執(zhí)行,也可以從中斷處恢復(fù)并繼續(xù)執(zhí)行,它們之間的調(diào)度是由程序員來控制的(可以看本文開篇處生產(chǎn)者-消費(fèi)者的代碼)。
定義一個(gè)協(xié)程
在
Python
3.5+版本新增了
aysnc
和
await
關(guān)鍵字,這兩個(gè)語法糖讓我們非常方便地定義和使用協(xié)程。
在函數(shù)定義時(shí)用
async
聲明就定義了一個(gè)協(xié)程。
import asyncio
# 定義了一個(gè)簡(jiǎn)單的協(xié)程
async def simple_async():
print('hello')
await asyncio.sleep(1) # 休眠1秒
print('python')
# 使用asynio中run方法運(yùn)行一個(gè)協(xié)程
asyncio.run(simple_async())
# 執(zhí)行結(jié)果為
# hello
# python
在協(xié)程中如果要調(diào)用另一個(gè)協(xié)程就使用
await
。
要注意
await
關(guān)鍵字要在
async
定義的函數(shù)中使用,而反過來
async
函數(shù)可以不出現(xiàn)
await
# 定義了一個(gè)簡(jiǎn)單的協(xié)程
async def simple_async():
print('hello')
asyncio.run(simple_async())
# 執(zhí)行結(jié)果
# hello
asyncio.run()
將運(yùn)行傳入的協(xié)程,負(fù)責(zé)管理
asyncio
事件循環(huán)。
除了
run()
方法可直接執(zhí)行協(xié)程外,還可以使用事件循環(huán)
loop
async def do_something(index):
print(f'start {time.strftime("%X")}', index)
await asyncio.sleep(1)
print(f'finished at {time.strftime("%X")}', index)
def test_do_something():
# 生成器產(chǎn)生多個(gè)協(xié)程對(duì)象
task = [do_something(i) for i in range(5)]
# 獲取一個(gè)事件循環(huán)對(duì)象
loop = asyncio.get_event_loop()
# 在事件循環(huán)中執(zhí)行task列表
loop.run_until_complete(asyncio.wait(task))
loop.close()
test_do_something()
# 運(yùn)行結(jié)果
# start 00:04:03 3
# start 00:04:03 4
# start 00:04:03 1
# start 00:04:03 2
# start 00:04:03 0
# finished at 00:04:04 3
# finished at 00:04:04 4
# finished at 00:04:04 1
# finished at 00:04:04 2
# finished at 00:04:04 0
可以看出幾乎同時(shí)啟動(dòng)了所有的協(xié)程。
其實(shí)翻閱源碼可知
asyncio.run()
的實(shí)現(xiàn)也是封裝了
loop
對(duì)象及其調(diào)用。而
asyncio.run()
每次都會(huì)創(chuàng)建一個(gè)新的事件循環(huán)對(duì)象用于執(zhí)行協(xié)程。
0x01 Awaitable對(duì)象
在
Python
中可等待(
Awaitable
)對(duì)象有:協(xié)程(
corountine
)、任務(wù)(
Task
)、
Future
。即這些對(duì)象可以使用
await
關(guān)鍵字進(jìn)行調(diào)用
await awaitable_object
1. 協(xié)程(Coroutine)
協(xié)程由
async def
聲明定義,一個(gè)協(xié)程可由另一個(gè)協(xié)程使用
await
進(jìn)行調(diào)用
async def nested():
print('in nested func')
return 13
async def outer():
# 要使用await 關(guān)鍵字 才會(huì)執(zhí)行一個(gè)協(xié)程函數(shù)返回的協(xié)程對(duì)象
print(await nested())
asyncio.run(outer())
# 執(zhí)行結(jié)果
# in nested func
# 13
如果在
outer()
方法中直接調(diào)用
nested()
而不使用
await
,將拋出一個(gè)
RuntimeWarning
async def outer():
# 直接調(diào)用協(xié)程函數(shù)不會(huì)發(fā)生執(zhí)行,只是返回一個(gè) coroutine 對(duì)象
nested()
asyncio.run(outer())
運(yùn)行程序,控制臺(tái)將輸出以下信息
RuntimeWarning: coroutine 'nested' was never awaited
nested()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2. 任務(wù)(Task)
任務(wù)(
Task
)是可以用來
并發(fā)地
執(zhí)行協(xié)程。可以使用
asyncio.create_task()
將一個(gè)協(xié)程對(duì)象封裝成任務(wù),該任務(wù)將很快被排入調(diào)度隊(duì)列并執(zhí)行。
async def nested():
print('in nested func')
return 13
async def create_task():
# create_task 將一個(gè)協(xié)程對(duì)象打包成一個(gè) 任務(wù)時(shí),該協(xié)程就會(huì)被自動(dòng)調(diào)度運(yùn)行
task = asyncio.create_task(nested())
# 如果要看到task的執(zhí)行結(jié)果
# 可以使用await等待協(xié)程執(zhí)行完成,并返回結(jié)果
ret = await task
print(f'nested return {ret}')
asyncio.run(create_task())
# 運(yùn)行結(jié)果
# in nested func
# nested return 13
注:關(guān)于并發(fā)下文還會(huì)詳細(xì)說明。
3. Future
Future
是一種特殊的低層級(jí)(low-level
)對(duì)象,它是異步操作的最終結(jié)果(eventual result
)。
當(dāng)一個(gè) Future 對(duì)象 被等待,這意味著協(xié)程將保持等待直到該 Future 對(duì)象在其他地方操作完畢。
通常在應(yīng)用層代碼不會(huì)直接創(chuàng)建
Future
對(duì)象。在某些庫和
asyncio
模塊中的會(huì)使用到該對(duì)象。
async def used_future_func():
await function_that_returns_a_future_object()
0x02 并發(fā)
1. Task
前面我們知道
Task
可以并發(fā)地執(zhí)行。
asyncio.create_task()
就是一個(gè)把協(xié)程封裝成
Task
的方法。
async def do_after(what, delay):
await asyncio.sleep(delay)
print(what)
# 利用asyncio.create_task創(chuàng)建并行任務(wù)
async def corun():
task1 = asyncio.create_task(do_after('hello', 1)) # 模擬執(zhí)行1秒的任務(wù)
task2 = asyncio.create_task(do_after('python', 2)) # 模擬執(zhí)行2秒的任務(wù)
print(f'started at {time.strftime("%X")}')
# 等待兩個(gè)任務(wù)都完成,兩個(gè)任務(wù)是并行的,所以總時(shí)間兩個(gè)任務(wù)中最大的執(zhí)行時(shí)間
await task1
await task2
print(f'finished at {time.strftime("%X")}')
asyncio.run(corun())
# 運(yùn)行結(jié)果
# started at 23:41:08
# hello
# python
# finished at 23:41:10
task1
是一個(gè)執(zhí)行1秒的任務(wù),
task2
是一個(gè)執(zhí)行2秒的任務(wù),兩個(gè)任務(wù)并發(fā)的執(zhí)行,總共消耗2秒。
2. gather
除了使用
asyncio.create_task()
外還可以使用
asyncio.gather()
,這個(gè)方法接收協(xié)程參數(shù)列表
async def do_after(what, delay):
await asyncio.sleep(delay)
print(what)
async def gather():
print(f'started at {time.strftime("%X")}')
# 使用gather可將多個(gè)協(xié)程傳入
await asyncio.gather(
do_after('hello', 1),
do_after('python', 2),
)
print(f'finished at {time.strftime("%X")}')
asyncio.run(gather())
# 運(yùn)行結(jié)果
# started at 23:47:50
# hello
# python
# finished at 23:47:52
兩個(gè)任務(wù)消耗的時(shí)間為其中消耗時(shí)間最長(zhǎng)的任務(wù)。
0x03 引用
- https://docs.python.org/3/library/asyncio-task.html
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
