溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Flink和Drools的實時日志處理方法是什么

發(fā)布時間:2021-12-31 10:47:41 來源:億速云 閱讀:194 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“Flink和Drools的實時日志處理方法是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

背景

日志系統(tǒng)接入的日志種類多、格式復雜多樣,主流的有以下幾種日志:

  • filebeat采集到的文本日志,格式多樣
  • winbeat采集到的操作系統(tǒng)日志
  • 設(shè)備上報到logstash的syslog日志
  • 接入到kafka的業(yè)務(wù)日志

以上通過各種渠道接入的日志,存在2個主要的問題:

  • 格式不統(tǒng)一、不規(guī)范、標準化不夠
  • 如何從各類日志中提取出用戶關(guān)心的指標,挖掘更多的業(yè)務(wù)價值

為了解決上面2個問題,我們基于flink和drools規(guī)則引擎做了實時的日志處理服務(wù)。

系統(tǒng)架構(gòu)

架構(gòu)比較簡單,架構(gòu)圖如下:

Flink和Drools的實時日志處理方法是什么  

各類日志都是通過kafka匯總,做日志中轉(zhuǎn)。

flink消費kafka的數(shù)據(jù),同時通過API調(diào)用拉取drools規(guī)則引擎,對日志做解析處理后,將解析后的數(shù)據(jù)存儲到Elasticsearch中,用于日志的搜索和分析等業(yè)務(wù)。

為了監(jiān)控日志解析的實時狀態(tài),flink會將日志處理的統(tǒng)計數(shù)據(jù),如每分鐘處理的日志量,每種日志從各個機器IP來的日志量寫到Redis中,用于監(jiān)控統(tǒng)計。

 

模塊介紹

系統(tǒng)項目命名為eagle。

  • eagle-api:基于springboot,作為drools規(guī)則引擎的寫入和讀取API服務(wù)。

  • eagle-common:通用類模塊。

  • eagle-log:基于flink的日志處理服務(wù)。

重點講一下eagle-log:

對接kafka、ES和Redis

對接kafka和ES都比較簡單,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch7),詳見代碼。

對接Redis,最開始用的是org.apache.bahir提供的redis connector,后來發(fā)現(xiàn)靈活度不夠,就使用了Jedis。

在將統(tǒng)計數(shù)據(jù)寫入redis的時候,最開始用的keyby分組后緩存了分組數(shù)據(jù),在sink中做統(tǒng)計處理后寫入,參考代碼如下:

String name = "redis-agg-log";
        DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {
                        ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
                        if (logs.size() > 0) {
                            collector.collect(new Tuple2(s, logs));
                        }
                    }
                }).setParallelism(redisSinkParallelism).name(name).uid(name);
 

后來發(fā)現(xiàn)這樣做對內(nèi)存消耗比較大,其實不需要緩存整個分組的原始數(shù)據(jù),只需要一個統(tǒng)計數(shù)據(jù)就OK了,優(yōu)化后:

String name = "redis-agg-log";
        DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime))
                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
                .setParallelism(redisSinkParallelism).name(name).uid(name);
 

這里使用了flink的聚合函數(shù)和Accumulator,通過flink的agg操作做統(tǒng)計,減輕了內(nèi)存消耗的壓力。

使用broadcast廣播drools規(guī)則引擎

1、drools規(guī)則流通過broadcast map state廣播出去。

2、kafka的數(shù)據(jù)流connect規(guī)則流處理日志。

//廣播規(guī)則流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
                .broadcast(ruleStateDescriptor);

//kafka數(shù)據(jù)流
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);

//數(shù)據(jù)流connect規(guī)則流處理日志
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

“Flink和Drools的實時日志處理方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI