溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Flink官方文檔-Flink CEP

發(fā)布時間:2020-07-23 09:22:16 來源:網(wǎng)絡 閱讀:6098 作者:Lynn_Yuan 欄目:大數(shù)據(jù)

原文鏈接

Flink CEP

0. 本文概述簡介

FlinkCEP是在Flink之上實現(xiàn)的復雜事件處理(CEP)庫。 它允許你在×××的事件流中檢測事件模式,讓你有機會掌握數(shù)據(jù)中重要的事項。

本文描述了Flink CEP中可用的API調用。 首先介紹Pattern API,它允許你指定要在流中檢測的模式,然后介紹如何檢測匹配事件序列并對其進行操作。
然后,我們將介紹CEP庫在處理事件時間延遲時所做的假設。

1.入門

首先是要在你的pom.xml文件中,引入CEP庫。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.5.0</version>
</dependency>

注意要應用模式匹配的DataStream中的事件必須實現(xiàn)正確的equals()和hashCode()方法,因為FlinkCEP使用它們來比較和匹配事件。

第一個demo如下:
Java:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition<Event>() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(
    new PatternSelectFunction<Event, Alert> {
        @Override
        public Alert select(Map<String, List<Event>> pattern) throws Exception {
            return createAlertFrom(pattern);
        }
    }
});

Scala:

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.select(createAlert(_))

2.Pattern API

Pattern API允許你定義要從輸入流中提取的復雜模式序列。

每個復雜模式序列都是由多個簡單模式組成,即尋找具有相同屬性的單個事件的模式。我們可以先定義一些簡單的模式,然后組合成復雜的模式序列。
可以將模式序列視為此類模式的結構圖,基于用戶指定的條件從一個模式轉換到下一個模式,例如, event.getName().equals("start")。
匹配是一系列輸入事件,通過一系列有效的模式轉換訪問復雜模式圖中的所有模式。

注意每個模式必須具有唯一的名稱,以便后續(xù)可以使用該名稱來標識匹配的事件。

注意模式名稱不能包含字符“:”。

在本節(jié)接下來的部分,我們將首先介紹如何定義單個模式,然后如何將各個模式組合到復雜模式中。

2.1 單個模式

Pattern可以是單單個,也可以是循環(huán)模式。單個模式接受單個事件,而循環(huán)模式可以接受多個事件。在模式匹配符號中,模式“a b + c?d”(或“a”,后跟一個或多個“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環(huán)模式。
默認情況下,模式是單個模式,您可以使用Quantifiers將其轉換為循環(huán)模式。每個模式可以有一個或多個條件,基于它接受事件。

2.1.1 Quantifiers

在FlinkCEP中,您可以使用以下方法指定循環(huán)模式:pattern.oneOrMore(),用于期望一個或多個事件發(fā)生的模式(例如之前提到的b +);和pattern.times(#ofTimes),
用于期望給定類型事件的特定出現(xiàn)次數(shù)的模式,例如4個;和patterntimes(#fromTimes,#toTimes),用于期望給定類型事件的最小出現(xiàn)次數(shù)和最大出現(xiàn)次數(shù)的模式,例如, 2-4。

您可以使用pattern.greedy()方法使循環(huán)模式變得貪婪,但是還不能使組模式變得貪婪。您可以使用pattern.optional()方法使得所有模式,循環(huán)與否,變?yōu)榭蛇x。

對于名為start的模式,以下是有效的Quantifiers:

 // expecting 4 occurrences
 start.times(4);

 // expecting 0 or 4 occurrences
 start.times(4).optional();

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4);

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy();

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional();

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy();

 // expecting 1 or more occurrences
 start.oneOrMore();

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy();

 // expecting 0 or more occurrences
 start.oneOrMore().optional();

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy();

 // expecting 2 or more occurrences
 start.timesOrMore(2);

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy();

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy();
2.1.2 Conditions-條件

在每個模式中,從一個模式轉到下一個模式,可以指定其他條件。您可以將使用下面這些條件:

  1. 傳入事件的屬性,例如其值應大于5,或大于先前接受的事件的平均值。

  2. 匹配事件的連續(xù)性,例如檢測模式a,b,c,序列中間不能有任何非匹配事件。
2.1.3 Conditions on Properties-關于屬性的條件

可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件。 條件可以是IterativeConditions或SimpleConditions。

  1. 迭代條件:

這是最常見的條件類型。 你可以指定一個條件,該條件基于先前接受的事件的屬性或其子集的統(tǒng)計信息來接受后續(xù)事件。

下面代碼說的是:如果名稱以“foo”開頭同時如果該模式的先前接受的事件的價格總和加上當前事件的價格不超過該值 5.0,則迭代條件接受名為“middle”的模式的下一個事件,。
迭代條件可以很強大的,尤其是與循環(huán)模式相結合,例如, oneOrMore()。
Java

middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
        if (!value.getName().startsWith("foo")) {
            return false;
        }

        double sum = value.getPrice();
        for (Event event : ctx.getEventsForPattern("middle")) {
            sum += event.getPrice();
        }
        return Double.compare(sum, 5.0) < 0;
    }
});

Scala:

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

注意對context.getEventsForPattern(...)的調用,將為給定潛在匹配項查找所有先前接受的事件。 此操作的代價可能會變化巨大,因此在使用條件時,請盡量減少其使用。

  1. 簡單條件:

這種類型的條件擴展了前面提到的IterativeCondition類,并且僅根據(jù)事件本身的屬性決定是否接受事件。

start.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return value.getName().startsWith("foo");
    }
});

最后,還可以通過pattern.subtype(subClass)方法將接受事件的類型限制為初始事件類型的子類型。
Java:

start.subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ... // some condition
    }
});

Scala:

start.where(event => event.getName.startsWith("foo"))
  1. 組合條件:

如上所示,可以將子類型條件與其他條件組合使用。 這適用于所有條件。 您可以通過順序調用where()來任意組合條件。
最終結果將是各個條件的結果的邏輯AND。 要使用OR組合條件,可以使用or()方法,如下所示。

pattern.where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // some condition
    }
}).or(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) {
        return ... // or condition
    }
});

Scala:

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
  1. 停止條件:

在循環(huán)模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如: 接受值大于5的事件,直到值的總和小于50。

為了更好的理解,可以看看下面的例子:

給定模式:(a+ until b),b之前,要出現(xiàn)一個或者多個a。

給定輸入序列:a1,c,a2,b,a3

輸出結果: {a1 a2}{a1}{a2}{a3}

可以看到{a1,a2,a3},{a2,a3}這兩個并沒有輸出,這就是停止條件的作用。

  1. 連續(xù)事件條件

FlinkCEP支持事件之間以下形式進行連續(xù):

  • 嚴格連續(xù)性:希望所有匹配事件一個接一個地出現(xiàn),中間沒有任何不匹配的事件。

  • 寬松連續(xù)性:忽略匹配的事件之間出現(xiàn)的不匹配事件。 不能忽略兩個事件之間的匹配事件。

  • 非確定性輕松連續(xù)性:進一步放寬連續(xù)性,允許忽略某些匹配事件的其他匹配。

為了解釋上面的內容,我們舉個例子。假如有個模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續(xù)條件下有不同的區(qū)別:

  1. 嚴格連續(xù)性:{a2 b} - 由于c的存在導致a1被廢棄

  2. 寬松連續(xù)性:{a1,b}和{a1 a2 b} - c被忽略

  3. 非確定性寬松連續(xù)性:{a1 b}, {a2 b}, 和 {a1 a2 b}

對于循環(huán)模式(例如oneOrMore()和times()),默認是寬松的連續(xù)性。 如果你想要嚴格的連續(xù)性,你必須使用consecutive()顯式指定它,
如果你想要非確定性的松弛連續(xù)性,你可以使用allowCombinations()方法。

注意在本節(jié)中,我們討論的是單個循環(huán)模式中的連續(xù)性,并且需要在該上下文中理解consecutive()和allowCombinations()。
稍后在講解組合模式時,我們將討論其他方法,例如next()和followedBy(),用于指定模式之間的連續(xù)條件。

2.1.4 API簡介
  1. where(condition)

定義當前模式的條件。 為了匹配模式,事件必須滿足條件。 多個連續(xù)的where(),其條件為AND:

pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
});
  1. or(condition)

添加與現(xiàn)有條件進行OR運算的新條件。 只有在至少通過其中一個條件時,事件才能匹配該模式:

pattern.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // some condition
    }
}).or(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. until(condition)

指定循環(huán)模式的停止條件。 意味著如果匹配給定條件的事件發(fā)生,則不再接受該模式中的事件。

僅適用于oneOrMore()

注意:它允許在基于事件的條件下清除相應模式的狀態(tài)。

pattern.oneOrMore().until(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. subtype(subClass)

定義當前模式的子類型條件。 如果事件屬于此子類型,則事件只能匹配該模式:

pattern.subtype(SubEvent.class);
  1. oneOrMore()

指定此模式至少發(fā)生一次匹配事件。

默認情況下,使用寬松的內部連續(xù)性。

注意:建議使用until()或within()來啟用狀態(tài)清除

pattern.oneOrMore().until(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
        return ... // alternative condition
    }
});
  1. timesOrMore(#times)

指定此模式至少需要#times次出現(xiàn)匹配事件。

默認情況下,使用寬松的內部連續(xù)性(在后續(xù)事件之間)。

pattern.timesOrMore(2);
  1. times(#ofTimes)

指定此模式需要匹配事件的確切出現(xiàn)次數(shù)。

默認情況下,使用寬松的內部連續(xù)性(在后續(xù)事件之間)。

pattern.times(2);
  1. times(#fromTimes, #toTimes)

指定此模式期望在匹配事件的#fromTimes次和#toTimes次之間出現(xiàn)。

默認情況下,使用寬松的內部連續(xù)性。

pattern.times(2, 4);
  1. optional()

指定此模式是可選的,即有可能根本不會發(fā)生。 這適用于所有上述量詞。

pattern.oneOrMore().optional();
  1. greedy()

指定此模式是貪婪的,即它將盡可能多地重復。 這僅適用于quantifiers,目前不支持組模式。

pattern.oneOrMore().greedy();
  1. consecutive()

與oneOrMore()和times()一起使用并在匹配事件之間強加嚴格的連續(xù)性,即任何不匹配的元素都會中斷匹配。

如果不使用,則使用寬松的連續(xù)性(如followBy())。

例如,這樣的模式:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

針對上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B

使用consecutive:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

  1. allowCombinations()

與oneOrMore()和times()一起使用,并在匹配事件之間強加非確定性寬松連續(xù)性(如 followedByAny())。

如果不應用,則使用寬松的連續(xù)性(如followBy())。

例如,這樣的模式:

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

針對上面的模式,我們假如輸入序列如:C D A1 A2 A3 D A4 B

使用allowCombinations:{C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

不使用:{C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

2.2 組合模式

2.2.1 簡介

已經了解了單個模式的樣子,現(xiàn)在是時候看看如何將它們組合成一個完整的模式序列。

模式序列必須以初始模式開始,如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

接下來,您可以通過指定它們之間所需的連續(xù)條件,為模式序列添加更多模式。 在上一節(jié)中,我們描述了Flink支持的不同鄰接模式,即嚴格,寬松和非確定性寬松,以及如何在循環(huán)模式中應用它們。
要在連續(xù)模式之間應用它們,可以使用:

next() 對應 嚴格,
followedBy() 對應 寬松連續(xù)性
followedByAny() 對應 非確定性寬松連續(xù)性

亦或

notNext() 如果不希望一個事件類型緊接著另一個類型出現(xiàn)。
notFollowedBy() 不希望兩個事件之間任何地方出現(xiàn)該事件。

注意 模式序列不能以notFollowedBy()結束。

注意 NOT模式前面不能有可選模式。

// strict contiguity
Pattern<Event, ?> strict = start.next("middle").where(...);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

// NOT pattern with strict contiguity
Pattern<Event, ?> strictNot = start.notNext("not").where(...);

// NOT pattern with relaxed contiguity
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

寬松連續(xù)性指的是僅第一個成功匹配的事件會被匹配到,然而非確定性寬松連續(xù)性,相同的開始會有多個匹配結果發(fā)出。距離,如果一個模式是"a b",給定輸入序列是"a c b1 b2"。對于不同連續(xù)性會有不同輸出。

  1. a和b之間嚴格連續(xù)性,將會返回{},也即是沒有匹配。因為c的出現(xiàn)導致a,拋棄了。

  2. a和b之間寬松連續(xù)性,返回的是{a,b1},因為寬松連續(xù)性將會拋棄為匹配成功的元素,直至匹配到下一個要匹配的事件。

  3. a和b之間非確定性寬松連續(xù)性,返回的是{a,b1},{a,b2}。

也可以為模式定義時間約束。 例如,可以通過pattern.within()方法定義模式應在10秒內發(fā)生。 時間模式支持處理時間和事件時間。
注意模式序列只能有一個時間約束。 如果在不同的單獨模式上定義了多個這樣的約束,則應用最小的約束。

next.within(Time.seconds(10));

可以為begin,followBy,followByAny和next定義一個模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且
可對GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。


PatternPatte <Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
2.2.2 API
  1. begin(#name)

定義一個開始模式

Pattern<Event, ?> start = Pattern.<Event>begin("start");
  1. begin(#pattern_sequence)

定義一個開始模式

Pattern<Event, ?> start = Pattern.<Event>begin(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
  1. next(#name)

追加一個新的模式。匹配事件必須直接跟著先前的匹配事件(嚴格連續(xù)性):

Pattern<Event, ?> next = start.next("middle");
  1. next(#pattern_sequence)

追加一個新的模式。匹配事件必須直接接著先前的匹配事件(嚴格連續(xù)性):

Pattern<Event, ?> next = start.next(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
  1. followedBy(#name)

追加加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:

Pattern<Event, ?> followedBy = start.followedBy("middle");
  1. followedBy(#pattern_sequence)

追加新模式。 匹配事件和先前匹配事件(寬松連續(xù))之間可能發(fā)生其他非匹配事件:

Pattern<Event, ?> followedBy = start.followedBy(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
  1. followedByAny(#name)

添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對每個備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:

Pattern<Event, ?> followedByAny = start.followedByAny("middle");
  1. followedByAny(#pattern_sequence)

添加新模式。 匹配事件和先前匹配事件之間可能發(fā)生其他事件,并且將針對每個備選匹配事件(非確定性放松連續(xù)性)呈現(xiàn)替代匹配:

Pattern<Event, ?> followedByAny = start.followedByAny(
    Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
  1. notNext()

添加新的否定模式。 匹配(否定)事件必須直接跟著先前的匹配事件(嚴格連續(xù)性)才能丟棄部分匹配:

Pattern<Event, ?> notNext = start.notNext("not");
  1. notFollowedBy()

追加一個新的否定模式匹配。即使在匹配(否定)事件和先前匹配事件(寬松連續(xù)性)之間發(fā)生其他事件,也將丟棄部分匹配事件序列:

Pattern<Event, ?> notFollowedBy = start.notFollowedBy("not");
  1. within(time)

定義事件序列進行模式匹配的最大時間間隔。 如果未完成的事件序列超過此時間,則將其丟棄:

pattern.within(Time.seconds(10));

2.3 匹配后的跳過策略

對于給定模式,可以將同一事件分配給多個成功匹配。 要控制將分配事件的匹配數(shù),需要指定名為AfterMatchSkipStrategy的跳過策略。
跳過策略有四種類型,如下所示:

  • NO_SKIP:將發(fā)出每個可能的匹配。

  • SKIP_PAST_LAST_EVENT:丟棄包含匹配事件的每個部分匹配。

  • SKIP_TO_FIRST:丟棄包含PatternName第一個之前匹配事件的每個部分匹配。

  • SKIP_TO_LAST:丟棄包含PatternName最后一個匹配事件之前的每個部分匹配。

請注意,使用SKIP_TO_FIRST和SKIP_TO_LAST跳過策略時,還應指定有效的PatternName。

例如,對于給定模式a b {2}和數(shù)據(jù)流ab1,ab2,ab3,ab4,ab5,ab6,這四種跳過策略之間的差異如下:

Apache Flink官方文檔-Flink CEP

要指定要使用的跳過策略,只需調用以下命令創(chuàng)建AfterMatchSkipStrategy:

Apache Flink官方文檔-Flink CEP

使用方法:

AfterMatchSkipStrategy skipStrategy = ...
Pattern.begin("patternName", skipStrategy);

2.4 檢測模式-Detecting Patterns

指定要查找的模式序列后,就可以將其應用于輸入流以檢測潛在匹配。 要針對模式序列運行事件流,必須創(chuàng)建PatternStream。
給定輸入流 input,模式 pattern 和可選的比較器 comparator,用于在EventTime的情況下對具有相同時間戳的事件進行排序或在同一時刻到達,通過調用以下命令創(chuàng)建PatternStream:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

根據(jù)實際情況,創(chuàng)建的流可以是有key,也可以是無key的。

請注意,在無key的流上使用模式,將導致job的并行度為1。

2.5 Selecting from Patterns

獲得PatternStream后,您可以通過select或flatSelect方法從檢測到的事件序列中進行查詢。

select()方法需要PatternSelectFunction的實現(xiàn)。 PatternSelectFunction具有為每個匹配事件序列調用的select方法。
它以Map <String,List <IN >>的形式接收匹配,其中key是模式序列中每個模式的名稱,值是該模式的所有已接受事件的列表(IN是輸入元素的類型)。
給定模式的事件按時間戳排序。 返回每個模式的接受事件列表的原因是當使用循環(huán)模式(例如oneToMany()和times())時,對于給定模式可以接受多個事件。
選擇函數(shù)只返回一個結果。

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, List<IN>> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}

PatternFlatSelectFunction類似于PatternSelectFunction,唯一的區(qū)別是它可以返回任意數(shù)量的結果。 為此,select方法有一個額外的Collector參數(shù),用于將輸出元素向下游轉發(fā)。

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);

        for (int i = 0; i < startEvent.getValue(); i++ ) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

2.6 處理超時部分模式

每當模式具有通過within關鍵字附加的時間窗口長度時,部分事件序列可能因為超出時間窗口長度而被丟棄。 為了對這些超時的部分匹配作出相應的處理,select和flatSelect API調用允許指定超時處理程序。
為每個超時的部分事件序列調用此超時處理程序。 超時處理程序接收到目前為止由模式匹配的所有事件,以及檢測到超時時的時間戳。

為了處理部分模式,select和flatSelect API提供了一個帶參數(shù)的重載版本

  • PatternTimeoutFunction/ PatternFlatTimeoutFunction。
  • OutputTag 超時的匹配將會在其中返回。
  • PatternSelectFunction / PatternFlatSelectFunction。

PatternStreamPatte <Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    outputTag,
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    outputTag,
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

2.7 事件事件模式下處理滯后數(shù)據(jù)

在CEP中,元素處理的順序很重要。為了保證在采用事件事件時以正確的順序處理事件,最初將傳入的事件放入緩沖區(qū),其中事件基于它們的時間戳以升序排序,
并且當watermark到達時,處理該緩沖區(qū)中時間戳小于watermark時間的所有元素。這意味著watermark之間的事件按事件時間順序處理。

請注意,在采用事件時間時,CEP library會假設watermark是正確的。

為了保證跨watermark的記錄按照事件時間順序處理,F(xiàn)link的CEP庫假定watermark是正確的,并將時間戳小于上次可見watermark的時間視為滯后事件。滯后事件不會被進一步處理。

2.8 例子

以下示例檢測事件的帶key數(shù)據(jù)流上的模式start,middle(name =“error”) - > end(name =“critical”)。 事件的key是其id,并且有效模式必須在10秒內發(fā)生。 整個處理是用事件時間完成的。

StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("error");
        }
    }).followedBy("end").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
        return createAlert(pattern);
    }
});
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI