emit(Listtuple,ObjectmessageId){returnemit(Utils.DEFAULT_STREAM_ID,tuple,messageId);}這里的t" />

日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

理解Storm可靠性消息

系統 2281 0

看過一些別人寫的, 感覺有些東西沒太說清楚,個人主要以源代碼跟蹤,參考個人理解講述,有錯誤請指正。

1基本名詞

1.1 Tuple: 消息傳遞的基本單位。很多文章中介紹都是這么說的, 個人覺得應該更詳細一點。

?在spout發送的時候,函數原型

?public List<Integer> emit(List<Object> tuple, Object messageId) {
??????? return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
??? }

這里的tuple, 實際上是List<Object> 對象,返回的是 List<Integer> 是要發送的tast的IdsList

在bolt接收的時候, 函數原型

public void execute(Tuple tuple)

變成了一個Tuple對象,? 結構應該也是一個list, List<Field1, value1, Field2, value2..>這樣的一個結構, FieldList ValueList, 我們根據對應的fieldname就可以取出對應的getIntegerByField方法

回到spout對象中來, 在spout有一個定義的輸出字段

??? public void declareOutputFields(OutputFieldsDeclarer declarer) {
?? ??? ?declarer.declare(new Fields("word"));
?? ?}

這里定義的一個字段,所以我們在emit的時候就只能發送一個包含一個value的tuple(spout部分), storm會將field, 和 發送的value下標對應, 變成一個Tuple對象,? 也就是上面說的

List<Field1, value1, Field2, value2..>這樣的一個結構,? 在bolt 之間傳遞tuple, 發送又是List<Object> tuple, 根據組裝bolt定義的fiels, 再組合成Tuple對象給下一個Bolt處理

在發射的最后 還有一個 void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);? 因為上面emit的時候已經返回List<taskid>, 所以它就知道要發送給哪些taskid處理,然后將taskid 和 tuple放入隊列 LinkedBlockingQueue , 代碼如下

; worker.clj

( defn mk-transfer- fn [ transfer-queue ]

( fn [ task ^Tuple tuple ]

(.put ^LinkedBlockingQueue

transfer-queue [ task tuple ] )

))
然后單獨會開啟一個叫async-loop的線程,取出每條記錄(taskid, tuple), 然后worker會從當前task建立一個到目標task的zeromq連接, 通過zeromq將tuple發送給目標task
?

總結: 每次emit都是根據List<Object>和定義的輸出Fields組合成一個Tuple對象,,每個接受對象接收的是Tuple對象,如果處理完再發送又再組合字段, 在emit的時候返回LIst<taskids>,所以就知道發送給哪些Task, 然后拿這些taskid和tuple再組合成一個任務隊列,通過zeromq發送到目標task,目標task接收到tuple進程處理至于并發度控制, 參考

      http://www.cnblogs.com/chengxin1982/p/4001275.html


    

?

TupleID Tuple對應的ID,? 在創建的時候賦予一個64位的id,主要用來跟蹤消息

MsgID? 官方解釋 Emits a new tuple to the default output stream with the given message ID. 如果不指定,acker不會跟蹤。主要作用 , 在spout收到fail時候, 能夠定位到是哪條消息出錯,能夠決定重發. 使用實例? _collector.emit(new Values(sentence),? new Integer(num));

acker 消息跟蹤者. acker 存儲一個Map<taskid, ack val> ,? taskid為祖宗tuple創建者的taskid ,? ack_val 為消息傳遞過程中的 tupleid的xor值,如果為0則知道是哪個spout或者bolt已經處理完了, 為什么會有bolt, 因為bolt在發射的時候,如果非錨定,就是不帶tuple發射,它會被認為是祖宗tuple, 上一個tuple會認為已經結束.
至于分配發射源分配到acker, storm采用一致性hash 祖宗tupleid來分配,因為在所有的tuple中都能知道祖宗tupleid,所以在子孫tuple處理時, 知道該發送給哪個acker跟蹤

?

理解Storm可靠性消息


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 若尔盖县| 璧山县| 无棣县| 洛阳市| 新河县| 文水县| 武邑县| 吴堡县| 昭觉县| 诸暨市| 阿克陶县| 汪清县| 仲巴县| 云浮市| 中卫市| 区。| 驻马店市| 武强县| 泰安市| 平安县| 辽源市| 上思县| 津南区| 盐城市| 吴川市| 云浮市| 汉寿县| 徐州市| 卫辉市| 辽阳市| 六安市| 保康县| 广灵县| 澳门| 体育| 澳门| 边坝县| 甘肃省| 丹凤县| 五大连池市| 武冈市|