溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka數(shù)據(jù)如何同步至MaxCompute

發(fā)布時間:2021-12-15 10:45:55 來源:億速云 閱讀:133 作者:柒染 欄目:互聯(lián)網科技

Kafka數(shù)據(jù)如何同步至MaxCompute,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

一、背景介紹
1. 實驗目的
在日常工作中,很多企業(yè)將APP或網站產生的行為日志和業(yè)務數(shù)據(jù)通過Kafka收集之后做兩方面的處理。一方面是離線處理,一方面是實時處理。并且一般會投遞到MaxCompute中作為模型的構建,進行相關的業(yè)務處理,如用戶的特征、銷售排名、訂單地區(qū)分布等。這些數(shù)據(jù)形成之后會在數(shù)據(jù)報表中作為展示。

2. 方案說明
Kafka數(shù)據(jù)同步到DataWorks有兩條鏈路。一條鏈路是業(yè)務數(shù)據(jù)和行為日志通過Kafka,再通過Flume 上傳到Datahub,以及Max Compute,最終在QuickBI進行展示。另一條鏈路是業(yè)務數(shù)據(jù)和行為日志通過Kafka以及DataWorks,MaxCompute,最終在QuickBI當中展示。
本次展示Kafka通過DataWorks上傳到MaxCompute的流程。從DataWorks上傳到MaxCompute是通過兩種方案進行上傳數(shù)據(jù)同步的。方案一是自定義資源組,方案二是獨享資源組。自定義資源組一般適用于復雜網絡的數(shù)據(jù)上云場景。獨享資源組操作方式主要針對集成資源不足的情況。

Kafka數(shù)據(jù)如何同步至MaxComputecdn.com/4b5c9287751761510b5df37aabb73b67974bb430.png">

二、具體操作流程
1.Kafka消息隊列使用及其原理
Kafka產品概述:消息隊列 for Apache Kafka 是阿里云提供的分布式、高吞吐、可擴展的消息隊列服務。消息隊列for Apache Kafka一般用于日志收集、監(jiān)控數(shù)據(jù)聚合、流式數(shù)據(jù)處理、在線離線分析等大數(shù)據(jù)領域。消息隊列 for Apache Kafka 針對開源的 Apache Kafka 提供全托管服務,徹底解決開源產品長期以來的痛點。云上Kafka具有低成本、更彈性、更可靠的優(yōu)勢,用戶只需專注于業(yè)務開發(fā),無需部署運維。

Kafka架構介紹:一個典型的Kafka集群主要分為四部分。Producer生產數(shù)據(jù)并通過 push 模式向消息隊列 for Apache Kafka 的 Kafka Broker 發(fā)送消息。發(fā)送的消息可以是網站的頁面訪問、服務器日志,也可以是 CPU 和內存相關的系統(tǒng)資源信息。Kafka Broker用于存儲消息的服務器。Kafka Broker 支持水平擴展。 Kafka Broker 節(jié)點的數(shù)量越多,Kafka 集群的吞吐率越高。Kafka Broker針對topic會partition一個概念,partition有l(wèi)eader、follower的角色分配。Consumer通過 pull 模式從消息隊列 for Apache Kafka Broker 訂閱并消費leader的信息數(shù)據(jù)。其中partition內部有offset作為消息的消費點位。通過ZooKeeper管理集群的配置、選舉 leader 分區(qū),并且在Consumer Group 發(fā)生變化時,管理partition_leader的負載均衡

Kafka數(shù)據(jù)如何同步至MaxCompute

Kafka消息隊列購買以及部署:用戶首先可以到Kafka消息隊列產品頁面點擊購買,根據(jù)個人情況選擇對應包年、包月等消費方式、地區(qū)、實例類型、磁盤、流量以及消息存放時間。其中較為重要的一點是要選擇對應地區(qū),如果用戶的MaxCompute在華北,那么盡量選擇華北地區(qū)。選擇開通完成后需要進行部署。點擊部署,選擇合適的VPC及其交換機進行部署。

部署完成后進入Kafka Topic管理頁面,點擊創(chuàng)建Topic輸入自己的Topic。Topic命名下面有三條注意信息,命名盡量跟自己的業(yè)務一致,比如是財經業(yè)務或者是商務業(yè)務,盡量進行區(qū)分。第四步進入Consumer Group管理,點擊創(chuàng)建Consumer Group創(chuàng)建自己所需要的Consumer Group。Consumer Group的命名也需要規(guī)范,如果是財經或商務業(yè)務,盡量和自己的Topic相對應。

Kafka數(shù)據(jù)如何同步至MaxCompute

Kafka白名單配置:Kafka安裝部署完成之后確認需要訪問Kafka的服務器或產品的白名單。下圖中的默認接入點即為訪問接口。

2.資源組介紹及其配置

自定義資源組的使用背景:自定義資源組一般針對IDC之間的網絡問題。本地網絡和云上網絡存在差異,如DataWorks可以通過免費傳輸能力(默認任務資源組)進行海量數(shù)據(jù)上云,但默認資源組無法實現(xiàn)傳輸速度存在較高要求或復雜環(huán)境中的數(shù)據(jù)源同步上云的需求。此時用戶可以使用自定義資源組可實現(xiàn)復雜環(huán)境同步上云的需求,解決DataWorks默 認資源組與您的數(shù)據(jù)源不通的問題,或實現(xiàn)更高速度的傳輸能力。然而,自定義資源組主要解決的還是復雜網絡環(huán)境上云同步問題,打通任意網絡環(huán)境之間的數(shù)據(jù)傳輸同步。

Kafka數(shù)據(jù)如何同步至MaxCompute

自定義資源組的配置:自定義資源組的配置需要六步操作,首先點擊進入DataWorks控制臺,點開工作空間的列表,選擇用戶需要的項目空間,點擊進入數(shù)據(jù)集成,即確認自己的數(shù)據(jù)集成是要在哪個空間項目下進行添加。之后,點擊進入數(shù)據(jù)源界面,點擊新增自定義資源組。要注意頁面右上角的新增自定義資源組是只有項目管理員有權限添加。

第三步是確認Kafka與需要添加的自定義資源組屬于同一個VPC下。本次實驗是ECS向Kafka發(fā)送消息,二者的VPC應該一致。第四步登錄ECS,即個人的自定義資源組。執(zhí)行命令dmidecode|grep UUID得到ECS的UUID。

Kafka數(shù)據(jù)如何同步至MaxCompute

第五步是將添加服務器UUID以及自定義資源組的IP或機器CPU和內存填寫進來。最后是在ECS上執(zhí)行相關命令,Agent安裝共5步,做一一確認,在第4小步完成后點擊刷新查看服務是否為可用狀態(tài)。添加完成后進行檢查連通測試,檢查是否添加成功。

獨享資源組的使用背景:一些客戶反映在Kafka同步到MaxCompute時會報資源不足的問題,可以通過新增獨享資源組的方式進行數(shù)據(jù)同步。獨享資源模式下,機器的物理資源(網絡、磁盤、CPU和內存等)完全獨享。不僅可以隔離用戶間的資源使用,也可以隔離不同工作空間任務的資源使用。此外,獨享資源也支持靈活的擴容、縮容功能,可以滿足資源獨 
享、靈活配置等需求。獨享資源組可以訪問在同一地域下的VPC數(shù)據(jù)源,同時也可以訪問跨地域的公網RDS地址。

獨享資源組的配置:獨享資源組的配置主要需要兩步操作,首先進入DataWorks控制臺的資源列表,點擊新增獨享資源組,包括獨享集成資源組和獨享調度資源組。此處選擇新增獨享集成資源組,點擊購買時仍要注意選擇對應的購買方式、區(qū)域、資源、內存、時間期限、數(shù)量等。

購買完成后需要把獨享集成資源組綁定到與Kafka對應的VPC,點擊專有網絡綁定,選擇與Kafka對應的交換機(最明顯的是可用區(qū)的區(qū)別)、安全組。

3.同步過程及其注意事項
Kafka同步到MaxCompute的需要進行相關參數(shù)配置同時需要注意以下幾個事項。
DataWorks數(shù)據(jù)集成操作:進入DataWorks操作界面,點擊創(chuàng)建業(yè)務流程,在新建的業(yè)務流程添加數(shù)據(jù)同步節(jié)點,再進行命名。

進入數(shù)據(jù)同步節(jié)點,包括Reader端和Writer端,點擊Reader端數(shù)據(jù)源為Kafka,Writer端數(shù)據(jù)源為ODPS。點擊轉化為腳本模式。Reader或Writer端的一些同步參數(shù)可以在此處就近點擊,方便閱讀、操作和理解。

Kafka Reader的主要參數(shù):Kafka Reader的主要參數(shù)首先server,上文所述Kafka的默認接入點就是其中一個server,ip:port。注意此處server是必填參數(shù)。topic,表示在Kafka部署完成之后,Kafka處理數(shù)據(jù)源的topic,此處也是必填參數(shù)。下一個參數(shù)是針對列column,column支持常量列、數(shù)據(jù)列、屬性列。常量列和數(shù)據(jù)列不太重要。同步的完整消息一般存放在屬性列 value 中,如果需要其它信息,如partition、offset、timestamp,也可以在屬性列中篩選。column是必填參數(shù)。

keyType、valueType各有6種類型,根據(jù)用戶同步的數(shù)據(jù),選擇相應的信息,同步一個類型。需要注意同步方式是按消息時間同步,還是按消費點位置同步的。按數(shù)據(jù)消費點位置同步有四個場景,beginDateTime,endDateTime,beginOffset,endOffset。 beginDateTime 和beginOffset 二選其一,作為數(shù)據(jù)消費起點。endDateTime 和endOffset 二選其一。需要注意beginDateTime、endDateTime 中需要Kafka0.10.2版本以上才支持按數(shù)據(jù)消費點位置同步功能。另外需要注意beginOffset有三個比較特殊的形式:seekToBeginning,表示從開始點位消費數(shù)據(jù);seekToLast,表示從上次消費的偏移位置消費數(shù)據(jù),按照beginOffset從上次偏移位置只能一次消費,如果使用beginDateTime則可以多次消費,這取決于消息存放時間;seekToEnd,表示從最后點位消費數(shù)據(jù),會讀取到空數(shù)據(jù)。

skipExceeedRecord沒有太大作用,是不必填項。partition對topic所有分區(qū)共同讀消費的,所以無需自定義一個分區(qū),是非必填項。kafkaConfig,如果有其它相關配置參數(shù)可以擴展配置在kafkaConfig,kafkaConfig也是非必填項。

MaxCompute Writer的主要參數(shù):dataSource是數(shù)據(jù)源名稱,添加ODPS數(shù)據(jù)源。tables,表示所創(chuàng)建的數(shù)據(jù)表的表名稱,Kafka的數(shù)據(jù)要同步到哪張表中,相應的字段也可以建立。
partition,如果表為分區(qū)表,則必須配置到最后一級分區(qū),確定同步位置。若為非分區(qū)表,則不必填。column,盡量與Kafka column中的相關字段做一一對應的操作。同步的字段對應,信息同步才能確認成功。truncate,寫入時同步的數(shù)據(jù)是選擇以追加模式寫還是以覆蓋模式寫,盡量避免多個DDL同時操作一個分區(qū),或者在多個并發(fā)作業(yè)啟動前提前創(chuàng)建分區(qū)。

Kafka同步數(shù)據(jù)到MaxCompute:Kafka的Reader端,MaxCompute的Writer端以及限制參數(shù)。Reader包含server、endOffset、kafkaConfig、group.id、valueType、ByteArray、column字段、topic、beginOffset、seekToLast等。MaxCompute的Writer端包含覆蓋、追加、壓縮、查看源碼、同步到的表、字段要和Kafka的Reader端做一一對應,最重要的是value數(shù)據(jù)同步。限制參數(shù),主要有errorlimit,數(shù)據(jù)超過幾個錯誤后會進行報錯;speed,可以限制流速、并發(fā)度等。

參考Kafka生產者SDK編寫代碼:最終生產出的數(shù)據(jù)要發(fā)送到Kafka中,通過相關代碼可以查看用戶的生產數(shù)據(jù)。一段代碼表示配置信息的讀取,協(xié)議、序列化方式以及請求的等待時間,需要發(fā)送哪一個topic,發(fā)送什么樣的消息。發(fā)送完成后回傳一個信息。

代碼打包運行在ECS上(與Kafka同一個可用區(qū)):如下圖所示,執(zhí)行crontab-e命令,每到17:00執(zhí)行一次。下圖為發(fā)送日志完成后的消息記錄。

在MaxCompute上創(chuàng)建表:進入DataWorks業(yè)務流程頁面,創(chuàng)建目標表,使用一個DDL語句創(chuàng)建同步的表,或根據(jù)用戶個人業(yè)務相應創(chuàng)建不同的表的字段。

Kafka數(shù)據(jù)如何同步至MaxCompute

4.開發(fā)測試以及生產部署
選擇自定義資源組(或獨享集成資源組)進行同步操作:下圖所示,選擇右上角“配置任務資源組”,根據(jù)用戶個人需求選擇資源組,點擊執(zhí)行。執(zhí)行完成后,會出現(xiàn)標識顯示成功,同步數(shù)據(jù)記錄以及結果是否成功。同步過程基本結束。

查詢同步的數(shù)據(jù)結果:在DataWorks臨界面查看同步結果,在臨時節(jié)點點擊查詢命令,select * from testkafka3(表),查看數(shù)據(jù)同步結果。數(shù)據(jù)已經同步過來,證明測試成功。

Kafka數(shù)據(jù)如何同步至MaxCompute

設置調度參數(shù):業(yè)務流程開發(fā)數(shù)據(jù)同步之后,會對相關模型進行一些業(yè)務處理,最后設計一些SQL節(jié)點、同步節(jié)點,進行部署。如下圖所示,在右側點擊調度配置,輸入調度時間。

提交業(yè)務流程節(jié)點,并打包發(fā)布:點擊業(yè)務流程,選擇所需要提交的節(jié)點并提交。一些業(yè)務流程提交之后不需要放到生產環(huán)境當中。然后進入任務發(fā)布界面,將節(jié)點添加到待發(fā)布進行任務部署。

Kafka數(shù)據(jù)如何同步至MaxCompute

確認業(yè)務流程發(fā)布成功:最后在運維中心頁面,確認發(fā)布是否在生產環(huán)境中存在。至此Kafka同步數(shù)據(jù)到MaxCompute過程結束。到了對應的調度時間,在各個節(jié)點或者右上角會有節(jié)點的日志展示,可以查看日志運行情況是否正常,或是否需要進行后續(xù)操作,部署數(shù)據(jù)或是相關命令。

關于Kafka數(shù)據(jù)如何同步至MaxCompute問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業(yè)資訊頻道了解更多相關知識。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI