pika生產(chǎn)者程序大致步驟:
1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel 當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列 指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱(chēng)
4. 聲明交換機(jī) 交換機(jī)類(lèi)型,名稱(chēng)等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定 queue_bind
6. basic_publish 發(fā)送到交換機(jī) 指定路由鍵
pika消費(fèi)者程序大致步驟:
1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel 當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列 指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱(chēng)
4. 聲明交換機(jī) 交換機(jī)類(lèi)型,名稱(chēng)等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定 queue_bind
6. basic_consume 消費(fèi)消息
1. 輪詢(xún)接收消息
使用消息隊(duì)列的一個(gè)好處就是, 可以將任務(wù)消息發(fā)送到隊(duì)列中,由消費(fèi)者異步進(jìn)行處理, 同時(shí)對(duì)于后端消費(fèi)者可以很容易地增加減少,只需要運(yùn)行多個(gè)進(jìn)程即可, 方便擴(kuò)展, 之前的示例中消費(fèi)端程序就可以開(kāi)啟多個(gè),然后可以看到消費(fèi)被輪詢(xún)得分配給每個(gè)消費(fèi)者
將之前的消費(fèi)者略作更改, 加入客戶(hù)端編號(hào),啟動(dòng)三個(gè)消費(fèi)者, 通過(guò)生產(chǎn)者發(fā)送4個(gè)消息, 依次收到消息, 即是 輪詢(xún)(round-robin):
[*] Waiting for messages. To exit press CTRL+C
1. [x] Received 'Hello World!'
1. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
2. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
3. [x] Received 'Hello World!'
2. 消息確認(rèn):
為了防止消息丟失,RabbitMQ提供了消息響應(yīng)(acknowledgments)。消費(fèi)者會(huì)通過(guò)一個(gè)ack(響應(yīng)),告訴RabbitMQ已經(jīng)收到并處理了某條消息,然后RabbitMQ就會(huì)釋放并刪除這條消息。 如果消費(fèi)者(consumer)掛掉了,沒(méi)有發(fā)送響應(yīng),RabbitMQ就會(huì)認(rèn)為消息沒(méi)有被完全處理,然后重新發(fā)送給其他消費(fèi)者(consumer)。這樣,及時(shí)工作者(workers)偶爾的掛掉,也不會(huì)丟失消息。
消息響應(yīng)默認(rèn)auto_ack=False, 不自動(dòng)確認(rèn)消息, 即是需要我們處理并確認(rèn)消息的
確認(rèn)需要發(fā)送確認(rèn)消息:
在回調(diào)callback中加入basic_ack
channel
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
如果auto_ack設(shè)置為T(mén)rue,而忘記basic_ack消息確認(rèn),消息在程序退出之后就會(huì)重新發(fā)送,如果不及時(shí)釋放沒(méi)響應(yīng)的消息,RabbitMQ就會(huì)占用越來(lái)越多的內(nèi)存。
為了排除這種錯(cuò)誤,可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:
# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 0
TEST01 0 0
3. 消息持久化
如果沒(méi)有向rabbitmq指定消息持久化, 則退出或者崩潰的時(shí)候,將會(huì)丟失所有隊(duì)列和消息, 消息持久化必須把“隊(duì)列”和“消息”設(shè)為持久化
- 隊(duì)列聲明持久化:
channel.queue_declare(queue='hello', durable=True)
注意一個(gè)消息隊(duì)列被聲明過(guò)一次后,rabbitmq不允許使用不同的參數(shù)重新定義隊(duì)列, 因此如果存在hello隊(duì)列,上面會(huì)提示錯(cuò)誤
- 消息聲明持久化
將publish生產(chǎn)者發(fā)送消息時(shí)候消息屬性, delivery_mode的屬性設(shè)為2
properties=pika.BasicProperties(delivery_mode=2)
生產(chǎn)者代碼:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_publish(exchange='',
routing_key='TEST02',
body='Hello World!',
properties=pika.BasicProperties(delivery_mode=2)
)
客戶(hù)端代碼:
channel.queue_declare(queue='TEST02', durable=True)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
4. 設(shè)置客戶(hù)端QOS
開(kāi)啟客戶(hù)端最大的未處理消息隊(duì)列大小:
channel.basic_qos(prefetch_count=1)
代碼示例:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
5. 發(fā)布訂閱模式:
rabbitmq在之前介紹的時(shí)候可以看到,消息是被依次發(fā)送給消費(fèi)者,即是消息只會(huì)被發(fā)送給一個(gè)消費(fèi)者,除非開(kāi)啟確認(rèn)機(jī)制時(shí)處理失敗了, 一個(gè)消息發(fā)送給多個(gè)消費(fèi)者, 這個(gè)是rabbitmq提供的發(fā)布訂閱模式
發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換機(jī)(exchange)。交換機(jī)非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換機(jī)必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)交換機(jī)類(lèi)型(exchange type)來(lái)定義的
交換機(jī)類(lèi)型:直連交換機(jī)(direct)-- 一對(duì)一, 之前使用的就是這個(gè);主題交換機(jī)(topic)-- 模糊匹配,需要符合匹配規(guī)則; headers(頭交換機(jī))和 扇型交換機(jī)(fanout)-- 進(jìn)行消息廣播
fanout會(huì)發(fā)送消息到交換機(jī)所有的消息隊(duì)列
消息將會(huì)根據(jù)指定的routing_key分發(fā)到指定的隊(duì)列
rabbitmq擁有一個(gè)默認(rèn)交換機(jī) 即是 空字符串(""),
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
在pika編程中,可以不用指定隊(duì)列名稱(chēng),系統(tǒng)會(huì)隨機(jī)生成一個(gè)名稱(chēng), 在重啟都該隊(duì)列丟失
只需在聲明時(shí)不提供參數(shù)就可以了
result = channel.queue_declare()
隊(duì)列需要綁定到交換, 才能通過(guò)交換機(jī)發(fā)送消息到該隊(duì)列
channel.queue_bind(exchange='logs',
queue=result.method.queue)
代碼:
生產(chǎn)者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
connection.close()
消費(fèi)者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
6. 幾個(gè)重要概念的程序?qū)崿F(xiàn)
- 路由 routing
路由鍵在發(fā)送消息的時(shí)候進(jìn)行指定
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
- 隊(duì)列綁定 binding
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
- 交換機(jī)類(lèi)型
交換機(jī)聲明的時(shí)候進(jìn)行指定, 一般常用direct, fanout, topic三種類(lèi)型
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
- 路由鍵
一般路由鍵不等于綁定鍵, 但是我們通常在direct的時(shí)候可以近似的認(rèn)為這兩個(gè)等價(jià)的
在隊(duì)列綁定的時(shí)候,通過(guò)指定routing_key 指定
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='tologs')
- 一個(gè)路由鍵多個(gè)隊(duì)列
也就是多個(gè)隊(duì)列使用相同的綁定鍵, 這個(gè)是合法 的, 這樣就可以將消息發(fā)生到不同的隊(duì)列中
例如:
channel.queue_bind(exchange="logs",queue="info",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="warn",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="debug",routing_key='tologs')
- 排他隊(duì)列:
一個(gè)只有自己可見(jiàn)的隊(duì)列,即不允許其它用戶(hù)訪(fǎng)問(wèn),RabbitMQ允許你將一個(gè)Queue聲明成為排他性的
channel.queue_declare(exclusive=True)
示例代碼:
生產(chǎn)者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
消費(fèi)者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
7. 主題交換機(jī) (topic)
發(fā)送到主題交換機(jī)(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個(gè)由
.
分隔開(kāi)的詞語(yǔ)列表, 例如: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”
綁定鍵也必須擁有同樣的格式。主題交換機(jī)背后的邏輯跟直連交換機(jī)很相似, 攜帶著特定路由鍵的消息會(huì)被主題交換機(jī)投遞給綁定鍵與之想匹配的隊(duì)列
它的綁定鍵和路由鍵有兩個(gè)特殊應(yīng)用方式, 即是支持模糊匹配:
-
*
(星號(hào)) 用來(lái)表示一個(gè)單詞. -
#
(井號(hào)) 用來(lái)表示任意數(shù)量(零個(gè)或多個(gè))單詞。
例如路由鍵:
*.*.rabbit
,
lazy.#
lazy.pink.rabbit 會(huì)匹配
*.*.rabbit
和
lazy.#
lazy.x 匹配
lazy.#
注特殊情況:
當(dāng)一個(gè)隊(duì)列的綁定鍵為 “#”(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無(wú)視消息的路由鍵,接收所有的消息。
當(dāng)
*
(星號(hào)) 和
#
(井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。
示例代碼:
生產(chǎn)者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
消費(fèi)者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
