日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

python使用pika操作rabbitmq編程入門(mén)(二)

系統(tǒng) 1709 0

pika生產(chǎn)者程序大致步驟:

            
              1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel  當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列   指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱(chēng)
4. 聲明交換機(jī)  交換機(jī)類(lèi)型,名稱(chēng)等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定   queue_bind 
6. basic_publish 發(fā)送到交換機(jī) 指定路由鍵

            
          

pika消費(fèi)者程序大致步驟:

            
              1. 建立連接connection , 需要認(rèn)證的調(diào)用認(rèn)證參數(shù)
2. 創(chuàng)建通道channel  當(dāng)然 channel可以池化,這樣可以重復(fù)使用
3. 聲明隊(duì)列   指定隊(duì)列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱(chēng)
4. 聲明交換機(jī)  交換機(jī)類(lèi)型,名稱(chēng)等, 也可以不用聲明,直接使用 “” 空字符串,默認(rèn)交換機(jī)也可以
5. 將隊(duì)列與交換機(jī)綁定   queue_bind 
6. basic_consume 消費(fèi)消息

            
          

1. 輪詢(xún)接收消息

使用消息隊(duì)列的一個(gè)好處就是, 可以將任務(wù)消息發(fā)送到隊(duì)列中,由消費(fèi)者異步進(jìn)行處理, 同時(shí)對(duì)于后端消費(fèi)者可以很容易地增加減少,只需要運(yùn)行多個(gè)進(jìn)程即可, 方便擴(kuò)展, 之前的示例中消費(fèi)端程序就可以開(kāi)啟多個(gè),然后可以看到消費(fèi)被輪詢(xún)得分配給每個(gè)消費(fèi)者

將之前的消費(fèi)者略作更改, 加入客戶(hù)端編號(hào),啟動(dòng)三個(gè)消費(fèi)者, 通過(guò)生產(chǎn)者發(fā)送4個(gè)消息, 依次收到消息, 即是 輪詢(xún)(round-robin):

            
               [*] Waiting for messages. To exit press CTRL+C
1. [x] Received 'Hello World!'
1. [x] Received 'Hello World!'

            
          
            
               [*] Waiting for messages. To exit press CTRL+C
2. [x] Received 'Hello World!'

            
          
            
               [*] Waiting for messages. To exit press CTRL+C
3. [x] Received 'Hello World!'

            
          

2. 消息確認(rèn):

為了防止消息丟失,RabbitMQ提供了消息響應(yīng)(acknowledgments)。消費(fèi)者會(huì)通過(guò)一個(gè)ack(響應(yīng)),告訴RabbitMQ已經(jīng)收到并處理了某條消息,然后RabbitMQ就會(huì)釋放并刪除這條消息。 如果消費(fèi)者(consumer)掛掉了,沒(méi)有發(fā)送響應(yīng),RabbitMQ就會(huì)認(rèn)為消息沒(méi)有被完全處理,然后重新發(fā)送給其他消費(fèi)者(consumer)。這樣,及時(shí)工作者(workers)偶爾的掛掉,也不會(huì)丟失消息。

消息響應(yīng)默認(rèn)auto_ack=False, 不自動(dòng)確認(rèn)消息, 即是需要我們處理并確認(rèn)消息的

確認(rèn)需要發(fā)送確認(rèn)消息:

在回調(diào)callback中加入basic_ack

            
              channel
              
                .
              
              basic_ack
              
                (
              
              delivery_tag 
              
                =
              
               method
              
                .
              
              delivery_tag
              
                )
              
            
          

如果auto_ack設(shè)置為T(mén)rue,而忘記basic_ack消息確認(rèn),消息在程序退出之后就會(huì)重新發(fā)送,如果不及時(shí)釋放沒(méi)響應(yīng)的消息,RabbitMQ就會(huì)占用越來(lái)越多的內(nèi)存。

為了排除這種錯(cuò)誤,可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:

            
              # rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   0       0
TEST01  0       0

            
          

3. 消息持久化

如果沒(méi)有向rabbitmq指定消息持久化, 則退出或者崩潰的時(shí)候,將會(huì)丟失所有隊(duì)列和消息, 消息持久化必須把“隊(duì)列”和“消息”設(shè)為持久化

  1. 隊(duì)列聲明持久化:
            
              channel.queue_declare(queue='hello', durable=True)

            
          

注意一個(gè)消息隊(duì)列被聲明過(guò)一次后,rabbitmq不允許使用不同的參數(shù)重新定義隊(duì)列, 因此如果存在hello隊(duì)列,上面會(huì)提示錯(cuò)誤

  1. 消息聲明持久化

將publish生產(chǎn)者發(fā)送消息時(shí)候消息屬性, delivery_mode的屬性設(shè)為2

            
               properties=pika.BasicProperties(delivery_mode=2)

            
          

生產(chǎn)者代碼:

            
              channel.queue_declare(queue='TEST02', durable=True)

channel.basic_publish(exchange='',
                      routing_key='TEST02',
                      body='Hello World!',
                      properties=pika.BasicProperties(delivery_mode=2)
                      )

            
          

客戶(hù)端代碼:

            
              channel.queue_declare(queue='TEST02', durable=True)

def callback(ch, method, properties, body):
    print(". [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(on_message_callback=callback,
                      queue='TEST02',
                      auto_ack=False,
                      )

            
          

4. 設(shè)置客戶(hù)端QOS

開(kāi)啟客戶(hù)端最大的未處理消息隊(duì)列大小:

            
              channel.basic_qos(prefetch_count=1)

            
          

代碼示例:

            
              channel.queue_declare(queue='TEST02', durable=True)
channel.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    print(". [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(on_message_callback=callback,
                      queue='TEST02',
                      auto_ack=False,
                      )

            
          

5. 發(fā)布訂閱模式:

rabbitmq在之前介紹的時(shí)候可以看到,消息是被依次發(fā)送給消費(fèi)者,即是消息只會(huì)被發(fā)送給一個(gè)消費(fèi)者,除非開(kāi)啟確認(rèn)機(jī)制時(shí)處理失敗了, 一個(gè)消息發(fā)送給多個(gè)消費(fèi)者, 這個(gè)是rabbitmq提供的發(fā)布訂閱模式

發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換機(jī)(exchange)。交換機(jī)非常簡(jiǎn)單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換機(jī)必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過(guò)交換機(jī)類(lèi)型(exchange type)來(lái)定義的

交換機(jī)類(lèi)型:直連交換機(jī)(direct)-- 一對(duì)一, 之前使用的就是這個(gè);主題交換機(jī)(topic)-- 模糊匹配,需要符合匹配規(guī)則; headers(頭交換機(jī))和 扇型交換機(jī)(fanout)-- 進(jìn)行消息廣播

fanout會(huì)發(fā)送消息到交換機(jī)所有的消息隊(duì)列

消息將會(huì)根據(jù)指定的routing_key分發(fā)到指定的隊(duì)列

rabbitmq擁有一個(gè)默認(rèn)交換機(jī) 即是 空字符串(""),

            
              channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

            
          

在pika編程中,可以不用指定隊(duì)列名稱(chēng),系統(tǒng)會(huì)隨機(jī)生成一個(gè)名稱(chēng), 在重啟都該隊(duì)列丟失

只需在聲明時(shí)不提供參數(shù)就可以了

            
              result = channel.queue_declare()

            
          

隊(duì)列需要綁定到交換, 才能通過(guò)交換機(jī)發(fā)送消息到該隊(duì)列

            
              channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

            
          

代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

6. 幾個(gè)重要概念的程序?qū)崿F(xiàn)

  1. 路由 routing

路由鍵在發(fā)送消息的時(shí)候進(jìn)行指定

            
              channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

            
          
  1. 隊(duì)列綁定 binding
            
              channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

            
          
  1. 交換機(jī)類(lèi)型

交換機(jī)聲明的時(shí)候進(jìn)行指定, 一般常用direct, fanout, topic三種類(lèi)型

            
              channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

            
          
  1. 路由鍵

一般路由鍵不等于綁定鍵, 但是我們通常在direct的時(shí)候可以近似的認(rèn)為這兩個(gè)等價(jià)的

在隊(duì)列綁定的時(shí)候,通過(guò)指定routing_key 指定

            
              channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='tologs')

            
          
  1. 一個(gè)路由鍵多個(gè)隊(duì)列

也就是多個(gè)隊(duì)列使用相同的綁定鍵, 這個(gè)是合法 的, 這樣就可以將消息發(fā)生到不同的隊(duì)列中

例如:

            
              channel.queue_bind(exchange="logs",queue="info",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="warn",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="debug",routing_key='tologs')

            
          
  1. 排他隊(duì)列:

一個(gè)只有自己可見(jiàn)的隊(duì)列,即不允許其它用戶(hù)訪(fǎng)問(wèn),RabbitMQ允許你將一個(gè)Queue聲明成為排他性的

            
              channel.queue_declare(exclusive=True)

            
          

示例代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

7. 主題交換機(jī) (topic)

發(fā)送到主題交換機(jī)(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個(gè)由 . 分隔開(kāi)的詞語(yǔ)列表, 例如: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”

綁定鍵也必須擁有同樣的格式。主題交換機(jī)背后的邏輯跟直連交換機(jī)很相似, 攜帶著特定路由鍵的消息會(huì)被主題交換機(jī)投遞給綁定鍵與之想匹配的隊(duì)列

它的綁定鍵和路由鍵有兩個(gè)特殊應(yīng)用方式, 即是支持模糊匹配:

  • * (星號(hào)) 用來(lái)表示一個(gè)單詞.
  • # (井號(hào)) 用來(lái)表示任意數(shù)量(零個(gè)或多個(gè))單詞。

例如路由鍵: *.*.rabbit lazy.#

lazy.pink.rabbit 會(huì)匹配 *.*.rabbit lazy.#

lazy.x 匹配 lazy.#

注特殊情況:

當(dāng)一個(gè)隊(duì)列的綁定鍵為 “#”(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無(wú)視消息的路由鍵,接收所有的消息。

當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。

示例代碼:

生產(chǎn)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

            
          

消費(fèi)者:

            
              channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()

            
          

更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 龙游县| 河津市| 大理市| 礼泉县| 博罗县| 台南市| 太仓市| 柘荣县| 金昌市| 惠来县| 文成县| 南丹县| 阜城县| 新竹县| 屏南县| 古丈县| 云林县| 韩城市| 陵川县| 通州市| 龙川县| 景德镇市| 万盛区| 峡江县| 孝义市| 江永县| 成安县| 类乌齐县| 康平县| 会同县| 翁源县| 凤台县| 大冶市| 普洱| 萝北县| 龙江县| 临夏县| 延庆县| 贵阳市| 梓潼县| 全南县|