您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“java項目中的多線程實踐分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“java項目中的多線程實踐分析”吧!
項目開發(fā)中對于一些數(shù)據(jù)的處理需要用到多線程,比如文件的批量上傳,數(shù)據(jù)庫的分批寫入,大文件的分段下載等。 通常會使用spring自帶的線程池處理,做到對線程的定制化處理和更好的可控,建議使用自定義的線程池。 主要涉及到的幾個點:
1. 自定義線程工廠(ThreadFactoryBuilder),主要用于線程的命名,方便追蹤
2. 自定義的線程池(ThreadPoolExecutorUtils),可以按功能優(yōu)化配置參數(shù)
3. 一個抽象的多線程任務(wù)處理接口(OperationThreadService)和通用實現(xiàn)(OperationThread)
4. 統(tǒng)一的調(diào)度實現(xiàn)(MultiThreadOperationUtils)
核心思想:分治歸并,每個線程計算出自己的結(jié)果,最后統(tǒng)一匯總。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; /** * description: 自定義實現(xiàn)的線程池,遵循alibaba編程規(guī)范,使用ThreadPoolExecutor創(chuàng)建線程池使用 * 設(shè)置更有描述意義的線程名稱,默認的ThreadFactory,它給線程起名字大概規(guī)律就是pool-m-thread-n,如pool-1-thread-1。 * 當(dāng)分析一個thread dump時,很難知道線程的目的,需要有描述意義的線程名稱來分析追蹤問題 * 設(shè)置線程是否是守護線程,默認的ThreadFactory總是提交非守護線程 * 設(shè)置線程優(yōu)先級,默認ThreadFactory總是提交的一般優(yōu)先級線程 * <p> * CustomThreadFactoryBuilder類實現(xiàn)了一種優(yōu)雅的Builder Mechanism方式去得到一個自定義ThreadFactory實例。 * ThreadFactory接口中有一個接受Runnable類型參數(shù)的方法newThread(Runnable r), * 業(yè)務(wù)的factory邏輯就應(yīng)該寫在這個方法中,去配置線程名稱、優(yōu)先級、守護線程狀態(tài)等屬性。 * 原文鏈接:https://blog.csdn.net/zombres/article/details/80497515 * * @author Hlingoes * @date 2019/12/22 0:45 */ public class ThreadFactoryBuilder { private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class); private String nameFormat = null; private boolean daemon = false; private int priority = Thread.NORM_PRIORITY; public ThreadFactoryBuilder setNameFormat(String nameFormat) { if (nameFormat == null) { throw new NullPointerException(); } this.nameFormat = nameFormat; return this; } public ThreadFactoryBuilder setDaemon(boolean daemon) { this.daemon = daemon; return this; } public ThreadFactoryBuilder setPriority(int priority) { if (priority < Thread.MIN_PRIORITY) { throw new IllegalArgumentException(String.format( "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY)); } if (priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException(String.format( "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY)); } this.priority = priority; return this; } public ThreadFactory build() { return build(this); } private static ThreadFactory build(ThreadFactoryBuilder builder) { final String nameFormat = builder.nameFormat; final Boolean daemon = builder.daemon; final Integer priority = builder.priority; final AtomicLong count = new AtomicLong(0); return (Runnable runnable) -> { Thread thread = new Thread(runnable); if (nameFormat != null) { thread.setName(String.format(nameFormat, count.getAndIncrement())); } if (daemon != null) { thread.setDaemon(daemon); } thread.setPriority(priority); thread.setUncaughtExceptionHandler((t, e) -> { String threadName = t.getName(); logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e); }); return thread; }; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; /** * description: 創(chuàng)建通用的線程池 * <p> * corePoolSize:線程池中核心線程數(shù)量 * maximumPoolSize:線程池同時允許存在的最大線程數(shù)量 * 內(nèi)部處理邏輯如下: * 當(dāng)線程池中工作線程數(shù)小于corePoolSize,創(chuàng)建新的工作線程來執(zhí)行該任務(wù),不管線程池中是否存在空閑線程。 * 如果線程池中工作線程數(shù)達到corePoolSize,新任務(wù)嘗試放入隊列,入隊成功的任務(wù)將等待工作線程空閑時調(diào)度。 * 1. 如果隊列滿并且線程數(shù)小于maximumPoolSize,創(chuàng)建新的線程執(zhí)行該任務(wù)(注意:隊列中的任務(wù)繼續(xù)排序)。 * 2. 如果隊列滿且線程數(shù)超過maximumPoolSize,拒絕該任務(wù) * <p> * keepAliveTime * 當(dāng)線程池中工作線程數(shù)大于corePoolSize,并且線程空閑時間超過keepAliveTime,則這些線程將被終止。 * 同樣,可以將這種策略應(yīng)用到核心線程,通過調(diào)用allowCoreThreadTimeout來實現(xiàn)。 * <p> * BlockingQueue * 任務(wù)等待隊列,用于緩存暫時無法執(zhí)行的任務(wù)。分為如下三種堵塞隊列: * 1. 直接遞交,如SynchronousQueue,該策略直接將任務(wù)直接交給工作線程。如果當(dāng)前沒有空閑工作線程,創(chuàng)建新線程。 * 這種策略最好是配合unbounded線程數(shù)來使用,從而避免任務(wù)被拒絕。但當(dāng)任務(wù)生產(chǎn)速度大于消費速度,將導(dǎo)致線程數(shù)不斷的增加。 * 2. 無界隊列,如LinkedBlockingQueue,當(dāng)工作的線程數(shù)達到核心線程數(shù)時,新的任務(wù)被放在隊列上。 * 因此,永遠不會有大于corePoolSize的線程被創(chuàng)建,maximumPoolSize參數(shù)失效。 * 這種策略比較適合所有的任務(wù)都不相互依賴,獨立執(zhí)行。 * 但是當(dāng)任務(wù)處理速度小于任務(wù)進入速度的時候會引起隊列的無限膨脹。 * 3. 有界隊列,如ArrayBlockingQueue,按前面描述的corePoolSize、maximumPoolSize、BlockingQueue處理邏輯處理。 * 隊列長度和maximumPoolSize兩個值會相互影響: * 長隊列 + 小maximumPoolSize。會減少CPU的使用、操作系統(tǒng)資源、上下文切換的消耗,但是會降低吞吐量, * 如果任務(wù)被頻繁的阻塞如IO線程,系統(tǒng)其實可以調(diào)度更多的線程。 * 短隊列 + 大maximumPoolSize。CPU更忙,但會增加線程調(diào)度的消耗. * 總結(jié)一下,IO密集型可以考慮多些線程來平衡CPU的使用,CPU密集型可以考慮少些線程減少線程調(diào)度的消耗 * * @author Hlingoes * @citation https://blog.csdn.net/wanghao112956/article/details/99292107 * @citation https://www.jianshu.com/p/896b8e18501b * @date 2020/2/26 0:46 */ public class ThreadPoolExecutorUtils { private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class); public static int defaultCoreSize = Runtime.getRuntime().availableProcessors(); private static int pollWaitingTime = 60; private static int defaultQueueSize = 10 * 1000; private static int defaultMaxSize = 4 * defaultCoreSize; private static String threadName = "custom-pool"; /** * description: 創(chuàng)建線程池 * * @param waitingTime * @param coreSize * @param maxPoolSize * @param queueSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/4/12 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) { pollWaitingTime = waitingTime; defaultCoreSize = coreSize; defaultMaxSize = maxPoolSize; defaultQueueSize = queueSize; return getExecutorPool(); } /** * description: 創(chuàng)建線程池 * * @param waitingTime * @param queueSize * @param maxPoolSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) { pollWaitingTime = waitingTime; defaultQueueSize = queueSize; defaultMaxSize = maxPoolSize; return getExecutorPool(); } /** * description: 創(chuàng)建線程池 * * @param waitingTime * @param queueSize * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) { pollWaitingTime = waitingTime; defaultQueueSize = queueSize; return getExecutorPool(); } /** * description: 創(chuàng)建線程池 * * @param waitingTime * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(int waitingTime) { pollWaitingTime = waitingTime; return getExecutorPool(); } /** * description: 創(chuàng)建線程池 * * @param * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/6/6 */ public static ThreadPoolExecutor getExecutorPool() { return getExecutorPool(threadName); } /** * description: 創(chuàng)建線程池 * * @param * @return java.util.concurrent.ThreadPoolExecutor * @author Hlingoes 2020/3/20 */ public static ThreadPoolExecutor getExecutorPool(String threadName) { ThreadFactory factory = new ThreadFactoryBuilder() .setNameFormat(threadName + "-%d") .build(); BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize); ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize, defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory, (r, executor) -> { /** * 自定義的拒絕策略 * 當(dāng)提交給線程池的某一個新任務(wù)無法直接被線程池中“核心線程”直接處理, * 又無法加入等待隊列,也無法創(chuàng)建新的線程執(zhí)行; * 又或者線程池已經(jīng)調(diào)用shutdown()方法停止了工作; * 又或者線程池不是處于正常的工作狀態(tài); * 這時候ThreadPoolExecutor線程池會拒絕處理這個任務(wù) */ if (!executor.isShutdown()) { logger.warn("ThreadPoolExecutor is over working, please check the thread tasks! "); } }) { /** * description: 針對提交給線程池的任務(wù)可能會拋出異常這一問題, * 可自行實現(xiàn)線程池的afterExecute方法,或者實現(xiàn)Thread的UncaughtExceptionHandler接口 * ThreadFactoryBuilder中已經(jīng)實現(xiàn)了UncaughtExceptionHandler接口,這里是為了進一步兼容 * * @param r * @param t * @return void * @author Hlingoes 2020/5/27 */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Future<?> future = (Future<?>) r; future.get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { logger.error("customThreadPool error msg: {}", t.getMessage(), t); } } }; /** * 備選方法,事先知道會有很多任務(wù)會提交給這個線程池,可以在初始化的時候完成核心線程的創(chuàng)建,提高系統(tǒng)性能 * 一個線程池創(chuàng)建出來之后,在沒有給它提交任何任務(wù)之前,這個線程池中的線程數(shù)為0 * 一個個去創(chuàng)建新線程開銷太大,影響系統(tǒng)性能 * 可以在創(chuàng)建線程池的時候就將所有的核心線程全部一次性創(chuàng)建完畢,系統(tǒng)起來之后就可以直接使用 */ poolExecutor.prestartAllCoreThreads(); return poolExecutor; } /** * description: 所有任務(wù)執(zhí)行完之后,釋放線程池資源 * * @param pool * @return void * @author Hlingoes 2020/3/20 */ public static void closeAfterComplete(ThreadPoolExecutor pool) { /** * 當(dāng)線程池調(diào)用該方法時,線程池的狀態(tài)則立刻變成SHUTDOWN狀態(tài)。 * 此時,則不能再往線程池中添加任何任務(wù),否則將會拋出RejectedExecutionException異常。 * 但是,此時線程池不會立刻退出,直到添加到線程池中的任務(wù)都已經(jīng)處理完成,才會退出。 * 唯一的影響就是不能再提交任務(wù)了,正則執(zhí)行的任務(wù)即使在阻塞著也不會結(jié)束,在排隊的任務(wù)也不會取消。 */ pool.shutdown(); try { /** * awaitTermination方法可以設(shè)定線程池在關(guān)閉之前的最大超時時間, * 如果在超時時間結(jié)束之前線程池能夠正常關(guān)閉,這個方法會返回true,否則,一旦超時,就會返回false。 * 通常來說不可能無限制地等待下去,因此需要預(yù)估一個合理的超時時間,然后使用這個方法 */ if (!pool.awaitTermination(pollWaitingTime, TimeUnit.SECONDS)) { /** * 如果awaitTermination方法返回false,又希望盡可能在線程池關(guān)閉之后再做其他資源回收工作, * 可以考慮再調(diào)用一下shutdownNow方法, * 此時隊列中所有尚未被處理的任務(wù)都會被丟棄,同時會設(shè)置線程池中每個線程的中斷標志位。 * shutdownNow并不保證一定可以讓正在運行的線程停止工作,除非提交給線程的任務(wù)能夠正確響應(yīng)中斷。 * 到了這一步,可以考慮繼續(xù)調(diào)用awaitTermination方法,或者直接放棄,去做接下來要做的事情。 */ pool.shutdownNow(); } } catch (InterruptedException e) { logger.error("ThreadPool overtime: {}", e.getMessage()); //(重新)丟棄所有尚未被處理的任務(wù),同時會設(shè)置線程池中每個線程的中斷標志位 pool.shutdownNow(); // 保持中斷狀態(tài) Thread.currentThread().interrupt(); } } }
import java.util.Arrays; /** * description: 分段參數(shù) * * @author Hlingoes * @date 2020/5/22 23:50 */ public class PartitionElements { /** * 當(dāng)前的分段任務(wù)索引 */ private long index; /** * 批量處理的任務(wù)個數(shù) */ private long batchCounts; /** * 任務(wù)的分段個數(shù) */ private long partitions; /** * 任務(wù)總數(shù) */ private long totalCounts; private Object[] args; private Object data; public PartitionElements() { } public PartitionElements(long batchCounts, long totalCounts, Object[] args) { this.batchCounts = batchCounts; this.totalCounts = totalCounts; this.partitions = aquirePartitions(totalCounts, batchCounts); this.args = args; } public PartitionElements(long index, PartitionElements elements) { this.index = index; this.batchCounts = elements.getBatchCounts(); this.partitions = elements.getPartitions(); this.totalCounts = elements.getTotalCounts(); this.args = elements.getArgs(); } /** * description: 根據(jù)任務(wù)總量和單次任務(wù)處理量,計算任務(wù)個數(shù) * * @param totalCounts * @param batchCounts * @return long partitions * @author Hlingoes 2020/5/23 */ public long aquirePartitions(long totalCounts, long batchCounts) { long partitions = totalCounts / batchCounts; if (totalCounts % batchCounts != 0) { partitions = partitions + 1; } // 兼容任務(wù)總數(shù)total = 1 的情況 if (partitions == 0) { partitions = 1; } return partitions; } public long getIndex() { return index; } public void setIndex(long index) { this.index = index; } public long getBatchCounts() { return batchCounts; } public void setBatchCounts(long batchCounts) { this.batchCounts = batchCounts; } public long getPartitions() { return partitions; } public void setPartitions(long partitions) { this.partitions = partitions; } public long getTotalCounts() { return totalCounts; } public void setTotalCounts(long totalCounts) { this.totalCounts = totalCounts; } public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } @Override public String toString() { return "PartitionElements{" + "index=" + index + ", batchCounts=" + batchCounts + ", partitions=" + partitions + ", totalCounts=" + totalCounts + ", args=" + Arrays.toString(args) + '}'; } }
import cn.henry.study.common.bo.PartitionElements; /** * description: 業(yè)務(wù)分治歸并處理接口 * * @author Hlingoes 2020/5/22 */ public interface OperationThreadService { /** * description: 任務(wù)總量 * * @param args * @return long * @throws Exception * @author Hlingoes 2020/5/22 */ long count(Object[] args) throws Exception; /** * description: 在多線程分治任務(wù)之前的預(yù)處理方法,返回業(yè)務(wù)數(shù)據(jù) * * @param args * @return Object * @throws Exception * @author Hlingoes 2020/5/23 */ Object prepare(Object[] args) throws Exception; /** * description: 多線程的任務(wù)邏輯 * * @param elements * @return java.lang.Object * @throws Exception * @author Hlingoes 2020/5/24 */ Object invoke(PartitionElements elements) throws Exception; /** * description: 多線程單個任務(wù)結(jié)束后的歸并方法 * * @param elements * @param object * @return void * @throws Exception * @author Hlingoes 2020/5/23 */ void post(PartitionElements elements, Object object) throws Exception; /** * description: 歸并結(jié)果之后的尾處理 * * @param object * @return java.lang.Object * @throws Exception * @author Hlingoes 2020/5/24 */ Object finished(Object object) throws Exception; }
import cn.henry.study.common.bo.PartitionElements; import cn.henry.study.common.service.OperationThreadService; import cn.henry.study.common.thread.OperationThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * description: 多線程業(yè)務(wù)分治歸并處理 * * @author Hlingoes * @date 2020/5/22 0:42 */ public class MultiThreadOperationUtils { private static Logger logger = LoggerFactory.getLogger(MultiThreadOperationUtils.class); /** * description: 開啟多線程執(zhí)行任務(wù),按順序歸并處理任務(wù)結(jié)果 * 按照默認線程數(shù),計算批量任務(wù)數(shù) * * @param service * @param args * @return void * @author Hlingoes 2020/5/23 */ public static Object batchExecute(OperationThreadService service, Object[] args) throws Exception { long totalCounts = service.count(args); long batchCounts = totalCounts / ThreadPoolExecutorUtils.defaultCoreSize; // 兼容任務(wù)少于核心線程數(shù)的情況 if (batchCounts == 0) { batchCounts = 1L; } PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args); return batchExecute(service, elements); } /** * description: 開啟多線程執(zhí)行任務(wù),按順序歸并處理任務(wù)結(jié)果 * 給定每頁顯示條目個數(shù) * * @param service * @param batchCounts * @param args * @return void * @author Hlingoes 2020/5/23 */ public static Object batchExecute(OperationThreadService service, long batchCounts, Object[] args) throws Exception { long totalCounts = service.count(args); PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args); return batchExecute(service, elements); } /** * description: 開啟多線程執(zhí)行分治任務(wù),按順序歸并處理任務(wù)結(jié)果 * * @param service * @param elements * @return void * @author Hlingoes 2020/5/23 */ private static Object batchExecute(OperationThreadService service, PartitionElements elements) throws Exception { ThreadPoolExecutor executor = ThreadPoolExecutorUtils.getExecutorPool(); // 在多線程分治任務(wù)之前的預(yù)處理方法,返回業(yè)務(wù)數(shù)據(jù) final Object obj = service.prepare(elements.getArgs()); // 預(yù)防list和map的resize,初始化給定容量,可提高性能 ArrayList<Future<PartitionElements>> futures = new ArrayList<>((int) elements.getPartitions()); OperationThread opThread = null; Future<PartitionElements> future = null; // 添加線程任務(wù) for (int i = 0; i < elements.getPartitions(); i++) { // 劃定任務(wù)分布 opThread = new OperationThread(new PartitionElements(i + 1, elements), service); future = executor.submit(opThread); futures.add(future); } // 關(guān)閉線程池 executor.shutdown(); // 阻塞線程,同步處理數(shù)據(jù) futures.forEach(f -> { try { // 線程單個任務(wù)結(jié)束后的歸并方法 service.post(f.get(), obj); } catch (Exception e) { logger.error("post routine fail", e); } }); return service.finished(obj); } }
import cn.henry.study.common.bo.PartitionElements; import cn.henry.study.common.service.OperationThreadService; import cn.henry.study.common.utils.MultiThreadOperationUtils; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; /** * description: 多線程的測試用例 * * @author Hlingoes * @date 2020/6/12 20:52 */ public class MultiThreadServiceTest implements OperationThreadService { private static Logger logger = LoggerFactory.getLogger(MultiThreadServiceTest.class); @Override public long count(Object[] args) throws Exception { return 100L; } @Override public Object prepare(Object[] args) throws Exception { return "success"; } @Override public Object invoke(PartitionElements elements) throws Exception { List<Object> list = new ArrayList<>((int) elements.getBatchCounts()); for (int i = 0; i < elements.getIndex(); i++) { list.add("test_" + i); } return list; } @Override public void post(PartitionElements elements, Object object) throws Exception { String insertSql = "insert into test (id) values "; StringBuilder sb = new StringBuilder(); List<Object> datas = (List<Object>) elements.getData(); for (int i = 0; i < datas.size(); i++) { if ((i + 1) % 5 == 0 || (i + 1) == datas.size()) { sb.append("('" + datas.get(i) + "')"); logger.info("{}: 測試insert sql: {}", elements, insertSql + sb.toString()); sb = new StringBuilder(); } else { sb.append("('" + datas.get(i) + "'),"); } } } @Override public Object finished(Object object) throws Exception { return object; } @Test public void testBatchExecute() { try { Object object = MultiThreadOperationUtils.batchExecute(new MultiThreadServiceTest(), 10, new Object[]{"test"}); logger.info("測試完成: {}", object.toString()); } catch (Exception e) { e.printStackTrace(); } } }
到此,相信大家對“java項目中的多線程實踐分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。