溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

MapReduce Map Join怎么使用

發(fā)布時(shí)間:2021-12-23 13:49:26 來(lái)源:億速云 閱讀:166 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“MapReduce Map Join怎么使用”,在日常操作中,相信很多人在MapReduce Map Join怎么使用問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”MapReduce Map Join怎么使用”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

1. 樣例數(shù)據(jù) 
 

011990-99999    SIHCCAJAVRI
012650-99999    TYNSET-HANSMOEN


012650-99999    194903241200    111
012650-99999    194903241800    78
011990-99999    195005150700    0
011990-99999    195005151200    22
011990-99999    195005151800    -11


2. 需求 
MapReduce Map Join怎么使用  

3. 思路、代碼 
足夠小的關(guān)聯(lián)文件(即氣象臺(tái)信息)添加到分布式緩存,然后在每個(gè) Mapper 端讀取被緩存到本地全量氣象臺(tái)信息,再與天氣信息相關(guān)聯(lián)。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


public class MapJoin {

    static class RecordMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Map<String, String> stationMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //預(yù)處理,把要關(guān)聯(lián)的文件加載到緩存中
            Path[] paths = context.getLocalCacheFiles();
            //新的檢索緩存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被棄用
            //然而 context.getCacheFiles() 返回的是 HDFS 路徑; context.getLocalCacheFiles() 返回的才是本地路徑

            //這里只緩存了一個(gè)文件,所以取第一個(gè)即可
            BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));
            String line = null;
            try {
                while ((line = reader.readLine()) != null) {
                    String[] vals = line.split("\\t");
                    if (vals.length == 2) {
                        stationMap.put(vals[0], vals[1]);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reader.close();
            }
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] vals = value.toString().split("\\t");
            if (vals.length == 3) {
                String stationName = stationMap.get(vals[0]); //Join
                stationName = stationName == null ? "" : stationName;
                context.write(new Text(vals[0]),
                        new Text(stationName + "\t" + vals[1] + "\t" + vals[2]));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Parameter number is wrong, please enter three parameters:<ncdc input> <station input> <output>");
            System.exit(-1);
        }

        Path inputPath = new Path(otherArgs[0]);
        Path stationPath = new Path(otherArgs[1]);
        Path outputPath = new Path(otherArgs[2]);

        Job job = Job.getInstance(conf, "MapJoin");
        job.setJarByClass(MapJoin.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.addCacheFile(stationPath.toUri()); //添加緩存文件,可添加多個(gè)

        job.setMapperClass(RecordMapper.class);
        job.setMapOutputKeyClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


4. 運(yùn)行結(jié)果
MapReduce Map Join怎么使用

到此,關(guān)于“MapReduce Map Join怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI