
How to Implement Data Table Monitoring in Big Data Development

Published 2021-12-31 · Source: 億速云 · Author: iii · Category: Big Data

This article explains how to implement data table monitoring in big data development, a problem many engineers run into in real projects. The following walks through a working approach step by step.

1. Introduction

An earlier post on big data development — monitoring and alerting for table-volume and code-value-distribution fluctuations — covered the theory and design; the initial metric computation is finished, and the management platform will be completed later. This article focuses on the module design. The overall implementation is an asynchronous pipeline over offline data sources, split into three modules: a metric batch module, a monitoring/alerting module, and a platform management module. The metric batch module and the alerting module communicate only through offline tables, so the architecture is decoupled: they run as two separate scheduled jobs. See the figure below.

[Figure: overall module architecture — the metric batch job and the alert batch job communicating via offline tables]

2. Design Notes

Five tables are involved: a metric-run record table and an alert-run record table, plus three MySQL configuration tables — the table monitoring config table, the alert config table, and the distribution-field config table. The metric batch job and the alert batch job have no direct task dependency on each other; the alert run records are produced straight from the metric-run record table. A DDL sketch of the monitoring config table follows.
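The article does not show the MySQL DDL. As a minimal sketch, the columns of table_monitor_conf can be read off the TableMonitorConf case class used in the code below; the column types and comments here are assumptions:

CREATE TABLE table_monitor_conf (
  db_table_key                        VARCHAR(64)  COMMENT 'sub-key within a table, e.g. a business line',
  db_table_name                       VARCHAR(128) COMMENT 'db.table to monitor; the default.default row stores defaults',
  table_charge_people                 VARCHAR(256) COMMENT 'owners to notify',
  done_path                           VARCHAR(512) COMMENT 'HDFS base path for done/undone flag files',
  where_condition                     VARCHAR(512) COMMENT 'optional filter used by the rules',
  if_done                             CHAR(1)      COMMENT '1 = emit done/undone flag files',
  if_check_partition                  CHAR(1)      COMMENT 'rule 1: partition exists',
  if_check_partition_count            CHAR(1)      COMMENT 'rule 2: partition count above threshold',
  if_check_partition_count_fluctuates CHAR(1)      COMMENT 'rule 3: partition-count fluctuation',
  if_check_distribute                 CHAR(1)      COMMENT 'rule 4: distribution check',
  if_check_distribute_fluctuates      CHAR(1)      COMMENT 'rule 5: distribution fluctuation'
);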

3. Code Implementation

The project directory structure is shown in the figure below:

resource: configuration files

common: shared modules — Builder builds the alert messages to send, and Father holds the common code for the Spark job

rules: the five metric rules — check that the partition exists, check that the partition count exceeds a threshold, check partition-count fluctuation, check a distribution, and check fluctuation of large distribution vectors (a sketch of the rules' shared shape follows this list)

utils: utility classes, such as date handling, table-format handling, and SQL handling

Monitor: the main class of the metric batch job

SunRobot: the main class of the alert batch job

[Figure: project directory tree]
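The five rule implementations are not reproduced in the article. As a rough sketch of their common shape — the runRule signature and the TableMonitorRecord fields are inferred from how Monitor calls them below, while the body logic is an assumption, not the repo's actual code — rule 1 (partition existence) might look like:

package com.hoult.rules

import org.apache.spark.sql.{Dataset, SparkSession}

import scala.util.Try

// Record shape inferred from the columns Monitor selects from the rule output.
case class TableMonitorRecord(
  db_table_key: String,
  db_table_name: String,
  check_data_time: String,
  rule_name: String,
  rule_result: String,
  rule_error: String,
  checked_partition: String
)

// Hypothetical sketch of rule 1: does the partition for runDay exist?
object Rule1 {
  // the real project shares its session via Father; this sketch builds its own
  private lazy val spark: SparkSession =
    SparkSession.builder().enableHiveSupport().getOrCreate()

  def runRule(dbTableName: String, dbTableKey: String,
              whereCondition: String, runDay: String): Dataset[TableMonitorRecord] = {
    import spark.implicits._
    // whereCondition is ignored in this simplified sketch
    val exists = Try(
      spark.sql(s"show partitions $dbTableName").collect()
        .exists(_.getString(0).contains(runDay))
    ).getOrElse(false)
    val now = java.time.LocalDateTime.now().toString
    Seq(TableMonitorRecord(
      dbTableKey, dbTableName, now, "check_partition",
      if (exists) "1" else "0",
      if (exists) null else s"partition for $runDay not found",
      runDay
    )).toDS()
  }
}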

4. The Monitor Class

The individual rule implementations are easy to follow from the source code, so they are not detailed here. What matters is how Monitor takes these rules and generates the corresponding records. The main code is as follows:

package com.hoult

import com.beust.jcommander.JCommander
import com.hoult.common.Father
import com.hoult.rules.{Rule1, Rule2, Rule3, Rule4, Rule5, TableMonitorConf, TableMonitorRecord}
import com.hoult.utils.{DateTool, PropertiesUtils}
import org.apache.spark.sql.Dataset

import scala.collection.mutable.ListBuffer

object Monitor extends Father {

  val mysqlProps = PropertiesUtils.getMysqlProps()
  var broadTableConfs: Dataset[TableMonitorConf] = null

  def main(args: Array[String]): Unit = {

    val info: ObserverArgs = new ObserverArgs
    println("入?yún)?: " + args.mkString(","))
    JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*)

    // broadcast the config tables
    prepare()

    // build one DataFrame per (table, rule) pair
    import spark.implicits._
    val tableConfArray: Array[TableMonitorConf] = spark.sql("select * from table_monitor_conf where db_table_name !='default.default'").as[TableMonitorConf].collect()
    val defaultTableConf = spark.sql("select * from table_monitor_conf where db_table_name ='default.default'").as[TableMonitorConf].collect().take(1)(0)
    var ll: ListBuffer[Dataset[TableMonitorRecord]] = ListBuffer[Dataset[TableMonitorRecord]]() // all rules run together

    // fill defaults from the default.default row
    val tConfs = tableConfArray.map( conf => {
      TableMonitorConf(
        if(conf.db_table_key == null) defaultTableConf.db_table_key else conf.db_table_key,
        conf.db_table_name,
        if (conf.table_charge_people == null) defaultTableConf.table_charge_people else conf.table_charge_people,
        if (conf.done_path == null) defaultTableConf.done_path else conf.done_path,
        if (conf.where_condition == null) defaultTableConf.where_condition else conf.where_condition,
        if (conf.if_done == null) defaultTableConf.if_done else conf.if_done,
        if (conf.if_check_partition == null) defaultTableConf.if_check_partition else conf.if_check_partition,
        if (conf.if_check_partition_count == null) defaultTableConf.if_check_partition_count else conf.if_check_partition_count,
        if (conf.if_check_partition_count_fluctuates == null) defaultTableConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates,
        if (conf.if_check_distribute == null) defaultTableConf.if_check_distribute else conf.if_check_distribute,
        if (conf.if_check_distribute_fluctuates == null) defaultTableConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates
      )})

    // walk through every table config and apply the enabled rules
    for (elem <- tConfs) {
      // rule 1: partition exists
      if ("1".equals(elem.if_check_partition)) {
        ll +:= Rule1.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }

      // rule 2: partition count above threshold
      if ("1".equals(elem.if_check_partition_count)) {
        ll +:= Rule2.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }

      // rule 3: partition-count fluctuation
      if ("1".equals(elem.if_check_partition_count_fluctuates)) {
        ll +:= Rule3.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // rule 4: distribution check
      if ("1".equals(elem.if_check_distribute)) {
        ll +:= Rule4.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }

      // rule 5: distribution fluctuation
      if ("1".equals(elem.if_check_distribute_fluctuates)) {
        ll +:= Rule5.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
    }

    if (ll.isEmpty)
      return
    ll.reduce(_.union(_)).select(
      "db_table_key",
      "db_table_name",
      "check_data_time",
      "rule_name",
      "rule_result",
      "rule_error",
      "checked_partition"
    ).createOrReplaceTempView("temp_table_rule_records")

    val partition = DateTool.getLatest30minutePatition
    spark.sql("set hive.reduce.tasks=1")
    spark.sql(s"insert overwrite table table_monitor.table_rule_records partition(dt=${info.runDay},hm=$partition) select * from temp_table_rule_records")

  }

  def prepare(): Unit = {
    import spark.implicits._
    // 1. cache the base config table on the cluster: table_monitor_conf
    val tableConfs: Dataset[TableMonitorConf] = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps).as[TableMonitorConf].cache()
    tableConfs.createOrReplaceTempView("table_monitor_conf")

    // 2. cache the distribution config table: table_monitor_distribute_conf
    spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_distribute_conf", mysqlProps).cache().createOrReplaceTempView("table_monitor_distribute_conf")

  }

}

The overall flow is to read the configuration table, fill in default values, and then drive the rules with the resulting per-table configuration.
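ObserverArgs itself is not shown in the article. A minimal JCommander sketch consistent with how it is used — only info.runDay is read, and the flag name --runDay is an assumption — could be:

package com.hoult

import com.beust.jcommander.Parameter

// Hypothetical argument holder; only runDay is required by Monitor and SunRobot.
class ObserverArgs {
  @Parameter(names = Array("--runDay"), description = "batch date, e.g. 2021-12-31", required = true)
  var runDay: String = _
}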

5. The SunRobot Class

This module raises alerts and generates the gating done files. It compares the metric records produced above against the table metadata in the config tables and checks whether each requirement is met: if it is, a done file is generated; if not, an undone file. If alerting is configured, alerts are sent according to the alert rules; for now the only channel is the messaging tool. The main code is as follows:

package com.hoult // not in the original listing; inferred from the unimported ObserverArgs reference

import com.beust.jcommander.JCommander
import com.hoult.common.{Father, Message, RobotConf}
import com.hoult.rules.TableMonitorConf
import com.hoult.utils.{DgsFileSystem, PropertiesUtils}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Dataset
import org.joda.time.DateTime
import scalaj.http.Http

/**
 * Run alerting based on the rule batch results and the alert configuration
 */
object SunRobot extends Father {

  import spark.implicits._
  val mysqlProps = PropertiesUtils.getMysqlProps()
  def main(args: Array[String]): Unit = {

    val info: ObserverArgs = new ObserverArgs
    println("入?yún)?: " + args.mkString(","))
    JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*)
    // 1. cache the base config table on the cluster: table_monitor_conf
    val tableConfs: Dataset[TableMonitorConf] = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps).as[TableMonitorConf].cache()

    // 2. pull the default-rule row to the driver
    val defaultConf: TableMonitorConf = tableConfs.where("db_table_name='default.default'").as[TableMonitorConf].take(1)(0)

    // 3. cache the notify config table: table_monitor_notify_conf
    import org.apache.spark.sql.functions.broadcast
    val tableNotifyConf = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_notify_conf", mysqlProps).as[TableNotifyConf].cache()
    broadcast(tableNotifyConf).createOrReplaceTempView("table_monitor_notify_conf")

    // pull the needed fields from the config table and fill defaults
    val tConfs = tableConfs.map( conf => {
      TableMonitorConf(
        if (conf.db_table_key == null) defaultConf.db_table_key else conf.db_table_key,
        conf.db_table_name,
        if (conf.table_charge_people == null) defaultConf.table_charge_people else conf.table_charge_people,
        if (conf.done_path == null) defaultConf.done_path else conf.done_path,
        if (conf.where_condition == null) defaultConf.where_condition else conf.where_condition,
        if (conf.if_done == null) defaultConf.if_done else conf.if_done,
        if (conf.if_check_partition == null) defaultConf.if_check_partition else conf.if_check_partition,
        if (conf.if_check_partition_count == null) defaultConf.if_check_partition_count else conf.if_check_partition_count,
        if (conf.if_check_partition_count_fluctuates == null) defaultConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates,
        if (conf.if_check_distribute == null) defaultConf.if_check_distribute else conf.if_check_distribute,
        if (conf.if_check_distribute_fluctuates == null) defaultConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates
    )})

    broadcast(tConfs).createOrReplaceTempView("table_monitor_conf")

    val confUnions: Dataset[ConfUnion] = spark.sql(
      """
        |SELECT
        |       a.db_table_key,
        |       a.db_table_name,
        |       notify_enable,
        |       check_count_threshold,
        |       normal_produce_datetime,
        |       check_distribute_json_threshold,
        |       check_count_fluctuates_threshold,
        |       b.if_done,
        |       b.done_path,
        |       b.table_charge_people
        |FROM table_monitor_notify_conf a
        |LEFT JOIN table_monitor_conf b on a.db_table_name = b.db_table_name and a.db_table_key = b.db_table_key
        |where notify_enable='1'
        |""".stripMargin).as[ConfUnion]

    execute(confUnions, info.runDay)

  }

  def execute(confUnions: Dataset[ConfUnion], runDay: String): Unit = {

    val record = spark.sql(s"""
         |SELECT db_table_key,
         |       db_table_name,
         |       check_data_time,
         |       rule_name,
         |       rule_result,
         |       checked_partition,
         |       rule_error
         |FROM
         |  (SELECT db_table_key,
         |          db_table_name,
         |          check_data_time,
         |          rule_name,
         |          rule_result,
         |          rule_error,
         |          checked_partition,
         |          row_number() over(partition by db_table_key, db_table_name, rule_name
         |                            order by check_data_time desc) rn
         |   FROM table_monitor.table_rule_records
         |   WHERE dt='$runDay' ) tmp
         |   WHERE rn = 1
         |""".stripMargin)

    record
      .join(confUnions, Seq("db_table_name","db_table_key"), "left")
      .filter("notify_enable='1'")
      .createOrReplaceTempView("tmp_records")


    val now = DateTime.now().toString("yyyy-MM-dd HH:mm:ss")
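    // NOTE: the query body is left empty in the original article; judging from the
    // NotifyRecord fields consumed below, it presumably selects from tmp_records and
    // derives if_ok, view_url and trouble_description against the configured thresholds.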
    val result = spark.sql("")
    // done files
    val notifyRecords = result
      .as[NotifyRecord]
      .collect()
      .map(r => NotifyRecord(
      r.db_table_name,
      r.view_url,
      r.db_table_key,
      r.rule_name,
      r.if_ok,
      if (r.db_table_key != null) s"${r.done_path}/${r.db_table_name}/${r.db_table_key}/" else s"${r.done_path}/${r.db_table_name}/default/",
      r.table_charge_people,
      r.trouble_description,
      r.check_data_time,
      r.checked_partition
    ))


    sc.makeRDD(notifyRecords).toDS().createOrReplaceTempView("tmp_notify_records")
    val dgs = DgsFileSystem.getFileSystem

    //寫日志記錄
    spark.sql("set hive.reduce.tasks = 1")
    spark.sql(s"insert overwrite table table_monitor.table_monitor_notify_records partition(dt=${runDay}) select * from tmp_notify_records")

    // decide whether to emit done/undone files (tables monitored only for late production have no downstream dependents and need no done flag files)
    val ifDoneMap = confUnions
      .selectExpr("concat(db_table_key, db_table_name) AS key_name", "if_done")
      .rdd
      .map(row => row.getAs("key_name").toString -> row.getAs("if_done").toString)
      .collectAsMap()

    // 1. write a done or undone flag for every record
    for (elem <- notifyRecords) {
      if (ifDoneMap.getOrElse(elem.db_table_key + elem.db_table_name, "0").equals("1")) {
        if ("0".equals(elem.if_ok)) {
          dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone"))
        } else {
          dgs.deleteOnExit(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone"))
          dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.done"))
        }
      }

    }

    // 2. alert only on records with problems
    val notifyRecordsTmp = notifyRecords.filter(_.trouble_description != null)
    // 2.1 fetch the expected-production (timeout) times
    val normalTimeMap = spark.sql(
      s"""
         |SELECT concat(db_table_key, db_table_name) AS key_name,
         |       normal_produce_datetime
         |FROM table_monitor_notify_conf
         |where normal_produce_datetime is not null
         |GROUP BY db_table_key,
         |         db_table_name,
         |         normal_produce_datetime
         |""".stripMargin)
        .rdd
        .map(row => row.getAs("key_name").toString -> row.getAs("normal_produce_datetime").toString)
      .collectAsMap()


    for (elem <- notifyRecordsTmp) {
      // alert only if the check failed and the expected production time has passed
      if ("0".equals(elem.if_ok) && normalTimeMap.getOrElse(elem.db_table_key + elem.db_table_name, "08:00").compareTo(elem.check_data_time.substring(11, 16)) < 0) {
        resp(Message.build(RobotConf.GROUP_ID, elem.table_charge_people,
          s"""
             |[Table monitoring alert - ${elem.db_table_key}/${elem.db_table_name}]
             |Partition: ${elem.checked_partition}
             |Check time: ${elem.check_data_time}
             |Rule: ${elem.rule_name}
             |Problem: ${elem.trouble_description}
             |Details: ${elem.view_url}""".stripMargin))
      }
    }


//      println("alert: " + Message.build(RobotConf.GROUP_ID, elem.table_charge_people, elem.db_table_name + "\n" + elem.trouble_description + "\n" + elem.view_url))
//      println("done: " + elem)
//      println("write log: " + elem)
  }


  def resp(msg: String) = {
    Http(RobotConf.API_URL)
      .header("Content-Type", "application/json")
      .postData(msg).asBytes
  }


  case class NotifyRecord(
                           db_table_name: String,
                           view_url: String,
                           db_table_key: String,
                           rule_name: String,
                           if_ok: String,
                           done_path: String,
                           table_charge_people: String,
                           trouble_description: String,
                           check_data_time: String,
                           checked_partition: String
                         )

  case class TableNotifyConf(
                                 var db_table_key: String,
                                 var db_table_name: String,
                                 var notify_enable: String,
                                 var check_count_threshold: String,
                                 var normal_produce_datetime: String,
                                 var check_distribute_json_threshold: String,
                                 var check_count_fluctuates_threshold: String
                               )

  case class ConfUnion(
                        var db_table_key: String,
                        var db_table_name: String,
                        var notify_enable: String,
                        var check_count_threshold: String,
                        var normal_produce_datetime: String,
                        var check_distribute_json_threshold: String,
                        var check_count_fluctuates_threshold: String,
                        var if_done: String,
                        var done_path: String,
                        var table_charge_people: String
                      )
}
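Message and RobotConf live in com.hoult.common but are not shown in the article. A minimal sketch consistent with how they are used above — the JSON shape, field names, group id and URL are all assumptions — might be:

package com.hoult.common

// Hypothetical constants for the messaging robot; the real values are project-specific.
object RobotConf {
  val GROUP_ID = "your-group-id"            // assumed: target chat-group id
  val API_URL  = "https://example.com/send" // assumed: webhook endpoint posted to by resp()
}

// Hypothetical builder for the JSON payload that resp() posts; the real schema
// depends on the messaging tool's API.
object Message {
  def build(groupId: String, atPeople: String, content: String): String = {
    def esc(s: String) = s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n")
    s"""{"group_id":"${esc(groupId)}","at":"${esc(atPeople)}","text":"${esc(content)}"}"""
  }
}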

That is all on how to implement data table monitoring in big data development. Thanks for reading.
