Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)
一、發(fā)布、訂閱:
我們將一個消息分發(fā)給
多個消費者
,這種模式被稱為
發(fā)布/訂閱
。
為了更好的理解這個模式,我們將構(gòu)建一個日志系統(tǒng),它包括兩個程序:
- 第一個程序,負(fù)責(zé)發(fā)送日志消息;
- 第二個程序,負(fù)責(zé)獲取消息并輸出內(nèi)容;
在日志系統(tǒng)中,所有正在運行的接收方程序都會接收消息;
- 一個接受者,把日志寫入硬盤中;
- 另一個接受者,把日志輸出到屏幕上;
最終,日志消息被廣播給所有的接受者。
二、交換機(jī)(Exchanges):
概念
:應(yīng)用程序發(fā)送消息時,先把消息給交換機(jī),由交換機(jī)投遞給隊列,而不是直接給隊列。交換機(jī)可以由多個
消息通道(Channel)
,用于投遞消息。
簡單概括下之前的知識 :
- 發(fā)布者(Producer):是發(fā)布消息的應(yīng)用程序。
- 隊列(Queue):用于消息存儲的緩沖。
- 消費者(Consumer):是接收消息的應(yīng)用程序。
- P:代表是發(fā)布者;
- X:是交換機(jī);
詳解圖意 :發(fā)布者(P )→交換機(jī)(X)→隊列(Q)→消費者(C );
-
交換機(jī)一邊從發(fā)布者方接收消息,一邊把消息推送到隊列(Q)。
交換機(jī)必須知道如何處理它接收到的消息,是推送到指定的隊列、還是多個隊列,或者是忽略消息
。這些都是通過交換機(jī)類型(Exchange Type)
來定義的。
交換機(jī)類型 :
1.直連交換機(jī)(Direct);
2.主題交換機(jī)(Topic);
3.頭交換機(jī)(Headers);
4.扇形交換機(jī)(Fanout);
-
主要說明—扇形交換,它把消息發(fā)送給它所知道的所有隊列。
channel . exchange_declare ( exchange = 'fanout_logs' , exchange_type = 'fanout' )
參數(shù)講解 :
-
exchange:就是交換機(jī)的名稱,
空字符串代表默認(rèn)或者匿名交換機(jī);
channel . basic_publish ( exchange = '' )
-
exchange_type:就是交換機(jī)的類型;
-
routing_key:分發(fā)到指定的隊列;
-
body:發(fā)送的內(nèi)容;
-
properties:使消息持久化;
查看交換器列表 :
命令:
rabbitmqctl list_exchanges
Listing exchanges
...
amq
.
rabbitmq
.
log topic
amq
.
direct direct
amq
.
topic topic
amq
.
headers headers
direct
amq
.
fanout fanout
amq
.
rabbitmq
.
trace topic
amq
.
match headers
列表中以amq.*的開頭的交換器,都是默認(rèn)創(chuàng)建的,目前不需要管它們。
三、臨時隊列:
我們連接上Rabbit MQ的時候,需要一個
全新的、空的隊列
(也就是說不使用之前提到的,routing_key參數(shù)指定的隊列名),我們可以
手動創(chuàng)建一個隨機(jī)的隊列名
,或者讓
服務(wù)器為我們選擇一個隨機(jī)的隊列名(推薦)
。我們僅需要在
調(diào)用queue_declare方法時,不提供queue參數(shù)
即可:
# 在管道里
,
不聲明隊列名稱
result
=
channel
.
queue_declare
(
)
可通過
result.method.queue
獲取已經(jīng)生成的隨機(jī)隊列名,大概的樣子如下所示:
amq
.
gen
-
DIAODS2sDSAKJKS
==
與消費者斷開連接時,這個隊列應(yīng)被立即刪除:
# 需要一個空的隊列 exclusive
=
True 表示與消費者斷開時
,
隊列立即刪除
result
=
channel
.
queue_declare
(
exclusive
=
True
)
四、綁定:
目前已經(jīng)創(chuàng)建一個扇形交換機(jī)和一個隊列。現(xiàn)在需要告訴交換機(jī)如果發(fā)送消息給隊列。
交換機(jī)和隊列之間的聯(lián)系我們稱為綁定(binding)
# 將fanount_logs交換機(jī)將會把消息添加到我們的隊列中
,
隊列名服務(wù)器隨機(jī)生成
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
result
.
method
.
queue
)
查看綁定列表 :
列出所有現(xiàn)存的綁定命令:
rabbitmqctl list_bindings
五、整理本節(jié)最終代碼:
圖解最終流程 :
發(fā)布日志與之前的區(qū)別 :
1.我們把消息發(fā)送給fanout_logs交換機(jī)而不是匿名的交換機(jī);
2.發(fā)送的時候需要提供routing_key參數(shù),但它的值會被扇形交換機(jī)忽略;
以下是
send.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 創(chuàng)建一個實例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指
# 定
,
因為默認(rèn)就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個管道
,
在管道里發(fā)送消息
channel
=
connection
.
channel
(
)
# 指定交換機(jī)的類型為fanout
,
執(zhí)行交換機(jī)名
:
fanout_logs
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 投遞消息 exchange
=
'fancout_logs'
交換機(jī)的名命
;
type
=
'fanout'
:
扇形交換機(jī)
channel
.
basic_publish
(
exchange
=
'fanout_logs'
,
routing_key
=
''
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 隊列關(guān)閉
connection
.
close
(
)
若沒有綁定隊列的交換器,消息將會丟失。以下是
receive.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 創(chuàng)建實例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 指定交換機(jī)名為 fanout_logs 類型為扇形
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 表示與消費者斷開連接
,
隊列立即刪除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成隊列的名字
queue_name
=
result
.
method
.
queue
# 綁定交換機(jī)和隊列
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
queue_name
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消費消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 從指定的消息隊列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就調(diào)用callback函數(shù)來處理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費消息
3.如果想把日志保存到文件中,打開控制臺輸入:
python receive
.
py
>
logs_from_rabbit
.
log
4.在屏幕中查看日志,在打開一個新的終端運行:
python receive
.
py
===
===
=
正在等待消息
===
===
==
5.發(fā)送消息:
python send
.
py 發(fā)送第一條消息
6.可以看到消費者接收到了消息,并且日志中也記錄了這條消息。
cat logs_from_rabbit
.
log
===
===
=
正在等待消息
===
===
==
[
X
]
Received發(fā)送第一條消息
7.確認(rèn)已經(jīng)創(chuàng)建的隊列綁定:
rabbitmqctl list_bindings
Listing bindings
...
exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
exchange hello queue hello
[
]
exchange task_queue queue task_queue
[
]
fanout_logs exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
交換器fanout_logs把數(shù)據(jù)發(fā)送給兩個系統(tǒng)名命的隊列
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
