溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架

發(fā)布時(shí)間:2020-08-11 10:30:02 來源:網(wǎng)絡(luò) 閱讀:861 作者:懂天馬 欄目:大數(shù)據(jù)

歡迎來到BigData的世界

現(xiàn)如今,我們來到了數(shù)據(jù)時(shí)代,數(shù)據(jù)信息化與我們的生活與工作息息相關(guān)。此篇文章簡(jiǎn)述利用大數(shù)據(jù)框架,實(shí)時(shí)處理數(shù)據(jù)的流程與相關(guān)框架的介紹,主要包括:

  • 數(shù)據(jù)實(shí)時(shí)處理的概念和意義

  • 數(shù)據(jù)實(shí)時(shí)處理能做什么

  • 數(shù)據(jù)實(shí)時(shí)處理架構(gòu)簡(jiǎn)介

  • 數(shù)據(jù)實(shí)時(shí)處理代碼演示

數(shù)據(jù)實(shí)時(shí)處理的概念和意義

什么是數(shù)據(jù)實(shí)時(shí)處理呢?我個(gè)人對(duì)數(shù)據(jù)實(shí)時(shí)處理的理解為:數(shù)據(jù)從生成->實(shí)時(shí)采集->實(shí)時(shí)緩存存儲(chǔ)->(準(zhǔn))實(shí)時(shí)計(jì)算->實(shí)時(shí)落地->實(shí)時(shí)展示->實(shí)時(shí)分析。這一個(gè)流程線下來,處理數(shù)據(jù)的速度在秒級(jí)甚至毫秒級(jí)。

數(shù)據(jù)實(shí)時(shí)處理有什么意義呢?我們得到數(shù)據(jù)可以進(jìn)行數(shù)據(jù)分析,利用數(shù)據(jù)統(tǒng)計(jì)方法,從錯(cuò)綜復(fù)雜的數(shù)據(jù)關(guān)系中梳理出事物的聯(lián)系,比如發(fā)展趨勢(shì)、影響因素、因果關(guān)系等。甚至建立一些BI,對(duì)一些數(shù)據(jù)的有用信息進(jìn)行可視化呈現(xiàn),并形成數(shù)據(jù)故事。

數(shù)據(jù)實(shí)時(shí)處理能做什么

數(shù)據(jù)的實(shí)時(shí)計(jì)算

何為數(shù)據(jù)的實(shí)時(shí)計(jì)算?我們從數(shù)據(jù)源端拿到數(shù)據(jù),可能不盡如人意,我們想對(duì)得到的數(shù)據(jù)進(jìn)行 ETL 操作、或者進(jìn)行關(guān)聯(lián)等等,那么我們就會(huì)用到數(shù)據(jù)的實(shí)時(shí)計(jì)算。目前主流的實(shí)時(shí)計(jì)算框架有 spark,storm,flink 等。

數(shù)據(jù)的實(shí)時(shí)落地

數(shù)據(jù)的實(shí)時(shí)落地,意思是將我們的源數(shù)據(jù)或者計(jì)算好的數(shù)據(jù)進(jìn)行實(shí)時(shí)的存儲(chǔ)。在大數(shù)據(jù)領(lǐng)域,推薦使用 HDFS,ES 等進(jìn)行存儲(chǔ)。

數(shù)據(jù)的實(shí)時(shí)展示與分析

我們拿到了數(shù)據(jù),要會(huì)用數(shù)據(jù)的價(jià)值。數(shù)據(jù)的價(jià)值體現(xiàn)在數(shù)據(jù)中相互關(guān)聯(lián)關(guān)系,或與歷史關(guān)聯(lián),或能預(yù)測(cè)未來。我們實(shí)時(shí)得到數(shù)據(jù),不僅能夠利用前端框架進(jìn)行實(shí)時(shí)展示,還可以對(duì)其中的一些數(shù)據(jù)進(jìn)行算法訓(xùn)練,預(yù)測(cè)未來走勢(shì)等。

example:

淘寶雙 11 大屏,每年的雙 11 是淘寶粉絲瘋狂的日子。馬云會(huì)在雙 11 的當(dāng)天在阿里總部豎起一面大的電子屏幕,展示淘寶這一天的成績(jī)。例如成交額,訪問人數(shù),訂單量,下單量,成交量等等。這個(gè)電子大屏的背后,就是用到的我們所說的數(shù)據(jù)的實(shí)時(shí)處理。首先,阿里的服務(wù)器遍布全國(guó)各地,這些服務(wù)器收集PC端、手機(jī)端等日志,上報(bào)到服務(wù)器,在服務(wù)上部署數(shù)據(jù)采集工具。接下來,由于數(shù)據(jù)量龐大,需要做數(shù)據(jù)的緩存緩沖處理。下一步,對(duì)原始日志進(jìn)行實(shí)時(shí)的計(jì)算,比如篩選出上面所述的各個(gè)指標(biāo)。最后,通過接口或者其他形式,進(jìn)行前端屏幕的實(shí)時(shí)展示。

數(shù)據(jù)實(shí)時(shí)處理架構(gòu)簡(jiǎn)介

接下來是我們介紹的重點(diǎn),先放一張數(shù)據(jù)流程圖:


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架cdn.xitu.io/2018/9/3/1659d6798453f811?imageView2/0/w/1280/h/960/format/webp/ignore-error/1">


  • 數(shù)據(jù)采集端,選用目前采集數(shù)據(jù)的主流控件 flume。

  • 數(shù)據(jù)緩沖緩存,選用分布式消息隊(duì)列 kafka。

  • 數(shù)據(jù)實(shí)時(shí)計(jì)算,選用 spark 計(jì)算引擎。

  • 數(shù)據(jù)存儲(chǔ)位置,選用分布式數(shù)據(jù)存儲(chǔ) ES。

  • 其他,指從 ES 中拿到數(shù)據(jù)后進(jìn)行可視化展示,數(shù)據(jù)分析等。

下面將分別簡(jiǎn)單的介紹下各個(gè)組件:

flume

flume 是一個(gè)分布式的數(shù)據(jù)收集系統(tǒng),具有高可靠、高可用、事務(wù)管理、失敗重啟、聚合和傳輸?shù)裙δ?。?shù)據(jù)處理速度快,完全可以用于生產(chǎn)環(huán)境。

flume 的核心概念有:event,agent,source,channel,sink

event

flume 的數(shù)據(jù)流由事件 (event) 貫穿始終。event 是 flume 的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)并且攜帶數(shù)據(jù)的頭信息,這些 event 由 agent 外部的 source 生成,當(dāng) source 捕獲事件后會(huì)進(jìn)行特定的格式化,然后 source 會(huì)把事件推入 channel 中??梢园?channel 看作是一個(gè)緩沖區(qū),它將保存事件直到 sink 處理完該事件。sink 負(fù)責(zé)持久化日志或者把事件推向另一個(gè) source。

agent

flume 的核心是 agent。agent 是一個(gè) java 進(jìn)程,運(yùn)行在日志收集端,通過 agent 接收日志,然后暫存起來,再發(fā)送到目的地。 每臺(tái)機(jī)器運(yùn)行一個(gè) agent。 agent 里面可以包含多個(gè) source,channel,sink。

source

source 是數(shù)據(jù)的收集端,負(fù)責(zé)將數(shù)據(jù)捕獲后進(jìn)行特殊的格式化,將數(shù)據(jù)封裝到 event 里,然后將事件推入 channel 中。flume 提供了很多內(nèi)置的 source,支持 avro,log4j,syslog 等等。如果內(nèi)置的 source 無法滿足環(huán)境的需求,flume 還支持自定義 source。

channel

channel 是連接 source 和 sink 的組件,大家可以將它看做一個(gè)數(shù)據(jù)的緩沖區(qū)(數(shù)據(jù)隊(duì)列),它可以將事件暫存到內(nèi)存中也可以持久化到本地磁盤上, 直到 sink 處理完該事件。兩個(gè)較為常用的 channel,MemoryChannel 和 FileChannel。

sink

sink 從 channel 中取出事件,然后將數(shù)據(jù)發(fā)到別處,可以向文件系統(tǒng)、數(shù)據(jù)庫、hadoop、kafka,也可以是其他 agent 的 source。

flume 的可靠性與可恢復(fù)性

  • flume 的可靠性:當(dāng)節(jié)點(diǎn)出現(xiàn)故障時(shí),日志能夠被傳送到其他節(jié)點(diǎn)上而不會(huì)丟失。Flume 提供了可靠性保障,收到數(shù)據(jù)首先寫到磁盤上,當(dāng)數(shù)據(jù)傳送成功后,再刪除;如果數(shù)據(jù)發(fā)送失敗,可以重新發(fā)送。

  • flume 的可恢復(fù)性:可恢復(fù)性是靠 channel。

口述抽象,上兩張官網(wǎng)貼圖:

單個(gè) agent 收集數(shù)據(jù)流程圖


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架多個(gè) agent 協(xié)作處理數(shù)據(jù)流程圖



簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架


kafka

Kafka 是一個(gè)高吞吐量的分布式發(fā)布-訂閱消息系統(tǒng)。企業(yè)中一般使用 kafka 做消息中間件,做緩沖緩存處理。需要 zookeeper 分布式協(xié)調(diào)組件管理。

kafka 的設(shè)計(jì)目標(biāo):

  1. 提供優(yōu)秀的消息持久化能力,對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間的訪問性能。

  2. 高吞吐率。即使在非常廉價(jià)的機(jī)器上也能做到每臺(tái)機(jī)每秒 100000 條消息的傳輸。

  3. 支持 kafka server 間的消息分區(qū),及分布式消費(fèi),同時(shí)保證每個(gè) partition 內(nèi)的消息順序傳輸。

  4. 同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。

kafka 核心概念

  • broker:消息中間件處理結(jié)點(diǎn),一個(gè) kafka 節(jié)點(diǎn)就是一個(gè) broker,多個(gè) broker 可以組成一個(gè) kafka 集群。

  • topic:主題,kafka 集群能夠同時(shí)負(fù)責(zé)多個(gè) topic 的分發(fā)。

  • partition:topic 物理上的分組,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。

  • offset:每個(gè) partition 都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到 partition 中。partition 中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做 offset,用于 partition 唯一標(biāo)識(shí)一條消息。

  • producer:負(fù)責(zé)發(fā)布消息到 kafka broker。

  • consumer:消息消費(fèi)者,向 kafka broker讀取消息的客戶端。

  • consumer group:每個(gè) consumer 屬于一個(gè)特定的 consumer group。

貼兩張官網(wǎng)圖

prodecer-broker-consumer


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架分區(qū)圖


spark

spark 是一個(gè)分布式的計(jì)算框架,是我目前認(rèn)為最火的計(jì)算框架。

spark,是一種"one stack to rulethem all"的大數(shù)據(jù)計(jì)算框架,期望使用一個(gè)技術(shù)棧就完美地解決大數(shù)據(jù)領(lǐng)域的各種計(jì)算任務(wù)。apache 官方,對(duì) spark 的定義是:通用的大數(shù)據(jù)快速處理引擎(一“棧”式)。

spark組成
  1. spark core 用于離線計(jì)算

  2. spark sql 用于交互式查詢

  3. spark streaming,structed streaming 用于實(shí)時(shí)流式計(jì)算

  4. spark MLlib 用于機(jī)器學(xué)習(xí)

  5. spark GraphX 用于圖計(jì)算

spark 特點(diǎn)
  1. 速度快:spar k基于內(nèi)存進(jìn)行計(jì)算(當(dāng)然也有部分計(jì)算基于磁盤,比如 shuffle)。

  2. 容易上手開發(fā):spark 的基于 rdd 的計(jì)算模型,比 hadoop 的基于 map-reduce 的計(jì)算模型要更加易于理解,更加易于上手開發(fā),實(shí)現(xiàn)各種復(fù)雜功能。

  3. 通用性:spark 提供的技術(shù)組件,可以一站式地完成大數(shù)據(jù)領(lǐng)域的離線批處理、交互式查詢、流式計(jì)算、機(jī)器學(xué)習(xí)、圖計(jì)算等常見的任務(wù)。

  4. 與其他技術(shù)的完美集成:例如 hadoop,hdfs、hive、hbase 負(fù)責(zé)存儲(chǔ),yarn 負(fù)責(zé)資源調(diào)度,spark 負(fù)責(zé)大數(shù)據(jù)計(jì)算。

  5. 極高的活躍度:spark 目前是 apache 的頂級(jí)項(xiàng)目,全世界有大量的優(yōu)秀工程師是 spark 的 committer,并且世界上很多頂級(jí)的 IT 公司都在大規(guī)模地使用 spark。

貼個(gè)spark架構(gòu)圖


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架


數(shù)據(jù)實(shí)時(shí)處理代碼演示

搭建好各個(gè)集群環(huán)境

需要搭建 flume 集群,kafka 集群,es 集群,zookeeper 集群,由于本例 spark 是在本地模式運(yùn)行,所以無需搭建 spark 集群。

配置好組件之間整合的配置文件

搭建好集群后,根據(jù)集群組件直接的整合關(guān)系,配置好配置文件。其中主要的配置為 flume 的配置,如下圖:


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架可以看到,我們的 agent 的 source 為 r1,channel 為 c1,sink 為 k1,source 為我本地 nc 服務(wù),收集日志時(shí),只需要打開 9999 端口就可以把日志收集。channel 選擇為 memory 內(nèi)存模式。sink 為 kafka 的 topic8 主題。


開啟各個(gè)集群進(jìn)程

  1. 開啟 zookeeper 服務(wù)。其中 QuorumPeerMain 為 zookeeper 進(jìn)程。

  2. 開啟 kafka 服務(wù)。

  3. 開啟 es 服務(wù)。

  4. 開啟 flume 服務(wù)。其中 Application 為 flume 進(jìn)程。


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架


創(chuàng)建好 es 對(duì)應(yīng) table

創(chuàng)建好 es 對(duì)應(yīng)的表,表有三個(gè)字段,對(duì)應(yīng)代碼里面的 case class(代碼隨后貼上)。


簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架



簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架



簡(jiǎn)述大數(shù)據(jù)實(shí)時(shí)處理框架


代碼如下:

package?run?import?org.apache.kafka.common.serialization.StringDeserializer?import?org.apache.log4j.Logger?import?org.apache.spark.{SparkConf,?SparkContext}?import?org.apache.spark.sql.SparkSession?import?org.apache.spark.streaming.dstream.DStream?import?org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe?import?org.apache.spark.streaming.kafka010.KafkaUtils?import?org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent?import?org.apache.spark.streaming.{Seconds,?StreamingContext}?import?org.elasticsearch.spark.rdd.EsSpark?/**???*?@author?wangjx???*?測(cè)試kafka數(shù)據(jù)進(jìn)行統(tǒng)計(jì)??kafka自身維護(hù)offset(建議使用自定義維護(hù)方式維護(hù)偏移量)???*/?object?SparkStreamingAutoOffsetKafka?{???//定義樣例類?與es表對(duì)應(yīng)???case?class?people(name:String,country:String,age:Int)???def?main(args:?Array[String]):?Unit?=?{?????val?logger?=?Logger.getLogger(this.getClass);?????//spark?配置?????val?conf?=?new?SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")?????conf.set("es.index.auto.create","true")?????conf.set("es.nodes","127.0.0.1")?????conf.set("es.port","9200")?????//spark?streaming實(shí)時(shí)計(jì)算初始化?定義每10秒一個(gè)批次?準(zhǔn)實(shí)時(shí)處理?企業(yè)一般都是準(zhǔn)實(shí)時(shí)?比如每隔10秒統(tǒng)計(jì)近1分鐘的數(shù)據(jù)等等?????val?ssc?=?new?StreamingContext(conf,?Seconds(10))?????val?spark?=?SparkSession.builder()???????.config(conf)???????.getOrCreate()?????spark.sparkContext.setLogLevel("WARN");?????//設(shè)置kafka參數(shù)?????val?kafkaParams?=?Map[String,?Object](???????"bootstrap.servers"?->?"x:9092",???????"key.deserializer"?->?classOf[StringDeserializer],???????"value.deserializer"?->?classOf[StringDeserializer],???????"group.id"?->?"exactly-once",???????"auto.offset.reset"?->?"latest",???????"enable.auto.commit"?->?(false:?java.lang.Boolean)?????)?????//kafka主題?????val?topic?=?Set("kafka8")?????//從kafka獲取數(shù)據(jù)?????val?stream?=?KafkaUtils.createDirectStream[String,?String](???????ssc,???????PreferConsistent,???????Subscribe[String,?String](topic,?kafkaParams)?????)?????//具體的業(yè)務(wù)邏輯?????val?kafkaValue:?DStream[String]?=?stream.flatMap(line=>Some(line.value()))?????val?peopleStream?=?kafkaValue???????.map(_.split(":"))???????//形成people樣例對(duì)象???????.map(m=>people(m(0),m(1),m(2).toInt))?????//存入ES?????peopleStream.foreachRDD(rdd?=>{???????EsSpark.saveToEs(rdd,?"people/man")?????})?????//啟動(dòng)程序入口?????ssc.start()?????ssc.awaitTermination()???}?}?復(fù)制代碼


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI