溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用MapReduce怎么實(shí)現(xiàn)決策樹算法

發(fā)布時(shí)間:2021-05-31 17:34:42 來源:億速云 閱讀:371 作者:Leah 欄目:編程語言

今天就跟大家聊聊有關(guān)使用MapReduce怎么實(shí)現(xiàn)決策樹算法,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

首先,基于C45決策樹算法實(shí)現(xiàn)對(duì)應(yīng)的Mapper算子,相關(guān)的代碼如下:

public class MapClass extends MapReduceBase implements Mapper {
 
  private final static IntWritable one = new IntWritable(1);
  private Text attValue = new Text();
  private int i;
  private String token;
  public static int no_Attr;
  public Split split = null;
  
  public int size_split_1 = 0;
  
  public void configure(JobConf conf){
   try {
  split = (Split) ObjectSerializable.unSerialize(conf.get("currentsplit"));
 } catch (ClassNotFoundException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 } catch (IOException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }
   size_split_1 = Integer.parseInt(conf.get("current_index"));
  }
  
  public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
      throws IOException {
    String line = value.toString(); // changing input instance value to
                    // string
    StringTokenizer itr = new StringTokenizer(line);
    int index = 0;
    String attr_value = null;
    no_Attr = itr.countTokens() - 1;
    String attr[] = new String[no_Attr];
    boolean match = true;
    for (i = 0; i < no_Attr; i++) {
      attr[i] = itr.nextToken(); // Finding the values of different
                    // attributes
    }
 
    String classLabel = itr.nextToken();
    int size_split = split.attr_index.size();
    Counter counter = reporter.getCounter("reporter-"+Main.current_index, size_split+" "+size_split_1);
    counter.increment(1l);
    for (int count = 0; count < size_split; count++) {
      index = (Integer) split.attr_index.get(count);
      attr_value = (String) split.attr_value.get(count);
      if (!attr[index].equals(attr_value)) {
        match = false;
        break;
      }
    }
 
    if (match) {
      for (int l = 0; l < no_Attr; l++) {
        if (!split.attr_index.contains(l)) {
         //表示出某個(gè)屬性在某個(gè)類標(biāo)簽上出現(xiàn)了一次
          token = l + " " + attr[l] + " " + classLabel;
          attValue.set(token);
          output.collect(attValue, one);
        }
        else{
         
        }
      }
      if (size_split == no_Attr) {
        token = no_Attr + " " + "null" + " " + classLabel;
        attValue.set(token);
        output.collect(attValue, one);
      }
    }
  }
 
}

然后,基于C45決策樹算法實(shí)現(xiàn)對(duì)應(yīng)的Reducer算子,相關(guān)的代碼如下:

public class Reduce extends MapReduceBase implements Reducer {
 
  static int cnt = 0;
  ArrayList ar = new ArrayList();
  String data = null;
  private static int currentIndex;
 
  public void configure(JobConf conf) {
    currentIndex = Integer.valueOf(conf.get("currentIndex"));
  }
 
  public void reduce(Text key, Iterator values, OutputCollector output,
      Reporter reporter) throws IOException {
    int sum = 0;
    //sum表示按照某個(gè)屬性進(jìn)行劃分的子數(shù)據(jù)集上的某個(gè)類出現(xiàn)的個(gè)數(shù)
    while (values.hasNext()) {
      sum += values.next().get();
    }
    //最后將這個(gè)屬性上的取值寫入output中;
    output.collect(key, new IntWritable(sum));
 
    String data = key + " " + sum;
    ar.add(data);
    //將最終結(jié)果寫入到文件中;
    writeToFile(ar);
    ar.add("\n");
  }
 
  public static void writeToFile(ArrayList text) {
    try {
      cnt++;
      Path input = new Path("C45/intermediate" + currentIndex + ".txt");
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(input, true)));
 
      for (String str : text) {
        bw.write(str);
      }
      bw.newLine();
      bw.close();
    } catch (Exception e) {
      System.out.println("File is not creating in reduce");
    }
  }
}

最后,編寫Main函數(shù),啟動(dòng)MapReduce作業(yè),需要啟動(dòng)多趟,代碼如下:

package com.hackecho.hadoop;
 
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;
import org.dmg.pmml.MiningFunctionType;
import org.dmg.pmml.Node;
import org.dmg.pmml.PMML;
import org.dmg.pmml.TreeModel;
 
//在這里MapReduce的作用就是根據(jù)各個(gè)屬性的特征來劃分子數(shù)據(jù)集
public class Main extends Configured implements Tool {
 
 //當(dāng)前分裂
  public static Split currentsplit = new Split();
  //已經(jīng)分裂完成的集合
  public static List splitted = new ArrayList();
  //current_index 表示目前進(jìn)行分裂的位置
  public static int current_index = 0;
  
  public static ArrayList ar = new ArrayList();
  
  public static List leafSplits = new ArrayList();
  
  public static final String PROJECT_HOME = System.getProperty("user.dir");
 
  public static void main(String[] args) throws Exception {
   //在splitted中已經(jīng)放入了一個(gè)currentsplit了,所以此時(shí)的splitted的size大小為1
   PropertyConfigurator.configure(PROJECT_HOME + "/conf/log/log4j.properties");
    splitted.add(currentsplit);
   
    Path c45 = new Path("C45");
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(c45)) {
      fs.delete(c45, true);
    }
    fs.mkdirs(c45);
    int res = 0;
    int split_index = 0;
    //增益率
    double gainratio = 0;
    //最佳增益
    double best_gainratio = 0;
    //熵值
    double entropy = 0;
    //分類標(biāo)簽
    String classLabel = null;
    //屬性個(gè)數(shù)
    int total_attributes = MapClass.no_Attr;
    total_attributes = 4;
    //分裂的個(gè)數(shù)
    int split_size = splitted.size();
    //增益率
    GainRatio gainObj;
    //產(chǎn)生分裂的新節(jié)點(diǎn)
    Split newnode;
 
    while (split_size > current_index) {
     currentsplit = splitted.get(current_index);
      gainObj = new GainRatio();
      res = ToolRunner.run(new Configuration(), new Main(), args);
      System.out.println("Current NODE INDEX . ::" + current_index);
      int j = 0;
      int temp_size;
      gainObj.getcount();
      //計(jì)算當(dāng)前節(jié)點(diǎn)的信息熵
      entropy = gainObj.currNodeEntophy();
      //獲取在當(dāng)前節(jié)點(diǎn)的分類
      classLabel = gainObj.majorityLabel();
      currentsplit.classLabel = classLabel;
 
      if (entropy != 0.0 && currentsplit.attr_index.size() != total_attributes) {
        System.out.println("");
        System.out.println("Entropy NOTT zero  SPLIT INDEX::  " + entropy);
        best_gainratio = 0;
        //計(jì)算各個(gè)屬性的信息增益值
        for (j = 0; j < total_attributes; j++) // Finding the gain of
                            // each attribute
        {
          if (!currentsplit.attr_index.contains(j)) {
           //按照每一個(gè)屬性的序號(hào),也就是索引j來計(jì)算各個(gè)屬性的信息增益
            gainratio = gainObj.gainratio(j, entropy);
            //找出最佳的信息增益
            if (gainratio >= best_gainratio) {
              split_index = j;
              best_gainratio = gainratio;
            }
          }
        }
 
        //split_index表示在第幾個(gè)屬性上完成了分裂,也就是分裂的索引值;
        //attr_values_split表示分裂的屬性所取的值的拼接成的字符串;
        String attr_values_split = gainObj.getvalues(split_index);
        StringTokenizer attrs = new StringTokenizer(attr_values_split);
        int number_splits = attrs.countTokens(); // number of splits
                             // possible with
                             // attribute selected
        String red = "";
        System.out.println(" INDEX :: " + split_index);
        System.out.println(" SPLITTING VALUES " + attr_values_split);
 
        //根據(jù)分裂形成的屬性值的集合將在某個(gè)節(jié)點(diǎn)上按照屬性值將數(shù)據(jù)集分成若干類
        for (int splitnumber = 1; splitnumber <= number_splits; splitnumber++) {
          temp_size = currentsplit.attr_index.size();
          newnode = new Split();
          for (int y = 0; y < temp_size; y++) {
            newnode.attr_index.add(currentsplit.attr_index.get(y));
            newnode.attr_value.add(currentsplit.attr_value.get(y));
          }
          red = attrs.nextToken();
 
          newnode.attr_index.add(split_index);
          newnode.attr_value.add(red);
          //按照當(dāng)前的屬性值將數(shù)據(jù)集將若干分類,同時(shí)將數(shù)據(jù)集按照這個(gè)屬性劃分位若干個(gè)新的分裂;
          splitted.add(newnode);
        }
      } else if(entropy==0.0 && currentsplit.attr_index.size()!=total_attributes){
       //每次計(jì)算到葉子節(jié)點(diǎn)的時(shí)候,就將其持久化到模型文件中
       /**
        String rule = "";
        temp_size = currentsplit.attr_index.size();
        for (int val = 0; val < temp_size; val++) {
          rule = rule + " " + currentsplit.attr_index.get(val) + " " + currentsplit.attr_value.get(val);
        }
        rule = rule + " " + currentsplit.classLabel;
        ar.add(rule);
        writeRuleToFile(ar);
        ar.add("\n");
        if (entropy != 0.0) {
          System.out.println("Enter rule in file:: " + rule);
        } else {
          System.out.println("Enter rule in file Entropy zero ::  " + rule);
        }
        System.out.println("persistence model@!!!!");
        */
       leafSplits.add(currentsplit);
      }
      else{
       TreeModel tree = PmmlDecisionTree.buildTreeModel(leafSplits);
       PMML pmml = new PMML();
       pmml.addModels(tree);
       PmmlModelFactory.pmmlPersistence("C45/DecisionTree.pmml", pmml);
      }
      split_size = splitted.size();
      System.out.println("TOTAL NODES::  " + split_size);
      current_index++;
    }
    System.out.println("Done!");
    System.exit(res);
  }
 
  public static void writeRuleToFile(ArrayList text) throws IOException {
   Path rule = new Path("C45/rule.txt");
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(rule, true)));
      for (String str : text) {
        bw.write(str);
      }
      bw.newLine();
      bw.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
 
  public int run(String[] args) throws Exception {
    System.out.println("In main ---- run");
    JobConf conf = new JobConf(getConf(), Main.class);
    conf.setJobName("C45");
    conf.set("currentsplit",ObjectSerializable.serialize(currentsplit));
    conf.set("current_index",String.valueOf(currentsplit.attr_index.size()));
    conf.set("currentIndex", String.valueOf(current_index));
 
    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
 
    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);
    System.out.println("back to run");
 
    FileSystem fs = FileSystem.get(conf);
 
    Path out = new Path(args[1] + current_index);
    if (fs.exists(out)) {
      fs.delete(out, true);
    }
    FileInputFormat.setInputPaths(conf, args[0]);
    FileOutputFormat.setOutputPath(conf, out);
 
    JobClient.runJob(conf);
    return 0;
  }
}

看完上述內(nèi)容,你們對(duì)使用MapReduce怎么實(shí)現(xiàn)決策樹算法有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI