溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

spring與disruptor集成的示例分析

發(fā)布時(shí)間:2021-08-25 13:44:36 來(lái)源:億速云 閱讀:160 作者:小新 欄目:編程語(yǔ)言

這篇文章給大家分享的是有關(guān)spring與disruptor集成的示例分析的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。

disruptor不過(guò)多介紹了,描述下當(dāng)前的業(yè)務(wù)場(chǎng)景,兩個(gè)應(yīng)用A,B,應(yīng)用 A 向應(yīng)用 B 傳遞數(shù)據(jù) . 數(shù)據(jù)傳送比較快,如果用http直接push數(shù)據(jù)然后入庫(kù),效率不高.有可能導(dǎo)致A應(yīng)用比較大的壓力. 使用mq 太重量級(jí),所以選擇了disruptor. 也可以使用Reactor

BaseQueueHelper.java

/**
 * lmax.disruptor 高效隊(duì)列處理模板. 支持初始隊(duì)列,即在init()前進(jìn)行發(fā)布。
 *
 * 調(diào)用init()時(shí)才真正啟動(dòng)線(xiàn)程開(kāi)始處理 系統(tǒng)退出自動(dòng)清理資源.
 *
 * @author xielongwang
 * @create 2018-01-18 下午3:49
 * @email xielong.wang@nvr-china.com
 * @description
 */
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {

  /**
   * 記錄所有的隊(duì)列,系統(tǒng)退出時(shí)統(tǒng)一清理資源
   */
  private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor 對(duì)象
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * initQueue
   */
  private List<D> initQueue = new ArrayList<D>();

  /**
   * 隊(duì)列大小
   *
   * @return 隊(duì)列長(zhǎng)度,必須是2的冪
   */
  protected abstract int getQueueSize();

  /**
   * 事件工廠
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();

  /**
   * 事件消費(fèi)者
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();

  /**
   * 初始化
   */
  public void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();

    //初始化數(shù)據(jù)發(fā)布
    for (D data : initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
        @Override
        public void translateTo(E event, long sequence, D data) {
          event.setValue(data);
        }
      }, data);
    }

    //加入資源清理鉤子
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }

  /**
   * 如果要改變線(xiàn)程執(zhí)行優(yōu)先級(jí),override此策略. YieldingWaitStrategy會(huì)提高響應(yīng)并在閑時(shí)占用70%以上CPU,
   * 慎用SleepingWaitStrategy會(huì)降低響應(yīng)更減少CPU占用,用于日志等場(chǎng)景.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();

  /**
   * 插入隊(duì)列消息,支持在對(duì)象init前插入隊(duì)列,則在隊(duì)列建立時(shí)立即發(fā)布到隊(duì)列處理.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
      @Override
      public void translateTo(E event, long sequence, D data) {
        event.setValue(data);
      }
    }, data);
  }

  /**
   * 關(guān)閉隊(duì)列
   */
  public void shutdown() {
    disruptor.shutdown();
  }
}

EventFactory.java

/**
 * @author xielongwang
 * @create 2018-01-18 下午6:24
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

public class MyHandlerException implements ExceptionHandler {

  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

  /*
   * (non-Javadoc) 運(yùn)行過(guò)程中發(fā)生時(shí)的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    ex.printStackTrace();
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  }

  /*
   * (non-Javadoc) 啟動(dòng)時(shí)的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }

  /*
   * (non-Javadoc) 關(guān)閉時(shí)的異常
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}

SeriesData.java (代表應(yīng)用A發(fā)送給應(yīng)用B的消息)

public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }

  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }

  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  @Override
  public String toString() {
    return "SeriesData{" +
        "deviceInfoStr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java

public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

SeriesDataEventHandler.java

public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;

  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
    }
    //業(yè)務(wù)處理
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java

@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;

  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }

  @Override
  protected com.lmax.disruptor.EventFactory eventFactory() {
    return new EventFactory();
  }

  @Override
  protected WorkHandler[] getHandler() {
    int size = seriesDataEventHandler.size();
    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
    return paramEventHandlers;
  }

  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

public abstract class ValueWrapper<T> {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }

  public T getValue() {
    return value;
  }

  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java

@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多實(shí)例幾個(gè)消費(fèi)者
public class DisruptorConfig {

  /**
   * smsParamEventHandler1
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler2
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler3
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }


  /**
   * smsParamEventHandler4
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }

  /**
   * smsParamEventHandler5
   *
   * @return SeriesDataEventHandler
   */
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}

測(cè)試

  //注入SeriesDataEventQueueHelper消息生產(chǎn)者
  @Autowired
  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
  public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
    long startTime1 = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
      logger.info("receiver data is empty !");
      return new DataResponseVo<String>(400, "failed");
    }
    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
    long startTime2 = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
    return new DataResponseVo<String>(200, "success");
  }

應(yīng)用A通過(guò)/data 接口把數(shù)據(jù)發(fā)送到應(yīng)用B ,然后通過(guò)seriesDataEventQueueHelper 把消息發(fā)給disruptor隊(duì)列,消費(fèi)者去消費(fèi),整個(gè)過(guò)程對(duì)不會(huì)堵塞應(yīng)用A. 可接受消息丟失, 可以通過(guò)擴(kuò)展SeriesDataEventQueueHelper來(lái)達(dá)到對(duì)disruptor隊(duì)列的監(jiān)控

感謝各位的閱讀!關(guān)于“spring與disruptor集成的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI