溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java重試機制使用RPC要考慮什么

發(fā)布時間:2023-03-21 16:19:22 來源:億速云 閱讀:84 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“java重試機制使用RPC要考慮什么”的相關(guān)知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“java重試機制使用RPC要考慮什么”文章能幫助大家解決問題。

    1 為什么重試

    如果簡單對一個RPC交互過程進行分類,我們可以分為三類:響應(yīng)成功、響應(yīng)失敗、沒有響應(yīng)。

    java重試機制使用RPC要考慮什么

    對于響應(yīng)成功和響應(yīng)失敗這兩種情況,消費者很好處理。因為響應(yīng)信息明確,所以只要根據(jù)響應(yīng)信息,繼續(xù)處理成功或者失敗邏輯即可。但是沒有響應(yīng)這種場景比較難處理,這是因為沒有響應(yīng)可能包含以下情況:

    (1) 生產(chǎn)者根本沒有接收到請求

    (2) 生產(chǎn)者接收到請求并且已處理成功,但是消費者沒有接收到響應(yīng)

    (3) 生產(chǎn)者接收到請求并且已處理失敗,但是消費者沒有接收到響應(yīng)

    假設(shè)你是一名RPC框架設(shè)計者,究竟是選擇重試還是放棄調(diào)用呢?其實最終如何選擇取決于業(yè)務(wù)特性,有的業(yè)務(wù)本身就具有冪等性,但是有的業(yè)務(wù)不能允許重試否則會造成重復(fù)數(shù)據(jù)。

    那么誰對業(yè)務(wù)特性最熟悉呢?答案是消費者,因為消費者作為調(diào)用方肯定最熟悉自身業(yè)務(wù),所以RPC框架只要提供一些策略供消費者選擇即可。

    2 怎么做重試

    2.1 集群容錯策略

    DUBBO作為一款優(yōu)秀RPC框架,提供了如下集群容錯策略供消費者選擇:

    Failover: 故障轉(zhuǎn)移
    Failfast: 快速失敗
    Failsafe: 安全失敗
    Failback: 異步重試
    Forking:  并行調(diào)用
    Broadcast:廣播調(diào)用

    (1) Failover

    故障轉(zhuǎn)移策略。作為默認策略當消費發(fā)生異常時通過負載均衡策略再選擇一個生產(chǎn)者節(jié)點進行調(diào)用,直到達到重試次數(shù)

    (2) Failfast

    快速失敗策略。消費者只消費一次服務(wù),當發(fā)生異常時則直接拋出

    (3) Failsafe

    安全失敗策略。消費者只消費一次服務(wù),如果消費失敗則包裝一個空結(jié)果,不拋出異常

    (4) Failback

    異步重試策略。當消費發(fā)生異常時返回一個空結(jié)果,失敗請求將會進行異步重試。如果重試超過最大重試次數(shù)還不成功,放棄重試并不拋出異常

    (5) Forking

    并行調(diào)用策略。消費者通過線程池并發(fā)調(diào)用多個生產(chǎn)者,只要有一個成功就算成功

    (6) Broadcast

    廣播調(diào)用策略。消費者遍歷調(diào)用所有生產(chǎn)者節(jié)點,任何一個出現(xiàn)異常則拋出異常

    2.2 源碼分析

    2.2.1 Failover

    Failover故障轉(zhuǎn)移策略作為默認策略,當消費發(fā)生異常時通過負載均衡策略再選擇一個生產(chǎn)者節(jié)點進行調(diào)用,直到達到重試次數(shù)。即使業(yè)務(wù)代碼沒有顯示重試,也有可能多次執(zhí)行消費邏輯從而造成重復(fù)數(shù)據(jù):

    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        @Override
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // 所有生產(chǎn)者Invokers
            List<Invoker<T>> copyInvokers = invokers;
            checkInvokers(copyInvokers, invocation);
            String methodName = RpcUtils.getMethodName(invocation);
            // 獲取重試次數(shù)
            int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            RpcException le = null;
            // 已經(jīng)調(diào)用過的生產(chǎn)者
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
            Set<String> providers = new HashSet<String>(len);
            // 重試直到達到最大次數(shù)
            for (int i = 0; i < len; i++) {
                if (i > 0) {
                    // 如果當前實例被銷毀則拋出異常
                    checkWhetherDestroyed();
                    // 根據(jù)路由策略選出可用生產(chǎn)者Invokers
                    copyInvokers = list(invocation);
                    // 重新檢查
                    checkInvokers(copyInvokers, invocation);
                }
                // 負載均衡選擇一個生產(chǎn)者Invoker
                Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    // 服務(wù)消費發(fā)起遠程調(diào)用
                    Result result = invoker.invoke(invocation);
                    if (le != null && logger.isWarnEnabled()) {
                        logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                    }
                    // 有結(jié)果則返回
                    return result;
                } catch (RpcException e) {
                    // 業(yè)務(wù)異常直接拋出
                    if (e.isBiz()) {
                        throw e;
                    }
                    le = e;
                } catch (Throwable e) {
                    // RpcException不拋出繼續(xù)重試
                    le = new RpcException(e.getMessage(), e);
                } finally {
                    // 保存已經(jīng)訪問過的生產(chǎn)者
                    providers.add(invoker.getUrl().getAddress());
                }
            }
            throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
        }
    }

    消費者調(diào)用生產(chǎn)者節(jié)點A發(fā)生RpcException異常時(例如超時異常),在未達到最大重試次數(shù)之前,消費者會通過負載均衡策略再次選擇其它生產(chǎn)者節(jié)點消費。試想如果生產(chǎn)者節(jié)點A其實已經(jīng)處理成功了,但是沒有及時將成功結(jié)果返回給消費者,那么再次重試可能就會造成重復(fù)數(shù)據(jù)問題。

    2.2.2 Failfast

    快速失敗策略。消費者只消費一次服務(wù),當發(fā)生異常時則直接拋出,不會進行重試:

    public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
        public FailfastClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            // 檢查生產(chǎn)者Invokers是否合法
            checkInvokers(invokers, invocation);
            // 負載均衡選擇一個生產(chǎn)者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            try {
                // 服務(wù)消費發(fā)起遠程調(diào)用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                // 服務(wù)消費失敗不重試直接拋出異常
                if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                    throw (RpcException) e;
                }
                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                       "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                       + " select from all providers " + invokers + " for service " + getInterface().getName()
                                       + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                       + " use dubbo version " + Version.getVersion()
                                       + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                       e.getCause() != null ? e.getCause() : e);
            }
        }
    }
    2.2.3 Failsafe

    安全失敗策略。消費者只消費一次服務(wù),如果消費失敗則包裝一個空結(jié)果,不拋出異常,不會進行重試:

    public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
        public FailsafeClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        @Override
        public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                // 檢查生產(chǎn)者Invokers是否合法
                checkInvokers(invokers, invocation);
                // 負載均衡選擇一個生產(chǎn)者Invoker
                Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                // 服務(wù)消費發(fā)起遠程調(diào)用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                // 消費失敗包裝為一個空結(jié)果對象
                logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                return new RpcResult();
            }
        }
    }
    2.2.4 Failback

    異步重試策略。當消費發(fā)生異常時返回一個空結(jié)果,失敗請求將會進行異步重試。如果重試超過最大重試次數(shù)還不成功,放棄重試并不拋出異常:

    public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
        private static final long RETRY_FAILED_PERIOD = 5;
        private final int retries;
        private final int failbackTasks;
        private volatile Timer failTimer;
        public FailbackClusterInvoker(Directory<T> directory) {
            super(directory);
            int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
            if (retriesConfig <= 0) {
                retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
            }
            int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
            if (failbackTasksConfig <= 0) {
                failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
            }
            retries = retriesConfig;
            failbackTasks = failbackTasksConfig;
        }
        private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
            if (failTimer == null) {
                synchronized (this) {
                    if (failTimer == null) {
                        // 創(chuàng)建定時器
                        failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                    }
                }
            }
            // 構(gòu)造定時任務(wù)
            RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
            try {
                // 定時任務(wù)放入定時器等待執(zhí)行
                failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
            } catch (Throwable e) {
                logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
            }
        }
        @Override
        protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            Invoker<T> invoker = null;
            try {
                // 檢查生產(chǎn)者Invokers是否合法
                checkInvokers(invokers, invocation);
                // 負責均衡選擇一個生產(chǎn)者Invoker
                invoker = select(loadbalance, invocation, invokers, null);
                // 消費服務(wù)發(fā)起遠程調(diào)用
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
                // 如果服務(wù)消費失敗則記錄失敗請求
                addFailed(loadbalance, invocation, invokers, invoker);
                // 返回空結(jié)果
                return new RpcResult();
            }
        }
        @Override
        public void destroy() {
            super.destroy();
            if (failTimer != null) {
                failTimer.stop();
            }
        }
        /**
         * RetryTimerTask
         */
        private class RetryTimerTask implements TimerTask {
            private final Invocation invocation;
            private final LoadBalance loadbalance;
            private final List<Invoker<T>> invokers;
            private final int retries;
            private final long tick;
            private Invoker<T> lastInvoker;
            private int retryTimes = 0;
            RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
                this.loadbalance = loadbalance;
                this.invocation = invocation;
                this.invokers = invokers;
                this.retries = retries;
                this.tick = tick;
                this.lastInvoker = lastInvoker;
            }
            @Override
            public void run(Timeout timeout) {
                try {
                    // 負載均衡選擇一個生產(chǎn)者Invoker
                    Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                    lastInvoker = retryInvoker;
                    // 服務(wù)消費發(fā)起遠程調(diào)用
                    retryInvoker.invoke(invocation);
                } catch (Throwable e) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                    // 超出最大重試次數(shù)記錄日志不拋出異常
                    if ((++retryTimes) >= retries) {
                        logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                    } else {
                        // 未超出最大重試次數(shù)重新放入定時器
                        rePut(timeout);
                    }
                }
            }
            private void rePut(Timeout timeout) {
                if (timeout == null) {
                    return;
                }
                Timer timer = timeout.timer();
                if (timer.isStop() || timeout.isCancelled()) {
                    return;
                }
                timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
            }
        }
    }
    2.2.5 Forking

    并行調(diào)用策略。消費者通過線程池并發(fā)調(diào)用多個生產(chǎn)者,只要有一個成功就算成功:

    public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
        public ForkingClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        @Override
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            try {
                checkInvokers(invokers, invocation);
                final List<Invoker<T>> selected;
                // 獲取配置參數(shù)
                final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
                final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                // 獲取并行執(zhí)行的Invoker列表
                if (forks <= 0 || forks >= invokers.size()) {
                    selected = invokers;
                } else {
                    selected = new ArrayList<>();
                    for (int i = 0; i < forks; i++) {
                        // 選擇生產(chǎn)者
                        Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                        // 防止重復(fù)增加Invoker
                        if (!selected.contains(invoker)) {
                            selected.add(invoker);
                        }
                    }
                }
                RpcContext.getContext().setInvokers((List) selected);
                final AtomicInteger count = new AtomicInteger();
                final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
                for (final Invoker<T> invoker : selected) {
                    // 在線程池中并發(fā)執(zhí)行
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 執(zhí)行消費邏輯
                                Result result = invoker.invoke(invocation);
                                // 存儲消費結(jié)果
                                ref.offer(result);
                            } catch (Throwable e) {
                                // 如果異常次數(shù)大于等于forks參數(shù)值說明全部調(diào)用失敗,則把異常放入隊列
                                int value = count.incrementAndGet();
                                if (value >= selected.size()) {
                                    ref.offer(e);
                                }
                            }
                        }
                    });
                }
                try {
                    // 從隊列獲取結(jié)果
                    Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                    // 如果異常類型表示全部調(diào)用失敗則拋出異常
                    if (ret instanceof Throwable) {
                        Throwable e = (Throwable) ret;
                        throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                    }
                    return (Result) ret;
                } catch (InterruptedException e) {
                    throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
                }
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    }
    2.2.6 Broadcast

    廣播調(diào)用策略。消費者遍歷調(diào)用所有生產(chǎn)者節(jié)點,任何一個出現(xiàn)異常則拋出異常:

    public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
        private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
        public BroadcastClusterInvoker(Directory<T> directory) {
            super(directory);
        }
        @Override
        public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            checkInvokers(invokers, invocation);
            RpcContext.getContext().setInvokers((List) invokers);
            RpcException exception = null;
            Result result = null;
            // 遍歷調(diào)用所有生產(chǎn)者節(jié)點
            for (Invoker<T> invoker : invokers) {
                try {
                    // 執(zhí)行消費邏輯
                    result = invoker.invoke(invocation);
                } catch (RpcException e) {
                    exception = e;
                    logger.warn(e.getMessage(), e);
                } catch (Throwable e) {
                    exception = new RpcException(e.getMessage(), e);
                    logger.warn(e.getMessage(), e);
                }
            }
            // 任何一個出現(xiàn)異常則拋出異常
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    }

    3 怎么做冪等

    經(jīng)過上述分析我們知道,RPC框架自帶的重試機制可能會造成數(shù)據(jù)重復(fù)問題,那么在使用中必須考慮冪等性。冪等性是指一次操作與多次操作產(chǎn)生結(jié)果相同,并不會因為多次操作而產(chǎn)生不一致性。常見冪等方案有取消重試、冪等表、數(shù)據(jù)庫鎖、狀態(tài)機。

    3.1 取消重試

    取消重試有兩種方法,第一是設(shè)置重試次數(shù)為零,第二是選擇不重試的集群容錯策略。

    <!-- 設(shè)置重試次數(shù)為零 -->
    <dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />
    <!-- 選擇集群容錯方案 -->
    <dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />

    3.2 冪等表

    假設(shè)用戶支付成功后,支付系統(tǒng)將支付成功消息,發(fā)送至消息隊列。物流系統(tǒng)訂閱到這個消息,準備為這筆訂單創(chuàng)建物流單。

    但是消息隊列可能會重復(fù)推送,物流系統(tǒng)有可能接收到多次這條消息。我們希望達到效果是:無論接收到多少條重復(fù)消息,只能創(chuàng)建一筆物流單。

    解決方案是冪等表方案。新建一張冪等表,該表就是用來做冪等,無其它業(yè)務(wù)意義,有一個字段名為key建有唯一索引,這個字段是冪等標準。

    物流系統(tǒng)訂閱到消息后,首先嘗試插入冪等表,訂單編號作為key字段。如果成功則繼續(xù)創(chuàng)建物流單,如果訂單編號已經(jīng)存在則違反唯一性原則,無法插入成功,說明已經(jīng)進行過業(yè)務(wù)處理,丟棄消息。

    這張表數(shù)據(jù)量會比較大,我們可以通過定時任務(wù)對數(shù)據(jù)進行歸檔,例如只保留7天數(shù)據(jù),其它數(shù)據(jù)存入歸檔表。

    還有一種廣義冪等表就是我們可以用Redis替代數(shù)據(jù)庫,在創(chuàng)建物流單之前,我們可以檢查Redis是否存在該訂單編號數(shù)據(jù),同時可以為這類數(shù)據(jù)設(shè)置7天過期時間。

    3.3 狀態(tài)機

    物流單創(chuàng)建成功后會發(fā)送消息,訂單系統(tǒng)訂閱到消息后更新狀態(tài)為完成,假設(shè)變更是將訂單狀態(tài)0更新至狀態(tài)1。訂單系統(tǒng)也可能收到多條消息,可能在狀態(tài)已經(jīng)被更新至狀態(tài)1之后,依然收到物流單創(chuàng)建成功消息。

    解決方案是狀態(tài)機方案。首先繪制狀態(tài)機圖,分析狀態(tài)流轉(zhuǎn)形態(tài)。例如經(jīng)過分析狀態(tài)1已經(jīng)是最終態(tài),那么即使接收到物流單創(chuàng)建成功消息也不再處理,丟棄消息。

    3.4 數(shù)據(jù)庫鎖

    數(shù)據(jù)庫鎖又可以分為悲觀鎖和樂觀鎖兩種類型,悲觀鎖是在獲取數(shù)據(jù)時加鎖:

    select * from table where col='xxx' for update

    樂觀鎖是在更新時加鎖,第一步首先查出數(shù)據(jù),數(shù)據(jù)包含version字段。第二步進行更新操作,如果此時記錄已經(jīng)被修改則version字段已經(jīng)發(fā)生變化,無法更新成功:

    update table set xxx,
    version = #{version} + 1 
    where id = #{id} 
    and version = #{version}

    關(guān)于“java重試機制使用RPC要考慮什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識,可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI