溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink如何新增connectors模塊

發(fā)布時間:2021-12-31 10:25:22 來源:億速云 閱讀:346 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“Flink如何新增connectors模塊”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

Flink中的API

Flink 為流式/批式處理應(yīng)用程序的開發(fā)提供了不同級別的抽象。 

Flink如何新增connectors模塊

  • Flink API 最底層的抽象為有狀態(tài)實時流處理。其抽象實現(xiàn)是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中來為我們使用。它允許用戶在應(yīng)用程序中自由地處理來自單流或多流的事件(數(shù)據(jù)),并提供具有全局一致性和容錯保障的狀態(tài)。此外,用戶可以在此層抽象中注冊事件時間(event time)和處理時間(processing time)回調(diào)方法,從而允許程序可以實現(xiàn)復(fù)雜計算。

  • Flink API 第二層抽象是 Core APIs。實際上,許多應(yīng)用程序不需要使用到上述最底層抽象的 API,而是可以使用 Core APIs 進行編程:其中包含 DataStream API(應(yīng)用于有界/無界數(shù)據(jù)流場景)和 DataSet API(應(yīng)用于有界數(shù)據(jù)集場景)兩部分。Core APIs 提供的流式 API(Fluent API)為數(shù)據(jù)處理提供了通用的模塊組件,例如各種形式的用戶自定義轉(zhuǎn)換(transformations)、聯(lián)接(joins)、聚合(aggregations)、窗口(windows)和狀態(tài)(state)操作等。此層 API 中處理的數(shù)據(jù)類型在每種編程語言中都有其對應(yīng)的類。
    Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API 來實現(xiàn)自己的需求。DataSet API 還額外提供了一些原語,比如循環(huán)/迭代(loop/iteration)操作。

  • Flink API 第三層抽象是 Table API。Table API 是以表(Table)為中心的聲明式編程(DSL)API,例如在流式數(shù)據(jù)場景下,它可以表示一張正在動態(tài)改變的表。Table API 遵循(擴展)關(guān)系模型:即表擁有 schema(類似于關(guān)系型數(shù)據(jù)庫中的 schema),并且 Table API 也提供了類似于關(guān)系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以聲明的方式定義應(yīng)執(zhí)行的邏輯操作,而不是確切地指定程序應(yīng)該執(zhí)行的代碼。盡管 Table API 使用起來很簡潔并且可以由各種類型的用戶自定義函數(shù)擴展功能,但還是比 Core API 的表達能力差。此外,Table API 程序在執(zhí)行之前還會使用優(yōu)化器中的優(yōu)化規(guī)則對用戶編寫的表達式進行優(yōu)化。
    表和 DataStream/DataSet 可以進行無縫切換,F(xiàn)link 允許用戶在編寫應(yīng)用程序時將 Table API 與 DataStream/DataSet API 混合使用。

  • Flink API 最頂層抽象是 SQL。這層抽象在語義和程序表達式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達式。SQL 抽象與 Table API 抽象之間的關(guān)聯(lián)是非常緊密的,并且 SQL 查詢語句可以在 Table API 中定義的表上執(zhí)行。

 DataStream/DateSet API

Flink中的DataStream和DataSet程序是常規(guī)程序,可對數(shù)據(jù)流實施轉(zhuǎn)換(例如,過濾,更新狀態(tài),定義窗口,聚合)。最初從各種來源(例如,消息隊列,套接字流,文件)創(chuàng)建數(shù)據(jù)流。結(jié)果通過接收器返回,接收器可以例如將數(shù)據(jù)寫入文件或標準輸出(例如命令行終端)。Flink程序可在各種上下文中運行,獨立運行或嵌入其他程序中。執(zhí)行可以在本地JVM或許多計算機的群集中進行。

預(yù)定義的 Source 和 Sink

一些比較基本的 Source 和 Sink 已經(jīng)內(nèi)置在 Flink 里。 預(yù)定義 data sources 支持從文件、目錄、socket,以及 collections 和 iterators 中讀取數(shù)據(jù)。 預(yù)定義 data sinks 支持把數(shù)據(jù)寫入文件、標準輸出(stdout)、標準錯誤輸出(stderr)和 socket。

官方文檔

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/

 DataStream/DateSet API開發(fā)

從本篇開始,增加DataStream/DateSet API演示內(nèi)容,在原有的工程基礎(chǔ)上,擴展一個connectors模塊;此模塊會演示以下幾個組件簡單使用;

  • elasticsearch

  • file(text, csv)

  • kafka

  • jdbc (mysql)

  • rabbitmq

  • redis

新增connectors模塊

在當前工程中,創(chuàng)建名稱為connectors的maven工程模塊

pom.xml

   <artifactId>connectors</artifactId>

    <dependencies>
        <!-- Flink jdbc依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- mysql驅(qū)動包 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- kafka依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- redis依賴 -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- rabbitMq依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- elasticsearch7依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

刷新工程maven,下載相關(guān)功能依賴組件包;

創(chuàng)建用戶表(演示使用)

-- 數(shù)所據(jù)庫 flink 下創(chuàng)建用戶表
CREATE TABLE `t_user` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int(3) DEFAULT NULL,
  `sex` int(2) DEFAULT NULL,
  `address` varchar(40) DEFAULT NULL,
  `createTime` timestamp NULL DEFAULT NULL,
  `createTimeSeries` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

創(chuàng)建實體Bean(演示使用)

TUser.java

package com.flink.examples;

/**
 * @Description t_user表數(shù)據(jù)封裝類
 */
public class TUser {

    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public TUser(){}

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getSex() {
        return sex;
    }

    public void setSex(Integer sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Long getCreateTimeSeries() {
        return createTimeSeries;
    }

    public void setCreateTimeSeries(Long createTimeSeries) {
        this.createTimeSeries = createTimeSeries;
    }

    @Override
    public String toString() {
        return "TUser{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", sex=" + sex +
                ", address='" + address + '\'' +
                ", createTimeSeries=" + createTimeSeries +
                '}';
    }
}

TCount.java

package com.flink.examples;

/**
 * @Description 統(tǒng)計表
 */
public class TCount {

    /**
     * 性別
     */
    private Integer sex;
    /**
     * 數(shù)量
     */
    private Integer num;

    public TCount(){}

    public TCount(Integer sex, Integer num){
        this.sex = sex;
        this.num = num;
    }

    public Integer getSex() {
        return sex;
    }

    public void setSex(Integer sex) {
        this.sex = sex;
    }

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }
}

工程模塊

Flink如何新增connectors模塊

“Flink如何新增connectors模塊”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI