溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java并發(fā)編程基本知識

發(fā)布時(shí)間:2020-07-23 01:44:05 來源:網(wǎng)絡(luò) 閱讀:218 作者:klloiy 欄目:編程語言

并發(fā)基礎(chǔ)

并發(fā)編程的原則

  1. 原子性

原子性是指在一個(gè)操作中就是cpu不可以在中途暫停然后再調(diào)度,既不被中斷操作,即一個(gè)操作或者多個(gè)操作 要么全部執(zhí)行并且執(zhí)行的過程不會被任何因素打斷,要么就都不執(zhí)行。

  1. 可見性

對于可見性,Java提供了volatile關(guān)鍵字來保證可見性。當(dāng)一個(gè)共享變量被volatile修飾時(shí),它會保證修改的值會立即被更新到主存,當(dāng)有其他線程需要讀取時(shí),它會去內(nèi)存中讀取新值。而普通的共享變量不能保證可見性,因?yàn)槠胀ü蚕碜兞勘恍薷闹?,什么時(shí)候被寫入主存是不確定的,當(dāng)其他線程去讀取時(shí),此時(shí)內(nèi)存中可能還是原來的舊值,因此無法保證可見性。另外,通過synchronized和Lock也能夠保證可見性,synchronized和Lock能保證同一時(shí)刻只有一個(gè)線程獲取鎖然后執(zhí)行同步代碼,并且在釋放鎖之前會將對變量的修改刷新到主存當(dāng)中。

  1. 有序性

在Java內(nèi)存模型中,允許編譯器和處理器對指令進(jìn)行重新排序,但是重新排序過程不會影響到單線程程序的執(zhí)行,卻會影響到多線程并發(fā)執(zhí)行的正確性。

Runnable和Thread
這里只說一下實(shí)現(xiàn)Runnable接口和繼承Thread類的區(qū)別:以賣10張票的任務(wù)為例,如果繼承Thread類的話,啟動(dòng)三個(gè)線程就相當(dāng)于開了三個(gè)窗口,每個(gè)窗口都有賣10張票的任務(wù),各賣各的;如果實(shí)現(xiàn)Runnable接口的話,啟動(dòng)三個(gè)線程相當(dāng)開了三個(gè)窗口賣票,這三個(gè)窗口一共賣10張票。

synchronized關(guān)鍵字

1.?synchronized對象鎖

synchronized(this)和synchronized方法都是鎖當(dāng)前對象,synchronized(obj)鎖臨界對象。使用synchronized的話最好是鎖臨界對象。如果想要使得任意多個(gè)線程任意多個(gè)用戶訪問的時(shí)候都不出任何問題,可以考慮一下用鎖當(dāng)前對象的方法,因?yàn)殒i當(dāng)前對象量級較重,所以一般不用。

如下面Sync類中的兩個(gè)方法test_01和test_02()鎖的都是程序創(chuàng)建的Sync對象,細(xì)粒度控制推薦用test_02()。

public synchronized void test_01() {
    System.out.println("鎖當(dāng)前對象");
}
public void test_02() {
    synchronized (this) {
        System.out.println("鎖當(dāng)前對象");
    }
}

下面這個(gè)方法鎖的是Sync對象中的object對象(即臨界對象)

public void test_03() {
    synchronized (object) {
        System.out.println("鎖臨界對象");
    }
}

2.?synchronized使用在靜態(tài)方法中鎖定當(dāng)前類

靜態(tài)同步方法鎖的是當(dāng)前類型的類對象,如在Sync類中的static test_04()方法上加了同步鎖synchronized,那么此時(shí)synchronized鎖的是Sync.class。

// 下面兩個(gè)方法都是靜態(tài)同步方法

public static synchronized void test_04() {
    System.out.println("鎖Sync.class");
}
public static void test_05() {
    synchronized (Sync.class)     {
        System.out.println("鎖Sync.class類");
    }
}

3.?synchronized作用于靜態(tài)和非靜態(tài)方法的區(qū)別

synchronized作用與非靜態(tài)方法,相當(dāng)于鎖定單個(gè)對象,不同對象之間沒有競爭關(guān)系;而作用于靜態(tài)方法時(shí),鎖加載類上,即鎖定class,這時(shí)相當(dāng)于所有對象競爭同一把鎖。

  1. 同步代碼塊中拋出異常,鎖被釋放

如下例子,線程1會在i=5的時(shí)候拋出異常,此時(shí)線程1鎖被釋放,線程2開始調(diào)用方法。

public class Test {
    static class Test02 implements Runnable {
        private int i = 0;
        @Override
        public synchronized void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "_" + i++);
                if (i == 5) { // 當(dāng)i==5時(shí)拋出異常,鎖被釋放
                    i = 1 / 0;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException ignored) { }
            }
        }
    }
public static void main(String[] args) {
    Test02 test02 = new Test02();
    new Thread(test02, "LQ").start();
    new Thread(test02, "WH").start();
}

}

  1. 實(shí)例分析

在下面代碼中,object被LQ鎖定,WH阻塞。

public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object){
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}

static class Test01 implements Runnable {
    @Override
    public void run() {
        new Test().m();
    }
}

static class Test02 implements Runnable {
    @Override
    public void run() {
        new Test().m();
    }
}

public static void main(String[] args) {
    Test01 test01 = new Test01();
    Thread thread = new Thread(test01, "LQ");
    thread.start();
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (Exception ignored) {}
    Test02 test02 = new Test02();
    thread = new Thread(test02, "WH");
    thread.start();
}

}
在WH線程中新創(chuàng)建了一個(gè)Object,WH正常運(yùn)行。

public class Test {
    static Object object = new Object();
    void m() {
        System.out.println(Thread.currentThread().getName() + " start...");
        synchronized (object) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ignored){}
                System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
            }
        }
    }
    static class Test01 implements Runnable {
        @Override
        public void run() {
            new Test().m();
        }
    }
    static class Test02 implements Runnable {
        @Override
        public void run() {
            object = new Object();
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test01 test01 = new Test01();
        Thread thread = new Thread(test01, "LQ");
        thread.start();
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception ignored) {}
        Test02 test02 = new Test02();
        thread = new Thread(test02, "WH");
        thread.start();
    }
}

上面代碼中,WH線程啟動(dòng)后會一只處于等待狀態(tài),因?yàn)閛bject被LQ線程鎖著,但如果在WH線程中重新new Object()并賦值給object,這樣的話WH線程就能夠正常運(yùn)行了,原因是:同步鎖鎖定的是對內(nèi)存中的對象,所以LQ鎖定的是第一次new的對象而WH鎖定的是第二次new的對象,如下圖。

?

對于常量:String a = “aaa” 和String b = “aaa”是同一個(gè)對象,因此,假如A方法鎖定了a,B方法鎖定了b,啟動(dòng)LQ線程調(diào)用A方法,然后啟動(dòng)WH線程調(diào)用B方法,這樣的話WH線程會等到LQ線程結(jié)束后才執(zhí)行。因此,在定義同步代碼塊時(shí),不要使用常量作為鎖的目標(biāo)對象。

volatile關(guān)鍵字
計(jì)算機(jī)中有CPU、內(nèi)存和緩存,當(dāng)CPU運(yùn)行的時(shí)候,默認(rèn)找緩存中的數(shù)據(jù)。當(dāng)CPU有中斷的時(shí)候,根據(jù)操作系統(tǒng)對CPU的管理特性,可能會清空緩存,重新將內(nèi)存中的數(shù)據(jù)讀到緩存中,也可能不清空緩存,仍舊使用緩存中的數(shù)據(jù)進(jìn)行后續(xù)的計(jì)算。如果CPU不中斷的話,默認(rèn)CPU只會找緩存數(shù)據(jù)。volatile這個(gè)關(guān)鍵字不是改變緩存數(shù)據(jù)特性的,而是直接改變內(nèi)存中的數(shù)據(jù)特性,當(dāng)對一個(gè)對象加了volatile關(guān)鍵字修飾的時(shí)候,相當(dāng)于通知了底層OS操作系統(tǒng),告訴CPU每次進(jìn)行計(jì)算的時(shí)候最好去看一下內(nèi)存數(shù)據(jù)是否發(fā)生了變更,這就是內(nèi)存的可見性。volatile關(guān)鍵字就是為了保證內(nèi)存的可見性。

如下代碼會發(fā)生死鎖現(xiàn)象。

public class Volatile01 {
    private static boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b) {}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run() {
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored) {}
        b = false;
    }
}

當(dāng)將上述代碼塊中的共享變量b用volatile修飾時(shí)(保證了可見性),就能夠跳出循環(huán)了。

public class Volatile01 {
    private static volatile boolean b = true;
    private void m() {
        System.out.println("start...");
        while (b){}
        System.out.println("end...");
    }
    static class Volatile_01 implements Runnable {
        @Override
        public void run(){
            new Volatile01().m();
        }
    }
    public static void main(String[] args) {
        Volatile_01 = new Volatile_01();
        new Thread(volatile_01).start();
        try{
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException ignored){}
        b = false;
    }
}

join()方法
將多個(gè)線程連在一起,阻塞線程,直到調(diào)用join的線程執(zhí)行完成。

如下程序打印的結(jié)果時(shí)100000,如果不用join()的話打印的結(jié)果將遠(yuǎn)遠(yuǎn)小于100000。用join()可以用來等待一組線程執(zhí)行完畢后再進(jìn)行后續(xù)邏輯處理,以保證數(shù)據(jù)的正確。

public class Test {
    private static volatile int count = 0;

    private void m() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }

    static class Test02 implements Runnable {
        @Override
        public synchronized void run() {
            new Test().m();
        }
    }

    public static void main(String[] args) {
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(test02));
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(count);
    }
}

上述代碼中用了synchronized關(guān)鍵字來實(shí)現(xiàn)原子性,也可以不用synchronized而用AtomicInteger對象,因?yàn)锳tomicInteger是一個(gè)原子性操作對象,代碼如下。

public class Test{
    private static AtomicInteger count = new AtomicInteger();
    private void m(){
        for (int i = 0; i < 10000; i++){
            count.incrementAndGet();
        }
    }
    static class Test02 implements Runnable{
        @Override
        public void run(){
            new Test().m();
        }
    }
    public static void main(String[] args){
        Test02 test02 = new Test02();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++){
            threads.add(new Thread(test02));
        }
        for (Thread thread : threads){
            thread.start();
            try{
                thread.join();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        System.out.println(count);
    }
}

CountDownLatch對象
CountDownLatch相當(dāng)于一個(gè)門閂,在創(chuàng)建門閂對象的時(shí)候可以指定鎖的個(gè)數(shù),若某個(gè)方法調(diào)用了門閂的await()方法,那么該方法執(zhí)行到await()的時(shí)候會被阻塞等待門閂釋放,當(dāng)門閂上沒有鎖也就是門閂開放的時(shí)候繼續(xù)執(zhí)行。減門閂上鎖的方法時(shí)countDown()。

如下例,當(dāng)在m1中調(diào)用了await(),在m2中調(diào)用了countDown(),因此根據(jù)m2的邏輯當(dāng)m2執(zhí)行完了之后門閂上的鎖數(shù)量就為0了,此時(shí)m1方法可以繼續(xù)執(zhí)行了。

public class Test {
    private CountDownLatch countDownLatch = new CountDownLatch(5);

    private void m1() {
        try {
            countDownLatch.await(); // 等待門閂開放
        } catch (Exception ignored) {
        }
        System.out.println("method m1.");
    }

    private void m2() {
        while (countDownLatch.getCount() != 0) {
            countDownLatch.countDown(); // 減門閂上的鎖
            System.out.println("method m2");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    }
public static void main(String[] args) {
    Test count01 = new Test();
    new Thread(count01::m2).start();
    new Thread(count01::m1).start();
}

}

門閂可以和鎖混合使用,或替代鎖的功能,再門閂開放之前等待,當(dāng)門閂完全開放之后執(zhí)行,可避免鎖的效率低下問題。

wait()、notify()和notifyAll()
wait():在對象上調(diào)用wait(), 會使當(dāng)前線程進(jìn)入等待狀態(tài), 直至另一個(gè)線程對這個(gè)對象調(diào)用了notify() 或notifyAll() 方法喚醒線程。

notify():喚醒對象正在等待的一個(gè)線程。

notifyAll():當(dāng)調(diào)用對象的notifyAll()方法時(shí),所有waiting狀態(tài)的線程都會被喚醒。

(生產(chǎn)者消費(fèi)者)自定義同步容器,容器上限為10,可以在多線程中應(yīng)用,并保證數(shù)據(jù)線程安全。

public class DeviceSingleton<E> {
    private DeviceSingleton() {
    }

    private final int max = 10;
    private int count = 0;
    private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton();

    public static DeviceSingleton getInstance() {
        return DEVICE_SINGLETON;
    }

    private final List<E> devices = new ArrayList<>();

    /**
     * 添加
     */
    public synchronized void add(E data) {
        // 當(dāng)容器滿了之后進(jìn)入等待狀態(tài)
        while (devices.size() == max) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("add: " + data);
        ThreadUtils.sleep(1000);
        devices.add(data);
        count++;
        this.notify();
    }

    /**
     * 獲取
     */
    public synchronized E get() {
        E data = null;
        while (devices.size() == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        ThreadUtils.sleep(1000);
        data = devices.remove(0);
        count--;
        this.notifyAll();
        return data;
    }

    /**
     * 獲取長度
     */
    public synchronized int size() {
        return count;
    }

    @Data
    static class Device {
        private int id;
        private String name;

        public Device(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static class ThreadUtils {
        public static void sleep(int millis) {
            try {
                Thread.sleep(millis);
            } catch (Exception ignore) {}
        }
    }

}
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DeviceSingleton deviceSingleton = DeviceSingleton.getInstance();
        for (int i = 0; i < 10; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 5; j++) {
                    System.out.println(deviceSingleton.get());
                }
            }, "consumer-" + i).start();
        }
        Thread.sleep(2000);

        for (int i = 0; i < 2; i++) {
            new Thread(() ->
            {
                for (int j = 0; j < 25; j++) {
                    deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j));
                }
            }, "producer").start();
        }
    }

}

ReentrantLock鎖

  1. 重入鎖

為盡量避免使用synchronized和同步方法出現(xiàn)的一種多線程鎖機(jī)制,建議使用的同步方式,效率比synchronized高。使用重入鎖時(shí),需要手動(dòng)釋放鎖(lock.unlock())。示例如下:

public class ReentrantLockTest {
    private final Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 10; i++) {
            System.out.println("method m1() " + i);
            ThreadUtils.sleep(1000);
        }
        lock.unlock(); // 解鎖
    }

    private void m2() {
        lock.lock(); // 加鎖
        System.out.println("method m2()");
        lock.unlock(); // 解鎖
    }

    public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
    }

}
  1. 嘗試鎖 lock.tryLock()

如果沒有獲取到鎖標(biāo)記則返回false,當(dāng)前線程等待,如果獲取到了鎖標(biāo)記,則返回true,當(dāng)前線程被鎖定執(zhí)行。示例如下:

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 10; i++) {
            ThreadUtils.sleep(1000);
            System.out.println("method m1() " + i);
        }
        lock.unlock(); // 解鎖
    }
private void m2() {
        boolean isLocked = false;
        try {
                /*
                嘗試鎖,如果有鎖,則無法獲取鎖標(biāo)記,返回false,否則返回true
                如果無法獲取到鎖標(biāo)記,則說明別的線程正在使用鎖,該線程等待
                如果獲取到了鎖標(biāo)記,則該線程的代碼塊被鎖定
                下面是獲取鎖標(biāo)記的無參方法,當(dāng)執(zhí)行到該語句的時(shí)候立刻獲取鎖標(biāo)記
                也可以用有參的,即當(dāng)執(zhí)行到該語句多長時(shí)間之內(nèi)獲取鎖標(biāo)記,如果超時(shí),不等待,直接返回。如isLocked = lock.tryLock(5, TimeUnit.SECONDS);表示5秒之內(nèi)獲取鎖標(biāo)記(5秒之內(nèi)任何時(shí)間獲取到鎖標(biāo)記都會繼續(xù)執(zhí)行),如果超時(shí)則直接返回。
                 */
                isLocked = lock.tryLock();
                System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized");
        } catch (Exception e) {
                e.printStackTrace();
        } finally {
                // 嘗試鎖在解除鎖標(biāo)記的時(shí)候一定要判斷是否獲取到鎖標(biāo)記
                if (isLocked) {
                        lock.unlock();
                }
        }
}
public static void main(String[] args) {
        ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
        new Thread(reentrantLockTest::m1).start();
        new Thread(reentrantLockTest::m2).start();
}

}
  1. 可中斷鎖lock.lockInterruptibly()

非可中斷鎖當(dāng)客戶端調(diào)用interrupt方法時(shí),只是簡單的去設(shè)置interrupted中斷狀態(tài),并沒有進(jìn)一步拋出異常,而可中斷鎖在監(jiān)測到中斷請求時(shí)會拋出InterruptedException ,進(jìn)而中斷線程執(zhí)行。示例如下:

public class ReentrantLockTest {
    private Lock lock = new ReentrantLock();

    private void m1() {
        lock.lock(); // 加鎖
        for (int i = 0; i < 5; i++) {
            ThreadUtils.sleep(1000);
            System.out.println("method m1() " + i);
        }
        lock.unlock(); // 解鎖
    }

    private void m2() {
        try {
            /*
            可打斷鎖,阻塞等待鎖,可以被其他的線程打斷阻塞狀態(tài)
             */
            lock.lockInterruptibly(); // 可嘗試打斷
            System.out.println("method m2()");
        } catch (InterruptedException e) {
            System.out.println("鎖被打斷");
        } finally {
            try {
                lock.unlock();
            } catch (Exception ignored) {
            }
        }
    }
public static void main(String[] args) {
    ReentrantLockTest reentrantLockTest = new ReentrantLockTest();
    Thread thread1 = new Thread(reentrantLockTest::m1);
    thread1.start();
    ThreadUtils.sleep(1000);
    Thread thread2 = new Thread(reentrantLockTest::m2);
    thread2.start();
    ThreadUtils.sleep(1000);
    thread2.interrupt(); // 打斷線程休眠
}

}
注意:用ReentrantLock打斷鎖,如果要打斷的話是用線程打斷,跟喚醒不同,notifyAll喚醒是用對象區(qū)喚醒。(打斷thread.interruped(); 喚醒object.notifyAll())。

線程打斷有什么用呢?

我們在用Windows的時(shí)候經(jīng)常會遇到軟件鎖死的問題,這時(shí)候我們往往會通過打開任務(wù)管理器來結(jié)束進(jìn)程,這種結(jié)束進(jìn)程可以認(rèn)為是打斷鎖的阻塞狀態(tài)(即非正常結(jié)束)。

  1. 公平鎖

先到先得。若沒有特殊情況,不建議使用公平鎖,如果使用公平鎖的話,一般來說并發(fā)量<=10,如果并發(fā)量較大,而不可避免的有訪問先后順序的話,建議采用別的方法。

public class ReentrantLockTest {
    static class TestReentrantLock extends Thread {
        // 在創(chuàng)建ReentrantLock對象的時(shí)候傳參為true就代表創(chuàng)建公平鎖
        private ReentrantLock lock = new ReentrantLock(true);

        public void run() {
            for (int i = 0; i < 5; i++) {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock.");
                    ThreadUtils.sleep(1000);
                } finally {
                    lock.unlock();
                }
            }
        }
    }
public static void main(String[] args) {
    TestReentrantLock lock = new TestReentrantLock();
    lock.start();
    new Thread(lock).start();
    new Thread(lock).start();
}

}

  1. Condition

為Lock增加條件,當(dāng)條件滿足時(shí)做一些事情,如加鎖或解鎖、等待或喚醒等。下面示例就是使用Condition實(shí)現(xiàn)的生產(chǎn)者消費(fèi)者。

public class DeviceContainer<T> {
private DeviceContainer() {
}

private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();

public static DeviceContainer getInstance() {
    return DEVICE_CONTAINER;
}

private final List<T> list = new LinkedList<>();

private final int max = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void add(T t) {
    lock.lock();
    try {
        while (this.size() == max) {
            System.out.println(Thread.currentThread().getName() + " 等待");
            // 當(dāng)數(shù)據(jù)長度為max的時(shí)候,生產(chǎn)者進(jìn)入等待隊(duì)列,釋放鎖標(biāo)記
            // 借助條件進(jìn)入的等待隊(duì)列
            producer.await();
        }
        System.out.println(Thread.currentThread().getName() + " 添加");
        list.add(t);
        count++;
        // 借助條件喚醒所有的消費(fèi)者
        consumer.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

public T get() {
    T t = null;
    lock.lock();
    try {
        while (this.size() == 0) {
            System.out.println(Thread.currentThread().getName() + " 等待");
            // 借助條件使消費(fèi)者進(jìn)入等待隊(duì)列
            consumer.await();
        }
        System.out.println(Thread.currentThread().getName() + " 獲取");
        t = list.remove(0);
        count--;
        // 借助條件喚醒所有生產(chǎn)者
        producer.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    return t;
}

private int size() {
    return count;
}

}

public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer<Device> deviceSingleton = DeviceContainer.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}

}

}

Java中的同步容器

  1. Map/Set

ConcurrentHashMap/ConcurrentHashSet:底層哈希實(shí)現(xiàn)的Map/Set,效率高,使用底層技術(shù)實(shí)現(xiàn)的線程安全,量級較synchronized輕。key和value不能為null(不同于HashMap和HashSet)

ConcurrentSkipListMap/ConcurrentSkipListSet:底層跳表實(shí)現(xiàn)的Map/Set,有序,線程安全,效率較ConcurrentHashMap/ConcurrentHashSet低。

CopyOnWriteArraySet:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。

  1. List

CopyOnWriteArrayList:底層數(shù)組,線程安全,增加和刪除效率低,查詢效率高。

  1. Queue

ConcurrentLinkedQueue/ ConcurrentLinkedDeue:基礎(chǔ)鏈表同步隊(duì)列,非阻塞,ConcurrentLinkedQueue底層單向鏈表,ConcurrentLinkedDeue底層雙向鏈表,均***。

ArrayBlockingQueue/LinkedBlockingQueue:阻塞隊(duì)列,隊(duì)列容量不足自動(dòng)阻塞,隊(duì)列容量為0自動(dòng)阻塞。ArrayBlockingQueue底層使用數(shù)組,有界;LinkedBlockingQueue底層使用鏈表,默認(rèn)***。ArrayBlockingQueue根據(jù)調(diào)用API的不同,有不同的特性。當(dāng)容量不足的時(shí)候有阻塞能力。add方法在容量不足的時(shí)候會拋出異常;put方法在容量不足時(shí)阻塞等待;offer默認(rèn)不阻塞,當(dāng)容量不足的時(shí)候返回false,否則返回true;三參offer可設(shè)定阻塞時(shí)長,若在阻塞時(shí)長內(nèi)有容量空閑,則添加并返回true,如果阻塞時(shí)長范圍內(nèi)無容量空閑,放棄新增數(shù)據(jù)并返回false。LinkedBlockingQueue的add方法在容量不足的時(shí)候會拋出異常;offer方法在容量不足時(shí)返回false,否則返回true;三參offer可設(shè)定阻塞時(shí)長,若在阻塞時(shí)長內(nèi)有容量空閑,則添加并返回true,如果阻塞時(shí)長范圍內(nèi)無容量空閑,放棄新增數(shù)據(jù)并返回false。

PriorityQueue:有限集隊(duì)列,底層數(shù)組,***。

PriorityBlockingQueue:優(yōu)先級阻塞隊(duì)列,底層數(shù)組,***。

LinkedTransferQueue:轉(zhuǎn)移隊(duì)列,使用transfer方法實(shí)現(xiàn)數(shù)據(jù)的即時(shí)處理。隊(duì)列使用add保存數(shù)據(jù),不做阻塞等待。transfer是TransferQueue的特有方法,轉(zhuǎn)移隊(duì)列必須要有消費(fèi)者(take()方法的調(diào)用者)。如果沒有任何線程消費(fèi)數(shù)據(jù),則transfer方法阻塞。一般用于處理即時(shí)消息。

SynchronousQueue:阻塞的同步隊(duì)列,有界。是一個(gè)容量為0的隊(duì)列,是一個(gè)特殊的TransferQuque。必須先有消費(fèi)線程等待才能使用的隊(duì)列。add方法無阻塞,若沒有消費(fèi)線程阻塞等待數(shù)據(jù),則拋出異常。put方法有阻塞,若沒有消費(fèi)線程阻塞等待數(shù)據(jù),則put方法阻塞。

DelayQueue:延時(shí)阻塞隊(duì)列,***。類似輪詢機(jī)制,一般用來做定時(shí)任務(wù)。業(yè)務(wù)場景舉例:具有過期時(shí)間的緩存,訂單過期自動(dòng)取消等。

?

線程池
線程池是一個(gè)進(jìn)程級的資源,默認(rèn)的生命周期和JVM一致,即從開啟線程池開始,到JVM關(guān)閉為止,是線程池的默認(rèn)生命周期。如果顯式調(diào)用shutdown方法,那么線程池執(zhí)行所有的任務(wù)后自動(dòng)關(guān)閉。

Executor接口
線程池頂級接口。Executor中只有一個(gè)方法execute,是用來處理任務(wù)的一個(gè)服務(wù)方法。調(diào)用者提供Runnable接口的實(shí)現(xiàn),線程池通過執(zhí)行線程執(zhí)行這個(gè)Runnable。

public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {@Override
br/>@Override
new Thread(command).start();
}
}
}
ExecutorService
Executor的子接口,與Executor不同的是,它還提供了一個(gè)返回值為Future的服務(wù)方法submit。

Executors工具類
Executor的工具類,為線程池提供工具方法,可快速創(chuàng)建線程池,所有的線程池類型都實(shí)現(xiàn)了這個(gè)接口,實(shí)現(xiàn)了這個(gè)接口就代表有提供線程池的能力。常用方法有:void execute(),F(xiàn)uture submit(Callable),F(xiàn)uture submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。

public class Test {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個(gè)長度為5的線程池對象
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);

    // 優(yōu)雅關(guān)閉
    executorService.shutdown();
    // 是否已經(jīng)結(jié)束,相當(dāng)于判斷是否回收了資源,因?yàn)榫€程睡眠,此時(shí)還未回收,因此為false
    System.out.println(executorService.isTerminated());
    // 是否已經(jīng)關(guān)閉,即是否調(diào)用過shutdown方法
    System.out.println(executorService.isShutdown());
    System.out.println(executorService);

    ThreadUtils.sleep(1000);

    // 因?yàn)樯厦嫠?秒,任務(wù)都已經(jīng)執(zhí)行完了,資源也被回收了,因此為true
    System.out.println(executorService.isTerminated());
    System.out.println(executorService.isShutdown());
    System.out.println(executorService);
}

}
Future
未來結(jié)果,代表線程執(zhí)行結(jié)束后的結(jié)果。通過get方法獲取線程執(zhí)行結(jié)果。

常用方法:get()、get(long, TimeUnit)和isDown()。

get():阻塞等待線程執(zhí)行結(jié)束并得到返回結(jié)果;

get(long, TimeUnit):阻塞固定時(shí)長,等待線程結(jié)束后的結(jié)果,如果在阻塞時(shí)長范圍內(nèi)線程未執(zhí)行結(jié)束,拋出異常。

isDown():判斷線程是否結(jié)束即判斷call方法是否已完成,要特別注意,這里的isDown與ExecutorService中的isShutdown不同,isShutdown是用來判斷線程是否關(guān)閉的。

public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}

private static void testExecutorService() throws ExecutionException, InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(1);
    Future<String> future = service.submit(() -> {
        ThreadUtils.sleep(1000);
        return Thread.currentThread().getName() + " submit.";
    });

    // 查看任務(wù)是否完成即線程是否結(jié)束即call方法是否執(zhí)行結(jié)束,
    // 要注意的是,這里判斷是否結(jié)束,跟ExecutorService中的isShutDowm不同, isShutdowm是判斷線程是否結(jié)束,而shutdown表示關(guān)閉線程
    System.out.println(future.isDone());
    // 獲取call方法的返回值
    System.out.println(future.get()); // false

    System.out.println(future.isDone());
    System.out.println(future.get()); // true

    // 關(guān)閉線程池
    service.shutdown();
}

}
Callable接口
可執(zhí)行接口。類似Runnable接口,也是可以啟動(dòng)線程的接口。

接口方法:call(),相當(dāng)于Runnable中的run方法,區(qū)別在于call方法有返回值。

Callable和Runnable的選擇:當(dāng)需要返回值或需要拋出異常時(shí),使用Callable,其他情況任意選。

ThreadPoolExecutor創(chuàng)建線程池
通過new ThreadPoolExecutor來創(chuàng)建,下圖是ThreadPoolExecutor的三個(gè)構(gòu)造方法:

參數(shù)說明:

corePoolSize? 核心線程數(shù)

maximumPoolSize? 最大線程數(shù)

keepAliveTime? 線程最大空閑時(shí)間

unitTimeUnit? 時(shí)間單位

workQueueBlockingQueue<Runnable>? 線程等待隊(duì)列

threadFactoryThreadFactory? 線程創(chuàng)建工廠

handlerRejectedExecutionHandler? 拒絕策略
?

核心線程數(shù)和最大線程數(shù):

當(dāng)提交一個(gè)新任務(wù)到線程池時(shí)首先判斷核心線程數(shù)corePoolSize是否已達(dá)上限,若未達(dá)到corePoolSize上限,創(chuàng)建一個(gè)工作線程來執(zhí)行任務(wù);否則,再判斷線程池工作隊(duì)列workQueueBlockingQueue是否已滿,若沒滿,則將新提交的任務(wù)存儲在工作隊(duì)列里;否則,線程池將判斷最大線程數(shù)是否已達(dá)上限,若未達(dá)到maximumPoolSize上限,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù),滿了,則交給飽和策略來處理這個(gè)任務(wù)。如果線程池中的線程數(shù)量大于核心線程數(shù) corePoolSize 時(shí),線程空閑時(shí)間超過線程最大空閑時(shí)間keepAliveTime,則線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize。

自定義線程池

public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}

private static void testExecutorThreadPool() {
    // 創(chuàng)建線程池,核心線程數(shù)為2,最大線程數(shù)為4,最大空閑時(shí)間為10
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
            4,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            new MyTreadFactory(),
            new MyIgnorePolicy());
    // 啟動(dòng)所有核心線程,使其出與等待狀態(tài)
    executor.prestartAllCoreThreads();

    // 創(chuàng)建并執(zhí)行任務(wù)
    for (int i = 1; i <= 10; i++) {
        MyTask task = new MyTask(String.valueOf(i));
        executor.execute(task);
    }
}

static class MyTreadFactory implements ThreadFactory {

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, "線程【" + mThreadNum.getAndIncrement() + "】");
        System.out.println(t.getName() + " 已創(chuàng)建");
        return t;
    }
}

public static class MyIgnorePolicy implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        doLog(runnable, executor);
    }

    private void doLog(Runnable runnable, ThreadPoolExecutor executor) {
        System.err.println(runnable.toString() + " 被拒絕");
    }
}

@Data
static class MyTask implements Runnable {
    private String name;

    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(this.toString() + " 正在運(yùn)行");
        ThreadUtils.sleep(1000);
    }

    @Override
    public String toString() {
        return "線程【" + name + "】";
    }
}

}
FixedThreadPool線程池
固定容量的線程池,可由Executors來創(chuàng)建,活動(dòng)狀態(tài)和線程池容量是有上限的,需要手動(dòng)銷毀線程池。構(gòu)造方法如下:

由此可見,該線程池核心線程數(shù)和最大線程數(shù)均為構(gòu)造參數(shù)值nThreads,線程最大空閑時(shí)間為0,任務(wù)隊(duì)列采用LinkedBlockingQueue,默認(rèn)容量上限是Integer.MAX_VALUE。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創(chuàng)建容量為10的FixedThreadPool線程池
    ExecutorService service = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 100; i++) {
        service.execute(()-> System.out.println(Thread.currentThread().getName()));
    }
    // 銷毀線程池
    service.shutdown();
}

}
CachedThreadPool線程池
緩存線程池,通過Executors來創(chuàng)建,默認(rèn)最大容量為Integer.MAX_VALUE,自動(dòng)擴(kuò)容,執(zhí)行完后自動(dòng)銷毀(這一點(diǎn)與FixedThreadPool不同,F(xiàn)ixedThreadPool的銷毀需要手動(dòng)調(diào)用shutdown方法)。構(gòu)造方法如下:

由構(gòu)造方法可見,核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時(shí)間為60秒,任務(wù)隊(duì)列使用SynchronousQueue。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創(chuàng)建緩存線程池
    ExecutorService service = Executors.newCachedThreadPool();
    System.out.println(service);
    for (int i = 0; i < 5; i++) {
        service.execute(() -> {
            ThreadUtils.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " executor.");
        });
    }
    System.out.println(service);
    ThreadUtils.sleep(65);
    System.out.println(service);
}

}
ScheduledThreadPool線程池
計(jì)劃任務(wù)線程池,可以根據(jù)任務(wù)自動(dòng)執(zhí)行計(jì)劃的線程池,由Executors創(chuàng)建,需要手動(dòng)銷毀。計(jì)劃任務(wù)時(shí)選用,如需要定時(shí)整理數(shù)據(jù)、服務(wù)器定期清除無效文件等。構(gòu)造方法如下:

核心線程數(shù)為構(gòu)造參數(shù)大小,最大線程數(shù)為Integer.MAX_VALUE,最大空閑時(shí)間0,任務(wù)隊(duì)列使用DelayedWorkQuquq。

常用方法有:scheduledAtFixedRate、schedule、execute等。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創(chuàng)建計(jì)劃任務(wù)線程池
    ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
    System.out.println(service);
    // 定期任務(wù),線程池啟動(dòng)500毫秒后第一次執(zhí)行任務(wù),以后每300毫秒執(zhí)行一次
    service.scheduleAtFixedRate(() -> {
        ThreadUtils.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " executor.");
    }, 500, 300, TimeUnit.MILLISECONDS);
    System.out.println(service);
    service.shutdown();
}

}
SingleThreadExecutor線程池
單一容量的線程池。需要手動(dòng)銷毀。有保證任務(wù)順序需求時(shí)可選用。如大廳中的公共頻道聊天,固定數(shù)量商品的秒殺等。構(gòu)造方法如下:

核心線程數(shù)和最大線程數(shù)均為1,任務(wù)隊(duì)列為LinkedBlockingQueue。

public class Test {
public static void main(String[] args) {
new Test().test();
}

public void test() {
    // 創(chuàng)建單一容量線程池
    ExecutorService service = Executors.newSingleThreadExecutor();
    System.out.println(service);
    for (int i = 0; i < 5; i++) {
        service.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " executor.");
            ThreadUtils.sleep(1000);
        });
    }
    service.shutdown();
}

}
ForkJoinPool線程池
分支合并線程池,適用于處理復(fù)雜任務(wù)。初始化線程容量與CPU核心數(shù)有關(guān)。

ForkJoinPool沒有所謂的容量,默認(rèn)都是一個(gè)線程,根據(jù)任務(wù)自動(dòng)分支新的子線程,,當(dāng)子線程結(jié)束后自動(dòng)合并。所謂自動(dòng)合并,是用fork和join兩個(gè)方法實(shí)現(xiàn)的(手動(dòng)調(diào)用)。

線程池中運(yùn)行的可分治合并的任務(wù)必須是ForkJoinTask的子類型(RecursiveTask或RecursiveAction,二者的區(qū)別在于一個(gè)運(yùn)行完之后有返回值,一個(gè)沒有),其中提供了分支和合并能力。

ForkJoinTask提供了兩個(gè)抽象子類型RecursiveTask和RecursiveAction,RecursiveTask是有返回結(jié)果的分支合并任務(wù),RecursiveAction是無返回結(jié)果的分支合并任務(wù)(類似Callable和Runnable的區(qū)別)。

ForkJoinTask提供了一個(gè)compute方法,這個(gè)方法里面就是任務(wù)的執(zhí)行邏輯。

該線程池主要用于大量數(shù)據(jù)的計(jì)算、數(shù)據(jù)分析等。

public class Test {

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long result = 0L;
    for (int NUMBER : NUMBERS) {
        result += NUMBER;
    }
    System.out.println(result);

    ForkJoinPool pool = new ForkJoinPool();
    // 分支合并任務(wù)
    AddTask task = new AddTask(0, NUMBERS.length);
    // 提交任務(wù)
    Future<Long> future = pool.submit(task);
    System.out.println(future.get());
}

private static final int[] NUMBERS = new int[1000000];
private static final int MAX_SIZE = 50000;
private static final Random RANDOM = new Random();

static {
    for (int i = 0; i < NUMBERS.length; i++) {
        NUMBERS[i] = RANDOM.nextInt(1000);
    }
}

static class AddTask extends RecursiveTask<Long> {
    int begin, end;

    AddTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if ((end - begin) < MAX_SIZE) {
            long sum = 0L;
            for (int i = begin; i < end; i++) {
                sum += NUMBERS[i];
            }
            return sum;
        } else {
            // 當(dāng)結(jié)束值減去開始值大于臨界值的時(shí)候進(jìn)行分支
            int middle = begin + (end - begin) / 2;
            AddTask task1 = new AddTask(begin, middle);
            AddTask task2 = new AddTask(middle, end);
            // 分支的工作,就是開啟一個(gè)新的線程任務(wù)
            task1.fork();
            task2.fork();
            // join就是合并,將任務(wù)的結(jié)果獲取,是一個(gè)阻塞方法,一定會得到結(jié)果數(shù)據(jù)
            return task1.join() + task2.join();
        }
    }
}

}

線程組

一組線程的集合,線程組中多個(gè)線程執(zhí)行同一批任務(wù),線程之間是隔離的,互不影響。同一組的線程之間可以通信,但不同組的線程之間不能通信,這樣就做到了線程屏蔽,保證了線程安全。

public class Test {

public static void main(String[] args) {
    new Test().test();
}

public void test() {
    ThreadGroup group = new ThreadGroup("LQ");
    Thread thread = new Thread(group, () ->
            System.out.println("group is " + Thread.currentThread().getThreadGroup().getName())
    );
    thread.start();
}

}
朋友們覺得內(nèi)容有什么錯(cuò)誤、不足之處,或者有什么疑問,盡可留言指出來,一起學(xué)習(xí)哦。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI