溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

flink偽分布式搭建及其本地idea測flink連接

發(fā)布時間:2020-07-29 06:39:58 來源:網(wǎng)絡(luò) 閱讀:1117 作者:猿程序G 欄目:大數(shù)據(jù)

下載安裝flink:
上傳壓縮包:flink-1.7.2-bin-scala_2.12.tgz
解壓:tar -zxvf /flink-1.7.2-bin-scala_2.12.tgz -C ../hone
復(fù)制解壓文件到子節(jié)點:
scp -r /home/flink-1.7.2/ root@slave1:/home/
scp -r /home/flink-1.7.2/ root@slave2:/home/
修改配置文件:選擇一個master節(jié)點,配置conf/flink-conf.yaml
vi conf/flink-conf.yaml
設(shè)置jobmanager.rpc.address 配置項為該節(jié)點的IP 或者主機名
jobmanager.rpc.address: 10.108.4.202
然后添加子節(jié)點配置:
在所有的節(jié)點中:flink目錄下:vi conf/slaves
添加所有子節(jié)點ip然后保存
啟動本地的flink集群:
cd 到flink目錄下
./bin/start-cluster.sh
查看webui:ip:8081
啟動監(jiān)聽:nc -lk 9000
當(dāng)報nc命令不存在時(yum install nc)
然后執(zhí)行測試jar:
停止flink集群:bin/stop-cluster.sh
以集群方式提交任務(wù):在flink目錄下
./bin/flink run -m yarn-cluster -c com.demo.florian.WordCount $DEMO_DIR/target/flink-demo-1.0-SNAPSHOT.jar --port 9000

新建maven程序
pom.xml依賴如下:
然后新建一個TestSocketWindowWordCount類具體代碼如下
然后啟動flink集群->新建一個監(jiān)聽:nc -lk 6666
然后啟動TestSocketWindowWordCount類
在linux監(jiān)聽頁面輸入代碼
觀察在idea控制臺就有統(tǒng)計的輸出
-------pom.xml開始-----------
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
-------pom.xml結(jié)束-----------
-------TestSocketWindowWordCount開始------------------
package com.gyb;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.xml.soap.Text;

public class TestSocketWindowWordCount {
public static void main(String args[]) {
String hostname = "192.168.198.130";
int port = 6666;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream text = env.socketTextStream(hostname, port, "\n");//獲取執(zhí)行環(huán)境
SingleOutputStreamOperator windowCounts = text
.flatMap(new FlatMapFunction<String, SocketWindowWordCount.WordWithCount>() {@Override
br/>@Override
for (String word : value.split("\s")) {
out.collect(new SocketWindowWordCount.WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(5))
.reduce(new ReduceFunction<SocketWindowWordCount.WordWithCount>() {@Override
br/>@Override
return new SocketWindowWordCount.WordWithCount(a.word, a.count + b.count);
}
});

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
//env.execute("Socket Window WordCount");
try {
    env.execute("Socket Window WordCount");
} catch (Exception e) {
    e.printStackTrace();
}

}

public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}

}
-------TestSocketWindowWordCount結(jié)束------------------

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI