您好,登錄后才能下訂單哦!
這篇文章主要講解了“flink中怎么使用自定義聚合函數(shù)統(tǒng)計網(wǎng)站TP指標(biāo)”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“flink中怎么使用自定義聚合函數(shù)統(tǒng)計網(wǎng)站TP指標(biāo)”吧!
在網(wǎng)站性能測試中,我們經(jīng)常會選擇 TP50、TP95 或者 TP99 等作為性能指標(biāo)。接下來我們講講這些指標(biāo)的含義、以及在flink中如何實時統(tǒng)計:
TP50,top percent 50,即 50% 的數(shù)據(jù)都滿足某一條件;
TP95,top percent 95,即 95% 的數(shù)據(jù)都滿足某一條件;
TP99,top percent 99,即 99% 的數(shù)據(jù)都滿足某一條件;
我們舉一個例子,我們要統(tǒng)計網(wǎng)站一分鐘之內(nèi)的的響應(yīng)時間的TP90,正常的處理邏輯就是把這一分鐘之內(nèi)所有的網(wǎng)站的響應(yīng)時間從小到大排序,然后計算出總條數(shù)count,然后計算出排名在90%的響應(yīng)時間是多少(count*0.9),就是我們要的值。
這個需求很明顯就是一個使用聚合函數(shù)來做的案例,F(xiàn)link中提供了大量的聚合函數(shù),比如count,max,min等等,但是對于這個需求,卻無法滿足,所以我們需要自定義一個聚合函數(shù)來實現(xiàn)我們的需求。
在前段時間,我們聊了聊flink的聚合算子,具體可參考: flink實戰(zhàn)-聊一聊flink中的聚合算子 , 聚合算子是我們在寫代碼的時候用來實現(xiàn)一個聚合功能,聚合函數(shù)其實和聚合算子類似,只不過聚合函數(shù)用于在寫sql的時候使用。
自定義聚合函數(shù)需要繼承抽象類org.apache.flink.table.functions.AggregateFunction。并實現(xiàn)下面幾個方法。
createAccumulator():這個方法會在一次聚合操作的開始調(diào)用一次,主要用于構(gòu)造一個Accumulator,用于存儲在聚合過程中的臨時對象。
accumulate() 這個方法,每來一條數(shù)據(jù)會調(diào)用一次這個方法,我們就在這個方法里實現(xiàn)我們的聚合函數(shù)的具體邏輯。
getValue() 這個方法是在聚合結(jié)束以后,對中間結(jié)果做處理,然后將結(jié)果返回,最終sql中得到的結(jié)果數(shù)據(jù)就是這個值。
對于TP指標(biāo),正常的思路我們可以先創(chuàng)建一個臨時變量,里面有一個list,每來一個數(shù)據(jù),就放到這個list里面,在getValue方法里,進(jìn)行排序,取相應(yīng)的TP值。
但是這種思路會有一個問題,就是如果要聚合的時間范圍內(nèi),數(shù)據(jù)過多的話。就會在list存儲大量的數(shù)據(jù),會造成checkpoint過大,時間過長,最后導(dǎo)致程序失敗。得不到正確的結(jié)果。
所以我們需要換一個思路,既然最后我們想要的是一個有序列表,那么我們是不是可以把這個list結(jié)構(gòu)優(yōu)化一下,使用Treemap來存儲,map的key就是指標(biāo),比如響應(yīng)時間。value就是對應(yīng)的指標(biāo)出現(xiàn)的次數(shù)。這樣getValue方法里,只需要將map的value值累加,就能得到總數(shù)count,然后計算出來相應(yīng)的tp值的位置position,最后我們再從頭累加map的value,直到累加結(jié)果大于相應(yīng)的位置position,則map的key即為所求。
示例如下:我們先構(gòu)建一個source,只是隨機(jī)生成一個變量,網(wǎng)站的相應(yīng)時間response_time。
String sql = "CREATE TABLE source (\n" +
" response_time INT,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts," +
"proctime as proctime()\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1000',\n" +
" 'fields.response_time.min'='1',\n" +
" 'fields.response_time.max'='1000'" +
")";
定義一個聚合函數(shù)用的臨時變量:
public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}
實現(xiàn)自定義聚合函數(shù)類
public static class TP extends AggregateFunction<Integer,TPAccum>{
@Override
public TPAccum createAccumulator(){
return new TPAccum();
}
@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);
int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}
}
return responseTime;
}
}
public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}
}
實際的查詢sql如下:
String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";
感謝各位的閱讀,以上就是“flink中怎么使用自定義聚合函數(shù)統(tǒng)計網(wǎng)站TP指標(biāo)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對flink中怎么使用自定義聚合函數(shù)統(tǒng)計網(wǎng)站TP指標(biāo)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。