How to analyze the KubernetesClientException "too old resource version" error

Published: 2021-12-16 09:48:07 Source: 億速云 Views: 190 Author: 柒染 Category: Big Data

The KubernetesClientException "too old resource version" error can be puzzling if you have not met it before. This article summarizes why it occurs and how to resolve it.

Background

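Our company currently schedules workloads on k8s using io.fabric8:kubernetes-client:4.2.0 (the stack trace below shows the kubernetes-client-4.2.2 jar). At runtime we hit the following problem: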

DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason:
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor
[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task
[reconnectAttempt|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a
199 - 2020-11-17T06:39:13.874Z -[merlion-k8s-backend]-[merlion-k8s-backend-6b4cc44855-s6wnq]: 06:39:13.873 [OkHttp https://10.25.61.82:6443/...] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened
  WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254)[kubernetes-client-4.2.2.jar:?]
at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

On its own this error would not be surprising, but our code looks like this:

  watchConnection = kubernetesClient.pods()
         .withLabel(MERLION_TASK_LABEL, applicationId)
         //      .withResourceVersion(resourceVersion)
         .watch(new TaskPodsWatcher())

We had already commented out withResourceVersion(resourceVersion). Had it still been active, the error would simply mean that the resourceVersion set in our code was too old. Yet "too old resource version" was still reported.

Analysis

Go straight to onClosed in WatchConnectionManager:

     @Override
      public void onClosed(WebSocket webSocket, int code, String reason) {
        logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
        if (forceClosed.get()) {
          logger.debug("Ignoring onClose for already closed/closing websocket");
          return;
        }
        if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
          closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
          return;
        }
        scheduleReconnect();
      }

The documentation for onClosed, from OkHttp's WebSocketListener, reads:

 /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  public void onClosed(WebSocket webSocket, int code, String reason) {
  }

This suggests that no events were transmitted for a long time, so the connection was released and the WebSocket was closed (which is quite likely when there are few tasks). The client then reconnects through scheduleReconnect, which calls runWatch():

executor.schedule(new NamedRunnable("reconnectAttempt") {
              @Override
              public void execute() {
                try {
                  runWatch();
                  reconnectPending.set(false);
                } catch (Exception e) {
                  // An unexpected error occurred and we didn't even get an onFailure callback.
                  logger.error("Exception in reconnect", e);
                  webSocketRef.set(null);
                  closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
                  close();
                }
              }
            }, nextReconnectInterval(), TimeUnit.MILLISECONDS);
          }

Inside runWatch(), the following code runs:

 if (this.resourceVersion.get() != null) {
      httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get());
    }

The value of this.resourceVersion is set in the method public void onMessage(WebSocket webSocket, String message):

WatchEvent event = readWatchEvent(message);
          Object object = event.getObject();
          if (object instanceof HasMetadata) {
            @SuppressWarnings("unchecked")
            T obj = (T) object;
            // Dirty cast - should always be valid though
            resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion());
            Watcher.Action action = Watcher.Action.valueOf(event.getType());
            watcher.eventReceived(action, obj);
          } else if (object instanceof KubernetesResourceList) {
            @SuppressWarnings("unchecked")

            KubernetesResourceList list = (KubernetesResourceList) object;
            // Dirty cast - should always be valid though
            resourceVersion.set(list.getMetadata().getResourceVersion());
            Watcher.Action action = Watcher.Action.valueOf(event.getType());
            List<HasMetadata> items = list.getItems();
            if (items != null) {
              for (HasMetadata item : items) {
                watcher.eventReceived(action, (T) item);
              }
            }

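In other words, the client remembers the resourceVersion of the last event it received and sends it again on reconnect. If, by the time of the reconnect, that remembered resourceVersion is older than the oldest resourceVersion etcd still retains, the "too old resource version" error is reported, even though our own code never sets a resourceVersion.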
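The failure mode described above can be reproduced with a toy model. The following is a minimal sketch using only the Java standard library, not the fabric8 implementation: FakeApiServer, RememberingWatchClient and compactTo are hypothetical names introduced for illustration, and the version numbers are taken from the exception message in the log.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Toy API server: only versions at or above oldestRetained can be watched from. */
class FakeApiServer {
    static final int HTTP_OK = 200;
    static final int HTTP_GONE = 410;
    private long oldestRetained;

    FakeApiServer(long oldestRetained) {
        this.oldestRetained = oldestRetained;
    }

    /** Simulates history being discarded: versions before newOldest are gone. */
    void compactTo(long newOldest) {
        oldestRetained = newOldest;
    }

    /** A null resourceVersion means "start from the current state" and always succeeds. */
    int watch(String resourceVersion) {
        if (resourceVersion == null) {
            return HTTP_OK;
        }
        return Long.parseLong(resourceVersion) < oldestRetained ? HTTP_GONE : HTTP_OK;
    }
}

/** Toy client that remembers the last version it saw, as the watch manager does. */
class RememberingWatchClient {
    private final AtomicReference<String> resourceVersion = new AtomicReference<>();

    /** Called for every event; records the version carried by the event. */
    void onEvent(String eventResourceVersion) {
        resourceVersion.set(eventResourceVersion);
    }

    /** Reconnects using whatever version was recorded last (possibly none). */
    int reconnect(FakeApiServer server) {
        return server.watch(resourceVersion.get());
    }
}

class TooOldResourceVersionDemo {
    public static void main(String[] args) {
        FakeApiServer server = new FakeApiServer(135_562_000L);
        RememberingWatchClient client = new RememberingWatchClient();

        System.out.println(client.reconnect(server)); // 200: no version recorded yet
        client.onEvent("135562761");                  // last event seen before a quiet period
        server.compactTo(135_563_127L);               // history moves on while the client is idle
        System.out.println(client.reconnect(server)); // 410: the remembered version is too old
    }
}
```

The point of the sketch is that the stale version comes from the client's own bookkeeping, which is why removing withResourceVersion from the calling code does not prevent the error.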

Solution

Searching online for kubernetes-too-old-resource-version turns up this comment from a Kubernetes Client team member:

Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch

In other words, the plain watch mechanism cannot handle this situation, but the SharedInformer API can, so we switched to SharedInformer. As of 16 November 2020, the latest kubernetes-client version available to us was kubernetes-client:4.13.0. The implementation:

    val sharedInformerFactory = kubernetesClient.informers()
    val podInformer = sharedInformerFactory
      .sharedIndexInformerFor(classOf[Pod], classOf[PodList],
        new OperationContext().withNamespace("test"), 30 * 1000L)
    podInformer.addEventHandler(new ResourceEventHandler[Pod] {
      override def onAdd(obj: Pod): Unit = {
        eventReceived(obj, "ADD")
      }

      override def onDelete(obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {
        eventReceived(obj, "DELETE")
      }

      override def onUpdate(oldObj: Pod, newObj: Pod): Unit = {
        eventReceived(newObj, "UPDATE")
      }

      // Only pods labelled with this application's id are of interest.
      private def idShouldUpdate(pod: Pod): Boolean = {
        pod.getMetadata.getLabels.getOrDefault(MERLION_TASK_LABEL, "") == applicationId
      }

      private def eventReceived(pod: Pod, action: String): Unit = {
        if (idShouldUpdate(pod)) {
          val podName = pod.getMetadata.getName
          logger.info(s"Received job pod update for pod named $podName, action ${action}")
          snapshotsStore.updatePod(pod)
        }
      }
    })
    sharedInformerFactory.startAllRegisteredInformers()

SharedInformerFactory works the same way as the k8s Informer mechanism and can guarantee reliable event delivery. The key classes are ReflectorWatcher, Reflector and DefaultSharedIndexInformer. A brief walkthrough:

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
    this.resyncCheckPeriodMillis = resyncPeriod;
    this.defaultEventHandlerResyncPeriod = resyncPeriod;

    this.processor = new SharedProcessor<>();
    this.indexer = new Cache();

    DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);

    this.controller = new Controller<>(apiTypeClass, fifo, listerWatcher, this::handleDeltas, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners);
    controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName());
  }

DefaultSharedIndexInformer uses a DeltaFIFO to store events. this::handleDeltas is passed to the Controller, where it is invoked as processFunc, the argument to this.queue.pop. In other words, this function consumes the events in the fifo:

private void processLoop() throws Exception {
    while (true) {
      try {
        this.queue.pop(this.processFunc);
      } catch (InterruptedException t) {
        log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
        return;
      } catch (Exception e) {
        log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
        throw e;
      }
    }
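The pop-with-callback pattern in processLoop can be sketched as follows. This is a simplified illustration using only the Java standard library: ToyEventFifo is a hypothetical stand-in that just queues items in order, whereas the real DeltaFIFO does considerably more bookkeeping per object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Simplified stand-in for the event queue: producers add, one consumer pops. */
class ToyEventFifo<T> {
    private final BlockingQueue<T> items = new LinkedBlockingQueue<>();

    void add(T item) {
        items.add(item);
    }

    /** Blocks until an item is available, then hands it to processFunc. */
    void pop(Consumer<T> processFunc) throws InterruptedException {
        processFunc.accept(items.take());
    }
}

class ToyProcessLoopDemo {
    /** Runs a consumer thread that pops `expected` events, mirroring processLoop. */
    static List<String> consume(ToyEventFifo<String> fifo, int expected) {
        List<String> handled = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    fifo.pop(handled::add); // handled::add plays the role of handleDeltas
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-informer-controller");
        consumer.start();
        try {
            consumer.join(); // join makes the consumer's writes visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumer", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        ToyEventFifo<String> fifo = new ToyEventFifo<>();
        fifo.add("ADDED pod-a");
        fifo.add("MODIFIED pod-a");
        System.out.println(consume(fifo, 2)); // [ADDED pod-a, MODIFIED pod-a]
    }
}
```

Decoupling the producer (the watch) from the consumer (the handler) through a queue means a slow event handler delays processing but does not block the thread that receives events from the server.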

The queue is the DeltaFIFO that was passed in as a constructor argument, so queue and fifo are the same object. Where does the data in the fifo come from? From the controller::run function:

        if (fullResyncPeriod > 0) {
          reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
        } else {
          reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
        }
        reflector.listAndWatch();

This calls reflector.listAndWatch(), which implements a list-watch mechanism similar to the one in k8s itself:

public void listAndWatch() throws Exception {
    try {
      log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
      reListAndSync();
      resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
      startWatcher();
    } catch (Exception exception) {
      store.isPopulated(false);
      throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception);
    }
  }
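The list-then-watch pattern that this method follows can be sketched with a toy model. This is a standard-library illustration under simplifying assumptions, not the fabric8 code: ToyListWatchServer, ToyReflector and drainWatch are hypothetical names, and the "watch" here is a synchronous replay of a change log rather than a streaming connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy server that keeps the current objects plus an ordered change log. */
class ToyListWatchServer {
    private final Map<String, String> objects = new LinkedHashMap<>();
    private final List<String[]> changeLog = new ArrayList<>(); // {version, name, value}
    private long version = 0;

    void put(String name, String value) {
        version++;
        objects.put(name, value);
        changeLog.add(new String[] {Long.toString(version), name, value});
    }

    long currentVersion() {
        return version;
    }

    /** Full snapshot of the current state. */
    Map<String, String> list() {
        return new LinkedHashMap<>(objects);
    }

    /** Changes strictly newer than sinceVersion, oldest first. */
    List<String[]> watchSince(long sinceVersion) {
        List<String[]> out = new ArrayList<>();
        for (String[] change : changeLog) {
            if (Long.parseLong(change[0]) > sinceVersion) {
                out.add(change);
            }
        }
        return out;
    }
}

/** Toy reflector: full list first, then incremental changes from the listed version. */
class ToyReflector {
    final Map<String, String> store = new LinkedHashMap<>();
    long lastSyncResourceVersion = 0;

    void reListAndSync(ToyListWatchServer server) {
        store.clear();
        store.putAll(server.list());
        lastSyncResourceVersion = server.currentVersion();
    }

    void drainWatch(ToyListWatchServer server) {
        for (String[] change : server.watchSince(lastSyncResourceVersion)) {
            store.put(change[1], change[2]);
            lastSyncResourceVersion = Long.parseLong(change[0]);
        }
    }
}

class ToyListAndWatchDemo {
    public static void main(String[] args) {
        ToyListWatchServer server = new ToyListWatchServer();
        server.put("pod-a", "Pending");
        ToyReflector reflector = new ToyReflector();
        reflector.reListAndSync(server);  // full snapshot at version 1
        server.put("pod-a", "Running");   // version 2
        server.put("pod-b", "Pending");   // version 3
        reflector.drainWatch(server);     // only versions 2 and 3 are replayed
        System.out.println(reflector.store + " @ " + reflector.lastSyncResourceVersion);
    }
}
```

Running main prints `{pod-a=Running, pod-b=Pending} @ 3`. Because the watch starts from the version returned by the list, no change between the snapshot and the start of the watch is lost.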

reListAndSync pulls the full data set, and startWatcher starts the watch that receives incremental events. What does this watch look like? As follows:

 watch.set(
        listerWatcher.watch(new ListOptionsBuilder()
          .withWatch(Boolean.TRUE).withResourceVersion(lastSyncResourceVersion.get()).withTimeoutSeconds(null).build(),
        operationContext.getNamespace(), operationContext, watcher)
      )

The watcher used here is initialized in the Reflector constructor:

watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);

ReflectorWatcher implements Watcher, so it has the corresponding eventReceived and onClose methods:

@Override
  public void eventReceived(Action action, T resource) {
    if (action == null) {
      final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName());
      log.error(errorMessage);
      throw new KubernetesClientException(errorMessage);
    }
    log.info("Event received {}", action.name());
    switch (action) {
      case ERROR:
        final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName());
        log.error(errorMessage);
        throw new KubernetesClientException(errorMessage);
      case ADDED:
        store.add(resource);
        break;
      case MODIFIED:
        store.update(resource);
        break;
      case DELETED:
        store.delete(resource);
        break;
    }
    lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion());
    log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get());
  }

  @Override
  public void onClose(KubernetesClientException exception) {
    log.error("Watch closing");
    Optional.ofNullable(exception)
      .map(e -> {
        log.debug("Exception received during watch", e);
        return exception;
      })
      .map(KubernetesClientException::getStatus)
      .map(Status::getCode)
      .filter(c -> c.equals(HttpURLConnection.HTTP_GONE))
      .ifPresent(c -> onHttpGone.run());
    onClose.run();
  }

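In eventReceived, every event is added to the store, which is the fifo queue. In onClose, if the status code is HTTP_GONE, that is, "too old resource version", onHttpGone.run() is executed first, and onClose.run() is executed in every case. onHttpGone is Reflector's reListAndSync and onClose is Reflector's startWatcher, so whenever the watcher is closed the watch is started again, and after an HTTP_GONE a full relist happens before the new watch.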
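The close handling described above reduces to a small decision rule. The following is a minimal standard-library sketch of that rule, not the fabric8 class: ToyReflectorWatcher and actionsFor are hypothetical names, and the status code is passed in directly instead of being extracted from a KubernetesClientException.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy version of the close handling: relist only on 410 Gone, always restart the watch. */
class ToyReflectorWatcher {
    static final int HTTP_GONE = 410;
    private final Runnable onClose;    // plays the role of Reflector::startWatcher
    private final Runnable onHttpGone; // plays the role of Reflector::reListAndSync

    ToyReflectorWatcher(Runnable onClose, Runnable onHttpGone) {
        this.onClose = onClose;
        this.onHttpGone = onHttpGone;
    }

    /** statusCode is null when the watch closed without an error status. */
    void onClose(Integer statusCode) {
        if (statusCode != null && statusCode == HTTP_GONE) {
            onHttpGone.run(); // stale version: take a fresh full snapshot first
        }
        onClose.run();        // in every case, start watching again
    }
}

class ToyReflectorWatcherDemo {
    /** Records which callbacks fire for a given close status. */
    static List<String> actionsFor(Integer statusCode) {
        List<String> actions = new ArrayList<>();
        ToyReflectorWatcher watcher = new ToyReflectorWatcher(
            () -> actions.add("startWatcher"),
            () -> actions.add("reListAndSync"));
        watcher.onClose(statusCode);
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(410));  // [reListAndSync, startWatcher]
        System.out.println(actionsFor(null)); // [startWatcher]
    }
}
```

The ordering matters: relisting before restarting the watch refreshes the last synced resourceVersion, so the new watch starts from a version the server still retains.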

Note

In kubernetes-client:4.6.2, onMessage in WatchConnectionManager handles HTTP_GONE differently:

if (status.getCode() == HTTP_GONE) {
                logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
                resourceVersion.set(null);
                scheduleReconnect();
              } else {
                logger.error("Error received: {}", status.toString());
              }

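When HTTP_GONE occurs, resourceVersion is set to null, so the watch resumes from the latest events, and the client reconnects immediately. Versions 4.13.0 and 4.2.0 do not reconnect immediately and leave the handling to the user.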
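For versions that leave the handling to the user, one possible approach is to mirror the 4.6.2 behaviour in application code: on 410, discard the stale version and subscribe again. The following is a standard-library sketch of that idea only. ToyWatchSupervisor and its subscribe function are hypothetical and do not correspond to any fabric8 API; in a real application the subscribe step would be the code that re-creates the watch.

```java
import java.util.function.Function;

/** Toy supervisor for a plain watch: on 410 Gone, drop the stale version and re-subscribe. */
class ToyWatchSupervisor {
    static final int HTTP_OK = 200;
    static final int HTTP_GONE = 410;
    private String resourceVersion;
    private int subscribeCalls = 0;

    ToyWatchSupervisor(String initialResourceVersion) {
        this.resourceVersion = initialResourceVersion;
    }

    /**
     * subscribe receives the version to watch from (null means "latest") and returns a status.
     * Returns true once a subscription succeeds, false if maxAttempts is exhausted.
     */
    boolean start(Function<String, Integer> subscribe, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            subscribeCalls++;
            int status = subscribe.apply(resourceVersion);
            if (status == HTTP_OK) {
                return true;
            }
            if (status == HTTP_GONE) {
                resourceVersion = null; // stale version: the next attempt starts from latest
            }
        }
        return false;
    }

    int subscribeCalls() {
        return subscribeCalls;
    }

    String resourceVersion() {
        return resourceVersion;
    }
}

class ToyWatchSupervisorDemo {
    public static void main(String[] args) {
        // Hypothetical server: rejects any explicit version as too old, accepts "latest".
        Function<String, Integer> subscribe = rv -> rv == null ? 200 : 410;
        ToyWatchSupervisor supervisor = new ToyWatchSupervisor("135562761");
        boolean ok = supervisor.start(subscribe, 3);
        System.out.println(ok + " after " + supervisor.subscribeCalls() + " attempts");
    }
}
```

Running main prints `true after 2 attempts`. Note the trade-off: restarting from the latest version skips any events that occurred between the stale version and the reconnect, which is why the informer approach relists before watching again.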
