您好,登錄后才能下訂單哦!
現(xiàn)在很多地方都會用到zookeeper, 用到它的地方就是為了實現(xiàn)分布式。用到的場景就是服務注冊,比如一個集群服務器,需要知道哪些服務器在線,哪些服務器不在線。
ZK有一個功能,就是創(chuàng)建臨時節(jié)點,當機器啟動應用的時候就會連接到一個ZK節(jié)點,然后創(chuàng)建一個臨時節(jié)點,那么通過獲取監(jiān)聽該路徑,并且獲取該路徑下的節(jié)點數(shù)量就知道有哪些服務器在線了。當機器停止應用的時候,zk的臨時節(jié)點將會自動被刪除。我們通過這個機制去實現(xiàn)。
這次主要實現(xiàn)是采用springboot, zkui, swagger實現(xiàn)。接下來來看一下主要的代碼實現(xiàn):
在機器啟動的時候獲取本機的IP,然后將本機的IP和指定的端口號注冊到程序中:
@SpringBootApplication
public class ZKApplication implements CommandLineRunner{
@Autowired
AppConfig appConfig;
@Autowired
ZKUtil zkUtil;
public static void main(String[] args) {
SpringApplication.run(ZKApplication.class, args);
System.out.println("啟動應用成功");
}
@Override
public void run(String... strings) throws Exception {
//獲得本機IP
String addr = InetAddress.getLocalHost().getHostAddress();
Thread thread = new Thread(new ZKRegister(appConfig, zkUtil, addr));
thread.setName("register-thread");
thread.start();
Thread scanner = new Thread(new Scanner());
scanner.start();
}
}
創(chuàng)建一個工具類,工具類主要實現(xiàn)創(chuàng)建父節(jié)點,創(chuàng)建臨時路徑,監(jiān)聽事件,獲取所有注冊節(jié)點。
/**
* 創(chuàng)建臨時目錄
*/
public void createEphemeralNode(String path, String value) {
zkClient.createEphemeral(path, value);
}
/**
* 監(jiān)聽事件
*/
public void subscribeEvent(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath:" + parentPath + ":list:" + currentChilds.toString());
}
});
}
這塊就基本完成了,下面開始創(chuàng)建controller,目的是為了獲取所有在線機器的節(jié)點。為了方便測試和查看我使用了Swagger2, 這樣界面話的發(fā)請求工具特別好用。
接下來看controller的主要內(nèi)容:
/**
* 獲取所有路由節(jié)點
* @return
*/
@ApiOperation("獲取所有路由節(jié)點")
@RequestMapping(value = "getAllRoute",method = RequestMethod.POST)
@ResponseBody()
public List<String> getAllRoute(){
List<String> allNode = zkUtil.getAllNode();
List<String> result = new ArrayList<>();
for (String node : allNode) {
String key = node.split("-")[1];
result.add(key);
}
return result ;
}
同時配置對應的Swagger2
/**
* Created by huangqingshi on 2019/1/8.
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Value("${swagger.switch}")
private boolean swaggerSwitch;
@Bean
public Docket api() {
Docket docket = new Docket(DocumentationType.SWAGGER_2);
docket.enable(swaggerSwitch);
docket
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.hqs.zk.register.controller")).paths(PathSelectors.any()).build();
return docket;
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("Spring boot zk register")
.description("測試")
.contact(new Contact("黃青石","http://www.cnblogs.com/huangqingshi","68344150@qq.com"))
.termsOfServiceUrl("http://www.cnblogs.com/huangqingshi")
.version("1.0")
.build();
}
}
好了,接下來該啟動工程了,啟動之后訪問:http://localhost:8080/swagger-ui.html
點擊下面的zk-controller,對應controller的方法就會顯示出來,然后點try it out, execute 相應的結果就直接出來了, 通過下面的圖片,可以發(fā)現(xiàn)我本機的IP已經(jīng)注冊到里邊了。
接下來,咱們使用ZKUI連接上zookeeper,看一下是否真的有注冊的機器(父節(jié)點用的monior),已經(jīng)存在了,沒有問題:
注冊這塊就算實現(xiàn)完了,我一直想實現(xiàn)一個簡易的聊天,參考了各種資料然后實現(xiàn)了一把,也算圓夢了。下面開始實現(xiàn)簡易netty版聊天(為什么選擇netty?因為這個工具非常棒),使用protobuf進行序列化和反序列化:
首先從官網(wǎng)上下載protobuf工具,注意對應不同的操作系統(tǒng),我的是WINDOWS的,直接下載一個EXE程序,你下載的哪個版本,需要使用與該版本對應的版本號,否則會出錯誤。
自己創(chuàng)建好對應的Request.proto和Response.proto,在里邊指定好對應的字段和包名信息。分別執(zhí)行命令即可生成對應的文件:protoc.exe ./Response.proto --java_out=./ 這個是生成Response的,還需要指定一條生成Request。
將文件夾放到工程里邊,工程的大致接入如下:
Server的主要實現(xiàn),主要基于protoBuf固定長度的進行實現(xiàn)的(序列化和反序列化一般通過固定長度或者分隔符實現(xiàn)),這樣的話就不會造成粘包、拆包的問題。
如果看到這里,說明你喜歡這篇文章,關注我每日技術博文推送。
public void bind(int port) throws Exception {
//配置服務器端NIO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
socketChannel.pipeline().addLast(new ProtobufDecoder(RequestProto.ReqProtocol.getDefaultInstance())).
addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder());
socketChannel.pipeline().addLast(new ProBufServerHandler());
}
});
//綁定端口,同步等待
ChannelFuture f = b.bind(port).sync();
if (f.isSuccess()) {
System.out.println("啟動 server 成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
客戶端主要兩個方式,一個方式是客戶端向服務端發(fā)請求,一個方式是群組發(fā)消息,我為了快速實現(xiàn),就直接發(fā)一條請求,并且將結果輸出到日志中了??蛻舳耸褂靡粋€線程執(zhí)行兩個不同的方法,然后將一個是發(fā)送給Server, 一個是發(fā)送給Group。發(fā)送給Server比較簡單就直接給Server了。
@PostConstruct
public void start() throws Exception{
connection(appConfig.getNettyServer(), appConfig.getNettyPort());
for(int i = 1; i <= 1; i++) {
int j = i;
Runnable runnable = () -> {
try {
sendMesgToServer(j);
sendMesgToGroup(j);
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(runnable).start();
}
}
發(fā)送給Group的話需要記住每次過來的唯一requestId,并且保存對應的channel,然后發(fā)送消息的時候遍歷所有requestId,并且與之對應的發(fā)送消息:
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestProto.ReqProtocol reqProtocol) throws Exception {
RequestProto.ReqProtocol req = reqProtocol;
CHANNEL_MAP.putIfAbsent(req.getRequestId(), (NioSocketChannel)channelHandlerContext.channel());
// System.out.println("get Msg from Client:" + req.getReqMsg());
handleReq(req);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
public void handleReq(RequestProto.ReqProtocol req) {
Long originRequestId = req.getRequestId();
if(req.getType() == Constants.CommandType.SERVER) {
NioSocketChannel nioSocketChannel = CHANNEL_MAP.get(req.getRequestId());
sendMsg(nioSocketChannel, originRequestId, originRequestId, Constants.CommandType.SERVER, "hello client");
} else if(req.getType() == Constants.CommandType.GROUP) {
for(Map.Entry<Long, NioSocketChannel> entry : CHANNEL_MAP.entrySet()) {
//過濾自己收消息
if(entry.getKey() == originRequestId) {
continue;
}
sendMsg(entry.getValue(), originRequestId, entry.getKey(), Constants.CommandType.GROUP, req.getReqMsg());
}
}
}
輸出的結果如下,自定義兩個客戶端,一個requestId是1L,另一個requestId是2L,然后都在啟動的時候sleep 3秒,然后發(fā)送給server。sleep5秒發(fā)送到Group里邊去,輸出的結果就是如下這個樣子的。
1L : send message to server successful!
2L : send message to server successful!
get Msg from Server: 2:hello client
received id:2- send to id:2
received id:1- send to id:1
get Msg from Server: 1:hello client
received id:1- send to id:2
get Msg from Group: 1:hello peoole in group
received id:2- send to id:1
get Msg from Group: 2:hello peoole in group
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。