您好,登錄后才能下訂單哦!
這篇文章主要介紹“java并發(fā)包的介紹以及線程池的創(chuàng)建和使用”,在日常操作中,相信很多人在java并發(fā)包的介紹以及線程池的創(chuàng)建和使用問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”java并發(fā)包的介紹以及線程池的創(chuàng)建和使用”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
1.java并發(fā)包介紹
JDK5.0(JDK1.5更名后)以后的版本引入高級(jí)并發(fā)特性,大多數(shù)的特性在java.util.concurrent包中,是專門用于多線程編程的,充分利用了現(xiàn)代多處理器和多核心系統(tǒng)的功能以編寫(xiě)大規(guī)模并發(fā)應(yīng)用程序。主要包括原子量、并發(fā)集合、同步器、可重入鎖,并對(duì)線程池的構(gòu)造提供了強(qiáng)力的支持
2.線程池
java.util.concurrent.Executors提供了一個(gè) java.util.concurrent.Executor接口的實(shí)現(xiàn)用于創(chuàng)建線程池
多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問(wèn)題,它可以顯著減少處理器單元的閑置時(shí)間,增加處理器單元的吞吐能力。
假設(shè)服務(wù)器完成一項(xiàng)任務(wù)所需時(shí)間為:T1 創(chuàng)建線程時(shí)間,T2 在線程中執(zhí)行任務(wù)的時(shí)間,T3 銷毀線程時(shí)間。如果T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能,減少創(chuàng)建和銷毀線程所需消耗的時(shí)間。
一個(gè)線程池由以下四個(gè)基本部分組成:
線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池,包括 創(chuàng)建線程池,銷毀線程池,添加新任務(wù);
工作線程(PoolWorker):線程池中線程,在沒(méi)有任務(wù)時(shí)處于等待狀態(tài),可以循環(huán)的執(zhí)行任務(wù);
任務(wù)接口(Task):每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行,它主要規(guī)定了任務(wù)的入口,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
任務(wù)隊(duì)列(taskQueue):用于存放沒(méi)有處理的任務(wù)。提供一種緩沖機(jī)制。
線程池技術(shù)正是關(guān)心如何縮短或調(diào)整T1,T3時(shí)間從而提高服務(wù)器程序性能的技術(shù)。它把T1,T3分別安排在服務(wù)器程序的啟動(dòng)和結(jié)束的時(shí)間段或者一些空閑的時(shí)間段,這樣在服務(wù)器程序處理客戶請(qǐng)求時(shí),免去了線程創(chuàng)建和銷毀的開(kāi)銷。
線程池不僅調(diào)整T1,T3產(chǎn)生的時(shí)間段,而且它還顯著減少了創(chuàng)建線程的數(shù)目,看一個(gè)例子:
假設(shè)一個(gè)服務(wù)器一天要處理100000個(gè)請(qǐng)求,并且每個(gè)請(qǐng)求需要一個(gè)單獨(dú)的線程完成。在線程池中,線程數(shù)一般是固定的,
一般線程池大小是遠(yuǎn)小于100000。所以利用線程池的服務(wù)器程序不會(huì)為了創(chuàng)建100000而在處理請(qǐng)求時(shí)浪費(fèi)時(shí)間,從而提高效率。
線程池的五種創(chuàng)建方式
Single Thread Executor:只有一個(gè)線程的線程池,因此所提交的任務(wù)是順序執(zhí)行,Executors.newSingleThreadExecutor();
Cached Thread Pool:線程池里有很多線程需同時(shí)進(jìn)行,舊的可用線程將被新的任務(wù)觸發(fā)從而重新執(zhí)行,如果線程超過(guò)60秒內(nèi)沒(méi)有執(zhí)行,那么將被終止并從池中刪除Executors.newCachedThreadPool();
Fixed Thread Pool:擁有固定線程數(shù)的線程池,如果沒(méi)有任務(wù)執(zhí)行,那么線程會(huì)一直等待,Executors.newFixedThreadPool(10);在構(gòu)造函數(shù)中的參數(shù)10是線程池的大小,你可以隨意設(shè)置,也可以和cpu的數(shù)量保持一致,獲取cpu的數(shù)量int cpuNums = Runtime.getRuntime().availableProcessors();
Scheduled Thread Pool:用來(lái)調(diào)度即將執(zhí)行的任務(wù)的線程池Executors.newScheduledThreadPool();
Sing Thread Scheduled Pool:只有一個(gè)線程,用來(lái)調(diào)度任務(wù)在指定時(shí)間執(zhí)行Executors.newSingleThreadScheduledExecutor();
3.線程池的使用
以下用Fixed Thread Pool作為示范,提供一個(gè)使用參考
LogNumVo
package com.ithzk.threadpool; /** * 用作返回 執(zhí)行的數(shù)量的 * @author hzk * @date 2018/3/29 */ public class LogNumVo { private static final long serialVersionUID = -5541722936350755569L; private Integer dataNum; private Integer successNum; private Integer waitNum; public Integer getDataNum() { return dataNum; } public void setDataNum(Integer dataNum) { this.dataNum = dataNum; } public Integer getSuccessNum() { return successNum; } public void setSuccessNum(Integer successNum) { this.successNum = successNum; } public Integer getWaitNum() { return waitNum; } public void setWaitNum(Integer waitNum) { this.waitNum = waitNum; } }
DealObject
package com.ithzk.threadpool; /** * @author hzk * @date 2018/3/29 */ public class DealObject { private Integer identifyId; private String data; public DealObject(Integer identifyId, String data) { this.identifyId = identifyId; this.data = data; } public DealObject() { } public Integer getIdentifyId() { return identifyId; } public void setIdentifyId(Integer identifyId) { this.identifyId = identifyId; } public String getData() { return data; } public void setData(String data) { this.data = data; } }
AbstractCalculateThread
package com.ithzk.threadpool; import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; /** * @author hzk * @date 2018/3/29 */ public class AbstractCalculateThread<T> implements Callable<String> { protected Collection<T> insertList; protected CountDownLatch countd; protected String threadCode; protected String batchNumber; public Collection<T> getInsertList() { return insertList; } public void setInsertList(Collection<T> insertList) { this.insertList = insertList; } public CountDownLatch getCountd() { return countd; } public void setCountd(CountDownLatch countd) { this.countd = countd; } public String getThreadCode() { return threadCode; } public void setThreadCode(String threadCode) { this.threadCode = threadCode; } public String getBatchNumber() { return batchNumber; } public void setBatchNumber(String batchNumber) { this.batchNumber = batchNumber; } public AbstractCalculateThread() { super(); } public AbstractCalculateThread(Collection<T> insertList, CountDownLatch countd, String threadCode,String batchNumber) { super(); this.insertList = insertList; this.countd = countd; this.threadCode = threadCode; this.batchNumber = batchNumber; } public String call() throws Exception { return null; } }
CalculateDealThread
package com.ithzk.threadpool; import java.util.Collection; import java.util.concurrent.CountDownLatch; /** * @author hzk * @date 2018/3/29 */ public class CalculateDealThread extends AbstractCalculateThread<DealObject> { private ExecutorPool executorPool = SpringContextUtil.getBean(ExecutorPool.class); @Override public String call() throws Exception { try { System.out.println("========開(kāi)始跑線程【"+threadCode+"】"); return executorPool.syncBatchDealObject(insertList,batchNumber); } catch (Exception e) { e.printStackTrace(); System.out.println("========開(kāi)始跑線程【"+threadCode+"】:"+e.getMessage()); }finally { countd.countDown(); } return null; } public CalculateDealThread() { super(); } public CalculateDealThread(Collection<DealObject> insertList, CountDownLatch countd, String threadCode,String batchNumber) { super(insertList, countd, threadCode, batchNumber); } }
ExecutorPool
package com.ithzk.threadpool; import java.util.*; import java.util.concurrent.*; /** * @author hzk * @date 2018/3/29 */ public class ExecutorPool { /** * 模擬需要處理數(shù)據(jù)的大小 */ private static final int ARRAY_COUNT = 50000; /** * 開(kāi)啟多線程處理的條件 */ private static final int MULTI_THREAD_STARTCOUNT = 10000; /** * 批量處理的大小 */ private static final int BATCH_DEAL_SIZE = 100; /** * 每次開(kāi)啟線程數(shù)量 */ public static final int THREAD_POOL_NUM=10; public static void main(String[] args){ testExecutorPool(); } public static void testExecutorPool(){ ArrayList<DealObject> dealObjects = new ArrayList<DealObject>(); for (int i = 0;i<ARRAY_COUNT;i++){ DealObject dealObject = new DealObject(i,"data_"+i); dealObjects.add(dealObject); System.out.println("Data add success current:"+i); } int size = dealObjects.size(); int successNum = 0; int waitNum = 0; System.out.println("需要處理的數(shù)據(jù)數(shù)據(jù)量為:"+size); // 判斷數(shù)據(jù)是否大于10000 如果大于則開(kāi)啟線程池 跑數(shù)據(jù) if (size > MULTI_THREAD_STARTCOUNT) { try { System.out.println("===================dataNum > 1000 | Multiple Thread Run======================="); // 每次新增處理的條數(shù) int batchInsertSize = BATCH_DEAL_SIZE; // 定義保存的線程池 ExecutorService executorInsert = Executors.newFixedThreadPool(THREAD_POOL_NUM); // 定義保存過(guò)程中返回的線程執(zhí)行返回參數(shù) List<Future<String>> futureListIsert = new ArrayList<Future<String>>(); // 線程 修改list List<Map<Integer, DealObject>> listDealObjects = new ArrayList<Map<Integer, DealObject>>(); List<Map<Integer, DealObject>> listLiveSyncLogInsert = pointDateClassify(dealObjects, batchInsertSize, listDealObjects); if (null != listLiveSyncLogInsert && !listDealObjects.isEmpty()) { System.out.println("===================切割后的大小:"+listLiveSyncLogInsert.size()+"======================="); //配合使用CountDownLatch為了保證在執(zhí)行完所有子程序之后再執(zhí)行主程序 CountDownLatch countd = new CountDownLatch(listLiveSyncLogInsert.size()); for (int j = 0; j < listLiveSyncLogInsert.size(); j++) { Map<Integer, DealObject> insert = listLiveSyncLogInsert.get(j); Future<String> future = executorInsert.submit(new CalculateDealThread(insert.values(), countd,"executor_pool_test_thread", null)); futureListIsert.add(future); } } // 等待線程執(zhí)行完成 executorInsert.shutdown(); for (Future<String> future : futureListIsert) { String json = future.get(); if (null != json && !"".equals(json)) { 將返回的json格式數(shù)據(jù)轉(zhuǎn)換為實(shí)體類 進(jìn)行業(yè)務(wù)記錄 LogNumVo logNumVo = JSON.toJavaObject(JSON.parseObject(json),LogNumVo.class); successNum += logNumVo.getSuccessNum(); waitNum += logNumVo.getWaitNum(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } /** * 拆分線程數(shù) * 假設(shè)集合中有50000個(gè)元素 則按照100個(gè)一組切分 可切分為500組 * 即每個(gè)線程一次處理一組(100個(gè)元素) * * @author * @param lPostUploadIntegralList * @param batchInsertSize * @param listPostUploadIsert */ @SuppressWarnings("all") public static List<Map<Integer, DealObject>> pointDateClassify(List<DealObject> lPostUploadIntegralList,int batchInsertSize, List<Map<Integer, DealObject>> listJSONObjectUpdate) { List<Map<Integer, DealObject>> listLiveSyncLogInsert = new Vector<Map<Integer, DealObject>>(); // 新增數(shù)據(jù)list List<DealObject> integralListInsert = lPostUploadIntegralList; System.out.println("============integralListInsert.size()=====:" + integralListInsert.size()); // 拆分?jǐn)?shù)據(jù)(拆成多個(gè)List) int inserti = 0; if (integralListInsert != null && integralListInsert.size() > 0) { ConcurrentHashMap<Integer, DealObject> integralListIns = null; for (int l = 0; l < integralListInsert.size(); l++) { if (integralListIns == null) { integralListIns = new ConcurrentHashMap<Integer, DealObject>(); } integralListIns.put(integralListInsert.get(l).getIdentifyId(), integralListInsert.get(l)); inserti++; if ((inserti % batchInsertSize) == 0) { listLiveSyncLogInsert.add(integralListIns); integralListIns = null; } else { // 最后100條或不足100條數(shù)據(jù) if ((l + 1) == integralListInsert.size()) { listLiveSyncLogInsert.add(integralListIns); } } } } System.out.println("=============listPostUploadInsert.size()====:" + listLiveSyncLogInsert.size()); return listLiveSyncLogInsert; } /** * 多線程保存數(shù)據(jù)至數(shù)據(jù)庫(kù) */ public String syncBatchDealObject(Collection<DealObject> insertList,String batchNumber) { int successNum = 0, waitNum = 0; Date currentDate = new Date(System.currentTimeMillis()); for (DealObject dealObject : insertList) { try { int icount = syncDealObject(dealObject,currentDate); if(icount > 0){ successNum ++; }else { waitNum ++; } } catch (Exception e) { e.printStackTrace(); ++waitNum; } } LogNumVo logNum = new LogNumVo(); logNum.setDataNum(0); logNum.setSuccessNum(successNum); logNum.setWaitNum(waitNum); // 將記錄實(shí)體類轉(zhuǎn)為json格式反饋給線程池 return JSON.toJSONString(logNum); } /** * 處理數(shù)據(jù)業(yè)務(wù) * @param dealObject * @param currentDate * @return */ private int syncDealObject(DealObject dealObject,Date currentDate){ int successNum = 0; //業(yè)務(wù)處理邏輯 if(null != dealObject.getData()){ successNum++; } return successNum; } }
4.BlockingQueue
BlockingQueue也是java.util.concurrent下的主要用來(lái)控制線程同步的工具。主要的方法是:put、take一對(duì)阻塞存??;add、poll一對(duì)非阻塞存取。
插入:
add(anObject)
把a(bǔ)nObject加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常
offer(anObject)
把a(bǔ)nObject加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則返回false.
put(anObject)
把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒(méi)有空間,則調(diào)用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續(xù).
讀?。?/strong>
poll(time)
取走BlockingQueue里排在首位的對(duì)象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null
take()
取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到Blocking有新的對(duì)象被加入為止
其他:
int remainingCapacity()
返回理想情況下(沒(méi)有內(nèi)存和資源約束)此隊(duì)列可接受并且不會(huì)被阻塞的附加元素?cái)?shù)量。
該數(shù)量總是等于此隊(duì)列的初始容量,小于隊(duì)列的當(dāng)前 size(返回隊(duì)列剩余的容量)。
注意,不能總是通過(guò)檢查 remainingcapacity 來(lái)斷定試圖插入一個(gè)元素是否成功,因?yàn)榭赡苁橇硪粋€(gè)線程將插入或移除某個(gè)元
素。
boolean remove(Object o)
從隊(duì)列移除元素,如果存在,即移除一個(gè)或者更多,隊(duì)列改變了返回true
public boolean contains(Object o)
查看隊(duì)列是否存在這個(gè)元素,存在返回true
int drainTo(Collection<? super E> c)
傳入的集合中的元素,如果在隊(duì)列中存在,那么將隊(duì)列中的元素移動(dòng)到集合中
int drainTo(Collection<? super E> c, int maxElements)
和上面方法的區(qū)別在于,制定了移動(dòng)的數(shù)量
以下是一個(gè)BlockQueue的基本使用參考:
Producer
package com.ithzk.BlockingQueueTest; import java.util.concurrent.BlockingQueue; /** * @author hzk * @date 2018/3/31 */ public class Producer implements Runnable{ BlockingQueue<String> blockingQueue; public Producer(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { try { String threadIdentify = "A Producer,生產(chǎn)線程"+Thread.currentThread().getName(); blockingQueue.put(threadIdentify); System.out.println("Produce success! Thread:"+Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer
package com.ithzk.BlockingQueueTest; import java.util.concurrent.BlockingQueue; /** * @author hzk * @date 2018/3/31 */ public class Consumer implements Runnable{ BlockingQueue<String> blockingQueue; public Consumer(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { try { String consumer = Thread.currentThread().getName(); System.out.println("Current Consumer Thread:"+consumer); //如果隊(duì)列為空會(huì)阻塞當(dāng)前線程 String take = blockingQueue.take(); System.out.println(consumer + " consumer get a product:"+take); } catch (InterruptedException e) { e.printStackTrace(); } } }
BlockTest
package com.ithzk.BlockingQueueTest; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * @author hzk * @date 2018/3/31 */ public class BlockTest { public static void main(String[] args) throws InterruptedException { // 不設(shè)置的話,LinkedBlockingQueue默認(rèn)大小為Integer.MAX_VALUE // BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(); // BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(2); BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(2); Consumer consumer = new Consumer(blockingQueue); Producer producer = new Producer(blockingQueue); for (int i = 0; i < 3; i++) { new Thread(producer, "Producer" + (i + 1)).start(); } for (int i = 0; i < 5; i++) { new Thread(consumer, "Consumer" + (i + 1)).start(); } Thread.sleep(5000); new Thread(producer, "Producer" + (5)).start(); } }
BlockingQueue有四個(gè)具體的實(shí)現(xiàn)類,常用的兩種實(shí)現(xiàn)類為:
ArrayBlockingQueue:一個(gè)由數(shù)組支持的有界阻塞隊(duì)列,規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個(gè)int參數(shù)來(lái)指明其大小.其所含的對(duì)象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue:大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制。
若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來(lái)決定.其所含的對(duì)象是以FIFO(先入先出)順序排序的。
LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認(rèn)最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊(duì)列滿的時(shí)候會(huì)阻塞直到有隊(duì)列成員被消費(fèi),take方法在隊(duì)列空的時(shí)候會(huì)阻塞,直到有隊(duì)列成員被放進(jìn)來(lái)。
LinkedBlockingQueue和ArrayBlockingQueue區(qū)別
LinkedBlockingQueue和ArrayBlockingQueue比較起來(lái),它們背后所用的數(shù)據(jù)結(jié)構(gòu)不一樣,導(dǎo)致LinkedBlockingQueue的數(shù)據(jù)吞吐量要大于ArrayBlockingQueue,但在線程數(shù)量很大時(shí)其性能的可預(yù)見(jiàn)性低于ArrayBlockingQueue.
到此,關(guān)于“java并發(fā)包的介紹以及線程池的創(chuàng)建和使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。