溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

基于Pulsar Functions的事件處理設(shè)計模式是什么

發(fā)布時間:2022-01-04 10:47:07 來源:億速云 閱讀:159 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)基于Pulsar Functions的事件處理設(shè)計模式是什么,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

下面將介紹一些常見的實時流式傳輸模式及其實現(xiàn)。

模式 1:動態(tài)路由


首先回顧一下如何使用 Apache Pulsar Functions 實現(xiàn)基于內(nèi)容的路由。基于內(nèi)容的路由是一種集成模式。該模式已經(jīng)存在多年,通常用于事件中心和消息框架中?;舅悸肥菣z查每條消息的內(nèi)容,根據(jù)消息內(nèi)容將消息路由到不同目的地。

基于Pulsar Functions的事件處理設(shè)計模式是什么

下面的例子使用了Apache Pulsar SDK,SDK 允許用戶配置三個不同的值:

  • 用于在消息中查找匹配的正則表達式

  • 消息匹配表達式模式時被發(fā)送到的 topic

  • 消息不匹配表達式模式時被發(fā)送到的 topic

這個例子證明了 Pulsar Functions 功能強大,可以基于功能邏輯動態(tài)決定將事件發(fā)送到哪里。

 import java.util.regex.*;  import org.apache.pulsar.functions.api.Context;  import org.apache.pulsar.functions.api.Function;

 public ContentBasedRoutingFunction implements Function<String, String> {

    String process(String input, Context context) throws Exception {         String regex = context             .getUserConfigValue(“regex”).toString();         String matchedTopic = context             .getUserConfigValue(“matched-topic”).toString();         String unmatchedTopic = context             .getUserConfigValue(“unmatched-topic”).toString();

        Pattern p = Pattern.compile(regex);         Matcher m = p.matcher(input);         if (m.matches()) {           context.publish(matchedTopic, input);         } else {           context.publish(unmatchedTopic, input);         }     }  }


模式 2:過濾


如果想通過僅保留滿足給定條件的事件來排除 topic 上的大多數(shù)事件時,應(yīng)用選擇過濾模式。過濾模式對僅查找感興趣的事件特別有效,如信用卡付款超過了一定金額;日志文件中的 ERROR 消息;傳感器讀數(shù)超過特定閾值等等(見模式 4)。

假如用戶在監(jiān)視信用卡交易的事件流,并嘗試檢測欺詐或可疑行為。由于交易量很大,選擇“同意/不同意”的時間有限,用戶必須先過濾掉有“風(fēng)險”特征的交易,如預(yù)付現(xiàn)金、大額付款等。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase;

public class FraudFilter implements Function<Purchase, Purchase> {

   Purchase process(Purchase p, Context context) throws Exception {         if (p.getTransactionType() == ‘CASH ADVANCE’) ||             p.getAmount > 500.00) {            return p;         }        return null;    }}

可以使用過濾器來過濾有“風(fēng)險”特征的交易。過濾器可以識別這些“風(fēng)險”特征,并只將這些交易路由到一個單獨的 topic 上以進行進一步評估。

經(jīng)過過濾器過濾后,所有信用卡支付都可以被路由到一個“潛在欺詐行為”的 topic 上進行進一步評估,而其他事件則會被過濾掉,過濾器也不會對過濾掉的事件執(zhí)行任何操作。

基于Pulsar Functions的事件處理設(shè)計模式是什么

上圖是基于三個獨立支付對象的 FraudFilter function。第一次支付符合給定標準,被路由到“潛在欺詐行為”topic 上進行進一步評估;而第二次和第三次支付不符合欺詐標準,直接被過濾掉(沒有被路由到“潛在欺詐行為”過濾器上)。



模式 3:轉(zhuǎn)換


轉(zhuǎn)換模式用于將事件從一種類型轉(zhuǎn)換為另一種類型,或用于添加、刪除或修改輸入事件的值。

|| 投影

投影模式類似于關(guān)系代數(shù)中的投影算子,選擇輸入事件的屬性子集,并創(chuàng)建僅包含這些屬性的輸出事件。投影模式可用于刪除事件中的敏感字段,或者只保留事件中的必要屬性。下圖為投影模式的一種應(yīng)用,在將記錄發(fā)布到下游 topic 前,“屏蔽”傳入的社安全號碼。

基于Pulsar Functions的事件處理設(shè)計模式是什么

|| 富集模式

富集模式用于將數(shù)據(jù)添加到輸入屬性中不存在的輸出事件中。典型的富集模式包含基于輸入事件中的某個鍵值對引用數(shù)據(jù)進行某種查找。以下示例展示了如何根據(jù)輸入事件中包含的 IP 地址將地理位置添加到輸出事件。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.company.creditcard.Purchase;import com.company.services.GeoService;

public class IPLookup implements Function<Purchase, Purchase> {    Purchase process(Purchase p) throws Exception {      Geo g = GeoService.getByIp(p.getIPAddress());      // By default, these fields are blank, so we just modify the object      p.setLongitude(g.getLon());      p.setLatitiude(g.getLat());      return p;    }}

|| 分離模式

在分離模式下,事件處理器接收單個輸入事件,并將其分為多個輸出事件。當(dāng)輸入事件是一個包含多個單獨事件(如日志文件中的 entry)的批處理,并且想要單獨處理每個事件時,分離模式十分適用。下圖展示了分離模式的處理過程:先根據(jù)換行符分隔輸入,再逐行發(fā)布到配置的輸出 topic。

基于Pulsar Functions的事件處理設(shè)計模式是什么

此 function 的實現(xiàn)過程如下:


import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public class Splitter implements Function<String, String> {

   String process(String s, Context context) throws Exception {       Arrays.asLists(s.split(“\\R”).forEach(line ->            context.publish(context.getOutputTopic(), line));       return null;    }}


模式 4:警報和閾值


警報和閾值模式可進行檢測,并根據(jù)檢測條件生成警報(如高溫警報)??梢曰诤唵蔚闹?,也可以基于較復(fù)雜的條件(如增長率、數(shù)量的持續(xù)變化等)生成警報。

下面的示例為基于用戶配置的閾值參數(shù)(如 100.00,38.7 等)和接收警報通知的郵箱地址生成警報。當(dāng)此 function 接收到超過配置閾值的傳感器事件時,將發(fā)送電子郵件。

import javax.mail.*;import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public SimpleAlertFunction implements Function<Sensor, Void> {   Void process(Sensor sensor, Context context) throws Exception {       Double threshold = context           .getUserConfigValue(“threshold”).toString();       String alertEmail = context           .getUserConfigValue(“alert-email”).toString();

     if (sensor.getReading() >= threshold) {         Session s = Session.getDefaultInstance();         MimeMessage msg = new MineMessage(s);         msg.setText(“Alert for Sensor:” + sensor.getId());         Transport.send(msg);      }      return null;  }}

下面是一個有狀態(tài) function 示例,該 function 根據(jù)特定傳感器讀數(shù)的增長率生成警報。在決定是否生成警報時,需要訪問以前的傳感器讀數(shù)。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public ComplexAlertFunction implements Function<Sensor, Void> {

 Void process(Sensor sensor, Context context) throws Exception {      Double threshold = context           .getUserConfigValue(“threshold”).toString();      String alertTopic = context           .getUserConfigValue(“alert-topic”).toString();

     // Get previous & current metric values      Float previous = context.getState(sensor.getId() + “-metric”);      Long previous_time = context.getState(sensor.getId() + “-metric-time”);      Float current = sensor.getMetric();      Long current_time = sensor.getMetricTime();

     // Calculate Rate of change & compare to threshold.      Double rateOfChange = (current-previous) /                           (current_time-previous_time);      if (abs(rateOfChange) >= threshold) {         // Publish the sensor ID to the alert topic for handling         context.publish(alertTopic, sensor.getId());      }

     // Update metric values      context.putState(sensor.getId() + “-metric”, current);      context.putState(sensor.getId() + “-metric-time”, current_time);  }}

通過 Apache Pulsar Functions 狀態(tài)管理特性僅保留先前的度量讀數(shù)和時間,并將傳感器 ID 添加到這些值中(因為將會處理來自多個傳感器的度量,所以需要傳感器 ID)。為了簡單起見,假設(shè)事件以正確的順序到達,即始終是最新讀數(shù),沒有亂序讀數(shù)。

另外,這一次我們將傳感器 ID 轉(zhuǎn)發(fā)到一個專門的警報 topic 以進行進一步處理,而不是僅發(fā)送電子郵件。通過這種方式,我們可以對事件進行額外的富集處理(通過 Pulsar Functions)。例如,查找獲取傳感器的地理位置,然后通知相關(guān)人員。

基于Pulsar Functions的事件處理設(shè)計模式是什么



模式 5:簡單計數(shù)和窗口計數(shù)


簡單計數(shù)和窗口計數(shù)模式使用了聚合函數(shù),聚合函數(shù)將事件的集合作為輸入,并通過對輸入事件應(yīng)用一個 function 生成一個所需的輸出事件。聚合函數(shù)包括:求和、平均值、最大值、最小值、百分位數(shù)等。

基于Pulsar Functions的事件處理設(shè)計模式是什么

以下為使用 Pulsar Functions 實現(xiàn)“字數(shù)統(tǒng)計”的示例,計算每個單詞在給定 topic 中出現(xiàn)次數(shù)的總和。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;

public WordCountFunction implements Function<String, Void> {

  Void process(String s, Context context) throws Exception {      Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));      return null;   }}

考慮到流數(shù)據(jù) source 無休止的特性,無限期聚合用處不大,因為通常是在數(shù)據(jù)窗口上進行這些計算(如前一小時內(nèi)的故障次數(shù))。

基于Pulsar Functions的事件處理設(shè)計模式是什么

數(shù)據(jù)窗口代表事件流的有限子集,如上圖所示。但是,應(yīng)該如何定義數(shù)據(jù)窗口的邊界?有兩個用于定義窗口的常用屬性:

  • 觸發(fā)策略:控制執(zhí)行或觸發(fā) function 代碼的時間。Apache Pulsar Function 框架通過這些規(guī)則來通知代碼處理窗口中收集的全部數(shù)據(jù)。

  • 清除策略:控制保留在窗口中的數(shù)據(jù)量。這些規(guī)則用于決定是否從窗口中清除數(shù)據(jù)元素。

這兩個策略都是由時間或窗口中的數(shù)據(jù)量驅(qū)動的。二者之間的區(qū)別是什么?又是怎樣協(xié)同工作的?在多種窗口技術(shù)中,最常用的是滾動窗口和滑動窗口。

|| 滾動窗口

窗口已滿是滾動窗口清除策略的唯一條件,因此,只需要指定想要使用觸發(fā)策略(基于計數(shù)或基于時間)即可。基于計數(shù)的滾動窗口是怎樣工作的?

在下圖的第一個示例中,觸發(fā)策略設(shè)置為 2,也就是說,在窗口中有兩個項目時,觸發(fā)器將會觸發(fā),開始執(zhí)行 Pulsar Function 代碼。這一系列行為與時間無關(guān),窗口計數(shù)達到 2 用了 5 秒還是 5 個小時并不重要,重要的是窗口計數(shù)達到 2。

基于Pulsar Functions的事件處理設(shè)計模式是什么

將上述基于計數(shù)的滾動窗口與基于時間的滾動窗口(時間設(shè)置為 10 秒)進行對比。經(jīng)過 10 秒的間隔后,無論窗口中有多少事件,function 代碼都會被觸發(fā)。在下圖中,第一個窗口中有 7 個事件,而第二個窗口中只有 3 個事件。

基于Pulsar Functions的事件處理設(shè)計模式是什么

|| 滑動窗口

滑動窗口計數(shù)定義了窗口的長度,窗口長度設(shè)置了清除策略以限制保留待處理的數(shù)據(jù)量;滑動間隔定義了觸發(fā)策略。滾動窗口策略和滑動窗口策略都可以根據(jù)時間(時間段)或長度(數(shù)據(jù)元素的數(shù)量)來定義。

在下圖中,窗口長度為 2 秒,也就是說,2 秒以前的數(shù)據(jù)會被清除,并且不會用于計算?;瑒娱g隔為 1 秒,即每 1 秒鐘執(zhí)行一次 Pulsar function 代碼。這樣,可以在整個窗口長度內(nèi)處理數(shù)據(jù)。

基于Pulsar Functions的事件處理設(shè)計模式是什么

前面的示例都是基于時間來定義清除策略和觸發(fā)策略,也可以根據(jù)長度來定義清除策略或觸發(fā)策略,或者同時定義這兩種策略。

在 Pulsar Functions 中實現(xiàn)這兩種類型的窗口 function 都很容易,只需要指定一個 java.util.Collection 作為輸入類型,如下所示,并在創(chuàng)建 function 時在 -userConfig 標志中指定適當(dāng)?shù)拇翱谂渲脤傩浴?/p>

用于實現(xiàn)前面提到的時間窗口四種情形的配置參數(shù)如下:

  • “–windowLengthCount”:每個窗口的消息數(shù)量

  • “–windowLengthDurationMs”:窗口時間(以毫秒為單位)

  • “–slidingIntervalCount”:窗口滑動后的消息數(shù)量

  • “–slidingIntervalDurationMs”:窗口滑動后的時間

正確的組合方式如下表:

時間,滑動窗口-windowLengthDurationMs = XXXX
-slidingIntervalDurationMs = XXXX
時間,Batch Window(即滾動窗口)-windowLengthDurationMs = XXXX
長度,滑動窗口-windowLengthCount = XXXX
-slidingIntervalCount = XXXX
長度,Batch Window(即滾動窗口)-windowLengthCount = XXXX

以上就是基于Pulsar Functions的事件處理設(shè)計模式是什么,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降摹OM隳芡ㄟ^這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI