溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

發(fā)布時(shí)間:2021-12-30 10:29:57 來源:億速云 閱讀:152 作者:柒染 欄目:云計(jì)算

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

 CKafka 作為大數(shù)據(jù)架構(gòu)中的關(guān)鍵組件,起到了數(shù)據(jù)聚合,流量削峰,消息管道的作用。在 CKafka 上下游中的數(shù)據(jù)流轉(zhuǎn)中有各種優(yōu)秀的開源解決方案。如 Logstash,F(xiàn)ile Beats,Spark,F(xiàn)link 等等。下面將帶來一種新的解決方案:Serverless Function。其在學(xué)習(xí)成本,維護(hù)成本,擴(kuò)縮容能力等方面相對(duì)已有開源方案將有優(yōu)異的表現(xiàn)。

Tencent Cloud Kafka 介紹

Tencent Cloud Kafka 是基于開源 Kafka 引擎研發(fā)的適合大規(guī)模公有云部署的 Cloud Kafka。是一款適合公有云部署、運(yùn)行、運(yùn)維的分布式的、高可靠、高吞吐和高可擴(kuò)展的消息隊(duì)列系統(tǒng)。它 100% 兼容開源的 Kafka API,目前主要支持開源的 0.9, 0.10, 1.1.1, 2.4.2 四個(gè)大版本 ,并提供向下兼容的能力。

目前 Tencent Cloud Kafka 維護(hù)了超過 4000+ 節(jié)點(diǎn)的集群,每日吞吐的消息量超過 9 萬億+條,峰值帶寬達(dá)到了 800GB+/s, 堆積數(shù)據(jù)達(dá)到了 20PB+。是一款集成了租戶隔離、限流、鑒權(quán)、安全、數(shù)據(jù)監(jiān)控告警、故障快速切換、跨可用區(qū)容災(zāi)等等一系列特性的,歷經(jīng)大流量檢驗(yàn)的、可靠的公有云上 Kafka 集群。

什么是數(shù)據(jù)流轉(zhuǎn)

CKafka 作為一款高吞吐,高可靠的消息隊(duì)列引擎。需要承接大量數(shù)據(jù)的流入和流出,數(shù)據(jù)流動(dòng)的這一過程我們稱之它為數(shù)據(jù)流轉(zhuǎn)。而在處理數(shù)據(jù)的流入和流出過程中,會(huì)有很多成熟豐富的開源的解決方案,如 Logstash,Spark,F(xiàn)llink等。從簡單的數(shù)據(jù)轉(zhuǎn)儲(chǔ),到復(fù)雜的數(shù)據(jù)清洗,過濾,聚合等,都有現(xiàn)成的解決方案。

如圖所示,在 Kafka 上下游生態(tài)圖中,CKafka 處于中間層,起到數(shù)據(jù)聚合,流量削峰,消息管道的作用。圖左和圖上是數(shù)據(jù)寫入的組件概覽,圖右和圖下是下游流式數(shù)據(jù)處理方案和持久化存儲(chǔ)引擎。這些構(gòu)成了 Kafka 周邊的數(shù)據(jù)流動(dòng)的生態(tài)。

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

數(shù)據(jù)流轉(zhuǎn)新方案: Serverless Function

下圖是流式計(jì)算典型數(shù)據(jù)流動(dòng)示意圖。其中承接數(shù)據(jù)流轉(zhuǎn)方案的是各種開源解決方案。單純從功能和性能的角度來講,開源解決方案都有很優(yōu)秀的表現(xiàn)。

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

而從學(xué)習(xí)成本,維護(hù)成本,金錢成本,擴(kuò)縮容能力等角度來看,這些開源方案還是有欠缺的。怎么說呢?開源方案的缺點(diǎn)主要在于如下三點(diǎn):

  • 學(xué)習(xí)成本

  • 調(diào)優(yōu)、維護(hù)、解決問題的成本

  • 擴(kuò)縮容能力

以 Logstash 為例,它的入門使用學(xué)習(xí)門檻不高,進(jìn)階使用有一定的成本,主要包括眾多 release 版本的使用成本,參數(shù)調(diào)優(yōu)和故障處理成本,后續(xù)的維護(hù)成本(進(jìn)程可用性,單機(jī)的負(fù)載處理)等。如果用流式計(jì)算引擎,如 spark 和 flink,其雖然具有分布式調(diào)度能力和即時(shí)的數(shù)據(jù)處理能力,但是其學(xué)習(xí)門檻和后期的集群維護(hù)成本,將大大提高。

來看 Serverless Function 是怎么處理數(shù)據(jù)流轉(zhuǎn)的。如圖所示,Serverless Function 運(yùn)行在數(shù)據(jù)的流入和流出的處理層的位置,代替了開源的解決方案。Serverless Function 是以自定義代碼的形式來實(shí)現(xiàn)數(shù)據(jù)清洗、過濾、聚合、轉(zhuǎn)儲(chǔ)等能力的。它具有學(xué)習(xí)成本低、無維護(hù)成本、自動(dòng)擴(kuò)縮容和按量計(jì)費(fèi)等優(yōu)秀特性。

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

接下來我們來看一下 Serverless Function 是怎么實(shí)現(xiàn)數(shù)據(jù)流轉(zhuǎn)的,并且了解一下其底層的運(yùn)行機(jī)制及其優(yōu)勢(shì)。

Serverless Function 實(shí)現(xiàn)數(shù)據(jù)流轉(zhuǎn)

首先來看一下怎么使用 Serverless Function 實(shí)現(xiàn) Kafka To Elasticsearch 的數(shù)據(jù)流轉(zhuǎn)。下面以 Function 事件觸發(fā)的方式來說明 Function 是怎么實(shí)現(xiàn)低成本的數(shù)據(jù)清洗、過濾、格式化和轉(zhuǎn)儲(chǔ)的:

在業(yè)務(wù)錯(cuò)誤日志采集分析的場景中,會(huì)將機(jī)器上的日志信息采集并發(fā)送到服務(wù)端。服務(wù)端選擇 Kafka 作為消息中間件,起到數(shù)據(jù)可靠存儲(chǔ),流量削峰的作用。為了保存長時(shí)間的數(shù)據(jù)(月,年),一般會(huì)將數(shù)據(jù)清洗、格式化、過濾、聚合后,存儲(chǔ)到后端的分布式存儲(chǔ)系統(tǒng),如 HDFS,HBASE,Elasticsearch 中。

以下代碼段分為三部分:數(shù)據(jù)源的消息格式,處理后的目標(biāo)消息格式,功能實(shí)現(xiàn)的 Function 代碼段

  • 源數(shù)據(jù)格式:

        {
            "version": 1,
            "componentName": "trade",
            "timestamp": 1595944295,
            "eventId": 9128499,
            "returnValue": -1,
            "returnCode": 101103,
            "returnMessage": "return has no deal return error[錯(cuò)誤:缺少**c參數(shù)][seqId:u3Becr8iz*]",
            "data": [],
            "seqId": "@kibana-highlighted-field@u3Becr8iz@/kibana-highlighted-field@*"
        }
  • 目標(biāo)數(shù)據(jù)格式:

        {
            "timestamp": "2020-07-28 21:51:35",
            "returnCode": 101103,
            "returnError": "return has no deal return error",
            "returnMessage": "錯(cuò)誤:缺少**c參數(shù)",
            "requestId": "u3Becr8iz*"
        }
  • Function 代碼

Function 實(shí)現(xiàn)的功能是將數(shù)據(jù)從源格式,通過清洗,過濾,格式化轉(zhuǎn)化為目標(biāo)數(shù)據(jù)格式,并轉(zhuǎn)儲(chǔ)到 Elasticsearch。代碼的邏輯很簡單:CKafka 收到消息后,觸發(fā)了函數(shù)的執(zhí)行,函數(shù)接收到信息后會(huì)執(zhí)行 convertAndFilter 函數(shù)的過濾,重組,格式化操作,將源數(shù)據(jù)轉(zhuǎn)化為目標(biāo)格式,最后數(shù)據(jù)會(huì)被存儲(chǔ)到 Elasticsearch。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

esServer = "http://172.16.16.53:9200"  # 修改為 es server 地址+端口 E.g. http://172.16.16.53:9200
esUsr = "elastic"  # 修改為 es 用戶名 E.g. elastic
esPw = "PW123"  # 修改為 es 密碼 E.g. PW2312321321
esIndex = "pre1"  # es 的 index 設(shè)置

# ... or specify common parameters as kwargs
es = Elasticsearch([esServer],
                   http_auth=(esUsr, esPw),
                   sniff_on_start=False,
                   sniff_on_connection_fail=False,
                   sniffer_timeout=None)

def convertAndFilter(sourceStr):
    target = {}
    source = json.loads(sourceStr)
    # 過濾掉returnCode=0的日志
    if source["returnCode"] == 0:
        return
    dateArray = datetime.datetime.fromtimestamp(source["timestamp"])
    target["timestamp"] = dateArray.strftime("%Y-%m-%d %H:%M:%S")
    target["returnCode"] = source["returnCode"]
    message = source["returnMessage"]
    message = message.split("][")
    errorInfo = message[0].split("[")
    target["returnError"] = errorInfo[0]
    target["returnMessage"] = errorInfo[1]
    target["requestId"] = message[1].replace("]", "").replace("seqId:", "")
    return target


def main_handler(event, context):
    # 獲取 event Records 字段并做轉(zhuǎn)化操作 數(shù)據(jù)結(jié)構(gòu) https://cloud.tencent.com/document/product/583/17530
    for record in event["Records"]:
        target = convertAndFilter(record)
        action = {
            "_index": esIndex,
            "_source": {
                "msgBody": target  # 獲取 Ckafka 觸發(fā)器 msgBody
            }
        }
        helpers.bulk(es, action)
    return ("successful!")

看到這里,大家可能會(huì)發(fā)現(xiàn),這個(gè)代碼段平時(shí)是處理單機(jī)的少量數(shù)據(jù)的腳本是一樣的,就是做轉(zhuǎn)化,轉(zhuǎn)儲(chǔ),很簡單。其實(shí)很多分布式的系統(tǒng)做的系統(tǒng)從微觀的角度看,其實(shí)就是做的這么簡單的事情。分布式框架本身做的更多的是分布式調(diào)度,分布式運(yùn)行,可靠性,可用性等等工作,細(xì)化到執(zhí)行單元,功能其實(shí)和上面的代碼段是一樣的。

從宏觀來看,Serverless Function 做的事情和分布式計(jì)算框架 Spark, Flink 等做的事情是一樣的,都是調(diào)度,執(zhí)行基本的執(zhí)行單元,處理業(yè)務(wù)邏輯。區(qū)別在于用開源的方案,需要使用方去學(xué)習(xí),使用,維護(hù)運(yùn)行引擎,而 Serverless Function 則是平臺(tái)來幫用戶做這些事情。

接下來我們來看 Serverless Function 在底層是怎么去支持這些功能的,來看一下其底層的運(yùn)行機(jī)制。如圖所示:

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

Function 作為一個(gè)代碼片段,提交給平臺(tái)以后。需要有一種觸發(fā)函數(shù)運(yùn)行的方式,目前主要有如下三種:事件觸發(fā)、定時(shí)觸發(fā)和主動(dòng)觸發(fā)。

在上面的例子中,我們是以事件觸發(fā)為例的。當(dāng)消息提交到 Kafka,就會(huì)觸發(fā)函數(shù)的運(yùn)行。此時(shí) Serverless 調(diào)度運(yùn)行平臺(tái)就會(huì)調(diào)度底層的 Container 并發(fā)去執(zhí)行函數(shù),并執(zhí)行函數(shù)的邏輯。此時(shí)關(guān)于 Container 的并發(fā)度是由系統(tǒng)自動(dòng)調(diào)度,自動(dòng)計(jì)算的,當(dāng) Kafka 的源數(shù)據(jù)多的時(shí)候,并發(fā)量就大,當(dāng)數(shù)據(jù)少的時(shí)候,相應(yīng)的就會(huì)較少并發(fā)數(shù)。因?yàn)楹瘮?shù)是以運(yùn)行時(shí)長計(jì)費(fèi)的,當(dāng)源消息數(shù)據(jù)量少的時(shí)候,并發(fā)量小,自然運(yùn)行時(shí)長就少,自然所需付出的資金成本就降下來。

在函數(shù)執(zhí)行過程當(dāng)中,函數(shù)的可靠性運(yùn)行,自動(dòng)擴(kuò)縮容調(diào)度,并發(fā)度等都是用戶不需要關(guān)心的。用戶需要 Cover 的只是函數(shù)代碼段的可運(yùn)行,無 BUG。這對(duì)于研發(fā)人員的精力投入成本就降低很多。

值得一談的是,在開發(fā)語言方面,開源方案只支持其相對(duì)應(yīng)的語言,如 Logstash 的嵌入腳本用的是 ruby,spark 主要支持java,scala,python 等。而 Serverless Function 支持的是幾乎業(yè)界常見到的開發(fā)語言,包括不限于 java,golang,python,node JS,php 等等。這點(diǎn)就可以讓研發(fā)人員用其熟悉的語言去解決數(shù)據(jù)流轉(zhuǎn)問題,這在無形中就減少了很多代碼出錯(cuò)和出問題的機(jī)會(huì)。

Serverless Function 在數(shù)據(jù)流轉(zhuǎn)場景的優(yōu)勢(shì)

下面我們來統(tǒng)一看一下 Serverless Function 和開源的方案的主要區(qū)別及優(yōu)勢(shì)。如圖5所示,和開源方案相比。在非實(shí)時(shí)的數(shù)據(jù)流轉(zhuǎn)場景中,Serverless Function 相對(duì)現(xiàn)有的開源方案,它具有的優(yōu)勢(shì)幾乎是壓倒性的。從功能和性能的角度,它在批式計(jì)算(實(shí)時(shí))的場景中是完全可以滿足的。但是它相對(duì)開源方案在學(xué)習(xí)成本,運(yùn)維成本幾乎可以忽略,其動(dòng)態(tài)擴(kuò)縮容,按需付費(fèi),毫秒級(jí)付費(fèi)對(duì)于資金成本的投入也是非常友好的。

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

用一句話總結(jié)就是:Serverless Function 能用一段熟悉的語言編寫一小段代碼去銜接契合流式計(jì)算中的數(shù)據(jù)流轉(zhuǎn)。

Serverless Function 在批式計(jì)算場景的展望

隨著流式計(jì)算的發(fā)展,慢慢演化出了批量計(jì)算 (batch computing)、流式計(jì)算 (stream computing)、交互計(jì)算 (interactive computing)、圖計(jì)算 (graph computing) 等方向。而架構(gòu)師在業(yè)務(wù)中選擇批式計(jì)算或者流式計(jì)算,其核心是希望按需使用批式計(jì)算或流式計(jì)算,以取得在延時(shí)、吞吐、容錯(cuò)、成本投入等方面的平衡。在使用者看來,批式處理可以提供精確的批式數(shù)據(jù)視圖,流式處理可以提供近實(shí)時(shí)的數(shù)據(jù)視圖。而在批式處理當(dāng)中,或者說在未來的批式處理和流式處理的底層技術(shù)的合流過程中,Lambda 架構(gòu)是其發(fā)展的必然路徑。

Serverless Function 以其按需使用,自動(dòng)擴(kuò)縮容及近乎無限的橫向擴(kuò)容能力給現(xiàn)階段的批式處理提供了一種選擇,并且在未來批流一體化的過程中,未來可期。

Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析

關(guān)于Serverless 銜接Kafka上下游數(shù)據(jù)流轉(zhuǎn)的實(shí)戰(zhàn)分析問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI