Text mining: word segmentation with MapReduce

Published: 2020-08-22 20:39:06 · Author: jethai · Category: Big Data

Software version

paoding-analysis3.0




Add the required jars to the project and copy the Paoding dic (dictionary) directory onto the project's classpath.




Modify the paoding-dic-home.properties file inside paoding-analysis.jar so that it points to the dictionary directory:

paoding.dic.home=classpath:dic


Word segmentation demo

import java.io.IOException;
import java.io.StringReader;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

public class TokenizeWithPaoding {

    public static void main(String[] args) {
        String line = "中華民族共和國(guó)";
        PaodingAnalyzer analyzer = new PaodingAnalyzer();
        StringReader sr = new StringReader(line);
        // Obtain the token stream; the field-name argument is irrelevant here
        TokenStream ts = analyzer.tokenStream("", sr);
        // Iterate over the token stream and print each term
        try {
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                System.out.println(ta.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



Source data for news text classification

http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz


Each folder represents a category, and each file under a category contains one news article.


Chinese news has to be segmented into words before it can be classified.


For a large number of small files, you can use CombineFileInputFormat, another abstract subclass of FileInputFormat, and implement its createRecordReader method.

CombineFileInputFormat overrides getSplits; the splits it returns are of type CombineFileSplit, a subclass of InputSplit that can contain multiple files.


How the RecordReader turns a file into key-value pairs is determined by its nextKeyValue method.


The custom CombineFileInputFormat subclass:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom InputFormat that packs multiple small files into a single split.
 * @author BOB
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    // Never split an individual file; each file is read as a whole
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader creates one MyRecordReader per file contained in the split
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyRecordReader.class);
    }

}



The custom RecordReader class:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom RecordReader that reads the contents of the splits produced by MyInputFormat.
 * Each instance handles exactly one file of a CombineFileSplit.
 * @author BOB
 */
public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit;   // the split currently being processed
    private Configuration conf;                  // job configuration
    private Text currentKey = new Text();        // key of the current record
    private Text currentValue = new Text();      // value of the current record
    private int totalLength;                     // number of files in the current split
    private int currentIndex;                    // index of this reader's file within the split
    private float currentProgress = 0F;          // current progress
    private boolean processed = false;           // whether this file has already been emitted

    // Constructor signature required by CombineFileRecordReader
    public MyRecordReader(CombineFileSplit combineFileSplit,
            TaskAttemptContext context, Integer fileIndex) {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = fileIndex;
        this.conf = context.getConfiguration();
        this.totalLength = combineFileSplit.getPaths().length;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // The key is "/parent-directory/file-name", i.e. "/category/news-file"
            Path file = combineFileSplit.getPath(currentIndex);
            StringBuilder sb = new StringBuilder();
            sb.append("/");
            sb.append(file.getParent().getName()).append("/");
            sb.append(file.getName());
            currentKey.set(sb.toString());

            // The value is the entire content of the file
            byte[] content = new byte[(int) combineFileSplit.getLength(currentIndex)];
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            in.readFully(content);
            currentValue.set(content);
            in.close();

            processed = true;
            return true;
        }
        return false;
    }

}



The tokenizer driver class:

package org.conan.myhadoop.fengci;

import java.io.IOException;
import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * Driver class that runs word segmentation over the input files.
 * @author BOB
 */
public class TokenizerDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TokenizerDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration passed in by ToolRunner
        Configuration conf = getConf();
        // Upper bound on the size of a combined split, in bytes
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);

        // Job name
        Job job = new Job(conf, "Tokenizer");
        job.setJarByClass(TokenizerDriver.class);

        job.setMapperClass(Map.class);

        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        // Add every category directory under the input path
        FileSystem fs = inpath.getFileSystem(conf);
        FileStatus[] status = fs.listStatus(inpath);
        Path[] paths = FileUtil.stat2Paths(status);
        for (Path path : paths) {
            FileInputFormat.addInputPath(job, path);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // Delete the output directory if it already exists.
        // Do not close the FileSystem here: it comes from the shared cache and the job still needs it.
        FileSystem hdfs = outpath.getFileSystem(conf);
        if (hdfs.exists(outpath)) {
            hdfs.delete(outpath, true);
        }

        // Map-only job, no reduce phase
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Mapper class that segments the text of one news article per record.
     * @author BOB
     */
    static class Map extends Mapper<Text, Text, Text, Text> {

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Create the analyzer
            Analyzer analyzer = new PaodingAnalyzer();
            String line = value.toString();
            StringReader reader = new StringReader(line);
            // Obtain the token stream
            TokenStream ts = analyzer.tokenStream("", reader);
            StringBuilder sb = new StringBuilder();

            // Iterate over the tokens and join them with single spaces
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                if (sb.length() != 0) {
                    sb.append(" ").append(ta.toString());
                } else {
                    sb.append(ta.toString());
                }
            }
            value.set(sb.toString());
            context.write(key, value);
        }

    }
}
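
To run the job, the classes above have to be packaged together with paoding-analysis and its dic directory into a job jar; the jar name fengci.jar below is only a made-up placeholder. Submission would then look roughly like: hadoop jar fengci.jar org.conan.myhadoop.fengci.TokenizerDriver <input dir> <output dir>, where the input directory contains one sub-directory per news category (matching the loop over listStatus in run()).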


Result of the segmentation preprocessing: all news articles are gathered into one output, the key is the category (with the file name), each record corresponds to one article, and the words within an article are separated by spaces.


The processed data can then be fed to Mahout to train a Bayes classifier.
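
Before handing the data to Mahout, it is worth checking that the SequenceFile really contains the expected key/value layout. The small reader below is not part of the original article, only a minimal sketch using the standard Hadoop SequenceFile.Reader API; the default path output/part-m-00000 is an assumption about where the map-only job writes its file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqFilePreview {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical output file of the map-only job; adjust to your actual output directory
        Path file = new Path(args.length > 0 ? args[0] : "output/part-m-00000");
        FileSystem fs = file.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        try {
            Text key = new Text();    // "/category/filename", as written by MyRecordReader
            Text value = new Text();  // the segmented article, words separated by spaces
            int shown = 0;
            // Print the first few records to verify the key/value layout
            while (reader.next(key, value) && shown++ < 5) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}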


References:


http://f.dataguru.cn/thread-244375-1-1.html

http://www.cnblogs.com/panweishadow/p/4320720.html


 
