溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么分析基于Spark的公安大數(shù)據(jù)實時運維技術(shù)實踐

發(fā)布時間:2021-12-17 11:38:44 來源:億速云 閱讀:153 作者:柒染 欄目:互聯(lián)網(wǎng)科技

怎么分析基于Spark的公安大數(shù)據(jù)實時運維技術(shù)實踐,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

公安行業(yè)存在數(shù)以萬計的前后端設(shè)備,前端設(shè)備包括相機、檢測器及感應(yīng)器,后端設(shè)備包括各級中心機房中的服務(wù)器、應(yīng)用服務(wù)器、網(wǎng)絡(luò)設(shè)備及機房動力系統(tǒng),數(shù)量巨大、種類繁多的設(shè)備給公安內(nèi)部運維管理帶來了巨大挑戰(zhàn)。傳統(tǒng)通過ICMP/SNMP、Trap/Syslog等工具對設(shè)備進行診斷分析的方式已不能滿足實際要求,由于公安內(nèi)部運維管理的特殊性,現(xiàn)行通過ELK等架構(gòu)的方式同樣也滿足不了需要。為尋求合理的方案,我們將目光轉(zhuǎn)向開源架構(gòu),構(gòu)建了一套適用于公安行業(yè)的實時運維管理平臺。

實時運維平臺整體架構(gòu)

  • 數(shù)據(jù)采集層:Logstash+Flume,負責(zé)在不同場景下收集、過濾各類前后端硬件設(shè)備輸出的Snmp Trap、Syslog日志信息以及應(yīng)用服務(wù)器自身產(chǎn)生的系統(tǒng)和業(yè)務(wù)日志;

  • 數(shù)據(jù)傳輸層:采用高吞吐的分布式消息隊列Kafka集群,保證匯聚的日志、消息的可靠傳輸;

  • 數(shù)據(jù)處理層:由Spark實時Pull Kafka數(shù)據(jù),通過Spark Streaming以及RDD操作進行數(shù)據(jù)流的處理以及邏輯分析;

  • 數(shù)據(jù)存儲層:實時數(shù)據(jù)存入MySQL中便于實時的業(yè)務(wù)應(yīng)用和展示;全量數(shù)據(jù)存入ES以及HBase中便于后續(xù)的檢索分析;

  • 業(yè)務(wù)服務(wù)層:基于存儲層,后續(xù)的整體業(yè)務(wù)應(yīng)用涵蓋了APM、網(wǎng)絡(luò)監(jiān)控、拓撲、告警、工單、CMDB等。

整體系統(tǒng)涉及的主要開源框架情況如下:

怎么分析基于Spark的公安大數(shù)據(jù)實時運維技術(shù)實踐

另外,整體環(huán)境基于JDK 8以及Scala 2.10.4。公安系統(tǒng)設(shè)備種類繁多,接下來將以交換機Syslog日志為例,詳細介紹日志處理分析的整體流程。

怎么分析基于Spark的公安大數(shù)據(jù)實時運維技術(shù)實踐
圖1 公安實時運維平臺整體架構(gòu)

Flume+Logstash日志收集

Flume是Cloudera貢獻的一個分布式、可靠及高可用的海量日志采集系統(tǒng),支持定制各類Source(數(shù)據(jù)源)用于數(shù)據(jù)收集,同時提供對數(shù)據(jù)的簡單處理以及通過緩存寫入Sink(數(shù)據(jù)接收端)的能力。

Flume中,Source、Channel及Sink的配置如下:

# 為該Flume Agent的source、channel以及sink命名 a1.sources = r1 a1.sinks = k1
a1.channels = c1 # 配置Syslog源 a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140 a1.sources.r1.host = localhost # Kafka Sink的相關(guān)配置 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = syslog-kafka
a1.sinks.k1.brokerList = gtcluster-slave01:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1 # Channel基于內(nèi)存作為緩存 a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 將Source以及Sink綁定至Channel a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

該配置通過syslog source配置localhost tcp 5140端口來接收網(wǎng)絡(luò)設(shè)備發(fā)送的Syslog信息,event緩存在內(nèi)存中,再通過KafkaSink將日志發(fā)送到kafka集群中名為“syslog-kafka”的topic中。

Logstash來自Elastic公司,專為收集、分析和傳輸各類日志、事件以及非結(jié)構(gòu)化的數(shù)據(jù)所設(shè)計。它有三個主要功能:事件輸入(Input)、事件過濾器(Filter)以及事件輸出(Output),在后綴為.conf的配置文件中設(shè)置,本例中Syslog配置如下:

# syslog.conf input {
    Syslog {
        port => "514" }
}
filter {
}
output {
    kafka {
        bootstrap_servers => "192.168.8.65:9092" topic_id => "syslog-kafka" compression_type => "snappy" codec => plain {
           format => "%{host} %{@timestamp} %{message}" }
    }
}

Input(輸入)插件用于指定各種數(shù)據(jù)源,本例中的Logstash通過udp 514端口接收Syslog信息;

Filter(過濾器)插件雖然在本例中不需要配置,但它的功能非常強大,可以進行復(fù)雜的邏輯處理,包括正則表達式處理、編解碼、k/v切分以及各種數(shù)值、時間等數(shù)據(jù)處理,具體可根據(jù)實際場景設(shè)置;

Output(輸出)插件用于將處理后的事件數(shù)據(jù)發(fā)送到指定目的地,指定了Kafka的位置、topic以及壓縮類型。在最后的Codec編碼插件中,指定來源主機的IP地址(host)、Logstash處理的時間戳(@timestamp)作為前綴并整合原始的事件消息(message),方便在事件傳輸過程中判斷Syslog信息來源。單條原始Syslog信息流樣例如下:

147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

Logstash Output插件處理后的信息流變成為:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

其中紅色字段就是codec編碼插件植入的host以及timestamp信息。處理后的Syslog信息會發(fā)送至Kafka集群中進行消息的緩存。

Kafka日志緩沖

Kafka是一個高吞吐的分布式消息隊列,也是一個訂閱/發(fā)布系統(tǒng)。Kafka集群中每個節(jié)點都有一個被稱為broker的實例,負責(zé)緩存數(shù)據(jù)。Kafka有兩類客戶端,Producer(消息生產(chǎn)者的)和Consumer(消息消費者)。Kafka中不同業(yè)務(wù)系統(tǒng)的消息可通過topic進行區(qū)分,每個消息都會被分區(qū),用以分擔(dān)消息讀寫負載,每個分區(qū)又可以有多個副本來防止數(shù)據(jù)丟失。消費者在具體消費某個topic消息時,指定起始偏移量。Kafka通過Zero-Copy、Exactly Once等技術(shù)語義保證了消息傳輸?shù)膶崟r、高效、可靠以及容錯性。

Kafka集群中某個broker的配置文件server.properties的部分配置如下:

########## Server Basics ########### # 為每一個broker設(shè)置獨立的數(shù)字作為id broker.id=1 ###### Socket Server Settings ###### # socket監(jiān)聽端口 port=9092 ########### Zookeeper ############## # Zookeeper的連接配置 zookeeper.connect=gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=3000

其中需指定集群里不同broker的id,此臺broker的id為1,默認(rèn)監(jiān)聽9092端口,然后配置Zookeeper(后續(xù)簡稱zk)集群,再啟動broker即可。

Kafka集群名為syslog-kafka的topic:

bin/kafka-topics.sh
--create --zookeeper gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181 --replication-factor 3 --partitions 3 --topic syslog-kafka

Kafka集群的topic以及partition等信息也可以通過登錄zk來觀察。然后再通過下列命令查看Kafka接收到的所有交換機日志信息:

bin/kafka-console-consumer.sh--zookeeper gtcluster-slave02:2181--from-beginning --topic Syslog-kafka

部分日志樣例如下:

10.1.1.10 2016-10-18T05:23:04.015Z <163>5585: Oct 18 13:22:45: %LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down 19.1.1.113 2016-10-18T05:24:04.425Z <149>10857: Oct 18 13:25:23.019 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to down 19.1.1.113 2016-10-18T05:24:08.312Z <149>10860: Oct 18 13:25:27.935 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to up

Spark日志處理邏輯

Spark是一個為大規(guī)模數(shù)據(jù)處理而生的快速、通用的引擎,在速度、效率及通用性上表現(xiàn)極為優(yōu)異。

在Spark主程序中,通過Scala的正則表達式解析Kafka Source中名為“syslog-kafka” 的topic中的所有Syslog信息,再將解析后的有效字段封裝為結(jié)果對象,最后通過MyBatis近實時地寫入MySQL中,供前端應(yīng)用進行實時地可視化展示。另外,全量數(shù)據(jù)存儲進入HBase及ES中,為后續(xù)海量日志的檢索分析及其它更高級的應(yīng)用提供支持。主程序示例代碼如下:

object SwSyslogProcessor { def main(args: Array[String]): Unit = { // 初始化SparkContext,批處理間隔5秒 val sparkConf: SparkConf = new SparkConf().setAppName("SwSyslogProcessorApp ")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val ssc = new StreamingContext(sparkConf, Seconds(5)) // 定義topic val topic = Set("syslog-kafka") // 定義kafka的broker list地址 val brokers = "192.168.8.65:9092,192.168.8.66:9092,192.168.8.67:9092" val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringDecoder") // 通過topic以及brokers,創(chuàng)建從kafka獲取的數(shù)據(jù)流 val swSyslogDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topic) val totalcounts = ssc.sparkContext.accumulator(0L, "Total count") val lines = swSyslogDstream.map(x => x._2) // 將一行一行數(shù)據(jù)映射成SwSyslog對象 lines.filter(x => !x.isEmpty
            && x.contains("%LIN")
&& x.contains("Ethernet")
            .map(
                x => {
                    SwSyslogService.encapsulateSwSyslog(x) // 封裝并返回SwSyslog }).foreachRDD((s: RDD[SwSyslog], time: Time) => { // 遍歷DStream中的RDD if (!s.isEmpty()) { // 遍歷RDD中的分區(qū)記錄 s.foreachPartition {
                    records => { if (!records.isEmpty) records.toSet.foreach {
                            r: SwSyslog => // 統(tǒng)計當(dāng)前處理的記錄總數(shù) totalcounts.add(1L) // 保存SwSyslog信息到MySQL SwSyslogService.saveSwSyslog(r)
                        }
                    }
                }
            }
        }
        ) //啟動程序 ssc.start() // 阻塞等待 ssc.awaitTermination()
 }

整體的處理分析主要分為4步:

  1. 初始化SparkContext并指定Application的參數(shù);

  2. 創(chuàng)建基于Kafka topic “syslog-kafka” 的DirectStream;

  3. 將獲取的每一行數(shù)據(jù)映射為Syslog對象,調(diào)用Service進行對象封裝并返回;

  4. 遍歷RDD,記錄不為空時保存或者更新Syslog信息到MySQL中。

Syslog POJO的部分基本屬性如下:

@Table(name = "sw_syslog") public class SwSyslog { /**
     * 日志ID
     */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /**
     * 設(shè)備IP
     */ @Column(name = "dev_ip") private String devIp; /**
     * 服務(wù)器時間
     */ @Column(name = "server_time") private String serverTime; /**
     * 信息序號
     */ @Column(name = "syslog_num") private Long syslogNum;

    ……
}

SwSyslog實體中的基本屬性對應(yīng)Syslog中的接口信息,注解中的name對應(yīng)MySQL中的表sw_syslog 以及各個字段,MyBatis完成成員屬性和數(shù)據(jù)庫結(jié)構(gòu)的ORM(對象關(guān)系映射)。

程序中的SwSyslogService有兩個主要功能:

public static SwSyslog encapsulateSwSyslog(String syslogInfo) {
    SwSyslog swsyslog = new SwSyslog(); swsyslog.setDevIp(SwSyslogExtractorUtil.extractDevIp(syslogInfo)); swsyslog.setServerTime(SwSyslogExtractorUtil.extractServerTime(syslogInfo)); swsyslog.setSyslogNum(SwSyslogExtractorUtil.extractSyslogNum(syslogInfo)); swsyslog.setDevTime(SwSyslogExtractorUtil.extractDevTime(syslogInfo)); swsyslog.setSyslogType(SwSyslogExtractorUtil.extractSyslogType(syslogInfo)); swsyslog.setInfoType(SwSyslogExtractorUtil.extractInfoType(syslogInfo)); swsyslog.setDevInterface(SwSyslogExtractorUtil .extractDevInterface(syslogInfo)); swsyslog.setInterfaceState(SwSyslogExtractorUtil .extractInterfaceState(syslogInfo)); return swsyslog; }

public static void saveSwSyslog(SwSyslog swSyslog) {
    LOGGER.debug("開始保存或更新SwSyslog", swSyslog); // 根據(jù)ip查詢所有Syslog
    List<swsyslog> list = swSyslogMapper.queryAllByIp(swSyslog.getDevIp()); //  如果list非空,即查到對應(yīng)IP的SwSyslog
    if (list != null && !list.isEmpty()) {
        for (SwSyslog sys : list) {
            // 若IP的接口相同,則更新信息
            if (sys.getDevInterface().equals(swSyslog.getDevInterface())) {
                LOGGER.debug("有相同IP相同接口的記錄,開始更新SwSyslog"); sys.setServerTime(swSyslog.getServerTime()); sys.setSyslogNum(swSyslog.getSyslogNum()); sys.setDevTime(swSyslog.getDevTime()); sys.setSyslogType(swSyslog.getSyslogType()); sys.setInfoType(swSyslog.getInfoType()); sys.setInterfaceState(swSyslog.getInterfaceState()); sys.setUpdated(new Date()); swSyslogMapper.update(sys); } else {
                // 若接口不同,則直接保存
                LOGGER.debug("相同IP無對應(yīng)接口,保存SwSyslog"); swSyslog.setCreated(new Date()); swSyslog.setUpdated(swSyslog.getCreated()); swSyslogMapper.insert(swSyslog); }
        }
    } else {
        // 沒有對應(yīng)的IP記錄,直接保存信息
        LOGGER.debug("沒有相同IP記錄,直接保存SwSyslog"); swSyslog.setCreated(new Date()); swSyslog.setUpdated(swSyslog.getCreated()); swSyslogMapper.insert(swSyslog); }
}</swsyslog>

encapsulateSwSyslog()將Spark處理后的每一行Syslog通過Scala的正則表達式解析為不同的字段,然后封裝并返回Syslog對象;遍歷RDD分區(qū)生成的每一個Syslog對象中都有ip以及接口信息,saveSwSyslog()會據(jù)此判斷該插入還是更新Syslog信息至數(shù)據(jù)庫。另外,封裝好的Syslog對象通過ORM工具MyBatis與MySQL進行互操作。

獲取到的每一行Syslog信息如之前所述:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

這段信息需解析為設(shè)備ip、服務(wù)器時間、信息序號、設(shè)備時間、Syslog類型、屬性、設(shè)備接口、接口狀態(tài)等字段。Scala正則解析邏輯如下:

/**
      * 抽取服務(wù)器時間
      * 樣例:2016-10-09T10:04:54.517Z
      * @param line
      * @return */ def extractServerTime(line: String): String = { val regex1 = "20\\d{2}-\\d{2}-\\d{2}".r val regex2 = "\\d{2}:\\d{2}:\\d{2}.?(\\d{3})?".r val matchedDate = regex1.findFirstIn(line) val matchedTime = regex2.findFirstIn(line) val result1 = matchedDate match { case Some(date) => date case None => " " } val result2 = matchedTime match { case Some(time) => time case None => " " }
        result1 + " " + result2
} /**
      * 抽取設(shè)備時間
      * 樣例:Sep 29 09:33:06
      *       Oct  9 18:04:09.733
      * @param line
      * @return */ def extractDevTime(line: String): String = { val regex = "[a-zA-Z]{3}\\s+\\d+\\s\\d{2}:\\d{2}:\\d{2}((.\\d{3})|())".r val matchedDevTime = regex.findFirstIn(line) val result = matchedDevTime match { case Some(devTime) => devTime case None => " " }
        result
    }

通過正則過濾、Syslog封裝以及MyBatis持久層映射,Syslog接口狀態(tài)信息最終解析如下:

怎么分析基于Spark的公安大數(shù)據(jù)實時運維技術(shù)實踐

最后,諸如APM、網(wǎng)絡(luò)監(jiān)控或者告警等業(yè)務(wù)應(yīng)用便可以基于MySQL做可視化展示。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI