溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java項目中的多線程實踐分析

發(fā)布時間:2021-11-15 09:19:23 來源:億速云 閱讀:119 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“java項目中的多線程實踐分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“java項目中的多線程實踐分析”吧!

項目開發(fā)中對于一些數(shù)據(jù)的處理需要用到多線程,比如文件的批量上傳,數(shù)據(jù)庫的分批寫入,大文件的分段下載等。 通常會使用spring自帶的線程池處理,做到對線程的定制化處理和更好的可控,建議使用自定義的線程池。 主要涉及到的幾個點:

1. 自定義線程工廠(ThreadFactoryBuilder),主要用于線程的命名,方便追蹤

2. 自定義的線程池(ThreadPoolExecutorUtils),可以按功能優(yōu)化配置參數(shù)

3. 一個抽象的多線程任務(wù)處理接口(OperationThreadService)和通用實現(xiàn)(OperationThread)

4. 統(tǒng)一的調(diào)度實現(xiàn)(MultiThreadOperationUtils)

核心思想:分治歸并,每個線程計算出自己的結(jié)果,最后統(tǒng)一匯總。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
 * description: 自定義實現(xiàn)的線程池,遵循alibaba編程規(guī)范,使用ThreadPoolExecutor創(chuàng)建線程池使用
 * 設(shè)置更有描述意義的線程名稱,默認的ThreadFactory,它給線程起名字大概規(guī)律就是pool-m-thread-n,如pool-1-thread-1。
 * 當(dāng)分析一個thread dump時,很難知道線程的目的,需要有描述意義的線程名稱來分析追蹤問題
 * 設(shè)置線程是否是守護線程,默認的ThreadFactory總是提交非守護線程
 * 設(shè)置線程優(yōu)先級,默認ThreadFactory總是提交的一般優(yōu)先級線程
 * <p>
 * CustomThreadFactoryBuilder類實現(xiàn)了一種優(yōu)雅的Builder Mechanism方式去得到一個自定義ThreadFactory實例。
 * ThreadFactory接口中有一個接受Runnable類型參數(shù)的方法newThread(Runnable r),
 * 業(yè)務(wù)的factory邏輯就應(yīng)該寫在這個方法中,去配置線程名稱、優(yōu)先級、守護線程狀態(tài)等屬性。
 * 原文鏈接:https://blog.csdn.net/zombres/article/details/80497515
 *
 * @author Hlingoes
 * @date 2019/12/22 0:45
 */
public class ThreadFactoryBuilder {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);

    private String nameFormat = null;
    private boolean daemon = false;
    private int priority = Thread.NORM_PRIORITY;

    public ThreadFactoryBuilder setNameFormat(String nameFormat) {
        if (nameFormat == null) {
            throw new NullPointerException();
        }
        this.nameFormat = nameFormat;
        return this;
    }

    public ThreadFactoryBuilder setDaemon(boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    public ThreadFactoryBuilder setPriority(int priority) {
        if (priority < Thread.MIN_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));
        }

        if (priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));
        }

        this.priority = priority;
        return this;
    }

    public ThreadFactory build() {
        return build(this);
    }

    private static ThreadFactory build(ThreadFactoryBuilder builder) {
        final String nameFormat = builder.nameFormat;
        final Boolean daemon = builder.daemon;
        final Integer priority = builder.priority;
        final AtomicLong count = new AtomicLong(0);

        return (Runnable runnable) -> {
            Thread thread = new Thread(runnable);
            if (nameFormat != null) {
                thread.setName(String.format(nameFormat, count.getAndIncrement()));
            }
            if (daemon != null) {
                thread.setDaemon(daemon);
            }
            thread.setPriority(priority);
            thread.setUncaughtExceptionHandler((t, e) -> {
                String threadName = t.getName();
                logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            });
            return thread;
        };
    }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/**
 * description: 創(chuàng)建通用的線程池
 * <p>
 * corePoolSize:線程池中核心線程數(shù)量
 * maximumPoolSize:線程池同時允許存在的最大線程數(shù)量
 * 內(nèi)部處理邏輯如下:
 * 當(dāng)線程池中工作線程數(shù)小于corePoolSize,創(chuàng)建新的工作線程來執(zhí)行該任務(wù),不管線程池中是否存在空閑線程。
 * 如果線程池中工作線程數(shù)達到corePoolSize,新任務(wù)嘗試放入隊列,入隊成功的任務(wù)將等待工作線程空閑時調(diào)度。
 * 1. 如果隊列滿并且線程數(shù)小于maximumPoolSize,創(chuàng)建新的線程執(zhí)行該任務(wù)(注意:隊列中的任務(wù)繼續(xù)排序)。
 * 2. 如果隊列滿且線程數(shù)超過maximumPoolSize,拒絕該任務(wù)
 * <p>
 * keepAliveTime
 * 當(dāng)線程池中工作線程數(shù)大于corePoolSize,并且線程空閑時間超過keepAliveTime,則這些線程將被終止。
 * 同樣,可以將這種策略應(yīng)用到核心線程,通過調(diào)用allowCoreThreadTimeout來實現(xiàn)。
 * <p>
 * BlockingQueue
 * 任務(wù)等待隊列,用于緩存暫時無法執(zhí)行的任務(wù)。分為如下三種堵塞隊列:
 * 1. 直接遞交,如SynchronousQueue,該策略直接將任務(wù)直接交給工作線程。如果當(dāng)前沒有空閑工作線程,創(chuàng)建新線程。
 * 這種策略最好是配合unbounded線程數(shù)來使用,從而避免任務(wù)被拒絕。但當(dāng)任務(wù)生產(chǎn)速度大于消費速度,將導(dǎo)致線程數(shù)不斷的增加。
 * 2. 無界隊列,如LinkedBlockingQueue,當(dāng)工作的線程數(shù)達到核心線程數(shù)時,新的任務(wù)被放在隊列上。
 * 因此,永遠不會有大于corePoolSize的線程被創(chuàng)建,maximumPoolSize參數(shù)失效。
 * 這種策略比較適合所有的任務(wù)都不相互依賴,獨立執(zhí)行。
 * 但是當(dāng)任務(wù)處理速度小于任務(wù)進入速度的時候會引起隊列的無限膨脹。
 * 3. 有界隊列,如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue處理邏輯處理。
 * 隊列長度和maximumPoolSize兩個值會相互影響:
 * 長隊列 + 小maximumPoolSize。會減少CPU的使用、操作系統(tǒng)資源、上下文切換的消耗,但是會降低吞吐量,
 * 如果任務(wù)被頻繁的阻塞如IO線程,系統(tǒng)其實可以調(diào)度更多的線程。
 * 短隊列 + 大maximumPoolSize。CPU更忙,但會增加線程調(diào)度的消耗.
 * 總結(jié)一下,IO密集型可以考慮多些線程來平衡CPU的使用,CPU密集型可以考慮少些線程減少線程調(diào)度的消耗
 *
 * @author Hlingoes
 * @citation https://blog.csdn.net/wanghao112956/article/details/99292107
 * @citation https://www.jianshu.com/p/896b8e18501b
 * @date 2020/2/26 0:46
 */
public class ThreadPoolExecutorUtils {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);

    public static int defaultCoreSize = Runtime.getRuntime().availableProcessors();
    private static int pollWaitingTime = 60;
    private static int defaultQueueSize = 10 * 1000;
    private static int defaultMaxSize = 4 * defaultCoreSize;
    private static String threadName = "custom-pool";

    /**
     * description: 創(chuàng)建線程池
     *
     * @param waitingTime
     * @param coreSize
     * @param maxPoolSize
     * @param queueSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/4/12
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultCoreSize = coreSize;
        defaultMaxSize = maxPoolSize;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }

    /**
     * description: 創(chuàng)建線程池
     *
     * @param waitingTime
     * @param queueSize
     * @param maxPoolSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        defaultMaxSize = maxPoolSize;
        return getExecutorPool();
    }

    /**
     * description: 創(chuàng)建線程池
     *
     * @param waitingTime
     * @param queueSize
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }

    /**
     * description: 創(chuàng)建線程池
     *
     * @param waitingTime
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(int waitingTime) {
        pollWaitingTime = waitingTime;
        return getExecutorPool();
    }

    /**
     * description: 創(chuàng)建線程池
     *
     * @param
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/6/6
     */
    public static ThreadPoolExecutor getExecutorPool() {
        return getExecutorPool(threadName);
    }

    /**
     * description: 創(chuàng)建線程池
     *
     * @param
     * @return java.util.concurrent.ThreadPoolExecutor
     * @author Hlingoes 2020/3/20
     */
    public static ThreadPoolExecutor getExecutorPool(String threadName) {
        ThreadFactory factory = new ThreadFactoryBuilder()
                .setNameFormat(threadName + "-%d")
                .build();
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize);
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize,
                defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory,
                (r, executor) -> {
                    /**
                     * 自定義的拒絕策略
                     * 當(dāng)提交給線程池的某一個新任務(wù)無法直接被線程池中“核心線程”直接處理,
                     * 又無法加入等待隊列,也無法創(chuàng)建新的線程執(zhí)行;
                     * 又或者線程池已經(jīng)調(diào)用shutdown()方法停止了工作;
                     * 又或者線程池不是處于正常的工作狀態(tài);
                     * 這時候ThreadPoolExecutor線程池會拒絕處理這個任務(wù)
                     */
                    if (!executor.isShutdown()) {
                        logger.warn("ThreadPoolExecutor is over working, please check the thread tasks! ");
                    }
                }) {

            /**
             * description: 針對提交給線程池的任務(wù)可能會拋出異常這一問題,
             * 可自行實現(xiàn)線程池的afterExecute方法,或者實現(xiàn)Thread的UncaughtExceptionHandler接口
             * ThreadFactoryBuilder中已經(jīng)實現(xiàn)了UncaughtExceptionHandler接口,這里是為了進一步兼容
             *
             * @param r
             * @param t
             * @return void
             * @author Hlingoes 2020/5/27
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t == null && r instanceof Future<?>) {
                    try {
                        Future<?> future = (Future<?>) r;
                        future.get();
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    logger.error("customThreadPool error msg: {}", t.getMessage(), t);
                }
            }
        };
        /**
         * 備選方法,事先知道會有很多任務(wù)會提交給這個線程池,可以在初始化的時候完成核心線程的創(chuàng)建,提高系統(tǒng)性能
         * 一個線程池創(chuàng)建出來之后,在沒有給它提交任何任務(wù)之前,這個線程池中的線程數(shù)為0
         * 一個個去創(chuàng)建新線程開銷太大,影響系統(tǒng)性能
         * 可以在創(chuàng)建線程池的時候就將所有的核心線程全部一次性創(chuàng)建完畢,系統(tǒng)起來之后就可以直接使用
         */
        poolExecutor.prestartAllCoreThreads();
        return poolExecutor;
    }

    /**
     * description: 所有任務(wù)執(zhí)行完之后,釋放線程池資源
     *
     * @param pool
     * @return void
     * @author Hlingoes 2020/3/20
     */
    public static void closeAfterComplete(ThreadPoolExecutor pool) {
        /**
         * 當(dāng)線程池調(diào)用該方法時,線程池的狀態(tài)則立刻變成SHUTDOWN狀態(tài)。
         * 此時,則不能再往線程池中添加任何任務(wù),否則將會拋出RejectedExecutionException異常。
         * 但是,此時線程池不會立刻退出,直到添加到線程池中的任務(wù)都已經(jīng)處理完成,才會退出。
         * 唯一的影響就是不能再提交任務(wù)了,正則執(zhí)行的任務(wù)即使在阻塞著也不會結(jié)束,在排隊的任務(wù)也不會取消。
         */
        pool.shutdown();
        try {
            /**
             * awaitTermination方法可以設(shè)定線程池在關(guān)閉之前的最大超時時間,
             * 如果在超時時間結(jié)束之前線程池能夠正常關(guān)閉,這個方法會返回true,否則,一旦超時,就會返回false。
             * 通常來說不可能無限制地等待下去,因此需要預(yù)估一個合理的超時時間,然后使用這個方法
             */
            if (!pool.awaitTermination(pollWaitingTime, TimeUnit.SECONDS)) {
                /**
                 * 如果awaitTermination方法返回false,又希望盡可能在線程池關(guān)閉之后再做其他資源回收工作,
                 * 可以考慮再調(diào)用一下shutdownNow方法,
                 * 此時隊列中所有尚未被處理的任務(wù)都會被丟棄,同時會設(shè)置線程池中每個線程的中斷標志位。
                 * shutdownNow并不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務(wù)能夠正確響應(yīng)中斷。
                 * 到了這一步,可以考慮繼續(xù)調(diào)用awaitTermination方法,或者直接放棄,去做接下來要做的事情。
                 */
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.error("ThreadPool overtime: {}", e.getMessage());
            //(重新)丟棄所有尚未被處理的任務(wù),同時會設(shè)置線程池中每個線程的中斷標志位
            pool.shutdownNow();
            // 保持中斷狀態(tài)
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.Arrays;

/**
 * description: 分段參數(shù)
 *
 * @author Hlingoes
 * @date 2020/5/22 23:50
 */
public class PartitionElements {
    /**
     * 當(dāng)前的分段任務(wù)索引
     */
    private long index;
    /**
     * 批量處理的任務(wù)個數(shù)
     */
    private long batchCounts;
    /**
     * 任務(wù)的分段個數(shù)
     */
    private long partitions;
    /**
     * 任務(wù)總數(shù)
     */
    private long totalCounts;
    private Object[] args;
    private Object data;

    public PartitionElements() {

    }

    public PartitionElements(long batchCounts, long totalCounts, Object[] args) {
        this.batchCounts = batchCounts;
        this.totalCounts = totalCounts;
        this.partitions = aquirePartitions(totalCounts, batchCounts);
        this.args = args;
    }

    public PartitionElements(long index, PartitionElements elements) {
        this.index = index;
        this.batchCounts = elements.getBatchCounts();
        this.partitions = elements.getPartitions();
        this.totalCounts = elements.getTotalCounts();
        this.args = elements.getArgs();
    }

    /**
     * description: 根據(jù)任務(wù)總量和單次任務(wù)處理量,計算任務(wù)個數(shù)
     *
     * @param totalCounts
     * @param batchCounts
     * @return long partitions
     * @author Hlingoes 2020/5/23
     */
    public long aquirePartitions(long totalCounts, long batchCounts) {
        long partitions = totalCounts / batchCounts;
        if (totalCounts % batchCounts != 0) {
            partitions = partitions + 1;
        }
        //  兼容任務(wù)總數(shù)total = 1 的情況
        if (partitions == 0) {
            partitions = 1;
        }
        return partitions;
    }

    public long getIndex() {
        return index;
    }

    public void setIndex(long index) {
        this.index = index;
    }

    public long getBatchCounts() {
        return batchCounts;
    }

    public void setBatchCounts(long batchCounts) {
        this.batchCounts = batchCounts;
    }

    public long getPartitions() {
        return partitions;
    }

    public void setPartitions(long partitions) {
        this.partitions = partitions;
    }

    public long getTotalCounts() {
        return totalCounts;
    }

    public void setTotalCounts(long totalCounts) {
        this.totalCounts = totalCounts;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "PartitionElements{" +
                "index=" + index +
                ", batchCounts=" + batchCounts +
                ", partitions=" + partitions +
                ", totalCounts=" + totalCounts +
                ", args=" + Arrays.toString(args) +
                '}';
    }
}
import cn.henry.study.common.bo.PartitionElements;

/**
 * description: 業(yè)務(wù)分治歸并處理接口
 *
 * @author Hlingoes 2020/5/22
 */
public interface OperationThreadService {

    /**
     * description: 任務(wù)總量
     *
     * @param args
     * @return long
     * @throws Exception
     * @author Hlingoes 2020/5/22
     */
    long count(Object[] args) throws Exception;

    /**
     * description: 在多線程分治任務(wù)之前的預(yù)處理方法,返回業(yè)務(wù)數(shù)據(jù)
     *
     * @param args
     * @return Object
     * @throws Exception
     * @author Hlingoes 2020/5/23
     */
    Object prepare(Object[] args) throws Exception;

    /**
     * description: 多線程的任務(wù)邏輯
     *
     * @param elements
     * @return java.lang.Object
     * @throws Exception
     * @author Hlingoes 2020/5/24
     */
    Object invoke(PartitionElements elements) throws Exception;

    /**
     * description: 多線程單個任務(wù)結(jié)束后的歸并方法
     *
     * @param elements
     * @param object
     * @return void
     * @throws Exception
     * @author Hlingoes 2020/5/23
     */
    void post(PartitionElements elements, Object object) throws Exception;

    /**
     * description: 歸并結(jié)果之后的尾處理
     *
     * @param object
     * @return java.lang.Object
     * @throws Exception
     * @author Hlingoes 2020/5/24
     */
    Object finished(Object object) throws Exception;

}
import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.thread.OperationThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * description: 多線程業(yè)務(wù)分治歸并處理
 *
 * @author Hlingoes
 * @date 2020/5/22 0:42
 */
public class MultiThreadOperationUtils {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadOperationUtils.class);

    /**
     * description: 開啟多線程執(zhí)行任務(wù),按順序歸并處理任務(wù)結(jié)果
     * 按照默認線程數(shù),計算批量任務(wù)數(shù)
     *
     * @param service
     * @param args
     * @return void
     * @author Hlingoes 2020/5/23
     */
    public static Object batchExecute(OperationThreadService service, Object[] args) throws Exception {
        long totalCounts = service.count(args);
        long batchCounts = totalCounts / ThreadPoolExecutorUtils.defaultCoreSize;
        // 兼容任務(wù)少于核心線程數(shù)的情況
        if (batchCounts == 0) {
            batchCounts = 1L;
        }
        PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
        return batchExecute(service, elements);
    }

    /**
     * description: 開啟多線程執(zhí)行任務(wù),按順序歸并處理任務(wù)結(jié)果
     * 給定每頁顯示條目個數(shù)
     *
     * @param service
     * @param batchCounts
     * @param args
     * @return void
     * @author Hlingoes 2020/5/23
     */
    public static Object batchExecute(OperationThreadService service, long batchCounts, Object[] args) throws Exception {
        long totalCounts = service.count(args);
        PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
        return batchExecute(service, elements);
    }

    /**
     * description: 開啟多線程執(zhí)行分治任務(wù),按順序歸并處理任務(wù)結(jié)果
     *
     * @param service
     * @param elements
     * @return void
     * @author Hlingoes 2020/5/23
     */
    private static Object batchExecute(OperationThreadService service, PartitionElements elements) throws Exception {
        ThreadPoolExecutor executor = ThreadPoolExecutorUtils.getExecutorPool();
        // 在多線程分治任務(wù)之前的預(yù)處理方法,返回業(yè)務(wù)數(shù)據(jù)
        final Object obj = service.prepare(elements.getArgs());
        // 預(yù)防list和map的resize,初始化給定容量,可提高性能
        ArrayList<Future<PartitionElements>> futures = new ArrayList<>((int) elements.getPartitions());
        OperationThread opThread = null;
        Future<PartitionElements> future = null;
        // 添加線程任務(wù)
        for (int i = 0; i < elements.getPartitions(); i++) {
            // 劃定任務(wù)分布
            opThread = new OperationThread(new PartitionElements(i + 1, elements), service);
            future = executor.submit(opThread);
            futures.add(future);
        }
        // 關(guān)閉線程池
        executor.shutdown();
        // 阻塞線程,同步處理數(shù)據(jù)
        futures.forEach(f -> {
            try {
                // 線程單個任務(wù)結(jié)束后的歸并方法
                service.post(f.get(), obj);
            } catch (Exception e) {
                logger.error("post routine fail", e);
            }
        });
        return service.finished(obj);
    }

}
import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.utils.MultiThreadOperationUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * description: 多線程的測試用例
 *
 * @author Hlingoes
 * @date 2020/6/12 20:52
 */
public class MultiThreadServiceTest implements OperationThreadService {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadServiceTest.class);

    @Override
    public long count(Object[] args) throws Exception {
        return 100L;
    }

    @Override
    public Object prepare(Object[] args) throws Exception {
        return "success";
    }

    @Override
    public Object invoke(PartitionElements elements) throws Exception {
        List<Object> list = new ArrayList<>((int) elements.getBatchCounts());
        for (int i = 0; i < elements.getIndex(); i++) {
            list.add("test_" + i);
        }
        return list;
    }

    @Override
    public void post(PartitionElements elements, Object object) throws Exception {
        String insertSql = "insert into test (id) values ";
        StringBuilder sb = new StringBuilder();
        List<Object> datas = (List<Object>) elements.getData();
        for (int i = 0; i < datas.size(); i++) {
            if ((i + 1) % 5 == 0 || (i + 1) == datas.size()) {
                sb.append("('" + datas.get(i) + "')");
                logger.info("{}: 測試insert sql: {}", elements, insertSql + sb.toString());
                sb = new StringBuilder();
            } else {
                sb.append("('" + datas.get(i) + "'),");
            }
        }
    }

    @Override
    public Object finished(Object object) throws Exception {
        return object;
    }

    @Test
    public void testBatchExecute() {
        try {
            Object object = MultiThreadOperationUtils.batchExecute(new MultiThreadServiceTest(), 10, new Object[]{"test"});
            logger.info("測試完成: {}", object.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

到此,相信大家對“java項目中的多線程實踐分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI