溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Flink CEP事件處理

發(fā)布時(shí)間:2021-12-31 10:30:29 來源:億速云 閱讀:169 作者:小新 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Flink CEP事件處理,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

什么是CEP

復(fù)雜事件處理,允許在無界數(shù)據(jù)流中檢測(cè)出特定事件模型

單個(gè)模式

單個(gè)模式指一個(gè)模式,可以是一個(gè)單例也可以是循環(huán)模式。
模式都是單例的,可以通過量詞轉(zhuǎn)換成循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件來決定接受哪些事件。

量詞
  1. pattern.oneOrMore():期望給定的事件出現(xiàn)一次或多次

  2. pattern.times(#oftimes):期望一個(gè)給定事件出現(xiàn)特定次數(shù)的模式

  3. pattern.times(#fromTimes, #toTimes):期望一個(gè)給定事件出現(xiàn)次數(shù)在一個(gè)最小值與最大值中間

  4. pattern.greedy():貪心算法,盡可能多匹配,還不能讓模式組貪心

  5. pattern.optional():變?yōu)榭蛇x

示例:
// 期望出現(xiàn)4次
start.times(4);
// 期望出現(xiàn)0或者4次
start.times(4).optional();
// 期望出現(xiàn)2、3或者4次
start.times(2, 4);
// 期望出現(xiàn)2、3或者4次,并且盡可能的重復(fù)次數(shù)多
start.times(2, 4).greedy();
// 期望出現(xiàn)0、2、3或者4次
start.times(2, 4).optional();
// 期望出現(xiàn)0、2、3或者4次,并且盡可能的重復(fù)次數(shù)多
start.times(2, 4).optional().greedy();
// 期望出現(xiàn)1到多次
start.oneOrMore();
// 期望出現(xiàn)1到多次,并且盡可能的重復(fù)次數(shù)多
start.oneOrMore().greedy();
// 期望出現(xiàn)0到多次
start.oneOrMore().optional();
// 期望出現(xiàn)0到多次,并且盡可能的重復(fù)次數(shù)多
start.oneOrMore().optional().greedy();
// 期望出現(xiàn)2到多次
start.timesOrMore(2);
// 期望出現(xiàn)2到多次,并且盡可能的重復(fù)次數(shù)多
start.timesOrMore(2).greedy();
// 期望出現(xiàn)0、2或多次
start.timesOrMore(2).optional();
// 期望出現(xiàn)0、2或多次,并且盡可能的重復(fù)次數(shù)多
start.timesOrMore(2).optional().greedy();
條件

判斷事件屬性的條件可以是以下方法

  1. pattern.where()

  2. pattern.or()

  3. pattern.until()
    這些方法入?yún)⒖梢允荌terativeCondition或SimpleCondition

pattern.subtype方法限制接受事件類型是初始事件的子類型。

  1. 迭代條件IterativeCondition

  2. 簡(jiǎn)單條件SimpleCondition

  3. 組合條件.where().or()等

  4. 停止條件.until()

組合模式

FlinkCEP支持事件之間如下形式的連續(xù)策略

  1. 嚴(yán)格連續(xù):期望所有匹配事件嚴(yán)格的一個(gè)接一個(gè)出現(xiàn),中間沒有任何不匹配事件

  2. 松散連續(xù):忽略匹配的事件之間的不匹配的事件

  3. 不確定的松散連續(xù):更進(jìn)一步的松散連續(xù),允許忽略掉一些匹配事件的附加匹配

1. next() 指定嚴(yán)格連續(xù)
2. followedBy() 指定松散連續(xù)
3. followedByAny() 不確定松散連續(xù)
4. notNext() 如果不想后面直接連著一個(gè)特定事件
5. notFollowedBy(),如果不想一個(gè)特定事件發(fā)生在兩個(gè)事件之間的任何地方。
ps: 模式序列不能以notFollowedBy()結(jié)尾
    一個(gè)NOT模式前面不能是可選的模式

定義模式一個(gè)有效時(shí)間約束:pattern.within()方法指定有效時(shí)間內(nèi)發(fā)生。
模式序列只能有一個(gè)時(shí)間限制,如果限制多個(gè)時(shí)間在不同的模式上,會(huì)使用最小的時(shí)間限制。

循環(huán)模式默認(rèn)是松散連續(xù),如果合用嚴(yán)格連續(xù),需使用consecutive()方法明確指定。如果想使用不確定松散連續(xù),可以使用allowCombinations()方法
==示例:consecutive==

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

輸入:C D A1 A2 A3 D A4 B,會(huì)產(chǎn)生下面的輸出:

如果施加嚴(yán)格連續(xù)性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}

不施加嚴(yán)格連續(xù)性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

==示例:allowCombinations==

Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition<Event>() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

輸入:C D A1 A2 A3 D A4 B,會(huì)產(chǎn)生如下的輸出:

如果使用不確定松散連續(xù): {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}

如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
模式組

定義一個(gè)模式序列作為begin,followedBy,followedByAny和next條件

關(guān)于“Flink CEP事件處理”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI