您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Flink CEP事件處理,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
復(fù)雜事件處理,允許在無界數(shù)據(jù)流中檢測(cè)出特定事件模型
單個(gè)模式指一個(gè)模式,可以是一個(gè)單例也可以是循環(huán)模式。
模式都是單例的,可以通過量詞轉(zhuǎn)換成循環(huán)模式。每個(gè)模式可以有一個(gè)或多個(gè)條件來決定接受哪些事件。
pattern.oneOrMore():期望給定的事件出現(xiàn)一次或多次
pattern.times(#oftimes):期望一個(gè)給定事件出現(xiàn)特定次數(shù)的模式
pattern.times(#fromTimes, #toTimes):期望一個(gè)給定事件出現(xiàn)次數(shù)在一個(gè)最小值與最大值中間
pattern.greedy():貪心算法,盡可能多匹配,還不能讓模式組貪心
pattern.optional():變?yōu)榭蛇x
示例: // 期望出現(xiàn)4次 start.times(4); // 期望出現(xiàn)0或者4次 start.times(4).optional(); // 期望出現(xiàn)2、3或者4次 start.times(2, 4); // 期望出現(xiàn)2、3或者4次,并且盡可能的重復(fù)次數(shù)多 start.times(2, 4).greedy(); // 期望出現(xiàn)0、2、3或者4次 start.times(2, 4).optional(); // 期望出現(xiàn)0、2、3或者4次,并且盡可能的重復(fù)次數(shù)多 start.times(2, 4).optional().greedy(); // 期望出現(xiàn)1到多次 start.oneOrMore(); // 期望出現(xiàn)1到多次,并且盡可能的重復(fù)次數(shù)多 start.oneOrMore().greedy(); // 期望出現(xiàn)0到多次 start.oneOrMore().optional(); // 期望出現(xiàn)0到多次,并且盡可能的重復(fù)次數(shù)多 start.oneOrMore().optional().greedy(); // 期望出現(xiàn)2到多次 start.timesOrMore(2); // 期望出現(xiàn)2到多次,并且盡可能的重復(fù)次數(shù)多 start.timesOrMore(2).greedy(); // 期望出現(xiàn)0、2或多次 start.timesOrMore(2).optional(); // 期望出現(xiàn)0、2或多次,并且盡可能的重復(fù)次數(shù)多 start.timesOrMore(2).optional().greedy();
判斷事件屬性的條件可以是以下方法
pattern.where()
pattern.or()
pattern.until()
這些方法入?yún)⒖梢允荌terativeCondition或SimpleCondition
pattern.subtype方法限制接受事件類型是初始事件的子類型。
迭代條件IterativeCondition
簡(jiǎn)單條件SimpleCondition
組合條件.where().or()等
停止條件.until()
FlinkCEP支持事件之間如下形式的連續(xù)策略
嚴(yán)格連續(xù):期望所有匹配事件嚴(yán)格的一個(gè)接一個(gè)出現(xiàn),中間沒有任何不匹配事件
松散連續(xù):忽略匹配的事件之間的不匹配的事件
不確定的松散連續(xù):更進(jìn)一步的松散連續(xù),允許忽略掉一些匹配事件的附加匹配
1. next() 指定嚴(yán)格連續(xù) 2. followedBy() 指定松散連續(xù) 3. followedByAny() 不確定松散連續(xù) 4. notNext() 如果不想后面直接連著一個(gè)特定事件 5. notFollowedBy(),如果不想一個(gè)特定事件發(fā)生在兩個(gè)事件之間的任何地方。 ps: 模式序列不能以notFollowedBy()結(jié)尾 一個(gè)NOT模式前面不能是可選的模式
定義模式一個(gè)有效時(shí)間約束:pattern.within()方法指定有效時(shí)間內(nèi)發(fā)生。
模式序列只能有一個(gè)時(shí)間限制,如果限制多個(gè)時(shí)間在不同的模式上,會(huì)使用最小的時(shí)間限制。
循環(huán)模式默認(rèn)是松散連續(xù),如果合用嚴(yán)格連續(xù),需使用consecutive()方法明確指定。如果想使用不確定松散連續(xù),可以使用allowCombinations()方法
==示例:consecutive==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().consecutive() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }); 輸入:C D A1 A2 A3 D A4 B,會(huì)產(chǎn)生下面的輸出: 如果施加嚴(yán)格連續(xù)性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B} 不施加嚴(yán)格連續(xù)性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
==示例:allowCombinations==
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("c"); } }) .followedBy("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("a"); } }).oneOrMore().allowCombinations() .followedBy("end1").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("b"); } }); 輸入:C D A1 A2 A3 D A4 B,會(huì)產(chǎn)生如下的輸出: 如果使用不確定松散連續(xù): {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B} 如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
定義一個(gè)模式序列作為begin,followedBy,followedByAny和next條件
關(guān)于“Flink CEP事件處理”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。