溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java中怎么實現(xiàn)異步處理

發(fā)布時間:2021-06-22 16:32:27 來源:億速云 閱讀:342 作者:Leah 欄目:編程語言

本篇文章給大家分享的是有關(guān)java中怎么實現(xiàn)異步處理,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

1.DeferredResult 加線程池 (DeferredResult 提供了超時、錯誤處理,功能非常完善,再加上多線程處理請求效果很不錯)

2.新開個定時任務(wù)線程池 定時輪詢當(dāng)前任務(wù)列表 超時就停止(需要自己維護任務(wù)列表)Hystrix就是這種方案

3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法處理 前者拋出異常后者返回默認(rèn)值

總結(jié),其實線程池統(tǒng)一設(shè)置超時這個需求本身就是偽需求,線程執(zhí)行任務(wù)時間本身就是參差不齊的,而且這個控制權(quán)應(yīng)該交給Runable或Callable內(nèi)部業(yè)務(wù)處理,不同的業(yè)務(wù)處理超時、異常、報警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相當(dāng)豐富,建議在多線程處理的場景多使用這些api。

具體實現(xiàn):

  1. DeferredResult 先建個工具類。調(diào)用方使用execute方法,傳入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定義一些成員變量方便后期業(yè)務(wù)擴展使用)

然后在其他線程業(yè)務(wù)處理完設(shè)置結(jié)果,調(diào)用setResult方法,傳入msgId相同的DeferredResultDTO和result對象

/**
 * DeferredResult 工具類
 *
 * @author tiancong
 * @date 2020/10/14 19:23
 */
@UtilityClass
@Slf4j
public class DeferredResultUtil {

    private Map<DeferredResultDTO, DeferredResult<ResultVO<Object>>> taskMap = new ConcurrentHashMap<>(16);

    public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto) {
        return execute(dto, 5000L);
    }

    public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto, Long time) {
        if (taskMap.containsKey(dto)) {
            throw new BusinessException(String.format("msgId=%s 已經(jīng)存在,請勿重發(fā)消息", dto.getMsgId()));
        }
        DeferredResult<ResultVO<Object>> deferredResult = new DeferredResult<>(time);
        deferredResult.onError((e) -> {
            taskMap.remove(dto);
            log.info("處理失敗 ", e);
            deferredResult.setResult(ResultVoUtil.fail("處理失敗"));
        });
        deferredResult.onTimeout(() -> {
            taskMap.remove(dto);
            if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) {
                ExamController.getCURRENT_STUDENT().remove(dto.getMsgId());
            }
            deferredResult.setResult(ResultVoUtil.fail("請求超時,請聯(lián)系工作人員!"));
        });
        taskMap.putIfAbsent(dto, deferredResult);
        return deferredResult;
    }

    public void setResult(DeferredResultDTO dto, ResultVO<Object> resultVO) {
        if (taskMap.containsKey(dto)) {
            DeferredResult<ResultVO<Object>> deferredResult = taskMap.get(dto);
            deferredResult.setResult(resultVO);
            taskMap.remove(dto);
        } else {
            log.error("ERROR 未找到該消息msgId:{}", dto.getMsgId());
        }
    }
}

2.   新開個定時任務(wù)線程池 定時輪詢當(dāng)前任務(wù)列表

/**
 * @author tiancong
 * @date 2021/4/10 11:06
 */
@Slf4j
public class T {

    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(
                    2,
                    r -> {
                        Thread thread = new Thread(r);
                        thread.setName("failAfter-%d");
                        thread.setDaemon(true);
                        return thread;
                    });
    private static int timeCount;

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
        executorService.setCorePoolSize(4);
        executorService.setQueueCapacity(10);
        executorService.setMaxPoolSize(100);
        executorService.initialize();
//        executorService.setAwaitTerminationSeconds(5);
//        executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS);
        executorService.setWaitForTasksToCompleteOnShutdown(true);


        Random random = new Random();

        long start = System.currentTimeMillis();
        List<ListenableFuture<Boolean>> asyncResultList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            ListenableFuture<Boolean> asyncResult = executorService.submitListenable(() -> {
                int r = random.nextInt(10);
                log.info("{} 開始睡{}s", Thread.currentThread().getName(), r);
                TimeUnit.SECONDS.sleep(r);
                log.info("{} 干完了 {}s", Thread.currentThread().getName(), r);
                //throw new RuntimeException("出現(xiàn)異常");
                return true;
            });

            asyncResult.addCallback(data -> {
                try {
                    // 休息3毫秒模擬獲取到執(zhí)行結(jié)果后的操作
                    TimeUnit.MILLISECONDS.sleep(3);
                    log.info("{} 收到結(jié)果:{}", Thread.currentThread().getName(), data);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, ex -> log.info("**異常信息**", ex));

            asyncResultList.add(asyncResult);
        }

        System.out.println(String.format("總結(jié)耗時:%s ms", System.currentTimeMillis() - start));

        // 守護進程 定時輪詢 終止超時的任務(wù)
        scheduler.scheduleAtFixedRate(() -> {
            // 模擬守護進程 終止超過6s的任務(wù)
            timeCount++;
            if (timeCount > 6) {
                for (ListenableFuture<Boolean> future : asyncResultList) {
                    if (!future.isDone()) {
                        log.error("future 因超時終止任務(wù),{}", future);
                        future.cancel(true);
                    }
                }
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);

    }
}

額外補充:

CompletableFuture實現(xiàn)了CompletionStage接口,里面很多豐富的異步編程接口。

applyToEither方法是哪個先完成,就apply哪一個結(jié)果(但是兩個任務(wù)都會最終走完)

/**
 * @author tiancong
 * @date 2021/4/10 11:06
 */
@Slf4j
public class T {

    public static void main(String[] args) throws InterruptedException {

//        CompletableFuture<String> responseFuture = within(
//                createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS);
//        responseFuture
//                .thenAccept(T::send)
//                .exceptionally(throwable -> {
//                    log.error("Unrecoverable error", throwable);
//                    return null;
//                });
//

        // 注意 exceptionally是new 的CompletableFuture
        CompletableFuture<Object> timeoutCompletableFuture = timeoutAfter(1000, TimeUnit.MILLISECONDS).exceptionally(xxx -> "超時");


        // 異步任務(wù)超時、異常處理
        List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
//                .map(x -> within(
//                        createTaskSupplier(x), 3000, TimeUnit.MILLISECONDS)
//                        .thenAccept(T::send)
//                        .exceptionally(throwable -> {
//                            log.error("Unrecoverable error", throwable);
//                            return null;
//                        }))
                .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
                        , timeoutCompletableFuture))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
//                .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
//                , oneSecondTimeout).join())
//                .collect(Collectors.toList());
        System.out.println("-------結(jié)束------");
        System.out.println(collect.toString());

    }

    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(
                    2,
                    r -> {
                        Thread thread = new Thread(r);
                        thread.setName("failAfter-%d");
                        thread.setDaemon(true);
                        return thread;
                    });

    private static String send(String s) {
        log.info("最終結(jié)果是{}", s);
        return s;
    }

    private static CompletableFuture<String> createTaskSupplier(String x) {
        return CompletableFuture.supplyAsync(getStringSupplier(x))
                .exceptionally(Throwable::getMessage);
    }

    private static Supplier<String> getStringSupplier(String text) {
        return () -> {
            System.out.println("開始 " + text);
            if ("1".equals(text)) {
                throw new RuntimeException("運行時錯誤");
            }
            try {
                if ("5".equals(text)) {
                    TimeUnit.SECONDS.sleep(5);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束 " + text);
            return text + "號";
        };
    }


    private static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
        // 哪個先完成 就apply哪一個結(jié)果 這是一個關(guān)鍵的API
        return future.applyToEither(timeoutFuture, Function.identity());
    }

    private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 時間后 拋出TimeoutException 類似于sentinel / watcher
        scheduler.schedule(() -> result.completeExceptionally(new TimeoutException("超時:" + timeout)), timeout, unit);
//        return CompletableFuture.supplyAsync(()-> (T)"另一個分支任務(wù)");
        return result;
    }

}

以上就是java中怎么實現(xiàn)異步處理,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI