Hive UDAF Development in Detail

Published: 2020-06-27 04:58:27 | Source: Web | Views: 1660 | Author: choulanlan | Category: Big Data

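Overview

This article is a loose translation of "Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions", chosen because the original's examples are easy to follow. I have also woven my own understanding of Hive UDAFs into the text.

A UDAF is a user-defined aggregate function in Hive; the built-in UDAFs include sum() and count(). A UDAF can be implemented in two ways, simple or generic. The simple form relies on Java reflection, which costs performance and rules out some features, and it has been deprecated. This post therefore focuses on the generic form, GenericUDAF. Developing one mainly involves the following two abstract classes: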

    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

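Source code

All the code and data used in this post can be found at the following link: hive examples

Preparing the sample data

First, create a table named people to hold the sample data. It has a single column, name, and each value contains one or more names. The data is stored in the file people.txt.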

    ~$ cat ./people.txt

    John Smith
    John and Ann White
    Ted Green
    Dorothy

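Upload the file to the HDFS directory /user/matthew/people (the relative path people resolves under the current user's HDFS home directory, here /user/matthew):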

    hadoop fs -mkdir people
    hadoop fs -put ./people.txt people

Next, create a Hive external table by running the following in the Hive shell:


    CREATE EXTERNAL TABLE people (name string)
    ROW FORMAT DELIMITED FIELDS
        TERMINATED BY '\t'
        ESCAPED BY ''
        LINES TERMINATED BY '\n'
    STORED AS TEXTFILE
    LOCATION '/user/matthew/people';


The relevant abstract classes

Before writing a GenericUDAF you need to understand these two abstract classes:

    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

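To understand the API of these classes, remember that Hive queries are executed as MapReduce jobs; Hive writes and hides the MapReduce code for us and exposes a concise SQL interface on top. It therefore helps to think in terms of the Mapper, Combiner and Reducer. Keep in mind that a Hadoop cluster has many machines, and that Mapper and Reducer tasks run independently on different machines.

Broadly speaking, a UDAF reads data (mapper), aggregates a batch of mapper output into partial aggregation results (combiner), and finally produces the final aggregation result (reducer). Because the aggregation spans multiple combiners, the partial aggregation results have to be stored.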
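The flow described above can be sketched in plain Java with no Hive or Hadoop dependency. This is only an illustration under assumptions: the class PartialAggregationSketch and its methods are hypothetical names, the three lists stand in for three mapper inputs, and the aggregate is the character count used later in this article.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: imitates the mapper, combiner and reducer
// steps of a character-count aggregation without using Hive or Hadoop.
final class PartialAggregationSketch {

    // "Mapper" step: turn the raw rows of one input split into a partial result.
    static int mapSplit(List<String> rows) {
        int partial = 0;
        for (String row : rows) {
            partial += row.length();
        }
        return partial;
    }

    // "Combiner" and "reducer" step: merge partial results into one.
    // Input and output share the same format, so this step can be repeated.
    static int mergePartials(int... partials) {
        int merged = 0;
        for (int p : partials) {
            merged += p;
        }
        return merged;
    }

    // Three mappers, one combiner that sees two of them, then the reducer.
    static int run() {
        int m1 = mapSplit(Arrays.asList("John Smith"));
        int m2 = mapSplit(Arrays.asList("John and Ann White"));
        int m3 = mapSplit(Arrays.asList("Ted Green", "Dorothy"));
        int combined = mergePartials(m1, m2);   // combiner on one node
        return mergePartials(combined, m3);     // reducer
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The partial results m1, m2 and combined are exactly the values that have to be stored and shipped between stages, which is the role the aggregation buffer plays in a real UDAF.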

AbstractGenericUDAFResolver

The Resolver is simple: override the method below, which decides which Evaluator to use based on the types of the arguments passed in from SQL.

    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;

GenericUDAFEvaluator

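The aggregation logic of a UDAF lives mainly in the Evaluator, which requires implementing several methods of this abstract class.

Before looking at the Evaluator, you need to understand the ObjectInspector interface and Mode, the enum nested in GenericUDAFEvaluator.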


ObjectInspector

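Its main purpose is to decouple how data is used from how it is stored, so that a data stream can switch between input and output formats and different Operators can work with different formats. For an introduction to ObjectInspector, see these two articles: first post on Hive UDFs, and "The role of ObjectInspector in Hive".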
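The decoupling idea can be sketched in a few lines of plain Java. This is not Hive's API: InspectorSketch, StringInspector and the two inspector constants are hypothetical names, and the byte array merely stands in for some serialized representation of a string.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the ObjectInspector idea (not Hive's API):
// the consumer asks an inspector how to read a value instead of assuming
// the value's in-memory representation.
final class InspectorSketch {

    // Knows how to turn one particular storage format into a Java String.
    interface StringInspector {
        String getString(Object raw);
    }

    // Data already held as java.lang.String.
    static final StringInspector JAVA_STRING = raw -> (String) raw;

    // Data held as UTF-8 bytes, as a serialized row might be.
    static final StringInspector UTF8_BYTES =
            raw -> new String((byte[]) raw, StandardCharsets.UTF_8);

    // The consuming logic is written once and works with either format.
    static int length(Object raw, StringInspector inspector) {
        return inspector.getString(raw).length();
    }

    public static void main(String[] args) {
        System.out.println(length("Dorothy", JAVA_STRING));
        System.out.println(length("Dorothy".getBytes(StandardCharsets.UTF_8), UTF8_BYTES));
    }
}
```

The length() method never learns which representation it was given; swapping the inspector is enough to switch formats, which is the property the Evaluator relies on when its input changes from raw column values to partial results.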

Mode

Mode represents the MapReduce stage in which the UDAF is currently running.

    public static enum Mode {
        /**
         * PARTIAL1: the map phase of MapReduce, from raw data to partial aggregation.
         * Calls iterate() and terminatePartial().
         */
        PARTIAL1,
        /**
         * PARTIAL2: the map-side Combiner phase, which merges map output on the map side,
         * from partial aggregation to partial aggregation.
         * Calls merge() and terminatePartial().
         */
        PARTIAL2,
        /**
         * FINAL: the reduce phase of MapReduce, from partial aggregation to full aggregation.
         * Calls merge() and terminate().
         */
        FINAL,
        /**
         * COMPLETE: if this mode appears, the job has a map phase only and no reduce phase,
         * so the map side produces the result directly, from raw data to full aggregation.
         * Calls iterate() and terminate().
         */
        COMPLETE
    };

In the usual case the full UDAF logic is one MapReduce job. With a mapper and a reducer it goes through PARTIAL1 (mapper) and FINAL (reducer); if there is also a combiner it goes through PARTIAL1 (mapper), PARTIAL2 (combiner) and FINAL (reducer).

Some MapReduce jobs have only a mapper and no reducer. In that case there is only the COMPLETE stage, which takes the raw data as input and produces the result directly.
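The mapping between modes and method calls can be written down as a small lookup. The enum below is a hypothetical stand-in, not Hive's GenericUDAFEvaluator.Mode; it only records the pairs of methods listed in the comments above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for GenericUDAFEvaluator.Mode that records which
// evaluator methods this article associates with each mode.
enum ModeSketch {
    PARTIAL1("iterate", "terminatePartial"),   // map: raw rows -> partial result
    PARTIAL2("merge", "terminatePartial"),     // combiner: partial -> partial
    FINAL("merge", "terminate"),               // reduce: partial -> final result
    COMPLETE("iterate", "terminate");          // map only: raw rows -> final result

    private final String consume;  // method that takes this mode's input
    private final String emit;     // method that produces this mode's output

    ModeSketch(String consume, String emit) {
        this.consume = consume;
        this.emit = emit;
    }

    // The two methods called in this mode, in call order.
    List<String> methods() {
        return Arrays.asList(consume, emit);
    }

    // True when the mode's input is raw table data rather than partial results.
    boolean readsRawRows() {
        return consume.equals("iterate");
    }

    public static void main(String[] args) {
        for (ModeSketch m : values()) {
            System.out.println(m + " -> " + m.methods());
        }
    }
}
```

readsRawRows() is true only for PARTIAL1 and COMPLETE, which is the same distinction an evaluator's init() has to make when it decides how to interpret its input.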

Methods of GenericUDAFEvaluator

    // Determine the ObjectInspectors that describe the input and output of each stage
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

    // Create the object that holds the aggregation result
    public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

    // Reset the aggregation result
    public void reset(AggregationBuffer agg) throws HiveException;

    // Map stage: process the column values passed in from SQL, one row at a time
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

    // End of map or combiner: return the partial aggregation result
    public Object terminatePartial(AggregationBuffer agg) throws HiveException;

    // Combiner merges results returned by map; reducer merges results returned by mapper or combiner
    public void merge(AggregationBuffer agg, Object partial) throws HiveException;

    // Reducer stage: output the final result
    public Object terminate(AggregationBuffer agg) throws HiveException;
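To see how these methods cooperate, here is a Hive-free imitation of the call sequence. It is a sketch under assumptions: LifecycleSketch and Buffer are hypothetical, the method names mirror GenericUDAFEvaluator but the parameter types are simplified, and the driver in run() only approximates the order in which a real job would call them.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Hive-free imitation of the evaluator lifecycle. Method names
// mirror GenericUDAFEvaluator; the types are simplified for illustration.
final class LifecycleSketch {

    // Holds the running aggregate, like an AggregationBuffer.
    static final class Buffer {
        int sum;
    }

    static Buffer getNewAggregationBuffer() {
        return new Buffer();
    }

    // Clears the buffer that was passed in so it can be reused.
    static void reset(Buffer agg) {
        agg.sum = 0;
    }

    // Map side: fold one raw value into the buffer.
    static void iterate(Buffer agg, String value) {
        if (value != null) {
            agg.sum += value.length();
        }
    }

    // Map or combiner side: hand over the partial result.
    static Integer terminatePartial(Buffer agg) {
        return agg.sum;
    }

    // Combiner or reducer side: fold a partial result into the buffer.
    static void merge(Buffer agg, Integer partial) {
        if (partial != null) {
            agg.sum += partial;
        }
    }

    // Reducer side: produce the final result.
    static Integer terminate(Buffer agg) {
        return agg.sum;
    }

    // Runs the PARTIAL1 calls on each split, then the FINAL calls on the partials.
    static int run(List<List<String>> splits) {
        Buffer mapBuffer = getNewAggregationBuffer();
        Buffer reduceBuffer = getNewAggregationBuffer();
        for (List<String> split : splits) {
            reset(mapBuffer);                 // the same buffer is reused per split
            for (String row : split) {
                iterate(mapBuffer, row);
            }
            merge(reduceBuffer, terminatePartial(mapBuffer));
        }
        return terminate(reduceBuffer);
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("John Smith", "John and Ann White"),
                Arrays.asList("Ted Green", "Dorothy"));
        System.out.println(run(splits));
    }
}
```

Note that reset() must clear the buffer it receives; because buffers may be reused, a reset() that leaves the old sum in place would leak one group's total into the next.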

Mode and Evaluator illustrated

[Figure: Evaluator methods called in each Mode stage]

[Figure: how the Evaluator handles the MapReduce flow in each stage]

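Example

The rest of this post walks through a concrete UDAF that counts the letters in the name column of the people table.

The function below computes the total number of characters (including spaces) in the given column.

Code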

    package com.matthewrathbone.example;

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    @Description(name = "letters", value = "_FUNC_(expr) - returns the total number of characters of all strings in the column")
    public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
                throws SemanticException {
            if (parameters.length != 1) {
                throw new UDFArgumentTypeException(parameters.length - 1,
                        "Exactly one argument is expected.");
            }

            ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

            if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be PRIMITIVE, but "
                        + oi.getCategory().name()
                        + " was passed.");
            }

            PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

            if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be String, but "
                        + inputOI.getPrimitiveCategory().name()
                        + " was passed.");
            }

            return new TotalNumOfLettersEvaluator();
        }

        public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

            PrimitiveObjectInspector inputOI;
            ObjectInspector outputOI;
            PrimitiveObjectInspector integerOI;

            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                    throws HiveException {

                assert (parameters.length == 1);
                super.init(m, parameters);

                // Map stage reads the SQL column: the input is a primitive String
                if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                    inputOI = (PrimitiveObjectInspector) parameters[0];
                } else {
                    // All other stages: the input is a primitive Integer (a partial sum)
                    integerOI = (PrimitiveObjectInspector) parameters[0];
                }

                // Every stage outputs an Integer
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                        ObjectInspectorOptions.JAVA);
                return outputOI;
            }

            /**
             * Holds the running character total.
             */
            static class LetterSumAgg implements AggregationBuffer {
                int sum = 0;
                void add(int num) {
                    sum += num;
                }
            }

            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                return new LetterSumAgg();
            }

            @Override
            public void reset(AggregationBuffer agg) throws HiveException {
                // Clear the buffer that was passed in; the original created a new,
                // unused object here, which left the old sum in place.
                ((LetterSumAgg) agg).sum = 0;
            }

            @Override
            public void iterate(AggregationBuffer agg, Object[] parameters)
                    throws HiveException {
                assert (parameters.length == 1);
                if (parameters[0] != null) {
                    LetterSumAgg myagg = (LetterSumAgg) agg;
                    Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                    myagg.add(String.valueOf(p1).length());
                }
            }

            @Override
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                // Return only this buffer's sum. The original accumulated into an
                // evaluator-level field, which mixes totals when buffers are reused.
                LetterSumAgg myagg = (LetterSumAgg) agg;
                return myagg.sum;
            }

            @Override
            public void merge(AggregationBuffer agg, Object partial)
                    throws HiveException {
                if (partial != null) {
                    LetterSumAgg myagg = (LetterSumAgg) agg;
                    Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                    myagg.add(partialSum);
                }
            }

            @Override
            public Object terminate(AggregationBuffer agg) throws HiveException {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                return myagg.sum;
            }
        }
    }


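Code walkthrough

For background on combiners, Philippe Adjiman's walkthrough is a good resource.

AggregationBuffer lets us store intermediate results. By defining our own buffer we can handle data of any format; in the example, the running character total is kept in an AggregationBuffer.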


    /**
     * Holds the running character total.
     */
    static class LetterSumAgg implements AggregationBuffer {
        int sum = 0;
        void add(int num) {
            sum += num;
        }
    }


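This means the UDAF receives different inputs in different MapReduce stages. iterate() reads one row of our table (more precisely, one input record) and outputs an aggregation result in a different format.

Partial aggregation merges these results into a new result of the same format, and the final reducer then takes these results and outputs the final result (whose format may differ from that of the data it received).

In init() we declare that the input is a string and the final output is an integer, and that the partial aggregation result (held in the aggregation buffer) is also an integer; terminate() and terminatePartial() both return an integer.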


    // In init(), pick the input ObjectInspector according to the mode
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        inputOI = (PrimitiveObjectInspector) parameters[0];
    } else {
        integerOI = (PrimitiveObjectInspector) parameters[0];
    }

    // Output format, the same in every mode
    outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
            ObjectInspectorOptions.JAVA);


iterate() reads the string in the column of each row, computes its length, and adds it to the buffer.

    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        ...
            Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
            myagg.add(String.valueOf(p1).length());
        }
    }


merge() adds a partial aggregation total to the AggregationBuffer.

    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        if (partial != null) {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            // The partial result arrives as an Integer produced by terminatePartial()
            Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
            // Add it directly; the temporary second buffer in the original was redundant
            myagg.add(partialSum);
        }
    }


terminate() returns the contents of the AggregationBuffer; this is where the final result is produced.

    public Object terminate(AggregationBuffer agg) throws HiveException {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        return myagg.sum;
    }

Using the custom function

    ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
    CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

    SELECT letters(name) FROM people;
    OK
    44
    Time taken: 20.688 seconds

