Storm's main functions for working with ZooKeeper are defined in the namespace backtype.storm.cluster (the file cluster.clj). The namespace defines two important protocols: ClusterState and StormClusterState. A Clojure protocol is roughly comparable to a Java interface: it groups a set of method signatures. The ClusterState protocol wraps the basic operations against ZooKeeper, such as listing a node's children or reading a node's data. It is defined as follows:
The ClusterState protocol
(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
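To make the protocol idea concrete, here is a minimal sketch rather than Storm code. It declares a cut-down protocol with three of the functions above and implements it with reify over an in-memory map instead of ZooKeeper. The names MiniClusterState and mk-in-memory-state are hypothetical, and the watch? argument is left out for brevity.

```clojure
(require '[clojure.string :as str])

;; Hypothetical, cut-down version of ClusterState (three functions only).
(defprotocol MiniClusterState
  (set-data [this path data])
  (get-data [this path])
  (get-children [this path]))

;; Assumption: an atom holding a map of path -> data stands in for ZooKeeper.
(defn mk-in-memory-state []
  (let [nodes (atom {})]
    (reify MiniClusterState
      (set-data [this path data]
        (swap! nodes assoc path data)
        nil)
      (get-data [this path]
        (get @nodes path))
      (get-children [this path]
        ;; direct children only: stored paths of the form <path>/<name>
        (let [prefix (str path "/")]
          (->> (keys @nodes)
               (filter #(str/starts-with? % prefix))
               (map #(subs % (count prefix)))
               (remove #(str/includes? % "/"))
               sort
               vec))))))

;; Usage: callers only see the protocol functions, not the storage behind them.
(def state (mk-in-memory-state))
(set-data state "/storms/topo-1" "base-1")
(set-data state "/storms/topo-2" "base-2")
(get-children state "/storms")
```

Because callers depend only on the protocol, the same calling code could run against a ZooKeeper-backed implementation or a test double like this one.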
The StormClusterState protocol wraps the operations that Storm itself performs against ZooKeeper. Its functions can be seen as compositions of the ClusterState functions. It is defined as follows:
The StormClusterState protocol
(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))
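The "composition" idea can be illustrated with a small sketch. It is not Storm's implementation: two plain functions over an atom stand in for the ClusterState operations, and pr-str with clojure.edn/read-string stand in for Utils/serialize and maybe-deserialize. The names raw-set-data, raw-get-data and store are hypothetical.

```clojure
(require '[clojure.edn :as edn])

;; Stand-in for ClusterState: path -> serialized string, kept in an atom.
(def store (atom {}))
(defn raw-set-data [path data] (swap! store assoc path data) nil)
(defn raw-get-data [path] (get @store path))

;; Stand-ins for Utils/serialize and maybe-deserialize (assumption: EDN text).
(defn serialize [x] (pr-str x))
(defn maybe-deserialize [s] (when s (edn/read-string s)))

(defn assignment-path [storm-id] (str "/assignments/" storm-id))

;; Each higher-level function = path helper + low-level call + (de)serialization.
(defn set-assignment! [storm-id info]
  (raw-set-data (assignment-path storm-id) (serialize info)))

(defn assignment-info [storm-id]
  (maybe-deserialize (raw-get-data (assignment-path storm-id))))

;; Usage
(set-assignment! "topo-1" {:node "host-a" :port 6700})
(assignment-info "topo-1")
(assignment-info "missing") ; nil, because nothing is stored at that path
```

The higher-level layer knows about paths and data formats, while the lower layer only knows how to read and write bytes at a path.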
Besides the two protocols ClusterState and StormClusterState, the namespace backtype.storm.cluster defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.
mk-distributed-cluster-state returns an object that implements the ClusterState protocol; all interaction with ZooKeeper goes through this object. Its definition follows:
The mk-distributed-cluster-state function
(defn mk-distributed-cluster-state
  ;; conf is a map holding the settings from storm.yaml
  [conf]
  ;; zk is bound to a zk client; Storm talks to ZooKeeper through CuratorFramework
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create the root znode of the Storm cluster in ZooKeeper (default: /storm)
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks holds the registered callback functions as a map of id -> function
  (let [callbacks (atom {})
        ;; active flags whether this ClusterState is still in use; close resets it to false,
        ;; after which the watcher below ignores events
        active (atom true)
        ;; zk is rebound to a new zk client that has a watcher. When watched state in ZooKeeper
        ;; changes, the zk server sends an event to the zk client, and the client's watcher
        ;; runs the functions in callbacks to handle it
        ;; callbacks is empty at this point; mk-storm-cluster-state later adds its dispatch
        ;; function through register. An event reaches user code only if the caller passed
        ;; callbacks to the StormClusterState functions, as the supervisor does
        ;; mk-client is defined in zookeeper.clj; see its definition further down
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher sets the zk client's default watcher function. state is the
                         ;; client's connection state, type is the event type, and path is the
                         ;; znode that produced the event
                         ;; the watcher simply runs the functions in callbacks, which
                         ;; mk-storm-cluster-state adds by calling register on the ClusterState
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]
    ;; reify plays the role of Java's "implements": it creates an object that implements the protocol
    (reify
     ClusterState
     ;; register adds a callback to callbacks under a newly generated UUID and returns that id
     (register
       [this callback]
       (let [id (uuid)]
         (swap! callbacks assoc id callback)
         id))
     ;; unregister removes the callback with the given id from callbacks
     (unregister
       [this id]
       (swap! callbacks dissoc id))
     ;; create an ephemeral node in ZooKeeper, or update its data if it already exists
     (set-ephemeral-node
       [this path data]
       (zk/mkdirs zk (parent-path path))
       (if (zk/exists zk path false)
         (try-cause
           (zk/set-data zk path data) ; should verify that it's ephemeral
           (catch KeeperException$NoNodeException e
             (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
             (zk/create-node zk path data :ephemeral)))
         (zk/create-node zk path data :ephemeral)))
     ;; create a sequential node in ZooKeeper
     (create-sequential
       [this path data]
       (zk/create-node zk path data :sequential))
     ;; set a node's data; if the node does not exist, create it as a persistent node
     (set-data
       [this path data]
       ;; note: this does not turn off any existing watches
       (if (zk/exists zk path false)
         (zk/set-data zk path data)
         (do
           (zk/mkdirs zk (parent-path path))
           (zk/create-node zk path data :persistent))))
     ;; delete the given node together with its children
     (delete-node
       [this path]
       (zk/delete-recursive zk path))
     ;; read a node's data. path is the node path; watch? is a boolean that says whether to
     ;; watch the node. With watch? true, a later change to the node's data (for example
     ;; through set-data) makes the server send an event to the zk client, which then calls
     ;; the default watcher given when the client was created (the :watcher function)
     (get-data
       [this path watch?]
       (zk/get-data zk path watch?))
     ;; like get-data, but also returns the data version, which counts how many times the
     ;; node's data has been changed
     (get-data-with-version
       [this path watch?]
       (zk/get-data-with-version zk path watch?))
     ;; read a node's version; watch? means the same as in get-data
     (get-version
       [this path watch?]
       (zk/get-version zk path watch?))
     ;; list a node's children; watch? means the same as in get-data
     (get-children
       [this path watch?]
       (zk/get-children zk path watch?))
     ;; create a node in ZooKeeper, including any missing parent nodes
     (mkdirs
       [this path]
       (zk/mkdirs zk path))
     ;; close the zk client
     (close
       [this]
       (reset! active false)
       (.close zk)))))
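The callback bookkeeping in this function can be tried out without ZooKeeper. The following is a minimal sketch under stated assumptions: mk-registry is a hypothetical helper that returns the register, unregister, close and watcher functions as a plain map, and the id is generated with java.util.UUID (the article does not show the body of Storm's uuid helper). The log-warn branch is omitted.

```clojure
;; Hypothetical stand-in for the callbacks/active atoms and the :watcher function.
(defn mk-registry []
  (let [callbacks (atom {})
        active    (atom true)]
    {:register   (fn [callback]
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id) nil)
     :close      (fn [] (reset! active false) nil)
     ;; same shape as the watcher above: skip :none events, fan out to every callback
     :watcher    (fn [state type path]
                   (when @active
                     (when-not (= :none type)
                       (doseq [callback (vals @callbacks)]
                         (callback type path)))))}))

;; Usage: simulate events by calling the watcher directly.
(def seen (atom []))
(def registry (mk-registry))
(def id ((:register registry) (fn [type path] (swap! seen conj [type path]))))

((:watcher registry) :connected :node-data-changed "/assignments/topo-1") ; recorded
((:watcher registry) :connected :none nil)                               ; ignored
((:unregister registry) id)
((:watcher registry) :connected :node-children-changed "/supervisors")   ; no callbacks left
```

Keeping the callbacks in an atom means register and unregister can be called at any time without recreating the zk client or its watcher.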
mk-storm-cluster-state is a very important function. It returns an instance that implements the StormClusterState protocol, which gives Storm a more convenient way to work with ZooKeeper. The startup functions of both nimbus and supervisor call mk-storm-cluster-state; their startup is covered in later articles. Its definition follows:
The mk-storm-cluster-state function
(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies? is comparable to Java's instanceof: it checks whether cluster-state-spec
  ;; already implements ClusterState
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; map of topology id -> callback; when the data of /assignments/{topology id} changes,
        ;; the callback stored under that topology id is run
        assignment-info-callback (atom {})
        ;; assignment-info-with-version-callback is analogous to assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; assignment-version-callback is analogous to assignment-info-callback
        assignment-version-callback (atom {})
        ;; when the children of the /supervisors znode change, the function held in
        ;; supervisors-callback is run
        supervisors-callback (atom nil)
        ;; when the children of the /assignments znode change, the function held in
        ;; assignments-callback is run
        assignments-callback (atom nil)
        ;; when the data of /storms/{topology id} changes, the callback stored under that
        ;; topology id in storm-base-callback is run
        storm-base-callback (atom {})
        ;; register adds the callback (fn ...) to the callbacks of cluster-state and returns
        ;; the uuid that identifies it
        state-id (register
                   cluster-state
                   ;; the callback itself: type is the event type, path is the znode
                   (fn [type path]
                     ;; subtree is the first path segment, such as "assignments", "storms" or
                     ;; "supervisors"; args holds the rest, i.e. the topology id
                     (let [[subtree & args] (tokenize-path path)]
                       ;; condp is comparable to Java's switch
                       (condp = subtree
                         ;; subtree = "assignments": if args is empty, the children of /assignments
                         ;; changed, so run assignments-callback; otherwise the data of
                         ;; /assignments/{topology id} changed, so run the matching entry of
                         ;; assignment-info-callback
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                         ;; subtree = "supervisors": the children of /supervisors changed, so run
                         ;; supervisors-callback
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                         ;; subtree = "storms": the data of /storms/{topology id} changed, so run
                         ;; the matching entry of storm-base-callback
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
    ;; create the znodes in ZooKeeper that Storm needs in order to run topologies
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance that implements the StormClusterState protocol
    (reify
      StormClusterState
      ;; list the children of /assignments; if callback is non-nil, store it in
      ;; assignments-callback and set a child watch on /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; read the data of /assignments/{storm-id}, i.e. the assignment of storm-id; if callback
      ;; is non-nil, store it in assignment-info-callback and set a data watch on the node
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; read the data of /assignments/{storm-id} together with its version; if callback is
      ;; non-nil, store it in assignment-info-with-version-callback and set a data watch on the node
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; read the data version of /assignments/{storm-id}; if callback is non-nil, store it in
      ;; assignment-version-callback and set a data watch on the node
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; list the ids of the topologies running in the cluster, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; list the ids of all topologies that have heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; list the ids of all topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))
      ;; read the heartbeat of one worker process of storm-id, i.e. the data of
      ;; /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; collect the heartbeats of the executors (threads) of storm-id from the heartbeats of
      ;; the workers they are assigned to
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; list the children of /supervisors; if callback is non-nil, store it in
      ;; supervisors-callback and set a child watch on /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; read the data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write a worker's heartbeat
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete a worker's heartbeat
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the node that holds the heartbeats of the topology storm-id
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat node of the topology storm-id
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error node of the topology storm-id
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))
      ;; write the supervisor's heartbeat to an ephemeral node
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create the /storms/{storm-id} node
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the StormBase object of a topology, i.e. update the /storms/{storm-id} node
      (update-storm!
        [this storm-id new-elems]
        ;; base is the StormBase object currently stored in ZooKeeper for storm-id
        (let [base (storm-base this storm-id nil)
              ;; executors is the map of component name -> parallelism
              executors (:component->executors base)
              ;; new-elems is rebound with the merged parallelism map: update merges the new
              ;; parallelism values into the old map
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge the changes into the StormBase object and write it to /storms/{storm-id}
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; read the StormBase object of storm-id, i.e. the data of /storms/{storm-id}; if callback
      ;; is non-nil, store it in storm-base-callback under storm-id and set a data watch on the node
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase object of storm-id, i.e. delete the /storms/{storm-id} node
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; update the assignment of storm-id, i.e. the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete the assignment of storm-id and its StormBase, i.e. delete the nodes
      ;; /assignments/{storm-id} and /storms/{storm-id}
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))
      ;; write a component's error information to ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data is the error record: time, stack trace, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; create the /errors/{storm-id}/{component-id} node
              _ (mkdirs cluster-state path)
              ;; create a sequential child of /errors/{storm-id}/{component-id} holding the error record
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill is every child except the 10 with the highest sequence numbers
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; read the errors recorded for the given storm-id and component-id
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                       (let [data (-> (get-data cluster-state (str path "/" c) false)
                                      maybe-deserialize)]
                         (when data
                           (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; disconnect; before closing, remove the callback from the callbacks of cluster-state
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
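The dispatch step inside the registered callback can be sketched in isolation. The article does not show the bodies of tokenize-path, issue-callback! and issue-map-callback!, so the versions below are assumptions: the path is split on "/", and a callback is cleared before it runs (fire-once), which mirrors ZooKeeper's one-time watches. Only the "assignments" branch is modeled, and the fallback returns a keyword instead of exiting the process.

```clojure
(require '[clojure.string :as str])

;; Assumption: "/assignments/topo-1" -> ["assignments" "topo-1"]
(defn tokenize-path [path]
  (vec (remove str/blank? (str/split path #"/"))))

;; Assumption: fire-once semantics, the callback is cleared before it is invoked.
(defn issue-callback! [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback! [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(def assignments-callback (atom nil))
(def assignment-info-callback (atom {}))

(defn dispatch [type path]
  (let [[subtree & args] (tokenize-path path)]
    (condp = subtree
      "assignments" (if (empty? args)
                      (issue-callback! assignments-callback)
                      (issue-map-callback! assignment-info-callback (first args)))
      ;; the listing above calls exit-process! here; the sketch just reports it
      :unknown-subtree)))

;; Usage
(def log (atom []))
(reset! assignments-callback #(swap! log conj :children-changed))
(swap! assignment-info-callback assoc "topo-1" #(swap! log conj [:data-changed %]))

(dispatch :node-children-changed "/assignments")
(dispatch :node-data-changed "/assignments/topo-1")
(dispatch :node-data-changed "/assignments/topo-1") ; callback already consumed, nothing runs
```

With fire-once callbacks, a caller that wants to keep following a node has to pass a callback again on its next read.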
mk-client creates a CuratorFramework instance and registers a CuratorListener on it. When a background operation completes or a watch fires, the listener's eventReceived() runs. The watcher function called inside eventReceived is the function that mk-distributed-cluster-state passes as :watcher. mk-client is defined in zookeeper.clj as follows:
The mk-client function
(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              ;; only watch notifications are forwarded; results of background operations are ignored
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    (.start fk)
    fk))
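The filtering and translation done by the listener can be sketched without Curator. Everything here is an assumption made for illustration: plain maps stand in for CuratorEvent and WatchedEvent, and the two lookup tables stand in for zk-keeper-states and zk-event-types, whose contents are not shown in this article. The string keys follow the names of ZooKeeper's KeeperState and EventType constants.

```clojure
;; Stand-ins for zk-keeper-states and zk-event-types (contents assumed).
(def keeper-states {"SyncConnected" :connected
                    "Disconnected"  :disconnected
                    "Expired"       :expired})

(def event-types {"None"                :none
                  "NodeCreated"         :node-created
                  "NodeDeleted"         :node-deleted
                  "NodeDataChanged"     :node-data-changed
                  "NodeChildrenChanged" :node-children-changed})

;; Hypothetical listener: forwards only watch notifications, translated to keywords.
(defn mk-listener [watcher]
  (fn [curator-event]
    (when (= :watched (:type curator-event))
      (let [event (:watched-event curator-event)]
        (watcher (keeper-states (:state event))
                 (event-types (:type event))
                 (:path event))))))

;; Usage
(def received (atom []))
(def listener (mk-listener (fn [state type path] (swap! received conj [state type path]))))

;; a watch notification is forwarded
(listener {:type :watched
           :watched-event {:state "SyncConnected" :type "NodeDataChanged" :path "/storms/topo-1"}})
;; the result of a background operation is dropped
(listener {:type :children :path "/storms"})
```

Translating the Java enum values to keywords at this boundary lets the rest of the code, such as the watcher in mk-distributed-cluster-state, compare against :connected and :none directly.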
That concludes the walk through the source code Storm uses to talk to ZooKeeper. In my view the most important part is how a watcher is attached to the zk client, because many Storm features, such as picking up assignments, are built on ZooKeeper's watcher mechanism. Attaching a watcher takes roughly the following steps:
- mk-distributed-cluster-state creates a zk client and uses :watcher to give it a watcher function. This function simply calls every function in the callbacks collection of the ClusterState, so what the watcher actually runs is decided by the ClusterState instance.
- The ClusterState instance provides register to update the callbacks collection. The instance is passed to mk-storm-cluster-state, which calls register to add a function (fn [type path] ... ). That function carries the full logic of the watcher.
- What the function registered in mk-storm-cluster-state does in a particular case is decided by the StormClusterState instance, and watches on ZooKeeper nodes are also set through that instance. Through a StormClusterState instance we can therefore put a watch and a callback on any node we care about. When the node or its data changes, the zk server notifies the zk client, the client's watcher function is called, and the callback we registered runs.
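The steps above can be followed end to end in a small simulation. This is a sketch under stated assumptions, not Storm or ZooKeeper code: atoms play the role of the server's data and pending watches, and all names are hypothetical. It models one property of standard ZooKeeper watches that matters here: a watch is a one-time trigger, so a reader that wants to keep following a node must set the watch again when it reads.

```clojure
;; In-memory stand-ins for the zk server and client state.
(def data (atom {}))       ; path -> value
(def watches (atom #{}))   ; paths with a pending one-time watch
(def callbacks (atom {}))  ; path -> user callback

;; Steps 1 and 2: the client-level watcher hands the event to the registered callback.
(defn watcher [type path]
  (when-let [cb (get @callbacks path)]
    (swap! callbacks dissoc path)
    (cb path)))

;; Step 3: reading with a callback stores the callback and sets a one-time watch.
(defn get-data [path callback]
  (when callback
    (swap! callbacks assoc path callback)
    (swap! watches conj path))
  (get @data path))

(defn set-data [path value]
  (swap! data assoc path value)
  ;; the "server" fires a pending watch once and then forgets it
  (when (contains? @watches path)
    (swap! watches disj path)
    (watcher :node-data-changed path)))

;; A reader that re-arms the watch by passing itself as the callback on every read.
(def observed (atom []))
(defn sync-assignment [path]
  (swap! observed conj (get-data path sync-assignment)))

;; Usage
(set-data "/assignments/topo-1" :v1)
(sync-assignment "/assignments/topo-1") ; reads :v1 and arms the watch
(set-data "/assignments/topo-1" :v2)    ; fires the callback, which reads :v2 and re-arms
(set-data "/assignments/topo-1" :v3)    ; fires again
```

If sync-assignment passed nil instead of itself, it would see :v1 once and never be notified of later changes.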
This part of the source is tightly coupled to ZooKeeper and relies on several of its concepts and features, such as data watches and child watches. For ZooKeeper's watcher mechanism, see
http://www.cnblogs.com/ggjucheng/p/3369946.html
http://www.cnblogs.com/zhangchaoyang/articles/3813217.html
Storm does not use the ZooKeeper API directly. It uses the Curator framework, which simplifies access to ZooKeeper. For the Curator framework, see
http://f.dataguru.cn/thread-120125-1-1.html