溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

大數(shù)據(jù)框架—Flink與Beam

發(fā)布時間:2020-04-05 17:04:08 來源:網(wǎng)絡(luò) 閱讀:5674 作者:ZeroOne01 欄目:大數(shù)據(jù)

Flink概述

Flink是Apache的一個頂級項目,Apache Flink 是一個開源的分布式流處理和批處理系統(tǒng)。Flink 的核心是在數(shù)據(jù)流上提供數(shù)據(jù)分發(fā)、通信、具備容錯的分布式計算。同時,F(xiàn)link 在流處理引擎上構(gòu)建了批處理引擎,原生支持了迭代計算、內(nèi)存管理和程序優(yōu)化。

現(xiàn)有的開源計算方案,會把流處理和批處理作為兩種不同的應(yīng)用類型,因為它們所提供的SLA(Service-Level-Aggreement)是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理。

Flink從另一個視角看待流處理和批處理,將二者統(tǒng)一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數(shù)據(jù)流是×××的;批處理被作為一種特殊的流處理,只是它的輸入數(shù)據(jù)流被定義為有界的。

Flink流處理特性:

  • 支持高吞吐、低延遲、高性能的流處理
  • 支持帶有事件時間的窗口(Window)操作
  • 支持有狀態(tài)計算的Exactly-once語義
  • 支持高度靈活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
  • 支持具有Backpressure功能的持續(xù)流模型
  • 支持基于輕量級分布式快照(Snapshot)實現(xiàn)的容錯
  • 一個運行時同時支持Batch on Streaming處理和Streaming處理
  • Flink在JVM內(nèi)部實現(xiàn)了自己的內(nèi)存管理
  • 支持迭代計算
  • 支持程序自動優(yōu)化:避免特定情況下Shuffle、排序等昂貴操作,中間結(jié)果有必要進行緩存

Flink架構(gòu)圖:
大數(shù)據(jù)框架—Flink與Beam

Flink以層級式系統(tǒng)形式組件其軟件棧,不同層的棧建立在其下層基礎(chǔ)上,并且各層接受程序不同層的抽象形式。

在最基本的層面上,一個Flink應(yīng)用程序是由以下幾部分組成:

  • Data source: 數(shù)據(jù)源,將數(shù)據(jù)輸入到Flink中
  • Transformations: 處理數(shù)據(jù)
  • Data sink: 將處理后的數(shù)據(jù)傳輸?shù)侥硞€地方

如下圖:
大數(shù)據(jù)框架—Flink與Beam

目前Flink支持如下框架:

  • Apache Kafka (sink/source)
  • Elasticsearch 1.x / 2.x / 5.x (sink)
  • HDFS (sink)
  • RabbitMQ (sink/source)
  • Amazon Kinesis Streams (sink/source)
  • Twitter (source)
  • Apache NiFi (sink/source)
  • Apache Cassandra (sink)
  • Redis, Flume, and ActiveMQ (via Apache Bahir) (sink)

Flink官網(wǎng)地址如下:

http://flink.apache.org/

部分內(nèi)容參考自如下文章:

https://blog.csdn.net/jdoouddm7i/article/details/62039337


使用Flink完成wordcount統(tǒng)計

Flink下載地址:

http://flink.apache.org/downloads.html

Flink快速開始文檔地址:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html

注:安裝Flink之前系統(tǒng)中需要安裝有jdk1.7以上版本的環(huán)境

我這里下載的是2.6版本的Flink:

[root@study-01 ~]# cd /usr/local/src/
[root@study-01 /usr/local/src]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop26-scala_2.11.tgz
[root@study-01 /usr/local/src]# tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz -C /usr/local
[root@study-01 /usr/local/src]# cd ../flink-1.4.2/
[root@study-01 /usr/local/flink-1.4.2]# ls
bin  conf  examples  lib  LICENSE  log  NOTICE  opt  README.txt  resources  tools
[root@study-01 /usr/local/flink-1.4.2]#

啟動Flink:

[root@study-01 /usr/local/flink-1.4.2]# ./bin/start-local.sh
[root@study-01 /usr/local/flink-1.4.2]# jps
6576 Jps
6131 JobManager
6499 TaskManager
[root@study-01 /usr/local/flink-1.4.2]# 

啟動成功之后就可以訪問主機ip的8081端口,進入到Flink的web頁面:
大數(shù)據(jù)框架—Flink與Beam

我們現(xiàn)在就可以開始實現(xiàn)wordcount案例了,我這里有一個文件,內(nèi)容如下:

[root@study-01 /usr/local/flink-1.4.2]# cat /data/hello.txt 
hadoop welcome
hadoop hdfs mapreduce
hadoop hdfs
hello hadoop
spark vs mapreduce
[root@study-01 /usr/local/flink-1.4.2]#

執(zhí)行如下命令,實現(xiàn)wordcount案例,如果學習過Hadoop會發(fā)現(xiàn)這個命令和Hadoop上使用MapReduce實現(xiàn)wordcount案例是類似的:

[root@study-01 /usr/local/flink-1.4.2]# ./bin/flink run ./examples/batch/WordCount.jar  --input file:///data/hello.txt --output file:///data/tmp/flink_wordcount_out

執(zhí)行完成后,可以到web頁面上,查看任務(wù)的執(zhí)行信息:
大數(shù)據(jù)框架—Flink與Beam

查看輸出結(jié)果:

[root@study-01 /usr/local/flink-1.4.2]# cat /data/tmp/flink_wordcount_out 
hadoop 4
hdfs 2
hello 1
mapreduce 2
spark 1
vs 1
welcome 1
[root@study-01 /usr/local/flink-1.4.2]#

Beam概述

Google的新老三駕馬車:

  • 老的三駕馬車:GFS、MapReduce、BigTable
  • 新的三駕馬車:Dremel、Pregel、Caffeine

我們都知道,Hadoop生態(tài)圈內(nèi)的幾個框架都源于Google老的三駕馬車,而一些新的框架實現(xiàn)也是部分源于Google新的三駕馬車的概念。所以現(xiàn)在市面上的大數(shù)據(jù)相關(guān)框架很多,框架多就會導(dǎo)致編程規(guī)范多、處理模式不一致,而我們希望有一個工具能夠統(tǒng)一這些編程模型,因此,Beam就誕生了。

Apache Beam是 Apache 軟件基金會于2017年1 月 10 日對外宣布的開源平臺。Beam 為創(chuàng)建復(fù)雜數(shù)據(jù)平行處理管道,提供了一個可移動(兼容性好)的 API 層。這層 API 的核心概念基于 Beam 模型(以前被稱為 Dataflow 模型),并在每個 Beam 引擎上不同程度得執(zhí)行。

背景:

2016 年 2 月份,谷歌及其合作伙伴向 Apache 捐贈了一大批代碼,創(chuàng)立了孵化中的 Beam 項目( 最初叫 Apache Dataflow)。這些代碼中的大部分來自于谷歌 Cloud Dataflow SDK——開發(fā)者用來寫流處理和批處理管道(pipelines)的庫,可在任何支持的執(zhí)行引擎上運行。當時,支持的主要引擎是谷歌 Cloud Dataflow,附帶對 Apache Spark 和 開發(fā)中的 Apache Flink 支持。如今,它正式開放之時,已經(jīng)有五個官方支持的引擎。除去已經(jīng)提到的三個,還包括 Beam 模型和 Apache Apex。

Beam特點:

  • 統(tǒng)一了數(shù)據(jù)批處理(batch)和流處理(stream)編程范式,
  • 能在任何執(zhí)行引擎上運行。
  • 它不僅為模型設(shè)計、更為執(zhí)行一系列數(shù)據(jù)導(dǎo)向的工作流提供了統(tǒng)一的模型。這些工作流包括數(shù)據(jù)處理、吸收和整合。

Beam的官方網(wǎng)站:

https://beam.apache.org/


將WordCount的Beam程序以多種不同Runner運行

Beam Java的快速開始文檔:

https://beam.apache.org/get-started/quickstart-java/

安裝Beam的前置也是需要系統(tǒng)具備jdk1.7以上版本的環(huán)境,以及Maven環(huán)境。

使用如下命令下載Beam以及wordcount案例代碼:

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.4.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=org.apache.beam.examples \
      -DinteractiveMode=false

進入下載后的目錄進行查看:

[root@study-01 /usr/local/src]# cd word-count-beam/
[root@study-01 /usr/local/src/word-count-beam]# tree
.
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── org
    │           └── apache
    │               └── beam
    │                   └── examples
    │                       ├── common
    │                       │   ├── ExampleBigQueryTableOptions.java
    │                       │   ├── ExampleOptions.java
    │                       │   ├── ExamplePubsubTopicAndSubscriptionOptions.java
    │                       │   ├── ExamplePubsubTopicOptions.java
    │                       │   ├── ExampleUtils.java
    │                       │   └── WriteOneFilePerWindow.java
    │                       ├── complete
    │                       │   └── game
    │                       │       ├── GameStats.java
    │                       │       ├── HourlyTeamScore.java
    │                       │       ├── injector
    │                       │       │   ├── Injector.java
    │                       │       │   ├── InjectorUtils.java
    │                       │       │   └── RetryHttpInitializerWrapper.java
    │                       │       ├── LeaderBoard.java
    │                       │       ├── StatefulTeamScore.java
    │                       │       ├── UserScore.java
    │                       │       └── utils
    │                       │           ├── GameConstants.java
    │                       │           ├── WriteToBigQuery.java
    │                       │           ├── WriteToText.java
    │                       │           └── WriteWindowedToBigQuery.java
    │                       ├── DebuggingWordCount.java
    │                       ├── MinimalWordCount.java
    │                       ├── WindowedWordCount.java
    │                       └── WordCount.java
    └── test
        └── java
            └── org
                └── apache
                    └── beam
                        └── examples
                            ├── complete
                            │   └── game
                            │       ├── GameStatsTest.java
                            │       ├── HourlyTeamScoreTest.java
                            │       ├── LeaderBoardTest.java
                            │       ├── StatefulTeamScoreTest.java
                            │       └── UserScoreTest.java
                            ├── DebuggingWordCountTest.java
                            ├── MinimalWordCountTest.java
                            └── WordCountTest.java

20 directories, 31 files
[root@study-01 /usr/local/src/word-count-beam]#

默認情況下,beam的runner是Direct,下面就用Direct來運行wordcount案例,命令如下:

[root@study-01 /usr/local/src/word-count-beam]# ls
pom.xml  src  target
[root@study-01 /usr/local/src/word-count-beam]#
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/data/hello.txt --output=counts" -Pdirect-runner

運行的結(jié)果會存放在當前的目錄下:

[root@study-01 /usr/local/src/word-count-beam]# ls
counts-00000-of-00003  counts-00001-of-00003  counts-00002-of-00003  pom.xml  src  target
[root@study-01 /usr/local/src/word-count-beam]# more counts*  # 查看結(jié)果文件
::::::::::::::
counts-00000-of-00003
::::::::::::::
welcome: 1
spark: 1
::::::::::::::
counts-00001-of-00003
::::::::::::::
hdfs: 2
hadoop: 4
mapreduce: 2
::::::::::::::
counts-00002-of-00003
::::::::::::::
hello: 1
vs: 1
[root@study-01 /usr/local/src/word-count-beam]#

如果需要指定其他的runner則可以使用--runner參數(shù)進行指定,例如我要指定runner為Flink,則修改命令如下即可:

[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=FlinkRunner --inputFile=/data/hello.txt --output=counts" -Pflink-runner

刪除之前生成的文件及目錄,我們來使用Spark的方式進行運行。使用Spark的話,也只是修改--runner以及-Pspark參數(shù)即可:

[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=/data/hello.txt --output=counts" -Pspark-runner

運行成功后,也是會生成如下文件及目錄:

[root@study-01 /usr/local/src/word-count-beam]# ls
counts-00000-of-00003  counts-00001-of-00003  counts-00002-of-00003  pom.xml  src  target
[root@study-01 /usr/local/src/word-count-beam]#

查看處理結(jié)果:

[root@study-01 /usr/local/src/word-count-beam]# more counts*
::::::::::::::
counts-00000-of-00003
::::::::::::::
spark: 1
::::::::::::::
counts-00001-of-00003
::::::::::::::
welcome: 1
hello: 1
mapreduce: 2
::::::::::::::
counts-00002-of-00003
::::::::::::::
vs: 1
hdfs: 2
hadoop: 4
[root@study-01 /usr/local/src/word-count-beam]#

以上這兩個示例只是想說明一點,同一份代碼,可以運行在不同的計算引擎上。不需要為不同的引擎開發(fā)不同的代碼,這就是Beam框架的最主要的設(shè)計目的之一。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI