溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink怎么將流式數(shù)據(jù)寫入redis

發(fā)布時(shí)間:2021-12-31 10:33:37 來源:億速云 閱讀:531 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“Flink怎么將流式數(shù)據(jù)寫入redis”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Flink怎么將流式數(shù)據(jù)寫入redis”吧!

背景

redis作為一個(gè)高吞吐的存儲系統(tǒng),在生產(chǎn)中有著廣泛的應(yīng)用,今天我們主要講一下如何將流式數(shù)據(jù)寫入redis,以及遇到的一些問題 解決。官方并沒有提供寫入redis的connector,所以我們采用apache的另一個(gè)項(xiàng)目bahir-flink [1]中提供的連接器來實(shí)現(xiàn)。 

實(shí)例講解 

引入pom

 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-redis_2.11</artifactId>
   <version>1.1.5</version>
  </dependency>
     

構(gòu)造數(shù)據(jù)源

這里我們主要是模擬一條用戶信息

  //user,subject,province
  Tuple3<String,String,String> tuple = Tuple3.of("tom", "math", "beijing");
  DataStream<Tuple3<String,String,String>> dataStream = bsEnv.fromElements(tuple);
     

構(gòu)造redis配置

  • 單機(jī)配置
 FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("10.160.85.185")
                                                                // 可選 .setPassword("1234")
                                                                .setPort(6379)
                                                                .build();
 
  • 集群配置
  InetSocketAddress host0 = new InetSocketAddress("host1", 6379);
  InetSocketAddress host1 = new InetSocketAddress("host2", 6379);
  InetSocketAddress host2 = new InetSocketAddress("host3", 6379);

  HashSet<InetSocketAddress> set = new HashSet<>();
  set.add(host0);
  set.add(host1);
  set.add(host2);

  FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder().setNodes(set)
                                                                        .build();
     

實(shí)現(xiàn)RedisMapper

我們需要實(shí)現(xiàn)一個(gè)RedisMapper接口的類,這個(gè)類的主要功能就是將我們自己的輸入數(shù)據(jù)映射到redis的對應(yīng)的類型。

我們看下RedisMapper接口,這里面總共有三個(gè)方法:

  • getCommandDescription:主要來獲取我們寫入哪種類型的數(shù)據(jù),比如list、hash等等。
  • getKeyFromData:主要是從我們的輸入數(shù)據(jù)中抽取key
  • getValueFromData:從我們的輸入數(shù)據(jù)中抽取value
public interface RedisMapper<T> extends Function, Serializable {

 /**
  * Returns descriptor which defines data type.
  *
  * @return data type descriptor
  */
 RedisCommandDescription getCommandDescription();

 /**
  * Extracts key from data.
  *
  * @param data source data
  * @return key
  */
 String getKeyFromData(T data);

 /**
  * Extracts value from data.
  *
  * @param data source data
  * @return value
  */
 String getValueFromData(T data);
}
 

getCommandDescription方法返回一個(gè)RedisCommandDescription對象,我們看下RedisCommandDescription的構(gòu)造方法:

 public RedisCommandDescription(RedisCommand redisCommand, String additionalKey) {
        ................
 }

 public RedisCommandDescription(RedisCommand redisCommand) {
  this(redisCommand, null);
 }
 

我們以數(shù)據(jù)寫入hash結(jié)構(gòu)為例,構(gòu)造了一個(gè)key為HASH_NAME的RedisCommandDescription

new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
 

兩個(gè)構(gòu)造方法區(qū)別就在于是否有第二個(gè)參數(shù)additionalKey,這個(gè)參數(shù)主要是針對SORTED_SET和HASH結(jié)構(gòu)的,因?yàn)檫@兩個(gè)結(jié)構(gòu)需要有三個(gè)變量,其他的結(jié)構(gòu)只需要兩個(gè)變量就行了。

在hash結(jié)構(gòu)里,這個(gè)additionalKey對應(yīng)hash的key,getKeyFromData方法得到的數(shù)據(jù)對應(yīng)hash的field,getValueFromData獲取的數(shù)據(jù)對應(yīng)hash的value。

最后我們數(shù)據(jù)寫入對應(yīng)的redis sink即可,寫入的redis數(shù)據(jù)如下:

Flink怎么將流式數(shù)據(jù)寫入redis

動(dòng)態(tài)生成key

我們看到,上面我們構(gòu)造redis的hash結(jié)構(gòu)的時(shí)候,key是寫死的,也就是只能寫入一個(gè)key,如果我的key是動(dòng)態(tài)生成的,該怎么辦呢?

比如我有一個(gè)類似的需求,流式數(shù)據(jù)是一些學(xué)生成績信息,我的key想要的是學(xué)生的name,field是相應(yīng)的科目,而value是這個(gè)科目對應(yīng)的成績。

目前系統(tǒng)沒提供這樣的功能,不過這個(gè)也沒事,沒有什么不是改源碼解決不了的。

我們看下RedisSink中的invoke方法,

 public void invoke(IN input) throws Exception {
  String key = redisSinkMapper.getKeyFromData(input);
  String value = redisSinkMapper.getValueFromData(input);

  switch (redisCommand) {
      ....................
   case HSET:
    this.redisCommandsContainer.hset(this.additionalKey, key, value);
    break;
   default:
    throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
  }
 }
 

我們看到對于hash結(jié)構(gòu)來說,key和value也就是從我們的RedisMapper的實(shí)現(xiàn)類中獲取的,但是additionalKey卻不是動(dòng)態(tài)生成的,我們只需要改下這里。動(dòng)態(tài)獲取additionalKey就行。

public interface RedisMapper<T> extends Function, Serializable{

 RedisCommandDescription getCommandDescription();

 String getKeyFromData(T data);

 String getValueFromData(T data);

 String getAdditionalKey(T data);
}

 

我們給RedisMapper接口添加一個(gè)getAdditionalKey方法,然后在實(shí)現(xiàn)類中實(shí)現(xiàn)該方法。

然后在RedisSink的invoke方法動(dòng)態(tài)獲取additionalKey,修改源碼之后的方法如下:

 @Override
 public void invoke(IN input) throws Exception {
  String key = redisSinkMapper.getKeyFromData(input);
  String value = redisSinkMapper.getValueFromData(input);
  String additionalKey = redisSinkMapper.getAdditionalKey(input);
  switch (redisCommand) {
         ..................
   case HSET:
    this.redisCommandsContainer.hset(additionalKey, key, value);
    break;
   default:
    throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
  }
 }

感謝各位的閱讀,以上就是“Flink怎么將流式數(shù)據(jù)寫入redis”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Flink怎么將流式數(shù)據(jù)寫入redis這一問題有了更深刻的體會,具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI