日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

系統(tǒng) 2172 0

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

一、發(fā)布、訂閱:

我們將一個消息分發(fā)給 多個消費者 ,這種模式被稱為 發(fā)布/訂閱

為了更好的理解這個模式,我們將構(gòu)建一個日志系統(tǒng),它包括兩個程序:

  • 第一個程序,負(fù)責(zé)發(fā)送日志消息;
  • 第二個程序,負(fù)責(zé)獲取消息并輸出內(nèi)容;

在日志系統(tǒng)中,所有正在運行的接收方程序都會接收消息;

  • 一個接受者,把日志寫入硬盤中;
  • 另一個接受者,把日志輸出到屏幕上;

最終,日志消息被廣播給所有的接受者。

二、交換機(jī)(Exchanges):

概念 :應(yīng)用程序發(fā)送消息時,先把消息給交換機(jī),由交換機(jī)投遞給隊列,而不是直接給隊列。交換機(jī)可以由多個 消息通道(Channel) ,用于投遞消息。

簡單概括下之前的知識

  • 發(fā)布者(Producer):是發(fā)布消息的應(yīng)用程序。
  • 隊列(Queue):用于消息存儲的緩沖。
  • 消費者(Consumer):是接收消息的應(yīng)用程序。

圖解大體流程
Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第1張圖片

  • P:代表是發(fā)布者;
  • X:是交換機(jī);

詳解圖意 :發(fā)布者(P )→交換機(jī)(X)→隊列(Q)→消費者(C );

  • 交換機(jī)一邊從發(fā)布者方接收消息,一邊把消息推送到隊列(Q)。 交換機(jī)必須知道如何處理它接收到的消息,是推送到指定的隊列、還是多個隊列,或者是忽略消息 。這些都是通過 交換機(jī)類型(Exchange Type) 來定義的。

交換機(jī)類型

1.直連交換機(jī)(Direct);

2.主題交換機(jī)(Topic);

3.頭交換機(jī)(Headers);

4.扇形交換機(jī)(Fanout);

  • 主要說明—扇形交換,它把消息發(fā)送給它所知道的所有隊列。

                    
                      channel
                      
                        .
                      
                      
                        exchange_declare
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        'fanout_logs'
                      
                      
                        ,
                      
                      
                             exchange_type
                      
                        =
                      
                      
                        'fanout'
                      
                      
                        )
                      
                    
                  

參數(shù)講解

  • exchange:就是交換機(jī)的名稱, 空字符串代表默認(rèn)或者匿名交換機(jī);

                    
                      channel
                      
                        .
                      
                      
                        basic_publish
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        ''
                      
                      
                        )
                      
                    
                  
  • exchange_type:就是交換機(jī)的類型;

  • routing_key:分發(fā)到指定的隊列;

  • body:發(fā)送的內(nèi)容;

  • properties:使消息持久化;

查看交換器列表

命令: rabbitmqctl list_exchanges

            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers

            
          

列表中以amq.*的開頭的交換器,都是默認(rèn)創(chuàng)建的,目前不需要管它們。

三、臨時隊列:

我們連接上Rabbit MQ的時候,需要一個 全新的、空的隊列 (也就是說不使用之前提到的,routing_key參數(shù)指定的隊列名),我們可以 手動創(chuàng)建一個隨機(jī)的隊列名 ,或者讓 服務(wù)器為我們選擇一個隨機(jī)的隊列名(推薦) 。我們僅需要在 調(diào)用queue_declare方法時,不提供queue參數(shù) 即可:

            
              # 在管道里
              
                ,
              
               不聲明隊列名稱
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          

可通過 result.method.queue 獲取已經(jīng)生成的隨機(jī)隊列名,大概的樣子如下所示:

            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          

與消費者斷開連接時,這個隊列應(yīng)被立即刪除:

            
              # 需要一個空的隊列  exclusive
              
                =
              
              True 表示與消費者斷開時
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          

四、綁定:

img

目前已經(jīng)創(chuàng)建一個扇形交換機(jī)和一個隊列。現(xiàn)在需要告訴交換機(jī)如果發(fā)送消息給隊列。

交換機(jī)和隊列之間的聯(lián)系我們稱為綁定(binding)

            
              # 將fanount_logs交換機(jī)將會把消息添加到我們的隊列中
              
                ,
              
               隊列名服務(wù)器隨機(jī)生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          

查看綁定列表

列出所有現(xiàn)存的綁定命令: rabbitmqctl list_bindings

五、整理本節(jié)最終代碼:

圖解最終流程

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第2張圖片

發(fā)布日志與之前的區(qū)別

1.我們把消息發(fā)送給fanout_logs交換機(jī)而不是匿名的交換機(jī);

2.發(fā)送的時候需要提供routing_key參數(shù),但它的值會被扇形交換機(jī)忽略;

以下是 send.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創(chuàng)建一個實例  本地訪問
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因為默認(rèn)就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個管道
              
                ,
              
               在管道里發(fā)送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機(jī)的類型為fanout
              
                ,
              
               執(zhí)行交換機(jī)名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 投遞消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交換機(jī)的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交換機(jī)
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 隊列關(guān)閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

若沒有綁定隊列的交換器,消息將會丟失。以下是 receive.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 創(chuàng)建實例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機(jī)名為 fanout_logs 類型為扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

# 綁定交換機(jī)和隊列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調(diào)用callback函數(shù)來處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費消息

            
          

3.如果想把日志保存到文件中,打開控制臺輸入:

            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 

            
          

4.在屏幕中查看日志,在打開一個新的終端運行:

            
              python receive
              
                .
              
              py 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

5.發(fā)送消息:

            
              python send
              
                .
              
              py 發(fā)送第一條消息

            
          

6.可以看到消費者接收到了消息,并且日志中也記錄了這條消息。

            
              cat logs_from_rabbit
              
                .
              
              log 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發(fā)送第一條消息

            
          

7.確認(rèn)已經(jīng)創(chuàng)建的隊列綁定:

            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          

交換器fanout_logs把數(shù)據(jù)發(fā)送給兩個系統(tǒng)名命的隊列


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 丽水市| 敖汉旗| 西畴县| 孝昌县| 潜江市| 嘉黎县| 万年县| 乐至县| 德庆县| 眉山市| 江西省| 锡林郭勒盟| 上高县| 安泽县| 金沙县| 凤阳县| 芦溪县| 汉源县| 尉氏县| 墨竹工卡县| 赤壁市| 保康县| 临猗县| 罗源县| 大英县| 温宿县| 扎鲁特旗| 桓台县| 英山县| 旌德县| 陆丰市| 盐边县| 璧山县| 泸西县| 乐安县| 泸水县| 延寿县| 永靖县| 平南县| 洪洞县| 射阳县|