溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

HBase和HDFS數(shù)據(jù)互導(dǎo)程序

發(fā)布時(shí)間:2020-07-28 18:52:04 來源:網(wǎng)絡(luò) 閱讀:3125 作者:jethai 欄目:關(guān)系型數(shù)據(jù)庫




下面說說JAVA API 提供的這些類的功能和他們之間有什么樣的聯(lián)系。


1.HBaseConfiguration

關(guān)系:org.apache.hadoop.hbase.HBaseConfiguration

作用:通過此類可以對HBase進(jìn)行配置

用法實(shí)例: Configuration config = HBaseConfiguration.create();

說明: HBaseConfiguration.create() 默認(rèn)會從classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration。

2.HBaseAdmin 類

關(guān)系:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口關(guān)系HBase 數(shù)據(jù)庫中的表信息

用法:HBaseAdmin admin = new HBaseAdmin(config);

3.Descriptor類

關(guān)系:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor 類包含了表的名字以及表的列族信息

用法:HTableDescriptor htd =new HTableDescriptor(tablename);

             構(gòu)造一個表描述符指定TableName對象。

             Htd.addFamily(new HColumnDescriptor(“myFamily”));

             將列家族給定的描述符

4.HTable

關(guān)系:org.apache.hadoop.hbase.client.HTable

作用:HTable HBase 的表通信

用法:HTable tab = new HTable(config,Bytes.toBytes(tablename));

           ResultScanner sc = tab.getScanner(Bytes.toBytes(“familyName”));

說明:獲取表內(nèi)列族 familyNme 的所有數(shù)據(jù)。

5.Put

關(guān)系:org.apache.hadoop.hbase.client.Put

作用:獲取單個行的數(shù)據(jù)

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

           Put put = new Put(row);

           p.add(family,qualifier,value);

說明:向表 tablename 添加 “family,qualifier,value”指定的值。

6.Get

關(guān)系:org.apache.hadoop.hbase.client.Get

作用:獲取單個行的數(shù)據(jù)

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

           Get get = new Get(Bytes.toBytes(row));

           Result result = table.get(get);

說明:獲取 tablename 表中 row 行的對應(yīng)數(shù)據(jù)

7.ResultScanner

關(guān)系:Interface

作用:獲取值的接口

用法:ResultScanner scanner = table.getScanner(Bytes.toBytes(family));

           For(Result rowResult : scanner){

                   Bytes[] str = rowResult.getValue(family,column);

}

說明:循環(huán)獲取行中列值。



例1 HBase之讀取HDFS數(shù)據(jù)寫入HBase

package org.hadoop.hbase;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseWriter {
 public static class WordCountHbaseMapper extends
   Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);// 輸出<key,value>為<word,one>
   }
  }
 }
 public static class WordCountHbaseReducer extends
   TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {// 遍歷求和
    sum += val.get();
   }
   Put put = new Put(key.getBytes());//put實(shí)例化,每一個詞存一行
   //列族為content,列修飾符為count,列值為數(shù)目
   put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
   context.write(new ImmutableBytesWritable(key.getBytes()), put);// 輸出求和后的<key,value>
  }
 }
 
 public static void main(String[] args){
  String tablename = "wordcount";
  Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "192.168.1.139");
    conf.set("hbase.zookeeper.property.clientPort", "2191");
  HBaseAdmin admin = null;
  try {
   admin = new HBaseAdmin(conf);
   if(admin.tableExists(tablename)){
    System.out.println("table exists!recreating.......");
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
   }
   HTableDescriptor htd = new HTableDescriptor(tablename);
   HColumnDescriptor tcd = new HColumnDescriptor("content");
   htd.addFamily(tcd);//創(chuàng)建列族
   admin.createTable(htd);//創(chuàng)建表
   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length != 1) {
        System.err.println("Usage: WordCountHbaseWriter <in>");
        System.exit(2);
      }
      Job job = new Job(conf, "WordCountHbaseWriter");
  job.setNumReduceTasks(2);
      job.setJarByClass(WordCountHbaseWriter.class);
   //使用WordCountHbaseMapper類完成Map過程;
      job.setMapperClass(WordCountHbaseMapper.class);
      TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);
      //設(shè)置任務(wù)數(shù)據(jù)的輸入路徑;
      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
   //設(shè)置了Map過程的輸出類型,其中設(shè)置key的輸出類型為Text;
      job.setOutputKeyClass(Text.class);
   //設(shè)置了Map過程的輸出類型,其中設(shè)置value的輸出類型為IntWritable;
      job.setOutputValueClass(IntWritable.class);
   //調(diào)用job.waitForCompletion(true) 執(zhí)行任務(wù),執(zhí)行成功后退出;
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  } catch (Exception e) {
   e.printStackTrace();
  } finally{
   if(admin!=null)
    try {
     admin.close();
    } catch (IOException e) {
     e.printStackTrace();
    }
  }
  
 }
}


例2 HBase之讀取HBase數(shù)據(jù)寫入HDFS

package org.hadoop.hbase;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseReader {
 
 public static class WordCountHbaseReaderMapper extends 
    TableMapper<Text,Text>{
    @Override
    protected void map(ImmutableBytesWritable key,Result value,Context context)
            throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer("");
        for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
            String str =  new String(entry.getValue());
            //將字節(jié)數(shù)組轉(zhuǎn)換為String類型
            if(str != null){
                sb.append(new String(entry.getKey()));
                sb.append(":");
                sb.append(str);
            }
            context.write(new Text(key.get()), new Text(new String(sb)));
        }
    }
}
 public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,Text>{
     private Text result = new Text();
     @Override
     protected void reduce(Text key, Iterable<Text> values,Context context)
             throws IOException, InterruptedException {
         for(Text val:values){
             result.set(val);
             context.write(key, result);
         }
     }
 }
 
 public static void main(String[] args) throws Exception {
     String tablename = "wordcount";
     Configuration conf = HBaseConfiguration.create();
     conf.set("hbase.zookeeper.quorum", "192.168.1.139");
     conf.set("hbase.zookeeper.property.clientPort", "2191");
     
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length != 1) {
       System.err.println("Usage: WordCountHbaseReader <out>");
       System.exit(2);
     }
     Job job = new Job(conf, "WordCountHbaseReader");
     job.setJarByClass(WordCountHbaseReader.class);
     //設(shè)置任務(wù)數(shù)據(jù)的輸出路徑;
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
     job.setReducerClass(WordCountHbaseReaderReduce.class);
     Scan scan = new Scan();
     TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
     //調(diào)用job.waitForCompletion(true) 執(zhí)行任務(wù),執(zhí)行成功后退出;
     System.exit(job.waitForCompletion(true) ? 0 : 1);

 }
}


程序中用到hadoop的相關(guān)JAR包(如下圖)及hbase所有jar包

HBase和HDFS數(shù)據(jù)互導(dǎo)程序

如果上面的API還不能滿足你的要求,可以到下面這個網(wǎng)站里面Hbase全部API介紹

http://www.yiibai.com/hbase/

 

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI