溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何將數(shù)據(jù)按指定格式存入zookeeper

發(fā)布時間:2021-12-22 14:10:47 來源:億速云 閱讀:127 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要講解了“如何將數(shù)據(jù)按指定格式存入zookeeper”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“如何將數(shù)據(jù)按指定格式存入zookeeper”吧!

環(huán)境:

  scala版本:2.11.8

  zookeeper版本:3.4.5-cdh6.7.0

package com.ruozedata.zk
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
  * Created by ganwei on 2018/08/21
  * 要求:
  * 1 通過storeOffsets方法把數(shù)據(jù)存入zookeeper中。
  *  存儲格式:
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/0
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/1
  *          /consumers/G322/offsets/ruoze_offset_topic/partition/2
  * 2 通過obtainOffsets方法把存入的數(shù)據(jù)讀取出來
  * 輸出格式:
  *           topic:ruoze_offset_topic	partition:0	offset:7
  *           topic:ruoze_offset_topic	partition:1	offset:3
  *           topic:ruoze_offset_topic	partition:2	offset:5
  */
object ZkConnectApp{
  val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("172.16.100.31:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("consumers")
      .build()
    client.start()
    client
  }
  def lock(path: String)(body: => Unit) {
    val lock = new InterProcessMutex(client, path)
    lock.acquire()
    try {
      body
    } finally {
      lock.release()
    }
  }
  def tryDo(path: String)(body: => Unit): Boolean = {
    val lock = new InterProcessMutex(client, path)
    if (!lock.acquire(10, TimeUnit.SECONDS)) {
      LOG.info(s"不能獲得鎖 {$path},已經(jīng)有任務(wù)在運(yùn)行,本次任務(wù)退出")
      return false
    }
    try {
      LOG.info("獲準(zhǔn)運(yùn)行")
      body
      true
    } finally {
      lock.release()
      LOG.info(s"釋放鎖 {$path}")
    }
  }
  //zookeeper創(chuàng)建路徑
  def ensurePathExists(path: String): Unit = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }
  /**
    * OffsetRange類定義(偏移量對象)
    * 用于存儲偏移量
    */
  case class OffsetRange(
                          val topic:String,     // 主題
                          val partition:Int,    // 分區(qū)
                          val fromOffset:Long,  // 起始偏移量
                          val utilOffset:Long   // 終止偏移量
                        )
  /**
    * zookeeper存儲offset的方法
    * 寫入格式:
    * /consumers/G322/offsets/ruoze_offset_topic/partition/0
    * /consumers/G322/offsets/ruoze_offset_topic/partition/1
    * /consumers/G322/offsets/ruoze_offset_topic/partition/2
    * @param OffsetsRanges
    * @param groupName
    */
  def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={
    val offsetRootPath = s"/"+groupName
    if (client.checkExists().forPath(offsetRootPath) == null) {
      client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
    }
    for(els <- OffsetsRanges ){
      val data = String.valueOf(els.utilOffset).getBytes
      val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"
      // 創(chuàng)建路徑
      ensurePathExists(path)
      // 寫入數(shù)據(jù)
      client.setData().forPath(path, data)
    }
  }
  /**
    * TopicAndPartition類定義(偏移量key對象)
    *  用于提取偏移量
    */
  case class TopicAndPartition(
                                topic:String,  // 主題
                                partition:Int  // 分區(qū)
                              )
  /**
    * zookeeper提取offset的方法
    * @param topic
    * @param groupName
    * @return
    */
  def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={
    // 定義一個空的HashMap
    val maps = mutable.HashMap[TopicAndPartition,Long]()
    // offset的路徑
    val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition"
    // 判斷路徑是否存在
    val stat = client.checkExists().forPath(s"$offsetRootPath")
    if (stat == null ){
      println(stat)  // 路徑不存在 就將路徑打印在控制臺,檢查路徑
    }else{
      // 獲取 offsetRootPath路徑下一級的所有子目錄
      // 我們這里是獲取的所有分區(qū)
      val children = client.getChildren.forPath(s"$offsetRootPath")
     // 遍歷所有的分區(qū)
      for ( lines <- children ){
        // 獲取分區(qū)的數(shù)據(jù)
        val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong
        // 將 topic  partition  和數(shù)據(jù)賦值給 maps
        maps(TopicAndPartition(topic,lines.toInt)) = data
      }
    }
    // 按partition排序后 返回map對象
    maps.toList.sortBy(_._1.partition).toMap
  }
  def main(args: Array[String]) {
      //定義初始化數(shù)據(jù)
      val off1 = OffsetRange("ruoze_offset_topic",0,0,7)
      val off2 = OffsetRange("ruoze_offset_topic",1,0,3)
      val off3 = OffsetRange("ruoze_offset_topic",2,0,5)
      val arr = Array(off1,off2,off3)
      //獲取到namespace
//      println(client.getNamespace)
      // 創(chuàng)建路徑
//      val offsetRootPath = "/G322"
//      if (client.checkExists().forPath(offsetRootPath) == null) {
//        client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
//      }
      //存儲值
      storeOffsets(arr,"G322")
      //獲取值
      /**
        * 輸出格式:
        * topic:ruoze_offset_topic	partition:0	offset:7
        * topic:ruoze_offset_topic	partition:1	offset:3
        * topic:ruoze_offset_topic	partition:2	offset:5
        */
      val result = obtainOffsets("ruoze_offset_topic","G322")
      for (map <- result){
        println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2)
      }
  }
}

感謝各位的閱讀,以上就是“如何將數(shù)據(jù)按指定格式存入zookeeper”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對如何將數(shù)據(jù)按指定格式存入zookeeper這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI