溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象

發(fā)布時間:2020-11-17 14:26:45 來源:億速云 閱讀:193 作者:Leah 欄目:開發(fā)技術(shù)

這篇文章將為大家詳細講解有關(guān)Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

先看官方文檔步驟:

Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象

需要一個編解碼器,看源碼:

Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象

可見內(nèi)置了需要數(shù)據(jù)類型的實現(xiàn),所以發(fā)送其他消息可以發(fā)送,但是如果發(fā)送自定義對象就需要自己實現(xiàn)編解碼邏輯了

一 自定義編解碼器

/**
 * 自定義對象編解碼器,兩個類型可用于消息轉(zhuǎn)換,即發(fā)送對象轉(zhuǎn)換為接受需要的對象
 */
public class CustomizeMessageCodec implements MessageCodec<OrderMessage, OrderMessage> {
  /**
   * 將消息實體封裝到Buffer用于傳輸
   * 實現(xiàn)方式:使用對象流從對象中獲取Byte數(shù)組然后追加到Buffer
   */
  @Override
  public void encodeToWire(Buffer buffer, OrderMessage orderMessage) {
    final ByteArrayOutputStream b = new ByteArrayOutputStream();
    try (ObjectOutputStream o = new ObjectOutputStream(b)){
      o.writeObject(orderMessage);
      o.close();
      buffer.appendBytes(b.toByteArray());
    } catch (IOException e) { e.printStackTrace(); }
  }
  //從Buffer中獲取消息對象
  @Override
  public OrderMessage decodeFromWire(int pos, Buffer buffer) {
    final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
    OrderMessage msg = null;
    try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
    } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
    return msg;
  }
  //消息轉(zhuǎn)換
  @Override
  public OrderMessage transform(OrderMessage orderMessage) {
    System.out.println("消息轉(zhuǎn)換---");//可對接受消息進行轉(zhuǎn)換,比如轉(zhuǎn)換成另一個對象等
    orderMessage.setName("姚振");
    return orderMessage;
  }
  @Override
  public String name() { return "myCodec"; }
  //識別是否是用戶自定義編解碼器,通常為-1
  @Override
  public byte systemCodecID() { return -1; }
  public static MessageCodec create() {
    return new CustomizeMessageCodec();
  }
}

這里有一個點要注意,nam方法是必須的,且發(fā)送的時候一定要指明name

二 發(fā)送消息編寫

public class ProducerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    EventBus eventBus = vertx.eventBus();
    //發(fā)布消息(群發(fā))
    eventBus.publish("com.hou", "群發(fā)祝福!");
    //發(fā)送消息(單發(fā)),只會發(fā)送注冊此地址的一個,采用不嚴格的輪詢算法選擇
    DeliveryOptions options = new DeliveryOptions();//設(shè)置消息頭等
    options.addHeader("some-header", "some-value");
    eventBus.send("com.hou", "單發(fā)消息",options,ar->{
      if(ar.succeeded()) System.out.println("收到消費者確認信息:"+ar.result().body());
    });
    //發(fā)送自定義對象,需要編解碼器
    eventBus.registerCodec(CustomizeMessageCodec.create());//注冊編碼器
    DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必須指定名字
    OrderMessage orderMessage = new OrderMessage();
    orderMessage.setName("侯征");
    eventBus.send("com.hou", orderMessage, options1);
  }
}

三 接受消息Verticle編寫

public class ConsumerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    //每個Vertx實例默認是單例
    EventBus eb = vertx.eventBus();
    //注冊處理器,消費com.hou發(fā)送的消息
    MessageConsumer<Object> consumer = eb.consumer("com.hou");//訂閱地址
    consumer.handler(message -> {//消息處理器
      if(message.body() instanceof OrderMessage){
        System.out.println("接受到對象: " + ((OrderMessage) message.body()).getName());
      }
      System.out.println("我是普通消費者: " + message.body());
      message.reply("收到了!"); // 回復(fù)生產(chǎn)者,send才能接受
    }).completionHandler(res -> {//注冊完成后通知事件,適用于集群中比較慢的情況下
        System.out.println("注冊處理器結(jié)果"+res.succeeded());
    });
    //撤銷處理器
    //consumer.unregister();
  }
}

四 注冊部署Verticcle

vertx.deployVerticle(ConsumerVerticle.class.getName());
    TimeUnit.SECONDS.sleep(1);
    vertx.deployVerticle(ProducerVerticle.class.getName());

五 測試

Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象

關(guān)于Vertx使用EventBus如何實現(xiàn)發(fā)送或接收對象就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI