溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么快速了解Java線程池

發(fā)布時(shí)間:2021-12-08 09:24:42 來(lái)源:億速云 閱讀:154 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“怎么快速了解Java線程池”,在日常操作中,相信很多人在怎么快速了解Java線程池問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”怎么快速了解Java線程池”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

老王 是個(gè)深耕在帝都的一線碼農(nóng),辛苦一年掙了點(diǎn)錢,想把錢存儲(chǔ)到銀行卡里,錢銀行卡辦理遇到了如下的遭遇

  1. 老王      銀行門口取號(hào)后發(fā)現(xiàn)有柜臺(tái)營(yíng)業(yè)但是沒人辦理業(yè)務(wù)     直接辦理了。
  2. 老王      取號(hào)后發(fā)現(xiàn)柜臺(tái)都有人在辦理,等待席有空地,去     坐著等辦理去了。
  3. 老王      取號(hào)后發(fā)現(xiàn)柜臺(tái)都有人辦理,等待席也人坐滿了,這個(gè)時(shí)候銀行經(jīng)理看到小麥?zhǔn)抢蠈?shí)人本著關(guān)愛老實(shí)人的態(tài)度,新開一個(gè)     臨時(shí)窗口給他辦理了。
  4. 老王      取號(hào)后發(fā)現(xiàn)柜臺(tái)都滿了,等待座位席也滿了,     臨時(shí)窗口也人滿了。這個(gè)時(shí)候銀行經(jīng)理給出了若干     解決策略
  1. 直接告知人太多不給你辦理了。
  2. 看到       老王        就來(lái)氣,也不給不辦理也不讓他走。
  3. 經(jīng)理讓       老王        取嘗試跟座位席中最前面的人聊一聊看是否可以加塞,可以就辦理,不可以還是被踢走。
  4. 經(jīng)理直接跟       老王        說(shuō)誰(shuí)讓你來(lái)的你找誰(shuí)去我這辦理不了。
怎么快速了解Java線程池  

 

上面的這個(gè)流程幾乎就跟JDK線程池的大致流程類似,

  1. 營(yíng)業(yè)中的3個(gè)窗口對(duì)應(yīng)核心線程池?cái)?shù):corePoolSize
  2. 銀行總的營(yíng)業(yè)窗口數(shù)對(duì)應(yīng):maximumPoolSize
  3. 打開的臨時(shí)窗口在多少時(shí)間內(nèi)無(wú)人辦理則關(guān)閉對(duì)應(yīng):unit
  4. 銀行里的等待座椅就是等待隊(duì)列:workQueue
  5. 無(wú)法辦理的時(shí)候銀行給出的解決方法對(duì)應(yīng):RejectedExecutionHandler
  6. threadFactory 該參數(shù)在JDK中是 線程工廠,用來(lái)創(chuàng)建線程對(duì)象,一般不會(huì)動(dòng)。

5分鐘線程池的核心工作流程講解完畢,更細(xì)節(jié)的知識(shí)看下面。

 

什么是線程池

簡(jiǎn)單理解就是 預(yù)先創(chuàng)建好一定數(shù)量的線程對(duì)象,存入緩沖池中,需要用的時(shí)候直接從緩沖池中取出,用完之后不要銷毀,還回到緩沖池中。

 

線程池存在必要性

  1. 提高線程的利用率,降低資源的消耗。
  2. 提高響應(yīng)速度,線程的創(chuàng)建時(shí)間為T1,執(zhí)行時(shí)間T2,銷毀時(shí)間T3,用線程池可以免去T1和T3的時(shí)間。
  3. 便于統(tǒng)一管理線程對(duì)象
  4. 可控制最大并發(fā)數(shù)
 

手動(dòng)實(shí)現(xiàn)

如果先不看線程池源碼讓我們自己手動(dòng)實(shí)現(xiàn)一個(gè)線程池你可以考慮到幾個(gè)重要點(diǎn)?

  1. 有若干個(gè)初始化好的線程數(shù)組來(lái)充當(dāng)線程池。
  2. 線程池要去一個(gè) 等待的任務(wù)隊(duì)列 中去拿任務(wù)。

簡(jiǎn)單來(lái)說(shuō)就是初始化N個(gè)線程充當(dāng)線程池然后一起去阻塞隊(duì)列中進(jìn)行阻塞take,新添加的任務(wù)都通過put將任務(wù)追加到任務(wù)隊(duì)列,關(guān)于任務(wù)隊(duì)列的講解看這blog

  1. 核心類
public class MyThreadPool2 {
    // 線程池中默認(rèn)線程的個(gè)數(shù)為5
    private static int WORK_NUM = 5;
    // 隊(duì)列默認(rèn)任務(wù)個(gè)數(shù)為100 來(lái)不及保存任務(wù)
    private static int TASK_COUNT = 100;
    // 工作線程組
    private WorkThread[] workThreads;
    // 任務(wù)隊(duì)列,作為一個(gè)緩沖
    private final BlockingQueue<Runnable> taskQueue;
    //用戶在構(gòu)造這個(gè)池,希望的啟動(dòng)的線程數(shù)
    private final int worker_num;
    // 創(chuàng)建具有默認(rèn)線程個(gè)數(shù)的線程池
    public MyThreadPool2() {
        this(WORK_NUM, TASK_COUNT);
    }

    // 創(chuàng)建線程池,worker_num為線程池中工作線程的個(gè)數(shù)
    public MyThreadPool2(int worker_num, int taskCount) {
        if (worker_num <= 0) worker_num = WORK_NUM;
        if (taskCount <= 0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for (int i = 0; i < worker_num; i++) {
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
        Runtime.getRuntime().availableProcessors();
    }

    // 執(zhí)行任務(wù),其實(shí)只是把任務(wù)加入任務(wù)隊(duì)列,什么時(shí)候執(zhí)行有線程池管理器決定
    public void execute(Runnable task) {
        try {
            taskQueue.put(task);// 阻塞 放置任務(wù)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 銷毀線程池,該方法保證在所有任務(wù)都完成的情況下才銷毀所有線程,否則等待任務(wù)完成才銷毀
    public void destroy() {
        // 工作線程停止工作,且置為null
        System.out.println("準(zhǔn)備關(guān)閉線程池");
        for (int i = 0; i < worker_num; i++) {
            workThreads[i].stopWorker();
            workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任務(wù)隊(duì)列
    }

    // 覆蓋toString方法,返回線程池信息:工作線程個(gè)數(shù)和已完成任務(wù)個(gè)數(shù)
    @Override
    public String toString() {
        return "線程池大小 :" + worker_num  + " 等待執(zhí)行任務(wù)個(gè)數(shù):" + taskQueue.size();
    }
     //內(nèi)部類,工作線程
    private class WorkThread extends Thread {
        @Override
        public void run() {
            Runnable r = null;
            try {
                while (!isInterrupted()) {
                    r = taskQueue.take();//阻塞獲得任務(wù)
                    if (r != null) {
                        System.out.println(getId() + " 準(zhǔn)備執(zhí)行 :" + r);
                        r.run();
                    }
                    r = null; //help gc;
                }
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
        public void stopWorker() {
            interrupt();
        }
    }
}
 
  1. 測(cè)試類
public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 創(chuàng)建3個(gè)線程的線程池
        MyThreadPool2 t = new MyThreadPool2(3, 5);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 所有線程都執(zhí)行完成 才destory
        System.out.println(t);
    }
    // 任務(wù)類
    static class MyTask implements Runnable {
        private String name;
        private Random r = new Random();
        public MyTask(String name) {
            this.name = name;
        }
        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執(zhí)行任務(wù)
            try {
                Thread.sleep(r.nextInt(1000) + 2000); //隨機(jī)休眠
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId() + " 被打斷:"
                        + Thread.currentThread().isInterrupted());
            }
            System.out.println("任務(wù) " + name + " 完成");
        }
    }
}
   

ThreadPoolExecutor

ThreadPoolExecutor是JDK中所有線程池實(shí)現(xiàn)類的父類,構(gòu)造函數(shù)有多個(gè)入?yún)⑼ㄟ^靈活的組合來(lái)實(shí)現(xiàn)線程池的初始化,核心構(gòu)造函數(shù)如下。

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) 
   

重要參數(shù)解析

  1. corePoolSize

此值是用來(lái)初始化線程池中核心線程數(shù),當(dāng)線程池中線程池?cái)?shù)< corePoolSize時(shí),系統(tǒng)默認(rèn)是添加一個(gè)任務(wù)才創(chuàng)建一個(gè)線程池??梢酝ㄟ^調(diào)用prestartAllCoreThreads方法一次性的啟動(dòng)corePoolSize個(gè)數(shù)的線程。當(dāng)線程數(shù) = corePoolSize時(shí),新任務(wù)會(huì)追加到workQueue中。

  1. maximumPoolSize

maximumPoolSize表示允許的最大線程數(shù) = (非核心線程數(shù)+核心線程數(shù)),當(dāng)BlockingQueue也滿了,但線程池中總線程數(shù) < maximumPoolSize時(shí)候就會(huì)再次創(chuàng)建新的線程。

  1. keepAliveTime

非核心線程 =(maximumPoolSize - corePoolSize ) ,非核心線程閑置下來(lái)不干活最多存活時(shí)間。

  1. unit

線程池中非核心線程保持存活的時(shí)間

TimeUnit.DAYS; 天 TimeUnit.HOURS; 小時(shí) TimeUnit.MINUTES; 分鐘 TimeUnit.SECONDS; 秒 TimeUnit.MILLISECONDS; 毫秒 TimeUnit.MICROSECONDS; 微秒 TimeUnit.NANOSECONDS; 納秒

  1. workQueue

線程池 等待隊(duì)列,維護(hù)著等待執(zhí)行的Runnable對(duì)象。當(dāng)運(yùn)行當(dāng)線程數(shù)= corePoolSize時(shí),新的任務(wù)會(huì)被添加到workQueue中,如果workQueue也滿了則嘗試用非核心線程執(zhí)行任務(wù),另外等待隊(duì)列盡量用有界的哦??!

  1. threadFactory

創(chuàng)建一個(gè)新線程時(shí)使用的工廠,可以用來(lái)設(shè)定線程名、是否為daemon線程等等。

  1. handler

corePoolSizeworkQueue、maximumPoolSize都不可用的時(shí)候執(zhí)行的 飽和策略。

AbortPolicy :直接拋出異常,默認(rèn)用此 CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù) DiscardOldestPolicy:丟棄阻塞隊(duì)列里最老的任務(wù),隊(duì)列里最靠前的任務(wù) DiscardPolicy :當(dāng)前任務(wù)直接丟棄 想實(shí)現(xiàn)自己的飽和策略,實(shí)現(xiàn)RejectedExecutionHandler接口即可

形象流程圖如下:怎么快速了解Java線程池

 

提交

  1. execute 不需要返回
// 核心思想跟上面的流程圖類似
    public void execute(Runnable command) {
        if (command == null) //規(guī)范性檢查
            throw new NullPointerException();
        int c = ctl.get();//當(dāng)前工作的線程數(shù)跟線程狀態(tài) ctl = AtomicInteger CAS級(jí)別 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
            // 如果當(dāng)前線程池中工作線程數(shù)小于核心線程數(shù),直接添加任務(wù) 然后return
                return;
            c = ctl.get();// 添加失敗了重新獲得線程池中工作線程數(shù)
        }
        if (isRunning(c) && workQueue.offer(command)) { 
        // 線程池狀態(tài)是否處于可用,可用就嘗試將線程添加到queue
            int recheck = ctl.get();// 獲得線程池狀態(tài)
            if (! isRunning(recheck) && remove(command))
                reject(command);// 如果線程狀態(tài)不在運(yùn)行中 則remove 該任務(wù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))// 嘗試將任務(wù)用非核心線程執(zhí)行,
            reject(command);//失敗了則執(zhí)行失敗策略。
    }
 
  1. submit 需要返回值     ThreadPoolExecutor extends AbstractExecutorService父類中存在一個(gè)submit方法,    
        public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
 

關(guān)閉線程池

注意線程之間是協(xié)作式的哦,所以的關(guān)閉只是發(fā)出關(guān)閉指令。

  1. shutdown() 將線程池狀態(tài)置為shutdown,并不會(huì)立即停止:
  1. 停止接收外部submit的任務(wù)
  2. 內(nèi)部正在跑的任務(wù)和隊(duì)列里等待的任務(wù),會(huì)執(zhí)行完
  3. 等到第二步完成后,才真正停止
  1. shutdownNow() 將線程池狀態(tài)置為stop。企圖立即停止,事實(shí)上不一定:
  1. 跟shutdown()一樣,先停止接收外部提交的任務(wù)
  2. 忽略隊(duì)列里等待的任務(wù)
  3. 嘗試將正在跑的任務(wù)interrupt中斷
  4. 返回未執(zhí)行的任務(wù)列表

shutdown 跟shutdownnow簡(jiǎn)單來(lái)說(shuō)區(qū)別如下:

shutdownNow()能立即停止線程池,正在跑的和正在等待的任務(wù)都停下了。這樣做立即生效,但是風(fēng)險(xiǎn)也比較大。shutdown()只是關(guān)閉了提交通道,用submit()是無(wú)效的;而內(nèi)部該怎么跑還是怎么跑,跑完再停。

  1. awaitTermination
pool.showdown()
boolean b = pool.awaitTermination(3, TimeUnit.SECONDS)
 

awaitTermination有兩個(gè)參數(shù),一個(gè)是timeout即超時(shí)時(shí)間,另一個(gè)是unit即時(shí)間單位。這個(gè)方法會(huì)使線程等待timeout時(shí)長(zhǎng),當(dāng)超過timeout時(shí)間后,會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會(huì)和shutdown方法組合使用,調(diào)用后當(dāng)前線程會(huì)阻塞,直到

  1. 等所有已提交的任務(wù)(包括正在跑的和隊(duì)列中等待的)執(zhí)行完
  2. 或者等超時(shí)時(shí)間到
  3. 或者線程被中斷,拋出InterruptedException

總結(jié)

優(yōu)雅的關(guān)閉,用shutdown() 想立馬關(guān)閉,并得到未執(zhí)行任務(wù)列表,用shutdownNow() 優(yōu)雅的關(guān)閉,發(fā)出關(guān)閉指令后看下是否真的關(guān)閉了用awaitTermination()。

 

合理配置線程池

線程在Java中屬于稀缺資源,線程池不是越大越好也不是越小越好。任務(wù)分為計(jì)算密集型、IO密集型、混合型。

  1. 計(jì)算密集型:大部分都在用CPU跟內(nèi)存,加密,邏輯操作業(yè)務(wù)處理等。
  2. IO密集型:數(shù)據(jù)庫(kù)鏈接,網(wǎng)絡(luò)通訊傳輸?shù)取?/section>
  1. 計(jì)算密集型一般推薦線程池不要過大,一般是CPU數(shù) + 1,+1是因?yàn)榭赡艽嬖?     頁(yè)缺失(就是可能存在有些數(shù)據(jù)在硬盤中需要多來(lái)一個(gè)線程將數(shù)據(jù)讀入內(nèi)存)。如果線程池?cái)?shù)太大,可能會(huì)頻繁的 進(jìn)行線程上下文切換跟任務(wù)調(diào)度。獲得當(dāng)前CPU核心數(shù)代碼如下:
Runtime.getRuntime().availableProcessors();
 
  1. IO密集型:線程數(shù)適當(dāng)大一點(diǎn),機(jī)器的Cpu核心數(shù)*2。
  2. 混合型:如果密集型站大頭則拆分的必要性不大,如果IO型占據(jù)不少有必要,Mark 下。
 

常見線程池

每個(gè)線程池都是一個(gè)實(shí)現(xiàn)了接口ExecutorService并且繼承自ThreadPoolExecutor的具體實(shí)現(xiàn)類,這些類的創(chuàng)建統(tǒng)一由一個(gè)工廠類Executors來(lái)提供對(duì)外創(chuàng)建接口。Executors框架圖如下:

怎么快速了解Java線程池ThreadPoolExecutor中一個(gè)線程就是一個(gè)Worker對(duì)象,它與一個(gè)線程綁定,當(dāng)Worker執(zhí)行完畢就是線程執(zhí)行完畢。而Worker帶了鎖AQS,根據(jù)我后面準(zhǔn)備寫的讀寫鎖的例子,發(fā)現(xiàn)線程池是線程安全的。看看圖二的類圖。下面簡(jiǎn)單介紹幾個(gè)常用的線程池模式。

 
FixedThreadPool
  1. 定長(zhǎng)的線程池,有核心線程,核心線程的即為最大的線程數(shù)量,沒有非核心線程。
  2. 使用的     無(wú)界的等待隊(duì)列是     LinkedBlockingQueue。使用時(shí)候小心堵滿等待隊(duì)列。     怎么快速了解Java線程池
 
SingleThreadPool

只有一條線程來(lái)執(zhí)行任務(wù),適用于有順序的任務(wù)的應(yīng)用場(chǎng)景,也是用的無(wú)界等待隊(duì)列怎么快速了解Java線程池

 
CachedThreadPool

可緩存的線程池,該線程池中沒有核心線程,非核心線程的數(shù)量為Integer.max_value,就是無(wú)限大,當(dāng)有需要時(shí)創(chuàng)建線程來(lái)執(zhí)行任務(wù),沒有需要時(shí)回收線程,適用于耗時(shí)少,任務(wù)量大的情況。任務(wù)隊(duì)列用的是SynchronousQueue如果生產(chǎn)多快消費(fèi)慢,則會(huì)導(dǎo)致創(chuàng)建很多線程需注意。怎么快速了解Java線程池

 
WorkStealingPool

JDK7以后 基于ForkJoinPool實(shí)現(xiàn)。怎么快速了解Java線程池PS:其中FixedThreadPool、SingleThreadPool、CachedThreadPool都用的無(wú)界等待隊(duì)列,因此實(shí)際工作中都不建議這樣做的哦,阿里巴巴Java編程規(guī)范建議如下:怎么快速了解Java線程池最后來(lái)個(gè)簡(jiǎn)單的線程使用demo:

public class UseThreadPool
{
 // 工作線程
 static class Worker implements Runnable
 {
  private String taskName;
  private Random r = new Random();

  public Worker(String taskName)
  {
   this.taskName = taskName;
  }

  public String getName()
  {
   return taskName;
  }

  @Override
  public void run()
  {
   System.out.println(Thread.currentThread().getName() + " 當(dāng)前任務(wù): " + taskName);
   try
   {
    TimeUnit.MILLISECONDS.sleep(r.nextInt(100) * 5);
   } catch (Exception e)
   {
    e.printStackTrace();
   }
  }
 }

 static class CallWorker implements Callable<String>
 {
  private String taskName;
  private Random r = new Random();

  public CallWorker(String taskName)
  {
   this.taskName = taskName;
  }

  public String getName()
  {
   return taskName;
  }

  @Override
  public String call() throws Exception
  {
   System.out.println(Thread.currentThread().getName() + " 當(dāng)前任務(wù) : " + taskName);
   return Thread.currentThread().getName() + ":" + r.nextInt(100) * 5;
  }

 }

 public static void main(String[] args) throws InterruptedException, ExecutionException
 {
  ExecutorService pool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(10),
    new ThreadPoolExecutor.DiscardOldestPolicy());
//     ExecutorService pool = Executors.newCachedThreadPool(); 
  for (int i = 0; i < 5; i++)
  {
   Worker worker = new Worker("Runnable_" + i);
   pool.execute(worker);
  }
  for (int i = 0; i < 5; i++)
  {
   CallWorker callWorker = new CallWorker("CallWorker_" + i);
   Future<String> result = pool.submit(callWorker);
   System.out.println(result.get());
  }
  pool.shutdown();
 }
}
   
ScheduledThreadPoolExecutor

周期性執(zhí)行任務(wù)的線程池,按照某種特定的計(jì)劃執(zhí)行線程中的任務(wù),有核心線程,但也有非核心線程,非核心線程的大小也為無(wú)限大。適用于執(zhí)行周期性的任務(wù)。怎么快速了解Java線程池看構(gòu)造函數(shù):調(diào)用的還是ThreadPoolExecutor構(gòu)造函數(shù),區(qū)別不同點(diǎn)在于任務(wù)隊(duì)列是用的DelayedWorkQueue,沒什么新奇的了。

怎么快速了解Java線程池核心函數(shù)講解:

  1. schedule 只執(zhí)行一次,任務(wù)還可以延時(shí)執(zhí)行,傳入待執(zhí)行任務(wù)跟延時(shí)時(shí)間。     怎么快速了解Java線程池
  2. scheduleAtFixedRate 提交固定時(shí)間間隔的任務(wù),提交任務(wù),延時(shí)時(shí)間,已經(jīng)循環(huán)時(shí)間間隔時(shí)間。這個(gè)的含義是只是在固定的時(shí)間間隔嘗試運(yùn)行該任務(wù)。     怎么快速了解Java線程池
  3. scheduleWithFixedDelay 提交固定延時(shí)間隔執(zhí)行的任務(wù)。上一個(gè)任務(wù)執(zhí)行完畢后等多久再執(zhí)行下個(gè)任務(wù),這個(gè)中間時(shí)間叫     FixedDelay     怎么快速了解Java線程池其中     scheduleAtFixedRate 跟     scheduleWithFixedDelay區(qū)別如下圖     怎么快速了解Java線程池scheduleAtFixedRate任務(wù)超時(shí)狀態(tài),比如我們?cè)O(shè)定60s執(zhí)行一次,其中第一個(gè)任務(wù)時(shí)長(zhǎng) 80s,第二個(gè)任務(wù)20s,第三個(gè)任務(wù) 50s。
  1. 第一個(gè)任務(wù)第0秒開始,第80s結(jié)束.
  2. 第二個(gè)任務(wù)第80s開始,在第100s結(jié)束.
  3. 第三個(gè)任務(wù)第120s秒開始,170s結(jié)束.
  4. 第四個(gè)任務(wù)從180s開始.

簡(jiǎn)單Mark個(gè)循環(huán)任務(wù)demo:

class ScheduleWorker implements Runnable {
    public final static int Normal = 0;//普通任務(wù)類型
    public final static int HasException = -1;//會(huì)拋出異常的任務(wù)類型
    public final static int ProcessException = 1;//拋出異常但會(huì)捕捉的任務(wù)類型
    public static SimpleDateFormat formater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private int taskType;
    public ScheduleWorker(int taskType) {
        this.taskType = taskType;
    }
    @Override
    public void run() {
        if (taskType == HasException) {
            System.out.println(formater.format(new Date()) + " 異常產(chǎn)生");
            throw new RuntimeException("有異常");
        } else if (taskType == ProcessException) {
            try {
                System.out.println(formater.format(new Date()) + " 異常產(chǎn)生被捕捉");
                throw new RuntimeException("異常被捕捉");//異常導(dǎo)致下個(gè)任務(wù)無(wú)法執(zhí)行
            } catch (Exception e) {
                System.out.println(" 異常被主播");
            }
        } else {
            System.out.println("正常" + formater.format(new Date()));
        }
    }
}
public class SchTest{
 public static void main(String[] args) {
  ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);

  schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.HasException),
    1000, 3000, TimeUnit.MILLISECONDS); // 任務(wù)在 1秒后執(zhí)行 周期3秒

  schedule.scheduleAtFixedRate(new ScheduleWorker(ScheduleWorker.Normal),
    1000, 3000, TimeUnit.MILLISECONDS);
 }
}
   
CompletionService

JDK8中新添加的一個(gè)類,攝像一個(gè)場(chǎng)景你去詢問兩個(gè)商品價(jià)格然后將價(jià)格保存數(shù)據(jù)庫(kù)。

ExecutorService executor =Executors.newFixedThreadPool(2);
// 異步向電商 S1 詢價(jià)
Future<Integer> f1 = executor.submit(()->getPriceByS1());
// 異步向電商 S2 詢價(jià)
Future<Integer> f2 = executor.submit(()->getPriceByS2());

// 獲取電商 S1 報(bào)價(jià)并保存
r=f1.get();
executor.execute(()->save(r));
// 獲取電商 S2 報(bào)價(jià)并保存
r=f2.get();
executor.execute(()->save(r));
 

上面的這個(gè)方案本身沒有太大問題,但是有個(gè)地方的處理需要你注意,那就是如果獲取電商 S1 報(bào)價(jià)的耗時(shí)很長(zhǎng),那么即便獲取電商 S2 報(bào)價(jià)的耗時(shí)很短,也無(wú)法讓保存 S2 報(bào)價(jià)的操作先執(zhí)行,因?yàn)檫@個(gè)主線程都阻塞在了 f1.get(),那我們?nèi)绾谓鉀Q了?解決方法:結(jié)果都存入到一個(gè)阻塞隊(duì)列中去。

// 創(chuàng)建阻塞隊(duì)列
BlockingQueue<Integer> bq =new LinkedBlockingQueue<>();
// 電商 S1 報(bào)價(jià)異步進(jìn)入阻塞隊(duì)列  
executor.execute(()->bq.put(f1.get()));
// 電商 S2 報(bào)價(jià)異步進(jìn)入阻塞隊(duì)列  
executor.execute(()->bq.put(f2.get()));
// 異步保存所有報(bào)價(jià)  
for (int i=0; i<2; i++) {
  Integer r = bq.take();
  executor.execute(()->save(r));
}  
 

在JDK8中不建議上面的工作都手動(dòng)實(shí)現(xiàn),JDK提供了CompletionService ,它實(shí)現(xiàn)原理也是內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列,它的核心功效就是讓先執(zhí)行的任務(wù)先放到結(jié)果集。當(dāng)任務(wù)執(zhí)行結(jié)束就把任務(wù)的執(zhí)行結(jié)果加入到阻塞隊(duì)列中,不同的是CompletionService是把任務(wù)執(zhí)行結(jié)果的 Future 對(duì)象加入到阻塞隊(duì)列中,而上面的示例代碼是把任務(wù)最終的執(zhí)行結(jié)果放入了阻塞隊(duì)列中。CompletionServiceExecutorBlockingQueue的功能融合在一起,CompletionService內(nèi)部有個(gè)阻塞隊(duì)列。CompletionService 接口的實(shí)現(xiàn)類是 ExecutorCompletionService,這個(gè)實(shí)現(xiàn)類的構(gòu)造方法有兩個(gè),分別是:

ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
 

這兩個(gè)構(gòu)造方法都需要傳入一個(gè)線程池,如果不指定 completionQueue,那么默認(rèn)會(huì)使用無(wú)界的 LinkedBlockingQueue。任務(wù)執(zhí)行結(jié)果的 Future 對(duì)象就是加入到 completionQueue 中。

// 創(chuàng)建線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 創(chuàng)建 CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 異步向電商 S1 詢價(jià)
cs.submit(()->getPriceByS1());
// 異步向電商 S2 詢價(jià)
cs.submit(()->getPriceByS2());
// 將詢價(jià)結(jié)果異步保存到數(shù)據(jù)庫(kù)
for (int i=0; i<2; i++) {
  Integer r = cs.take().get();
  executor.execute(()->save(r));
}
 

來(lái)一個(gè)整體的demo加深印象:

// 任務(wù)類
class WorkTask implements Callable<Integer>
{
 private String name;

 public WorkTask(String name)
 {
  this.name = name;
 }
 @Override
 public Integer call()
 {
  int sleepTime = new Random().nextInt(1000);
  try
  {
   Thread.sleep(sleepTime);
  } catch (InterruptedException e)
  {
   e.printStackTrace();
  }
  return sleepTime;
 }
}

public class CompletionCase
{
 private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
 private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors();
 public void selfByQueue() throws Exception
 {
  long start = System.currentTimeMillis();  //  統(tǒng)計(jì)所有任務(wù)休眠的總時(shí)長(zhǎng)
  AtomicInteger count = new AtomicInteger(0);
  ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);  // 創(chuàng)建線程池
  BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();//容器存放提交給線程池的任務(wù),list,map,

  for (int i = 0; i < TOTAL_TASK; i++)
  {
   Future<Integer> future = pool.submit(new WorkTask("要執(zhí)行的第幾個(gè)任務(wù)" + i));
   queue.add(future);//i=0 先進(jìn)隊(duì)列,i=1的任務(wù)跟著進(jìn)
  }
  for (int i = 0; i < TOTAL_TASK; i++)
  {
   int sleptTime = queue.take().get(); // 檢查線程池任務(wù)執(zhí)行結(jié)果  i=0先取到,i=1的后取到
   System.out.println(" 休眠毫秒數(shù) =  " + sleptTime + " ms ");
   count.addAndGet(sleptTime);
  }
  pool.shutdown();
  System.out.println("休眠時(shí)間" + count.get() + "ms,耗時(shí)時(shí)間" + (System.currentTimeMillis() - start) + " ms");
 }

 public void testByCompletion() throws Exception
 {
  long start = System.currentTimeMillis();
  AtomicInteger count = new AtomicInteger(0);
  // 創(chuàng)建線程池
  ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
  CompletionService<Integer> cService = new ExecutorCompletionService<>(pool);

  // 向里面扔任務(wù)
  for (int i = 0; i < TOTAL_TASK; i++)
  {
   cService.submit(new WorkTask("執(zhí)行任務(wù)" + i));
  }
  // 檢查線程池任務(wù)執(zhí)行結(jié)果
  for (int i = 0; i < TOTAL_TASK; i++)
  {
   int sleptTime = cService.take().get();
   System.out.println("休眠毫秒數(shù) = " + sleptTime + " ms ...");
   count.addAndGet(sleptTime);
  }
  pool.shutdown();
  System.out.println("休眠時(shí)間 " + count.get() + "ms,耗時(shí)時(shí)間" + (System.currentTimeMillis() - start) + " ms");
 }

 public static void main(String[] args) throws Exception
 {
  CompletionCase t = new CompletionCase();
  t.selfByQueue();
  t.testByCompletion();
 }
}
 
怎么快速了解Java線程池    

常見考題

  1. 為什么用線程池?
  2. 線程池的作用?
  3. 常用的線程池模版?
  4. 7大重要參數(shù)?
  5. 4大拒絕策略?
  6. 常見線程池任務(wù)隊(duì)列,如何理解有界跟無(wú)界?7.如何分配線程池個(gè)數(shù)?
  7. 單機(jī)線程池執(zhí)行一般斷電了如何考慮?

正在處理的實(shí)現(xiàn)事務(wù)功能,下次自動(dòng)回滾,隊(duì)列實(shí)現(xiàn)持久化儲(chǔ)存,下次啟動(dòng)自動(dòng)載入。

  1. 設(shè)定一個(gè)線程池優(yōu)先級(jí)隊(duì)列,Runable類要實(shí)現(xiàn)可對(duì)比功能,任務(wù)隊(duì)列使用優(yōu)先級(jí)隊(duì)列

到此,關(guān)于“怎么快速了解Java線程池”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI