日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

RabbitMQ的工作隊(duì)列和路由

系統(tǒng) 2290 0

RabbitMQ的工作隊(duì)列和路由

工作隊(duì)列:Working Queue
?
工作隊(duì)列這個(gè)概念與簡單的發(fā)送/接收消息的區(qū)別就是:接收方接收到消息后,可能需要花費(fèi)更長的時(shí)間來處理消息,這個(gè)過程就叫一個(gè)Work/Task。
?
幾個(gè)概念
分配:多個(gè)接收端接收同一個(gè)Queue時(shí),如何分配?
消息確認(rèn):Server端如何確定接收方的Work已經(jīng)對消息進(jìn)行了完整的處理?
消息持久化:發(fā)送方、服務(wù)端Queue如何對未處理的消息進(jìn)行磁盤持久化?
?
Round-robin分配
多個(gè)接收端接收同一個(gè)Queue時(shí),采用了Round-robin分配算法,即輪叫調(diào)度——依次分配給各個(gè)接收方。
?
消息確認(rèn)
默認(rèn)開啟了消息確認(rèn)(接收方接收到消息后,立即向服務(wù)器發(fā)回確認(rèn))。消息接收方處理完消息后,向服務(wù)器發(fā)送消息確認(rèn),服務(wù)器再刪除該消息。
?
對于耗時(shí)的work,可以先關(guān)閉自動(dòng)消息確認(rèn),在work完成后,再手動(dòng)發(fā)回確認(rèn)。
channel.basicConsume("hello",false/*關(guān)閉自動(dòng)消息確認(rèn)*/,consumer);
// ...work完成后
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
?
持久化
?
1. Server端的Queue持久化
注意的是,如果已經(jīng)聲明了同名非持久化的Queue,則再次聲明無效。
發(fā)送方和接收方都需要指定該參數(shù)。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);?
?
2. Message持久化
channel.basicPublish("", "task_queue",?MessageProperties. PERSISTENT_TEXT_PLAIN ,message.getBytes());
?
負(fù)載分配
?
為了解決各個(gè)接收端工作量相差太大的問題(有的一直busy,有的空閑比較多),突破Round-robin。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
意思為,最多為當(dāng)前接收方發(fā)送一條消息。如果接收方還未處理完畢消息,還沒有回發(fā)確認(rèn),就不要再給他分配消息了,應(yīng)該把當(dāng)前消息分配給其它空閑接收方。
?

固定關(guān)鍵詞路由:Routing
?
使用類型為direct的exchange,發(fā)送特定關(guān)鍵詞( RoutingKey )的消息給訂閱該關(guān)鍵詞的Queue。
?
場景示例:消息發(fā)送方發(fā)送了類型為[error][info]的兩種消息,寫磁盤的消息接受者只接受error類型的消息,Console打印的接收兩者。
?
RabbitMQ的工作隊(duì)列和路由
(上圖采用了不同顏色來作為routingKey)
?
發(fā)送方
?
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "direct"
            
              /*
            
            
              exchange類型為direct
            
            
              */
            
            
              );

 

channel.basicPublish(EXCHANGE_NAME, 
            
            "info"
            
              /*
            
            
              關(guān)鍵詞=info
            
            
              */
            
            , 
            
              null
            
            
              , message.getBytes());

channel.close();

connection.close();
            
          
?
接收方
?
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "direct"
            
              /*
            
            
              exchange類型為direct
            
            
              */
            
            
              );


            
            
              //
            
            
               創(chuàng)建匿名Queue
            
            

String queueName =
            
               channel.queueDeclare().getQueue();


            
            
              //
            
            
               訂閱某個(gè)關(guān)鍵詞,綁定到匿名Queue中
            
            

channel.queueBind(quueName,EXCHANGE_NAME,"error"
            
              );

channel.queueBind(quueName,EXCHANGE_NAME,
            
            "info"
            
              );

 

QueueingConsumer consumer 
            
            = 
            
              new
            
            
               QueueingConsumer(channel);

channel.basicConsume(queueName, 
            
            
              true
            
            
              , consumer);

 

QueueingConsumer.Delivery delivery 
            
            = consumer.nextDelivery(); 
            
              //
            
            
               Blocking...
            
            

String message = 
            
              new
            
            
               String(delivery.getBody());

String routingKey 
            
            = delivery.getEnvelope().getRoutingKey(); 
            
              //
            
            
               可獲取路由關(guān)鍵詞
            
          

?


關(guān)鍵詞模式路由:Topics
?
這種模式可以看做對Routing的擴(kuò)展。Routing只能使用固定關(guān)鍵詞,而Topics模式可以訂閱 模糊關(guān)鍵詞
?
關(guān)鍵詞必須是一組word,由點(diǎn)號分割。例如"xxx.yyy.zzz",限定255bytes。
* 表示一個(gè)word;
# 表示0個(gè)或者多個(gè)word;
?
RabbitMQ的工作隊(duì)列和路由
?
發(fā)送方
?
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "topic"
            
              /*
            
            
              exchange類型
            
            
              */
            
            
              );

 

channel.basicPublish(EXCHANGE_NAME, 
            
            "xxx.yyy"
            
              /*
            
            
              關(guān)鍵詞routingKey
            
            
              */
            
            , 
            
              null
            
            
              , message.getBytes());

channel.close();

connection.close();
            
          

?

接收方
?
            ConnectionFactory factory = 
            
              new
            
            
               ConnectionFactory();

factory.setHost(
            
            "localhost"
            
              );

Connection connection 
            
            =
            
               factory.newConnection();

Channel channel 
            
            =
            
               connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, 
            
            "topic"
            
              /*
            
            
              exchange類型
            
            
              */
            
            
              );


            
            
              //
            
            
               創(chuàng)建匿名Queue
            
            

String queueName =
            
               channel.queueDeclare().getQueue();


            
            
              //
            
            
               訂閱某個(gè)關(guān)鍵詞,綁定到匿名Queue中
            
            

channel.queueBind(quueName,EXCHANGE_NAME,"*.yyy"
            
              );

 

QueueingConsumer consumer 
            
            = 
            
              new
            
            
               QueueingConsumer(channel);

channel.basicConsume(queueName, 
            
            
              true
            
            
              , consumer);

 

QueueingConsumer.Delivery delivery 
            
            = consumer.nextDelivery(); 
            
              //
            
            
               Blocking...
            
            

String message = 
            
              new
            
            
               String(delivery.getBody());

String routingKey 
            
            = delivery.getEnvelope().getRoutingKey(); 
            
              //
            
            
               可獲取路由關(guān)鍵詞
            
          

?


Refs
?
?
?
標(biāo)簽:? rabbitmq ,? messagequeue ,? mq

RabbitMQ的工作隊(duì)列和路由


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 会东县| 隆子县| 怀来县| 临潭县| 绍兴市| 武功县| 安康市| 岗巴县| 蒲江县| 海口市| 泗洪县| 沙雅县| 禄丰县| 招远市| 涟源市| 安远县| 奇台县| 高淳县| 闻喜县| 瓮安县| 交城县| 明光市| 陕西省| 赤峰市| 吉首市| 大埔区| 丰城市| 金门县| 读书| 准格尔旗| 和龙市| 云浮市| 吉隆县| 盐津县| 日喀则市| 北辰区| 班玛县| 贵港市| 昭平县| 金山区| 陇西县|