目錄
- 一、進(jìn)程間通訊
-
二、隊(duì)列
- 2.1 概念介紹---multiprocessing.Queue
- 2.2 方法介紹
- 2.3 其他方法(了解)
-
三、Queue隊(duì)列-實(shí)例演示
- 3.1 但看隊(duì)列用法
- 3.2 子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程
-
四、生產(chǎn)者消費(fèi)者模型
- 4.1 為什么要使用生產(chǎn)者和消費(fèi)者模式
- 4.2 什么是生產(chǎn)者消費(fèi)者模式
- 4.3 基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
-
五、 JoinableQueue隊(duì)列
- 5.1 JoinableQueue的使用
- 5.2 通過JoinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
一、進(jìn)程間通訊
IPC。就是多個(gè)進(jìn)程間相互通訊。進(jìn)程間通訊并不只是單個(gè)語言的問題。而是每個(gè)語言都有的。實(shí)現(xiàn)進(jìn)程間通訊有很多。C語言中有:管道:pipe(匿名管道),有名管道、信號(hào)、共享內(nèi)存、消息隊(duì)列、信號(hào)量等等
Python中這里通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通訊
二、隊(duì)列
2.1 概念介紹---multiprocessing.Queue
創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize])
創(chuàng)建共享的進(jìn)程隊(duì)列。
參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。
底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。
2.2 方法介紹
Queue([maxsize])
:創(chuàng)建共享的進(jìn)程隊(duì)列。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。另外,還需要運(yùn)行支持進(jìn)程以便隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?
Queue的實(shí)例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一個(gè)項(xiàng)目。如果q為空,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時(shí)時(shí)間,用在阻塞模式中。如果在制定的時(shí)間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎茫瑢⒁l(fā)Queue.Empty異常。
q.get_nowait()
:同
q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:將item放入隊(duì)列。如果隊(duì)列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時(shí)間長(zhǎng)短。超時(shí)后將引發(fā)Queue.Full異常。
q.qsize()
:返回隊(duì)列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。
q.empty()
:如果調(diào)用此方法時(shí) q為空,返回True。如果其他進(jìn)程或進(jìn)程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目。
q.full()
:如果q已滿,返回為True. 由于進(jìn)程的存在,結(jié)果也可能是不可靠的(參考
q.empty()
方法)。
2.3 其他方法(了解)
q.close()
:關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí),后臺(tái)進(jìn)程將繼續(xù)寫入那些已入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將自動(dòng)調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正被阻塞在
get()
操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致
get()
方法返回錯(cuò)誤。
q.cancel_join_thread()
:不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)進(jìn)程。這可以防止
join_thread()
方法阻塞。
q.join_thread()
:連接隊(duì)列的后臺(tái)進(jìn)程。此方法用于在調(diào)用
q.close()
方法后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用
q.cancel_join_thread()
方法可以禁止這種行為。
三、Queue隊(duì)列-實(shí)例演示
3.1 但看隊(duì)列用法
'''
ipc 進(jìn)程間通訊。使用隊(duì)列實(shí)現(xiàn)
管道:pipe 基于共享的內(nèi)存空間
隊(duì)列:pipe+鎖 Queue
put:放(可以設(shè)置阻塞非阻塞,和等待時(shí)間)
get:取(可以設(shè)置阻塞非阻塞,和等待時(shí)間)
'''
### 語法
q = Queue(3) # 創(chuàng)建隊(duì)列, 可以設(shè)置最大值
## put 放
q.put('你好') # 往隊(duì)列中放入值,可以設(shè)置阻塞和等待時(shí)間,默認(rèn):滿了再放就會(huì)阻塞等待
q.put([1,2,4])
q.put(2)
### 不設(shè)置等待時(shí)間,隊(duì)列滿了繼續(xù)放
# q.put(5) # 阻塞,隊(duì)列滿了,等待隊(duì)列空了就放進(jìn)去
## get 拿
print(q.get()) # 獲取隊(duì)列中的內(nèi)容,可以設(shè)置阻塞和等待時(shí)間,默認(rèn):拿不到內(nèi)容就會(huì)阻塞等待
print(q.get())
print(q.get())
### 不設(shè)置等待時(shí)間
# print(q.get()) # 阻塞,一直等待獲取隊(duì)列內(nèi)容
### 設(shè)置等待時(shí)間
# print(q.get(timeout=2)) # 等兩秒,等不到就報(bào)錯(cuò)
# q.put(88,timeout=2) # 不會(huì)報(bào)錯(cuò),因?yàn)殛?duì)列中沒內(nèi)容
### 全都設(shè)置非阻塞模式
q = Queue(3) # 創(chuàng)建隊(duì)列, 可以設(shè)置最大值
q.put('你好', block=False) # 設(shè)置非阻塞,如果滿了再放就會(huì)報(bào)錯(cuò)
q.put([1,2,4], block=False)
q.put(2, block=False)
# q.put(4, block=False) # 報(bào)錯(cuò),隊(duì)列滿了
# q.put_nowait('666') # 等同 block = False,報(bào)錯(cuò),隊(duì)列滿了
3.2 子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from Eva', 'hello']) #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。
if __name__ == '__main__':
q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象
p = Process(target=f, args=(q,)) #創(chuàng)建一個(gè)進(jìn)程
p.start()
print(q.get())
p.join()
上面是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。 接下來看一個(gè)稍微復(fù)雜一些的例子:批量生產(chǎn)數(shù)據(jù)放入隊(duì)列再批量獲取結(jié)果。
四、生產(chǎn)者消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)進(jìn)程和消費(fèi)進(jìn)程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
4.1 為什么要使用生產(chǎn)者和消費(fèi)者模式
在進(jìn)程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進(jìn)程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的進(jìn)程。在多進(jìn)程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
4.2 什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
4.3 基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
ef producer(q,name,food):
'''生產(chǎn)者'''
for i in range(10):
print(f'{name}生產(chǎn)了{(lán)food}{i}')
res = f'{food}{i}'
q.put(res)
def consumer(q,name):
'''消費(fèi)者'''
while 1:
res = q.get()
if res == None:
break
print(f'{name}買了{(lán)res}')
if __name__ == '__main__':
q = Queue() # 使用隊(duì)列
p1 = Process(target=producer,args=(q,"xc","意大利面"))
c1 = Process(target=consumer,args=(q,"haha"))
p2 = Process(target=producer, args=(q, "xc", "牛肉"))
c2 = Process(target=consumer, args=(q, "xixi"))
p3 = Process(target=producer, args=(q, "xc", "可樂"))
p1.start()
c1.start()
p2.start()
c2.start()
p3.start()
p1.join() # 等待生產(chǎn)者結(jié)束
p2.join() # 等待生產(chǎn)者結(jié)束
p3.join() # 等待生產(chǎn)者結(jié)束
q.put(None) # 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束
q.put(None) # 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束
結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)。 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束
五、 JoinableQueue隊(duì)列
JoinableQueue隊(duì)列就對(duì)Queue隊(duì)列的改良版,加入了類似信號(hào)量的機(jī)制。
5.1 JoinableQueue的使用
語法:
-
實(shí)例化對(duì)象:
q = JoinableQueue()
-
向隊(duì)列中放入內(nèi)容,相當(dāng)于 信號(hào)量+1 操作:
q.put(xxx)
-
從隊(duì)列中取出內(nèi)容:
q.get()
-
任務(wù)結(jié)束,相當(dāng)于 信號(hào)量-1 操作:
q.task_done()
- 當(dāng)這個(gè) 信號(hào)量不為0時(shí),會(huì)阻塞等待,計(jì)數(shù)器為0后通過。
實(shí)例:
# JoinableQueue的使用
q = JoinableQueue()
q.put('1') # +1
q.put('2') # +1
print(q.get())
q.task_done() # -1
print(q.get())
q.task_done() # -1
q.join() #計(jì)數(shù)器不為0會(huì)阻塞等待 計(jì)數(shù)器為0后通過
5.2 通過JoinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
def producer(q, name, food):
'''生產(chǎn)者'''
for i in range(10):
print(f'{name}生產(chǎn)了{(lán)food}{i}')
res = f'{food}{i}'
q.put(res) # +1
def consumer(q, name):
'''消費(fèi)者'''
while 1:
res = q.get()
if res == None:
break
print(f'{name}買了{(lán)res}')
q.task_done()
if __name__ == '__main__':
q = JoinableQueue() # 使用加了join的隊(duì)列
p1 = Process(target=producer, args=(q, "xc", "意大利面"))
c1 = Process(target=consumer, args=(q, "haha"))
p2 = Process(target=producer, args=(q, "xc", "牛肉"))
c2 = Process(target=consumer, args=(q, "xixi"))
p3 = Process(target=producer, args=(q, "xc", "可樂"))
# 把消費(fèi)者變成守護(hù)進(jìn)程,主進(jìn)程結(jié)束,子進(jìn)程就結(jié)束
c1.daemon = True # 進(jìn)程結(jié)束,子進(jìn)程就結(jié)束
c2.daemon = True # 進(jìn)程結(jié)束,子進(jìn)程就結(jié)束
p1.start()
c1.start()
p2.start()
c2.start()
p3.start()
p1.join() # 等待生產(chǎn)者結(jié)束
p2.join() # 等待生產(chǎn)者結(jié)束
p3.join() # 等待生產(chǎn)者結(jié)束
上述代碼中,是 通過將子進(jìn)程變?yōu)槭刈o(hù)進(jìn)程 ,在消費(fèi)者的子進(jìn)程中 每次收到消息都task_done ,然后 等待主進(jìn)程結(jié)束,直接結(jié)束子進(jìn)程 。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
