This article explains in some detail how to understand MapReduce counters. It is shared here as a reference; hopefully you will have a solid grasp of the topic after reading it.
In many cases, a user needs to learn something about the data being analyzed, even though that is not the core of the analysis task. Take counting the number of invalid records in a dataset: if the proportion of invalid records turns out to be quite high, it is worth asking why there are so many. Is the detection program flawed, or is the dataset genuinely of low quality and full of invalid records? If it is a data-quality problem, you may need to enlarge the dataset to raise the proportion of valid records so that a meaningful analysis can still be performed.
Counters are an effective means of gathering job statistics, whether for quality control or for application-level metrics. They can also help diagnose system failures. If you are tempted to put log messages into your map or reduce tasks, it is usually better to use a counter instead to see whether a particular event occurred. For large distributed jobs, counters are simply more convenient: retrieving a counter value is easier than collecting log output, and counting the occurrences of a specific event from counter values is far easier than analyzing a pile of log files.
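As a concrete illustration of the quality-control use case just described, here is a minimal mapper sketch (not from the original article) that counts malformed records instead of logging each one. The Quality enum and the "each record must be an integer" rule are hypothetical choices for this sketch; only the context.getCounter(...).increment(1) call is the standard new-API counter usage.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QualityCheckMapper extends Mapper<Object, Text, Text, IntWritable> {

    // Hypothetical counter group for this sketch.
    enum Quality { MALFORMED_RECORDS, VALID_RECORDS }

    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        try {
            Integer.parseInt(line);  // pretend a valid record must be an integer
            context.getCounter(Quality.VALID_RECORDS).increment(1);
            context.write(new Text(line), one);
        } catch (NumberFormatException e) {
            // Count the bad record instead of writing a log message.
            context.getCounter(Quality.MALFORMED_RECORDS).increment(1);
        }
    }
}

After the job finishes, a high MALFORMED_RECORDS value is visible at a glance in the job's counter summary, with no log files to sift through.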
Hadoop maintains a number of built-in counters for every job, describing various metrics of that job. For example, some counters record the number of bytes and records processed, which lets you monitor how much input data has been consumed and how much output has been produced, and tune the job accordingly.
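If you want to read one of these built-in counters programmatically once a job has finished (the raw console listing follows below), a minimal sketch with the new API looks like this; it assumes a Job object for which waitForCompletion() has already returned, and uses the org.apache.hadoop.mapreduce.TaskCounter enum that newer Hadoop releases expose for the framework counters.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

// Sketch: print two built-in framework counters of a completed job.
static void printFrameworkCounters(Job job) throws Exception {
    Counters counters = job.getCounters();
    long mapIn  = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    long redOut = counters.findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
    System.out.println("map input records = " + mapIn
            + ", reduce output records = " + redOut);
}

The console output of a finished job prints the same information, for example: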
14/06/08 15:13:35 INFO mapreduce.Job: Counters: 46
	File System Counters
		FILE: Number of bytes read=159
		FILE: Number of bytes written=159447
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=198
		HDFS: Number of bytes written=35
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=3896
		Total time spent by all reduces in occupied slots (ms)=9006
	Map-Reduce Framework
		Map input records=3
		Map output records=12
		Map output bytes=129
		Map output materialized bytes=159
		Input split bytes=117
		Combine input records=0
		Combine output records=0
		Reduce input groups=4
		Reduce shuffle bytes=159
		Reduce input records=12
		Reduce output records=4
		Spilled Records=24
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=13
		CPU time spent (ms)=3830
		Physical memory (bytes) snapshot=537718784
		Virtual memory (bytes) snapshot=7365263360
		Total committed heap usage (bytes)=2022309888
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=81
	File Output Format Counters
		Bytes Written=35
Counters are maintained by the task with which they are associated and are periodically sent to the tasktracker, which forwards them to the jobtracker, so they can be aggregated globally (see the "Progress and Status Updates" section, around page 170 of Hadoop: The Definitive Guide). Unlike other counters, including user-defined ones, the built-in job counters are actually maintained by the jobtracker itself, so they never have to be sent across the network.
A task's counter values are sent in full each time rather than as increments since the last transmission, which avoids errors if a message is lost. In addition, if a task fails during a job run, the associated counter values will go down. Counter values are therefore complete and reliable only once the job has completed successfully.
MapReduce also allows users to define their own counters, whose values can be incremented in the mapper or the reducer. Multiple counters are defined by a Java enum so that they can be grouped: a job may define any number of enums, and each enum may contain any number of fields. The enum's name becomes the group name, and its fields become the counter names. Counters are global: the MapReduce framework aggregates them across all maps and reduces and produces a final total when the job ends.
Notice 1: the way counters are defined differs slightly between Hadoop versions.
(1) In 0.20.x, using a counter is straightforward: just reference it directly, and if the counter does not exist, Hadoop creates it automatically.
Counter ct = context.getCounter("INPUT_WORDS", "count");
ct.increment(1);
(2) In 0.19.x, you need to define an enum:
enum MyCounter { INPUT_WORDS };
reporter.incrCounter(MyCounter.INPUT_WORDS, 1);
RunningJob job = JobClient.runJob(conf);
Counters c = job.getCounters();
long cnt = c.getCounter(MyCounter.INPUT_WORDS);
Notice 2: be aware that counters are all stored in the JobTracker's memory. Mapper and Reducer tasks serialize them and send them along with their status updates. For things to run smoothly without burdening the JobTracker, the number of counters should stay in the range of roughly 10 to 100, and counters are not limited to aggregating statistics for a MapReduce job. Newer Hadoop versions cap the number of counters precisely to protect the JobTracker; the last thing you want is to bring the JobTracker down by defining hundreds of counters.
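The exact cap varies by release. As a minimal sketch (assumption: a Hadoop 2.x release where the limit is governed by the mapreduce.job.counters.max property), the configured limit can be read from the job configuration like this:

import org.apache.hadoop.conf.Configuration;

// Sketch: "mapreduce.job.counters.max" caps how many counters one job may
// create; 120 is a common default in 2.x releases (assumption - check your
// version). Raising it blindly defeats the protection described above.
static int counterLimit(Configuration conf) {
    return conf.getInt("mapreduce.job.counters.max", 120);
}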
Now let's walk through a counter example (please run the following code on version 0.20.1 or later). The test input is:
hello world 2013 mapreduce
hello world 2013 mapreduce
hello world 2013 mapreduce
/**
 * Project Name: CDHJobs
 * File Name: MapredCounter.java
 * Package Name: tmp
 * Date: 2014-6-8 2:12:48 PM
 * Copyright (c) 2014, decli#qq.com All Rights Reserved.
 */
package tmp;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCounter {

    static enum WordsNature {
        STARTS_WITH_DIGIT, STARTS_WITH_LETTER, ALL
    }

    /**
     * The map class of WordCount.
     */
    public static class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * The reducer class of WordCount.
     */
    public static class TokenCounterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;

            String token = key.toString();
            if (StringUtils.isNumeric(token)) {
                context.getCounter(WordsNature.STARTS_WITH_DIGIT).increment(1);
            } else if (StringUtils.isAlpha(token)) {
                context.getCounter(WordsNature.STARTS_WITH_LETTER).increment(1);
            }
            context.getCounter(WordsNature.ALL).increment(1);

            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * The main entry point.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "WordCountWithCounter");
        job.setJarByClass(WordCountWithCounter.class);
        job.setMapperClass(TokenCounterMapper.class);
        job.setReducerClass(TokenCounterReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/dsap/rawdata/june/a.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/dsap/rawdata/june/a_result"));
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        Counters counters = job.getCounters();
        Counter c1 = counters.findCounter(WordsNature.STARTS_WITH_DIGIT);
        System.out.println("-------------->>>>: " + c1.getDisplayName() + ": " + c1.getValue());

        // The loop below shows how to iterate over all counter groups, including the built-in ones Hadoop provides.
        for (CounterGroup group : counters) {
            System.out.println("==========================================================");
            System.out.println("* Counter Group: " + group.getDisplayName() + " (" + group.getName() + ")");
            System.out.println("  number of counters in this group: " + group.size());
            for (Counter counter : group) {
                System.out.println("  ++++ " + counter.getDisplayName() + ": " + counter.getName() + ": "
                        + counter.getValue());
            }
        }

        System.exit(exitCode);
    }
}
The run results are given in full below. Counters have the notion of a "group", which collects all values that logically belong to the same scope. A MapReduce job provides seven default counter groups, introduced one by one below. The test data above is used for the comparison, and for the individual counters I will pick out the main ones and describe them briefly.
... (the job-run log output before this point is omitted) ...
		ALL=4
		STARTS_WITH_DIGIT=1
		STARTS_WITH_LETTER=3
-------------->>>>: STARTS_WITH_DIGIT: 1
==========================================================
# A MapReduce job may depend on data from different file systems; this group records the job's read/write traffic against each file system.
* Counter Group: File System Counters (org.apache.hadoop.mapreduce.FileSystemCounter)
  number of counters in this group: 10
# Bytes the job read from the local file system. Assuming the map input all comes from HDFS, this should be 0 during the map phase. Before a reduce runs, however, its input is the shuffled and merged data stored on the reduce side's local disk, so this value is the total input bytes of all reduces.
  ++++ FILE: Number of bytes read: FILE_BYTES_READ: 159
# Map intermediate results are spilled to local disk and form the final spill file when the map finishes, so on the map side this is the total number of bytes the map tasks wrote to local disk. Correspondingly, during the shuffle the reduce side keeps pulling map-side intermediate results, merging and spilling them to its own local disk until a single file, the reduce input, is produced.
  ++++ FILE: Number of bytes written: FILE_BYTES_WRITTEN: 159447
  ++++ FILE: Number of read operations: FILE_READ_OPS: 0
  ++++ FILE: Number of large read operations: FILE_LARGE_READ_OPS: 0
  ++++ FILE: Number of write operations: FILE_WRITE_OPS: 0
# During the whole job, only the map side reads from HDFS, and that data is not limited to the source file contents; it also includes the split metadata for all the maps, so this value should be slightly larger than FileInputFormatCounters.BYTES_READ.
  ++++ HDFS: Number of bytes read: HDFS_BYTES_READ: 198
# The final reduce output is written to HDFS; this is the total output size of the job.
  ++++ HDFS: Number of bytes written: HDFS_BYTES_WRITTEN: 35
  ++++ HDFS: Number of read operations: HDFS_READ_OPS: 6
  ++++ HDFS: Number of large read operations: HDFS_LARGE_READ_OPS: 0
  ++++ HDFS: Number of write operations: HDFS_WRITE_OPS: 2
==========================================================
# This group describes statistics related to job scheduling.
* Counter Group: Job Counters (org.apache.hadoop.mapreduce.JobCounter)
  number of counters in this group: 5
# Incremented when the job launches a data-local map task (a replica of the input split lives on the tasktracker that runs the map).
  ++++ Data-local map tasks
# Total time for which slots were reserved for this job's map/reduce tasks but left idle.
  ++++ FALLOW_SLOTS_MILLIS_MAPS/REDUCES
# Total time all map/reduce tasks occupied their slots, including execution time and the time to create/destroy the child JVMs.
  ++++ SLOTS_MILLIS_MAPS/REDUCES
# Number of map tasks launched by this job.
  ++++ Launched map tasks: TOTAL_LAUNCHED_MAPS: 1
# Number of reduce tasks launched by this job.
  ++++ Launched reduce tasks: TOTAL_LAUNCHED_REDUCES: 1
  ++++ Rack-local map tasks: RACK_LOCAL_MAPS: 1
  ++++ Total time spent by all maps in occupied slots (ms): SLOTS_MILLIS_MAPS: 3896
  ++++ Total time spent by all reduces in occupied slots (ms): SLOTS_MILLIS_REDUCES: 9006
==========================================================
# This counter group contains a great deal of detail about job execution. Keep in mind that a "record" normally means one line of data, while "bytes" is the size of that data, and a "group" here means a reduce-side merged input of the form {"aaa", [5, 8, 2, ...]}.
* Counter Group: Map-Reduce Framework (org.apache.hadoop.mapreduce.TaskCounter)
  number of counters in this group: 20
# Total number of input lines read from HDFS by all map tasks.
  ++++ Map input records: MAP_INPUT_RECORDS: 3
# Number of records emitted directly by the map tasks, i.e. the number of context.write calls in the map method - the raw output before any combiner runs.
  ++++ Map output records: MAP_OUTPUT_RECORDS: 12
# Map output key/value pairs are serialized into the in-memory buffer, so this is the total number of bytes after serialization.
  ++++ Map output bytes: MAP_OUTPUT_BYTES: 129
  ++++ Map output materialized bytes: MAP_OUTPUT_MATERIALIZED_BYTES: 159
# Split-related data for the map tasks is stored in HDFS, and the stored metadata also records how the data is compressed and what its concrete type is. This extra information is added by the MapReduce framework and has nothing to do with the job itself; the value here is the size in bytes of that extra information.
  ++++ Input split bytes: SPLIT_RAW_BYTES: 117
# The combiner exists to reduce, as far as possible, the amount of data that has to be pulled and moved, so the number of combine input records equals the number of map output records.
  ++++ Combine input records: COMBINE_INPUT_RECORDS: 0
# After the combiner runs, records with the same key are compacted, so much of the duplicate data is already resolved on the map side; this is the final number of entries in the map-side intermediate file.
  ++++ Combine output records: COMBINE_OUTPUT_RECORDS: 0
# How many such groups the reduces read in total.
  ++++ Reduce input groups: REDUCE_INPUT_GROUPS: 4
# How much intermediate data the reduce-side copy threads fetched from the map side in total, i.e. the combined size of the final intermediate files of all map tasks.
  ++++ Reduce shuffle bytes: REDUCE_SHUFFLE_BYTES: 159
# If a combiner is used, this equals the number of records left after the map-side combiner ran; if not, it should equal the number of map output records.
  ++++ Reduce input records: REDUCE_INPUT_RECORDS: 12
# Total number of records written by all reduces.
  ++++ Reduce output records: REDUCE_OUTPUT_RECORDS: 4
# Spilling happens on both the map and the reduce side; this counts how many records were spilled from memory to disk in total.
  ++++ Spilled Records: SPILLED_RECORDS: 24
# Almost every reduce has to fetch data from every map; each time a copy thread successfully fetches one map's output this counter is incremented, so its total is roughly reduce number * map number.
  ++++ Shuffled Maps : SHUFFLED_MAPS: 1
# Number of shuffle errors caused by network or IO exceptions while copy threads were fetching map-side intermediate data.
  ++++ Failed Shuffles: FAILED_SHUFFLE: 0
# How many merge operations took place during the shuffle in total.
  ++++ Merged Map outputs: MERGED_MAP_OUTPUTS: 1
# Total GC time of the child JVMs running the maps and reduces, obtained via JMX.
  ++++ GC time elapsed (ms): GC_TIME_MILLIS: 13
  ++++ CPU time spent (ms): CPU_MILLISECONDS: 3830
  ++++ Physical memory (bytes) snapshot: PHYSICAL_MEMORY_BYTES: 537718784
  ++++ Virtual memory (bytes) snapshot: VIRTUAL_MEMORY_BYTES: 7365263360
  ++++ Total committed heap usage (bytes): COMMITTED_HEAP_BYTES: 2022309888
==========================================================
# This group counts the various errors that can occur during the shuffle, essentially errors hit by the copy threads while fetching map-side intermediate data.
* Counter Group: Shuffle Errors (Shuffle Errors)
  number of counters in this group: 6
# Every map has an ID such as attempt_201109020150_0254_m_000000_0; if the ID in the metadata fetched by a reduce copy thread is not in the standard format, this counter is incremented.
  ++++ BAD_ID: BAD_ID: 0
# The copy thread failed to establish a connection to the map side.
  ++++ CONNECTION: CONNECTION: 0
# Incremented when a reduce copy thread hits an IOException while fetching map-side data.
  ++++ IO_ERROR: IO_ERROR: 0
# The map-side intermediate result is compressed, formatted data, so it carries two lengths: the original size and the compressed size. If either length arrives corrupted (negative), this counter is incremented.
  ++++ WRONG_LENGTH: WRONG_LENGTH: 0
# Each copy thread has a purpose: fetch certain maps' intermediate results for a certain reduce. If the fetched data belongs to a map the thread was not supposed to fetch, the wrong data was pulled.
  ++++ WRONG_MAP: WRONG_MAP: 0
# Likewise, if the fetched data turns out not to be intended for this reduce, the wrong data was pulled.
  ++++ WRONG_REDUCE: WRONG_REDUCE: 0
==========================================================
# This group covers the file content read by the map tasks (the total input data).
* Counter Group: File Input Format Counters (org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter)
  number of counters in this group: 1
# Total input bytes of all map tasks, equal to the sum of the bytes of all the value objects passed into the map methods.
  ++++ Bytes Read: BYTES_READ: 81
==========================================================
# This group covers the file content written by the reduce tasks (the total output data).
* Counter Group: File Output Format Counters (org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter)
  number of counters in this group: 1
  ++++ Bytes Written: BYTES_WRITTEN: 35
==========================================================
# Statistics for the user-defined counters.
* Counter Group: tmp.WordCountWithCounter$WordsNature (tmp.WordCountWithCounter$WordsNature)
  number of counters in this group: 3
  ++++ ALL: ALL: 4
  ++++ STARTS_WITH_DIGIT: STARTS_WITH_DIGIT: 1
  ++++ STARTS_WITH_LETTER: STARTS_WITH_LETTER: 3
That is all on how to understand MapReduce counters. I hope the content above is of some help and that you have learned something from it. If you found the article useful, feel free to share it so more people can see it.