溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SkyWalking自定義插件怎么用

發(fā)布時間:2022-02-14 14:39:23 來源:億速云 閱讀:325 作者:小新 欄目:開發(fā)技術

這篇文章將為大家詳細講解有關SkyWalking自定義插件怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

RabbitMQ插件問題

skywalking官方提供的RabbitMQ插件存在缺陷,其只針對RabbitMQ官方原生Client實現(xiàn)擴展,但我們在項目中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨線程操作,導致跟蹤ID斷鏈。

具體分析過程

1.官方插件源碼的攔截點是原生Consumer的handleDelivery方法,源碼如下:

SkyWalking自定義插件怎么用

2.而Spring RabbitMQ消費者的默認實現(xiàn)是BlockingQueueConsumer, handleDelivery核心邏輯是把消息放到內(nèi)部的BlockingQueue隊列,不做真正的消費處理,因此攔截此處無法關聯(lián)到消費者邏輯,源碼如下

@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
				byte[] body) {
		...
			try {
				if (BlockingQueueConsumer.this.abortStarted > 0) {
					if (!BlockingQueueConsumer.this.queue.offer(
							new Delivery(consumerTag, envelope, properties, body, this.queueName),
							BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

						Channel channelToClose = super.getChannel();
						RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
						// Defensive - should never happen
						BlockingQueueConsumer.this.queue.clear();
						if (!this.canceled) {
							RabbitUtils.cancel(channelToClose, consumerTag);
						}
						try {
							channelToClose.close();
						catch (@SuppressWarnings("unused") TimeoutException e) {
							// no-op
					}
				}
				else {
					BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
			}
			catch (@SuppressWarnings("unused") InterruptedException e) {
				Thread.currentThread().interrupt();
			catch (Exception e) {
				BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
		}

3.真正的消費處理在SimpleMessageListenerContainer,SimpleMessageListenerContainer繼承Runnable接口,在其run方法中while循環(huán)調(diào)用mainLoop方法,整體調(diào)用鏈路為

4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最終在executeListener中執(zhí)行消費邏輯

protected void executeListener(Channel channel, Object data) {
	...
		try {
      // 執(zhí)行消費邏輯
			doExecuteListener(channel, data);
			if (sample != null) {
				this.micrometerHolder.success(sample, data instanceof Message
						? ((Message) data).getMessageProperties().getConsumerQueue()
						: queuesAsListString());
			}
		}
		catch (RuntimeException ex) {
		....
		}
	}

實現(xiàn)自定義插件

從上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的攔截點
實現(xiàn)源碼已放到碼云倉庫:https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking調(diào)用鏈路

SkyWalking自定義插件怎么用

logback日志

SkyWalking自定義插件怎么用

關于“SkyWalking自定義插件怎么用”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI