溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hbase0.98 coprocessor Endpoint如何實現(xiàn)HelloWorld

發(fā)布時間:2021-12-04 15:19:51 來源:億速云 閱讀:156 作者:小新 欄目:云計算

這篇文章主要為大家展示了“hbase0.98 coprocessor Endpoint如何實現(xiàn)HelloWorld”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“hbase0.98 coprocessor Endpoint如何實現(xiàn)HelloWorld”這篇文章吧。

HBase作為列族數(shù)據(jù)庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執(zhí)行求和、計數(shù)、排序等操作。比如,在舊版本的(<0.92)Hbase中,統(tǒng)計數(shù)據(jù)表的總行數(shù),需要使用Counter方法,執(zhí)行一次MapReduce Job才能得到。雖然HBase在數(shù)據(jù)存儲層中集成了MapReduce,能夠有效用于數(shù)據(jù)表的分布式計算。然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少通訊開銷,從而獲得很好的性能提升。于是,HBase在0.92之后引入了協(xié)處理器(coprocessors),實現(xiàn)一些激動人心的新特性:能夠輕易建立二次索引、復雜過濾器(謂詞下推)以及訪問控制等。 HBase協(xié)處理器的靈感來自于Jeff Dean 09年的演講( P66-67)。

####hbase coprocessor 大類分為兩種coprocessor分別是:

  1. RegionObserver :它是一種類似于傳統(tǒng)數(shù)據(jù)庫的觸發(fā)器,提供了鉤子函數(shù):Get、Put、Delete、Scan等。

  1. Endpoint:是一個遠程rpc調用,類似于webservice形式調用,但他不適用xml,而是使用的序列化框架是protobuf(序列化后數(shù)據(jù)更?。?,本文將介紹此種Coprocessor.

Endpoint 允許您定義自己的動態(tài)RPC協(xié)議,用于客戶端與region servers通訊。Coprocessor 與region server在相同的進程空間中,因此您可以在region端定義自己的方法(endpoint),將計算放到region端,減少網絡開銷,常用于提升hbase的功能,如:count,sum等。

###我的環(huán)境

  • hadoop : 2.2

  • hbase-hadoop2 :0.98+

  • JDK:1.6 ##這里必須要1.6 要不然會出現(xiàn)不能加載jar包的現(xiàn)象。

  • 操作系統(tǒng):CentOS 6.4

###編寫代碼

  1. 首先你需要利用protobuf(網上自己搜google維護的目前發(fā)展到2.5版本) 工具成一個HelloWorld 序列化對象。

    ####HelloWorld.proto

    option java_package = "com.gzhdi.coprocessor.generated";
    option java_outer_classname = "ServerHelloworld";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    
    message HelloRequest {
      required bytes askWord = 10;
    }
    
    message HelloResponse {
      required bytes retWord = 10;
    }
    
    message AskRequest {
      required bytes ask = 100;
    }
    
    message AnsResponse {
      required bytes ans = 100;
    }
    
    service HelloWorld {
      rpc sendHello(HelloRequest)
        returns (HelloResponse);
    
      rpc question(AskRequest)
        returns (AnsResponse);
    }


  2. 使用命令生成代碼,并拷貝到你的工程里邊去,我的文件在工程下面放著呢,直接生成到工程里邊。 這段代碼就會生成一個HelloWorld.java文件.

    protoc.exe  --java_out=../src HelloWorld.proto


  3. 編寫主要代碼

    ####server端代碼

    package com.gzhdi.copocessor;
    
    import java.io.IOException;
    
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    
    import com.google.protobuf.ByteString;
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    import com.gzhdi.coprocessor.generated.ServerHelloworld;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloResponse;
    
    public class HelloWorldEndPoint  extends ServerHelloworld.HelloWorld implements Coprocessor,CoprocessorService{
    
    	private RegionCoprocessorEnvironment env; 
    
    	@Override
    	public void sendHello(RpcController controller, HelloRequest request,
    			RpcCallback<HelloResponse> done) {
    		System.out.println("request HelloRequest:"+request.getAskWord());
    		HelloResponse resp=HelloResponse.newBuilder().setRetWord(ByteString.copyFromUtf8("hello world!!!")).build();
    
    		done.run(resp);
    	}
    
    	@Override
    	public void question(RpcController controller, AskRequest request,
    			RpcCallback<AnsResponse> done) {
    		System.out.println("request question:"+request.getAsk());
    		AnsResponse resp=AnsResponse.newBuilder().setAns(ByteString.copyFromUtf8("helloworld,"+request.getAsk().toStringUtf8())).build();
    		done.run(resp);
    	}
    
    	@Override
    	public Service getService() {
    		return this;
    	}
    
    	@Override
    	public void start(CoprocessorEnvironment env) throws IOException {
    		if (env instanceof RegionCoprocessorEnvironment) {  
    		      this.env = (RegionCoprocessorEnvironment)env;  
    		    } else {  
    		      throw new CoprocessorException("Must be loaded on a table region!");  
    		    }  
    	}
    
    	@Override
    	public void stop(CoprocessorEnvironment env) throws IOException {
    
    	}
    }


    ####client 端代碼

    package com.gzhdi.copocessor;
    
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.ipc.ServerRpcController;
    
    import com.google.protobuf.ByteString;
    import com.google.protobuf.ServiceException;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AnsResponse;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.AskRequest;
    import com.gzhdi.coprocessor.generated.ServerHelloworld.HelloWorld;
    
    public class HelloWorldClient {
    
    	public static void main(String[] args) throws ServiceException, Throwable {
    		myclient();
    	}
    //如果你沒有寫好自己的例子可以跑跑hbase自帶的小例子
    //	private static void example1() throws IOException, ServiceException,
    //			Throwable {
    //		System.out.println("begin.....");  
    //        long begin_time=System.currentTimeMillis();  
    //       Configuration config=HBaseConfiguration.create();  
    ////     String master_ip="192.168.150.128";  
    //       String master_ip="10.10.113.211";  
    //       String zk_ip="10.10.113.211";  
    //       String table_name="t1";  
    //       config.set("hbase.zookeeper.property.clientPort", "2181");   
    //       config.set("hbase.zookeeper.quorum", zk_ip);   
    //       config.set("hbase.master", master_ip+":600000");  
    //       
    //       HTable table = new HTable(config, table_name);
    //       final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
    //       Map results = table.coprocessorService(
    //           ExampleProtos.RowCountService.class, // the protocol interface we're invoking
    //           null, null,                          // start and end row keys
    //           
    //           new Batch.Call() {
    //        	   
    //               public Long call(Object counter) throws IOException {
    //                 BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
    //                     new BlockingRpcCallback();
    //                 ((ExampleProtos.RowCountService)counter).getRowCount(null, request, rpcCallback);
    //                 ExampleProtos.CountResponse response = rpcCallback.get();
    //                 System.out.println("count :::::"+response.getCount());
    //                 return response.hasCount() ? response.getCount() : 0;
    //               }
    //			
    //           });
    //	}
    
    	public static void myclient(){
    		// TODO Auto-generated method stub
    				System.out.println("begin.....");  
    		        long begin_time=System.currentTimeMillis();  
    		       Configuration config=HBaseConfiguration.create();  
    //		     String master_ip="192.168.150.128";  
    		       String master_ip="10.10.113.211";  
    		       String zk_ip="10.10.113.211";  
    		       String table_name="t1";  
    		       config.set("hbase.zookeeper.property.clientPort", "2181");   
    		       config.set("hbase.zookeeper.quorum", zk_ip);   
    		       config.set("hbase.master", master_ip+":600000");  
    
    		       final AskRequest req=AskRequest.newBuilder().setAsk(ByteString.copyFromUtf8("hello")).build();
    		       AnsResponse resp=null;
    		       try {
    				HTable table=new HTable(config,table_name);
    				Map<byte[], ByteString> re=table.coprocessorService(HelloWorld.class, null, null, new Batch.Call<HelloWorld, ByteString>() {
    
    					@Override
    					public ByteString call(HelloWorld instance) throws IOException {
    						ServerRpcController controller = new ServerRpcController();
    						BlockingRpcCallback<AnsResponse> rpccall=new BlockingRpcCallback<AnsResponse>();
    						instance.question(controller, req, rpccall);
    						AnsResponse resp=rpccall.get();
    
    
    						//result
    						System.out.println("resp:"+ resp.getAns().toStringUtf8());
    
    						return resp.getAns();
    					}
    
    				});
    			} catch (IOException e) {
    				e.printStackTrace();
    			} catch (ServiceException e) {
    				e.printStackTrace();
    			} catch (Throwable e) {
    				e.printStackTrace();
    			}  
    	}
    
    }


  4. 利用jdk 1.6打包(切記jdk1.6,因為hbase用1.6打包的) 導出hellworld.jar 包名隨便起。

###部署

  1. 將包helloworld.jar 放在 %HBASE_HOME/lib/ 下就可以了。

  2. 重新啟動hbase

  3. 驗證

     [root@hdp22 ~ Desktop]# hbase shell
    hbase(main):001:0> import com.gzhdi.copocessor.HelloWorldEndPoint
    => Java::ComGzhdiCopocessor::HelloWorldEndPoint    //如果打印出這句話就說明包已經加載完畢


  4. 向指定表添加endpoint

    hbase(main):002:0> create 't1','f1'
    0 row(s) in 6.5290 seconds
    => Hbase::Table - t1   //創(chuàng)建表t1
    
    hbase(main):003:0> alter 't1','coprocessor'=>'|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'
    Updating all regions with the new schema...
    0/1 regions updated.
    1/1 regions updated.
    Done.
    0 row(s) in 2.5960 seconds
    
    hbase(main):005:0> describe 't1'
    DESCRIPTION                                                                                                              ENABLED                                                          
     't1', {TABLE_ATTRIBUTES => {coprocessor$1 => '|com.gzhdi.copocessor.HelloWorldEndPoint|1001|'}, {NAME => 'f1', DATA_BLO true                                                             
     CK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERS                                                                  
     IONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE                                                                  
      => 'true'}                                                                                                                                                                              
    1 row(s) in 0.0940 seconds
    
    //OK 成功了


###調用 現(xiàn)在就可以使用你的客戶端代碼調用該服務了,需要制定zookeeper地址和表名(因為服務是針對表的)。

以上是“hbase0.98 coprocessor Endpoint如何實現(xiàn)HelloWorld”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI