溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

storm操作zookeeper的方法是什么

發(fā)布時間:2021-12-23 11:58:44 來源:億速云 閱讀:150 作者:iii 欄目:云計算

這篇文章主要介紹“storm操作zookeeper的方法是什么”,在日常操作中,相信很多人在storm操作zookeeper的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”storm操作zookeeper的方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

storm操作zookeeper的主要函數(shù)都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個重要protocol:ClusterState和StormClusterState。
clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協(xié)議中封裝了一組與zookeeper進(jìn)行交互的基礎(chǔ)函數(shù),如獲取子節(jié)點函數(shù),獲取子節(jié)點數(shù)據(jù)函數(shù)等,ClusterState協(xié)議定義如下:

ClusterState協(xié)議

(defprotocol ClusterState
    (set-ephemeral-node [this path data])
    (delete-node [this path])
    (create-sequential [this path data])
    ;; if node does not exist, create persistent with this data
    (set-data [this path data])
    (get-data [this path watch?])
    (get-version [this path watch?])
    (get-data-with-version [this path watch?])
    (get-children [this path watch?])
    (mkdirs [this path])
    (close [this])
    (register [this callback])
    (unregister [this id]))

StormClusterState協(xié)議封裝了一組storm與zookeeper進(jìn)行交互的函數(shù),可以將StormClusterState協(xié)議中的函數(shù)看成ClusterState協(xié)議中函數(shù)的"組合"。StormClusterState協(xié)議定義如下:

StormClusterState協(xié)議

(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))

命名空間backtype.storm.cluster除了定義ClusterState和StormClusterState這兩個重要協(xié)議外,還定義了兩個重要函數(shù):mk-distributed-cluster-state和mk-storm-cluster-state。
mk-distributed-cluster-state函數(shù)如下:
該函數(shù)返回一個實現(xiàn)了ClusterState協(xié)議的對象,通過這個對象就可以與zookeeper進(jìn)行交互了。

mk-distributed-cluster-state函數(shù)

(defn mk-distributed-cluster-state
;; conf綁定了storm.yaml中的配置信息,是一個map對象
[conf]
;; zk綁定一個zk client,Storm使用CuratorFramework與Zookeeper進(jìn)行交互 
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT)                       :auth-conf conf)]
    ;; 創(chuàng)建storm集群在zookeeper上的根目錄,默認(rèn)值為/storm
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
;; callbacks綁定回調(diào)函數(shù)集合,是一個map對象
(let [callbacks (atom {})
    ;; active標(biāo)示zookeeper集群狀態(tài) 
    active (atom true)
    ;; zk重新綁定新的zk client,該zk client設(shè)置了watcher,這樣當(dāng)zookeeper集群的狀態(tài)發(fā)生變化時,zk server會給zk client發(fā)送相應(yīng)的event,zk client設(shè)置的watcher會調(diào)用callbacks中相應(yīng)回調(diào)函數(shù)來處理event
    ;; 啟動nimbus時,callbacks是一個空集合,所以nimbus端收到event后不會調(diào)用任何回調(diào)函數(shù);但是啟動supervisor時,callbacks中注冊了回調(diào)函數(shù),所以當(dāng)supervisor收到zk server發(fā)送的event后,會調(diào)用相應(yīng)的回調(diào)函數(shù)
    ;; mk-client函數(shù)定義在zookeeper.clj文件中,請參見其定義部分
    zk (zk/mk-client conf
                     (conf STORM-ZOOKEEPER-SERVERS)
                     (conf STORM-ZOOKEEPER-PORT)
                     :auth-conf conf
                     :root (conf STORM-ZOOKEEPER-ROOT)
                     ;; :watcher綁定一個函數(shù),指定zk client的默認(rèn)watcher函數(shù),state標(biāo)示當(dāng)前zk client的狀態(tài);type標(biāo)示事件類型;path標(biāo)示zookeeper上產(chǎn)生該事件的znode
                     ;; 該watcher函數(shù)主要功能就是執(zhí)行callbacks集合中的函數(shù),callbacks集合中的函數(shù)是在mk-storm-cluster-state函數(shù)中通過調(diào)用ClusterState的register函數(shù)添加的
                     :watcher (fn [state type path]
                                (when @active
                                  (when-not (= :connected state)
                                    (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                  (when-not (= :none type)
                                    (doseq [callback (vals @callbacks)]
                                      (callback type path))))))]
;; reify相當(dāng)于java中的implements,這里表示實現(xiàn)一個協(xié)議
(reify
 ClusterState
 ;; register函數(shù)用于將回調(diào)函數(shù)加入callbacks中,key是一個32位的標(biāo)識
 (register
   [this callback]
   (let [id (uuid)]
     (swap! callbacks assoc id callback)
     id))
 ;; unregister函數(shù)用于將指定key的回調(diào)函數(shù)從callbacks中刪除
 (unregister
   [this id]
   (swap! callbacks dissoc id))
 ;; 在zookeeper上添加一個臨時節(jié)點
 (set-ephemeral-node
   [this path data]
   (zk/mkdirs zk (parent-path path))
   (if (zk/exists zk path false)
     (try-cause
       (zk/set-data zk path data) ; should verify that it's ephemeral
       (catch KeeperException$NoNodeException e
         (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
         (zk/create-node zk path data :ephemeral)
         ))
     (zk/create-node zk path data :ephemeral)))
 ;; 在zookeeper上添加一個順序節(jié)點
 (create-sequential
   [this path data]
   (zk/create-node zk path data :sequential))
 ;; 修改某個節(jié)點數(shù)據(jù)
 (set-data
   [this path data]
   ;; note: this does not turn off any existing watches
   (if (zk/exists zk path false)
     (zk/set-data zk path data)
     (do
       (zk/mkdirs zk (parent-path path))
       (zk/create-node zk path data :persistent))))
 ;; 刪除指定節(jié)點
 (delete-node
   [this path]
   (zk/delete-recursive zk path))
 ;; 獲取指定節(jié)點數(shù)據(jù)。path標(biāo)示節(jié)點路徑;watch?是一個布爾類型值,表示是否需要對該節(jié)點進(jìn)行"觀察",如果watch?=true,當(dāng)調(diào)用set-data函數(shù)修改該節(jié)點數(shù)據(jù)后,
 ;; 會給zk client發(fā)送一個事件,zk client接收事件后,會調(diào)用創(chuàng)建zk client時指定的默認(rèn)watcher函數(shù)(即:watcher綁定的函數(shù))
 (get-data
   [this path watch?]
   (zk/get-data zk path watch?))
 ;; 與get-data函數(shù)的區(qū)別就是獲取指定節(jié)點數(shù)據(jù)的同時,獲取節(jié)點數(shù)據(jù)的version,version表示節(jié)點數(shù)據(jù)修改的次數(shù)
 (get-data-with-version
   [this path watch?]
   (zk/get-data-with-version zk path watch?))
 ;; 獲取指定節(jié)點的version,watch?的含義與get-data函數(shù)中的watch?相同
 (get-version 
   [this path watch?]
   (zk/get-version zk path watch?))
 ;; 獲取指定節(jié)點的子節(jié)點列表,watch?的含義與get-data函數(shù)中的watch?相同
 (get-children
   [this path watch?]
   (zk/get-children zk path watch?))
 ;; 在zookeeper上創(chuàng)建一個節(jié)點
 (mkdirs
   [this path]
   (zk/mkdirs zk path))
 ;; 關(guān)閉zk client
 (close
   [this]
   (reset! active false)
   (.close zk)))))

mk-storm-cluster-state函數(shù)定義如下:
mk-storm-cluster-state函數(shù)非常重要,該函數(shù)返回一個實現(xiàn)了StormClusterState協(xié)議的實例,通過該實例storm就可以更加方便與zookeeper進(jìn)行交互。
在啟動nimbus和supervisor的函數(shù)中均調(diào)用了mk-storm-cluster-state函數(shù)。關(guān)于nimbus和supervisor的啟動將在之后的文章中介紹。

mk-storm-cluster-state函數(shù)

(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; satisfies?謂詞相當(dāng)于java中的instanceof,判斷cluster-state-spec是不是ClusterState實例
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                            [false cluster-state-spec]
                            [true (mk-distributed-cluster-state cluster-state-spec)])
    ;; 綁定topology id->回調(diào)函數(shù)的map,當(dāng)/assignments/{topology id}數(shù)據(jù)發(fā)生變化時,zk client執(zhí)行assignment-info-callback中topology id所對應(yīng)的回調(diào)函數(shù)
    assignment-info-callback (atom {})
    ;; assignment-info-with-version-callback與assignment-info-callback類似
    assignment-info-with-version-callback (atom {})
    ;; assignment-version-callback與assignments-callback類似
    assignment-version-callback (atom {})
    ;; 當(dāng)/supervisors標(biāo)示的znode的子節(jié)點發(fā)生變化時,zk client執(zhí)行supervisors-callback指向的函數(shù)
    supervisors-callback (atom nil)
    ;; 當(dāng)/assignments標(biāo)示的znode的子節(jié)點發(fā)生變化時,zk client執(zhí)行assignments-callback指向的函數(shù)
    assignments-callback (atom nil)
    ;; 當(dāng)/storms/{topology id}標(biāo)示的znode的數(shù)據(jù)發(fā)生變化時,zk client執(zhí)行storm-base-callback中topology id所對應(yīng)的回調(diào)函數(shù)
    storm-base-callback (atom {})
    ;; register函數(shù)將"回調(diào)函數(shù)(fn ...)"添加到cluster-state的callbacks集合中,并返回標(biāo)示該回調(diào)函數(shù)的uuid
    state-id (register
               cluster-state
               ;; 定義"回調(diào)函數(shù)",type標(biāo)示事件類型,path標(biāo)示znode
               (fn [type path]
                 ;; subtree綁定路徑前綴如"assignments"、"storms"、"supervisors"等,args存放topology id
                 (let [[subtree & args] (tokenize-path path)]
                   ;; condp相當(dāng)于java中的switch
                   (condp = subtree
                     ;; 當(dāng)subtree="assignments"時,如果args為空,說明是/assignments的子節(jié)點發(fā)生變化,執(zhí)行assignments-callback指向的回調(diào)函數(shù),否則
         ;; 說明/assignments/{topology id}標(biāo)示的節(jié)點數(shù)據(jù)發(fā)生變化,執(zhí)行assignment-info-callback指向的回調(diào)函數(shù)
                     ASSIGNMENTS-ROOT (if (empty? args)
                                        (issue-callback! assignments-callback)
                                        (issue-map-callback! assignment-info-callback (first args)))
                     ;; 當(dāng)subtree="supervisors"時,說明是/supervisors的子節(jié)點發(fā)生變化,執(zhí)行supervisors-callback指向的回調(diào)函數(shù)
                     SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                     ;; 當(dāng)subtree="storms"時,說明是/storms/{topology id}標(biāo)示的節(jié)點數(shù)據(jù)發(fā)生變化,執(zhí)行storm-base-callback指向的回調(diào)函數(shù)
                     STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                     ;; this should never happen
                     (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
;; 在zookeeper上創(chuàng)建storm運(yùn)行topology所必需的znode
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
  (mkdirs cluster-state p))
;; 返回一個實現(xiàn)StormClusterState協(xié)議的實例
(reify
  StormClusterState
  ;; 獲取/assignments的子節(jié)點列表,如果callback不為空,將其賦值給assignments-callback,并對/assignments添加"節(jié)點觀察"
  (assignments
    [this callback]
    (when callback
      (reset! assignments-callback callback))
    (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
  ;; 獲取/assignments/{storm-id}節(jié)點數(shù)據(jù),即storm-id的分配信息,如果callback不為空,將其添加到assignment-info-callback中,并對/assignments/{storm-id}添加"數(shù)據(jù)觀察"
  (assignment-info
    [this storm-id callback]
    (when callback
      (swap! assignment-info-callback assoc storm-id callback))
    (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
  ;; 獲取/assignments/{storm-id}節(jié)點數(shù)據(jù)包括version信息,如果callback不為空,將其添加到assignment-info-with-version-callback中,并對/assignments/{storm-id}添加"數(shù)據(jù)觀察"
  (assignment-info-with-version 
    [this storm-id callback]
    (when callback
      (swap! assignment-info-with-version-callback assoc storm-id callback))
    (let [{data :data version :version} 
          (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
    {:data (maybe-deserialize data)
     :version version}))
  ;; 獲取/assignments/{storm-id}節(jié)點數(shù)據(jù)的version信息,如果callback不為空,將其添加到assignment-version-callback中,并對/assignments/{storm-id}添加"數(shù)據(jù)觀察"
  (assignment-version 
    [this storm-id callback]
    (when callback
      (swap! assignment-version-callback assoc storm-id callback))
    (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
  ;; 獲取storm集群中正在運(yùn)行的topology id即/storms的子節(jié)點列表
  (active-storms
    [this]
    (get-children cluster-state STORMS-SUBTREE false))
  ;; 獲取storm集群中所有有心跳的topology id即/workerbeats的子節(jié)點列表
  (heartbeat-storms
    [this]
    (get-children cluster-state WORKERBEATS-SUBTREE false))
  ;; 獲取所有有錯誤的topology id即/errors的子節(jié)點列表
  (error-topologies
    [this]
    (get-children cluster-state ERRORS-SUBTREE false))
  ;; 獲取指定storm-id進(jìn)程的心跳信息,即/workerbeats/{storm-id}/{node-port}節(jié)點數(shù)據(jù)
  (get-worker-heartbeat
    [this storm-id node port]
    (-> cluster-state
        (get-data (workerbeat-path storm-id node port) false)
        maybe-deserialize))
  ;; 獲取指定進(jìn)程中所有線程的心跳信息
  (executor-beats
    [this storm-id executor->node+port]
    ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
    ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
    ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
    ;; we avoid situations like that
    (let [node+port->executors (reverse-map executor->node+port)
          all-heartbeats (for [[[node port] executors] node+port->executors]
                           (->> (get-worker-heartbeat this storm-id node port)
                                (convert-executor-beats executors)
                                ))]
      (apply merge all-heartbeats)))
  ;; 獲取/supervisors的子節(jié)點列表,如果callback不為空,將其賦值給supervisors-callback,并對/supervisors添加"節(jié)點觀察" 
  (supervisors
    [this callback]
    (when callback
      (reset! supervisors-callback callback))
    (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
  ;; 獲取/supervisors/{supervisor-id}節(jié)點數(shù)據(jù),即supervisor的心跳信息
  (supervisor-info
    [this supervisor-id]
    (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
  ;; 設(shè)置進(jìn)程心跳信息
  (worker-heartbeat!
    [this storm-id node port info]
    (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
  ;; 刪除進(jìn)程心跳信息
  (remove-worker-heartbeat!
    [this storm-id node port]
    (delete-node cluster-state (workerbeat-path storm-id node port)))
  ;; 創(chuàng)建指定storm-id的topology的用于存放心跳信息的節(jié)點
  (setup-heartbeats!
    [this storm-id]
    (mkdirs cluster-state (workerbeat-storm-root storm-id)))
  ;; 刪除指定storm-id的topology的心跳信息節(jié)點
  (teardown-heartbeats!
    [this storm-id]
    (try-cause
      (delete-node cluster-state (workerbeat-storm-root storm-id))
      (catch KeeperException e
        (log-warn-error e "Could not teardown heartbeats for " storm-id))))
  ;; 刪除指定storm-id的topology的錯誤信息節(jié)點
  (teardown-topology-errors!
    [this storm-id]
    (try-cause
      (delete-node cluster-state (error-storm-root storm-id))
      (catch KeeperException e
        (log-warn-error e "Could not teardown errors for " storm-id))))
  ;; 創(chuàng)建臨時節(jié)點存放supervisor的心跳信息
  (supervisor-heartbeat!
    [this supervisor-id info]
    (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
  ;; 創(chuàng)建/storms/{storm-id}節(jié)點
  (activate-storm!
    [this storm-id storm-base]
    (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
  ;; 更新topology對應(yīng)的StormBase對象,即更新/storm/{storm-id}節(jié)點
  (update-storm!
    [this storm-id new-elems]
    ;; base綁定storm-id在zookeeper上的StormBase對象
    (let [base (storm-base this storm-id nil)
          ;; executors綁定component名稱->組件并行度的map
          executors (:component->executors base)
          ;; new-elems綁定合并后的組件并行度map,update函數(shù)將組件新并行度map合并到舊map中
          new-elems (update new-elems :component->executors (partial merge executors))]
      ;; 更新StormBase對象中的組件并行度map,并寫入zookeeper的/storms/{storm-id}節(jié)點
      (set-data cluster-state (storm-path storm-id)
                (-> base
                    (merge new-elems)
                    Utils/serialize))))
  ;; 獲取storm-id的StormBase對象,即讀取/storms/{storm-id}節(jié)點數(shù)據(jù),如果callback不為空,將其賦值給storm-base-callback,并為/storms/{storm-id}節(jié)點添加"數(shù)據(jù)觀察"
  (storm-base
    [this storm-id callback]
    (when callback
      (swap! storm-base-callback assoc storm-id callback))
    (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
  ;; 刪除storm-id的StormBase對象,即刪除/storms/{storm-id}節(jié)點
  (remove-storm-base!
    [this storm-id]
    (delete-node cluster-state (storm-path storm-id)))
  ;; 更新storm-id的分配信息,即更新/assignments/{storm-id}節(jié)點數(shù)據(jù)
  (set-assignment!
    [this storm-id info]
    (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
  ;; 刪除storm-id的分配信息,同時刪除其StormBase信息,即刪除/assignments/{storm-id}節(jié)點和/storms/{storm-id}節(jié)點
  (remove-storm!
    [this storm-id]
    (delete-node cluster-state (assignment-path storm-id))
    (remove-storm-base! this storm-id))
  ;; 將組件異常信息寫入zookeeper
  (report-error
    [this storm-id component-id node port error]
    ;; path綁定"/errors/{storm-id}/{component-id}"
    (let [path (error-path storm-id component-id)
          ;; data綁定異常信息,包括異常時間、異常堆棧信息、主機(jī)和端口
          data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
          ;; 創(chuàng)建/errors/{storm-id}/{component-id}節(jié)點
          _ (mkdirs cluster-state path)
          ;; 創(chuàng)建/errors/{storm-id}/{component-id}的子順序節(jié)點,并寫入異常信息
          _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
          ;; to-kill綁定除去順序節(jié)點編號最大的前10個節(jié)點的剩余節(jié)點的集合
          to-kill (->> (get-children cluster-state path false)
                       (sort-by parse-error-path)
                       reverse
                       (drop 10))]
      ;; 刪除to-kill中包含的節(jié)點
      (doseq [k to-kill]
        (delete-node cluster-state (str path "/" k)))))
  ;; 得到給定的storm-id component-id下的異常信息
  (errors
    [this storm-id component-id]
    (let [path (error-path storm-id component-id)
          _ (mkdirs cluster-state path)
          children (get-children cluster-state path false)
          errors (dofor [c children]
                        (let [data (-> (get-data cluster-state (str path "/" c) false)
                                       maybe-deserialize)]
                          (when data
                            (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                            )))
          ]
      (->> (filter not-nil? errors)
           (sort-by (comp - :time-secs)))))
  ;; 關(guān)閉連接,在關(guān)閉連接前,將回調(diào)函數(shù)從cluster-state的callbacks中刪除
  (disconnect
    [this]
    (unregister cluster-state state-id)
    (when solo?
      (close cluster-state))))))

zookeeper.clj中mk-client函數(shù)

mk-client函數(shù)創(chuàng)建一個CuratorFramework實例,為該實例注冊了CuratorListener,當(dāng)一個后臺操作完成或者指定的watch被觸發(fā)時將會執(zhí)行CuratorListener中的eventReceived()。eventReceived中調(diào)用的wacher函數(shù)就是mk-distributed-cluster-state中:watcher綁定的函數(shù)。

(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
               (when (= (.getType e) CuratorEventType/WATCHED)
                 (let [^WatchedEvent event (.getWatchedEvent e)]
                   (watcher (zk-keeper-states (.getState event))
                            (zk-event-types (.getType event))
                            (.getPath event))))))))
    (.start fk)
    fk))

以上就是storm與zookeeper進(jìn)行交互的源碼分析,我覺得最重要的部分就是如何給zk client添加"wacher",storm的很多功能都是通過zookeeper的wacher機(jī)制實現(xiàn)的,如"分配信息領(lǐng)取"。添加"wacher"大概分為以下幾個步驟:

  1. mk-distributed-cluster-state函數(shù)創(chuàng)建了一個zk client,并通過:watcher給該zk client指定了"wacher"函數(shù),這個"wacher"函數(shù)只是簡單調(diào)用ClusterState的callbacks集合中的函數(shù),這樣這個"wacher"函數(shù)執(zhí)行 哪些函數(shù)將由ClusterState實例決定

  2. ClusterState實例提供register函數(shù)來更新callbacks集合,ClusterState實例被傳遞給了mk-storm-cluster-state函數(shù),在mk-storm-cluster-state中調(diào)用register添加了一個函數(shù)(fn [type path] ... ),這個函數(shù)實現(xiàn)了"watcher"函數(shù)的全部邏輯

  3. mk-storm-cluster-state中注冊的函數(shù)執(zhí)行的具體內(nèi)容由StormClusterState實例決定,對zookeeper節(jié)點添加"觀察"也是通過StormClusterState實例實現(xiàn)的,這樣我們就可以通過StormClusterState實例對我們感興趣的節(jié)點添加"觀察"和"回調(diào)函數(shù)",當(dāng)節(jié)點或節(jié)點數(shù)據(jù)發(fā)生變化后,zk server就會給zk client發(fā)送"通知",zk client中的"wather"函數(shù)將被調(diào)用,進(jìn)而我們注冊的"回到函數(shù)"將被執(zhí)行。

到此,關(guān)于“storm操作zookeeper的方法是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI