溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink批處理怎么實(shí)現(xiàn)

發(fā)布時間:2021-12-31 14:34:16 來源:億速云 閱讀:956 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Flink批處理怎么實(shí)現(xiàn)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Flink批處理怎么實(shí)現(xiàn)”吧!

1.Flink簡介

Apache Flink是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink被設(shè)計在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存執(zhí)行速度和任意規(guī)模來執(zhí)行計算,F(xiàn)link 是一個開源的流處理框架,它具有以下特點(diǎn)

  • 批流一體:統(tǒng)一批處理、流處理

  • 分布式:Flink程序可以運(yùn)行在多臺機(jī)器上

  • 高性能:處理性能比較高

  • 高可用:Flink支持高可用性(HA)

  • 準(zhǔn)確:Flink可以保證數(shù)據(jù)處理的準(zhǔn)確性

2.Flink核心模塊組成

首先,類比Spark, 我們來看Flink的模塊劃分

Flink批處理怎么實(shí)現(xiàn)

Deploy層

可以啟動單個JVM,讓Flink以Local模式運(yùn)行Flink也可以以Standalone 集群模式運(yùn)行,同時也支持Flink ON YARN,F(xiàn)link應(yīng)用直接提交到Y(jié)ARN上面運(yùn)行Flink還可以運(yùn)行在GCE(谷歌云服務(wù))和EC2(亞馬遜云服務(wù))

Core層(Runtime)

在Runtime之上提供了兩套核心的API,DataStream API(流處理)和DataSet API(批處理)

APIs & Libraries層

核心API之上又?jǐn)U展了一些高階的庫和API

  • CEP流處理

  • Table API和SQL

  • Flink ML機(jī)器學(xué)習(xí)庫

  • Gelly圖計算

3.Flink生態(tài)組成

Flink作為大數(shù)據(jù)生態(tài)的一員,除了本身外,可以很好地與生態(tài)中的其他組件進(jìn)行結(jié)合使用,大的概況方面來講,就有輸入方面和輸出方面, Flink批處理怎么實(shí)現(xiàn)

其中中間的部分,上面已經(jīng)介紹,主頁看看兩邊的,其中綠色背景是流處理方式的場景,藍(lán)色背景是批處理方式的場景

輸入Connectors(左側(cè)部分)

  • 流處理方式:包含Kafka(消息隊列)、AWS kinesis(實(shí)時數(shù)據(jù)流服務(wù))、RabbitMQ(消息隊列)、NIFI(數(shù) 據(jù)管道)、Twitter(API)

  • 批處理方式:包含HDFS(分布式文件系統(tǒng))、HBase(分布式列式數(shù)據(jù)庫)、Amazon S3(文件系統(tǒng))、 MapR FS(文件系統(tǒng))、ALLuxio(基于內(nèi)存分布式文件系統(tǒng))

輸出Connectors(右側(cè)部分)

  • 流處理方式:包含Kafka(消息隊列)、AWS kinesis(實(shí)時數(shù)據(jù)流服務(wù))、RabbitMQ(消息隊列)、NIFI(數(shù) 據(jù)管道)、Cassandra(NOSQL數(shù)據(jù)庫)、ElasticSearch(全文檢索)、HDFS rolling file(滾動文件)

  • 批處理方式:包含HBase(分布式列式數(shù)據(jù)庫)、HDFS(分布式文件系統(tǒng))

4.Flink流處理模式介紹

Spark中的流處理主要有兩種,一種是Spark Streamin是維批處理,如果對事件內(nèi)的時間沒有要求,這種方式可以滿足很多需求,另外一種是Structed Streaming 是基于一張無界的大表,核心API就是Spark Sql的,而Flink是專注于無限流,把有界流看成是無限流的一種特殊情況,另外兩個框架都有狀態(tài)管理。

Flink批處理怎么實(shí)現(xiàn)

無限流處理

輸入的數(shù)據(jù)沒有盡頭,像水流一樣源源不斷,數(shù)據(jù)處理從當(dāng)前或者過去的某一個時間 點(diǎn)開始,持續(xù)不停地進(jìn)行。

有限流處理

從某一個時間點(diǎn)開始處理數(shù)據(jù),然后在另一個時間點(diǎn)結(jié)束輸入數(shù)據(jù)可能本身是有限的(即輸入數(shù)據(jù)集并不會隨著時間增長),也可能出于分析的目的被人為地設(shè)定為有限集(即只分析某一個時間段內(nèi)的事件)Flink封裝了DataStream API進(jìn)行流處理,封裝了DataSet API進(jìn)行批處理。同時,F(xiàn)link也是一個批流一體的處理引擎,提供了Table API / SQL統(tǒng)一了批處理和流處理。

有狀態(tài)的流處理應(yīng)用

基于SubTask,每個SubTask處理時候,都會獲取狀態(tài)并更新狀態(tài),

Flink批處理怎么實(shí)現(xiàn)

5.Flink入門實(shí)踐

以經(jīng)典的WordCount為例,來看Flink的兩個批流處理案例,案例以nc -lp 來作為Source, 以控制臺輸出為Sink, 分為Java和Scala版本哦,

Scala版本之批處理

import org.apache.flink.api.scala._

object WordCountScalaBatch {
  def main(args: Array[String]): Unit = {

    val inputPath = "E:\\hadoop_res\\input\\a.txt"
    val outputPath = "E:\\hadoop_res\\output2"

    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = environment.readTextFile(inputPath)
    text
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)
      .writeAsCsv(outputPath, "\n", ",")


    //setParallelism(1)很多算子后面都可以調(diào)用
    environment.execute("job name")

  }

}

Scala版本之流處理

import org.apache.flink.streaming.api.scala._

object WordCountScalaStream {
  def main(args: Array[String]): Unit = {
    //處理流式數(shù)據(jù)
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val streamData: DataStream[String] = environment.socketTextStream("linux121", 7777)

    val out: DataStream[(String, Int)] = streamData
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    out.print()

    environment.execute("test stream")
  }

}

Java版本之批處理

package com.hoult.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountJavaBatch {
    public static void main(String[] args) throws Exception {

        String inputPath = "E:\\hadoop_res\\input\\a.txt";
        String outputPath = "E:\\hadoop_res\\output";

        //獲取flink的運(yùn)行環(huán)境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = executionEnvironment.readTextFile(inputPath);
        FlatMapOperator<String, Tuple2<String, Integer>> wordsOne = text.flatMap(new SplitClz());

        //hello,1  you,1 hi,1  him,1
        UnsortedGrouping<Tuple2<String, Integer>> groupWordAndOne = wordsOne.groupBy(0);
        AggregateOperator<Tuple2<String, Integer>> wordCount = groupWordAndOne.sum(1);

        wordCount.writeAsCsv(outputPath, "\n", "\t").setParallelism(1);

        executionEnvironment.execute();
    }

    static class SplitClz implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] strs = s.split("\\s+");
            for (String str : strs) {
                collector.collect(new Tuple2<String, Integer>(str, 1));
            }
        }
    }
}

Java版本之流處理

package com.hoult.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJavaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = executionEnvironment.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }).keyBy(0).sum(1);
        sum.print();
        executionEnvironment.execute();
    }
}

到此,相信大家對“Flink批處理怎么實(shí)現(xiàn)”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI