
Chapter 5: Flink Complex Event Processing



In the previous chapter, we talked about the Table API provided by Apache Flink and how we can use it to process relational data structures. From this chapter onwards, we will start learning more about the libraries provided by Apache Flink and how we can use them for specific use cases. To start with, let's try to understand a library called Complex Event Processing (CEP). CEP is a very interesting but complex topic that has its value in various industries. Wherever a stream of events is expected, people naturally want to perform complex event processing in all such use cases. Let's try to understand what CEP is all about.

 

What is complex event processing?

CEP analyzes streams of disparate events occurring with high frequency and low latency. These days, streaming events can be found in various industries, for example:

 

- In the oil and gas domain, sensor data comes from various drilling tools or from upstream oil pipeline equipment

- In the security domain, activity data, malware information, and usage pattern data come from various endpoints

- In the wearable domain, data comes from various wrist bands with information about your heart rate, your activity, and so on

- In the banking domain, data comes from credit card usage, banking activities, and so on


 

 

It is very important to analyze variation patterns in order to get notified in real time about any change in the regular flow of events. CEP can understand patterns across streams of events, sub-events, and their sequences. CEP helps to identify meaningful patterns and complex relationships among seemingly unrelated events, and sends notifications in real and near-real time to prevent damage:

 

[Figure: The CEP flow]

 

The preceding diagram shows how the CEP flow works. Even though the flow looks simple, CEP has various abilities, such as:

- The ability to produce results as soon as the input event stream is available

- The ability to provide computations such as aggregation over time and timeouts between two events of interest

- The ability to provide real-time/near-real-time alerts and notifications on detection of complex event patterns

- The ability to connect and correlate heterogeneous sources and analyze the patterns in them

- The ability to achieve high-throughput, low-latency processing

 

There are various solutions available on the market. With big data technology advancements, we have multiple options such as Apache Spark, Apache Samza, and Apache Beam, among others, but none of them has a dedicated library to fit all solutions. Now let us try to understand what we can achieve with Flink's CEP library.


 

 

Flink CEP

Apache Flink provides the Flink CEP library, which provides APIs for performing complex event processing. The library consists of the following core components:

- Event stream

- Pattern definition

- Pattern detection

- Alert generation

 

 

 

[Figure: Core components of the Flink CEP library]

 

Flink CEP works on Flink's streaming API, called DataStream. A programmer needs to define the pattern to be detected in the stream of events, and then Flink's CEP engine detects the pattern and takes the appropriate action, such as generating alerts.
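Before looking at each piece in isolation, here is a compact preview of how these four components fit together. This is a sketch assembled from the APIs covered later in this chapter; TemperatureEvent and Alert are the POJOs defined in the following sections, and the threshold and window are illustrative only.

package com.demo.chapter05;

import java.util.Map;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class CepPreview {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Event stream
        DataStream<TemperatureEvent> events = env.fromElements(
                new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 27.0));

        // 2. Pattern definition: a reading of 26 degrees or more within 10 seconds
        Pattern<TemperatureEvent, ?> warning = Pattern.<TemperatureEvent>begin("first")
                .where(new FilterFunction<TemperatureEvent>() {
                    public boolean filter(TemperatureEvent value) {
                        return value.getTemperature() >= 26.0;
                    }
                })
                .within(Time.seconds(10));

        // 3. Pattern detection
        PatternStream<TemperatureEvent> matches = CEP.pattern(events, warning);

        // 4. Alert generation
        DataStream<Alert> alerts = matches.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            public Alert select(Map<String, TemperatureEvent> pattern) {
                return new Alert("High temperature on " + pattern.get("first").getMachineName());
            }
        });

        alerts.print();
        env.execute("Flink CEP preview");
    }
}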

In order to get started, we need to add the following Maven dependency:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>


 

 

Event streams

A very important component of CEP is its input event stream. In earlier chapters, we saw the details of the DataStream API. Now let's use that knowledge to implement CEP. The very first thing we need to do is define a Java POJO for the event. Let's assume we need to monitor a temperature sensor event stream.

First we define an abstract class and then extend this class.

 


 

The following code snippets demonstrate this. First, we write an abstract class as shown here:

package com.demo.chapter05;

public abstract class MonitoringEvent {

    private String machineName;

    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    public String getMachineName() {
        return machineName;
    }

    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if (machineName == null) {
            if (other.machineName != null)
                return false;
        } else if (!machineName.equals(other.machineName))
            return false;
        return true;
    }
}

 

Then we create a POJO for the actual temperature event:

package com.demo.chapter05;

public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (!super.equals(obj))
            return false;
        if (getClass() != obj.getClass())
            return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if (Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature()
                + ", getMachineName()=" + getMachineName() + "]";
    }
}

 

Now we can define the event source as follows.

In Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TemperatureEvent> inputEventStream = env.fromElements(
        new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
        new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
        new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
        new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0));


 

 

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input: DataStream[TemperatureEvent] = env.fromElements(
  new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1),
  new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3),
  new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4),
  new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0))

 

 

Pattern API

The Pattern API allows you to define complex event patterns very easily. Each pattern consists of multiple states. To go from one state to another, we generally need to define conditions. A condition can be based on continuity or on filtering out events.

 


 

Let’s try to understand each pattern operation in detail.


 

 

Begin

The initial state can be defined as follows.

In Java:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

 

In Scala:

val start : Pattern[Event, _] = Pattern.begin("start")

 

 

Filter

We can also specify the filter condition for the initial state.

In Java:

start.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
});

 

In Scala:

 

start.where(event => ... /* condition */)
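As a concrete illustration, a filter on the TemperatureEvent POJO defined earlier might look like the following in Java (a sketch; the 25-degree threshold is arbitrary):

Pattern<TemperatureEvent, ?> highTemp = Pattern.<TemperatureEvent>begin("start")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                // keep only readings above the (arbitrary) 25-degree threshold
                return value.getTemperature() > 25.0;
            }
        });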

 

 

Subtype

We can also filter out events based on their sub-types, using the subtype() method.

In Java:

start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
    @Override
    public boolean filter(SubEvent value) {
        return ...; // condition
    }
});

 

In Scala:

 

start.subtype(classOf[SubEvent]).where(subEvent => ... /* condition */)


 

 

OR

The Pattern API also allows us to define multiple conditions together, using the OR and AND operators.

In Java:

pattern.where(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // condition
    }
}).or(new FilterFunction<Event>() {
    @Override
    public boolean filter(Event value) {
        return ...; // or condition
    }
});

 

In Scala:

 

pattern.where(event => ... /* condition */).or(event => ... /* or condition */)

 

 

Continuity

As stated earlier, we do not always need to filter out events. There can be patterns where we need continuity instead of filters.

Continuity can be of two types: strict continuity and non-strict continuity.

Strict continuity

Strict continuity requires two events to follow each other directly, which means there should be no other event in between. This pattern can be defined by next().

In Java:

Pattern<Event, ?> strictNext = start.next("middle");

 

In Scala:

val strictNext: Pattern[Event, _] = start.next("middle")


 

 

Non-strict continuity

Non-strict continuity means that other events are allowed to occur between the two specific events. This pattern can be defined by followedBy().

In Java:

Pattern<Event, ?> nonStrictNext = start.followedBy("middle");

 

In Scala:

val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")

 

 

Within

The Pattern API also allows us to do pattern matching based on time intervals. We can define a time-based temporal constraint as follows.

In Java:

next.within(Time.seconds(30));

 

In Scala:

next.within(Time.seconds(10))
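Putting these operators together, a complete pattern built from the TemperatureEvent POJO defined earlier might look like this in Java (a sketch; the state names, threshold, and window length are illustrative):

// Two directly consecutive readings of 26 degrees or more, within 20 seconds
Pattern<TemperatureEvent, ?> highTwice = Pattern.<TemperatureEvent>begin("first")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        })
        .next("second")
        .where(new FilterFunction<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent value) {
                return value.getTemperature() >= 26.0;
            }
        })
        .within(Time.seconds(20));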

 

 

Detecting patterns

To detect patterns against a stream of events, we need to run the stream through the pattern. CEP.pattern() returns a PatternStream.

The following code snippet shows how we can detect a pattern. First, the pattern is defined to check whether the temperature value is greater than 26.0 degrees within 10 seconds.

In Java:

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            public boolean filter(TemperatureEvent value) {
                if (value.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        })
        .within(Time.seconds(10));

PatternStream<TemperatureEvent> patternStream = CEP.pattern(inputEventStream, warningPattern);

In Scala:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

 

val input = // data

 

val pattern: Pattern[TempEvent, _] = Pattern.begin("start").where(event => event.temp >= 26.0)

val patternStream: PatternStream[TempEvent] = CEP.pattern(input, pattern)

 

 

Selecting from patterns

Once the pattern stream is available, we need to select matches from it and then take appropriate actions based on them. We can use the select or flatSelect method to select data from the matches.

 

Select

The select method needs a PatternSelectFunction implementation. It has a select method which is called for every matching event sequence. It receives a map of string/event pairs of the matched events, where the string is the name of the state. The select method returns exactly one result.

To collect the results, we need to define an output POJO. In our case, let's say we need to generate alerts as output. Then we need to define the POJO as follows:

 

package com.demo.chapter05;

public class Alert {

    private String message;

    public Alert(String message) {
        super();
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Alert other = (Alert) obj;
        if (message == null) {
            if (other.message != null)
                return false;
        } else if (!message.equals(other.message))
            return false;
        return true;
    }
}


 

 

Next we define the select functions.

In Java:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent); // schematic: construct your concrete output type here
    }
}

 

In Scala:

 

def selectFn(pattern : mutable.Map[String, IN]): OUT = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  OUT(startEvent, endEvent)
}

 

 

flatSelect

The flatSelect method is similar to the select method. The only difference between the two is that flatSelect can return an arbitrary number of results. The flatSelect method has an additional Collector parameter, which is used for the output elements.

The following example shows how we can use the flatSelect method.

In Java:

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");

        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new OUT(startEvent, endEvent)); // schematic output type
        }
    }
}


 

 

In Scala:

def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT]) = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  for (i <- 0 to startEvent.getValue) {
    collector.collect(OUT(startEvent, endEvent))
  }
}

 

 

Handling timed-out partial patterns

Sometimes we may miss certain events if we have constrained the patterns with a time boundary: events may be discarded because they exceed the time window. In order to take action on the timed-out events, the select and flatSelect methods allow a timeout handler. This handler is called for each timed-out partial event pattern.

In this case, the select method takes two parameters: a PatternSelectFunction and a PatternTimeoutFunction. The return type of the timed-out function can differ from that of the select pattern function. The timed-out event and the select event are wrapped in the classes Either.Left and Either.Right.

The following code snippets show how we do things in practice.

In Java:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

 

DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternSelectFunction<Event, ComplexEvent>() {...}
);

DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);


 

 

In Scala, the select API:

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

 

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.select{
  (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
} {
  pattern: mutable.Map[String, Event] => ComplexEvent()
}

 

The flatSelect API is called with the Collector as it can emit an arbitrary number of events:

 

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

 

val result: DataStream[Either[TimeoutEvent, ComplexEvent]] = patternStream.flatSelect{
  (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) =>
    out.collect(TimeoutEvent())
} {
  (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) =>
    out.collect(ComplexEvent())
}

 

 

Use case - complex event processing on a temperature sensor

In earlier sections, we learned about the various features provided by the Flink CEP engine. Now it's time to understand how we can use it in a real-world solution. For that, let's assume we work for a mechanical company which produces some products. In the product factory, there is a need to constantly monitor certain machines. The factory has already set up sensors which keep sending the temperature of the machines at given intervals.

 

Now we will set up a system that constantly monitors the temperature values and generates an alert if the temperature exceeds a certain value.


 

 

We can use the following architecture:

 

[Figure: Use-case architecture — sensors publish temperature events to Kafka, and Flink CEP consumes them to generate alerts]

 

Here we will be using Kafka to collect events from the sensors. In order to write a Java application, we first need to create a Maven project and add the following dependencies:

 

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala_2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.1.4</version>
</dependency>

 

Next, we need to do the following things to use Kafka.

First we need to define a custom Kafka deserializer. This will read bytes from a Kafka topic and convert them into TemperatureEvent objects. The following is the code to do this.

EventDeserializationSchema.java:

package com.demo.chapter05;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class EventDeserializationSchema implements DeserializationSchema<TemperatureEvent> {

    public TypeInformation<TemperatureEvent> getProducedType() {
        return TypeExtractor.getForClass(TemperatureEvent.class);
    }

    public TemperatureEvent deserialize(byte[] arg0) throws IOException {
        // Messages are expected in the form "machineName=temperature"
        String str = new String(arg0, StandardCharsets.UTF_8);
        String[] parts = str.split("=");
        return new TemperatureEvent(parts[0], Double.parseDouble(parts[1]));
    }

    public boolean isEndOfStream(TemperatureEvent arg0) {
        return false;
    }
}

 

Next we create a Kafka topic called temperature:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic temperature


 

 

Now we move to the Java code, which will listen to these events in a Flink stream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

DataStream<TemperatureEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<TemperatureEvent>("temperature",
                new EventDeserializationSchema(), properties));

Next we will define the pattern to check if the temperature is greater than 26.0 degrees Celsius within 10 seconds:

 

Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
        .subtype(TemperatureEvent.class)
        .where(new FilterFunction<TemperatureEvent>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(TemperatureEvent value) {
                if (value.getTemperature() >= 26.0) {
                    return true;
                }
                return false;
            }
        }).within(Time.seconds(10));

 

Next, we match this pattern against the stream of events and select the matching events. We also map the matches to alert messages in the resulting stream, as shown here:

 

DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
        .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            public Alert select(Map<String, TemperatureEvent> event) throws Exception {
                return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
                        + " on machine name:" + event.get("first").getMachineName());
            }
        });


 

 

In order to see which alerts were generated, we print the results:

patternStream.print();

 

And we execute the stream:

env.execute("CEP on Temperature Sensor");

 

Now we are all set to execute the application. As messages arrive on the Kafka topic, the CEP engine will keep evaluating them.

 

The actual execution will look like the following. Here is how we can provide sample input:

 

xyz=21.0
xyz=30.0
LogShaft=29.3
Boiler=23.1
Boiler=24.2
Boiler=27.0
Boiler=29.0
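These lines can be typed into the console producer that ships with the Kafka distribution (assuming the broker runs on localhost:9092; each line becomes one event on the temperature topic):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic temperature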

Here is how the sample output will look:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1010488393]
10/09/2016 18:15:55  Job execution switched to status RUNNING.
10/09/2016 18:15:55  Source: Custom Source(1/4) switched to SCHEDULED
10/09/2016 18:15:55  Source: Custom Source(1/4) switched to DEPLOYING
10/09/2016 18:15:55  Source: Custom Source(2/4) switched to SCHEDULED
10/09/2016 18:15:55  Source: Custom Source(2/4) switched to DEPLOYING
10/09/2016 18:15:55  Source: Custom Source(3/4) switched to SCHEDULED
10/09/2016 18:15:55  Source: Custom Source(3/4) switched to DEPLOYING
10/09/2016 18:15:55  Source: Custom Source(4/4) switched to SCHEDULED
10/09/2016 18:15:55  Source: Custom Source(4/4) switched to DEPLOYING
10/09/2016 18:15:55  CEPPatternOperator(1/1) switched to SCHEDULED
10/09/2016 18:15:55  CEPPatternOperator(1/1) switched to DEPLOYING
10/09/2016 18:15:55  Map -> Sink: Unnamed(1/4) switched to SCHEDULED
10/09/2016 18:15:55  Map -> Sink: Unnamed(1/4) switched to DEPLOYING
10/09/2016 18:15:55  Map -> Sink: Unnamed(2/4) switched to SCHEDULED
10/09/2016 18:15:55  Map -> Sink: Unnamed(2/4) switched to DEPLOYING
10/09/2016 18:15:55  Map -> Sink: Unnamed(3/4) switched to SCHEDULED
10/09/2016 18:15:55  Map -> Sink: Unnamed(3/4) switched to DEPLOYING
10/09/2016 18:15:55  Map -> Sink: Unnamed(4/4) switched to SCHEDULED
10/09/2016 18:15:55  Map -> Sink: Unnamed(4/4) switched to DEPLOYING
10/09/2016 18:15:55  Source: Custom Source(2/4) switched to RUNNING
10/09/2016 18:15:55  Source: Custom Source(3/4) switched to RUNNING
10/09/2016 18:15:55  Map -> Sink: Unnamed(1/4) switched to RUNNING
10/09/2016 18:15:55  Map -> Sink: Unnamed(2/4) switched to RUNNING
10/09/2016 18:15:55  Map -> Sink: Unnamed(3/4) switched to RUNNING
10/09/2016 18:15:55  Source: Custom Source(4/4) switched to RUNNING
10/09/2016 18:15:55  Source: Custom Source(1/4) switched to RUNNING
10/09/2016 18:15:55  CEPPatternOperator(1/1) switched to RUNNING
10/09/2016 18:15:55  Map -> Sink: Unnamed(4/4) switched to RUNNING
1> Alert [message=Temperature Rise Detected:30.0 on machine name:xyz]
2> Alert [message=Temperature Rise Detected:29.3 on machine name:LogShaft]
3> Alert [message=Temperature Rise Detected:27.0 on machine name:Boiler]
4> Alert [message=Temperature Rise Detected:29.0 on machine name:Boiler]

 

We can also configure a mail client or use an external webhook to send e-mail or messenger notifications.
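As a sketch of such a hook (the endpoint URL is hypothetical and the plain-text payload is our own choice, not something prescribed by Flink), the alert stream could be pushed through a custom sink function:

package com.demo.chapter05;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A sketch of a webhook notification sink; the endpoint URL is hypothetical.
public class AlertWebhookSink implements SinkFunction<Alert> {

    private static final long serialVersionUID = 1L;

    private final String webhookUrl;

    public AlertWebhookSink(String webhookUrl) {
        this.webhookUrl = webhookUrl;
    }

    public void invoke(Alert alert) throws Exception {
        // POST the alert message as a plain-text body to the webhook endpoint
        HttpURLConnection conn = (HttpURLConnection) new URL(webhookUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(alert.getMessage().getBytes(StandardCharsets.UTF_8));
        }
        conn.getResponseCode(); // force the request; the response body is ignored
        conn.disconnect();
    }
}

It could then be attached to the alert stream in place of (or in addition to) the print() call, for example with patternStream.addSink(new AlertWebhookSink("http://localhost:8080/alerts")); where the URL is again illustrative.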

 


 

 

Summary

In this chapter, we learned about CEP. We discussed the challenges involved and how we can use the Flink CEP library to solve CEP problems. We also learned about the Pattern API and the various operators we can use to define patterns. In the final section, we tried to connect the dots and work through one complete use case. With some changes, this setup can be used in various other domains as well.

In the next chapter, we will see how to use Flink’s built-in Machine Learning library to solve complex problems.

