Today we will walk through how to implement a decision tree algorithm with MapReduce. Many readers may not be familiar with this topic, so this article summarizes the approach below; hopefully you will take something useful away from it.
First, implement the Mapper operator for the C4.5 decision tree algorithm. The relevant code is as follows:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text attValue = new Text();
    private int i;
    private String token;
    public static int no_Attr;
    public Split split = null;
    public int size_split_1 = 0;

    public void configure(JobConf conf) {
        try {
            // Deserialize the split (the attribute/value path from the root to the current node)
            // that the driver placed in the job configuration.
            split = (Split) ObjectSerializable.unSerialize(conf.get("currentsplit"));
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        size_split_1 = Integer.parseInt(conf.get("current_index"));
    }

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString(); // convert the input instance to a string
        StringTokenizer itr = new StringTokenizer(line);
        int index = 0;
        String attr_value = null;
        no_Attr = itr.countTokens() - 1;
        String attr[] = new String[no_Attr];
        boolean match = true;
        for (i = 0; i < no_Attr; i++) {
            attr[i] = itr.nextToken(); // read the value of each attribute
        }
        String classLabel = itr.nextToken();
        int size_split = split.attr_index.size();
        Counter counter = reporter.getCounter("reporter-" + Main.current_index, size_split + " " + size_split_1);
        counter.increment(1L);
        // Keep only the instances that match every attribute value already fixed on the current split.
        for (int count = 0; count < size_split; count++) {
            index = (Integer) split.attr_index.get(count);
            attr_value = (String) split.attr_value.get(count);
            if (!attr[index].equals(attr_value)) {
                match = false;
                break;
            }
        }
        if (match) {
            for (int l = 0; l < no_Attr; l++) {
                if (!split.attr_index.contains(l)) {
                    // Emit that this attribute value co-occurred with this class label once.
                    token = l + " " + attr[l] + " " + classLabel;
                    attValue.set(token);
                    output.collect(attValue, one);
                }
            }
            if (size_split == no_Attr) {
                token = no_Attr + " " + "null" + " " + classLabel;
                attValue.set(token);
                output.collect(attValue, one);
            }
        }
    }
}
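The Mapper relies on two helper classes that the article does not show: Split, which records the attribute indexes and values fixed along the path from the root to the current tree node, and ObjectSerializable, which turns that object into a string that can travel through the JobConf. The names Split, ObjectSerializable, serialize, unSerialize, attr_index, attr_value and classLabel are taken from the code above; everything else in the following minimal sketch (Java serialization plus java.util.Base64, which assumes Java 8+) is an assumption about how they might be implemented, not the original code.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

// Hypothetical sketch: a Split records the attribute index/value pairs fixed on the path to one node.
public class Split implements Serializable {
    private static final long serialVersionUID = 1L;
    public List<Integer> attr_index = new ArrayList<Integer>(); // indexes of attributes already used on this path
    public List<String> attr_value = new ArrayList<String>();   // the value each of those attributes is fixed to
    public String classLabel;                                    // majority class label once the node is evaluated
}

// Hypothetical sketch: ObjectSerializable converts a Serializable object to/from a Base64 string for JobConf.
class ObjectSerializable {
    public static String serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    public static Object unSerialize(String s) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(s)));
        try {
            return ois.readObject();
        } finally {
            ois.close();
        }
    }
}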
Next, implement the corresponding Reducer operator for the C4.5 algorithm. The relevant code is as follows:
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    static int cnt = 0;
    ArrayList<String> ar = new ArrayList<String>();
    String data = null;
    private static int currentIndex;

    public void configure(JobConf conf) {
        currentIndex = Integer.valueOf(conf.get("currentIndex"));
    }

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        // sum is the number of instances of one class label in the subset selected by one attribute value
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // Emit the count for this (attribute index, attribute value, class label) key.
        output.collect(key, new IntWritable(sum));
        String data = key + " " + sum;
        ar.add(data);
        // Also persist the accumulated counts to an intermediate file so the driver can read them back.
        writeToFile(ar);
        ar.add("\n");
    }

    public static void writeToFile(ArrayList<String> text) {
        try {
            cnt++;
            Path input = new Path("C45/intermediate" + currentIndex + ".txt");
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(input, true)));
            for (String str : text) {
                bw.write(str);
            }
            bw.newLine();
            bw.close();
        } catch (Exception e) {
            System.out.println("File is not creating in reduce");
        }
    }
}
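To make the intermediate data concrete: each reducer key has the form "attributeIndex attributeValue classLabel" (as built in the Mapper), and the value is the number of matching training instances, so the job output and the intermediate file contain lines like the ones below. These example lines assume the classic weather / "play tennis" data set with attribute 0 = outlook and attribute 2 = humidity; they are purely illustrative, and the actual counts depend on your own training data:

0 sunny no 3
0 sunny yes 2
0 overcast yes 4
2 high no 4
2 high yes 3

The driver-side GainRatio.getcount() call (see the Main class below) is then expected to read these counts back from C45/intermediateN.txt before computing entropies and gain ratios.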
Finally, write the Main class that launches the MapReduce job. The job has to be run for multiple rounds, one per tree node to be expanded. The code is as follows:
package com.hackecho.hadoop;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;
import org.dmg.pmml.MiningFunctionType;
import org.dmg.pmml.Node;
import org.dmg.pmml.PMML;
import org.dmg.pmml.TreeModel;

// Here MapReduce is used to partition the data set into subsets according to the values of each attribute.
public class Main extends Configured implements Tool {

    // the split (tree node) currently being expanded
    public static Split currentsplit = new Split();
    // all splits generated so far
    public static List<Split> splitted = new ArrayList<Split>();
    // current_index is the position in splitted of the node being expanded
    public static int current_index = 0;
    public static ArrayList<String> ar = new ArrayList<String>();
    public static List<Split> leafSplits = new ArrayList<Split>();
    public static final String PROJECT_HOME = System.getProperty("user.dir");

    public static void main(String[] args) throws Exception {
        // The empty root split is added to splitted first, so splitted starts with size 1.
        PropertyConfigurator.configure(PROJECT_HOME + "/conf/log/log4j.properties");
        splitted.add(currentsplit);
        Path c45 = new Path("C45");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(c45)) {
            fs.delete(c45, true);
        }
        fs.mkdirs(c45);
        int res = 0;
        int split_index = 0;
        // gain ratio of the attribute being evaluated
        double gainratio = 0;
        // best gain ratio found so far
        double best_gainratio = 0;
        // entropy of the current node
        double entropy = 0;
        // majority class label of the current node
        String classLabel = null;
        // number of attributes (hard-coded to 4 here, since no_Attr is only known once a job has run)
        int total_attributes = MapClass.no_Attr;
        total_attributes = 4;
        // number of splits generated so far
        int split_size = splitted.size();
        GainRatio gainObj;
        // new node produced by a split
        Split newnode;
        while (split_size > current_index) {
            currentsplit = splitted.get(current_index);
            gainObj = new GainRatio();
            // Run one MapReduce pass to count class labels per attribute value on the current subset.
            res = ToolRunner.run(new Configuration(), new Main(), args);
            System.out.println("Current NODE INDEX :: " + current_index);
            int j = 0;
            int temp_size;
            gainObj.getcount();
            // compute the entropy of the current node
            entropy = gainObj.currNodeEntophy();
            // get the majority class label at the current node
            classLabel = gainObj.majorityLabel();
            currentsplit.classLabel = classLabel;
            if (entropy != 0.0 && currentsplit.attr_index.size() != total_attributes) {
                System.out.println("");
                System.out.println("Entropy NOT zero SPLIT INDEX:: " + entropy);
                best_gainratio = 0;
                // compute the gain ratio of each attribute not yet used on this path
                for (j = 0; j < total_attributes; j++) {
                    if (!currentsplit.attr_index.contains(j)) {
                        // gain ratio of attribute j, given the entropy of the current node
                        gainratio = gainObj.gainratio(j, entropy);
                        // keep the attribute with the best gain ratio
                        if (gainratio >= best_gainratio) {
                            split_index = j;
                            best_gainratio = gainratio;
                        }
                    }
                }
                // split_index is the index of the attribute chosen for splitting;
                // attr_values_split is a space-separated string of the distinct values that attribute takes.
                String attr_values_split = gainObj.getvalues(split_index);
                StringTokenizer attrs = new StringTokenizer(attr_values_split);
                int number_splits = attrs.countTokens(); // number of child nodes for the selected attribute
                String red = "";
                System.out.println(" INDEX :: " + split_index);
                System.out.println(" SPLITTING VALUES " + attr_values_split);
                // create one child split for each value of the chosen attribute
                for (int splitnumber = 1; splitnumber <= number_splits; splitnumber++) {
                    temp_size = currentsplit.attr_index.size();
                    newnode = new Split();
                    for (int y = 0; y < temp_size; y++) {
                        newnode.attr_index.add(currentsplit.attr_index.get(y));
                        newnode.attr_value.add(currentsplit.attr_value.get(y));
                    }
                    red = attrs.nextToken();
                    newnode.attr_index.add(split_index);
                    newnode.attr_value.add(red);
                    // the child subset is defined by this attribute value; queue it for further expansion
                    splitted.add(newnode);
                }
            } else if (entropy == 0.0 && currentsplit.attr_index.size() != total_attributes) {
                // A leaf node has been reached; remember it so it can later be persisted to the model file.
                /**
                String rule = "";
                temp_size = currentsplit.attr_index.size();
                for (int val = 0; val < temp_size; val++) {
                    rule = rule + " " + currentsplit.attr_index.get(val) + " " + currentsplit.attr_value.get(val);
                }
                rule = rule + " " + currentsplit.classLabel;
                ar.add(rule);
                writeRuleToFile(ar);
                ar.add("\n");
                if (entropy != 0.0) {
                    System.out.println("Enter rule in file:: " + rule);
                } else {
                    System.out.println("Enter rule in file Entropy zero :: " + rule);
                }
                System.out.println("persistence model@!!!!");
                */
                leafSplits.add(currentsplit);
            } else {
                // All attributes have been used on this path: build the PMML tree model from the collected leaves.
                TreeModel tree = PmmlDecisionTree.buildTreeModel(leafSplits);
                PMML pmml = new PMML();
                pmml.addModels(tree);
                PmmlModelFactory.pmmlPersistence("C45/DecisionTree.pmml", pmml);
            }
            split_size = splitted.size();
            System.out.println("TOTAL NODES:: " + split_size);
            current_index++;
        }
        System.out.println("Done!");
        System.exit(res);
    }

    public static void writeRuleToFile(ArrayList<String> text) throws IOException {
        Path rule = new Path("C45/rule.txt");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try {
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(rule, true)));
            for (String str : text) {
                bw.write(str);
            }
            bw.newLine();
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        System.out.println("In main ---- run");
        JobConf conf = new JobConf(getConf(), Main.class);
        conf.setJobName("C45");
        // Pass the current split and its depth to the mapper and reducer through the job configuration.
        conf.set("currentsplit", ObjectSerializable.serialize(currentsplit));
        conf.set("current_index", String.valueOf(currentsplit.attr_index.size()));
        conf.set("currentIndex", String.valueOf(current_index));
        // the keys are attribute/label strings
        conf.setOutputKeyClass(Text.class);
        // the values are counts (ints)
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(MapClass.class);
        conf.setReducerClass(Reduce.class);
        System.out.println("back to run");
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path(args[1] + current_index);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, out);
        JobClient.runJob(conf);
        return 0;
    }
}
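The driver also depends on a GainRatio class, whose implementation is not shown in the article, to turn the counts from C45/intermediateN.txt into the C4.5 selection criterion. The criterion itself is standard: for a node S, entropy H(S) = -sum_c p_c*log2(p_c); for an attribute A, information gain Gain(S,A) = H(S) - sum_v (|S_v|/|S|)*H(S_v), split information SplitInfo(S,A) = -sum_v (|S_v|/|S|)*log2(|S_v|/|S|), and gain ratio = Gain/SplitInfo. The sketch below computes these quantities from in-memory counts; the method names mirror the ones called by Main (currNodeEntophy, majorityLabel, gainratio), but the class name, field layout, and the omitted file-parsing step are assumptions rather than the original code.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the C4.5 gain-ratio computation, assuming the counts have already been
// loaded from the intermediate file into the two maps below (the real GainRatio class would
// parse C45/intermediate<N>.txt itself; that parsing code is omitted here).
public class GainRatioSketch {

    // classCounts: class label -> number of instances at the current node
    private final Map<String, Integer> classCounts = new HashMap<String, Integer>();
    // valueClassCounts: attribute index -> (attribute value -> (class label -> count))
    private final Map<Integer, Map<String, Map<String, Integer>>> valueClassCounts =
            new HashMap<Integer, Map<String, Map<String, Integer>>>();

    private static double log2(double x) {
        return Math.log(x) / Math.log(2);
    }

    // H = -sum_c p_c * log2(p_c) over one map of class counts.
    private static double entropy(Map<String, Integer> counts) {
        int total = 0;
        for (int c : counts.values()) {
            total += c;
        }
        double h = 0.0;
        for (int c : counts.values()) {
            if (c > 0) {
                double p = (double) c / total;
                h -= p * log2(p);
            }
        }
        return h;
    }

    // Entropy of the current node.
    public double currNodeEntophy() {
        return entropy(classCounts);
    }

    // Majority class label at the current node.
    public String majorityLabel() {
        String best = null;
        int bestCount = -1;
        for (Map.Entry<String, Integer> e : classCounts.entrySet()) {
            if (e.getValue() > bestCount) {
                bestCount = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }

    // Gain ratio of the attribute at attrIndex, given the entropy of the current node.
    public double gainratio(int attrIndex, double nodeEntropy) {
        Map<String, Map<String, Integer>> byValue = valueClassCounts.get(attrIndex);
        if (byValue == null) {
            return 0.0;
        }
        int total = 0;
        for (Map<String, Integer> counts : byValue.values()) {
            for (int c : counts.values()) {
                total += c;
            }
        }
        double weightedEntropy = 0.0; // sum_v (|S_v|/|S|) * H(S_v)
        double splitInfo = 0.0;       // -sum_v (|S_v|/|S|) * log2(|S_v|/|S|)
        for (Map<String, Integer> counts : byValue.values()) {
            int subsetSize = 0;
            for (int c : counts.values()) {
                subsetSize += c;
            }
            double weight = (double) subsetSize / total;
            weightedEntropy += weight * entropy(counts);
            if (weight > 0) {
                splitInfo -= weight * log2(weight);
            }
        }
        double gain = nodeEntropy - weightedEntropy;
        return splitInfo == 0.0 ? 0.0 : gain / splitInfo;
    }
}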
Having read the above, do you have a better understanding of how to implement a decision tree algorithm with MapReduce? If you would like to learn more, please follow the 億速云 industry news channel. Thank you for your support.