溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark Streaming 實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)案例

發(fā)布時(shí)間:2020-07-25 07:25:06 來源:網(wǎng)絡(luò) 閱讀:2764 作者:ChinaUnicom110 欄目:大數(shù)據(jù)

Spark 是一個(gè)基于內(nèi)存式的分布式計(jì)算框架。具有高性能,高效可擴(kuò)展,容錯(cuò)等優(yōu)點(diǎn)。

今天講解一下spark的流計(jì)算,其實(shí)它也不完全是實(shí)時(shí)的流計(jì)算,算是一種準(zhǔn)實(shí)時(shí)的流計(jì)算。

上圖講解

Spark Streaming 實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)案例

運(yùn)行環(huán)境:需要linux環(huán)境下的spark環(huán)境

本例用的centOS 6.5x64 因?yàn)樾枰褂肨CP協(xié)議傳輸數(shù)據(jù),所以需要安裝一個(gè)nc插件。

安裝方式: yum  install ncxxx 或者掛載光盤安裝

安裝后啟動(dòng)nc -lk 9999 端口可以隨便指定,最好是1024以上的就可以。

下面貼出代碼

java版本的

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;
public class SparkDemo {
	public static void main(String[] args) {
		SparkConf conf=new SparkConf().setAppName("sparkDemo2").setMaster("local[3]");
		JavaStreamingContext jsc=new JavaStreamingContext(conf,Durations.seconds(5));
		//使用帶狀態(tài)的算子,需要checkpoint做容錯(cuò)處理
		jsc.checkpoint("D://chkspark");
		JavaReceiverInputDStream<String> socketTextStream=jsc.socketTextStream("10.115.27.234", 1000);
		JavaDStream<String> wordsDstream=socketTextStream.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID=1L;
			public Iterable<String> call(String line) throws Exception {
					return Arrays.asList(line.split(" "));
			}
			});
		JavaPairDStream<String, Integer> wordsToPairDstream=wordsDstream.mapToPair(new PairFunction<String, String,Integer>() {

			private static final long SerialVersionUID=1L;
			public Tuple2<String, Integer> call(String word) throws Exception {
				
				return new Tuple2<String, Integer>(word, 1);
			}
		});
		/**
		 * 一個(gè)batch對(duì)應(yīng)一個(gè)RDD。 
		 * */ 
		JavaPairDStream<String, Integer> resultDstream=wordsToPairDstream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

			private static final long serialVersionUID=1L;
			public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
				Integer oldValue=0;   //默認(rèn)舊value是0
				if (state.isPresent()) {
					oldValue=state.get();
				}
				for (Integer value:values) {
					oldValue+=value;
				}
				return Optional.of(oldValue);
			}
		});
		//打印結(jié)果
		resultDstream.print();
		jsc.start();
		jsc.awaitTermination();
	}
}

程序測(cè)試: 從linux端的nc 下輸入任意字符串,spark streaming會(huì)實(shí)時(shí)對(duì)輸入的數(shù)據(jù)做出統(tǒng)計(jì)。類似于wordcount. 除非手動(dòng)kill這個(gè)進(jìn)程,否則會(huì)一直運(yùn)行下去。因?yàn)樗脑砭褪呛妥詠硭乃饕粯?,是一連串的數(shù)據(jù)流。

運(yùn)行結(jié)果展示:

Spark Streaming 實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)統(tǒng)計(jì)案例

也可以用scala寫出同樣的程序,代碼量更少。

需要深入理解spark streaming的架構(gòu)原理。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI