溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何將nutch2.3的bin/crawl腳本改寫為java類

發(fā)布時(shí)間:2021-11-08 10:04:52 來源:億速云 閱讀:168 作者:小新 欄目:云計(jì)算

這篇文章將為大家詳細(xì)講解有關(guān)如何將nutch2.3的bin/crawl腳本改寫為java類,小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

將nutch3.3的bin/crawl腳本改寫為java類

nutch2.8以后,以前的主控代碼org.apache.nutch.crawl.Crawl類沒了,只剩下對(duì)應(yīng)的控制腳本bin/crawl,感覺在IDEA里面調(diào)試不方便,所以我了解了下shell腳本,根據(jù)nutch3.3的bin/crawlbin/nutch腳本,把bin/crawl翻譯成了java的Crawl類以便在IDEA里面調(diào)試

代碼設(shè)計(jì)說明

我參考了nutch2.7的crawl類,nutch3.3的bin/crawlbin/nutch,盡量按照shell腳本的原組織結(jié)構(gòu)和邏輯進(jìn)行翻譯,有些地方不能直接使用的,就稍作了修改。

  • 主要的業(yè)務(wù)邏輯在public int run(String[] args)方法里

  • 程序主入口是main,調(diào)用ToolRunner.run(NutchConfiguration.create(), new Crawl(), args);執(zhí)行上面的run方法

  • public void binNutch5j(String jobName,String commandLine,String options)相當(dāng)于bin/crawl腳本里函數(shù)__bin_nutch的功能

  • public int runJob(String jobName,String commandLine,String options)相當(dāng)于腳本bin/nutch的功能,這里沒有像腳本中那樣用if-else,也沒有使用switch-case,而是采用反射創(chuàng)建相應(yīng)的job

  • public void preConfig(Configuration conf,String options)用于根據(jù)帶-D參數(shù) commonOptions等指令設(shè)置每個(gè)Job的配置項(xiàng)

  • CLASS_MAP是靜態(tài)(static)屬性,一個(gè)記錄JobName和對(duì)應(yīng)的類名的映射關(guān)系的哈希表(HashMap)

gora BUG說明

我之前是在每個(gè)job是按照腳本使用batchId參數(shù)的,遇到了下面這個(gè)問題:

Gora MongoDb Exception, can't serialize Utf8

貌似是序列化問題,好像gora-0.6版本解決了這個(gè)BUG,但我的nutch代碼是gora-0.5的,不會(huì)升級(jí),所以就簡(jiǎn)單的把-batchId參數(shù)去掉,使用-all參數(shù)就行了,這點(diǎn)在代碼里可以看到。

關(guān)于升級(jí)到gora-0.6,有空再研究好了。

通過這個(gè)腳本的改寫,我了解了腳本的基本使用,同時(shí)對(duì)之前看的java反射等知識(shí)進(jìn)行了實(shí)踐,并對(duì)nutch的完整爬取流程、主要控制邏輯有了深刻的印象。主要是前面那個(gè)gora的BUG卡了我?guī)滋欤疫€以為自己翻譯的有問題,看來調(diào)試能力還需要加強(qiáng)。

java代碼

這段代碼是翻譯nutch3.3的bin/crawlbin/nutch腳本

Crawl類加到在org.apache.nutch.crawl包下,源碼如下:

package org.apache.nutch.crawl;

/**
 * Created by brianway on 2016/1/19.
 * @author brianway
 * @site brianway.github.io
 * org.apache.nutch.crawl.Crawl;
 */


import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.fetcher.FetcherJob;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Commons Logging imports
//import org.apache.hadoop.fs.*;
//import org.apache.hadoop.mapred.*;
//import org.apache.nutch.util.HadoopFSUtil;
//import org.apache.nutch.util.NutchJob;
//import org.apache.nutch.crawl.InjectorJob;
//import org.apache.nutch.crawl.GeneratorJob;
//import org.apache.nutch.fetcher.FetcherJob;
//import org.apache.nutch.parse.ParserJob;
//import org.apache.nutch.crawl.DbUpdaterJob;
//import org.apache.nutch.indexer.IndexingJob;
//import org.apache.nutch.indexer.solr.SolrDeleteDuplicates;

public class Crawl extends NutchTool implements Tool{
    public static final Logger LOG = LoggerFactory.getLogger(Crawl.class);

    /* Perform complete crawling and indexing (to Solr) given a set of root urls and the -solr
   parameter respectively. More information and Usage parameters can be found below. */
    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(), new Crawl(), args);
        System.exit(res);
    }

    //為了編譯過
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        return null;
    }


    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println
                    ("Usage: Crawl -urls <urlDir> -crawlId <crawlID> -solr <solrURL>  [-threads n] [-depth i] [-topN N]");
            // ("Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>");
            return -1;
        }

        // ------------check args---------
/*
        //?。∮赡_本直譯的,感覺少參數(shù),所以注釋掉,換下面的方式
        String seedDir = args[1];
        String crawlID = args[2];
        String solrUrl=null;
        int limit=1;

        if(args.length-1 == 3){
            limit = Integer.parseInt(args[3]);
        }else if(args.length-1 == 4){
            solrUrl = args[3];
            limit = Integer.parseInt(args[4]);
        }else{
            System.out.println("Unknown # of arguments "+(args.length-1));
            System.out.println
                    ("Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>");
            return -1;
            //"Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]"
            //"Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>";
        }
*/
        String seedDir = null;
        String crawlID = null;
        String solrUrl=null;
        int limit = 0;
        long topN = Long.MAX_VALUE;
        int threads = getConf().getInt("fetcher.threads.fetch", 10);
        //parameter-format in crawl class is
        // like nutch2.7 "Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]"
        //not like nutch3.3 "Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>";
        for (int i = 0; i < args.length; i++) {
            if ("-urls".equals(args[i])) {
                seedDir = args[++i];
            } else if ("-crawlId".equals(args[i])) {
                crawlID = args[++i];
            } else if ("-threads".equals(args[i])) {
                threads = Integer.parseInt(args[++i]);
            } else if ("-depth".equals(args[i])) {
                limit = Integer.parseInt(args[++i]);
            } else if ("-topN".equals(args[i])) {
                topN =   Long.parseLong(args[++i]);
            } else if ("-solr".equals(args[i])) {
                solrUrl = args[++i];
                i++;
            } else  {
                System.err.println("Unrecognized arg " + args[i]);
                return -1;
            }
        }

        if(StringUtils.isEmpty(seedDir)){
            System.out.println("Missing seedDir : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if(StringUtils.isEmpty(crawlID)){
            System.out.println("Missing crawlID : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if(StringUtils.isEmpty(solrUrl)){
            System.out.println("No SOLRURL specified. Skipping indexing.");
        }
        if(limit == 0) {
            System.out.println("Missing numberOfRounds : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }

        /**
         * MODIFY THE PARAMETERS BELOW TO YOUR NEEDS
         */
        //set the number of slaves nodes
        int numSlaves = 1;
        //and the total number of available tasks
        // sets Hadoop parameter "mapred.reduce.tasks"
        int numTasks = numSlaves<<1;
        // number of urls to fetch in one iteration
        // 250K per task?
        //!!這里使用topN
        long sizeFetchlist = topN;//numSlaves *5;
        // time limit for feching
        int timeLimitFetch=180;
        //Adds <days> to the current time to facilitate
        //crawling urls already fetched sooner then
        //db.default.fetch.interval.
        int addDays=0;

        // note that some of the options listed here could be set in the
        // corresponding hadoop site xml param file
        String commonOptions="-D mapred.reduce.tasks="+numTasks+" -D mapred.child.java.opts=-Xmx1000m -D mapred.reduce.tasks.speculative.execution=false -D mapred.map.tasks.speculative.execution=false -D mapred.compress.map.output=true ";

        preConfig(getConf(),commonOptions);

        //initial injection
        System.out.println("Injecting seed URLs");
        String  inject_args = seedDir+" -crawlId "+crawlID;
        binNutch5j("inject",inject_args,commonOptions);

        for(int a=1;a<=limit;a++){
            //-----------generating-------------
            System.out.println("Generating batchId");
            String batchId = System.currentTimeMillis()+"-"+new Random().nextInt(32767);
            System.out.println("Generating a new fetchlist");
            String  generate_args = "-topN "+ sizeFetchlist +" -noNorm -noFilter -adddays "+addDays+" -crawlId "+crawlID+" -batchId "+batchId;
            //String  generate_options = commonOptions;
            int  res = runJob("generate",generate_args,commonOptions);
            System.out.println("binNutch5j generate "+generate_args);
            if(res==0){

            }else if(res == 1){
                System.out.println("Generate returned 1 (no new segments created)");
                System.out.println("Escaping loop: no more URLs to fetch now");
                break;
            }else{
                System.out.println("Error running:");
                System.out.println("binNutch5j generate "+generate_args);
                System.out.println("Failed with exit value "+res);
                return res;
            }
            //--------fetching-----------
            System.out.println("Fetching : ");
            //String fetch_args = batchId+" -crawlId "+crawlID+" -threads "+threads;
            String fetch_args = "-all"+" -crawlId "+crawlID+" -threads "+threads;
            String  fetch_options = commonOptions+" -D fetcher.timelimit.mins="+timeLimitFetch;
            //10 threads
            binNutch5j("fetch",fetch_args,fetch_options);
            //----------parsing--------------
            // parsing the batch
            //
            if(!getConf().getBoolean(FetcherJob.PARSE_KEY, false)){
                System.out.println("Parsing : ");
                //enable the skipping of records for the parsing so that a dodgy document
                // so that it does not fail the full task
                //String parse_args = batchId+" -crawlId "+crawlID;
                String parse_args = "-all"+" -crawlId "+crawlID;
                String  skipRecordsOptions=" -D mapred.skip.attempts.to.start.skipping=2 -D mapred.skip.map.max.skip.records=1";
                binNutch5j("parse",parse_args,commonOptions+skipRecordsOptions);
            }

            //----------updatedb------------
            // updatedb with this batch
            System.out.println("CrawlDB update for "+crawlID);
           // String updatedb_args = batchId+" -crawlId "+crawlID;
            String updatedb_args = "-all"+" -crawlId "+crawlID;
            binNutch5j("updatedb",updatedb_args,commonOptions);

            if(!StringUtils.isEmpty(solrUrl)){
                System.out.println("Indexing "+ crawlID+ " on SOLR index -> " +solrUrl);
                String index_args = batchId+"  -all -crawlId "+crawlID;
                String  index_options = commonOptions+" -D solr.server.url="+solrUrl;
                binNutch5j("index",index_args,index_options);

                System.out.println("SOLR dedup -> "+solrUrl);
                binNutch5j("solrdedup",solrUrl,commonOptions);

            }else{
                System.out.println("Skipping indexing tasks: no SOLR url provided.");
            }

        }

        return 0;
    }

    /**
     * 相當(dāng)于bin/crawl的函數(shù)__bin_nutch的功能
     * @param jobName job
     * @param commandLine
     */

    public void binNutch5j(String jobName,String commandLine,String options)throws Exception{
        int res = runJob(jobName,commandLine,options);
        if(res!=0) {
            System.out.println("Error running:");
            System.out.println(jobName + " " + commandLine);
            System.out.println("Error running:");
            System.exit(res);
        }
    }

    /**
     * 相當(dāng)于腳本bin/nutch的功能
     *
     * @param jobName
     * @param commandLine
     * @return
     */
    public int runJob(String jobName,String commandLine,String options)throws Exception{
        //這里為了方便,沒有像腳本那樣用多個(gè)if-elif語(yǔ)句,也沒有用switch-case,直接用了反射來完成
        Configuration conf = NutchConfiguration.create();
        if(!StringUtils.isEmpty(options)){
            preConfig(conf,options);
        }
        String[] args =  commandLine.split("\\s+");
        String className = CLASS_MAP.get(jobName);
        Class<?> jobClass  =  Class.forName(className);
        Constructor c = jobClass.getConstructor();
        Tool  job =(Tool) c.newInstance();
        System.out.println("---------------runJob: "+jobClass.getName()+"----------------------");
        return  ToolRunner.run(conf, job, args);
    }


    /**
     * 設(shè)置每個(gè)job的配置
     * @param conf
     * @param options
     */
    public void preConfig(Configuration conf,String options){
        String [] equations = options.split("\\s*-D\\s+");
        System.out.println("options:"+options);
        // i start from  1 not  0, skip the empty string ""
        for (int i=1;i<equations.length;i++) {
            String equation = equations[i];
            String [] pair = equation.split("=");
            //System.out.println(pair[0]+":"+pair[1]);
            conf.set(pair[0],pair[1]);
            //System.out.println("conf print: "+pair[0]+"  "+conf.get(pair[0]));
        }
    }


    /**
     * the map to store the mapping relations jobName->ClassName
     */
    public static HashMap<String,String> CLASS_MAP = new HashMap<String,String>();

    /**
     * init the CLASS_MAP,refer to "bin/nutch"
     */
    static {
        CLASS_MAP.put("inject","org.apache.nutch.crawl.InjectorJob");
        CLASS_MAP.put("generate","org.apache.nutch.crawl.GeneratorJob");
        CLASS_MAP.put("fetch","org.apache.nutch.fetcher.FetcherJob");
        CLASS_MAP.put("parse","org.apache.nutch.parse.ParserJob");
        CLASS_MAP.put("updatedb","org.apache.nutch.crawl.DbUpdaterJob");
        CLASS_MAP.put("readdb","org.apache.nutch.crawl.WebTableReader");
        CLASS_MAP.put("elasticindex","org.apache.nutch.indexer.elastic.ElasticIndexerJob");
        CLASS_MAP.put("index","org.apache.nutch.indexer.IndexingJob");
        CLASS_MAP.put("solrdedup","org.apache.nutch.indexer.solr.SolrDeleteDuplicates");
    }

}

關(guān)于“如何將nutch2.3的bin/crawl腳本改寫為java類”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI