您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“大數(shù)據(jù)開(kāi)發(fā)中數(shù)據(jù)表監(jiān)控怎么實(shí)現(xiàn)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
大數(shù)據(jù)開(kāi)發(fā)-表數(shù)據(jù)波動(dòng)、碼值分布波動(dòng)監(jiān)控 && 報(bào)警,是關(guān)于理論和設(shè)計(jì)部分,初步計(jì)算已經(jīng)寫完,管理平臺(tái)部分,后續(xù)完善,本文主要針對(duì)模塊設(shè)計(jì)部分,整體模塊實(shí)現(xiàn)上是離線數(shù)據(jù)源的異步模塊,分為指標(biāo)跑批模塊,監(jiān)控報(bào)警模塊,平臺(tái)管理模塊,指標(biāo)跑批模塊和監(jiān)控報(bào)警模塊主要是基于離線數(shù)據(jù)的表的通訊,即不是耦合的架構(gòu),分別是兩個(gè)例行的任務(wù),看下面的圖,
其中表設(shè)計(jì)了五張表,分別是指標(biāo)跑批記錄表,報(bào)警跑批記錄表,和mysql表監(jiān)控配置表,報(bào)警配置表,分布字段配置表,指標(biāo)跑批和監(jiān)控跑批不直接任務(wù)依賴,而是通過(guò)指標(biāo)跑批記錄表,直接產(chǎn)生報(bào)警跑批記錄
整個(gè)項(xiàng)目目錄結(jié)構(gòu)如下圖:
resource: 配置文件
common: 一些公共的模塊,Builder是發(fā)送消息構(gòu)建器,F(xiàn)ather是Spark項(xiàng)目的公共代碼
rules: 下面有5個(gè)指標(biāo)的規(guī)則,分別是檢查分區(qū)是否存儲(chǔ),檢查分區(qū)數(shù)量是否大于某個(gè)值,檢查分區(qū)數(shù)量波動(dòng),檢查分布,檢查分布大向量波動(dòng)
utils: 里面放的是一些工具,比如日期處理工具類,表格式處理工具,sql處理工具等
Monitor: 指標(biāo)跑批的主類
SunRobot: 報(bào)警跑批的主類入庫(kù)
rule的一些實(shí)現(xiàn),不細(xì)說(shuō)了,根據(jù)源代碼很好看懂,而Monitor是怎么根據(jù)這些規(guī)則,生成對(duì)應(yīng)的流水,主要實(shí)現(xiàn)代碼如下:
package com.hoult import com.beust.jcommander.JCommander import com.hoult.common.Father import com.hoult.rules.{Rule1, Rule2, Rule3, Rule4, Rule5, TableMonitorConf, TableMonitorRecord} import com.hoult.utils.{DateTool, PropertiesUtils} import org.apache.spark.sql.Dataset import scala.collection.mutable.ListBuffer object Monitor extends Father { val mysqlProps = PropertiesUtils.getMysqlProps() var broadTableConfs: Dataset[TableMonitorConf] = null def main(args: Array[String]): Unit = { val info: ObserverArgs = new ObserverArgs println("入?yún)?: " + args.mkString(",")) JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*) //廣播配置表 prepare() //生成表 * 規(guī)則 個(gè) dataframe import spark.implicits._ val tableConfArray: Array[TableMonitorConf] = spark.sql("select * from table_monitor_conf where db_table_name !='default.default'").as[TableMonitorConf].collect() val defaultTableConf = spark.sql("select * from table_monitor_conf where db_table_name ='default.default'").as[TableMonitorConf].collect().take(1)(0) var ll: ListBuffer[Dataset[TableMonitorRecord]] = ListBuffer[Dataset[TableMonitorRecord]]() //所有規(guī)則一起跑 //默認(rèn)值填充 val tConfs = tableConfArray.map( conf => { TableMonitorConf( if(conf.db_table_key == null) defaultTableConf.db_table_key else conf.db_table_key, conf.db_table_name, if (conf.table_charge_people == null) defaultTableConf.table_charge_people else conf.table_charge_people, if (conf.done_path == null) defaultTableConf.done_path else conf.done_path, if (conf.where_condition == null) defaultTableConf.where_condition else conf.where_condition, if (conf.if_done == null) defaultTableConf.if_done else conf.if_done, if (conf.if_check_partition == null) defaultTableConf.if_check_partition else conf.if_check_partition, if (conf.if_check_partition_count == null) defaultTableConf.if_check_partition_count else conf.if_check_partition_count, if (conf.if_check_partition_count_fluctuates == null) defaultTableConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates, if (conf.if_check_distribute == null) defaultTableConf.if_check_distribute else conf.if_check_distribute, if (conf.if_check_distribute_fluctuates == null) defaultTableConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates )}) //遍歷所有規(guī)則 for (elem <- tConfs) { //規(guī)則1 if ("1".equals(elem.if_check_partition)) { ll +:= Rule1.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay) } //規(guī)則2 if ("1".equals(elem.if_check_partition_count)) { ll +:= Rule2.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay) } //規(guī)則3 if ("1".equals(elem.if_check_partition_count_fluctuates)) { ll +:= Rule3.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay) } //規(guī)則4 if ("1".equals(elem.if_check_distribute)) { ll +:= Rule4.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay) } //規(guī)則5 if ("1".equals(elem.if_check_distribute_fluctuates)) { ll +:= Rule5.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay) } } if (ll.size == 0) return ll.reduce(_.union(_)).select( "db_table_key", "db_table_name", "check_data_time", "rule_name", "rule_result", "rule_error", "checked_partition" ).createOrReplaceTempView("temp_table_rule_records") val partition = DateTool.getLatest30minutePatition spark.sql("set hive.reduce.tasks=1") spark.sql(s"insert overwrite table table_monitor.table_rule_records partition(dt=${info.runDay},hm=$partition) select * from temp_table_rule_records") } def prepare(): Unit = { import spark.implicits._ //1.基礎(chǔ)配置表緩存到集群 table_monitor_conf val tableConfs: Dataset[TableMonitorConf] = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps).as[TableMonitorConf].cache() tableConfs.createOrReplaceTempView("table_monitor_conf") //2.配置表緩存到集群 table_monitor_distribute_conf spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_distribute_conf", mysqlProps).cache().createOrReplaceTempView("table_monitor_distribute_conf") } }
整理流程就是讀配置表的信息,包括設(shè)置默認(rèn)參數(shù),最后就是調(diào)用拿出來(lái)的配置表的信息和規(guī)則的信息
這個(gè)模塊就是報(bào)警以及生成攔截done文件的模塊,主要功能就是根據(jù)前面的指標(biāo)流水以及配置表的配置的表元信息進(jìn)行比對(duì),看看是否滿足check要求,如果滿足了就生成done,如果不滿足就生成undone, 如果配置了報(bào)警,就會(huì)根據(jù)報(bào)警規(guī)則進(jìn)行報(bào)警,暫且只有發(fā)送到通訊工具一項(xiàng),主要代碼如下:
import com.beust.jcommander.JCommander import com.hoult.common.{Father, Message, RobotConf} import com.hoult.rules.TableMonitorConf import com.hoult.utils.{DgsFileSystem, PropertiesUtils} import org.apache.hadoop.fs.Path import org.apache.spark.sql.Dataset import org.joda.time.DateTime import scalaj.http.Http /** * 根據(jù)rule跑批結(jié)果 和 報(bào)警配置進(jìn)行報(bào)警 */ object SunRobot extends Father { import spark.implicits._ val mysqlProps = PropertiesUtils.getMysqlProps() def main(args: Array[String]): Unit = { val info: ObserverArgs = new ObserverArgs println("入?yún)?: " + args.mkString(",")) JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*) //1.基礎(chǔ)配置表緩存到集群 table_monitor_conf val tableConfs: Dataset[TableMonitorConf] = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps).as[TableMonitorConf].cache() //2.默認(rèn)規(guī)則提取-->driver val defaultConf: TableMonitorConf = tableConfs.where("db_table_name='default.default'").as[TableMonitorConf].take(1)(0) //3.配置表緩存到集群 table_monitor_notify_conf import org.apache.spark.sql.functions.broadcast val tableNotifyConf = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_notify_conf", mysqlProps).as[TableNotifyConf].cache() broadcast(tableNotifyConf).createOrReplaceTempView("table_monitor_notify_conf") import spark.implicits._ //從配置表?yè)迫⌒枰男畔?nbsp;and 默認(rèn)值填充 val tConfs = tableConfs.map( conf => { TableMonitorConf( if (conf.db_table_key == null) defaultConf.db_table_key else conf.db_table_key, conf.db_table_name, if (conf.table_charge_people == null) defaultConf.table_charge_people else conf.table_charge_people, if (conf.done_path == null) defaultConf.done_path else conf.done_path, if (conf.where_condition == null) defaultConf.where_condition else conf.where_condition, if (conf.if_done == null) defaultConf.if_done else conf.if_done, if (conf.if_check_partition == null) defaultConf.if_check_partition else conf.if_check_partition, if (conf.if_check_partition_count == null) defaultConf.if_check_partition_count else conf.if_check_partition_count, if (conf.if_check_partition_count_fluctuates == null) defaultConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates, if (conf.if_check_distribute == null) defaultConf.if_check_distribute else conf.if_check_distribute, if (conf.if_check_distribute_fluctuates == null) defaultConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates )}) broadcast(tConfs).createOrReplaceTempView("table_monitor_conf") val confUnions: Dataset[ConfUnion] = spark.sql( """ |SELECT | a.db_table_key, | a.db_table_name, | notify_enable, | check_count_threshold, | normal_produce_datetime, | check_distribute_json_threshold, | check_count_fluctuates_threshold, | b.if_done, | b.done_path, | b.table_charge_people |FROM table_monitor_notify_conf a |LEFT JOIN table_monitor_conf b on a.db_table_name = b.db_table_name and a.db_table_key = b.db_table_key |where notify_enable='1' |""".stripMargin).as[ConfUnion] excute(confUnions, info.runDay) } def excute(confUnions: Dataset[ConfUnion], runDay: String): Unit = { val record = spark.sql(s""" |SELECT db_table_key, | db_table_name, | check_data_time, | rule_name, | rule_result, | checked_partition, | rule_error |FROM | (SELECT db_table_key, | db_table_name, | check_data_time, | rule_name, | rule_result, | rule_error, | checked_partition, | row_number() over(partition by db_table_key, db_table_name, rule_name | order by check_data_time desc) rn | FROM table_monitor.table_rule_records | WHERE dt='$runDay' ) tmp | WHERE rn = 1 |""".stripMargin) record .join(confUnions, Seq("db_table_name","db_table_key"), "left") .filter("notify_enable='1'") .createOrReplaceTempView("tmp_records") val now = DateTime.now().toString("yyyy-MM-dd HH:mm:ss")
val result = spark.sql("") // done文件 val notifyRecords = result .as[NotifyRecord] .collect() .map(r => NotifyRecord( r.db_table_name, r.view_url, r.db_table_key, r.rule_name, r.if_ok, if (r.db_table_key != null) s"${r.done_path}/${r.db_table_name}/${r.db_table_key}/" else s"${r.done_path}/${r.db_table_name}/default/", r.table_charge_people, r.trouble_description, r.check_data_time, r.checked_partition )) sc.makeRDD(notifyRecords).toDS().createOrReplaceTempView("tmp_notify_records") val dgs = DgsFileSystem.getFileSystem //寫日志記錄 spark.sql("set hive.reduce.tasks = 1") spark.sql(s"insert overwrite table table_monitor.table_monitor_notify_records partition(dt=${runDay}) select * from tmp_notify_records") //取是否生成done 或者 undone文件 (針對(duì)只監(jiān)控超時(shí)產(chǎn)出的表,不需要做依賴,就不需要生成done依賴文件) val ifDoneMap = confUnions .selectExpr("concat(db_table_key, db_table_name) AS key_name", "if_done") .rdd .map(row => row.getAs("key_name").toString -> row.getAs("if_done").toString) .collectAsMap() //1.所有的寫done或者非done for (elem <- notifyRecords) { if (ifDoneMap.getOrElse(elem.db_table_key + elem.db_table_name, "0").equals("1")) { if ("0".equals(elem.if_ok)) { dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone")) } else { dgs.deleteOnExit(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone")) dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.done")) } } } //2.有問(wèn)題的才報(bào)警 val notifyRecordsTmp = notifyRecords.filter(_.trouble_description != null) //2.1 取超時(shí)時(shí)間 val normalTimeMap = spark.sql( s""" |SELECT concat(db_table_key, db_table_name) AS key_name, | normal_produce_datetime |FROM table_monitor_notify_conf |where normal_produce_datetime is not null |GROUP BY db_table_key, | db_table_name, | normal_produce_datetime |""".stripMargin) .rdd .map(row => row.getAs("key_name").toString -> row.getAs("normal_produce_datetime").toString) .collectAsMap() for (elem <- notifyRecordsTmp) { //未生成且大于超時(shí)時(shí)間才報(bào)警 if ("0".equals(elem.if_ok) && normalTimeMap.getOrElse(elem.db_table_key + elem.db_table_name, "08:00").compareTo(elem.check_data_time.substring(11, 16)) < 0) { resp(Message.build(RobotConf.GROUP_ID, elem.table_charge_people, s""" |【表監(jiān)控報(bào)警-${elem.db_table_key}/${elem.db_table_name}】 |分區(qū): ${elem.checked_partition} |檢查時(shí)間:${elem.check_data_time} |規(guī)則:${elem.rule_name} |問(wèn)題:${elem.trouble_description} |詳情:${elem.view_url}""".stripMargin)) } } // println("報(bào)警:" + Message.build(RobotConf.GROUP_ID, elem.table_charge_people, elem.db_table_name + "\n" + elem.trouble_description + "\n" + elem.view_url)) // println("done:" + elem) // println("寫日志:" + elem) } def resp(msg: String) = { Http(RobotConf.API_URL) .header("Content-Type", "application/json") .postData(msg).asBytes } case class NotifyRecord( db_table_name: String, view_url: String, db_table_key: String, rule_name: String, if_ok: String, done_path: String, table_charge_people: String, trouble_description: String, check_data_time: String, checked_partition: String ) case class TableNotifyConf( var db_table_key: String, var db_table_name: String, var notify_enable: String, var check_count_threshold: String, var normal_produce_datetime: String, var check_distribute_json_threshold: String, var check_count_fluctuates_threshold: String ) case class ConfUnion( var db_table_key: String, var db_table_name: String, var notify_enable: String, var check_count_threshold: String, var normal_produce_datetime: String, var check_distribute_json_threshold: String, var check_count_fluctuates_threshold: String, var if_done: String, var done_path: String, var table_charge_people: String ) }
“大數(shù)據(jù)開(kāi)發(fā)中數(shù)據(jù)表監(jiān)控怎么實(shí)現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。