您好,登錄后才能下訂單哦!
Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
CKafka 作為大數(shù)據(jù)架構(gòu)中的關(guān)鍵組件,起到了數(shù)據(jù)聚合,流量削峰,消息管道的作用。在 CKafka 上下游中的數(shù)據(jù)流轉(zhuǎn)中有各種優(yōu)秀的開源解決方案。如 Logstash,F(xiàn)ile Beats,Spark,F(xiàn)link 等等。下面將帶來一種新的解決方案:Serverless Function。其在學(xué)習(xí)成本,維護(hù)成本,擴(kuò)縮容能力等方面相對(duì)已有開源方案將有優(yōu)異的表現(xiàn)。
Tencent Cloud Kafka 是基于開源 Kafka 引擎研發(fā)的適合大規(guī)模公有云部署的 Cloud Kafka。是一款適合公有云部署、運(yùn)行、運(yùn)維的分布式的、高可靠、高吞吐和高可擴(kuò)展的消息隊(duì)列系統(tǒng)。它 100% 兼容開源的 Kafka API,目前主要支持開源的 0.9, 0.10, 1.1.1, 2.4.2 四個(gè)大版本 ,并提供向下兼容的能力。
目前 Tencent Cloud Kafka 維護(hù)了超過 4000+ 節(jié)點(diǎn)的集群,每日吞吐的消息量超過 9 萬億+條,峰值帶寬達(dá)到了 800GB+/s, 堆積數(shù)據(jù)達(dá)到了 20PB+。是一款集成了租戶隔離、限流、鑒權(quán)、安全、數(shù)據(jù)監(jiān)控告警、故障快速切換、跨可用區(qū)容災(zāi)等等一系列特性的,歷經(jīng)大流量檢驗(yàn)的、可靠的公有云上 Kafka 集群。
CKafka 作為一款高吞吐,高可靠的消息隊(duì)列引擎。需要承接大量數(shù)據(jù)的流入和流出,數(shù)據(jù)流動(dòng)的這一過程我們稱之它為數(shù)據(jù)流轉(zhuǎn)。而在處理數(shù)據(jù)的流入和流出過程中,會(huì)有很多成熟豐富的開源的解決方案,如 Logstash,Spark,F(xiàn)llink等。從簡單的數(shù)據(jù)轉(zhuǎn)儲(chǔ),到復(fù)雜的數(shù)據(jù)清洗,過濾,聚合等,都有現(xiàn)成的解決方案。
如圖所示,在 Kafka 上下游生態(tài)圖中,CKafka 處于中間層,起到數(shù)據(jù)聚合,流量削峰,消息管道的作用。圖左和圖上是數(shù)據(jù)寫入的組件概覽,圖右和圖下是下游流式數(shù)據(jù)處理方案和持久化存儲(chǔ)引擎。這些構(gòu)成了 Kafka 周邊的數(shù)據(jù)流動(dòng)的生態(tài)。
下圖是流式計(jì)算典型數(shù)據(jù)流動(dòng)示意圖。其中承接數(shù)據(jù)流轉(zhuǎn)方案的是各種開源解決方案。單純從功能和性能的角度來講,開源解決方案都有很優(yōu)秀的表現(xiàn)。
而從學(xué)習(xí)成本,維護(hù)成本,金錢成本,擴(kuò)縮容能力等角度來看,這些開源方案還是有欠缺的。怎么說呢?開源方案的缺點(diǎn)主要在于如下三點(diǎn):
學(xué)習(xí)成本
調(diào)優(yōu)、維護(hù)、解決問題的成本
擴(kuò)縮容能力
以 Logstash 為例,它的入門使用學(xué)習(xí)門檻不高,進(jìn)階使用有一定的成本,主要包括眾多 release 版本的使用成本,參數(shù)調(diào)優(yōu)和故障處理成本,后續(xù)的維護(hù)成本(進(jìn)程可用性,單機(jī)的負(fù)載處理)等。如果用流式計(jì)算引擎,如 spark 和 flink,其雖然具有分布式調(diào)度能力和即時(shí)的數(shù)據(jù)處理能力,但是其學(xué)習(xí)門檻和后期的集群維護(hù)成本,將大大提高。
來看 Serverless Function 是怎么處理數(shù)據(jù)流轉(zhuǎn)的。如圖所示,Serverless Function 運(yùn)行在數(shù)據(jù)的流入和流出的處理層的位置,代替了開源的解決方案。Serverless Function 是以自定義代碼的形式來實(shí)現(xiàn)數(shù)據(jù)清洗、過濾、聚合、轉(zhuǎn)儲(chǔ)等能力的。它具有學(xué)習(xí)成本低、無維護(hù)成本、自動(dòng)擴(kuò)縮容和按量計(jì)費(fèi)等優(yōu)秀特性。
接下來我們來看一下 Serverless Function 是怎么實(shí)現(xiàn)數(shù)據(jù)流轉(zhuǎn)的,并且了解一下其底層的運(yùn)行機(jī)制及其優(yōu)勢(shì)。
首先來看一下怎么使用 Serverless Function 實(shí)現(xiàn) Kafka To Elasticsearch 的數(shù)據(jù)流轉(zhuǎn)。下面以 Function 事件觸發(fā)的方式來說明 Function 是怎么實(shí)現(xiàn)低成本的數(shù)據(jù)清洗、過濾、格式化和轉(zhuǎn)儲(chǔ)的:
在業(yè)務(wù)錯(cuò)誤日志采集分析的場景中,會(huì)將機(jī)器上的日志信息采集并發(fā)送到服務(wù)端。服務(wù)端選擇 Kafka 作為消息中間件,起到數(shù)據(jù)可靠存儲(chǔ),流量削峰的作用。為了保存長時(shí)間的數(shù)據(jù)(月,年),一般會(huì)將數(shù)據(jù)清洗、格式化、過濾、聚合后,存儲(chǔ)到后端的分布式存儲(chǔ)系統(tǒng),如 HDFS,HBASE,Elasticsearch 中。
以下代碼段分為三部分:數(shù)據(jù)源的消息格式,處理后的目標(biāo)消息格式,功能實(shí)現(xiàn)的 Function 代碼段
源數(shù)據(jù)格式:
{ "version": 1, "componentName": "trade", "timestamp": 1595944295, "eventId": 9128499, "returnValue": -1, "returnCode": 101103, "returnMessage": "return has no deal return error[錯(cuò)誤:缺少**c參數(shù)][seqId:u3Becr8iz*]", "data": [], "seqId": "@kibana-highlighted-field@u3Becr8iz@/kibana-highlighted-field@*" }
目標(biāo)數(shù)據(jù)格式:
{ "timestamp": "2020-07-28 21:51:35", "returnCode": 101103, "returnError": "return has no deal return error", "returnMessage": "錯(cuò)誤:缺少**c參數(shù)", "requestId": "u3Becr8iz*" }
Function 代碼
Function 實(shí)現(xiàn)的功能是將數(shù)據(jù)從源格式,通過清洗,過濾,格式化轉(zhuǎn)化為目標(biāo)數(shù)據(jù)格式,并轉(zhuǎn)儲(chǔ)到 Elasticsearch。代碼的邏輯很簡單:CKafka 收到消息后,觸發(fā)了函數(shù)的執(zhí)行,函數(shù)接收到信息后會(huì)執(zhí)行 convertAndFilter 函數(shù)的過濾,重組,格式化操作,將源數(shù)據(jù)轉(zhuǎn)化為目標(biāo)格式,最后數(shù)據(jù)會(huì)被存儲(chǔ)到 Elasticsearch。
#!/usr/bin/python # -*- coding: UTF-8 -*- from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers esServer = "http://172.16.16.53:9200" # 修改為 es server 地址+端口 E.g. http://172.16.16.53:9200 esUsr = "elastic" # 修改為 es 用戶名 E.g. elastic esPw = "PW123" # 修改為 es 密碼 E.g. PW2312321321 esIndex = "pre1" # es 的 index 設(shè)置 # ... or specify common parameters as kwargs es = Elasticsearch([esServer], http_auth=(esUsr, esPw), sniff_on_start=False, sniff_on_connection_fail=False, sniffer_timeout=None) def convertAndFilter(sourceStr): target = {} source = json.loads(sourceStr) # 過濾掉returnCode=0的日志 if source["returnCode"] == 0: return dateArray = datetime.datetime.fromtimestamp(source["timestamp"]) target["timestamp"] = dateArray.strftime("%Y-%m-%d %H:%M:%S") target["returnCode"] = source["returnCode"] message = source["returnMessage"] message = message.split("][") errorInfo = message[0].split("[") target["returnError"] = errorInfo[0] target["returnMessage"] = errorInfo[1] target["requestId"] = message[1].replace("]", "").replace("seqId:", "") return target def main_handler(event, context): # 獲取 event Records 字段并做轉(zhuǎn)化操作 數(shù)據(jù)結(jié)構(gòu) https://cloud.tencent.com/document/product/583/17530 for record in event["Records"]: target = convertAndFilter(record) action = { "_index": esIndex, "_source": { "msgBody": target # 獲取 Ckafka 觸發(fā)器 msgBody } } helpers.bulk(es, action) return ("successful!")
看到這里,大家可能會(huì)發(fā)現(xiàn),這個(gè)代碼段平時(shí)是處理單機(jī)的少量數(shù)據(jù)的腳本是一樣的,就是做轉(zhuǎn)化,轉(zhuǎn)儲(chǔ),很簡單。其實(shí)很多分布式的系統(tǒng)做的系統(tǒng)從微觀的角度看,其實(shí)就是做的這么簡單的事情。分布式框架本身做的更多的是分布式調(diào)度,分布式運(yùn)行,可靠性,可用性等等工作,細(xì)化到執(zhí)行單元,功能其實(shí)和上面的代碼段是一樣的。
從宏觀來看,Serverless Function 做的事情和分布式計(jì)算框架 Spark, Flink 等做的事情是一樣的,都是調(diào)度,執(zhí)行基本的執(zhí)行單元,處理業(yè)務(wù)邏輯。區(qū)別在于用開源的方案,需要使用方去學(xué)習(xí),使用,維護(hù)運(yùn)行引擎,而 Serverless Function 則是平臺(tái)來幫用戶做這些事情。
接下來我們來看 Serverless Function 在底層是怎么去支持這些功能的,來看一下其底層的運(yùn)行機(jī)制。如圖所示:
Function 作為一個(gè)代碼片段,提交給平臺(tái)以后。需要有一種觸發(fā)函數(shù)運(yùn)行的方式,目前主要有如下三種:事件觸發(fā)、定時(shí)觸發(fā)和主動(dòng)觸發(fā)。
在上面的例子中,我們是以事件觸發(fā)為例的。當(dāng)消息提交到 Kafka,就會(huì)觸發(fā)函數(shù)的運(yùn)行。此時(shí) Serverless 調(diào)度運(yùn)行平臺(tái)就會(huì)調(diào)度底層的 Container 并發(fā)去執(zhí)行函數(shù),并執(zhí)行函數(shù)的邏輯。此時(shí)關(guān)于 Container 的并發(fā)度是由系統(tǒng)自動(dòng)調(diào)度,自動(dòng)計(jì)算的,當(dāng) Kafka 的源數(shù)據(jù)多的時(shí)候,并發(fā)量就大,當(dāng)數(shù)據(jù)少的時(shí)候,相應(yīng)的就會(huì)較少并發(fā)數(shù)。因?yàn)楹瘮?shù)是以運(yùn)行時(shí)長計(jì)費(fèi)的,當(dāng)源消息數(shù)據(jù)量少的時(shí)候,并發(fā)量小,自然運(yùn)行時(shí)長就少,自然所需付出的資金成本就降下來。
在函數(shù)執(zhí)行過程當(dāng)中,函數(shù)的可靠性運(yùn)行,自動(dòng)擴(kuò)縮容調(diào)度,并發(fā)度等都是用戶不需要關(guān)心的。用戶需要 Cover 的只是函數(shù)代碼段的可運(yùn)行,無 BUG。這對(duì)于研發(fā)人員的精力投入成本就降低很多。
值得一談的是,在開發(fā)語言方面,開源方案只支持其相對(duì)應(yīng)的語言,如 Logstash 的嵌入腳本用的是 ruby,spark 主要支持java,scala,python 等。而 Serverless Function 支持的是幾乎業(yè)界常見到的開發(fā)語言,包括不限于 java,golang,python,node JS,php 等等。這點(diǎn)就可以讓研發(fā)人員用其熟悉的語言去解決數(shù)據(jù)流轉(zhuǎn)問題,這在無形中就減少了很多代碼出錯(cuò)和出問題的機(jī)會(huì)。
下面我們來統(tǒng)一看一下 Serverless Function 和開源的方案的主要區(qū)別及優(yōu)勢(shì)。如圖5所示,和開源方案相比。在非實(shí)時(shí)的數(shù)據(jù)流轉(zhuǎn)場景中,Serverless Function 相對(duì)現(xiàn)有的開源方案,它具有的優(yōu)勢(shì)幾乎是壓倒性的。從功能和性能的角度,它在批式計(jì)算(實(shí)時(shí))的場景中是完全可以滿足的。但是它相對(duì)開源方案在學(xué)習(xí)成本,運(yùn)維成本幾乎可以忽略,其動(dòng)態(tài)擴(kuò)縮容,按需付費(fèi),毫秒級(jí)付費(fèi)對(duì)于資金成本的投入也是非常友好的。
用一句話總結(jié)就是:Serverless Function 能用一段熟悉的語言編寫一小段代碼去銜接契合流式計(jì)算中的數(shù)據(jù)流轉(zhuǎn)。
隨著流式計(jì)算的發(fā)展,慢慢演化出了批量計(jì)算 (batch computing)、流式計(jì)算 (stream computing)、交互計(jì)算 (interactive computing)、圖計(jì)算 (graph computing) 等方向。而架構(gòu)師在業(yè)務(wù)中選擇批式計(jì)算或者流式計(jì)算,其核心是希望按需使用批式計(jì)算或流式計(jì)算,以取得在延時(shí)、吞吐、容錯(cuò)、成本投入等方面的平衡。在使用者看來,批式處理可以提供精確的批式數(shù)據(jù)視圖,流式處理可以提供近實(shí)時(shí)的數(shù)據(jù)視圖。而在批式處理當(dāng)中,或者說在未來的批式處理和流式處理的底層技術(shù)的合流過程中,Lambda 架構(gòu)是其發(fā)展的必然路徑。
Serverless Function 以其按需使用,自動(dòng)擴(kuò)縮容及近乎無限的橫向擴(kuò)容能力給現(xiàn)階段的批式處理提供了一種選擇,并且在未來批流一體化的過程中,未來可期。
關(guān)于Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。