您好,登錄后才能下訂單哦!
這篇文章主要講解了“Flink怎么將流式數(shù)據(jù)寫入redis”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink怎么將流式數(shù)據(jù)寫入redis”吧!
redis作為一個(gè)高吞吐的存儲系統(tǒng),在生產(chǎn)中有著廣泛的應(yīng)用,今天我們主要講一下如何將流式數(shù)據(jù)寫入redis,以及遇到的一些問題 解決。官方并沒有提供寫入redis的connector,所以我們采用apache的另一個(gè)項(xiàng)目bahir-flink [1]中提供的連接器來實(shí)現(xiàn)。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
這里我們主要是模擬一條用戶信息
//user,subject,province
Tuple3<String,String,String> tuple = Tuple3.of("tom", "math", "beijing");
DataStream<Tuple3<String,String,String>> dataStream = bsEnv.fromElements(tuple);
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("10.160.85.185")
// 可選 .setPassword("1234")
.setPort(6379)
.build();
InetSocketAddress host0 = new InetSocketAddress("host1", 6379);
InetSocketAddress host1 = new InetSocketAddress("host2", 6379);
InetSocketAddress host2 = new InetSocketAddress("host3", 6379);
HashSet<InetSocketAddress> set = new HashSet<>();
set.add(host0);
set.add(host1);
set.add(host2);
FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder().setNodes(set)
.build();
我們需要實(shí)現(xiàn)一個(gè)RedisMapper接口的類,這個(gè)類的主要功能就是將我們自己的輸入數(shù)據(jù)映射到redis的對應(yīng)的類型。
我們看下RedisMapper接口,這里面總共有三個(gè)方法:
public interface RedisMapper<T> extends Function, Serializable {
/**
* Returns descriptor which defines data type.
*
* @return data type descriptor
*/
RedisCommandDescription getCommandDescription();
/**
* Extracts key from data.
*
* @param data source data
* @return key
*/
String getKeyFromData(T data);
/**
* Extracts value from data.
*
* @param data source data
* @return value
*/
String getValueFromData(T data);
}
getCommandDescription方法返回一個(gè)RedisCommandDescription對象,我們看下RedisCommandDescription的構(gòu)造方法:
public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
................
}
public RedisCommandDescription(RedisCommand redisCommand) {
this(redisCommand, null);
}
我們以數(shù)據(jù)寫入hash結(jié)構(gòu)為例,構(gòu)造了一個(gè)key為HASH_NAME的RedisCommandDescription
new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
兩個(gè)構(gòu)造方法區(qū)別就在于是否有第二個(gè)參數(shù)additionalKey,這個(gè)參數(shù)主要是針對SORTED_SET和HASH結(jié)構(gòu)的,因?yàn)檫@兩個(gè)結(jié)構(gòu)需要有三個(gè)變量,其他的結(jié)構(gòu)只需要兩個(gè)變量就行了。
在hash結(jié)構(gòu)里,這個(gè)additionalKey對應(yīng)hash的key,getKeyFromData方法得到的數(shù)據(jù)對應(yīng)hash的field,getValueFromData獲取的數(shù)據(jù)對應(yīng)hash的value。
最后我們數(shù)據(jù)寫入對應(yīng)的redis sink即可,寫入的redis數(shù)據(jù)如下:
我們看到,上面我們構(gòu)造redis的hash結(jié)構(gòu)的時(shí)候,key是寫死的,也就是只能寫入一個(gè)key,如果我的key是動(dòng)態(tài)生成的,該怎么辦呢?
比如我有一個(gè)類似的需求,流式數(shù)據(jù)是一些學(xué)生成績信息,我的key想要的是學(xué)生的name,field是相應(yīng)的科目,而value是這個(gè)科目對應(yīng)的成績。
目前系統(tǒng)沒提供這樣的功能,不過這個(gè)也沒事,沒有什么不是改源碼解決不了的。
我們看下RedisSink中的invoke方法,
public void invoke(IN input) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
switch (redisCommand) {
....................
case HSET:
this.redisCommandsContainer.hset(this.additionalKey, key, value);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}
我們看到對于hash結(jié)構(gòu)來說,key和value也就是從我們的RedisMapper的實(shí)現(xiàn)類中獲取的,但是additionalKey卻不是動(dòng)態(tài)生成的,我們只需要改下這里。動(dòng)態(tài)獲取additionalKey就行。
public interface RedisMapper<T> extends Function, Serializable{
RedisCommandDescription getCommandDescription();
String getKeyFromData(T data);
String getValueFromData(T data);
String getAdditionalKey(T data);
}
我們給RedisMapper接口添加一個(gè)getAdditionalKey方法,然后在實(shí)現(xiàn)類中實(shí)現(xiàn)該方法。
然后在RedisSink的invoke方法動(dòng)態(tài)獲取additionalKey,修改源碼之后的方法如下:
@Override
public void invoke(IN input) throws Exception {
String key = redisSinkMapper.getKeyFromData(input);
String value = redisSinkMapper.getValueFromData(input);
String additionalKey = redisSinkMapper.getAdditionalKey(input);
switch (redisCommand) {
..................
case HSET:
this.redisCommandsContainer.hset(additionalKey, key, value);
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
}
感謝各位的閱讀,以上就是“Flink怎么將流式數(shù)據(jù)寫入redis”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Flink怎么將流式數(shù)據(jù)寫入redis這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。