Data Mining with Mahout: Canopy Clustering in Practice

Published: 2020-07-31 | Author: 刀刀_高揚(yáng) | Category: Big Data

1. How Canopy Clustering Works


(1) Sort the original data set List according to some rule and set two initial distance thresholds T1 and T2, with T1 > T2.


(2) Randomly pick a data vector A from List and use a cheap ("rough") distance measure to compute the distance d between A and every other sample vector in List.


(3) Using the distances d from step 2, put every sample vector with d < T1 into the same canopy as A, and at the same time remove every sample vector with d < T2 from List.


(4) Repeat steps 2 and 3 until List is empty.
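To make the four steps concrete, here is a minimal standalone Java sketch of that loop. It is not Mahout's implementation: the Euclidean distance, the double[] point representation and the example thresholds in main are assumptions chosen purely for illustration.

import java.util.ArrayList;
import java.util.List;

public class CanopySketch {

    // A cheap "rough" distance; plain Euclidean distance is used here only as an example.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    // Returns the canopies as lists of points; expects t1 > t2, as in step (1).
    static List<List<double[]>> canopy(List<double[]> list, double t1, double t2) {
        List<List<double[]>> canopies = new ArrayList<>();
        List<double[]> remaining = new ArrayList<>(list);
        while (!remaining.isEmpty()) {                   // step (4): repeat until List is empty
            double[] center = remaining.remove(0);       // step (2): pick a vector A (first, not random, for simplicity)
            List<double[]> canopy = new ArrayList<>();
            canopy.add(center);
            List<double[]> keep = new ArrayList<>();
            for (double[] p : remaining) {
                double d = distance(center, p);
                if (d < t1) {                            // step (3): d < T1 -> joins this canopy
                    canopy.add(p);
                }
                if (d >= t2) {                           // step (3): d < T2 -> removed from List
                    keep.add(p);
                }
            }
            remaining = keep;
            canopies.add(canopy);
        }
        return canopies;
    }

    public static void main(String[] args) {
        List<double[]> data = new ArrayList<>();
        data.add(new double[]{0, 0});
        data.add(new double[]{1, 1});
        data.add(new double[]{10, 10});
        // With T1 = 5 and T2 = 2 this prints 2: {(0,0),(1,1)} and {(10,10)}.
        System.out.println("canopies: " + canopy(data, 5.0, 2.0).size());
    }
}

Because T1 > T2, a point whose distance falls between T2 and T1 joins the current canopy but stays in List, so it can end up in more than one canopy; that overlap is what makes canopy useful as a cheap pre-clustering step.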



2. Download the Test Data


cd /tmp


hadoop dfs -mkdir /input


wget http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data


hadoop dfs -copyFromLocal /tmp/synthetic_control.data /input/synthetic_control.data
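Each line of synthetic_control.data should be one control-chart time series of 60 whitespace-separated values (600 lines in total); that is the format the mapper in the next step splits on. An optional sanity check on the local copy:

head -n 1 /tmp/synthetic_control.data
wc -l /tmp/synthetic_control.data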


3. Format Conversion (Text → Vectors)


Edit the file Text2VectorWritable.java:


package mahout.fansy.utils.transform;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Transform text data to VectorWritable data.
 * @author fansy
 */
public class Text2VectorWritable extends AbstractJob {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new Text2VectorWritable(), args);
  }

  @Override
  public int run(String[] arg0) throws Exception {
    addInputOption();
    addOutputOption();
    if (parseArguments(arg0) == null) {
      return -1;
    }
    Path input = getInputPath();
    Path output = getOutputPath();
    Configuration conf = getConf();
    // set job information
    Job job = new Job(conf, "text2vectorWritableCopy with input:" + input.getName());
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(Text2VectorWritableMapper.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setReducerClass(Text2VectorWritableReducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setJarByClass(Text2VectorWritable.class);
    FileInputFormat.addInputPath(job, input);
    SequenceFileOutputFormat.setOutputPath(job, output);
    if (!job.waitForCompletion(true)) { // wait for the job to finish
      throw new InterruptedException("Canopy Job failed processing " + input);
    }
    return 0;
  }

  /**
   * Mapper main procedure
   * @author fansy
   */
  public static class Text2VectorWritableMapper extends Mapper<LongWritable, Text, LongWritable, VectorWritable> {
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // split the line on one or more whitespace characters
      String[] str = value.toString().split("\\s{1,}");
      Vector vector = new RandomAccessSparseVector(str.length);
      for (int i = 0; i < str.length; i++) {
        vector.set(i, Double.parseDouble(str[i]));
      }
      VectorWritable va = new VectorWritable(vector);
      context.write(key, va);
    }
  }

  /**
   * Reducer: do nothing but output
   * @author fansy
   */
  public static class Text2VectorWritableReducer extends Reducer<LongWritable, VectorWritable, LongWritable, VectorWritable> {
    public void reduce(LongWritable key, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException {
      for (VectorWritable v : values) {
        context.write(key, v);
      }
    }
  }
}

Compile and export it as ClusteringUtils.jar, then copy the jar to /home/hadoop/mahout/mahout_jar.

When exporting from Eclipse, choose Export → Runnable JAR File → Extract required libraries into generated JAR.
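If you prefer to build from the command line instead of Eclipse, something along the following lines should also work. This is only a sketch: the Mahout jar location /home/hadoop/mahout/*.jar and the availability of the hadoop classpath subcommand are assumptions about your installation.

mkdir -p classes
javac -classpath "$(hadoop classpath):/home/hadoop/mahout/*" -d classes Text2VectorWritable.java
jar cf ClusteringUtils.jar -C classes .

Unlike the Eclipse runnable-jar export, this produces a thin jar with no bundled libraries, so the Mahout classes must be on HADOOP_CLASSPATH at runtime (which the fix described below sets up anyway).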



Then run:

hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform


You may hit a ClassNotFoundException for org/apache/mahout/common/AbstractJob. This usually means the directories on HADOOP_CLASSPATH do not include the Mahout jars.


Fix 1:

Copy the Mahout jars into Hadoop's lib directory and confirm that this directory really is on HADOOP_CLASSPATH:


cp /home/hadoop/mahout/*.jar /home/hadoop/hadoop/lib


Fix 2 (recommended):

Add the following to hadoop-env.sh:


for f in /home/hadoop/mahout/*.jar; do

  if [ "$HADOOP_CLASSPATH" ]; then

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f

  else

    export HADOOP_CLASSPATH=$f

  fi

done
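To confirm on the current node that the Mahout jars are now picked up (again assuming your Hadoop build has the classpath subcommand), you can run:

hadoop classpath | tr ':' '\n' | grep mahout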


Remember to distribute the updated hadoop-env.sh to the other nodes.


Restart the Hadoop environment:

stop-all.sh

start-all.sh


Run the conversion:


hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar mahout.fansy.utils.transform.Text2VectorWritable -i hdfs:///input/synthetic_control.data -o hdfs:///input/synthetic_control.data.transform

(If you assigned a main class when exporting the runnable jar, the command above will fail; use the one below instead.)

hadoop jar /home/hadoop/mahout/mahout_jar/ClusteringUtils.jar -o hdfs:///input/synthetic_control.data.transform


The converted output is no longer human-readable text but a SequenceFile of vectors:

hdfs:///input/synthetic_control.data.transform/part-r-00000


4. Run Canopy Clustering

mahout canopy --input hdfs:///input/synthetic_control.data.transform/part-r-00000 --output /output/canopy --distanceMeasure org.apache.mahout.common.distance.EuclideanDistanceMeasure --t1 80 --t2 55 --t3 80 --t4 55 --clustering
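When the job finishes, the canopy centers are written under /output/canopy, and because of --clustering the clustered points are written there too. A quick listing should show the clusters-0-final directory that step 5 reads:

hadoop dfs -ls /output/canopy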


5. Format Conversion (Vectors → Text)


This converts the result of step 4 back into text.


Edit the file ReadClusterWritable.java:


package mahout.fansy.utils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.AbstractJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * read cluster centers
 * @author fansy
 */
public class ReadClusterWritable extends AbstractJob {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new ReadClusterWritable(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    addInputOption();
    addOutputOption();
    if (parseArguments(args) == null) {
      return -1;
    }
    Job job = new Job(getConf(), getInputPath().toString());
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(RM.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(ReadClusterWritable.class);

    FileInputFormat.addInputPath(job, getInputPath());
    FileOutputFormat.setOutputPath(job, getOutputPath());
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("Canopy Job failed processing " + getInputPath());
    }
    return 0;
  }

  public static class RM extends Mapper<Text, ClusterWritable, Text, Text> {
    private Logger log = LoggerFactory.getLogger(RM.class);

    public void map(Text key, ClusterWritable value, Context context) throws IOException, InterruptedException {
      String str = value.getValue().getCenter().asFormatString();
      // System.out.println("center****************:" + str);
      log.info("center*****************************:" + str); // set log information
      context.write(key, new Text(str));
    }
  }
}


Package it into ClusteringUtils.jar and upload it to /home/hadoop/mahout/mahout_jar.


If you need to clear stale Launch Configuration entries in Eclipse, go into the workspace's .metadata/.plugins/org.eclipse.debug.core/.launches directory and delete the files inside it.


Run:

hadoop jar ClusteringUtils.jar mahout.fansy.utils.ReadClusterWritable -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output

(If that fails, run the command below instead.)

hadoop jar ClusteringUtils.jar -i /output/canopy/clusters-0-final/part-r-00000 -o /output/canopy-output


The file /output/canopy-output/part-m-00000 now holds the clustering result (the canopy centers) in text form.
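Because the job in step 5 is map-only and uses the default TextOutputFormat, the result file is plain text and can be printed directly:

hadoop dfs -cat /output/canopy-output/part-m-00000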

