日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

114 Python程序中的進(jìn)程操作-進(jìn)程間通信

系統(tǒng) 1947 0

目錄

  • 一、進(jìn)程間通訊
  • 二、隊(duì)列
    • 2.1 概念介紹---multiprocessing.Queue
    • 2.2 方法介紹
    • 2.3 其他方法(了解)
  • 三、Queue隊(duì)列-實(shí)例演示
    • 3.1 但看隊(duì)列用法
    • 3.2 子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程
  • 四、生產(chǎn)者消費(fèi)者模型
    • 4.1 為什么要使用生產(chǎn)者和消費(fèi)者模式
    • 4.2 什么是生產(chǎn)者消費(fèi)者模式
    • 4.3 基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
  • 五、 JoinableQueue隊(duì)列
    • 5.1 JoinableQueue的使用
    • 5.2 通過JoinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

一、進(jìn)程間通訊

IPC。就是多個(gè)進(jìn)程間相互通訊。進(jìn)程間通訊并不只是單個(gè)語言的問題。而是每個(gè)語言都有的。實(shí)現(xiàn)進(jìn)程間通訊有很多。C語言中有:管道:pipe(匿名管道),有名管道、信號(hào)、共享內(nèi)存、消息隊(duì)列、信號(hào)量等等

Python中這里通過隊(duì)列實(shí)現(xiàn)進(jìn)程間通訊

二、隊(duì)列

2.1 概念介紹---multiprocessing.Queue

創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。

Queue([maxsize]) 創(chuàng)建共享的進(jìn)程隊(duì)列。
參數(shù) :maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。

底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。

2.2 方法介紹

Queue([maxsize]) :創(chuàng)建共享的進(jìn)程隊(duì)列。maxsize是隊(duì)列中允許的最大項(xiàng)數(shù)。如果省略此參數(shù),則無大小限制。底層隊(duì)列使用管道和鎖定實(shí)現(xiàn)。另外,還需要運(yùn)行支持進(jìn)程以便隊(duì)列中的數(shù)據(jù)傳輸?shù)降讓庸艿乐小?
Queue的實(shí)例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) :返回q中的一個(gè)項(xiàng)目。如果q為空,此方法將阻塞,直到隊(duì)列中有項(xiàng)目可用為止。block用于控制阻塞行為,默認(rèn)為True. 如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時(shí)時(shí)間,用在阻塞模式中。如果在制定的時(shí)間間隔內(nèi)沒有項(xiàng)目變?yōu)榭捎茫瑢⒁l(fā)Queue.Empty異常。

q.get_nowait() :同 q.get(False) 方法。

q.put(item [, block [,timeout ] ] ) :將item放入隊(duì)列。如果隊(duì)列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認(rèn)為True。如果設(shè)置為False,將引發(fā)Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時(shí)間長(zhǎng)短。超時(shí)后將引發(fā)Queue.Full異常。

q.qsize() :返回隊(duì)列中目前項(xiàng)目的正確數(shù)量。此函數(shù)的結(jié)果并不可靠,因?yàn)樵诜祷亟Y(jié)果和在稍后程序中使用結(jié)果之間,隊(duì)列中可能添加或刪除了項(xiàng)目。在某些系統(tǒng)上,此方法可能引發(fā)NotImplementedError異常。

q.empty() :如果調(diào)用此方法時(shí) q為空,返回True。如果其他進(jìn)程或進(jìn)程正在往隊(duì)列中添加項(xiàng)目,結(jié)果是不可靠的。也就是說,在返回和使用結(jié)果之間,隊(duì)列中可能已經(jīng)加入新的項(xiàng)目。

q.full() :如果q已滿,返回為True. 由于進(jìn)程的存在,結(jié)果也可能是不可靠的(參考 q.empty() 方法)。

2.3 其他方法(了解)

q.close() :關(guān)閉隊(duì)列,防止隊(duì)列中加入更多數(shù)據(jù)。調(diào)用此方法時(shí),后臺(tái)進(jìn)程將繼續(xù)寫入那些已入隊(duì)列但尚未寫入的數(shù)據(jù),但將在此方法完成時(shí)馬上關(guān)閉。如果q被垃圾收集,將自動(dòng)調(diào)用此方法。關(guān)閉隊(duì)列不會(huì)在隊(duì)列使用者中生成任何類型的數(shù)據(jù)結(jié)束信號(hào)或異常。例如,如果某個(gè)使用者正被阻塞在 get() 操作上,關(guān)閉生產(chǎn)者中的隊(duì)列不會(huì)導(dǎo)致 get() 方法返回錯(cuò)誤。

q.cancel_join_thread() :不會(huì)再進(jìn)程退出時(shí)自動(dòng)連接后臺(tái)進(jìn)程。這可以防止 join_thread() 方法阻塞。

q.join_thread() :連接隊(duì)列的后臺(tái)進(jìn)程。此方法用于在調(diào)用 q.close() 方法后,等待所有隊(duì)列項(xiàng)被消耗。默認(rèn)情況下,此方法由不是q的原始創(chuàng)建者的所有進(jìn)程調(diào)用。調(diào)用 q.cancel_join_thread() 方法可以禁止這種行為。

三、Queue隊(duì)列-實(shí)例演示

3.1 但看隊(duì)列用法

          
            '''
ipc 進(jìn)程間通訊。使用隊(duì)列實(shí)現(xiàn)
管道:pipe 基于共享的內(nèi)存空間
隊(duì)列:pipe+鎖 Queue
put:放(可以設(shè)置阻塞非阻塞,和等待時(shí)間)
get:取(可以設(shè)置阻塞非阻塞,和等待時(shí)間)
'''
### 語法
q = Queue(3) # 創(chuàng)建隊(duì)列, 可以設(shè)置最大值
## put 放
q.put('你好')     # 往隊(duì)列中放入值,可以設(shè)置阻塞和等待時(shí)間,默認(rèn):滿了再放就會(huì)阻塞等待
q.put([1,2,4])
q.put(2)

### 不設(shè)置等待時(shí)間,隊(duì)列滿了繼續(xù)放
# q.put(5)    # 阻塞,隊(duì)列滿了,等待隊(duì)列空了就放進(jìn)去

## get 拿
print(q.get())  # 獲取隊(duì)列中的內(nèi)容,可以設(shè)置阻塞和等待時(shí)間,默認(rèn):拿不到內(nèi)容就會(huì)阻塞等待
print(q.get())
print(q.get())

### 不設(shè)置等待時(shí)間
# print(q.get())  # 阻塞,一直等待獲取隊(duì)列內(nèi)容

### 設(shè)置等待時(shí)間
# print(q.get(timeout=2))     # 等兩秒,等不到就報(bào)錯(cuò)
# q.put(88,timeout=2)   # 不會(huì)報(bào)錯(cuò),因?yàn)殛?duì)列中沒內(nèi)容


### 全都設(shè)置非阻塞模式
q = Queue(3) # 創(chuàng)建隊(duì)列, 可以設(shè)置最大值
q.put('你好', block=False)     # 設(shè)置非阻塞,如果滿了再放就會(huì)報(bào)錯(cuò)
q.put([1,2,4], block=False)
q.put(2, block=False)
# q.put(4, block=False)       # 報(bào)錯(cuò),隊(duì)列滿了

# q.put_nowait('666') # 等同 block = False,報(bào)錯(cuò),隊(duì)列滿了
          
        

3.2 子進(jìn)程發(fā)送數(shù)據(jù)給父進(jìn)程

          
            import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'from Eva', 'hello'])  #調(diào)用主函數(shù)中p進(jìn)程傳遞過來的進(jìn)程參數(shù) put函數(shù)為向隊(duì)列中添加一條數(shù)據(jù)。

if __name__ == '__main__':
    q = Queue() #創(chuàng)建一個(gè)Queue對(duì)象
    p = Process(target=f, args=(q,)) #創(chuàng)建一個(gè)進(jìn)程
    p.start()
    print(q.get())
    p.join()
          
        

上面是一個(gè)queue的簡(jiǎn)單應(yīng)用,使用隊(duì)列q對(duì)象調(diào)用get函數(shù)來取得隊(duì)列中最先進(jìn)入的數(shù)據(jù)。 接下來看一個(gè)稍微復(fù)雜一些的例子:批量生產(chǎn)數(shù)據(jù)放入隊(duì)列再批量獲取結(jié)果。

四、生產(chǎn)者消費(fèi)者模型

在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)進(jìn)程和消費(fèi)進(jìn)程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

4.1 為什么要使用生產(chǎn)者和消費(fèi)者模式

在進(jìn)程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的進(jìn)程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的進(jìn)程。在多進(jìn)程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。

4.2 什么是生產(chǎn)者消費(fèi)者模式

生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。

4.3 基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

          
            ef producer(q,name,food):
    '''生產(chǎn)者'''
    for i in range(10):
        print(f'{name}生產(chǎn)了{(lán)food}{i}')
        res = f'{food}{i}'
        q.put(res)


def consumer(q,name):
    '''消費(fèi)者'''
    while 1:
        res = q.get()
        if res == None:
            break
        print(f'{name}買了{(lán)res}')


if __name__ == '__main__':
    q = Queue()   # 使用隊(duì)列
    p1 = Process(target=producer,args=(q,"xc","意大利面"))
    c1 = Process(target=consumer,args=(q,"haha"))
    p2 = Process(target=producer, args=(q, "xc", "牛肉"))
    c2 = Process(target=consumer, args=(q, "xixi"))
    p3 = Process(target=producer, args=(q, "xc", "可樂"))
    p1.start()
    c1.start()
    p2.start()
    c2.start()
    p3.start()

    p1.join()   # 等待生產(chǎn)者結(jié)束
    p2.join()   # 等待生產(chǎn)者結(jié)束
    p3.join()   # 等待生產(chǎn)者結(jié)束

    q.put(None) # 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束
    q.put(None) # 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束
          
        

結(jié)束信號(hào)None,不一定要由生產(chǎn)者發(fā),主進(jìn)程里同樣可以發(fā),但主進(jìn)程需要等生產(chǎn)者結(jié)束后才應(yīng)該發(fā)送該信號(hào)。 有幾個(gè)消費(fèi)者就要發(fā)幾個(gè)None,讓 子進(jìn)程收到None就結(jié)束

五、 JoinableQueue隊(duì)列

JoinableQueue隊(duì)列就對(duì)Queue隊(duì)列的改良版,加入了類似信號(hào)量的機(jī)制。

5.1 JoinableQueue的使用

語法:

  1. 實(shí)例化對(duì)象: q = JoinableQueue()
  2. 向隊(duì)列中放入內(nèi)容,相當(dāng)于 信號(hào)量+1 操作: q.put(xxx)
  3. 從隊(duì)列中取出內(nèi)容: q.get()
  4. 任務(wù)結(jié)束,相當(dāng)于 信號(hào)量-1 操作: q.task_done()
  5. 當(dāng)這個(gè) 信號(hào)量不為0時(shí),會(huì)阻塞等待,計(jì)數(shù)器為0后通過。

實(shí)例:

          
            # JoinableQueue的使用
q = JoinableQueue()

q.put('1')  # +1
q.put('2')  # +1

print(q.get())
q.task_done()   # -1    
print(q.get())
q.task_done()  # -1
q.join() #計(jì)數(shù)器不為0會(huì)阻塞等待 計(jì)數(shù)器為0后通過
          
        

5.2 通過JoinableQueue隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型

          
            def producer(q, name, food):
    '''生產(chǎn)者'''
    for i in range(10):
        print(f'{name}生產(chǎn)了{(lán)food}{i}')
        res = f'{food}{i}'
        q.put(res)  # +1


def consumer(q, name):
    '''消費(fèi)者'''
    while 1:
        res = q.get()
        if res == None:
            break
        print(f'{name}買了{(lán)res}')
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()     # 使用加了join的隊(duì)列
    p1 = Process(target=producer, args=(q, "xc", "意大利面"))
    c1 = Process(target=consumer, args=(q, "haha"))
    p2 = Process(target=producer, args=(q, "xc", "牛肉"))
    c2 = Process(target=consumer, args=(q, "xixi"))
    p3 = Process(target=producer, args=(q, "xc", "可樂"))
    # 把消費(fèi)者變成守護(hù)進(jìn)程,主進(jìn)程結(jié)束,子進(jìn)程就結(jié)束
    c1.daemon = True    # 進(jìn)程結(jié)束,子進(jìn)程就結(jié)束
    c2.daemon = True    # 進(jìn)程結(jié)束,子進(jìn)程就結(jié)束

    p1.start()
    c1.start()
    p2.start()
    c2.start()
    p3.start()

    p1.join()  # 等待生產(chǎn)者結(jié)束
    p2.join()  # 等待生產(chǎn)者結(jié)束
    p3.join()  # 等待生產(chǎn)者結(jié)束
          
        

上述代碼中,是 通過將子進(jìn)程變?yōu)槭刈o(hù)進(jìn)程 ,在消費(fèi)者的子進(jìn)程中 每次收到消息都task_done ,然后 等待主進(jìn)程結(jié)束,直接結(jié)束子進(jìn)程


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 吴江市| 玉溪市| 云阳县| 龙口市| 延寿县| 巴塘县| 丰台区| 鸡西市| 佛冈县| 全椒县| 临高县| 开封市| 抚顺市| 谢通门县| 温泉县| 河南省| 柘荣县| 桦甸市| 荣昌县| 临沭县| 乌鲁木齐县| 威海市| 红安县| 长治市| 内黄县| 綦江县| 恩平市| 渭南市| 克什克腾旗| 施秉县| 正阳县| 桓台县| 隆回县| 博爱县| 鹤岗市| 长武县| 瓦房店市| 大埔区| 肇庆市| 迁西县| 徐水县|