溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

二、flink--集群環(huán)境搭建

發(fā)布時(shí)間:2020-08-07 05:52:23 來源:網(wǎng)絡(luò) 閱讀:757 作者:隔壁小白 欄目:大數(shù)據(jù)

一、Flink環(huán)境搭建

1.1 flink部署方式

Flink可以選擇的部署方式有:
Local、Standalone(資源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。
我們主要對(duì)Standalone模式和Yarn模式下的Flink集群部署進(jìn)行分析。
Standalone模式常用于單機(jī)進(jìn)行程序測(cè)試,Yarn模式常用于實(shí)際線上生產(chǎn)環(huán)境。

1.2 集群規(guī)劃

1、集群規(guī)劃

節(jié)點(diǎn)名稱 master(jobManager) worker(taskManager) zookeeper
bigdata11
bigdata21
bigdata31

(注:zookeeper只是用于實(shí)現(xiàn)master HA的必要組件,如果不需要master HA,則zookeeper可以去掉。)

2、軟件版本

jdk 1.8
scala 2.11.8
hadoop 2.8
zookeeper 3.4.10
flink 1.6.1

3、基礎(chǔ)環(huán)境
安裝好jdk、scala、hadoop(hdfs+yarn都要部署好)、zookeeper,部署方法看之前的相關(guān)文章。而且要注意的是,節(jié)點(diǎn)之間要配置好ssh秘鑰免登陸。

1.3 Standalone模式安裝

1、解壓程序:

tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/修改配置文件

2、修改配置文件

配置master節(jié)點(diǎn)地址:
[root@bigdata11 conf]$ sudo vi masters
bigdata11:8081

配置worker節(jié)點(diǎn)地址:
[root@bigdata11 conf]$ sudo vi slaves
bigdata12
bigdata13

修改flink工作參數(shù):
[root@bigdata11 conf]$ sudo vi flink-conf.yaml 
taskmanager.numberOfTaskSlots:2   //52行
jobmanager.rpc.address: bigdata11  //33行  指定jobmanager 的rpc地址
可選配置:
??每個(gè)JobManager(jobmanager.heap.mb)的可用內(nèi)存量,
??每個(gè)TaskManager(taskmanager.heap.mb)的可用內(nèi)存量,
??每臺(tái)機(jī)器(taskManager)的可用的slot數(shù)量(taskmanager.numberOfTaskSlots),
??每個(gè)job的并行度(parallelism.default)
??臨時(shí)目錄(taskmanager.tmp.dirs)

3、配置環(huán)境變量

vim /etc/profile.d/flink.sh
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin

然后source /etc/profile.d/flink.sh 啟用環(huán)境變量

4、拷貝配置好的/opt/module/flink-1.6.1到其他節(jié)點(diǎn)
使用scp或者rsync

scp -r /opt/module/flink-1.6.1 bigdata12:`pwd`
scp -r /opt/module/flink-1.6.1 bigdata13:`pwd`

同時(shí)配置好其他兩臺(tái)的環(huán)境變量

5、啟動(dòng)flink集群

[root@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host bigdata11.
Starting taskexecutor daemon on host bigdata12.
Starting taskexecutor daemon on host bigdata13.

使用jps可以在對(duì)應(yīng)的節(jié)點(diǎn)上查看對(duì)應(yīng)的進(jìn)程

StandloneSessionClusterEntrypoint  這是jobmanager進(jìn)程
TaskManagerRunner   這是taskmanager進(jìn)程

6、web UI 查看
http://bigdata11:8081

7、運(yùn)行測(cè)試任務(wù)

flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt --output /tmp/word.output

8、增減節(jié)點(diǎn)到集群中

增加/減少jobmanager節(jié)點(diǎn):
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

增加/減少taskmanager節(jié)點(diǎn)(需要到當(dāng)前節(jié)點(diǎn)去啟動(dòng)):
bin/taskmanager.sh start|start-foreground|stop|stop-all

1.4 standalone模式j(luò)obManager HA

? 首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對(duì)于 Standalone 來說,F(xiàn)link 必須依賴于 Zookeeper 來實(shí)現(xiàn) JobManager 的 HA(Zookeeper 已經(jīng)成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個(gè) Standalone 的 Flink 集群會(huì)同時(shí)有多個(gè)活著的 JobManager,其中只有一個(gè)處于工作狀態(tài),其他處于 Standby 狀態(tài)。當(dāng)工作中的 JobManager 失去連接后(如宕機(jī)或 Crash),Zookeeper 會(huì)從 Standby 中選舉新的 JobManager 來接管 Flink 集群。
? 對(duì)于 Yarn Cluaster 模式來說,F(xiàn)link 就要依靠 Yarn 本身來對(duì) JobManager 做 HA 了。其實(shí)這里完全是 Yarn 的機(jī)制。對(duì)于 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動(dòng)在 Yarn 的 Container 中。此時(shí)的 JobManager,其實(shí)應(yīng)該稱之為 Flink Application Master。也就說它的故障恢復(fù),就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由于完全依賴了 Yarn,因此不同版本的 Yarn 可能會(huì)有細(xì)微的差異。這里不再做深究。

1、修改配置文件
conf/flink-conf.yaml

注釋掉 
#jobmanager.rpc.address: bigdata11

修改下面的配置
high-availability: zookeeper   //73行 指定高可用方式為zookeeper

#指定高可用模式中zookeeper的地址列表 //88行
high-availability.zookeeper.quorum:bigdata11:2181,bigdata12:2181,bigdata13:2181

#指定將jobmanager狀態(tài)數(shù)據(jù)持久化保存到hdfs中
high-availability.storageDir: hdfs:///flink/ha/       

#JobManager元數(shù)據(jù)保存在文件系統(tǒng)storageDir中,只有指向此狀態(tài)的指針存儲(chǔ)在ZooKeeper中(必須) //沒有
high-availability.zookeeper.path.root: /flink         

#根ZooKeeper節(jié)點(diǎn),在該節(jié)點(diǎn)下放置所有集群節(jié)點(diǎn)(推薦),這是集群節(jié)點(diǎn)信息保存位置
high-availability.cluster-id:/flinkCluster           

#自定義集群(推薦),這里是檢查點(diǎn)和保存點(diǎn)的配置,保存在hdfs中,非必須
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

conf/masters

將主備jobmanager地址都寫到該配置文件中。
bigdata11:8081
bigdata12:8081

conf/zoo.cfg

server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888

修改完后同步配置到其他所有節(jié)點(diǎn)中。

2、啟動(dòng)集群

先啟動(dòng)好zookeeper服務(wù)。
然后啟動(dòng)hdfs服務(wù)。
最后啟動(dòng)flink集群。 start-cluster.sh

1.5 yarn模式安裝
部署步驟和上面standalone基本一樣,這里不重復(fù)。還要添加以下配置:
配置好hadoop(hdfs和yarn)環(huán)境,同時(shí)配置好HADOOP_HOME這個(gè)環(huán)境變量。
接著在yarn下啟動(dòng)jobmanager和taskmanager。

/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d

其中:
-n(--container):TaskManager的數(shù)量。
-s(--slots):    每個(gè)TaskManager的slot數(shù)量,默認(rèn)一個(gè)slot一個(gè)core,默認(rèn)每個(gè)taskmanager的slot的個(gè)數(shù)為1,有時(shí)可以多一些taskmanager,做冗余。
-jm:JobManager的內(nèi)存(單位MB)。
-tm:每個(gè)taskmanager的內(nèi)存(單位MB)。
-nm:yarn 的appName(現(xiàn)在yarn的ui上的名字)。 
-d:后臺(tái)執(zhí)行

會(huì)自動(dòng)根據(jù) conf/ 下的配置文件啟動(dòng)對(duì)應(yīng)的jobmanager和taskmanager的

啟動(dòng)完成后,可以到y(tǒng)arn 的web頁(yè)面查看到剛才提交會(huì)話任務(wù):

http://bigdata11:8088

同時(shí)可以在提交session的節(jié)點(diǎn)上使用jps查看對(duì)應(yīng)的進(jìn)程:

YarnSessionClusterEntrypoint  這個(gè)就是剛剛提交的yarn-session維持的session進(jìn)程

提交測(cè)試任務(wù)到y(tǒng)arn中的flink集群運(yùn)行

./bin/flink run ./examples/batch/WordCount.jar --input 輸出數(shù)據(jù)路徑
--output 輸出數(shù)據(jù)路徑

可以手動(dòng)使用 -m jobManagerAddress 指定jobmanager地址,但是flink client可以自動(dòng)根據(jù)flink的配置文件獲取到j(luò)obmanager地址,所以可以不用指定

提交任務(wù)之后,可以在yarn的web頁(yè)面中查看到相關(guān)的任務(wù)信息

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI