溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Serverless如何解決數(shù)據(jù)采集分析痛點(diǎn)

發(fā)布時(shí)間:2021-12-07 10:45:07 來源:億速云 閱讀:119 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)Serverless如何解決數(shù)據(jù)采集分析痛點(diǎn),小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

簡(jiǎn)介: 眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國(guó)游戲市場(chǎng)營(yíng)收規(guī)模約 2884.8 億元,同比增長(zhǎng) 17.1%。2020 年因?yàn)橐咔?,游戲行業(yè)更是突飛猛進(jìn)。玩游戲本就是中國(guó)網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計(jì),截至 2019 年,中國(guó)移動(dòng)游戲用戶規(guī)模約 6.6 億人,占中國(guó)總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。

眾所周知,游戲行業(yè)在當(dāng)今的互聯(lián)網(wǎng)行業(yè)中算是一棵常青樹。在疫情之前的 2019 年,中國(guó)游戲市場(chǎng)營(yíng)收規(guī)模約 2884.8 億元,同比增長(zhǎng) 17.1%。2020 年因?yàn)橐咔椋螒蛐袠I(yè)更是突飛猛進(jìn)。玩游戲本就是中國(guó)網(wǎng)民最普遍的娛樂方式之一,疫情期間更甚。據(jù)不完全統(tǒng)計(jì),截至 2019 年,中國(guó)移動(dòng)游戲用戶規(guī)模約 6.6 億人,占中國(guó)總網(wǎng)民規(guī)模 8.47 億的 77.92%,可見游戲作為一種低門檻、低成本的娛樂手段,已成為大部分人生活中習(xí)以為常的一部分。

對(duì)于玩家而言,市面上的游戲數(shù)量多如牛毛,那么玩家如何能發(fā)現(xiàn)和認(rèn)知到一款游戲,并且持續(xù)的玩下去恐怕是所有游戲廠商需要思考的問題。加之 2018 年游戲版號(hào)停發(fā)事件,游戲廠商更加珍惜每一個(gè)已獲得版號(hào)的游戲產(chǎn)品,所以這也使得“深度打磨產(chǎn)品質(zhì)量”和“提高運(yùn)營(yíng)精細(xì)程度”這兩個(gè)游戲產(chǎn)業(yè)發(fā)展方向成為廣大游戲廠商的發(fā)展思路,無論是新游戲還是老游戲都在努力落實(shí)這兩點(diǎn):

  • 新游戲:面向玩家需要提供更充足的推廣資源和更完整的游戲內(nèi)容。

  • 老游戲:通過用戶行為分析,投入更多的精力和成本,制作更優(yōu)質(zhì)的版本內(nèi)容。

這里我們重點(diǎn)來看新游戲。一家游戲企業(yè)辛辛苦苦研發(fā)三年,等著新游戲發(fā)售時(shí)一飛沖天。那么問題來了,新游戲如何被廣大玩家看到?

首先來看看游戲行業(yè)公司的分類:

  • 游戲研發(fā)商:研發(fā)游戲的公司,生產(chǎn)和制作游戲內(nèi)容。比如王者榮耀的所有英雄設(shè)計(jì)、游戲戰(zhàn)斗場(chǎng)景、戰(zhàn)斗邏輯等,全部由游戲研發(fā)公司提供。

  • 游戲發(fā)行商:游戲發(fā)行商的主要工作分三大塊:市場(chǎng)工作、運(yùn)營(yíng)工作、客服工作。游戲發(fā)行商把控游戲命脈,市場(chǎng)工作核心是導(dǎo)入玩家,運(yùn)營(yíng)工作核心是將用戶價(jià)值最大化、賺取更多利益。

  • 游戲平臺(tái)/渠道商:游戲平臺(tái)和渠道商的核心目的就是曝光游戲,讓盡量多的人能發(fā)現(xiàn)你的游戲。

這三種類型的業(yè)務(wù),有專注于其中某一領(lǐng)域的獨(dú)立公司,也有能承接全部業(yè)務(wù)的公司,但無論那一種,這三者之間的關(guān)系是不會(huì)變的:

所以不難理解,想讓更多的玩家看到你的游戲,游戲發(fā)行和運(yùn)營(yíng)是關(guān)鍵。通俗來講,如果你的游戲出現(xiàn)在目前所有大家熟知的平臺(tái)廣告中,那么最起碼游戲的新用戶注冊(cè)數(shù)量是很可觀的。因此這就引入了一個(gè)關(guān)鍵詞:買量。

根據(jù)數(shù)據(jù)顯示,2019 年月均買量手游數(shù)達(dá) 6000+ 款,而 2018 年僅為 4200 款。另一方面,隨著抖音、微博等超級(jí) APP 在游戲買量市場(chǎng)的資源傾斜,也助推手游買量的效果和效率有所提升,游戲廠商更愿意使用買量的方式來吸引用戶。

但需要注意的是,在游戲買量的精準(zhǔn)化程度不斷提高的同時(shí),買量的成本也在節(jié)節(jié)攀升,唯有合理配置買量、渠道與整合營(yíng)銷之間的關(guān)系,才能將宣發(fā)資源發(fā)揮到最大的效果。

通俗來講,買量其實(shí)就是在各大主流平臺(tái)投放廣告,廣大用戶看到游戲廣告后,有可能會(huì)點(diǎn)擊廣告,然后進(jìn)入游戲廠商的宣傳頁面,同時(shí)會(huì)采集用戶的一些信息,然后游戲廠商對(duì)采集到的用戶信息進(jìn)行大數(shù)據(jù)分析,進(jìn)行進(jìn)一步的定向推廣。

游戲運(yùn)營(yíng)核心訴求

游戲廠商花錢買量,換來的用戶信息以及新用戶注冊(cè)信息是為持續(xù)的游戲運(yùn)營(yíng)服務(wù)的,那么這個(gè)場(chǎng)景的核心訴求就是采集用戶信息的完整性。

比如說,某游戲廠商一天花 5000w 投放廣告,在某平臺(tái)某時(shí)段產(chǎn)生了每秒 1w 次的廣告點(diǎn)擊率,那么在這個(gè)時(shí)段內(nèi)每一個(gè)點(diǎn)擊廣告的用戶信息要完整的被采集到,然后入庫進(jìn)行后續(xù)分析。這就對(duì)數(shù)據(jù)采集系統(tǒng)提出了很高的要求。

這其中,最核心的一點(diǎn)就是系統(tǒng)暴露接口的環(huán)節(jié)要能夠平穩(wěn)承載買量期間不定時(shí)的流量脈沖。在買量期間,游戲廠商通常會(huì)在多個(gè)平臺(tái)投放廣告,每個(gè)平臺(tái)投放廣告的時(shí)間是不一樣的,所以就會(huì)出現(xiàn)全天不定時(shí)的流量脈沖現(xiàn)象。如果這個(gè)環(huán)節(jié)出現(xiàn)問題,那么相當(dāng)于買量的錢就打水漂了。

數(shù)據(jù)采集系統(tǒng)傳統(tǒng)架構(gòu)

上圖是一個(gè)相對(duì)傳統(tǒng)的數(shù)據(jù)采集系統(tǒng)架構(gòu),最關(guān)鍵的就是暴露 HTTP 接口回傳數(shù)據(jù)這部分,這部分如果出問題,那么采集數(shù)據(jù)的鏈路就斷了。但這部分往往會(huì)面臨兩個(gè)挑戰(zhàn):

  • 當(dāng)流量脈沖來的時(shí)候,這部分是否可以快速擴(kuò)容以應(yīng)對(duì)流量沖擊。

  • 游戲運(yùn)營(yíng)具備潮汐特性,并非天天都在進(jìn)行,這就需要考慮如何優(yōu)化資源利用率。

通常情況下,在游戲有運(yùn)營(yíng)活動(dòng)之前,會(huì)提前通知運(yùn)維同學(xué),對(duì)這個(gè)環(huán)節(jié)的服務(wù)增加節(jié)點(diǎn),但要增加多少其實(shí)是無法預(yù)估的,只能大概拍一個(gè)數(shù)字。這是在傳統(tǒng)架構(gòu)下經(jīng)常會(huì)出現(xiàn)的場(chǎng)景,這就會(huì)導(dǎo)致兩個(gè)問題:

  • 流量太大,節(jié)點(diǎn)加少了,導(dǎo)致一部分流量的數(shù)據(jù)沒有采集到。

  • 流量沒有預(yù)期那么大,節(jié)點(diǎn)加多了,導(dǎo)致資源浪費(fèi)。

數(shù)據(jù)采集系統(tǒng) Serverless 架構(gòu)

我們可以通過函數(shù)計(jì)算 FC 來取代傳統(tǒng)架構(gòu)中暴露 HTTP 回傳數(shù)據(jù)這部分,從而完美解決傳統(tǒng)架構(gòu)中存在問題。

傳統(tǒng)架構(gòu)中的兩個(gè)問題均可以通過函數(shù)計(jì)算百毫秒彈性的特性來解決。我們并不需要去估算營(yíng)銷活動(dòng)會(huì)帶來多大的流量,也不需要去擔(dān)心和考慮對(duì)數(shù)據(jù)采集系統(tǒng)的性能,運(yùn)維同學(xué)更不需要提前預(yù)備 ECS。

因?yàn)楹瘮?shù)計(jì)算的極致彈性特性,當(dāng)沒有買量、沒有營(yíng)銷活動(dòng)的時(shí)候,函數(shù)計(jì)算的運(yùn)行實(shí)例是零。有買量活動(dòng)時(shí),在流量脈沖的情況下,函數(shù)計(jì)算會(huì)快速拉起實(shí)例來承載流量壓力;當(dāng)流量減少時(shí),函數(shù)計(jì)算會(huì)及時(shí)釋放沒有請(qǐng)求的實(shí)例進(jìn)行縮容。所以 Serverless 架構(gòu)帶來的優(yōu)勢(shì)有以下三點(diǎn):

  • 無需運(yùn)維介入,研發(fā)同學(xué)就可以很快的搭建出來。

  • 無論流量大小,均可以平穩(wěn)的承接。

  • 函數(shù)計(jì)算拉起的實(shí)例數(shù)量可以緊貼流量大小的曲線,做到資源利用率最優(yōu)化,再加上按量計(jì)費(fèi)的模式,可以最大程度優(yōu)化成本。

架構(gòu)解析

從上面的架構(gòu)圖可以看到,整個(gè)采集數(shù)據(jù)階段,分了兩個(gè)函數(shù)來實(shí)現(xiàn),第一個(gè)函數(shù)的作用是單純的暴露 HTTP 接口接收數(shù)據(jù),第二個(gè)函數(shù)用于處理數(shù)據(jù),然后將數(shù)據(jù)發(fā)送至消息隊(duì)列 Kafka 和數(shù)據(jù)庫 RDS。

1. 接收數(shù)據(jù)函數(shù)

我們打開函數(shù)計(jì)算控制臺(tái),創(chuàng)建一個(gè)函數(shù):

  • 函數(shù)類型:HTTP(即觸發(fā)器為 HTTP)

  • 函數(shù)名稱:receiveData

  • 運(yùn)行環(huán)境:Python3

  • 函數(shù)實(shí)例類型:彈性實(shí)例

  • 函數(shù)執(zhí)行內(nèi)存:512MB

  • 函數(shù)運(yùn)行超時(shí)時(shí)間:60 秒

  • 函數(shù)單實(shí)例并發(fā)度:1

  • 觸發(fā)器類型:HTTP 觸發(fā)器

  • 觸發(fā)器名稱:defaultTrigger

  • 認(rèn)證方式:anonymous(即無需認(rèn)證)

  • 請(qǐng)求方式:GET,POST

創(chuàng)建好函數(shù)之后,我們通過在線編輯器編寫代碼:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
HELLO_WORLD = b'Hello world!\n'
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接收回傳的數(shù)據(jù)
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

此時(shí)的代碼非常簡(jiǎn)單,就是接收用戶傳來的參數(shù),我們可以調(diào)用接口進(jìn)行驗(yàn)證:

可以在函數(shù)的日志查詢中看到此次調(diào)用的日志:

同時(shí),我們也可以查看函數(shù)的鏈路追蹤來分析每一個(gè)步驟的調(diào)用耗時(shí),比如函數(shù)接到請(qǐng)求→冷啟動(dòng)(無活躍實(shí)例時(shí))→準(zhǔn)備代碼→執(zhí)行初始化方法→執(zhí)行入口函數(shù)邏輯這個(gè)過程:

從調(diào)用鏈路圖中可以看到,剛才的那次請(qǐng)求包含了冷啟動(dòng)的時(shí)間,因?yàn)楫?dāng)時(shí)沒有活躍實(shí)例,整個(gè)過程耗時(shí) 418 毫秒,真正執(zhí)行入口函數(shù)代碼的時(shí)間為 8 毫秒。

當(dāng)再次調(diào)用接口時(shí),可以看到就直接執(zhí)行了入口函數(shù)的邏輯,因?yàn)榇藭r(shí)已經(jīng)有實(shí)例在運(yùn)行,整個(gè)耗時(shí)只有 2.3 毫秒

2. 處理數(shù)據(jù)的函數(shù)

第一個(gè)函數(shù)是通過在函數(shù)計(jì)算控制臺(tái)在界面上創(chuàng)建的,選擇了運(yùn)行環(huán)境是 Python3,我們可以在官方文檔中查看預(yù)置的 Python3 運(yùn)行環(huán)境內(nèi)置了哪些模塊,因?yàn)榈诙€(gè)函數(shù)要操作 Kafka 和 RDS,所以需要我們確認(rèn)對(duì)應(yīng)的模塊。


從文檔中可以看到,內(nèi)置的模塊中包含 RDS 的 SDK 模塊,但是沒有 Kafka 的 SDK 模塊,此時(shí)就需要我們手動(dòng)安裝 Kafka SDK 模塊,并且創(chuàng)建函數(shù)也會(huì)使用另一種方式。

1)Funcraft

Funcraft 是一個(gè)用于支持 Serverless 應(yīng)用部署的命令行工具,能幫助我們便捷地管理函數(shù)計(jì)算、API 網(wǎng)關(guān)、日志服務(wù)等資源。它通過一個(gè)資源配置文件(template.yml),協(xié)助我們進(jìn)行開發(fā)、構(gòu)建、部署操作。
所以第二個(gè)函數(shù)我們需要使用 Fun 來進(jìn)行操作,整個(gè)操作分為四個(gè)步驟:

  • 安裝 Fun 工具。

  • 編寫 template.yml 模板文件,用來描述函數(shù)。

  • 安裝我們需要的第三方依賴。

  • 上傳部署函數(shù)。

2)安裝 Fun

Fun 提供了三種安裝方式:

  • 通過 npm 包管理安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)且已經(jīng)預(yù)裝了 npm 的開發(fā)者。

  • 通過下載二進(jìn)制安裝 —— 適合所有平臺(tái)(Windows/Mac/Linux)。

  • 通過 Homebrew 包管理器安裝 —— 適合 Mac 平臺(tái),更符合 MacOS 開發(fā)者習(xí)慣。

文本示例環(huán)境為 Mac,所以使用 npm 方式安裝,非常的簡(jiǎn)單,一行命令搞定:

sudo npm install @alicloud/fun -g

安裝完成之后。在控制終端輸入 fun 命令可以查看版本信息:

$ fun --version
3.6.20

在第一次使用 fun 之前需要先執(zhí)行 fun config 命令進(jìn)行配置,按照提示,依次配置 Account ID、Access Key Id、Secret Access Key、 Default Region Name 即可。其中 Account ID、Access Key Id 你可以從函數(shù)計(jì)算控制臺(tái)首頁的右上方獲得:

fun config

? Aliyun Account ID *01
? Aliyun Access Key ID *qef6j
? Aliyun Access Key Secret *UFJG
? Default region name cn-hangzhou
? The timeout in seconds for each SDK client invoking 60
? The maximum number of retries for each SDK client 3

3)編寫 template.yml

新建一個(gè)目錄,在該目錄下創(chuàng)建一個(gè)名為 template.yml 的 YAML 文件,該文件主要描述要?jiǎng)?chuàng)建的函數(shù)的各項(xiàng)配置,說白了就是將函數(shù)計(jì)算控制臺(tái)上配置的那些配置信息以 YAML 格式寫在文件里:

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
FCBigDataDemo:
Type: 'Aliyun::Serverless::Service'
Properties:
Description: 'local invoke demo'
VpcConfig:
VpcId: 'vpc-xxxxxxxxxxx'
VSwitchIds: [ 'vsw-xxxxxxxxxx' ]
SecurityGroupId: 'sg-xxxxxxxxx'
LogConfig:
Project: fcdemo
Logstore: fc_demo_store
dataToKafka:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: index.my_initializer
Handler: index.handler
CodeUri: './'
Description: ''
Runtime: python3

我們來解析以上文件的核心內(nèi)容:

  • FCBigDataDemo:自定義的服務(wù)名稱。通過下面的 Type 屬性標(biāo)明是服務(wù),即 Aliyun::Serverless::Service。

  • Properties:Properties 下的屬性都是該服務(wù)的各配置項(xiàng)。

  • VpcConfig:服務(wù)的 VPC 配置,包含:VpcId:VPC ID。VSwitchIds:交換機(jī) ID,這里是數(shù)組,可以配置多個(gè)交換機(jī)。SecurityGroupId:安全組 ID。

  • LogConfig:服務(wù)綁定的日志服務(wù)(SLS)配置,包含:Project:日志服務(wù)項(xiàng)目。Logstore:LogStore 名稱。

  • dataToKafka:該服務(wù)下自定義的函數(shù)名稱。通過下面的 Type 屬性標(biāo)明是函數(shù),即 Aliyun::Serverless::Function。

  • Properties:Properties下的屬性都是該函數(shù)的各配置項(xiàng)。

  • Initializer:配置初始化函數(shù)。

  • Handler:配置入口函數(shù)。

  • Runtime:函數(shù)運(yùn)行環(huán)境。


4)安裝第三方依賴

服務(wù)和函數(shù)的模板創(chuàng)建好之后,我們來安裝需要使用的第三方依賴。在這個(gè)示例的場(chǎng)景中,第二個(gè)函數(shù)需要使用 Kafka SDK,所以可以通過 fun 工具結(jié)合 Python 包管理工具 pip 進(jìn)行安裝:

fun install --runtime python3 --package-type pip kafka-python

執(zhí)行命令后有提示信息


此時(shí)我們會(huì)發(fā)現(xiàn)在目錄下會(huì)生成一個(gè).fun文件夾 ,我們安裝的依賴包就在該目錄下:


5)部署函數(shù)

現(xiàn)在編寫好了模板文件以及安裝好了我們需要的 Kafka SDK 后,還需要添加我們的代碼文件 index.py,代碼內(nèi)容如下:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
from kafka import KafkaProducer
producer = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init kafka producer")
    global producer
    producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
def handler(event, context):
    logger = logging.getLogger()   
    # 接收回傳的數(shù)據(jù)
    event_str = json.loads(event)
    event_obj = json.loads(event_str)
    logger.info(event_obj["action"])
    logger.info(event_obj["articleAuthorId"])
    # 向Kafka發(fā)送消息
    global producer
    producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
    producer.close()
    return 'hello world'

代碼很簡(jiǎn)單,這里做以簡(jiǎn)單的解析:

  • my_initializer:函數(shù)實(shí)例被拉起時(shí)會(huì)先執(zhí)行該函數(shù),然后再執(zhí)行 handler 函數(shù) ,當(dāng)函數(shù)實(shí)例在運(yùn)行時(shí),之后的請(qǐng)求都不會(huì)執(zhí)行 my_initializer 函數(shù) 。一般用于各種連接的初始化工作,這里將初始化 Kafka Producer 的方法放在了這里,避免反復(fù)初始化 Produer。

  •  handler:該函數(shù)只有兩個(gè)邏輯,接收回傳的數(shù)據(jù)和將數(shù)據(jù)發(fā)送至 Kafka 的指定 Topic。

  • 下面通過 fun deploy 命令部署函數(shù),該命令會(huì)做兩件事:根據(jù) template.yml 中的配置創(chuàng)建服務(wù)和函數(shù)。將 index.py 和 .fun 上傳至函數(shù)中。

登錄函數(shù)計(jì)算控制臺(tái),可以看到通過 fun 命令部署的服務(wù)和函數(shù)

進(jìn)入函數(shù),也可以清晰的看到第三方依賴包的目錄結(jié)構(gòu)

3. 函數(shù)之間調(diào)用

目前兩個(gè)函數(shù)都創(chuàng)建好了,下面的工作就是由第一個(gè)函數(shù)接收到數(shù)據(jù)后拉起第二個(gè)函數(shù)發(fā)送消息給 Kafka。我們只需要對(duì)第一個(gè)函數(shù)做些許改動(dòng)即可:

# -*- coding: utf-8 -*-
import logging
import json
import urllib.parse
import fc2
HELLO_WORLD = b'Hello world!\n'
client = None
def my_initializer(context):    
    logger = logging.getLogger() 
    logger.info("init fc client")
    global client
    client = fc2.Client(
        endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
        accessKeyID="your_ak",
        accessKeySecret="your_sk"
    )
def handler(environ, start_response):
    logger = logging.getLogger() 
    context = environ['fc.context']
    request_uri = environ['fc.request_uri']
    for k, v in environ.items():
      if k.startswith('HTTP_'):
        # process custom request headers
        pass
    try:        
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))    
    except (ValueError):        
        request_body_size = 0   
    # 接收回傳的數(shù)據(jù)
    request_body = environ['wsgi.input'].read(request_body_size)  
    request_body_str = urllib.parse.unquote(request_body.decode("GBK"))
    request_body_obj = json.loads(request_body_str)
    logger.info(request_body_obj["action"])
    logger.info(request_body_obj["articleAuthorId"])
    global client
    client.invoke_function(
        'FCBigDataDemo',
        'dataToKafka',
        payload=json.dumps(request_body_str),
        headers = {'x-fc-invocation-type': 'Async'}
    )

    status = '200 OK'
    response_headers = [('Content-type', 'text/plain')]
    start_response(status, response_headers)
    return [HELLO_WORLD]

如上面代碼所示,對(duì)第一個(gè)函數(shù)的代碼做了三個(gè)地方的改動(dòng):

  • 導(dǎo)入函數(shù)計(jì)算的庫:import fc2

  • 添加初始化方法,用于創(chuàng)建函數(shù)計(jì)算 Client:

def my_initializer(context):
        logger = logging.getLogger()
        logger.info("init fc client")
        global client
        client = fc2.Client(
            endpoint="http://your_account_id.cn-hangzhou-internal.fc.aliyuncs.com",
            accessKeyID="your_ak",
            accessKeySecret="your_sk"
)

這里需要注意的時(shí),當(dāng)我們?cè)诖a里增加了初始化方法后,需要在函數(shù)配置中指定初始化方法的入口

  • 通過函數(shù)計(jì)算 Client 調(diào)用第二個(gè)函數(shù)

global client
    client.invoke_function(
            'FCBigDataDemo',
            'dataToKafka',
          payload=json.dumps(request_body_str),
            headers = {'x-fc-invocation-type': 'Async'}
)

invoke_function 函數(shù)有四個(gè)參數(shù):

  • 第一個(gè)參數(shù):調(diào)用函數(shù)所在的服務(wù)名稱。

  • 第二個(gè)參數(shù):調(diào)用函數(shù)的函數(shù)名稱。

  • 第三個(gè)參數(shù):向調(diào)用函數(shù)傳的數(shù)據(jù)。

  • 第四個(gè)參數(shù):調(diào)用第二個(gè)函數(shù) Request Header 信息。這里主要通過 x-fc-invocation-type 這個(gè) Key 來設(shè)置是同步調(diào)用還是異步調(diào)用。這里設(shè)置 Async 為異步調(diào)用。

如此設(shè)置,我們便可以驗(yàn)證通過第一個(gè)函數(shù)提供的 HTTP 接口發(fā)起請(qǐng)求→采集數(shù)據(jù)→調(diào)用第二個(gè)函數(shù)→將數(shù)據(jù)作為消息傳給 Kafka 這個(gè)流程了。

使用兩個(gè)函數(shù)的目的

到這里有些同學(xué)可能會(huì)有疑問,為什么需要兩個(gè)函數(shù),而不在第一個(gè)函數(shù)里直接向 Kafka 發(fā)送數(shù)據(jù)呢?

當(dāng)我們使用異步調(diào)用函數(shù)時(shí),在函數(shù)內(nèi)部會(huì)默認(rèn)先將請(qǐng)求的數(shù)據(jù)放入消息隊(duì)列進(jìn)行第一道削峰填谷,然后每一個(gè)隊(duì)列在對(duì)應(yīng)函數(shù)實(shí)例,通過函數(shù)實(shí)例的彈性拉起多個(gè)實(shí)例進(jìn)行第二道削峰填谷。所以這也就是為什么這個(gè)架構(gòu)能穩(wěn)定承載大并發(fā)請(qǐng)求的核心原因之一。

4. 配置 Kafka

在游戲運(yùn)營(yíng)這個(gè)場(chǎng)景中,數(shù)據(jù)量是比較大的,所以對(duì) Kafka 的性能要求也是比較高的,相比開源自建,使用云上的 Kafka 省去很多的運(yùn)維操作,比如:

  • 我們不再需要再維護(hù) Kafka 集群的各個(gè)節(jié)點(diǎn)。

  • 不需要關(guān)心主從節(jié)點(diǎn)數(shù)據(jù)同步問題。

  • 可以快速、動(dòng)態(tài)擴(kuò)展 Kafka 集群規(guī)格,動(dòng)態(tài)增加 Topic,動(dòng)態(tài)增加分區(qū)數(shù)。

  • 完善的指標(biāo)監(jiān)控功能,消息查詢功能。

總的來說,就是一切 SLA 都有云上兜底,我們只需要關(guān)注在消息發(fā)送和消息消費(fèi)即可。

所以我們可以打開 Kafka 開通界面,根據(jù)實(shí)際場(chǎng)景的需求一鍵開通 Kafka 實(shí)例,開通 Kafka 后登錄控制臺(tái),在基本信息中可以看到 Kafka 的接入點(diǎn):

  • 默認(rèn)接入點(diǎn):走 VPC 內(nèi)網(wǎng)場(chǎng)景的接入點(diǎn)。

  • SSL 接入點(diǎn):走公網(wǎng)場(chǎng)景的接入點(diǎn)。

將默認(rèn)接入點(diǎn)配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。

....
producer = KafkaProducer(bootstrap_servers='XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092')
....

然后點(diǎn)擊左側(cè)控制臺(tái) Topic 管理,創(chuàng)建 Topic

將創(chuàng)建好的 Topic 配置到函數(shù)計(jì)算的第二個(gè)函數(shù)中即可。

...
# 第一個(gè)參數(shù)為Topic名稱
producer.send('ikf-demo', json.dumps(event_str).encode('utf-8'))
...

上文已經(jīng)列舉過云上 Kafka 的優(yōu)勢(shì),比如動(dòng)態(tài)增加 Topic 的分區(qū)數(shù),我們可以在 Topic 列表中,對(duì) Topic 的分區(qū)數(shù)進(jìn)行動(dòng)態(tài)調(diào)整。

單 Topic 最大支持到 360 個(gè)分區(qū),這是開源自建無法做到的。

接下來點(diǎn)擊控制臺(tái)左側(cè) Consumer Group 管理,創(chuàng)建 Consumer Group。

至此,云上的 Kafka 就算配置完畢了,即 Producer 可以往剛剛創(chuàng)建的 Topic 中發(fā)消息了,Consumer 可以設(shè)置剛剛創(chuàng)建的 GID 以及訂閱 Topic 進(jìn)行消息接受和消費(fèi)。

Flink Kafka 消費(fèi)者

在這個(gè)場(chǎng)景中,Kafka 后面往往會(huì)跟著 Flink,所以這里簡(jiǎn)要給大家介紹一下在 Flink 中如何創(chuàng)建 Kafka Consumer 并消費(fèi)數(shù)據(jù)。代碼片段如下:

final ParameterTool parameterTool = ParameterTool.fromArgs(args);
String kafkaTopic = parameterTool.get("kafka-topic","ikf-demo");
String brokers = parameterTool.get("brokers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092,XX.XX.XX.XX:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ikf-demo");
FlinkKafkaConsumer<UserBehaviorEvent> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new UserBehaviorEventSchema(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserBehaviorEvent> dataStreamByEventTime = env.addSource(kafka);

以上就是構(gòu)建 Flink Kafka Consumer 和添加 Kafka Source 的代碼片段,還是非常簡(jiǎn)單的。

壓測(cè)驗(yàn)證

至此,整個(gè)數(shù)據(jù)采集的架構(gòu)就搭建完畢了,下面我們通過壓測(cè)來檢驗(yàn)一下整個(gè)架構(gòu)的性能。這里使用阿里云 PTS 來進(jìn)行壓測(cè)。

創(chuàng)建壓測(cè)場(chǎng)景

打開 PTS 控制臺(tái),點(diǎn)擊左側(cè)菜單創(chuàng)建壓測(cè)/創(chuàng)建 PTS 場(chǎng)景

在場(chǎng)景配置中,將第一個(gè)函數(shù)計(jì)算函數(shù)暴露的 HTTP 接口作為串聯(lián)鏈路。


接口配置完后,我們來配置施壓

  • 壓力模式:并發(fā)模式:指定有多少并發(fā)用戶同時(shí)發(fā)請(qǐng)求。RPS模式:指定每秒有多少請(qǐng)求數(shù)。

  • 遞增模式:在壓測(cè)過程中可以通過手動(dòng)調(diào)節(jié)壓力,也可以自動(dòng)按百分比遞增壓力。

  • 最大并發(fā):同時(shí)有多少個(gè)虛擬用戶發(fā)起請(qǐng)求。

  • 遞增百分比:如果是自動(dòng)遞增的話,按這里的百分比遞增。

  • 單量級(jí)持續(xù)時(shí)長(zhǎng):在未完全達(dá)到壓力全量的時(shí)候,每一級(jí)梯度的壓力保持的時(shí)長(zhǎng)。

  • 壓測(cè)總時(shí)長(zhǎng):一共需要壓測(cè)的時(shí)長(zhǎng)。

這里因?yàn)橘Y源成本原因,并發(fā)用戶數(shù)設(shè)置為 2500 來進(jìn)行驗(yàn)證。

從上圖壓測(cè)中的情況來看,TPS 達(dá)到了 2w 的封頂,549w+ 的請(qǐng)求,99.99% 的請(qǐng)求是成功的,那 369 個(gè)異常也可以點(diǎn)擊查看,都是壓測(cè)工具請(qǐng)求超時(shí)導(dǎo)致的。

至此,整個(gè)基于 Serverless 搭建的大數(shù)據(jù)采集傳輸?shù)募軜?gòu)就搭建好了,并且進(jìn)行了壓測(cè)驗(yàn)證,整體的性能也是不錯(cuò)的,并且整個(gè)架構(gòu)搭建起來也非常簡(jiǎn)單和容易理解。這個(gè)架構(gòu)不光適用于游戲運(yùn)營(yíng)行業(yè),其實(shí)任何大數(shù)據(jù)采集傳輸?shù)膱?chǎng)景都是適用的,目前也已經(jīng)有很多客戶正在基于 Serverless 的架構(gòu)跑在生產(chǎn)環(huán)境,或者正走在改造 Serverless 架構(gòu)的路上。

以上就是Serverless如何解決數(shù)據(jù)采集分析痛點(diǎn),小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI