溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink如何實現(xiàn)雙流 join

發(fā)布時間:2021-12-31 10:39:09 來源:億速云 閱讀:146 作者:小新 欄目:大數(shù)據(jù)

這篇文章給大家分享的是有關(guān)Flink如何實現(xiàn)雙流 join的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

在數(shù)據(jù)庫中的靜態(tài)表上做 OLAP 分析時,兩表 join 是非常常見的操作。同理,在流式處理作業(yè)中,有時也需要在兩條流上做 join 以獲得更豐富的信息。Flink DataStream API 為用戶提供了3個算子來實現(xiàn)雙流 join,分別是:

  • join()

  • coGroup()

  • intervalJoin()

準(zhǔn)備數(shù)據(jù)

從 Kafka 分別接入點擊流和訂單流,并轉(zhuǎn)化為 POJO。

DataStream<String> clickSourceStream = env
  .addSource(new FlinkKafkaConsumer011<>(
    "ods_analytics_access_log",
    new SimpleStringSchema(),
    kafkaProps
  ).setStartFromLatest());
DataStream<String> orderSourceStream = env
  .addSource(new FlinkKafkaConsumer011<>(
    "ods_ms_order_done",
    new SimpleStringSchema(),
    kafkaProps
  ).setStartFromLatest());

DataStream<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream
  .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class));
DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceStream
  .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

join() 算子提供的語義為"Window join",即按照指定字段和(滾動/滑動/會話)窗口進(jìn)行 inner join,支持處理時間和事件時間兩種時間特征。以下示例以10秒滾動窗口,將兩個流通過商品 ID 關(guān)聯(lián),取得訂單流中的售價相關(guān)字段。

Flink如何實現(xiàn)雙流 join

clickRecordStream
  .join(orderRecordStream)
  .where(record -> record.getMerchandiseId())
  .equalTo(record -> record.getMerchandiseId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception {
      return StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t');
    }
  })
  .print().setParallelism(1);

簡單易用。

coGroup()

只有 inner join 肯定還不夠,如何實現(xiàn) left/right outer join 呢?答案就是利用 coGroup() 算子。它的調(diào)用方式類似于 join() 算子,也需要開窗,但是 CoGroupFunction 比 JoinFunction 更加靈活,可以按照用戶指定的邏輯匹配左流和/或右流的數(shù)據(jù)并輸出。

以下的例子就實現(xiàn)了點擊流 left join 訂單流的功能,是很樸素的 nested loop join 思想(二重循環(huán))。

clickRecordStream
  .coGroup(orderRecordStream)
  .where(record -> record.getMerchandiseId())
  .equalTo(record -> record.getMerchandiseId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords, Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>> collector) throws Exception {
      for (AnalyticsAccessLogRecord accessRecord : accessRecords) {
        boolean isMatched = false;
        for (OrderDoneLogRecord orderRecord : orderRecords) {
          // 右流中有對應(yīng)的記錄
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), orderRecord.getPrice()));
          isMatched = true;
        }
        if (!isMatched) {
          // 右流中沒有對應(yīng)的記錄
          collector.collect(new Tuple2<>(accessRecord.getMerchandiseName(), null));
        }
      }
    }
  })
  .print().setParallelism(1);

intervalJoin()

join() 和 coGroup() 都是基于窗口做關(guān)聯(lián)的。但是在某些情況下,兩條流的數(shù)據(jù)步調(diào)未必一致。例如,訂單流的數(shù)據(jù)有可能在點擊流的購買動作發(fā)生之后很久才被寫入,如果用窗口來圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的語義,按照指定字段以及右流相對左流偏移的時間區(qū)間進(jìn)行關(guān)聯(lián),即:

right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

Flink如何實現(xiàn)雙流 join

interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區(qū)間的上下界,并且只支持事件時間。

示例代碼如下。注意在運行之前,需要分別在兩個流上應(yīng)用 assignTimestampsAndWatermarks() 方法獲取事件時間戳和水印。

clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  })
  .print().setParallelism(1);

由上可見,interval join 與 window join 不同,是兩個 KeyedStream 之上的操作,并且需要調(diào)用 between() 方法指定偏移區(qū)間的上下界。如果想令上下界是開區(qū)間,可以調(diào)用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的實現(xiàn)原理

以下是 KeyedStream.process(ProcessJoinFunction) 方法調(diào)用的重載方法的邏輯。

public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );
    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}

可見是先對兩條流執(zhí)行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子進(jìn)行轉(zhuǎn)換。在 IntervalJoinOperator 中,會利用兩個 MapState 分別緩存左流和右流的數(shù)據(jù)。

private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        LEFT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
    ));
    this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
        RIGHT_BUFFER,
        LongSerializer.INSTANCE,
        new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
    ));
}

其中 Long 表示事件時間戳,List> 表示該時刻到來的數(shù)據(jù)記錄。當(dāng)左流和右流有數(shù)據(jù)到達(dá)時,會分別調(diào)用 processElement1() 和 processElement2() 方法,它們都調(diào)用了 processElement() 方法,代碼如下。

@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
    processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}

@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
    processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}

@SuppressWarnings("unchecked")
private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft) throws Exception {
    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();
    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                "interval stream joins need to have timestamps meaningful timestamps.");
    }
    if (isLate(ourTimestamp)) {
        return;
    }
    addToBuffer(ourBuffer, ourValue, ourTimestamp);
    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
        final long timestamp  = bucket.getKey();
        if (timestamp < ourTimestamp + relativeLowerBound ||
                timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }
        for (BufferEntry<OTHER> entry: bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }
    long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}

這段代碼的思路是:

  1. 取得當(dāng)前流 StreamRecord 的時間戳,調(diào)用 isLate() 方法判斷它是否是遲到數(shù)據(jù)(即時間戳小于當(dāng)前水印值),如是則丟棄。

  2. 調(diào)用 addToBuffer() 方法,將時間戳和數(shù)據(jù)一起插入當(dāng)前流對應(yīng)的 MapState。

  3. 遍歷另外一個流的 MapState,如果數(shù)據(jù)滿足前述的時間區(qū)間條件,則調(diào)用 collect() 方法將該條數(shù)據(jù)投遞給用戶定義的 ProcessJoinFunction 進(jìn)行處理。collect() 方法的代碼如下,注意結(jié)果對應(yīng)的時間戳是左右流時間戳里較大的那個。

private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
    final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
    collector.setAbsoluteTimestamp(resultTimestamp);
    context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);
    userFunction.processElement(left, right, context, collector);
}
  1. 調(diào)用 TimerService.registerEventTimeTimer() 注冊時間戳為 timestamp + relativeUpperBound 的定時器,該定時器負(fù)責(zé)在水印超過區(qū)間的上界時執(zhí)行狀態(tài)的清理邏輯,防止數(shù)據(jù)堆積。注意左右流的定時器所屬的 namespace 是不同的,具體邏輯則位于 onEventTime() 方法中。

@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
    long timerTimestamp = timer.getTimestamp();
    String namespace = timer.getNamespace();
    logger.trace("onEventTime @ {}", timerTimestamp);
    switch (namespace) {
        case CLEANUP_NAMESPACE_LEFT: {
            long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
            logger.trace("Removing from left buffer @ {}", timestamp);
            leftBuffer.remove(timestamp);
            break;
        }
        case CLEANUP_NAMESPACE_RIGHT: {
            long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
            logger.trace("Removing from right buffer @ {}", timestamp);
            rightBuffer.remove(timestamp);
            break;
        }
        default:
            throw new RuntimeException("Invalid namespace " + namespace);
    }
}

感謝各位的閱讀!關(guān)于“Flink如何實現(xiàn)雙流 join”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI