溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

hbase0.98.9中如何實現(xiàn)endpoints

發(fā)布時間:2021-11-24 15:53:46 來源:億速云 閱讀:142 作者:柒染 欄目:云計算

本篇文章為大家展示了hbase0.98.9中如何實現(xiàn)endpoints,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

定制一個endpoint的過程。

下面是實現(xiàn)過程:

1、定義接口描述文件(該功能有protobuf提供出來

option java_package = "coprocessor.endpoints.generated";
option java_outer_classname = "RowCounterEndpointProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

這個文件我直接拿的hbase提供的example中的例子。其中的語法應(yīng)該有過類似經(jīng)驗的一看就清楚了,實在不清楚就請查查protobuf的幫助手冊吧。

2、根據(jù)接口描述文件生成java接口類(該功能有protobuf提供出來)

有了接口描述文件,還需要生成java語言的接口類。這個需要借助protobuf提供的工具protoc。

$protoc --java_out=./ Examples.proto

簡單解釋下,protoc這個命令在你裝了protobuf后就有了。Examples.proto這個是文件名,也就是剛才編寫的那個接口描述文件。“--java_out”這個用來指定生成后的java類放的地方。

所以,這地方如果你沒有裝protobuf,你需要裝一個,window和linux版都有,多說一句,如果你去裝hadoop64位的編譯環(huán)境的話,應(yīng)該是要裝protobuf。

3、實現(xiàn)接口

package coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;

public class RowCounterEndpointExample extends RowCountService implements
		Coprocessor, CoprocessorService {
	private RegionCoprocessorEnvironment env;

	public RowCounterEndpointExample() {
	}

	@Override
	public Service getService() {
		return this;
	}

	@Override
	public void getRowCount(RpcController controller, CountRequest request,
			RpcCallback<CountResponse> done) {
		Scan scan = new Scan();
		scan.setFilter(new FirstKeyOnlyFilter());
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(scan);
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			byte[] lastRow = null;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					byte[] currentRow = CellUtil.cloneRow(kv);
					if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
						lastRow = currentRow;
						count++;
					}
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void getKeyValueCount(RpcController controller,
			CountRequest request, RpcCallback<CountResponse> done) {
		CountResponse response = null;
		InternalScanner scanner = null;
		try {
			scanner = env.getRegion().getScanner(new Scan());
			List<Cell> results = new ArrayList<Cell>();
			boolean hasMore = false;
			long count = 0;
			do {
				hasMore = scanner.next(results);
				for (Cell kv : results) {
					count++;
				}
				results.clear();
			} while (hasMore);

			response = CountResponse.newBuilder().setCount(count).build();
		} catch (IOException ioe) {
			ResponseConverter.setControllerException(controller, ioe);
		} finally {
			if (scanner != null) {
				try {
					scanner.close();
				} catch (IOException ignored) {
				}
			}
		}
		done.run(response);
	}

	@Override
	public void start(CoprocessorEnvironment env) throws IOException {
		if (env instanceof RegionCoprocessorEnvironment) {
			this.env = (RegionCoprocessorEnvironment) env;
		} else {
			throw new CoprocessorException("Must be loaded on a table region!");
		}
	}

	@Override
	public void stop(CoprocessorEnvironment env) throws IOException {
		// TODO Auto-generated method stub

	}

}

4、注冊接口(Hbase功能,通過配置文件或者表模式方式注冊

這部分,可以看hbase權(quán)威指南了,我就看這部分做的。

5、測試調(diào)用

package coprocessor;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ServiceException;

import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountRequest;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.CountResponse;
import coprocessor.endpoints.generated.RowCounterEndpointProtos.RowCountService;
import util.HBaseHelper;

public class RowCounterEndpointClientExample {
	public static void main(String[] args) throws ServiceException, Throwable {
		Configuration conf = HBaseConfiguration.create();
		HBaseHelper helper = HBaseHelper.getHelper(conf);
		//helper.dropTable("testtable");
		//helper.createTable("testtable", "colfam1", "colfam2");
		System.out.println("Adding rows to table...");
		helper.fillTable("testtable", 1, 10, 10, "colfam1", "colfam2");

		HTable table = new HTable(conf, "testtable");

		final CountRequest request = CountRequest.getDefaultInstance();
		
		final Batch.Call<RowCountService, Long> call =new Batch.Call<RowCountService, Long>() {
			public Long call(RowCountService counter)
					throws IOException {
				ServerRpcController controller = new ServerRpcController();
				BlockingRpcCallback<CountResponse> rpcCallback = new BlockingRpcCallback<CountResponse>();
				counter.getRowCount(controller, request, rpcCallback);
				CountResponse response = rpcCallback.get();
				if (controller.failedOnException()) {
					throw controller.getFailedOn();
				}
				return (response != null && response.hasCount()) ? response
						.getCount() : 0;
			}
		};
		
		Map<byte[], Long> results = table.coprocessorService(
				RowCountService.class, null, null, call);
		
		for(byte[] b : results.keySet()){
			System.err.println(Bytes.toString(b) + ":" + results.get(b));
		} 
	}
}

上述內(nèi)容就是hbase0.98.9中如何實現(xiàn)endpoints,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI