溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Hadoop如何對(duì)文本文件實(shí)現(xiàn)全局排序

發(fā)布時(shí)間:2021-08-21 11:33:36 來(lái)源:億速云 閱讀:204 作者:小新 欄目:服務(wù)器

這篇文章主要為大家展示了“Hadoop如何對(duì)文本文件實(shí)現(xiàn)全局排序”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“Hadoop如何對(duì)文本文件實(shí)現(xiàn)全局排序”這篇文章吧。

一、背景

Hadoop中實(shí)現(xiàn)了用于全局排序的InputSampler類和TotalOrderPartitioner類,調(diào)用示例是org.apache.hadoop.examples.Sort。

但是當(dāng)我們以Text文件作為輸入時(shí),結(jié)果并非按Text中的string列排序,而且輸出結(jié)果是SequenceFile。

原因:

1) hadoop在處理Text文件時(shí),key是行號(hào)LongWritable類型,InputSampler抽樣的是key,TotalOrderPartitioner也是用key去查找分區(qū)。這樣,抽樣得到的partition文件是對(duì)行號(hào)的抽樣,結(jié)果自然是根據(jù)行號(hào)來(lái)排序。

2)大數(shù)據(jù)量時(shí),InputSampler抽樣速度會(huì)非常慢。比如,RandomSampler需要遍歷所有數(shù)據(jù),IntervalSampler需要遍歷文件數(shù)與splits數(shù)一樣。SplitSampler效率比較高,但它只抽取每個(gè)文件前面的記錄,不適合應(yīng)用于文件內(nèi)有序的情況。

二、功能

1. 實(shí)現(xiàn)了一種局部抽樣方法PartialSampler,適用于輸入數(shù)據(jù)各文件是獨(dú)立同分布的情況

2. 使RandomSampler、IntervalSampler、SplitSampler支持對(duì)文本的抽樣

3. 實(shí)現(xiàn)了針對(duì)Text文件string列的TotalOrderPartitioner

三、實(shí)現(xiàn)

1. PartialSampler

PartialSampler從第一份輸入數(shù)據(jù)中隨機(jī)抽取第一列文本數(shù)據(jù)。PartialSampler有兩個(gè)屬性:freq(采樣頻率),numSamples(采樣總數(shù))。

public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
   InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
   ArrayList<K> samples = new ArrayList<K>(numSamples);
   Random r = new Random();
   long seed = r.nextLong();
   r.setSeed(seed);
   LOG.debug("seed: " + seed);   
   // 對(duì)splits【0】抽樣
   for (int i = 0; i < 1; i++) {
    System.out.println("PartialSampler will getSample splits["+i+"]");
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
      Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
     if (r.nextDouble() <= freq) {
      if (samples.size() < numSamples) {
        // 選擇value中的第一列抽樣
        Text value0 = new Text(value.toString().split("\t")[0]);     
        samples.add((K) value0);        
      } else {
       // When exceeding the maximum number of samples, replace a
       // random element with this one, then adjust the frequency
       // to reflect the possibility of existing elements being
       // pushed out
       int ind = r.nextInt(numSamples);
       if (ind != numSamples) {
        Text value0 = new Text(value.toString().split("\t")[0]); 
        samples.set(ind, (K) value0);
       }
       freq *= (numSamples - 1) / (double) numSamples;
      }
      key = reader.createKey();
     }
    }    
    reader.close();
   }
   return (K[])samples.toArray();
  }

首先通過(guò)InputFormat的getSplits方法得到所有的輸入分區(qū);

然后掃描第一個(gè)分區(qū)中的記錄進(jìn)行采樣。

記錄采樣的具體過(guò)程如下:

從指定分區(qū)中取出一條記錄,判斷得到的隨機(jī)浮點(diǎn)數(shù)是否小于等于采樣頻率freq

  如果大于則放棄這條記錄;

  如果小于,則判斷當(dāng)前的采樣數(shù)是否小于最大采樣數(shù),

    如果小于則這條記錄被選中,被放進(jìn)采樣集合中;

    否則從【0,numSamples】中選擇一個(gè)隨機(jī)數(shù),如果這個(gè)隨機(jī)數(shù)不等于最大采樣數(shù)numSamples,則用這條記錄替換掉采樣集合隨機(jī)數(shù)對(duì)應(yīng)位置的記錄,同時(shí)采樣頻率freq減小變?yōu)閒req*(numSamples-1)/numSamples。

然后依次遍歷分區(qū)中的其它記錄。

note:

1)PartialSampler只適用于輸入數(shù)據(jù)各文件是獨(dú)立同分布的情況。

2)自帶的三種Sampler通過(guò)修改samples.add(key)為samples.add((K) value0); 也可以實(shí)現(xiàn)對(duì)第一列的抽樣。

2. TotalOrderPartitioner

TotalOrderPartitioner主要改進(jìn)了兩點(diǎn):

1)讀partition時(shí)指定keyClass為Text.class

因?yàn)閜artition文件中的key類型為Text

在configure函數(shù)中,修改:

//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<K> keyClass = (Class<K>)Text.class;

2)查找分區(qū)時(shí),改用value查

public int getPartition(K key, V value, int numPartitions) {
  Text value0 = new Text(value.toString().split("\t")[0]); 
  return partitions.findPartition((K) value0);
 }

3. Sort

1)設(shè)置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass

2)初始化InputSampler對(duì)象,抽樣

3)partitionFile通過(guò)CacheFile傳給TotalOrderPartitioner,執(zhí)行MapReduce任務(wù)

 Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;
  Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;
  Class<? extends WritableComparable> outputKeyClass = Text.class;
  Class<? extends Writable> outputValueClass = Text.class;
  jobConf.setMapOutputKeyClass(LongWritable.class);
  // Set user-supplied (possibly default) job configs
  jobConf.setNumReduceTasks(num_reduces);
  jobConf.setInputFormat(inputFormatClass);
  jobConf.setOutputFormat(outputFormatClass);
  jobConf.setOutputKeyClass(outputKeyClass);
  jobConf.setOutputValueClass(outputValueClass);
  if (sampler != null) {
   System.out.println("Sampling input to effect total-order sort...");
   jobConf.setPartitionerClass(TotalOrderPartitioner.class);
   Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
   inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
   //Path partitionFile = new Path(inputDir, "_sortPartitioning");
   TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
   InputSampler.<K,V>writePartitionFile(jobConf, sampler);
   URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
   DistributedCache.addCacheFile(partitionUri, jobConf);
   DistributedCache.createSymlink(jobConf);
  }
  FileSystem hdfs = FileSystem.get(jobConf);
  hdfs.delete(outputpath);
  hdfs.close();
  System.out.println("Running on " +
    cluster.getTaskTrackers() +
    " nodes to sort from " + 
    FileInputFormat.getInputPaths(jobConf)[0] + " into " +
    FileOutputFormat.getOutputPath(jobConf) +
    " with " + num_reduces + " reduces.");
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  jobResult = JobClient.runJob(jobConf);

四、執(zhí)行

usage:

hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>

Example:

hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

五、性能

200G輸入數(shù)據(jù),15億條url,1000個(gè)分區(qū),排序時(shí)間只用了6分鐘

以上是“Hadoop如何對(duì)文本文件實(shí)現(xiàn)全局排序”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI