溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

hbase的典型場景

發(fā)布時間:2020-07-12 22:40:48 來源:網(wǎng)絡(luò) 閱讀:381 作者:原生zzy 欄目:大數(shù)據(jù)

1. hbase整合Mapreduce

  在離線任務(wù)場景中,MapReduce訪問HBASE數(shù)據(jù),加快分析速度和擴展分析能力。
hbase的典型場景
從hbase中讀取數(shù)據(jù)(result)

public class ReadHBaseDataMR {
    private static final String ZK_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_VALUE = "hadoop01:2181,hadoop01:2182,hadoop03:2181";
    private static Configuration conf;
    static {
        conf=HBaseConfiguration.create();
        conf.set(ZK_KEY,ZK_VALUE);
        //因為是從hbase中讀取到自己的hdfs集群中,所以這里需要加載hdfs的配置文件
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
    }
    //job
    public static void main(String[] args) {
        Job job = null;
        try {
            //這里使用hbase的 conf
            job = Job.getInstance(conf);
            job.setJarByClass(ReadHBaseDataMR.class);
            //全表掃描
            Scan scans=new Scan();
            String tableName="user_info";
            //設(shè)置MapReduce與hbase的整合
            TableMapReduceUtil.initTableMapperJob(tableName,
                    scans,
                    ReadHBaseDataMR_Mapper.class,
                    Text.class,
                    NullWritable.class,
                    job,
                    false);
            //設(shè)置ReducerTask 的個數(shù)為0
            job.setNumReduceTasks(0);
            //設(shè)置輸出搭配hdfs上的路徑
            Path output=new Path("/output/hbase/hbaseToHDFS");
            if(output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            //提交任務(wù)
            boolean waitForCompletion = job.waitForCompletion(true);
            System.exit(waitForCompletion?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    //使用TableMapper,去讀取hbase中的表的數(shù)據(jù)
    private static class ReadHBaseDataMR_Mapper extends TableMapper<Text, NullWritable> {
        Text mk = new Text();
        NullWritable kv = NullWritable.get();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            //默認的按照每一個rowkey讀取
             List<Cell> cells = value.listCells();
             //這里以四個坐標(biāo)確定一行記錄,行鍵,列簇,列,時間戳
             for(Cell cell:cells){
                String row= Bytes.toString(CellUtil.cloneRow(cell));  //行鍵
                String cf=Bytes.toString(CellUtil.cloneFamily(cell)); //列簇
                String column=Bytes.toString(CellUtil.cloneQualifier(cell));  //列
                String values=Bytes.toString(CellUtil.cloneValue(cell));  //值
                long time=cell.getTimestamp();  //時間戳
                 mk.set(row+"\t"+cf+"\t"+column+"\t"+value+"\t"+time);
                 context.write(mk,kv);
             }
        }
    }
}

寫入數(shù)據(jù)到hbase中(put)

public class HDFSToHbase {

    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "hadoop02:2181,hadoop03:2181,hadoop01:2181";
    private static Configuration conf;
    static {
        conf=HBaseConfiguration.create();
        conf.set(ZK_CONNECT_KEY,ZK_CONNECT_VALUE);
        //因為是從hbase中讀取到自己的hdfs集群中,所以這里需要加載hdfs的配置文件
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
    }

    //job
    public static void main(String[] args) {
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(HDFSToHbase.class);
            job.setMapperClass(MyMapper.class);
            //指定Map端的輸出
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            /**
             * 指定為nulL的表示使用默認的
             */
            String tableName="student";
            //整合MapReduce  reducer 到hbase
            TableMapReduceUtil.initTableReducerJob(tableName,MyReducer.class,
           job,null, null, null, null,
                    false );
            //指定MapReducer的輸入路徑
            Path input = new Path("/in/mingxing.txt");
            FileInputFormat.addInputPath(job, input);
            //提交任務(wù)
            boolean waitForCompletion = job.waitForCompletion(true);
            System.exit(waitForCompletion ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        NullWritable mv = NullWritable.get();
        //map端不做任何操作,直接將讀取的數(shù)據(jù)輸出到reduce端
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, mv);
        }
    }

    //Reudcer,使用TableReducer的Reudcer
    /**
     * TableReducer<KEYIN, VALUEIN, KEYOUT>
     * KEYIN:mapper輸出的key
     * VALUEIN:mapper輸出的value
     * KEYOUT:reduce輸出的key
     * 默認的有第四個參數(shù):Mutation,表示put/delete操作
     */
    private static class MyReducer extends TableReducer<Text, NullWritable, NullWritable>{
        //列簇
            String family[] = { "basicinfo","extrainfo"};

            @Override
            protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
                // zhangfenglun,M,20,13522334455,zfl@163.com,23521472  字段
                for(NullWritable value:values){
                    String fields[]=key.toString().split(",");
                    //以名稱作為rowkey
                    Put put=new Put(fields[0].getBytes());
                    put.addColumn(fields[0].getBytes(),"sex".getBytes(),fields[1].getBytes());
                    put.addColumn(fields[0].getBytes(),"age".getBytes(),fields[2].getBytes());
                    put.addColumn(fields[1].getBytes(),"phone".getBytes(),fields[3].getBytes());
                    put.addColumn(fields[1].getBytes(),"email".getBytes(),fields[4].getBytes());
                    put.addColumn(fields[1].getBytes(),"qq".getBytes(),fields[5].getBytes());
                    context.write(value, put);
                }
        }
    }
}

2. MySQL導(dǎo)入到HBASE

#使用sqoop從MySQL導(dǎo)入HBASE

sqoop import \
--connect jdbc:mysql://hadoop01:3306/test \   #MySQL的入口
--username hadoop \  #登錄MySQL的用戶名
--password root \     #登錄MySQL的密碼
--table book \          #插入的到MySQL的表
--hbase-table book \   #HBASE的表名    
--column-family info \   #HBASE表中的列簇
--hbase-row-key bid \   #mysql中的哪一個列為rowkey

#ps:這里由于版本不兼容的問題,所以,這里的HBASE中插入的表必須提前創(chuàng)建,并且不能使用:--hbase-create-table \,這個語句

3.HBASE整合hive

  原理:Hive與HBASE利用兩者本身對外的API來實現(xiàn)整合,主要靠的是HBaseStorageHandler 進 行通信,利用 HBaseStorageHandler,Hive 可以獲取到 Hive 表對應(yīng)的 HBase 表名,列簇以及 列,InputFormat 和 OutputFormat 類,創(chuàng)建和刪除 HBase 表等。
  Hive 訪問 HBase 中表數(shù)據(jù),實質(zhì)上是通過 MapReduce 讀取 HBase 表數(shù)據(jù),其實現(xiàn)是在 MR 中,使用 HiveHBaseTableInputFormat 完成對 HBase 表的切分,獲取 RecordReader 對象來讀 取數(shù)據(jù)。
  對HBASE表的切分原則:一個region切分成一個split,即表中有多少個region,MapReduce就有多少個map task。
  讀取HBASE表數(shù)據(jù)都是通過scanner,對表進行全表掃描,如果有過濾條件,則轉(zhuǎn)化為filter,當(dāng)過濾條件為rowkey時,則轉(zhuǎn)化為rowkey的過濾。
具體操作

#指定 hbase 所使用的 zookeeper 集群的地址:默認端口是 2181,可以不寫:
hive>set hbase.zookeeper.quorum=hadoop02:2181,hadoop03:2181,hadoop04:2181;
#指定 hbase 在 zookeeper 中使用的根目錄
hive>set zookeeper.znode.parent=/hbase;
#創(chuàng)建基于 HBase 表的 hive 表
hive>create external table mingxing(rowkey string, base_info map, extra_info map) row format delimited fields terminated by '\t' 
>stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
>with serdeproperties ("hbase.columns.mapping" = ":key,base_info:,extra_info:")
>tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");
#ps:org.apache.hadoop.hive.hbase.HBaseStorageHandler:處理 hive 到 hbase 轉(zhuǎn)換關(guān)系的處理器
#ps:hbase.columns.mapping:定義 hbase 的列簇和列到 hive 的映射關(guān)系
#ps:hbase.table.name:hbase 表名

雖然hive整合了hbase,但是實際的數(shù)據(jù)還是存儲在hbase上,hive相應(yīng)的表目錄下對應(yīng)的文件為空,但是每次hbase中有數(shù)據(jù)添加時,hive在執(zhí)行這張表查詢的時候,也會更新相應(yīng)的字段。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI