您好,登錄后才能下訂單哦!
(手機橫屏看源碼更方便)
(1)自己動手寫一個線程池需要考慮哪些因素?
(2)自己動手寫的線程池如何測試?
線程池是Java并發(fā)編程中經(jīng)常使用到的技術(shù),那么自己如何動手寫一個線程池呢?本文彤哥將手把手帶你寫一個可用的線程池。
線程池,顧名思義它首先是一個“池”,這個池里面放的是線程,線程是用來執(zhí)行任務的。
首先,線程池中的線程應該是有類別的,有的是核心線程,有的是非核心線程,所以我們需要兩個變量標識核心線程數(shù)量coreSize和最大線程數(shù)量maxSize。
為什么要區(qū)分是否為核心線程呢?這是為了控制系統(tǒng)中線程的數(shù)量。
當線程池中線程數(shù)未達到核心線程數(shù)coreSize時,來一個任務加一個線程是可以的,也可以提高任務執(zhí)行的效率。
當線程池中線程數(shù)達到核心線程數(shù)后,得控制一下線程的數(shù)量,來任務了先進隊列,如果任務執(zhí)行足夠快,這些核心線程很快就能把隊列中的任務執(zhí)行完畢,完全沒有新增線程的必要。
當隊列中任務也滿了,這時候光靠核心線程就無法及時處理任務了,所以這時候就需要增加新的線程了,但是線程也不能無限制地增加,所以需要控制其最大線程數(shù)量maxSize。
其次,我們需要一個任務隊列來存放任務,這個隊列必須是線程安全的,我們一般使用BlockingQueue阻塞隊列來充當,當然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞隊列,不能運用在jdk的線程池中)。
最后,當任務越來越多而線程處理卻不及時,遲早會達到一種狀態(tài),隊列滿了,線程數(shù)也達到最大線程數(shù)了,這時候怎么辦呢?這時候就需要走拒絕策略了,也就是這些無法及時處理的任務怎么辦的一種策略,常用的策略有丟棄當前任務、丟棄最老的任務、調(diào)用者自己處理、拋出異常等。
根據(jù)上面的描述,我們定義一個線程池一共需要這么四個變量:核心線程數(shù)coreSize、最大線程數(shù)maxSize、阻塞隊列BlockingQueue、拒絕策略RejectPolicy。
另外,為了便于給線程池一個名稱,我們再加一個變量:線程池的名稱name。
所以我們得出了線程池的屬性及構(gòu)造方法大概如下:
public class MyThreadPoolExecutor implements Executor {
/**
* 線程池的名稱
*/
private String name;
/**
* 核心線程數(shù)
*/
private int coreSize;
/**
* 最大線程數(shù)
*/
private int maxSize;
/**
* 任務隊列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 拒絕策略
*/
private RejectPolicy rejectPolicy;
public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
this.name = name;
this.coreSize = coreSize;
this.maxSize = maxSize;
this.taskQueue = taskQueue;
this.rejectPolicy = rejectPolicy;
}
}
根據(jù)上面的屬性分析,基本上我們已經(jīng)得到了任務流向的完整邏輯:
首先,如果運行的線程數(shù)小于核心線程數(shù),直接創(chuàng)建一個新的核心線程來運行新的任務。
其次,如果運行的線程數(shù)達到了核心線程數(shù),則把新任務入隊列。
然后,如果隊列也滿了,則創(chuàng)建新的非核心線程來運行新的任務。
最后,如果非核心線程數(shù)也達到最大了,那就執(zhí)行拒絕策略。
代碼邏輯大致如下:
@Override
public void execute(Runnable task) {
// 正在運行的線程數(shù)
int count = runningCount.get();
// 如果正在運行的線程數(shù)小于核心線程數(shù),直接加一個線程
if (count < coreSize) {
// 注意,這里不一定添加成功,addWorker()方法里面還要判斷一次是不是確實小
if (addWorker(task, true)) {
return;
}
// 如果添加核心線程失敗,進入下面的邏輯
}
// 如果達到了核心線程數(shù),先嘗試讓任務入隊
// 這里之所以使用offer(),是因為如果隊列滿了offer()會立即返回false
if (taskQueue.offer(task)) {
// do nothing,為了邏輯清晰這里留個空if
// 本文由公從號“彤哥讀源碼”原創(chuàng)
} else {
// 如果入隊失敗,說明隊列滿了,那就添加一個非核心線程
if (!addWorker(task, false)) {
// 如果添加非核心線程失敗了,那就執(zhí)行拒絕策略
rejectPolicy.reject(task, this);
}
}
}
首先,創(chuàng)建線程的依據(jù)是正在運行的線程數(shù)量有沒有達到核心線程數(shù)或者最大線程數(shù),所以我們還需要一個變量runningCount用來記錄正在運行的線程數(shù)。
其次,這個變量runningCount需要在并發(fā)環(huán)境下加加減減,所以這里需要使用到Unsafe的CAS指令來控制其值的修改,用了CAS就要給這個變量加上volatile修飾,為了方便我們這里直接使用AtomicInteger來作為這個變量的類型。
然后,因為是并發(fā)環(huán)境中,所以需要判斷runningCount < coreSize(或maxSize)(條件一)的同時修改runningCount CAS加一(條件二)成功了才表示可以增加一個線程,如果條件一失敗則表示不能再增加線程了直接返回false,如果條件二失敗則表示其它線程先修改了runningCount的值,則重試。
最后,創(chuàng)建一個線程并運行新任務,且不斷從隊列中拿任務來運行,本文由公從號“彤哥讀源碼”原創(chuàng)。
代碼邏輯如下:
private boolean addWorker(Runnable newTask, boolean core) {
// 自旋判斷是不是真的可以創(chuàng)建一個線程
for (; ; ) {
// 正在運行的線程數(shù)
int count = runningCount.get();
// 核心線程還是非核心線程
int max = core ? coreSize : maxSize;
// 不滿足創(chuàng)建線程的條件,直接返回false
if (count >= max) {
return false;
}
// 修改runningCount成功,可以創(chuàng)建線程
if (runningCount.compareAndSet(count, count + 1)) {
// 線程的名字
String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
// 創(chuàng)建線程并啟動
new Thread(() -> {
System.out.println("thread name: " + Thread.currentThread().getName());
// 運行的任務
Runnable task = newTask;
// 不斷從任務隊列中取任務執(zhí)行,如果取出來的任務為null,則跳出循環(huán),線程也就結(jié)束了
while (task != null || (task = getTask()) != null) {
try {
// 執(zhí)行任務
task.run();
} finally {
// 任務執(zhí)行完成,置為空
task = null;
}
}
}, threadName).start();
break;
}
}
return true;
}
從隊列中取任務應該使用take()方法,這個方法會一直阻塞直至取到任務或者中斷,如果中斷了就返回null,這樣當前線程也就可以安靜地結(jié)束了,另外還要注意中斷了記得把runningCount減一。
private Runnable getTask() {
try {
// take()方法會一直阻塞直到取到任務為止
return taskQueue.take();
} catch (InterruptedException e) {
// 線程中斷了,返回null可以結(jié)束當前線程
// 當前線程都要結(jié)束了,理應要把runningCount的數(shù)量減一
runningCount.decrementAndGet();
return null;
}
}
好了,到這里我們自己的線程池就寫完了,下面我們一起來想想怎么測試呢?
我們再來回顧下自己的寫的線程池的構(gòu)造方法:
public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
this.name = name;
this.coreSize = coreSize;
this.maxSize = maxSize;
this.taskQueue = taskQueue;
this.rejectPolicy = rejectPolicy;
}
name,這個隨便傳;
coreSize,我們假設為5;
maxSize,我們假設為10;
taskQueue,任務隊列,既然我們設置的是有邊界的,我們就用最簡單的ArrayBlockingQueue好吧,容量設置為15,這樣里面最多可以存儲15條任務;
rejectPolicy,拒絕策略,我們假設使用丟棄當前任務的策略,OK,我們來實現(xiàn)一個。
/**
* 丟棄當前任務
*/
public class DiscardRejectPolicy implements RejectPolicy {
@Override
public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
// do nothing
System.out.println("discard one task");
}
}
OK,這樣一個線程池就創(chuàng)建完成了,下面就是執(zhí)行任務了,我們假設通過for循環(huán)連續(xù)不斷地添加100個任務好不好。
public class MyThreadPoolExecutorTest {
public static void main(String[] args) {
Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
AtomicInteger num = new AtomicInteger(0);
for (int i = 0; i < 100; i++) {
threadPool.execute(()->{
try {
Thread.sleep(1000);
System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
我們分析下這段程序:
(1)先連續(xù)創(chuàng)建了5個核心線程,并執(zhí)行了新任務;
(2)后面的15個任務進了隊列;
(3)隊列滿了,又連續(xù)創(chuàng)建了5個線程,并執(zhí)行了新任務;
(4)后面的任務就沒得執(zhí)行了,全部走了丟棄策略;
(5)所以真正執(zhí)行成功的任務應該是 5 + 15 + 5 = 25 條任務;
運行之:
thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒絕的任務
本文由公從號“彤哥讀源碼”原創(chuàng)
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22
可以看到,創(chuàng)建了5個核心線程、5個非核心線程,成功執(zhí)行了25條任務,完成沒問題,完美^^。
(1)自己動手寫一個線程池需要考慮的因素主要有:核心線程數(shù)、最大線程數(shù)、任務隊列、拒絕策略。
(2)創(chuàng)建線程的時候要時刻警惕并發(fā)的陷阱;
我們知道,jdk自帶的線程池還有兩個參數(shù):keepAliveTime、unit,它們是干什么的呢?
答:它們是用來控制何時銷毀非核心線程的,當然也可以銷毀核心線程,具體的分析請期待下一章吧。
public interface Executor {
void execute(Runnable command);
}
/**
* 自動動手寫一個線程池
*/
public class MyThreadPoolExecutor implements Executor {
/**
* 線程池的名稱
*/
private String name;
/**
* 線程序列號
*/
private AtomicInteger sequence = new AtomicInteger(0);
/**
* 核心線程數(shù)
*/
private int coreSize;
/**
* 最大線程數(shù)
*/
private int maxSize;
/**
* 任務隊列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 拒絕策略
*/
private RejectPolicy rejectPolicy;
/**
* 當前正在運行的線程數(shù),本文由公從號“彤哥讀源碼”原創(chuàng)
* 需要修改時線程間立即感知,所以使用AtomicInteger
* 或者也可以使用volatile并結(jié)合Unsafe做CAS操作(參考Unsafe篇章講解)
*/
private AtomicInteger runningCount = new AtomicInteger(0);
public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
this.name = name;
this.coreSize = coreSize;
this.maxSize = maxSize;
this.taskQueue = taskQueue;
this.rejectPolicy = rejectPolicy;
}
@Override
public void execute(Runnable task) {
// 正在運行的線程數(shù)
int count = runningCount.get();
// 如果正在運行的線程數(shù)小于核心線程數(shù),直接加一個線程
if (count < coreSize) {
// 注意,這里不一定添加成功,addWorker()方法里面還要判斷一次是不是確實小
if (addWorker(task, true)) {
return;
}
// 如果添加核心線程失敗,進入下面的邏輯
}
// 如果達到了核心線程數(shù),先嘗試讓任務入隊
// 這里之所以使用offer(),是因為如果隊列滿了offer()會立即返回false
if (taskQueue.offer(task)) {
// do nothing,為了邏輯清晰這里留個空if
} else {
// 如果入隊失敗,說明隊列滿了,那就添加一個非核心線程
if (!addWorker(task, false)) {
// 如果添加非核心線程失敗了,那就執(zhí)行拒絕策略
rejectPolicy.reject(task, this);
}
}
}
private boolean addWorker(Runnable newTask, boolean core) {
// 自旋判斷是不是真的可以創(chuàng)建一個線程
for (; ; ) {
// 正在運行的線程數(shù)
int count = runningCount.get();
// 核心線程還是非核心線程
int max = core ? coreSize : maxSize;
// 不滿足創(chuàng)建線程的條件,直接返回false
if (count >= max) {
return false;
}
// 修改runningCount成功,可以創(chuàng)建線程
if (runningCount.compareAndSet(count, count + 1)) {
// 線程的名字
String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
// 創(chuàng)建線程并啟動
new Thread(() -> {
System.out.println("thread name: " + Thread.currentThread().getName());
// 運行的任務,本文由公從號“彤哥讀源碼”原創(chuàng)
Runnable task = newTask;
// 不斷從任務隊列中取任務執(zhí)行,如果取出來的任務為null,則跳出循環(huán),線程也就結(jié)束了
while (task != null || (task = getTask()) != null) {
try {
// 執(zhí)行任務
task.run();
} finally {
// 任務執(zhí)行完成,置為空
task = null;
}
}
}, threadName).start();
break;
}
}
return true;
}
private Runnable getTask() {
try {
// take()方法會一直阻塞直到取到任務為止
return taskQueue.take();
} catch (InterruptedException e) {
// 線程中斷了,返回null可以結(jié)束當前線程
// 當前線程都要結(jié)束了,理應要把runningCount的數(shù)量減一
runningCount.decrementAndGet();
return null;
}
}
}
public interface RejectPolicy {
void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}
/**
* 丟棄當前任務
*/
public class DiscardRejectPolicy implements RejectPolicy {
@Override
public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
// do nothing
System.out.println("discard one task");
}
}
public class MyThreadPoolExecutorTest {
public static void main(String[] args) {
Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
AtomicInteger num = new AtomicInteger(0);
for (int i = 0; i < 100; i++) {
threadPool.execute(()->{
try {
Thread.sleep(1000);
System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。