溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何在Springboot中利用Netty實現(xiàn)一個RPC服務(wù)器

發(fā)布時間:2021-01-11 14:30:06 來源:億速云 閱讀:206 作者:Leah 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)如何在Springboot中利用Netty實現(xiàn)一個RPC服務(wù)器,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

一、什么是RPC?

RPC(Remote Procedure Call)遠(yuǎn)程過程調(diào)用,是一種進程間的通信方式,其可以做到像調(diào)用本地方法那樣調(diào)用位于遠(yuǎn)程的計算機的服務(wù)。其實現(xiàn)的原理過程如下:

  • 本地的進程通過接口進行本地方法調(diào)用。

  • RPC客戶端將調(diào)用的接口名、接口方法、方法參數(shù)等信息利用網(wǎng)絡(luò)通信發(fā)送給RPC服務(wù)器。

  • RPC服務(wù)器對請求進行解析,根據(jù)接口名、接口方法、方法參數(shù)等信息找到對應(yīng)的方法實現(xiàn),并進行本地方法調(diào)用,然后將方法調(diào)用結(jié)果響應(yīng)給RPC客戶端。

二、實現(xiàn)RPC需要解決那些問題?

1. 約定通信協(xié)議格式

RPC分為客戶端與服務(wù)端,就像HTTP一樣,我們需要定義交互的協(xié)議格式。主要包括三個方面:

  • 請求格式

  • 響應(yīng)格式

  • 網(wǎng)絡(luò)通信時數(shù)據(jù)的序列化方式

RPC請求

@Data
public class RpcRequest {
 /**
  * 請求ID 用來標(biāo)識本次請求以匹配RPC服務(wù)器的響應(yīng)
  */
 private String requestId;
 /**
  * 調(diào)用的類(接口)權(quán)限定名稱
  */
 private String className;
 /**
  * 調(diào)用的方法名
  */
 private String methodName;
 /**
  * 方法參類型列表
  */
 private Class<?>[] parameterTypes;
 /**
  * 方法參數(shù)
  */
 private Object[] parameters;
}

RPC響應(yīng)

@Data
public class RpcResponse {
 /**
  * 響應(yīng)對應(yīng)的請求ID
  */
 private String requestId;
 /**
  * 調(diào)用是否成功的標(biāo)識
  */
 private boolean success = true;
 /**
  * 調(diào)用錯誤信息
  */
 private String errorMessage;
 /**
  * 調(diào)用結(jié)果
  */
 private Object result;
}

2. 序列化方式

序列化方式可以使用JDK自帶的序列化方式或者一些第三方的序列化方式,JDK自帶的由于性能較差所以不推薦。我們這里選擇JSON作為序列化協(xié)議,即將請求和響應(yīng)對象序列化為JSON字符串后發(fā)送到對端,對端接收到后反序列為相應(yīng)的對象,這里采用阿里的 fastjson 作為JSON序列化框架。

3. TCP粘包、拆包

TCP是個“流”協(xié)議,所謂流,就是沒有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的流水,是連成一片的,其間并沒有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個完整的包可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。粘包和拆包需要應(yīng)用層程序來解決。

我們采用在請求和響應(yīng)的頭部保存消息體的長度的方式解決粘包和拆包問題。請求和響應(yīng)的格式如下:

 +--------+----------------+
 | Length | Content  |
 | 4字節(jié) | Length個字節(jié) |
 +--------+----------------+

4. 網(wǎng)絡(luò)通信框架的選擇

出于性能的考慮,RPC一般選擇異步非阻塞的網(wǎng)絡(luò)通信方式,JDK自帶的NIO網(wǎng)絡(luò)編程操作繁雜,Netty是一款基于NIO開發(fā)的網(wǎng)絡(luò)通信框架,其對java NIO進行封裝對外提供友好的API,并且內(nèi)置了很多開箱即用的組件,如各種編碼解碼器。所以我們采用Netty作為RPC服務(wù)的網(wǎng)絡(luò)通信框架。

三、RPC服務(wù)端

RPC分為客戶端和服務(wù)端,它們有一個共同的服務(wù)接口API,我們首先定義一個接口 HelloService

public interface HelloService {
 String sayHello(String name);
}

然后服務(wù)端需要提供該接口的實現(xiàn)類,然后使用自定義的@RpcService注解標(biāo)注,該注解擴展自@Component,被其標(biāo)注的類可以被Spring的容器管理。

@RpcService
public class HelloServiceImp implements HelloService {
 @Override
 public String sayHello(String name) {
  return "Hello " + name;
 }
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
 
}

RPC服務(wù)器類

我們實現(xiàn)了ApplicationContextAware接口,以便從bean容器中取出@RpcService實現(xiàn)類,存入我們的map容器中。

@Component
@Slf4j
public class RpcServer implements ApplicationContextAware, InitializingBean {
 // RPC服務(wù)實現(xiàn)容器
 private Map<String, Object> rpcServices = new HashMap<>();
 @Value("${rpc.server.port}")
 private int port;

 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  Map<String, Object> services = applicationContext.getBeansWithAnnotation(RpcService.class);
  for (Map.Entry<String, Object> entry : services.entrySet()) {
   Object bean = entry.getValue();
   Class<?>[] interfaces = bean.getClass().getInterfaces();
   for (Class<?> inter : interfaces) {
    rpcServices.put(inter.getName(), bean);
   }
  }
  log.info("加載RPC服務(wù)數(shù)量:{}", rpcServices.size());
 }

 @Override
 public void afterPropertiesSet() {
  start();
 }

 private void start(){
  new Thread(() -> {
   EventLoopGroup boss = new NioEventLoopGroup(1);
   EventLoopGroup worker = new NioEventLoopGroup();
   try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boss, worker)
      .childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60));
        pipeline.addLast(new JsonDecoder());
        pipeline.addLast(new JsonEncoder());
        pipeline.addLast(new RpcInboundHandler(rpcServices));
       }
      })
      .channel(NioServerSocketChannel.class);
    ChannelFuture future = bootstrap.bind(port).sync();
    log.info("RPC 服務(wù)器啟動, 監(jiān)聽端口:" + port);
    future.channel().closeFuture().sync();
   }catch (Exception e){
    e.printStackTrace();
    boss.shutdownGracefully();
    worker.shutdownGracefully();
   }
  }).start();

 }
}

RpcServerInboundHandler 負(fù)責(zé)處理RPC請求

@Slf4j
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {
 private Map<String, Object> rpcServices;

 public RpcServerInboundHandler(Map<String, Object> rpcServices){
  this.rpcServices = rpcServices;
 }

 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  log.info("客戶端連接成功,{}", ctx.channel().remoteAddress());
 }

 public void channelInactive(ChannelHandlerContext ctx) {
  log.info("客戶端斷開連接,{}", ctx.channel().remoteAddress());
  ctx.channel().close();
 }

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg){
  RpcRequest rpcRequest = (RpcRequest) msg;
  log.info("接收到客戶端請求, 請求接口:{}, 請求方法:{}", rpcRequest.getClassName(), rpcRequest.getMethodName());
  RpcResponse response = new RpcResponse();
  response.setRequestId(rpcRequest.getRequestId());
  Object result = null;
  try {
   result = this.handleRequest(rpcRequest);
   response.setResult(result);
  } catch (Exception e) {
   e.printStackTrace();
   response.setSuccess(false);
   response.setErrorMessage(e.getMessage());
  }
  log.info("服務(wù)器響應(yīng):{}", response);
  ctx.writeAndFlush(response);
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  log.info("連接異常");
  ctx.channel().close();
  super.exceptionCaught(ctx, cause);
 }

 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof IdleStateEvent){
   IdleStateEvent event = (IdleStateEvent)evt;
   if (event.state()== IdleState.ALL_IDLE){
    log.info("客戶端已超過60秒未讀寫數(shù)據(jù), 關(guān)閉連接.{}",ctx.channel().remoteAddress());
    ctx.channel().close();
   }
  }else{
   super.userEventTriggered(ctx,evt);
  }
 }

 private Object handleRequest(RpcRequest rpcRequest) throws Exception{
  Object bean = rpcServices.get(rpcRequest.getClassName());
  if(bean == null){
   throw new RuntimeException("未找到對應(yīng)的服務(wù): " + rpcRequest.getClassName());
  }
  Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
  method.setAccessible(true);
  return method.invoke(bean, rpcRequest.getParameters());
 }
}

四、RPC客戶端

/**
 * RPC遠(yuǎn)程調(diào)用的客戶端
 */
@Slf4j
@Component
public class RpcClient {
 @Value("${rpc.remote.ip}")
 private String remoteIp;

 @Value("${rpc.remote.port}")
 private int port;

 private Bootstrap bootstrap;

 // 儲存調(diào)用結(jié)果
 private final Map<String, SynchronousQueue<RpcResponse>> results = new ConcurrentHashMap<>();

 public RpcClient(){

 }

 @PostConstruct
 public void init(){
  bootstrap = new Bootstrap().remoteAddress(remoteIp, port);
  NioEventLoopGroup worker = new NioEventLoopGroup(1);
  bootstrap.group(worker)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     protected void initChannel(SocketChannel channel) throws Exception {
      ChannelPipeline pipeline = channel.pipeline();
      pipeline.addLast(new IdleStateHandler(0, 0, 10));
      pipeline.addLast(new JsonEncoder());
      pipeline.addLast(new JsonDecoder());
      pipeline.addLast(new RpcClientInboundHandler(results));
     }
    });
 }

 public RpcResponse send(RpcRequest rpcRequest) {
  RpcResponse rpcResponse = null;
  rpcRequest.setRequestId(UUID.randomUUID().toString());
  Channel channel = null;
  try {
   channel = bootstrap.connect().sync().channel();
   log.info("連接建立, 發(fā)送請求:{}", rpcRequest);
   channel.writeAndFlush(rpcRequest);
   SynchronousQueue<RpcResponse> queue = new SynchronousQueue<>();
   results.put(rpcRequest.getRequestId(), queue);
   // 阻塞等待獲取響應(yīng)
   rpcResponse = queue.take();
   results.remove(rpcRequest.getRequestId());
  } catch (InterruptedException e) {
   e.printStackTrace();
  } finally {
   if(channel != null && channel.isActive()){
    channel.close();
   }
  }
  return rpcResponse;
 }
}

RpcClientInboundHandler負(fù)責(zé)處理服務(wù)端的響應(yīng)

@Slf4j
public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter {
 private Map<String, SynchronousQueue<RpcResponse>> results;

 public RpcClientInboundHandler(Map<String, SynchronousQueue<RpcResponse>> results){
  this.results = results;
 }

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  RpcResponse rpcResponse = (RpcResponse) msg;
  log.info("收到服務(wù)器響應(yīng):{}", rpcResponse);
  if(!rpcResponse.isSuccess()){
   throw new RuntimeException("調(diào)用結(jié)果異常,異常信息:" + rpcResponse.getErrorMessage());
  }
  // 取出結(jié)果容器,將response放進queue中
  SynchronousQueue<RpcResponse> queue = results.get(rpcResponse.getRequestId());
  queue.put(rpcResponse);
 }

 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof IdleStateEvent){
   IdleStateEvent event = (IdleStateEvent)evt;
   if (event.state() == IdleState.ALL_IDLE){
    log.info("發(fā)送心跳包");
    RpcRequest request = new RpcRequest();
    request.setMethodName("heartBeat");
    ctx.channel().writeAndFlush(request);
   }
  }else{
   super.userEventTriggered(ctx, evt);
  }
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
  log.info("異常:{}", cause.getMessage());
  ctx.channel().close();
 }
}

接口代理

為了使客戶端像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程服務(wù),我們需要對接口進行動態(tài)代理。

代理類實現(xiàn)

@Component
public class RpcProxy implements InvocationHandler {

 @Autowired
 private RpcClient rpcClient;

 @Override
 public Object invoke(Object proxy, Method method, Object[] args){
  RpcRequest rpcRequest = new RpcRequest();
  rpcRequest.setClassName(method.getDeclaringClass().getName());
  rpcRequest.setMethodName(method.getName());
  rpcRequest.setParameters(args);
  rpcRequest.setParameterTypes(method.getParameterTypes());

  RpcResponse rpcResponse = rpcClient.send(rpcRequest);
  return rpcResponse.getResult();
 }
}

實現(xiàn)FactoryBean接口,將生產(chǎn)動態(tài)代理類納入 Spring 容器管理。

public class RpcFactoryBean<T> implements FactoryBean<T> {
 private Class<T> interfaceClass;

 @Autowired
 private RpcProxy rpcProxy;

 public RpcFactoryBean(Class<T> interfaceClass){
  this.interfaceClass = interfaceClass;
 }

 @Override
 public T getObject(){
  return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy);
 }

 @Override
 public Class<?> getObjectType() {
  return interfaceClass;
 }
}

自定義類路徑掃描器,掃描包下的RPC接口,動態(tài)生產(chǎn)代理類,納入 Spring 容器管理

public class RpcScanner extends ClassPathBeanDefinitionScanner {

 public RpcScanner(BeanDefinitionRegistry registry) {
  super(registry);
 }

 @Override
 protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
  Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages);
  for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
   GenericBeanDefinition beanDefinition = (GenericBeanDefinition)beanDefinitionHolder.getBeanDefinition();
   beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName());
   beanDefinition.setBeanClassName(RpcFactoryBean.class.getName());
  }
  return beanDefinitionHolders;
 }

 @Override
 protected boolean isCandidateComponent(MetadataReader metadataReader) throws IOException {
  return true;
 }

 @Override
 protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
  return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
 }
}
@Component
public class RpcBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
 @Override
 public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
  RpcScanner rpcScanner = new RpcScanner(registry);
  // 傳入RPC接口所在的包名
  rpcScanner.scan("com.ygd.rpc.common.service");
 }

 @Override
 public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
 }
}

JSON編解碼器

/**
 * 將 RpcRequest 編碼成字節(jié)序列發(fā)送
 * 消息格式: Length + Content
 * Length使用int存儲,標(biāo)識消息體的長度
 *
 * +--------+----------------+
 * | Length | Content  |
 * | 4字節(jié) | Length個字節(jié) |
 * +--------+----------------+
 */
public class JsonEncoder extends MessageToByteEncoder<RpcRequest> {
 @Override
 protected void encode(ChannelHandlerContext ctx, RpcRequest rpcRequest, ByteBuf out){
  byte[] bytes = JSON.toJSONBytes(rpcRequest);
  // 將消息體的長度寫入消息頭部
  out.writeInt(bytes.length);
  // 寫入消息體
  out.writeBytes(bytes);
 }
}
/**
 * 將響應(yīng)消息解碼成 RpcResponse
 */
public class JsonDecoder extends LengthFieldBasedFrameDecoder {

 public JsonDecoder(){
  super(Integer.MAX_VALUE, 0, 4, 0, 4);
 }

 @Override
 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  ByteBuf msg = (ByteBuf) super.decode(ctx, in);
  byte[] bytes = new byte[msg.readableBytes()];
  msg.readBytes(bytes);
  RpcResponse rpcResponse = JSON.parseObject(bytes, RpcResponse.class);
  return rpcResponse;
 }
}

測試
我們編寫一個Controller進行測試

@RestController
@RequestMapping("/hello")
public class HelloController {
 @Autowired
 private HelloService helloService;
 @GetMapping("/sayHello")
 public String hello(String name){
  return helloService.sayHello(name);
 }
}

通過 PostMan調(diào)用 controller 接口 http://localhost:9998/hello/sayHello?name=小明

響應(yīng): Hello 小明

關(guān)于如何在Springboot中利用Netty實現(xiàn)一個RPC服務(wù)器就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI