您好,登錄后才能下訂單哦!
? Async I/O 是阿里巴巴貢獻(xiàn)給社區(qū)的一個(gè)呼聲非常高的特性,于1.2版本引入。主要目的是為了解決數(shù)據(jù)流與外部系統(tǒng)交互時(shí)的通信延遲(比如等待外部系統(tǒng)的響應(yīng))成為了系統(tǒng)瓶頸的問題。對(duì)于實(shí)時(shí)處理,當(dāng)需要使用外部存儲(chǔ)數(shù)據(jù)的時(shí)候,需要小心對(duì)待,不能讓與外部系統(tǒng)之間的交互延遲對(duì)流處理的整個(gè)工作進(jìn)度起決定性的影響。
? 例如,在mapfunction等算子里訪問外部存儲(chǔ),實(shí)際上該交互過程是同步的:比如請(qǐng)求a發(fā)送到數(shù)據(jù)庫(kù),那么mapfunction會(huì)一直等待響應(yīng)。在很多案例中,這個(gè)等待過程是非常浪費(fèi)函數(shù)時(shí)間的。與數(shù)據(jù)庫(kù)異步交互,意味著單個(gè)函數(shù)實(shí)例可以并發(fā)處理很多請(qǐng)求,同時(shí)并發(fā)接收響應(yīng)。那么,等待的時(shí)候由于也會(huì)發(fā)送其它請(qǐng)求和接收其它響應(yīng),被重復(fù)使用而節(jié)省了時(shí)間。至少,等待時(shí)間在多個(gè)請(qǐng)求上被攤銷。這就使得很多使用案例具有更高的吞吐量。
? 圖1.1 flink--異步IO
注意:通過增加MapFunction的到一個(gè)較大的并行度也是可以改善吞吐量的,但是這就意味著更高的資源開銷:更多的MapFunction實(shí)例意味著更多的task,線程,flink內(nèi)部網(wǎng)絡(luò)連接,數(shù)據(jù)庫(kù)的鏈接,緩存,更多內(nèi)部狀態(tài)開銷。
使用flink的異步IO時(shí),需要所連接的數(shù)據(jù)庫(kù)支持異步客戶端。幸運(yùn)的是很多流行的數(shù)據(jù)庫(kù)支持這樣的客戶端。假如沒有異步客戶端,也可以創(chuàng)建多個(gè)同步客戶端,放到線程池里,使用線程池來完成異步功能。當(dāng)然,該種方式相對(duì)于異步客戶端更低效。
? flink異步IO的API支持用戶在data stream中使用異步請(qǐng)求客戶端。API自身處理與數(shù)據(jù)流的整合,消息順序,時(shí)間時(shí)間,容錯(cuò)等。
假如有目標(biāo)數(shù)據(jù)庫(kù)的異步客戶端,使用異步IO,需要實(shí)現(xiàn)一下三步:
1、實(shí)現(xiàn)AsyncFunction或者RichAsyncFunction,該函數(shù)實(shí)現(xiàn)了請(qǐng)求異步分發(fā)的功能。
2、一個(gè)callback回調(diào),該函數(shù)取回操作的結(jié)果,然后傳遞給ResultFuture。
3、對(duì)DataStream使用異步IO操作。
可以看看AsyncFunction這個(gè)接口的源碼
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
void asyncInvoke(IN var1, ResultFuture<OUT> var2) throws Exception;
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
}
}
主要需要實(shí)現(xiàn)兩個(gè)方法:
void asyncInvoke(IN var1, ResultFuture<OUT> var2):
這是真正實(shí)現(xiàn)外部操作邏輯的方法,var1是輸入的參數(shù),var2則是返回結(jié)果的集合
default void timeout(IN input, ResultFuture<OUT> resultFuture)
這是當(dāng)異步請(qǐng)求超時(shí)的時(shí)候,會(huì)調(diào)用這個(gè)方法。參數(shù)的用途和上面一樣
而RichAsyncFunction由于繼承了RichAsyncFunction類,所以還提供了open和close這兩個(gè)方法,一般我們的用法是,open方法中創(chuàng)建連接外部存儲(chǔ)的client連接(比如連接mysql的jdbc連接),close 用于關(guān)閉client連接,至于asyncInvoke和timeout兩個(gè)方法的用法和上面一樣,這里不重復(fù)。一般我們常用的是RichAsyncFunction。
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// 將異步IO類應(yīng)用于數(shù)據(jù)流
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
要注意的是,最后需要將查詢到的數(shù)據(jù)放入 resultFuture 中,即通過resultFuture.complete
來將結(jié)果傳遞給框架。第一次調(diào)用 ResultFuture.complete的時(shí)候 ResultFuture就會(huì)完成。所有后續(xù)的complete調(diào)用都會(huì)被忽略。
參數(shù)有4個(gè),in、asyncObject、timeout、timeUnit、capacity
in:輸入的數(shù)據(jù)流
asyncObject:異步IO操作類對(duì)象
timeout:
異步IO請(qǐng)求被視為失敗的超時(shí)時(shí)間,超過該時(shí)間異步請(qǐng)求就算失敗。該參數(shù)主要是為了剔除死掉或者失敗的請(qǐng)求。
timeUnit:時(shí)間的單位,例如TimeUnit.MICROSECONDS,表示毫秒
capacity:
該參數(shù)定義了同時(shí)最多有多少個(gè)異步請(qǐng)求在處理。即使異步IO的方式會(huì)導(dǎo)致更高的吞吐量,但是對(duì)于實(shí)時(shí)應(yīng)用來說該操作也是一個(gè)瓶頸。限制并發(fā)請(qǐng)求數(shù),算子不會(huì)積壓過多的未處理請(qǐng)求,但是一旦超過容量的顯示會(huì)觸發(fā)背壓。
當(dāng)一個(gè)異步IO請(qǐng)求多次超時(shí),默認(rèn)情況下會(huì)拋出一個(gè)異常,然后重啟job。如果想處理超時(shí),可以覆蓋AsyncFunction.timeout方法。
AsyncFunction發(fā)起的并發(fā)請(qǐng)求完成的順序是不可預(yù)期的。為了控制結(jié)果發(fā)送的順序,flink提供了兩種模式:
1). Unordered
結(jié)果記錄在異步請(qǐng)求結(jié)束后立刻發(fā)送。流中的數(shù)據(jù)在經(jīng)過該異步IO操作后順序就和以前不一樣了,也就是請(qǐng)求的順序和請(qǐng)求結(jié)果的順序的不能保證一致。當(dāng)使用處理時(shí)間作為基礎(chǔ)時(shí)間特性的時(shí)候,該方式具有極低的延遲和極低的負(fù)載。調(diào)用方式AsyncDataStream.unorderedWait(...)
2). Ordered
該種方式流的順序會(huì)被保留。結(jié)果記錄發(fā)送的順序和異步請(qǐng)求被觸發(fā)的順序一樣,該順序就是原來流中事件的順序。為了實(shí)現(xiàn)該目標(biāo),操作算子會(huì)在該結(jié)果記錄之前的記錄為發(fā)送之前緩存該記錄。這往往會(huì)引入額外的延遲和一些Checkpoint負(fù)載,因?yàn)橄啾扔跓o序模式結(jié)果記錄會(huì)保存在Checkpoint狀態(tài)內(nèi)部較長(zhǎng)的時(shí)間。調(diào)用方式AsyncDataStream.orderedWait(...)
當(dāng)使用事件時(shí)間的時(shí)候,異步IO操作也會(huì)正確的處理watermark機(jī)制。這就意味著兩種order模式的具體操作如下:
1). Unordered
watermark不會(huì)超過記錄,意味著watermark建立了一個(gè)order邊界。記錄僅會(huì)在兩個(gè)watermark之間無序發(fā)射。當(dāng)前watermark之后的記錄僅會(huì)在當(dāng)前watermark發(fā)送之后發(fā)送。watermark也僅會(huì)在該watermark之前的所有記錄發(fā)射完成之后發(fā)送。這就意味著在存在watermark的情況下,無序模式引入了一些與有序模式相同的延遲和管理開銷。開銷的大小取決于watermark的頻率。也就是watermark之間是有序的,但是同一個(gè)watermark內(nèi)部的請(qǐng)求是無序的
2). Ordered
watermark的順序就如記錄的順序一樣被保存。與處理時(shí)間相比,開銷沒有顯著變化。請(qǐng)記住,注入時(shí)間 Ingestion Time是基于源處理時(shí)間自動(dòng)生成的watermark事件時(shí)間的特殊情況。
異步IO操作提供了僅一次處理的容錯(cuò)擔(dān)保。它會(huì)將在傳出的異步IO請(qǐng)求保存于Checkpoint,然后故障恢復(fù)的時(shí)候從Checkpoint中恢復(fù)這些請(qǐng)求。
1、maven的pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>SparkDemo</groupId>
<artifactId>SparkDemoTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.1.0</spark.version>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
<!--因?yàn)閟park和es默認(rèn)依賴的netty版本不一致,前者使用3.x版本,后者使用4.1.32版本
所以導(dǎo)致es使用的是3.x版本,有些方法不兼容,這里直接使用使用新版本,否則報(bào)錯(cuò)-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.32.Final</version>
</dependency>
<!--flink-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql異步客戶端-->
<!-- https://mvnrepository.com/artifact/io.vertx/vertx-core -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>3.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.vertx/vertx-web -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>3.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<!--下面這是maven打包scala的插件,一定要,否則直接忽略scala代碼-->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、源代碼
目標(biāo)mysql表的格式為:
id name
1 king
2 tao
3 ming
需要根據(jù)name查詢到id
代碼:
package flinktest;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLClient;
import io.vertx.ext.sql.SQLConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* flink 異步IO demo:使用異步IO和mysql交互
* 因?yàn)槠胀ǖ膉dbc客戶端不支持異步方式,所以這里引入vertx
* 的異步j(luò)dbc client(異步IO要求客戶端支持異步操作)
*
* 實(shí)現(xiàn)目標(biāo):根據(jù)數(shù)據(jù)源,使用異步IO從mysql查詢對(duì)應(yīng)的數(shù)據(jù), 然后打印出來
*/
public class AsyncToMysql {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> sourceList = new ArrayList<>();
//構(gòu)建數(shù)據(jù)源查詢條件,后面用來作為sql查詢中where的查詢值
sourceList.add("king");
sourceList.add("tao");
DataStreamSource<String> source = env.fromCollection(sourceList);
//調(diào)用異步IO處理類
DataStream<JsonObject> result = AsyncDataStream.unorderedWait(
source,
new MysqlAsyncFunc(),
10, //這里超時(shí)時(shí)長(zhǎng)如果在本地idea跑的話不要設(shè)置得太短,因?yàn)楸镜貓?zhí)行延遲比較大
TimeUnit.SECONDS,
20).setParallelism(1);
result.print();
try {
env.execute("TEST async");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 繼承 RichAsyncFunction類,編寫自定義的異步IO處理類
*/
private static class MysqlAsyncFunc extends RichAsyncFunction<String, JsonObject> {
private transient SQLClient mysqlClient;
private Cache<String, String> cache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//構(gòu)建mysql查詢緩存,這里使用Caffeine這個(gè)高性能緩存庫(kù)
cache = Caffeine
.newBuilder()
.maximumSize(1025)
.expireAfterAccess(10, TimeUnit.MINUTES) //設(shè)置緩存過期時(shí)間
.build();
//構(gòu)建mysql jdbc連接
JsonObject mysqlClientConfig = new JsonObject();
//設(shè)置jdbc連接參數(shù)
mysqlClientConfig.put("url", "jdbc:mysql://192.168.50.121:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true")
.put("driver_class", "com.mysql.cj.jdbc.Driver")
.put("max_pool_size", 20)
.put("user", "root")
.put("password", "xxxxx");
//設(shè)置vertx的工作參數(shù),比如線程池大小
VertxOptions vo = new VertxOptions();
vo.setEventLoopPoolSize(10);
vo.setWorkerPoolSize(20);
Vertx vertx = Vertx.vertx(vo);
mysqlClient = JDBCClient.createNonShared(vertx, mysqlClientConfig);
if (mysqlClient != null) {
System.out.println("連接mysql成功!!!");
}
}
//清理環(huán)境
@Override
public void close() throws Exception {
super.close();
//關(guān)閉mysql連接,清除緩存
if (mysqlClient != null) {
mysqlClient.close();
}
if (cache != null) {
cache.cleanUp();
}
}
@Override
public void asyncInvoke(String input, ResultFuture<JsonObject> resultFuture) throws Exception {
System.out.println("key is:" + input);
String key = input;
//先從緩存中查找,找到就直接返回
String cacheIfPresent = cache.getIfPresent(key);
JsonObject output = new JsonObject();
if (cacheIfPresent != null) {
output.put("name", key);
output.put("id-name", cacheIfPresent);
resultFuture.complete(Collections.singleton(output));
//return;
}
System.out.println("開始查詢");
mysqlClient.getConnection(conn -> {
if (conn.failed()) {
resultFuture.completeExceptionally(conn.cause());
//return;
}
final SQLConnection sqlConnection = conn.result();
//拼接查詢語(yǔ)句
String querySql = "select id,name from customer where name='" + key + "'";
System.out.println("執(zhí)行的sql為:" + querySql);
//執(zhí)行查詢,并獲取結(jié)果
sqlConnection.query(querySql, res -> {
if (res.failed()) {
resultFuture.completeExceptionally(null);
System.out.println("執(zhí)行失敗");
//return;
}
if (res.succeeded()) {
System.out.println("執(zhí)行成功,獲取結(jié)果");
ResultSet result = res.result();
List<JsonObject> rows = result.getRows();
System.out.println("結(jié)果個(gè)數(shù):" + String.valueOf(rows.size()));
if (rows.size() <= 0) {
resultFuture.complete(null);
//return;
}
//結(jié)果返回,并更新到緩存中
for (JsonObject row : rows) {
String name = row.getString("name");
String id = row.getInteger("id").toString();
String desc = id + "-" + name;
System.out.println("結(jié)果:" + desc);
output.put("name", key);
output.put("id-name", desc);
cache.put(key, desc);
resultFuture.complete(Collections.singleton(output));
}
} else {
//執(zhí)行失敗,返回空
resultFuture.complete(null);
}
});
//連接關(guān)閉
sqlConnection.close(done -> {
if (done.failed()) {
throw new RuntimeException(done.cause());
}
});
});
}
}
}
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。