溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

如何理解TopK算法及其實(shí)現(xiàn)

發(fā)布時(shí)間:2021-11-24 15:54:19 來(lái)源:億速云 閱讀:195 作者:柒染 欄目:云計(jì)算

今天就跟大家聊聊有關(guān)如何理解TopK算法及其實(shí)現(xiàn),可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

1、問(wèn)題描述

在大數(shù)據(jù)規(guī)模中,經(jīng)常遇到一類(lèi)需要求出現(xiàn)頻率最高的K個(gè)數(shù),這類(lèi)問(wèn)題稱(chēng)為“TOPK”問(wèn)題!例如:統(tǒng)計(jì)歌曲中最熱門(mén)的前10首歌曲,統(tǒng)計(jì)訪問(wèn)流量最高的前5個(gè)網(wǎng)站等。

2、例如統(tǒng)計(jì)訪問(wèn)流量最高的前5個(gè)網(wǎng)站:

數(shù)據(jù)test.data文件:

如何理解TopK算法及其實(shí)現(xiàn)

數(shù)據(jù)格式解釋?zhuān)河蛎?nbsp;   上行流量    下行流量

思路:

1、Mapper每解析一行內(nèi)容,按照"\t"獲取各個(gè)字段

2、因?yàn)閁RL有很多重復(fù)記錄,所以將URL放到key(通過(guò)分析MapReduce原理),流量放在value

3、在reduce統(tǒng)計(jì)總流量,通過(guò)TreeMap進(jìn)行對(duì)數(shù)據(jù)進(jìn)行緩存,最后一并輸出(值得注意的是要一次性輸出必須要用到Reduce類(lèi)的cleanup方法)

程序如下:

Mapper類(lèi):

package com.itheima.hadoop.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    /**
     * @param key
     *            : 每一行偏移量
     * @param value
     *            : 每一行的內(nèi)容
     * @param context
     *            : 環(huán)境上下文
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /**
         * 該計(jì)數(shù)器是org.apache.hadoop.mapreduce.Counter
         */
        Counter counter = context
                .getCounter("ExistProblem", "ExistProblemLine"); // 自定義存在問(wèn)題的行錯(cuò)誤計(jì)數(shù)器
        String line = value.toString(); // 讀取一行數(shù)據(jù)
        String[] fields = line.split("\t"); // 獲取各個(gè)字段,按照\(chéng)t劃分
        try {
            String url = fields[0]; // 獲取URL字段
            long upFlow = Long.parseLong(fields[1]); // 獲取上行流量(upFlow)字段
            long downFlow = Long.parseLong(fields[2]); // 獲取下行流量(downFlow)字段

            FlowBean bean = new FlowBean(upFlow, downFlow); // 將上行流量和下行流量封裝到bean中
            Text tUrl = new Text(url); // 將java數(shù)據(jù)類(lèi)型轉(zhuǎn)換hadoop數(shù)據(jù)類(lèi)型
            context.write(tUrl, bean); // 傳遞的數(shù)據(jù)較多,封裝到bean進(jìn)行傳輸(tips:bean傳輸時(shí)需要注意序列化問(wèn)題)
        } catch (Exception e) {
            e.printStackTrace();
            counter.increment(1); // 記錄錯(cuò)誤行數(shù)
        }
    }
}

Reduce類(lèi):

package com.itheima.hadoop.mapreduce.reducer;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.itheima.hadoop.mapreduce.bean.FlowBean;

public class TopKURLReducer extends Reducer<Text, FlowBean, FlowBean, Text> {
    private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();
    /**
     * @param key
     *            : 每一行相同URL
     * @param values
     *            : 總流量bean
     */
    @Override
    public void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long countUpFlow = 0;
        long countDownFlow = 0;
        /*
         * 1、取出每個(gè)bean的總流量 2、統(tǒng)計(jì)多個(gè)bean的總流量 3、緩存到treeMap中
         */
        for (FlowBean bean : values) {
            countUpFlow += bean.getUpFlow(); // 統(tǒng)計(jì)上行流量
            countDownFlow += bean.getDownFlow(); // 統(tǒng)計(jì)下行總流量
        }
        // 封裝統(tǒng)計(jì)的流量
        FlowBean bean = new FlowBean(countUpFlow, countDownFlow);
        treeMap.put(bean, new Text(key)); // 緩存到treeMap中
    }
    @Override
    public void cleanup(Context context) throws IOException,
            InterruptedException {
        //遍歷緩存
        for (Entry<FlowBean,Text> entry : treeMap.entrySet()) {
            context.write(entry.getKey(), entry.getValue());
        }
        super.cleanup(context); // 不能動(dòng)原本的銷(xiāo)毀操作
    }
}

FlowBean類(lèi):

package com.itheima.hadoop.mapreduce.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable, Comparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long maxFlow;
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + maxFlow;
    }
    /**
     * 1、序列化注意的問(wèn)題,序列化需要默認(rèn)的構(gòu)造方法(反射) 2、在readFields()和write()方法中,應(yīng)該遵循按照順序?qū)懗龊妥x入
     */
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.maxFlow = upFlow + downFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getMaxFlow() {
        return maxFlow;
    }
    public void setMaxFlow(long maxFlow) {
        this.maxFlow = maxFlow;
    }
    @Override
    public void readFields(DataInput dataIn) throws IOException {
        upFlow = dataIn.readLong();
        downFlow = dataIn.readLong();
        maxFlow = dataIn.readLong();
    }
    @Override
    public void write(DataOutput dataOut) throws IOException {
        dataOut.writeLong(upFlow);
        dataOut.writeLong(downFlow);
        dataOut.writeLong(maxFlow);
    }
    @Override
    public int compareTo(FlowBean o) {
        return this.maxFlow > o.maxFlow ? -1
                : this.maxFlow < o.maxFlow ? 1 : 0;
    }
}

驅(qū)動(dòng)類(lèi):

package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;

public class TopKURLDriver extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        
        /**
         * 1、創(chuàng)建job作業(yè)
         * 2、設(shè)置job提交的Class
         * 3、設(shè)置MapperClass,設(shè)置ReduceClass
         * 4、設(shè)置Mapper和Reduce各自的OutputKey和OutputValue類(lèi)型
         * 5、設(shè)置處理文件的路徑,輸出結(jié)果的路徑
         * 6、提交job
         */
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(TopKURLRunner.class);
        
        job.setMapperClass(TopKURLMapper.class);
        job.setReducerClass(TopKURLReducer.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        
        //參數(shù)true為打印進(jìn)度
        return job.waitForCompletion(true)?0:1;
    }

}
package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.runner.TopKURLRunner;

public class TopKURLRunner {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new TopKURLRunner(), args);
        System.exit(res);
    }
}

運(yùn)行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData

運(yùn)行結(jié)果:

如何理解TopK算法及其實(shí)現(xiàn)

看完上述內(nèi)容,你們對(duì)如何理解TopK算法及其實(shí)現(xiàn)有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI