python異步IO編程(二)
目錄
開門見山
Async IO設計模式
事件循環
asyncio
?中的其他頂層函數
?
開門見山
下面我們用兩個簡單的例子來讓你對異步IO有所了解
import asyncio async def count(): print ( " One " ) await asyncio.sleep( 1 ) print ( " Two " ) async def main(): await asyncio.gather(count(),count(),count()) if __name__ == " __main__ " : import time start_time = time.time() asyncio.run(main()) end_time = time.time() print ( " 執行時間:%s " %(end_time-start_time)+ " 秒 " )
運行結果:
One
One
One
Two
Two
Two
執行時間:
1.0010051727294922秒
這個輸出的順序是異步IO的核心,由單一事件循環或協調器負責與每一個?
count()
?方法調用交流。當每一個任務執行到?
await asyncio.sleep(1))
?時,函數會通知事件循環并交出控制權限,“我要睡眠一秒鐘,在此期間,繼續做其他有意義的事”(其他協程)。
與同步的版本對比:
def count(): print ( " One " ) time.sleep( 1 ) print ( " Two " ) def main(): for _ in range(3 ): count() if __name__ == " __main__ " : import time start_time = time.time() main() end_time = time.time() print ( " 執行時間:%s " %(end_time-start_time)+ " 秒 " )
運行結果:
One
Two
One
Two
One
Two
執行時間:
3.001182794570923秒
使用?
time.sleep()
?和?
asyncio.sleep()
?看起來有點簡陋,這里一般用來替代標準輸入等耗時的操作。(最簡單的等待就是使用?
sleep()
,基本上什么也不做。)也就是說?
time.sleep()
?可以表示任何耗時的阻塞函數的調用,而?
asyncio.sleep()
?用于表示非阻塞的函數調用(但是也是需要一定時間來完成)。
異步io的關鍵在于,await io操作,此時,當前協程就會被掛起,時間循環轉而執行其他攜程,但是要注意前面這句話,并不是說所有攜程里的await都會導致當前攜程的掛起,要看await后面跟的是什么,如果跟的是我們定義的攜程,則會執行這個攜程,如果是asyncio模塊制作者定義的固有攜程,比如模擬io操作的asyncio.sleep,以及io操作,比如網絡io:
asyncio.open_connection這些,才會掛起當前攜程。
?
?
?
Async IO設計模式
鏈接協程
協程的一個關鍵特性是它們可以被鏈接到一起。(記住,一個協程是可等待的,所以另一個協程可以使用?
await
?來等待它。)這個特性允許你將程序劃分成更小的,可管理可回收的協程:
注意觀察輸出,
part1()
?的睡眠時間是可變的,而當它的返回結果可用的時候,
part2()
?開始執行并使用這些結果。
?按照設定,
main()
?函數執行的時間應該與它聚集在一起的任務中最長的一個執行時間相同。
?
?
使用隊列
在?
asyncio
?包中提供了與隊列模塊中相似的隊列類。目前為止,我們的例子中還沒有使用到隊列結構。在?
chained.py
?中的每一個?
task(feature)
?都由一組協程組成,這些協程都有一個單一的輸入鏈,并顯式的等待其它協程。
還有一種結構同樣可以配合異步IO使用:許多互不關聯的生產者將元素加入到一個隊列中,每一個生產者可能在不同的時間,隨機且無序的加入多個元素到隊列中。還有一組消費者不管任何信號,不停地從隊列中拉取元素。
這種設計中,任何一個生產者和消費者都沒有關聯。消費者事先并不知道生產者的數量,甚至不知道將累計添加的隊列中的元素數。
它需要一個單獨的生產者或消費者在一個可指定的時間內,分別向隊列中放入或從隊列中提取元素。生產者與消費者通過隊列的吞吐進行通信,而不是兩者直接通信。
----------
注:由于?
queue.Queue()
?是線程安全的,所以它經常被用于開發基于線程的程序,而在異步IO編程中你不需要關心線程安全問題。(除非你將這兩者合并在一起使用,但在本教程中并沒有這么做。)
隊列的一種用法(比如這里的情況)是充當生產者與消費者之間的通信通道,從而避免它們直接關聯或聯系。
----------
這個程序的同步版本看起來有些讓人不忍直視:一組生產者連續且阻塞的向隊列中添加元素,一次只有一個生產者在工作。只有當所有生產者都運行結束,消費者才會從隊列中一個接一個的取出元素并處理。這會造成大量的延時。元素可能會在隊列中被擱置,而不是被立刻取出并處理。
而異步版本的程序?
asyncq.py
?如下所示。運行過程中的難點是向消費者發送生產者已經完成的信號。否則,
await q.get()
?將會因為隊列已滿而被無限掛起,但是消費者卻不知道生產者已經完成的信息。
這里是全部的腳本文件:
幾個協程作為輔助函數返回隨機字符串,幾分之一秒的性能計數器以及隨機整數。生產者將1-5的元素放入隊列中,每一個元素都是一個?
(i, t)
?的元組,其中?
i
?是一個隨機字符串,
t
?是生產者嘗試將元組放入隊列所需要的時間。
當消費者從隊列取出元素時,它只使用元素放入隊列時所使用的時間戳計算耗費時間。
牢記?
asyncio.sleep()
?用于模仿其他復雜的協程,如果這里是常規阻塞函數,則會耗盡時間并阻塞其他所有函數的運行。
這里有一個實現了兩個生產者和五個消費者的測試例子
這個例子中,元素在幾分之一秒內被處理好,產生延時可能有兩個原因:
? ? 1.?很大程度上不可避免的標準開銷
? ? 2.?元素出現在隊列中,而所有消費者都在睡眠的情況
幸運的是,關于第二個原因,正常情況下,將消費者擴展到成百上個也是允許的。你不應該對?
python3 asyncq.py -p 5 -c 100
?有什么疑問。這里有一點比較關鍵,理論上你可以使用不同的操作系統和用戶來管理和控制生產者和消費者,而隊列作為中間消息吞吐的媒介。
目前,你已經進入到異步IO的學習中,并且看到了三個由?
async
?和?
await
?定義的協程以及?
asyncio
?調用的相關示例。如果你只是想深入了解Python新式協程的實現機制,而并不是想全盤關注,下一節將會有一個直觀的介紹。
?
?
事件循環
可以把事件循環看作一個監控協程的?
while True
?循環,獲取協程被閑置期間的反饋,以及可以在此期間執行的內容。它可以對處于等待態的協程可用時喚起閑置協程。
?Python 3.7中引入的?
asyncio.run()
?負責獲取事件循環,在任務被標記完成前運行任務,然后關閉事件循環。
?事件循環的整個管理過程由一個函數調用隱式處理:
asyncio.run(main())
使用?
get_event_loop()
?管理?
asyncio
?事件循環還有一種更為繁復的方式,典型的示例如下所示:
loop = asyncio.get_event_loop() try : loop.run_until_complete(main()) finally : loop.colse()
可能經常在舊的示例中看到?
loop.get_event_loop()
,但是除非你需要對事件循環進行精細的控制,使用?
asyncio.run()
?應該足以滿足大多數程序的需要。
如果你需要在Python程序中與事件循環交互,那么舊式風格的?
loop
?是一個不錯的選擇,它支持使用?
loop.is_running()
?和?
loop.is_closed()
?進行自省。如果需要獲得更精細的控制也可以進行操作,例如通過循環參數傳遞來?調度回調函數。
更重要的是理解事件循環的底層實現機制,這里有幾點關于事件循環需要強調的:
# 1:協程在與事件循環綁定之前不會自行處理。
你已經在關于生成器的解釋中看到過這一點,但是這仍值得重申一遍。如果有一個需要等待其它協程的主協程,那么簡單的單獨調用它幾乎沒有效果:
?記住,在調度?
main()
?協程(future對象)時使用?
asyncio.run()
?會真正強制執行事件循環。
(其它協程可以通過?
await
?執行,通常會在?
asyncio.run()
?中包裝?
main()
?函數,然后在這里使用?
await
?調用鏈式協程。)
#2:默認情況下,一個異步IO事件循環會運行在單核CPU的單線程中 , 通常,單核CPU運行一個單線程的事件循環是綽綽有余的。跨多核運行事件循環也是可行的。更多信息可以參考?John Reese 的演講,同時要關注你的筆記本可能會超負荷運載轉。
#3:事件循環是可插拔的
。就是說,如果你想,你可以實現自己的事件循環并執行相同的任務。CPython實現的?
uvloop
?包就很好的說明了這一點。
”可插拔式事件循環“可以可以理解為:你可以使用任何可用的事件循環的實現,這與協程本身的結構無關。
asyncio
?包自帶了兩個不同的事件循環實現,默認實現基于?
selectors
?模塊。(第二種實現僅適用于Windows系統。)
?
?
?
asyncio
?中的其他頂層函數
除了?
asyncio.run()
,你還看到過其它諸如?
asyncio.create_task()
?和?
asyncio.gather()
?的包級函數。
你可以在?
asyncio.run()
?之后使用?
create_task()
?來調度協程對象的執行。
這種模式有一個巧妙之處:如果你不在?
main()
?中使用?
await t
,那么它可能在執行完成之前收到?
main()
?的信號并結束。因為?
asyncio.run(main())
?調用了?
loop.run_until_complete(main())
?方法,事件循環只會關注(沒有顯式使用
await t
的情況下)
main()
?方法是否完成,而不會關注?
main()
?中創建的任務是否完成。沒有?
await t
,循環中的其他任務在它們運行完成之前可能會被取消。如果有需要,你可以使用?
asyncio.Task.all_tasks()
獲取當前掛起的任務列表。
注:
asyncio.create_task()
?是在Python 3.7被引入的。在Python 3.6以及更早的版本中,使用?
asyncio.ensure_future()
?代替?
create_task()
。
另外,還有?
asyncio.gather()
。雖然?
gather()
?并沒有做什么特殊的事情,只是將一組協程(futures)放到一個 future 中。它會返回一個 future 對象作為結果,并且,如果你使用了?
await asyncio.gather()
?并指定了多個任務或協程,那么它會等待所有的任務或協程運行結束。(這與我們之前示例中
queue.join()
?有些相似。)在?
gather()
?的返回中包含所有輸入后的結果列表。
你可能已經注意到?
gather()
?會等待你傳給它的 Futures 或協程的結果集。或者你可以順序遍歷?
asyncio.as_completed()
?來完成任務,該方法會返回一個迭代器,在完成任務時返回結果。像下面這樣,在?
coro([10, 5, 0])
?完成之前,
coro([3, 2, 1])
?的結果已經可用了,而使用?
gather()
?則與之不同:
最后,你還應該知道?
asyncio.ensure_future()
。你可能很少會用到它,因為它是一個比較低級別的管道API,并且很大程度上被后來引入的?
create_task()
?所替代。
?
await
?的優先級
雖然關鍵字?
await
?與?
yield
?的行為有些相似,但是?
await
?的優先級明高于后者。這意味著更嚴格的編碼約束,在很多實例中需要在?
yield
?語句中時使用括號,而相同功能時使用?
await
?則不需要加括號。有關更多信息,請參閱PEP 492中
await
表達式示例。
?
?
資源附錄
Python的幾個特殊版本
Python中的異步IO發展之迅速,已經難以追蹤其更新歷程。這里列出了幾個與?
asyncio
?有關的Python的小版本變更以及介紹:
? ? 3.3:在生成器中開始允許?
yield from
?表達式。
? ? 3.4:
asyncio
?作為臨時性功能被引入Python的標準庫。
? ? 3.5:
async
?和?
await
?加入Python語法,用于標識和等待協程運行。此時它們還沒有成為保留關鍵字。(你仍然可以使用
async
?和
await
?定義函數名或變量名。)
? ? 3.6:引入了異步生成器與異步推導式。宣布?
asyncio
?為穩定版,而不再是臨時版。
? ? 3.7:
async
?和?
await
?成為保留關鍵字。(它們不能再用作標識符。)它們主要用在替換?
asyncio.coroutine()
?生成器。在?
asyncio
?包中引入了?
asyncio.run()
?以及其他許多功能。
如果你想要保證安全(并且能夠使用?
asyncio.run()
),那么你可以使用Python 3.7或更高的版本來獲取完整的功能集。
?
轉載:
https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ
https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA
https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
