溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

hbase如何編寫(xiě)mapreduce

發(fā)布時(shí)間:2021-12-09 10:38:06 來(lái)源:億速云 閱讀:159 作者:小新 欄目:大數(shù)據(jù)

這篇文章主要為大家展示了“hbase如何編寫(xiě)mapreduce”,內(nèi)容簡(jiǎn)而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“hbase如何編寫(xiě)mapreduce”這篇文章吧。

package com.hbase.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class HbaseMrTest {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf =  HBaseConfiguration.create();
		//配置conf
		conf.set("hbase.zookeeper.quorum", "bigdata01,bigdata02,bigdata03");
		conf.set("hbase.zookeeper.property.clientPort", "2181");	
		Job job = Job.getInstance(conf, "word-count");
		//指定執(zhí)行job的主類(lèi)
		job.setJarByClass(HbaseMrTest.class);
		Scan scan = new Scan();
		//定義mapper需要掃描的列
		scan.addColumn(Bytes.toBytes("content"), Bytes.toBytes("words"));
		//配置mapper
		TableMapReduceUtil.initTableMapperJob("wordcount", scan,HMapper.class , Text.class, IntWritable.class, job);
		//配置recuder
		TableMapReduceUtil.initTableReducerJob("result", HReducer.class, job);
		//提交job
		System.exit(job.waitForCompletion(true)?0:1);
	}
}
// Text, IntWritable 為輸出類(lèi)型
class HMapper extends TableMapper<Text, IntWritable>{
	Text out = new Text();
	IntWritable iw = new IntWritable(1);
	@Override
	protected void map(ImmutableBytesWritable key, Result value,
			Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		//通過(guò)result 直接過(guò)得content:words 的值
		byte[] bytes =  value.getValue(Bytes.toBytes("content"), Bytes.toBytes("words"));
		if(bytes!=null) {
			String words = Bytes.toString(bytes);
			//對(duì)獲得的一行單詞進(jìn)行分割
			String[] ws = words.split(" ");
			for(String wd : ws) {
				out.set(wd);
				//寫(xiě)出值,如: you 1
				context.write(out, iw);
			}			
		}
	}	
}
// Text, IntWritable 為mapper的輸出類(lèi)型
class HReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
	@Override
	protected void reduce(Text text, Iterable<IntWritable> iter,
			Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
			throws IOException, InterruptedException {
		int sum = 0 ;
		//對(duì)iter遍歷
		for(IntWritable intw : iter) {
			sum+= intw.get();
		}
		//new 一個(gè)put 構(gòu)造函數(shù)內(nèi)的值為row key
		Put put = new Put(Bytes.toBytes(text.toString()));
		//put添加columnfamily 和column
		put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("wordcnt"), Bytes.toBytes(String.valueOf(sum)));
		//將每個(gè)單詞當(dāng)做row key 寫(xiě)出,put是相加的總和
		context.write(new ImmutableBytesWritable(Bytes.toBytes(text.toString())), put);
	}
	
}
最后將java文件export為RaunableJar放到linux java -jar hbase.jar com.hbase.test.HbaseMrTest 運(yùn)行

原始數(shù)據(jù):hbase如何編寫(xiě)mapreduce

運(yùn)行結(jié)果:

hbase如何編寫(xiě)mapreduce

以上是“hbase如何編寫(xiě)mapreduce”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI