溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java并發(fā)編程的入門過程

發(fā)布時(shí)間:2021-10-12 11:10:46 來源:億速云 閱讀:115 作者:柒染 欄目:云計(jì)算

本篇文章給大家分享的是有關(guān)java并發(fā)編程的入門過程,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

1、入門介紹

1.1、實(shí)現(xiàn)線程的2種方式

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 14:05
 */
public class MyThreadDemo1 {
    public static void main(String[] args) {
        new Thread1().start();
        
        new Thread(new Thread2()).start();
    }
}

class Thread1 extends Thread{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

class Thread2 implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

1.2、線程的狀態(tài)(生命周期)

1.2.1、線程的5種狀態(tài)

1. 新建(NEW):新創(chuàng)建了一個(gè)線程對(duì)象。

2. 可運(yùn)行(RUNNABLE):線程對(duì)象創(chuàng)建后,其他線程(比如main線程)調(diào)用了該對(duì)象的start()方法。該狀態(tài)的線程位于可運(yùn)行線程池中,等待被線程調(diào)度選中,獲取cpu 的使用權(quán) 。

3. 運(yùn)行(RUNNING):可運(yùn)行狀態(tài)(runnable)的線程獲得了cpu 時(shí)間片(timeslice) ,執(zhí)行程序代碼。

4. 阻塞(BLOCKED):阻塞狀態(tài)是指線程因?yàn)槟撤N原因放棄了cpu 使用權(quán),也即讓出了cpu timeslice,暫時(shí)停止運(yùn)行。直到線程進(jìn)入可運(yùn)行(runnable)狀態(tài),才有機(jī)會(huì)再次獲得cpu timeslice 轉(zhuǎn)到運(yùn)行(running)狀態(tài)。阻塞的情況分三種: 

(一). 等待阻塞:運(yùn)行(running)的線程執(zhí)行o.wait()方法,JVM會(huì)把該線程放入等待隊(duì)列(waitting queue)中。
(二). 同步阻塞:運(yùn)行(running)的線程在獲取對(duì)象的同步鎖時(shí),若該同步鎖被別的線程占用,則JVM會(huì)把該線程放入鎖池(lock pool)中。
(三). 其他阻塞:運(yùn)行(running)的線程執(zhí)行Thread.sleep(long ms)或t.join()方法,或者發(fā)出了I/O請(qǐng)求時(shí),JVM會(huì)把該線程置為阻塞狀態(tài)。當(dāng)sleep()狀態(tài)超時(shí)、join()等待線程終止或者超時(shí)、或者I/O處理完畢時(shí),線程重新轉(zhuǎn)入可運(yùn)行(runnable)狀態(tài)。

5. 死亡(DEAD):線程run()、main() 方法執(zhí)行結(jié)束,或者因異常退出了run()方法,則該線程結(jié)束生命周期。死亡的線程不可再次復(fù)生。

1.2.2、線程運(yùn)行狀態(tài)圖

java并發(fā)編程的入門過程

1). 初始狀態(tài)

  1. 實(shí)現(xiàn)Runnable接口和繼承Thread可以得到一個(gè)線程類,new一個(gè)實(shí)例出來,線程就進(jìn)入了初始狀態(tài)

2). 可運(yùn)行狀態(tài)

  1. 可運(yùn)行狀態(tài)只是說你資格運(yùn)行,調(diào)度程序沒有挑選到你,你就永遠(yuǎn)是可運(yùn)行狀態(tài)。

  2. 調(diào)用線程的start()方法,此線程進(jìn)入可運(yùn)行狀態(tài)。

  3. 當(dāng)前線程sleep()方法結(jié)束,其他線程join()結(jié)束,等待用戶輸入完畢,某個(gè)線程拿到對(duì)象鎖,這些線程也將進(jìn)入可運(yùn)行狀態(tài)。

  4. 當(dāng)前線程時(shí)間片用完了,調(diào)用當(dāng)前線程的yield()方法,當(dāng)前線程進(jìn)入可運(yùn)行狀態(tài)。

  5. 鎖池里的線程拿到對(duì)象鎖后,進(jìn)入可運(yùn)行狀態(tài)。

3). 運(yùn)行狀態(tài)

  1. 線程調(diào)度程序從可運(yùn)行池中選擇一個(gè)線程作為當(dāng)前線程時(shí)線程所處的狀態(tài)。這也是線程進(jìn)入運(yùn)行狀態(tài)的唯一一種方式。

4). 死亡狀態(tài)

  1. 當(dāng)線程的run()方法完成時(shí),或者主線程的main()方法完成時(shí),我們就認(rèn)為它死去。這個(gè)線程對(duì)象也許是活的,但是,它已經(jīng)不是一個(gè)單獨(dú)執(zhí)行的線程。線程一旦死亡,就不能復(fù)生。

  2. 在一個(gè)死去的線程上調(diào)用start()方法,會(huì)拋出java.lang.IllegalThreadStateException異常。

5). 阻塞狀態(tài)

  1. 當(dāng)前線程T調(diào)用Thread.sleep()方法,當(dāng)前線程進(jìn)入阻塞狀態(tài)。

  2. 運(yùn)行在當(dāng)前線程里的其它線程t2調(diào)用join()方法,當(dāng)前線程進(jìn)入阻塞狀態(tài)。

  3. 等待用戶輸入的時(shí)候,當(dāng)前線程進(jìn)入阻塞狀態(tài)。

6). 等待隊(duì)列(是Object里的方法,但影響了線程)

  1. 調(diào)用obj的wait(), notify()方法前,必須獲得obj鎖,也就是必須寫在synchronized(obj) 代碼段內(nèi)。

  2. 與等待隊(duì)列相關(guān)的步驟和圖

  • 線程1獲取對(duì)象A的鎖,正在使用對(duì)象A。

  • 線程1調(diào)用對(duì)象A的wait()方法。

  • 線程1釋放對(duì)象A的鎖,并馬上進(jìn)入等待隊(duì)列。

  • 鎖池里面的對(duì)象爭搶對(duì)象A的鎖。

  • 線程5獲得對(duì)象A的鎖,進(jìn)入synchronized塊,使用對(duì)象A。

  • 線程5調(diào)用對(duì)象A的notifyAll()方法,喚醒所有線程,所有線程進(jìn)入鎖池。||||| 線程5調(diào)用對(duì)象A的notify()方法,喚醒一個(gè)線程,不知道會(huì)喚醒誰,被喚醒的那個(gè)線程進(jìn)入鎖池。

  • notifyAll()方法所在synchronized結(jié)束,線程5釋放對(duì)象A的鎖。

  • 鎖池里面的線程爭搶對(duì)象鎖,但線程1什么時(shí)候能搶到就不知道了。||||| 原本鎖池+第6步被喚醒的線程一起爭搶對(duì)象鎖。

java并發(fā)編程的入門過程

7). 鎖池狀態(tài)

  1. 當(dāng)前線程想調(diào)用對(duì)象A的同步方法時(shí),發(fā)現(xiàn)對(duì)象A的鎖被別的線程占有,此時(shí)當(dāng)前線程進(jìn)入鎖池狀態(tài)。簡言之,鎖池里面放的都是想爭奪對(duì)象鎖的線程。

  2. 當(dāng)一個(gè)線程1被另外一個(gè)線程2喚醒時(shí),1線程進(jìn)入鎖池狀態(tài),去爭奪對(duì)象鎖。

  3. 鎖池是在同步的環(huán)境下才有的概念,一個(gè)對(duì)象對(duì)應(yīng)一個(gè)鎖池。

1.3. sleep、yield、join、wait的比較

  1. Thread.sleep(long millis),一定是當(dāng)前線程調(diào)用此方法,當(dāng)前線程進(jìn)入阻塞,但不釋放對(duì)象鎖,millis后線程自動(dòng)蘇醒進(jìn)入可運(yùn)行狀態(tài)。作用:給其它線程執(zhí)行機(jī)會(huì)的最佳方式。

  2. Thread.yield(),一定是當(dāng)前線程調(diào)用此方法,當(dāng)前線程放棄獲取的cpu時(shí)間片,由運(yùn)行狀態(tài)變會(huì)可運(yùn)行狀態(tài),讓OS再次選擇線程。作用:讓相同優(yōu)先級(jí)的線程輪流執(zhí)行,但并不保證一定會(huì)輪流執(zhí)行。實(shí)際中無法保證yield()達(dá)到讓步目的,因?yàn)樽尣降木€程還有可能被線程調(diào)度程序再次選中。Thread.yield()不會(huì)導(dǎo)致阻塞。

  3. t.join()/t.join(long millis),當(dāng)前線程里調(diào)用其它線程1的join方法,當(dāng)前線程阻塞,但不釋放對(duì)象鎖,直到線程1執(zhí)行完畢或者millis時(shí)間到,當(dāng)前線程進(jìn)入可運(yùn)行狀態(tài)。

  4. obj.wait(),當(dāng)前線程調(diào)用對(duì)象的wait()方法,當(dāng)前線程釋放對(duì)象鎖,進(jìn)入等待隊(duì)列。依靠notify()/notifyAll()喚醒或者wait(long timeout)timeout時(shí)間到自動(dòng)喚醒。

  5. obj.notify()喚醒在此對(duì)象監(jiān)視器上等待的單個(gè)線程,選擇是任意性的。notifyAll()喚醒在此對(duì)象監(jiān)視器上等待的所有線程。

1.4、Thread.sleep(long time)

Thread.sleep(long time)方法的作用是讓當(dāng)前正在執(zhí)行的線程休眠(讓出CPU時(shí)間片)指定的毫秒數(shù)。

需要注意的是:

  • 調(diào)用sleep()方法時(shí),如果當(dāng)前線程持有鎖不會(huì)導(dǎo)致當(dāng)前線程釋放鎖。

  • sleep不釋放鎖 線程是進(jìn)入阻塞狀態(tài)還是就緒狀態(tài)?

答案是進(jìn)入阻塞狀態(tài),確切的說Thread在Java的TIMED_WAITING狀態(tài)(但這個(gè)狀態(tài)其實(shí)并沒那么重要,可以認(rèn)為是java的內(nèi)部細(xì)節(jié),用戶不用太操心)。往下一層,在不同OS上底層的sleep的實(shí)現(xiàn)細(xì)節(jié)不太一樣。但是大體上就是掛起當(dāng)前的線程,然后設(shè)置一個(gè)信號(hào)或者時(shí)鐘中斷到時(shí)候喚醒。sleep后的的Thread在被喚醒前是不會(huì)消耗任何CPU的(確切的說,大部分OS都會(huì)這么實(shí)現(xiàn),除非某個(gè)OS的實(shí)現(xiàn)偷懶了)。這點(diǎn)上,wait對(duì)當(dāng)前線程的效果差不多是一樣的,也會(huì)暫停調(diào)度,等著notify或者一個(gè)超時(shí)的時(shí)間。期間CPU也不會(huì)被消耗。

2、多線程實(shí)現(xiàn)銀行叫號(hào)排隊(duì)

2.1 版本1

package chapter2.version1;
/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 8:58
 */
public class Bank {
    public static void main(String[] args) {
        TicketWindow ticketWindow = new TicketWindow("1號(hào)窗口");
        TicketWindow ticketWindow2 = new TicketWindow("2號(hào)窗口");
        TicketWindow ticketWindow3 = new TicketWindow("3號(hào)窗口");
        ticketWindow.start();
        ticketWindow2.start();
        ticketWindow3.start();
    }
}
package chapter2.version1;
/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 8:58
 */
public class TicketWindow extends Thread {
    private static int MAX = 50;

    private static int current = 1;
    private String name;
    public TicketWindow(String name){
        this.name = name;
    }


    @Override
    public void run() {

        while (current < MAX){
            System.out.println("窗口:" + name +" 當(dāng)前叫號(hào):" + current);
            current++;

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

此版本中注意TicketWindow 類的成員變量current 為static, 表示無論實(shí)例化這個(gè)類多少次,都共享同一份變量,因?yàn)檫@個(gè)變量是在類加載的時(shí)候就已經(jīng)創(chuàng)建好的。為了讓多個(gè)線程消耗同一個(gè)current 所以才定義為static的,不然每個(gè)線程的current都是各自的,與其他線程不相關(guān)。

以上代碼把業(yè)務(wù)與線程緊密摻雜在一起,為了讓多個(gè)線程訪問同一份current把他定義為static明顯是不合適的。

2.2 版本2

package chapter2.version2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 8:58
 */
public class Bank {

    public static void main(String[] args) {
        // 有線程安全問題, 多個(gè)線程同時(shí)訪問到current變量
        TicketWindow ticketWindow = new TicketWindow();

        Thread windowThread1 = new Thread(ticketWindow, "1號(hào)窗口");
        Thread windowThread2 = new Thread(ticketWindow, "2號(hào)窗口");
        Thread windowThread3 = new Thread(ticketWindow, "3號(hào)窗口");

        windowThread1.start();
        windowThread2.start();
        windowThread3.start();
    }

}

package chapter2.version2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 9:02
 */
public class TicketWindow implements Runnable {

    private int MAX = 50;

    private int current = 1;


    @Override
    public void run() {

        while (current < MAX){
            System.out.println("窗口:" + Thread.currentThread().getName() +" 當(dāng)前叫號(hào):" + current);
            current++;

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

這里TicketWindow 并不是繼承Thread, 這樣啟動(dòng)線程多次線程可以共享同一份TicketWindow 實(shí)例, 把業(yè)務(wù)邏輯與線程啟動(dòng)拆分清晰明了。 注:以上2個(gè)版本均存在線程安全性問題,這是由于current變量在run()方法可能被多個(gè)線程同時(shí)訪問, 可能多個(gè)線程同時(shí)執(zhí)行到

System.out.println("窗口:" + Thread.currentThread().getName() +" 當(dāng)前叫號(hào):" + current);

這行代碼,導(dǎo)致不同的線程打印出了一樣的叫號(hào)值,比如運(yùn)行以上程序會(huì)輸出如下結(jié)果: java并發(fā)編程的入門過程

3、守護(hù)線程

要點(diǎn):守護(hù)線程會(huì)隨著父線程的退出而退出, 守護(hù)線程適宜一些輔助性的工作,而不能把核心工作的線程設(shè)置為守護(hù)線程。

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 14:07
 */
public class DeemonThreadDemo {

    public static void main(String[] args) {
        Thread thread = new Thread(){
            @Override
            public void run() {
                while (true){
                    System.out.println("hello");
                }
            }
        };

        // 設(shè)置為守護(hù)線程
        thread.setDaemon(true);

        thread.start();

        System.out.println("主線程退出");

    }
}

java并發(fā)編程的入門過程

從代碼中可以看到thread 中的代碼有while(true){ }語句, 在我們的映像中while(true)是死循環(huán)應(yīng)該永遠(yuǎn)不退出,永遠(yuǎn)打印hello, 但是從輸出結(jié)果可以看到運(yùn)行一段時(shí)間后就沒有再打印了,說明父線程結(jié)束執(zhí)行了,子線程也隨機(jī)退出了。

4、Thread.join()

Thread.join() 作用:調(diào)用join()方法的線程會(huì)等到他自己執(zhí)行結(jié)束才會(huì)繼續(xù)往后執(zhí)行

Thread.join(time) 作用:調(diào)用join(time)方法的線程會(huì)等到他自己執(zhí)行指定時(shí)間后就會(huì)繼續(xù)往后執(zhí)行,

4.1 無限等待

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 14:32
 */
public class ThreadJoinDemo {

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            for (int i =0; i < 500; i++){
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i =0; i < 500; i++){
                System.out.println(Thread.currentThread().getName() + ":" + i);
            }
        });

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();

        System.out.println("所有線程已結(jié)束執(zhí)行");

    }
}

java并發(fā)編程的入門過程 java并發(fā)編程的入門過程

thread1 和thread2交互執(zhí)行, 而main線程的打印結(jié)果一定是在2個(gè)線程全部執(zhí)行完后才打印結(jié)果。

4.2 等待指定時(shí)間

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 14:32
 */
public class ThreadJoinDemo2 {

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() +"開始執(zhí)行");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() +"結(jié)束執(zhí)行");
        });

        thread1.start();
        
        // 雖然join了, 但是只會(huì)等待指定的時(shí)間,而不會(huì)無限等待
        thread1.join(1000);

        System.out.println("主線程等到1000毫秒就執(zhí)行到了,而無需等到thread1執(zhí)行完畢");

    }
}

java并發(fā)編程的入門過程

5、中斷線程的幾種方式

5.1、介紹

package chapter2;

import org.junit.Test;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 14:53
 */
public class ThreadInterrupt {

    public static void main(String[] args) {
        Thread thread = new Thread(() ->{
            while (true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("線程被打斷");
                    e.printStackTrace();

                    // 中斷后讓其退出while循環(huán), 這樣來控制線程的退出
                    break;
                }
            }
        });

        thread.start();

        System.out.println(thread.isInterrupted());
        // 打斷thread線程的sleep, 讓thread1不再繼續(xù)sleep
        thread.interrupt();
        System.out.println(thread.isInterrupted());

        System.out.println("主線程退出");
    }

    @Test
    public void test2(){
        Object MONITOR = new Object();

        Thread thread = new Thread(() ->{
            while (true){
                synchronized (MONITOR){
                    try {
                        MONITOR.wait(100);
                    } catch (InterruptedException e) {
                        System.out.println("線程被打斷" + Thread.interrupted());
                        e.printStackTrace();
                    }
                }
            }
        });

        thread.start();

        System.out.println(thread.isInterrupted());
        thread.interrupt();
        System.out.println(thread.isInterrupted());

        System.out.println("主線程退出");
    }

    @Test
    public void test3(){
        Thread thread1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() +"開始執(zhí)行");
            while (true){

            }
        });

        Thread main = Thread.currentThread();
        Thread thread2 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 目前是main線程在等待thread1線程,所以需要打斷main線程,才能讓main線程不再繼續(xù)等待而是繼續(xù)往后執(zhí)行
            main.interrupt();
            System.out.println("interrupt");
        });

        thread1.start();
        thread2.start();

        try {
            // main線程會(huì)等待thread1線程執(zhí)行完畢才能繼續(xù)往后執(zhí)行
            thread1.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("主線程執(zhí)行到了");

    }
}

5.2、通過whilte(running) { }變量控制

package chapter2;

import org.junit.Test;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 15:34
 */
public class ThreadCloseDemo {

    class Worker implements Runnable{
        private volatile boolean running = true;

        @Override
        public void run() {
            // 這里有問題, 假如doSomething()非常耗時(shí), 下一次執(zhí)行while (running)沒有機(jī)會(huì)得到執(zhí)行,沒法立即中斷
            while (running){
                // 假如執(zhí)行doSomething()方法
            }

            System.out.println("退出死循環(huán)");
        }

        public void shutdown(){
            this.running = false;
        }
    }

    @Test
    public void close1(){
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        worker.shutdown();

        System.out.println("主線程執(zhí)行完畢");
    }
}

test1()的執(zhí)行結(jié)果:

java并發(fā)編程的入門過程

5.3、通過Thread.interrupt()控制

package chapter2;

import org.junit.Test;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 15:34
 */
public class ThreadCloseDemo2 {

    class Worker2 implements Runnable{
        private volatile boolean running = true;

        @Override
        public void run() {
            while (true){
                if (Thread.currentThread().isInterrupted()){
                    break;
                }

                // 這里有問題, 假如doSomething()非常耗時(shí), 上面的代碼根本沒有機(jī)會(huì)得到執(zhí)行,沒法立即中斷
                // doSomething()


            }

            System.out.println("退出死循環(huán)");
        }
    }

    @Test
    public void close2(){
        Worker2 worker = new Worker2();
        Thread thread = new Thread(worker);
        thread.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        thread.interrupt();

        System.out.println("主線程執(zhí)行完畢");
    }
}

java并發(fā)編程的入門過程

5.4、暴力終止

package chapter2;

import org.junit.Test;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 16:06
 */
public class ThreadCloseDemo2 {

    class ThreadService{

        private Thread executeThread;

        private boolean running = true;

        public void execute(Runnable task){

            executeThread = new Thread(() -> {
                Thread workerThread = new Thread(task);
                workerThread.setDaemon(true);
                workerThread.start();

                //
                try {
                    // 讓executeThread等待workerThread結(jié)束執(zhí)行才往后執(zhí)行
                    // 假如workerThread比較耗時(shí),由于守護(hù)線程是依附于父線程的,假如這里不join,
                    // 那么可能會(huì)產(chǎn)生workerThread還沒有真正執(zhí)行完,executeThread線程就執(zhí)行完了,導(dǎo)致workerThread也跟著退出了
                    workerThread.join();
                } catch (InterruptedException e) {
                    // e.printStackTrace();
                }

                //executeThread線程被打斷等待 或者 workerThread正常執(zhí)行結(jié)束
                this.running = false;

            });

            executeThread.start();
        }

        public void shutdown(long timeout){
            long beginTime = System.currentTimeMillis();

            // workerThread還在執(zhí)行
            while (running){
                // 判斷是否等待超時(shí)
                if (System.currentTimeMillis() - beginTime >= timeout){
                    // 等待超時(shí)了, 中斷executeThread的等待會(huì)使this.running = false;執(zhí)行到,
                    // executeThread執(zhí)行完畢導(dǎo)致workerThread守護(hù)線程跟著退出
                    executeThread.interrupt();

                    //退出while循環(huán)
                    break;
                }

                try {
                    Thread.sleep(1);
                }
                catch (InterruptedException e) {
                    // 說明main線程被打斷了
                    e.printStackTrace();

                    break;
                }
            }
        }

        public boolean isRunning(){
            return this.running;
        }

    }

    /**
     * 必須等待3秒后才結(jié)束執(zhí)行
     */
    @Test
    public void test1() {
        ThreadService threadService = new ThreadService();
        threadService.execute(() -> {
            // 這是一個(gè)非常非常耗時(shí)的操作
           while (true){

           }
        });

        //讓線程2秒后立即中斷執(zhí)行
        long begin = System.currentTimeMillis();
        threadService.shutdown(3000);
        long end = System.currentTimeMillis();

        System.out.println("耗時(shí): " + (end - begin));

        System.out.println("主線程執(zhí)行結(jié)果");
    }

    /**
     * 跟隨threadService的執(zhí)行而結(jié)束,無需等待3秒(1秒后就可以執(zhí)行主線程后續(xù)的代碼)
     */
    @Test
    public void test2() {
        ThreadService threadService = new ThreadService();
        threadService.execute(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        //讓線程2秒后立即中斷執(zhí)行
        long begin = System.currentTimeMillis();
        threadService.shutdown(3000);
        long end = System.currentTimeMillis();

        System.out.println("耗時(shí): " + (end - begin));

        System.out.println("主線程執(zhí)行結(jié)果");
    }
}

5.5、interrupt()、interrupted()、isInterrupted()區(qū)別與原理

5.5.1、結(jié)論

  • interrupt()方法:用于中斷線程的,調(diào)用該方法的線程的狀態(tài)將被置為"中斷"狀態(tài)。注意:線程中斷僅僅是設(shè)置線程的中斷狀態(tài)位,不會(huì)停止線程。需要用戶自己去監(jiān)視線程的狀態(tài)并做處理。支持線程中斷的方法(也就是線程中斷后會(huì)拋出InterruptedException的方法,比如Thread.sleep,以及Object.wait等方法)就是在監(jiān)視線程的中斷狀態(tài),一旦線程的中斷狀態(tài)被置為“中斷狀態(tài)”,就會(huì)拋出中斷異常,并將線程的中斷狀態(tài)為設(shè)置為false。

  • interrupted():返回線程是否處于已中斷狀態(tài)并清除中斷狀態(tài)

  • isInterrupted():返回線程是否處于已中斷狀態(tài)

5.5.2、原理

以下內(nèi)容來源于 https://my.oschina.net/itblog/blog/787024

public class Interrupt {
	public static void main(String[] args) throws Exception {
		Thread t = new Thread(new Worker());
		t.start();
		
		Thread.sleep(200);
		t.interrupt();
		
		System.out.println("Main thread stopped.");
	}
	
	public static class Worker implements Runnable {
		public void run() {
			System.out.println("Worker started.");
			
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				System.out.println("Worker IsInterrupted: " + 
						Thread.currentThread().isInterrupted());
			}
			
			System.out.println("Worker stopped.");
		}
	}
}

內(nèi)容很簡單:主線程main啟動(dòng)了一個(gè)子線程Worker,然后讓worker睡500ms,而main睡200ms,之后main調(diào)用worker線程的interrupt方法去中斷worker,worker被中斷后打印中斷的狀態(tài)。 下面是執(zhí)行結(jié)果:

Worker started.
Main thread stopped.
Worker IsInterrupted: false
Worker stopped.

Worker明明已經(jīng)被中斷,而isInterrupted()方法竟然返回了false,為什么呢?

在stackoverflow上搜索了一圈之后,發(fā)現(xiàn)有網(wǎng)友提到:可以查看拋出InterruptedException方法的JavaDoc(或源代碼),于是我查看了Thread.sleep方法的文檔,doc中是這樣描述這個(gè)InterruptedException異常的:

InterruptedException - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.

注意到后面這句“當(dāng)拋出這個(gè)異常的時(shí)候,中斷狀態(tài)已被清除”。所以isInterrupted()方法應(yīng)該返回false??墒怯械臅r(shí)候,我們需要isInterrupted這個(gè)方法返回true,怎么辦呢?這里就要先說說interrupt, interrupted和isInterrupted的區(qū)別了:

interrupt方法是用于中斷線程的,調(diào)用該方法的線程的狀態(tài)將被置為"中斷"狀態(tài)。注意:線程中斷僅僅是設(shè)置線程的中斷狀態(tài)位,不會(huì)停止線程。需要用戶自己去監(jiān)視線程的狀態(tài)為并做處理。支持線程中斷的方法(也就是線程中斷后會(huì)拋出InterruptedException的方法,比如這里的sleep,以及Object.wait等方法)就是在監(jiān)視線程的中斷狀態(tài),一旦線程的中斷狀態(tài)被置為“中斷狀態(tài)”,就會(huì)拋出中斷異常。這個(gè)觀點(diǎn)可以通過這篇文章證實(shí):

再來看看interrupted方法的實(shí)現(xiàn):

public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

和isInterrupted的實(shí)現(xiàn):

public boolean isInterrupted() {
    return isInterrupted(false);
}

這兩個(gè)方法一個(gè)是static的,一個(gè)不是,但實(shí)際上都是在調(diào)用同一個(gè)方法,只是interrupted方法傳入的參數(shù)為true,而iInterrupted傳入的參數(shù)為false。那么這個(gè)參數(shù)到底是什么意思呢?來看下這個(gè)isInterrupted(boolean)方法的實(shí)現(xiàn):

/**
 * Tests if some Thread has been interrupted.  The interrupted state
 * is reset or not based on the value of ClearInterrupted that is
 * passed.
 */
private native boolean isInterrupted(boolean ClearInterrupted);

這是一個(gè)native方法,看不到源碼沒有關(guān)系,參數(shù)名字ClearInterrupted已經(jīng)清楚的表達(dá)了該參數(shù)的作用----是否清除中斷狀態(tài)。方法的注釋也清晰的表達(dá)了“中斷狀態(tài)將會(huì)根據(jù)傳入的ClearInterrupted參數(shù)值確定是否重置”。所以,靜態(tài)方法interrupted將會(huì)清除中斷狀態(tài)(傳入的參數(shù)ClearInterrupted為true),而實(shí)例方法isInterrupted則不會(huì)(傳入的參數(shù)ClearInterrupted為false)。 回到剛剛的問題:很明顯,如果要isInterrupted這個(gè)方法返回true,通過在調(diào)用isInterrupted方法之前再次調(diào)用interrupt()方法來恢復(fù)這個(gè)中斷的狀態(tài)即可:

public class Interrupt  {
	public static void main(String[] args) throws Exception {
		Thread t = new Thread(new Worker());
		t.start();
		
		Thread.sleep(200);
		t.interrupt();
		
		System.out.println("Main thread stopped.");
	}
	
	public static class Worker implements Runnable {
		public void run() {
			System.out.println("Worker started.");
			
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				Thread curr = Thread.currentThread();
				//再次調(diào)用interrupt方法中斷自己,將中斷狀態(tài)設(shè)置為“中斷”
				curr.interrupt();
				System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
				System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
				System.out.println("Static Call: " + Thread.interrupted());//clear status
				System.out.println("---------After Interrupt Status Cleared----------");
				System.out.println("Static Call: " + Thread.interrupted());
				System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
				System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
			}
			
			System.out.println("Worker stopped.");
		}
	}
}

執(zhí)行結(jié)果:

Worker started.
Main thread stopped.
Worker IsInterrupted: true
Worker IsInterrupted: true
Static Call: true
---------After Interrupt Status Cleared----------
Static Call: false
Worker IsInterrupted: false
Worker IsInterrupted: false
Worker stopped.

從執(zhí)行結(jié)果也可以看到,前兩次調(diào)用isInterrupted方法都返回true,說明isInterrupted方法不會(huì)改變線程的中斷狀態(tài),而接下來調(diào)用靜態(tài)的interrupted()方法,第一次返回了true,表示線程被中斷,第二次則返回了false,因?yàn)榈谝淮握{(diào)用的時(shí)候已經(jīng)清除了中斷狀態(tài)。最后兩次調(diào)用isInterrupted()方法就肯定返回false了。

那么,在什么場景下,我們需要在catch塊里面中斷線程(重置中斷狀態(tài))呢?

答案是:如果不能拋出InterruptedException(就像這里的Thread.sleep語句放在了Runnable的run方法中,這個(gè)方法不允許拋出任何受檢查的異常),但又想告訴上層調(diào)用者這里發(fā)生了中斷的時(shí)候,就只能在catch里面重置中斷狀態(tài)了。

public class TaskRunner implements Runnable {
    private BlockingQueue<task> queue;
 
    public TaskRunner(BlockingQueue<task> queue) { 
        this.queue = queue; 
    }
 
    public void run() { 
        try {
             while (true) {
                 Task task = queue.take(10, TimeUnit.SECONDS);
                 task.execute();
             }
         } catch (InterruptedException e) { 
             // Restore the interrupted status
             Thread.currentThread().interrupt();
         }
    }
}

那么問題來了:為什么要在拋出InterruptedException的時(shí)候清除掉中斷狀態(tài)呢?

這個(gè)問題沒有找到官方的解釋,估計(jì)只有Java設(shè)計(jì)者們才能回答了。但這里的解釋似乎比較合理:一個(gè)中斷應(yīng)該只被處理一次(你catch了這個(gè)InterruptedException,說明你能處理這個(gè)異常,你不希望上層調(diào)用者看到這個(gè)中斷)。

參考地址:

https://my.oschina.net/itblog/blog/787024

http://stackoverflow.com/questions/7142665/why-does-thread-isinterrupted-always-return-false

http://stackoverflow.com/questions/2523721/why-do-interruptedexceptions-clear-a-threads-interrupted-status

http://www.ibm.com/developerworks/library/j-jtp05236/

http://blog.csdn.net/z69183787/article/details/25076033

6、synchronized

6.1、示例代碼

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 17:11
 */
public class SynchronizedDemo {

    private static final Object LOCK = new Object();

    public static void main(String[] args) {
        Runnable runnable = () -> {
            synchronized (LOCK){
                System.out.println(Thread.currentThread().getName() +"正在執(zhí)行");
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(Thread.currentThread().getName() +"執(zhí)行結(jié)束");
            }
        };
        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        Thread thread3 = new Thread(runnable);

        thread1.start();
        thread2.start();
        thread3.start();

        System.out.println("主線程執(zhí)行結(jié)束");
    }
}

6.2、通過jconsole查看

通過在cmd輸入jconsole命令顯示如下界面:

java并發(fā)編程的入門過程

選中本地進(jìn)程中剛剛運(yùn)行的java程序, 進(jìn)入主界面

java并發(fā)編程的入門過程

可以看到 Thread-0 、Thread-1、Thread-2運(yùn)行的三個(gè)線程,點(diǎn)擊對(duì)應(yīng)線程右邊可以看到“擁有者 Thread-0”

6.3、通過jps、jstack查看

java并發(fā)編程的入門過程

啟動(dòng)程序后:

第1步:輸入jps命令, 查看當(dāng)前正在運(yùn)行的java進(jìn)程

第2步: 輸入jstack [pid] 可以看到有三個(gè)線程 Thread-1、Thread-2均處于BLOCKED(on object nonitor)狀態(tài)、而Thread-0處于TIMED_WATING(sleepint)狀態(tài)(因?yàn)門hread.sleep()的作用)

6.4、通過javap -c [xxxx.class]命令查看匯編指令

java并發(fā)編程的入門過程

6.5 this鎖

含義:指在方法上面加上synchronized 關(guān)鍵字

package chapter2;
/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 17:56
 */
public class ThisLockDemo {

    public static void main(String[] args) {
        ThisLock thisLock = new ThisLock();

        Thread thread1 = new Thread(() -> {
           thisLock.m1();
        });

        Thread thread2 = new Thread(() -> {
            thisLock.m2();
        });

        thread1.start();
        thread2.start();

        System.out.println("主線程結(jié)束執(zhí)行");
    }
}

class ThisLock{

    public synchronized void m1(){
        System.out.println(Thread.currentThread().getName() +" m1開始執(zhí)行");

        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() +" m1結(jié)束執(zhí)行");
    }

    public synchronized void m2(){
        System.out.println(Thread.currentThread().getName() +" m2開始執(zhí)行");

        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() +" m2結(jié)束執(zhí)行");
    }
}

運(yùn)行結(jié)果:

java并發(fā)編程的入門過程

可以看到Thread-1 m2的執(zhí)行必須要等待Thread-0 m1執(zhí)行完畢后才能執(zhí)行, 說明同一個(gè)類在方法上面加上synchronized關(guān)鍵字所使用的的鎖是this鎖

6.6 class鎖

介紹:在static方法上加synchronized關(guān)鍵所使用的的鎖是class鎖

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 18:29
 */
public class ClassLockDemo {

    public static void main(String[] args) {
        ClassLock classLock1 = new ClassLock();
        ClassLock classLock2 = new ClassLock();

        Thread thread1 = new Thread(() -> {
            classLock1.m1();
        });

        Thread thread2 = new Thread(() -> {
            classLock2.m2();
        });

        thread1.start();
        thread2.start();

        System.out.println("主線程結(jié)束執(zhí)行");
    }
}


class ClassLock{

    public static synchronized void m1(){
        System.out.println(Thread.currentThread().getName() +" m1開始執(zhí)行");

        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() +" m1結(jié)束執(zhí)行");
    }

    public static synchronized void m2(){
        System.out.println(Thread.currentThread().getName() +" m2開始執(zhí)行");

        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() +" m2結(jié)束執(zhí)行");
    }
}

java并發(fā)編程的入門過程

可以看到Thread-1 m2的執(zhí)行必須要等待Thread-0 m1執(zhí)行完畢后才能執(zhí)行, 說明同一個(gè)類的多個(gè)實(shí)例在static方法上面加上synchronized關(guān)鍵字所使用的的鎖是class鎖

6.7 死鎖

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/29 18:33
 */
public class DieLockDemo {



    public static void main(String[] args) {
        DieLock dieLock = new DieLock();

        Thread thread1 = new Thread(() ->{
            dieLock.doSomething1();
        });

        Thread thread2 = new Thread(() ->{
            dieLock.doSomething2();
        });

        thread1.start();
        thread2.start();

        System.out.println("主線程執(zhí)行完畢");
    }
}

class DieLock{

    private final Object LOCK1 = new Object();
    private final Object LOCK2 = new Object();

    public void doSomething1(){
        synchronized (LOCK1){
            try {
                // 代表一個(gè)耗時(shí)操作
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "獲取到LOCK1, 等待LOCK2");
            synchronized (LOCK2){
                System.out.println("doSomething1");
            }
        }
    }

    public void doSomething2(){
        synchronized (LOCK2){
            try {
                // 代表一個(gè)耗時(shí)操作
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName() + "獲取到LOCK2, 等待LOCK1");

            synchronized (LOCK1){
                System.out.println("doSomething2");
            }
        }
    }
}

java并發(fā)編程的入門過程

6.8、synchronzied鎖重入

關(guān)鍵字synchronized擁有鎖重入的功能,也就是在使用synchronized時(shí),當(dāng)一個(gè)線程獲得一個(gè)對(duì)象鎖,再次請(qǐng)求該對(duì)象鎖時(shí)是可以獲得該對(duì)象的鎖的。表現(xiàn)在一個(gè)synchronized方法 / 塊的內(nèi)部調(diào)用本類的其他synchronized方法 / 塊時(shí),是永遠(yuǎn)可以得到鎖的。

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/9/1 15:43
 */
public class SynchronizedReentrantDemo {

    public synchronized void methodA(){
        System.out.println("methodA invoked ");

        this.methodB();
    }

    public synchronized void methodB(){
        System.out.println("methodB invoked ");
    }

    public static void main(String[] args) {
        SynchronizedReentrantDemo demo = new SynchronizedReentrantDemo();

        demo.methodA();
    }
}

java并發(fā)編程的入門過程

可以看到在synchronized方法methodsA中調(diào)用被synchronized修飾的methodB()也是可以的,即使methodA()還沒有釋放this鎖,這證明了synchronzied鎖是可以重入的

6.9、synchronized的原理

Java中synchronized的鎖實(shí)現(xiàn)包括:偏向鎖、輕量級(jí)鎖、重量級(jí)鎖(等待時(shí)間長)

1個(gè)對(duì)象的實(shí)例包括3部分:對(duì)象頭、實(shí)例變量、填充數(shù)據(jù)

java并發(fā)編程的入門過程

  • 對(duì)象頭:實(shí)現(xiàn)synchronized的鎖對(duì)象的基礎(chǔ)

  • 實(shí)例變量: 存放類的屬性數(shù)據(jù)信息,包括父類的屬性信息,如果是數(shù)組的實(shí)例部分還包括數(shù)組的長度,這部分內(nèi)存按4字節(jié)對(duì)齊。

  • 填充數(shù)據(jù):由于虛擬機(jī)要求對(duì)象起始地址必須是8字節(jié)的整數(shù)倍。填充數(shù)據(jù)不是必須存在的,僅僅是為了字節(jié)對(duì)齊,這點(diǎn)了解即可。

synchronized使用的鎖對(duì)象是存儲(chǔ)在Java對(duì)象頭里的,jvm中采用2個(gè)字來存儲(chǔ)對(duì)象頭(如果對(duì)象是數(shù)組則會(huì)分配3個(gè)字,多出來的1個(gè)字記錄的是數(shù)組長度),其主要結(jié)構(gòu)是由Mark Word 和 Class Metadata Address 組成,其結(jié)構(gòu)說明如下表:

其中Mark Word在默認(rèn)情況下存儲(chǔ)著對(duì)象的HashCode、分代年齡、鎖標(biāo)記位等以下是32位JVM的Mark Word默認(rèn)存儲(chǔ)結(jié)構(gòu)

java并發(fā)編程的入門過程

由于對(duì)象頭的信息是與對(duì)象自身定義的數(shù)據(jù)沒有關(guān)系的額外存儲(chǔ)成本,因此考慮到JVM的空間效率,Mark Word 被設(shè)計(jì)成為一個(gè)非固定的數(shù)據(jù)結(jié)構(gòu),以便存儲(chǔ)更多有效的數(shù)據(jù),它會(huì)根據(jù)對(duì)象本身的狀態(tài)復(fù)用自己的存儲(chǔ)空間,如32位JVM下,除了上述列出的Mark Word默認(rèn)存儲(chǔ)結(jié)構(gòu)外,還有如下可能變化的結(jié)構(gòu):

java并發(fā)編程的入門過程

其中輕量級(jí)鎖和偏向鎖是Java 6 對(duì) synchronized 鎖進(jìn)行優(yōu)化后新增加的,稍后我們會(huì)簡要分析。這里我們主要分析一下重量級(jí)鎖也就是通常說synchronized的對(duì)象鎖,鎖標(biāo)識(shí)位為10,其中指針指向的是monitor對(duì)象(也稱為管程或監(jiān)視器鎖)的起始地址。每個(gè)對(duì)象都存在著一個(gè) monitor 與之關(guān)聯(lián),對(duì)象與其 monitor 之間的關(guān)系有存在多種實(shí)現(xiàn)方式,如monitor可以與對(duì)象一起創(chuàng)建銷毀或當(dāng)線程試圖獲取對(duì)象鎖時(shí)自動(dòng)生成,但當(dāng)一個(gè) monitor 被某個(gè)線程持有后,它便處于鎖定狀態(tài)。在Java虛擬機(jī)(HotSpot)中,monitor是由ObjectMonitor實(shí)現(xiàn)的,其主要數(shù)據(jù)結(jié)構(gòu)如下(位于HotSpot虛擬機(jī)源碼ObjectMonitor.hpp文件,C++實(shí)現(xiàn)的)

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; //記錄個(gè)數(shù)
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL;
    _WaitSet      = NULL; //處于wait狀態(tài)的線程,會(huì)被加入到_WaitSet
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ; //處于等待鎖block狀態(tài)的線程,會(huì)被加入到該列表
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
  }

ObjectMonitor中有兩個(gè)隊(duì)列,_WaitSet 和 _EntryList,用來保存ObjectWaiter對(duì)象列表( 每個(gè)等待鎖的線程都會(huì)被封裝成ObjectWaiter對(duì)象),_owner指向持有ObjectMonitor對(duì)象的線程,當(dāng)多個(gè)線程同時(shí)訪問一段同步代碼時(shí),首先會(huì)進(jìn)入 _EntryList 集合,當(dāng)線程獲取到對(duì)象的monitor 后進(jìn)入 _Owner 區(qū)域并把monitor中的owner變量設(shè)置為當(dāng)前線程同時(shí)monitor中的計(jì)數(shù)器count加1,若線程調(diào)用 wait() 方法,將釋放當(dāng)前持有的monitor,owner變量恢復(fù)為null,count自減1,同時(shí)該線程進(jìn)入 WaitSe t集合中等待被喚醒。若當(dāng)前線程執(zhí)行完畢也將釋放monitor(鎖)并復(fù)位變量的值,以便其他線程進(jìn)入獲取monitor(鎖)。如下圖所示 java并發(fā)編程的入門過程

由此看來,monitor對(duì)象存在于每個(gè)Java對(duì)象的對(duì)象頭中(存儲(chǔ)的指針的指向),synchronized鎖便是通過這種方式獲取鎖的,也是為什么Java中任意對(duì)象可以作為鎖的原因,同時(shí)也是notify/notifyAll/wait等方法存在于頂級(jí)對(duì)象Object中的原因(關(guān)于這點(diǎn)稍后還會(huì)進(jìn)行分析),ok~,有了上述知識(shí)基礎(chǔ)后,下面我們將進(jìn)一步分析synchronized在字節(jié)碼層面的具體語義實(shí)現(xiàn)。

6.10、synchronized底層實(shí)現(xiàn)原理

為了分析底層原理,我們編寫1個(gè)最簡單的類,代碼如下:

package chapter2;
/**
 * @author calebzhao
 * @date 2019/11/30 18:41
 */
public class SynchronizedDemo2 {
    public synchronized static void test1(){
        System.out.println("test1");
    }
    public synchronized void test2(){
        System.out.println("test2");
    }
    public void test3(){
        synchronized (this){
            System.out.println("test4");
        }
    }
}

6.10.1、synchronized方法底層原理:

方法級(jí)的同步是隱式,即無需通過字節(jié)碼指令來控制的,它實(shí)現(xiàn)在方法調(diào)用和返回操作之中。JVM可以從方法常量池中的方法表結(jié)構(gòu)(method_info Structure) 中的 ACC_SYNCHRONIZED 訪問標(biāo)志區(qū)分一個(gè)方法是否同步方法。當(dāng)方法調(diào)用時(shí),調(diào)用指令將會(huì) 檢查方法的 ACC_SYNCHRONIZED 訪問標(biāo)志是否被設(shè)置,如果設(shè)置了,執(zhí)行線程將先持有monitor(虛擬機(jī)規(guī)范中用的是管程一詞), 然后再執(zhí)行方法,最后再方法完成(無論是正常完成還是非正常完成)時(shí)釋放monitor。在方法執(zhí)行期間,執(zhí)行線程持有了monitor,其他任何線程都無法再獲得同一個(gè)monitor。如果一個(gè)同步方法執(zhí)行期間拋 出了異常,并且在方法內(nèi)部無法處理此異常,那這個(gè)同步方法所持有的monitor將在異常拋到同步方法之外時(shí)自動(dòng)釋放。下面我們看看字節(jié)碼層面如何實(shí)現(xiàn):

通過執(zhí)行javap -v SynchroizedDemo2.class會(huì)輸出如下字節(jié)碼指令:

test1()方法對(duì)應(yīng)的字節(jié)碼指令:

java并發(fā)編程的入門過程

test2()方法對(duì)應(yīng)的字節(jié)碼指令:

java并發(fā)編程的入門過程

從字節(jié)碼中可以看出,synchronized修飾的方法并沒有monitorenter指令和monitorexit指令,取得代之的確實(shí)是ACC_SYNCHRONIZED標(biāo)識(shí),該標(biāo)識(shí)指明了該方法是一個(gè)同步方法,JVM通過該ACC_SYNCHRONIZED訪問標(biāo)志來辨別一個(gè)方法是否聲明為同步方法,從而執(zhí)行相應(yīng)的同步調(diào)用。這便是synchronized鎖在同步代碼塊和同步方法上實(shí)現(xiàn)的基本原理。同時(shí)我們還必須注意到的是在Java早期版本中,synchronized屬于重量級(jí)鎖,效率低下,因?yàn)楸O(jiān)視器鎖(monitor)是依賴于底層的操作系統(tǒng)的Mutex Lock來實(shí)現(xiàn)的,而操作系統(tǒng)實(shí)現(xiàn)線程之間的切換時(shí)需要從用戶態(tài)轉(zhuǎn)換到核心態(tài),這個(gè)狀態(tài)之間的轉(zhuǎn)換需要相對(duì)比較長的時(shí)間,時(shí)間成本相對(duì)較高,這也是為什么早期的synchronized效率低的原因。慶幸的是在Java 6之后Java官方對(duì)從JVM層面對(duì)synchronized較大優(yōu)化,所以現(xiàn)在的synchronized鎖效率也優(yōu)化得很不錯(cuò)了,Java 6之后,為了減少獲得鎖和釋放鎖所帶來的性能消耗,引入了輕量級(jí)鎖和偏向鎖,接下來我們將簡單了解一下Java官方在JVM層面對(duì)synchronized鎖的優(yōu)化。

6.10.2、synchonized代碼塊底層實(shí)現(xiàn)原理:

test3()方法對(duì)應(yīng)的字節(jié)碼指令:

java并發(fā)編程的入門過程

JSR133的解釋:

synchronized 語句需要一個(gè)對(duì)象的引用;隨后會(huì)嘗試在該對(duì)象的管程上執(zhí)行 lock 動(dòng) 作,如果 lock 動(dòng)作未能成功完成,將一直等待。當(dāng) lock 動(dòng)作執(zhí)行成功,就會(huì)運(yùn)行synchronized 語句塊中的代碼。一旦語句塊中的代碼執(zhí)行結(jié)束,不管是正常還是異 常結(jié)束,都會(huì)在之前執(zhí)行 lock 動(dòng)作的那個(gè)管程上自動(dòng)執(zhí)行一個(gè) unlock 動(dòng)作。

synchronized 方法在調(diào)用時(shí)會(huì)自動(dòng)執(zhí)行一個(gè) lock 動(dòng)作。在 lock 動(dòng)作成功完成之前, 都不會(huì)執(zhí)行方法體。如果是實(shí)例方法,鎖的是調(diào)用該方法的實(shí)例(即,方法體執(zhí)行 期間的 this)相關(guān)聯(lián)的管程。如果是靜態(tài)方法,鎖的是定義該方法的類所對(duì)應(yīng)的 Class 對(duì)象。一旦方法體執(zhí)行結(jié)束,不管是正常還是異常結(jié)束,都會(huì)在之前執(zhí)行 lock 動(dòng)作的那個(gè)管程上自動(dòng)執(zhí)行一個(gè) unlock 動(dòng)作。

從字節(jié)碼中可知同步語句塊的實(shí)現(xiàn)使用的是monitorenter 和 monitorexit 指令,其中monitorenter指令指向同步代碼塊的開始位置,monitorexit指令則指明同步代碼塊的結(jié)束位置,當(dāng)執(zhí)行monitorenter指令時(shí),當(dāng)前線程將試圖獲取 objectref(即對(duì)象鎖) 所對(duì)應(yīng)的 monitor 的持有權(quán),當(dāng) objectref 的 monitor 的進(jìn)入計(jì)數(shù)器為 0,那線程可以成功取得 monitor,并將計(jì)數(shù)器值設(shè)置為 1,取鎖成功。如果當(dāng)前線程已經(jīng)擁有 objectref 的 monitor 的持有權(quán),那它可以重入這個(gè) monitor (關(guān)于重入性稍后會(huì)分析),重入時(shí)計(jì)數(shù)器的值也會(huì)加 1。倘若其他線程已經(jīng)擁有 objectref 的 monitor 的所有權(quán),那當(dāng)前線程將被阻塞,直到正在執(zhí)行線程執(zhí)行完畢,即monitorexit指令被執(zhí)行,執(zhí)行線程將釋放 monitor(鎖)并設(shè)置計(jì)數(shù)器值為0 ,其他線程將有機(jī)會(huì)持有 monitor 。值得注意的是編譯器將會(huì)確保無論方法通過何種方式完成,方法中調(diào)用過的每條 monitorenter 指令都有執(zhí)行其對(duì)應(yīng) monitorexit 指令,而無論這個(gè)方法是正常結(jié)束還是異常結(jié)束。為了保證在方法異常完成時(shí) monitorenter 和 monitorexit 指令依然可以正確配對(duì)執(zhí)行,編譯器會(huì)自動(dòng)產(chǎn)生一個(gè)異常處理器,這個(gè)異常處理器聲明可處理所有的異常,它的目的就是用來執(zhí)行 monitorexit 指令。從字節(jié)碼中也可以看出多了一個(gè)monitorexit指令,它就是異常結(jié)束時(shí)被執(zhí)行的釋放monitor 的指令。

6.11、Java虛擬機(jī)對(duì)synchronized的優(yōu)化

鎖的狀態(tài)總共有四種,無鎖狀態(tài)、偏向鎖、輕量級(jí)鎖和重量級(jí)鎖。隨著鎖的競爭,鎖可以從偏向鎖升級(jí)到輕量級(jí)鎖,再升級(jí)的重量級(jí)鎖,但是鎖的升級(jí)是單向的,也就是說只能從低到高升級(jí),不會(huì)出現(xiàn)鎖的降級(jí),關(guān)于重量級(jí)鎖,前面我們已詳細(xì)分析過,下面我們將介紹偏向鎖和輕量級(jí)鎖以及JVM的其他優(yōu)化手段,這里并不打算深入到每個(gè)鎖的實(shí)現(xiàn)和轉(zhuǎn)換過程更多地是闡述Java虛擬機(jī)所提供的每個(gè)鎖的核心優(yōu)化思想,畢竟涉及到具體過程比較繁瑣,如需了解詳細(xì)過程可以查閱《深入理解Java虛擬機(jī)原理》。

6.11.1、偏向鎖

偏向鎖是Java 6之后加入的新鎖,它是一種針對(duì)加鎖操作的優(yōu)化手段,經(jīng)過研究發(fā)現(xiàn),在大多數(shù)情況下,鎖不僅不存在多線程競爭,而且總是由同一線程多次獲得,因此為了減少同一線程獲取鎖(會(huì)涉及到一些CAS操作,耗時(shí))的代價(jià)而引入偏向鎖。偏向鎖的核心思想是,如果一個(gè)線程獲得了鎖,那么鎖就進(jìn)入偏向模式,此時(shí)Mark Word 的結(jié)構(gòu)也變?yōu)槠蜴i結(jié)構(gòu),當(dāng)這個(gè)線程再次請(qǐng)求鎖時(shí),無需再做任何同步操作,即獲取鎖的過程,這樣就省去了大量有關(guān)鎖申請(qǐng)的操作,從而也就提供程序的性能。所以,對(duì)于沒有鎖競爭的場合,偏向鎖有很好的優(yōu)化效果,畢竟極有可能連續(xù)多次是同一個(gè)線程申請(qǐng)相同的鎖。但是對(duì)于鎖競爭比較激烈的場合,偏向鎖就失效了,因?yàn)檫@樣場合極有可能每次申請(qǐng)鎖的線程都是不相同的,因此這種場合下不應(yīng)該使用偏向鎖,否則會(huì)得不償失,需要注意的是,偏向鎖失敗后,并不會(huì)立即膨脹為重量級(jí)鎖,而是先升級(jí)為輕量級(jí)鎖。下面我們接著了解輕量級(jí)鎖。

6.11.2、輕量級(jí)鎖

倘若偏向鎖失敗,虛擬機(jī)并不會(huì)立即升級(jí)為重量級(jí)鎖,它還會(huì)嘗試使用一種稱為輕量級(jí)鎖的優(yōu)化手段(1.6之后加入的),此時(shí)Mark Word 的結(jié)構(gòu)也變?yōu)檩p量級(jí)鎖的結(jié)構(gòu)。輕量級(jí)鎖能夠提升程序性能的依據(jù)是“對(duì)絕大部分的鎖,在整個(gè)同步周期內(nèi)都不存在競爭”,注意這是經(jīng)驗(yàn)數(shù)據(jù)。需要了解的是,輕量級(jí)鎖所適應(yīng)的場景是線程交替執(zhí)行同步塊的場合,如果存在同一時(shí)間訪問同一鎖的場合,就會(huì)導(dǎo)致輕量級(jí)鎖膨脹為重量級(jí)鎖。

6.11.3、自旋鎖

輕量級(jí)鎖失敗后,虛擬機(jī)為了避免線程真實(shí)地在操作系統(tǒng)層面掛起,還會(huì)進(jìn)行一項(xiàng)稱為自旋鎖的優(yōu)化手段。這是基于在大多數(shù)情況下,線程持有鎖的時(shí)間都不會(huì)太長,如果直接掛起操作系統(tǒng)層面的線程可能會(huì)得不償失,畢竟操作系統(tǒng)實(shí)現(xiàn)線程之間的切換時(shí)需要從用戶態(tài)轉(zhuǎn)換到核心態(tài),這個(gè)狀態(tài)之間的轉(zhuǎn)換需要相對(duì)比較長的時(shí)間,時(shí)間成本相對(duì)較高,因此自旋鎖會(huì)假設(shè)在不久將來,當(dāng)前的線程可以獲得鎖,因此虛擬機(jī)會(huì)讓當(dāng)前想要獲取鎖的線程做幾個(gè)空循環(huán)(這也是稱為自旋的原因),一般不會(huì)太久,可能是50個(gè)循環(huán)或100循環(huán),在經(jīng)過若干次循環(huán)后,如果得到鎖,就順利進(jìn)入臨界區(qū)。如果還不能獲得鎖,那就會(huì)將線程在操作系統(tǒng)層面掛起,這就是自旋鎖的優(yōu)化方式,這種方式確實(shí)也是可以提升效率的。最后沒辦法也就只能升級(jí)為重量級(jí)鎖了。

6.11.4、鎖消除

消除鎖是虛擬機(jī)另外一種鎖的優(yōu)化,這種優(yōu)化更徹底,Java虛擬機(jī)在JIT編譯時(shí)(可以簡單理解為當(dāng)某段代碼即將第一次被執(zhí)行時(shí)進(jìn)行編譯,又稱即時(shí)編譯),通過對(duì)運(yùn)行上下文的掃描,去除不可能存在共享資源競爭的鎖,通過這種方式消除沒有必要的鎖,可以節(jié)省毫無意義的請(qǐng)求鎖時(shí)間,如下StringBuffer的append是一個(gè)同步方法,但是在add方法中的StringBuffer屬于一個(gè)局部變量,并且不會(huì)被其他線程所使用,因此StringBuffer不可能存在共享資源競爭的情景,JVM會(huì)自動(dòng)將其鎖消除。

public class StringBufferRemoveSync {

	public void add(String str1, String str2) {
		//StringBuffer是線程安全,由于sb只會(huì)在append方法中使用,不可能被其他線程引用
		//因此sb屬于不可能共享的資源,JVM會(huì)自動(dòng)消除內(nèi)部的鎖
		StringBuffer sb = new StringBuffer();
		sb.append(str1).append(str2);
	}

	public static void main(String[] args) {
		StringBufferRemoveSync rmsync = new StringBufferRemoveSync();
		for (int i = 0; i < 10000000; i++) {
			rmsync.add("abc", "123");
		}
	}
}

7、wait、notify方法

7.1、含義(需仔細(xì)研讀)

  1. wait()、notify()是Object類的方法, 調(diào)用這2個(gè)方法前,執(zhí)行線程必須已經(jīng)獲得改對(duì)象的對(duì)象鎖,即只能在同步方法或同步代碼塊中調(diào)用wait() 或者notify()方法,如果調(diào)用這2個(gè)方法時(shí)沒有獲得對(duì)象鎖將會(huì)拋出IllegalMonitorStateException異常

  2. 調(diào)用wait()方法后,當(dāng)前線程立即釋放已經(jīng)獲得的鎖,并且將當(dāng)前線程置入“預(yù)執(zhí)行隊(duì)列(WaitSet)中”, 并且在wait()所在的代碼處停止執(zhí)行必須直到收到notify()方法的通知或者被中斷執(zhí)行當(dāng)前線程才能被喚醒繼續(xù)往wait()方法后面的代碼執(zhí)行

  3. notitify()方法用來通知那些等待獲取該對(duì)象鎖的線程, 如果有多個(gè)線程等待,則由線程規(guī)劃器隨機(jī)挑選出一個(gè)處于wait狀態(tài)的線程B,對(duì)其發(fā)出Notify通知,使B退出等待隊(duì)列,處于就緒狀態(tài),被重新喚醒的線程B會(huì)嘗試獲取臨界區(qū)的對(duì)象鎖,被喚醒線程B在真正獲取到鎖后就會(huì)繼續(xù)執(zhí)行wait()后面的代碼。需要說明的是,在執(zhí)行notify()方法后,并不會(huì)使當(dāng)前線程A馬上釋放對(duì)象鎖,處于wait狀態(tài)的線程B也不能馬上獲取對(duì)象鎖,要等到執(zhí)行notify()方法的線程A將程序執(zhí)行完,也就是退出synchronized代碼塊后,當(dāng)前線程A才會(huì)釋放對(duì)象鎖,但是釋放之后并不代表線程B就一定會(huì)獲取到對(duì)象鎖,只是說此時(shí)A、B都有機(jī)會(huì)競爭獲取到對(duì)象鎖

  4. 如果notify()方法執(zhí)行時(shí),此時(shí)并沒有任何線程處于wait狀態(tài),那么執(zhí)行該方法相當(dāng)于無效操作

  5. notify()與notifyAll()的區(qū)別是:notify()方法每次調(diào)用時(shí)都只是從所有處于wait狀態(tài)的線程中隨機(jī)選擇一個(gè)線程進(jìn)入就緒狀態(tài),而notifyAll()則是使所有處于wait狀態(tài)的線程全部退出等待隊(duì)列,全部進(jìn)入就緒狀態(tài),此處喚醒不等于所有線程都獲得該對(duì)象的monitor,此時(shí)優(yōu)先級(jí)最高的那個(gè)線程優(yōu)先執(zhí)行(獲得對(duì)象鎖),但也有可能是隨機(jī)執(zhí)行(獲得對(duì)象鎖),這要取決于jvm實(shí)現(xiàn)

7.2、生產(chǎn)者消費(fèi)者模式

7.2.1、 版本1產(chǎn)生假死(全部進(jìn)入等待wait狀態(tài))

package chapter2;

import java.util.stream.Stream;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/30 7:44
 */
public class ProducerAndConsumerVersion1 {

    private final Object LOCK = new Object();

    private boolean isProduced;

    private int num = 0;

    public static void main(String[] args) {
        ProducerAndConsumerVersion1 version1 = new ProducerAndConsumerVersion1();
        Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{
            new Thread(() ->{
                while (true) {
                    version1.produce();
                }

            }, item).start();
        });
        Stream.of("C1").forEach(item ->{
            new Thread(() ->{
                while (true) {
                    version1.consumer();
                }

            }, item).start();
        });


        System.out.println("主線程執(zhí)行結(jié)束");
    }

    public void produce(){
        synchronized (LOCK){
            if (isProduced){
                try {
                    System.out.println("[" + Thread.currentThread().getName() +"] produce wait");
                    LOCK.wait();

                    System.out.println("[" + Thread.currentThread().getName() +"] produce wait after");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            else {
                num++;
                System.out.println("[" + Thread.currentThread().getName() + "] P ==>"  + num);
                isProduced=true;
                LOCK.notify();
                System.out.println("[" + Thread.currentThread().getName() + "] notify after");
            }
        }
    }

    public void consumer(){
        synchronized (LOCK){

            if (isProduced){
               
                System.out.println("[" + Thread.currentThread().getName() + "] C ==>" + num);
                isProduced = false;

                LOCK.notify();

                System.out.println("[" + Thread.currentThread().getName() + "] notify after");
            }
            else {
                try {
                    System.out.println("[" + Thread.currentThread().getName() +"] consumer wait");
                    LOCK.wait();

                    System.out.println("[" + Thread.currentThread().getName() +"] consumer wait after");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

java并發(fā)編程的入門過程

以上代碼執(zhí)行永遠(yuǎn)不會(huì)結(jié)束,所有線程最終都變?yōu)閣ait狀態(tài)(沒有發(fā)生死鎖)

7.2.2、多消費(fèi)者、多生產(chǎn)者正確版本

package chapter2;

import java.util.stream.Stream;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/30 7:44
 */
public class ProducerAndConsumerVersion2 {

    private final Object LOCK = new Object();

    private boolean isProduced;

    private int num = 0;

    public static void main(String[] args) {
        ProducerAndConsumerVersion2 version1 = new ProducerAndConsumerVersion2();
        Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{
            new Thread(() ->{
                while (true) {
                    version1.produce();
                }

            }, item).start();
        });
        Stream.of("C1", "C2", "C3", "C4").forEach(item ->{
            new Thread(() ->{
                while (true) {
                    version1.consumer();
                }

            }, item).start();
        });


        System.out.println("主線程執(zhí)行結(jié)束");
    }

    public void produce(){
        synchronized (LOCK){
            while (isProduced){
                try {
                    System.out.println("[" + Thread.currentThread().getName() +"] produce wait");
                    LOCK.wait();

                    System.out.println("[" + Thread.currentThread().getName() +"] produce wait after");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            num++;
            System.out.println("[" + Thread.currentThread().getName() + "] P ==>"  + num);
            isProduced=true;
            LOCK.notifyAll();
            System.out.println("[" + Thread.currentThread().getName() + "] notify after");
        }
    }

    public void consumer(){
        synchronized (LOCK){
            while (!isProduced){
                try {
                    System.out.println("[" + Thread.currentThread().getName() +"] consumer wait");
                    LOCK.wait();

                    System.out.println("[" + Thread.currentThread().getName() +"] consumer wait after");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            System.out.println("[" + Thread.currentThread().getName() + "] C ==>" + num);
            isProduced = false;

            LOCK.notifyAll();

            System.out.println("[" + Thread.currentThread().getName() + "] notify after");

        }
    }

}

7.2.3、多生產(chǎn)者、多消費(fèi)者, 阻塞, 錯(cuò)誤版本,浪費(fèi)執(zhí)行機(jī)會(huì)

package chapter2;

import java.util.LinkedList;
import java.util.stream.Stream;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/30 7:44
 */
public class ProducerAndConsumerVersion3 {

    private final Object LOCK = new Object();

    private boolean isProduced;

    private int num = 0;

    public static void main(String[] args) {
        Container container = new Container(10);

        Producer producer = new Producer(container);
        Consumer consumer = new Consumer(container);
        Stream.of("P1", "P2", "P3", "P4").forEach((item) ->{
            new Thread(() ->{
                while (true) {
                    producer.produce();
                }

            }, item).start();
        });

        Stream.of("C1", "C2", "C3", "C4").forEach(item ->{
            new Thread(() ->{
                while (true) {
                    consumer.consume();
                }

            }, item).start();
        });


        System.out.println("主線程執(zhí)行結(jié)束");
    }
}

class Producer{

    private Container container;
    private int i = 0;

    public Producer(Container container){
        this.container = container;
    }

    public void produce(){
        while (true){
            synchronized (container){
                if (container.isOverflow()){
                    try {
                        container.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                else{
                    container.push(++i);
                    System.out.println("[" + Thread.currentThread().getName() +"] produce " + i);

                    container.notifyAll();
                }
            }

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer{
    private Container container;

    public Consumer(Container container){
        this.container = container;
    }

    public void consume(){
        while (true){
            synchronized (container){
                if (container.isEmpty()){
                    try {
                        container.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 這里其實(shí)是有問題的, 假設(shè)A、B線程都執(zhí)行到container.wait();處于wait狀態(tài), 此時(shí)生產(chǎn)者執(zhí)行了container.notifyAll();
              // A線程獲得了對(duì)象鎖, 從container.wait();處往后執(zhí)行,但是由于這里用的是if..else結(jié)構(gòu),
              // 所以會(huì)導(dǎo)致A線程剛被喚醒獲得了對(duì)象鎖,又什么都不做,馬上又釋放了對(duì)象鎖,假設(shè)A線程的CPU時(shí)間片剛好用完又讓B獲得了對(duì)象鎖,
            // 可能出現(xiàn)后續(xù)都一直是B獲得CPU實(shí)現(xiàn)片獲得對(duì)象鎖,而A明明之前獲得過一次對(duì)象鎖卻啥事也不干,白白浪費(fèi)了一次執(zhí)行機(jī)會(huì)

                else {
                    Object value = container.pop();
                    System.out.println("[" + Thread.currentThread().getName() +"]  consume " + value);

                    container.notifyAll();
                }
            }
        }
    }
}

class Container{

    private LinkedList<object> storage;

    private int capticy;


    public Container(int capticy){
        this.storage = new LinkedList<>();

        this.capticy = capticy;
    }

    public void push(Object obj){
        this.storage.addLast(obj);
    }

    public Object pop(){
        return this.storage.removeFirst();
    }

    public int size(){
        return this.storage.size();
    }

    public boolean isOverflow(){
        return size() >= capticy;
    }

    public boolean isEmpty(){
        return this.storage.isEmpty();
    }
}

java并發(fā)編程的入門過程

7.2.4、完全正確版本

package 生產(chǎn)者消費(fèi)者模式;

import java.util.LinkedList;
import java.util.stream.Stream;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/6/30 7:44
 */
@SuppressWarnings("ALL")
public class ProducerAndConsumerVersion4 {

    private final Object LOCK = new Object();

    private boolean isProduced;

    private int num = 0;

    public static void main(String[] args) {
        Container2 container = new Container2(10);

        Producer2 producer = new Producer2(container);
        Consumer2 consumer = new Consumer2(container);
        Stream.of("P1", "P2", "P3", "P4").forEach((item) -> {
            new Thread(() ->{
                while (true) {
                    producer.produce();
                }

            }, item).start();
        });

        Stream.of("C1", "C2", "C3", "C4").forEach(item ->{
            new Thread(() -> {
                while (true) {
                    consumer.consume();
                }

            }, item).start();
        });


        System.out.println("主線程執(zhí)行結(jié)束");
    }
}

@SuppressWarnings("ALL")
class Producer2 {

    private Container2 container;
    private int i = 0;

    public Producer2(Container2 container){
        this.container = container;
    }

    public void produce(){
        
        synchronized (container){
            // 這里的while不能換成if
            // 假設(shè)container現(xiàn)在是滿的,C線程消費(fèi)了一個(gè)個(gè),然后調(diào)用container.notifyAll();通知A、B線程喚醒
            // A、B線程從container.wait();這一行代碼處喚醒后處于就緒狀態(tài), A獲得鎖, 往container.wait();后面執(zhí)行,
            // A執(zhí)行 container.push(++i); 執(zhí)行完后container滿了,A執(zhí)行完synchronized代碼塊釋放鎖, 緊接著B獲取到鎖一樣從
            // container.wait();往后執(zhí)行,假如這里while換成if, 那么B就會(huì)又執(zhí)行container.push(++i);
            // 導(dǎo)致容器滿了仍然向container push數(shù)據(jù),這樣就出現(xiàn)錯(cuò)誤數(shù)據(jù)了
            while (container.isOverflow()) {
                try {
                    container.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
            container.push(++i);
            System.out.println("[" + Thread.currentThread().getName() +"] produce " + i);

            container.notifyAll();
        }

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@SuppressWarnings("ALL")
class Consumer2{
    private Container2 container;

    public Consumer2(Container2 container){
        this.container = container;
    }

    public void consume(){
        synchronized (container){
            // 這里的while不能換成if
            
            // 假設(shè)A、B線程都執(zhí)行到container.wait();這行代碼處,A、B處于wait狀態(tài), 此時(shí)生產(chǎn)者執(zhí)行了container.notifyAll();
            // 然后A線程獲得了對(duì)象鎖, 從container.wait();處往后執(zhí)行,A調(diào)用container.pop()后container變?yōu)榭盏?
            // 假設(shè)這里while換成if, 那么會(huì)出現(xiàn)緊接著B獲得了對(duì)象鎖,一樣地從從container.wait();處往后執(zhí)行,但是container已經(jīng)是空的了
            // 任然調(diào)用container.pop()就會(huì)報(bào)出ArrayIndeOutOfBoundExeption了
            while (container.isEmpty()){
                try {
                    container.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
           
            Object value = container.pop();
            System.out.println("[" + Thread.currentThread().getName() +"]  consume " + value);

            container.notifyAll();
        }
    }
}

class Container2{

    private LinkedList<object> storage;

    private int capticy;


    public Container2(int capticy){
        this.storage = new LinkedList<>();

        this.capticy = capticy;
    }

    public void push(Object obj){
        this.storage.addLast(obj);
    }

    public Object pop(){
        return this.storage.removeFirst();
    }

    public int size(){
        return this.storage.size();
    }

    public boolean isOverflow(){
        return size() >= capticy;
    }

    public boolean isEmpty(){
        return this.storage.isEmpty();
    }
}

8 、捕獲線程運(yùn)行期間的異常

package chapter2;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/1 22:03
 */
public class ExceptionCaught {

    public static void main(String[] args) {

        Thread mythread = new Thread(() ->{
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "mythread");

        // 捕獲異常
        mythread.setUncaughtExceptionHandler((thread, throwable) ->{
            System.out.println(thread.getName());

            throwable.printStackTrace();
        });

        mythread.start();

        mythread.interrupt();
    }
}

java并發(fā)編程的入門過程

9、ThreadGroup

package chapter2;

import java.util.stream.Stream;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/1 20:27
 */
public class ThreadGroupDemo {

    public static void main(String[] args) {
        // main方法也是一個(gè)線程, 其名稱為main
        System.out.println(Thread.currentThread().getName());

        // main線程的線程組的名稱是main
        System.out.println(Thread.currentThread().getThreadGroup().getName());

        // 創(chuàng)建線程組tg1
        ThreadGroup tg1 = new ThreadGroup("tg1");

        new Thread(tg1, "t1"){
            @Override
            public void run() {
                    try {
                        Thread.sleep(1000);
                        //獲取當(dāng)前線程組的名稱
                        System.out.println(this.getThreadGroup().getName());
                        //獲取當(dāng)前線程組的父線程組
                        System.out.println(this.getThreadGroup().getParent());
                        //獲取當(dāng)前線程組的父線程組的名稱
                        System.out.println(this.getThreadGroup().getParent().getName());
                        //評(píng)估當(dāng)前線程組的父線程組及子級(jí)的線程數(shù)量
                        System.out.println(this.getThreadGroup().getParent().activeCount());
                        System.out.println("--------------");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            }
        }.start();

        // 創(chuàng)建線程組tg2
        ThreadGroup tg2 = new ThreadGroup("tg2");
        tg2.setDaemon(true);
        Stream.of("T1", "T2", "T3").forEach(name ->{
            new Thread(tg2, name){
                @Override
                public void run() {
                    System.out.println(this.getThreadGroup().getName());
                    System.out.println(tg1.getParent());
                    System.out.println(this.getThreadGroup().getParent().getName());
                    // 評(píng)估父線程組下的線程數(shù)量
                    System.out.println(this.getThreadGroup().getParent().activeCount());

                    Thread[] threads = new Thread[tg1.activeCount()];
                    this.getThreadGroup().enumerate(threads);
                    Stream.of(threads).forEach(System.out::println);
                    System.out.println("***************");

                    // 測試某個(gè)線程是parentOf(group)參數(shù)中g(shù)roup的父線程(直接或間接)
                    System.out.println("parentOf: " + this.getThreadGroup().getParent().parentOf(this.getThreadGroup()));
                    System.out.println(Thread.currentThread().getName() +" isDaemon:" + this.isDaemon());

                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        });

        // 打斷整個(gè)線程組下的所有線程
        tg2.interrupt();
    }
}

java并發(fā)編程的入門過程

10、單例設(shè)計(jì)模式

version1 (不好)

 package chapter3.singleton;

/**
 * 問題是不能懶加載, 在類加載的時(shí)候就會(huì)初始化instance
 *
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion1 {

    private static final SingletonVersion1 instance = new SingletonVersion1();

    public static SingletonVersion1 getInstance(){
        return instance;
    }
}

version2 (錯(cuò)誤)

package chapter3.singleton;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion2 {
    private static SingletonVersion2 instance ;

    private SingletonVersion2(){

    }

    /**
     * 多線程并發(fā)訪問時(shí)可能同時(shí)進(jìn)入if代碼里,造成多次實(shí)例化
     * @return
     */
    public static SingletonVersion2 getInstance(){
        if (instance == null){
           instance = new SingletonVersion2();
        }
        return instance;
    }

 }

version3 (不好)

package chapter3.singleton;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion3 {
    private static SingletonVersion3 instance ;

    private SingletonVersion3(){

    }

    /**
     * 會(huì)有性能問題, 實(shí)例化后每次讀取都要同步
     *
     * @return
     */
    public static synchronized SingletonVersion3 getInstance(){
        if (instance == null){
           instance = new SingletonVersion3();
        }

        return instance;
    }
}

version4 (不好)

jvm指令重排序可能NullPointerException

package chapter3.singleton;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion4 {
    private static SingletonVersion4 instance ;

    private static Object LOCK = new Object();

    private boolean init;

    private SomeObject someObject = null;

    private SingletonVersion4(){
        init = true;
//        try {
//            Thread.sleep(3000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        someObject = new SomeObject();
    }

    /**
     * 貌似沒問題,但是極端情況會(huì)有NPL空指針異常問題, 比如這個(gè)類里有一些變量, 這些變量在構(gòu)造方法里初始化
     * 當(dāng)有多個(gè)線程調(diào)用getInstance時(shí), 第一個(gè)線程訪問時(shí)instance為null會(huì)進(jìn)行實(shí)例化, 這是會(huì)在堆內(nèi)存分配內(nèi)存空間
     * 分配完內(nèi)存空間,但是構(gòu)造方法并沒有執(zhí)行完, 此時(shí)第二個(gè)線程訪問時(shí)instance不為null返回Instance實(shí)例,直接調(diào)用里面的方法
     * new SingletonVersion4()
     *
     * @return
     */
    public static SingletonVersion4 getInstance(){
        if (instance == null){
            synchronized (LOCK){
                if (instance == null){
                    instance = new SingletonVersion4();
                }
            }

        }

        return instance;
    }

    public void print(){
        this.someObject.doSomething();
    }

    public static void main(String[] args) {
        Thread t1 = new Thread(() ->{
            SingletonVersion4.getInstance();
        }, "t1");

        Thread t2 = new Thread(() ->{
            SingletonVersion4.getInstance().print();
        }, "t2");

        t1.start();
        t2.start();
    }
}

class SomeObject{

    public void doSomething(){
        System.out.println(Thread.currentThread().getName() + " doSomething...");
    }
}

java并發(fā)編程的入門過程

version5 (正確、推薦)
package chapter3.singleton;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion5 {

    private SingletonVersion5(){
    }

    private static class Singleton{
        private static final SingletonVersion5 INSTANCE = new SingletonVersion5();
    }

    /**
     * 貌似沒問題,但是極端情況會(huì)有NPL空指針異常問題, 比如這個(gè)類里有一些變量, 這些變量在構(gòu)造方法里初始化
     * 當(dāng)有多個(gè)線程調(diào)用getInstance時(shí), 第一個(gè)線程訪問時(shí)instance為null會(huì)進(jìn)行實(shí)例化, 這是會(huì)在堆內(nèi)存分配內(nèi)存空間
     * 分配完內(nèi)存空間,但是構(gòu)造方法并沒有執(zhí)行完, 此時(shí)第二個(gè)線程訪問時(shí)instance不為null返回Instance實(shí)例,直接調(diào)用里面的方法
     * new SingletonVersion4()
     *
     * @return
     */
    public static SingletonVersion5 getInstance(){
        return Singleton.INSTANCE;
    }
}
version 6 (正確)
package chapter3.singleton;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/4 19:47
 */
public class SingletonVersion6 {

    private SingletonVersion6(){
    }

    enum Singleton{
        INSTANCE;

        private SingletonVersion6 singletonVersion6 = new SingletonVersion6();

        public SingletonVersion6 getInstance(){
            return singletonVersion6;
        }
    }

   
    public static SingletonVersion6 getInstance(){
        return Singleton.INSTANCE.getInstance();
    }
}

11、volatile關(guān)鍵字

11.1 高并發(fā)的三個(gè)特性

  1. 原子性

例如 i = 9; 在16位計(jì)算機(jī)中 可能是16 16位分2次賦值的,比如低16位賦值成功、高16位賦值失敗。

在一個(gè)或多個(gè)操作中,要么全部成功,要么全部失敗,不能有中間狀態(tài)。

a = 1; 保證原子性

a++; 非原子性,執(zhí)行步驟:1:讀取a; 2: 對(duì)a加1; 3:將結(jié)果賦值給a

a = 1+2 保證原子性, 編譯期會(huì)確定值

a = a+1 ** 非原子性**,執(zhí)行步驟:1:讀取a; 2: 對(duì)a加1; 3:將結(jié)果賦值給a

證明 ++value 操作不是原子性的

package volatileDemo;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/8 22:01
 */
public class AtomDemo {

    private static int initValue = 1;

    private static int maxValue = 500;

    /**
     * 證明++initValue操作不是原子性的,
     *
     * 假設(shè)如下2種情況:
     * 1、如果 ++initValue操作是原子性的, 那么輸出一定不會(huì)有重復(fù)的
     * 2、如果 ++initValue操作不熟原子性的,而是拆分成1、讀取initValue; 2其他操作,那么可能多個(gè)線程讀取到一樣的initValue
     *
     * 那么可能出現(xiàn)如下情況:
     * t1  -> 讀取 initValue值為10, cpu執(zhí)行權(quán)被切換到t2
     * t2  -> 讀取 initValue值為10,
     * t2  -> 11 = 10 +1
     * t2  -> initValue = 11
     * t2  -> System.out.printf("t2 執(zhí)行后結(jié)果 [%d] \n", 11);
     * t1  -> 11 = 10 +1
     * t1  -> initValue = 11
     * t1  -> System.out.printf("t1 執(zhí)行后結(jié)果 [%d] \n", 11);
     *
     * @param args
     */
    public static void main(String[] args) {
        new Thread(() ->{
            while (initValue < maxValue){
                System.out.printf("t1 執(zhí)行后結(jié)果 [%d] \n", ++initValue);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(() ->{
            while (initValue < maxValue){
                System.out.printf("t2 執(zhí)行后結(jié)果 [%d] \n", ++initValue);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

java并發(fā)編程的入門過程

  1. 可見性

volatile關(guān)鍵字會(huì)保證多線程下的內(nèi)存可見性及指令執(zhí)行的有序性

https://www.cnblogs.com/yanlong300/p/8986041.html

例子:

package volatileDemo;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/7/8 21:25
 */
public class VolatileDemo {

    private static int  initValue = 1;

    private static int MAX_VALUE = 50;

    public static void main(String[] args) {
        new Thread(() ->{
            int localValue = initValue;
            while (localValue < MAX_VALUE){
//                System.out.printf("t1線程讀取到initValue的值為 [%d] localValue:[%d]\n", initValue, localValue);
                if (localValue != initValue){
                    localValue = initValue;
                    System.out.printf("initValue的值已被更新為 [%d]\n", initValue);
                }
//                else{
//                    System.out.printf("t1線程讀取到initValue的值為 [%d] localValue:[%d]\n", initValue, localValue);
//                }
            }

            System.out.println("t1線程結(jié)束執(zhí)行");
        }, "t1").start();

        new Thread(() ->{
            int localValue = initValue;
            while (initValue < MAX_VALUE){
                localValue++;
                initValue = localValue;
                System.out.printf("更新initValue的值為 [%d]\n", initValue);

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("t2線程結(jié)束執(zhí)行");
        }, "t2").start();
    }
}

java并發(fā)編程的入門過程

當(dāng)private static volatile int *initValue *= 1; 這句代碼加上volatile 關(guān)鍵字就是正確的結(jié)果了

java并發(fā)編程的入門過程

  1. 有序性

value = 3;

void exeToCPUA(){
  value = 10;   # 可能發(fā)生重排序 value的賦值發(fā)生在isFinsh之后
  isFinsh = true;
}

void exeToCPUB(){
  if(isFinsh){
    //value一定等于10?!
    assert value == 10;
  }
}

試想一下開始執(zhí)行時(shí),CPU A保存著finished在E(獨(dú)享)狀態(tài),而value并沒有保存在它的緩存中。(例如,Invalid)。在這種情況下,value會(huì)比finished更遲地拋棄存儲(chǔ)緩存。完全有可能CPU B讀取finished的值為true,而value的值不等于10。 ** 即isFinsh的賦值在value賦值之前。**

這種在可識(shí)別的行為中發(fā)生的變化稱為重排序(reordings)。注意,這不意味著你的指令的位置被惡意(或者好意)地更改。

它只是意味著其他的CPU會(huì)讀到跟程序中寫入的順序不一樣的結(jié)果。

為什么會(huì)有指令的重排序?

答案:因?yàn)闉榱耸咕彺婺軌虻玫礁雍侠淼乩谩?/p>

int a=1
int b=2

省略一萬行代碼...

int c=a+b

最后一句放第三行就能讓緩存更合理, 原因:cpu將a=1讀入CPU高速緩存,然后將b=2讀入高速緩存,由于CPU高速緩存的容量很小,所以當(dāng)執(zhí)行后面的一萬行代碼時(shí)CPU高速緩存滿了,那么就會(huì)把a(bǔ)=1、b=2這2個(gè)緩存行覆蓋掉,當(dāng)真正執(zhí)行int c= a+ b時(shí)由于CPU高速緩存里面沒有數(shù)據(jù)那么CPU就要重新從主存讀取數(shù)據(jù)然后計(jì)算,這樣就出現(xiàn)了不必須的重復(fù)讀取主存的操作,浪費(fèi)CPU,通過重指令排序讓int c=a+b放到第三行則可以緩存能夠立即得到利用,將c=a+b的結(jié)果計(jì)算后可以立即回寫主內(nèi)存,避免后續(xù)a=1、b=2的緩存行被其他指令的緩存行覆蓋

11.2、volatile的內(nèi)存語義

12、比較并交換(CAS)

12.1、使用CAS與使用鎖相比的好處

與鎖相比,使用比較并交換(CAS)會(huì)使程序看起來更復(fù)雜,但由于其非阻塞性,它對(duì)死鎖問題天生免疫,并且線程間的影響也遠(yuǎn)遠(yuǎn)比基于鎖的方式小的多。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的開銷,也沒有線程間頻繁調(diào)度調(diào)來的開銷,因此它比基于鎖的方式擁有更優(yōu)越的性能。

12.2、CAS原理

CAS算法的過程是:它包含3個(gè)參數(shù)CAS(realValue, expectValue, newValue), 其中realValue表示要更新的變量,expectValue表示預(yù)期值,newValue表示新值。僅當(dāng)realValue等與expectValue值時(shí),才將realValue的值變更為newValue,如果realValue與expectValue不相等,說明有其他線程已經(jīng)更新做了更新,則當(dāng)前線程什么也不做,最后CAS返回當(dāng)前realValue的真實(shí)值。CAS是抱著樂觀的態(tài)度去進(jìn)行的,它總是認(rèn)為自己可以完成操作,當(dāng)多個(gè)線程同時(shí)使用CAS操作一個(gè)變量時(shí),只有一個(gè)會(huì)勝出,并成功更新,其余均會(huì)失敗。失敗的線程不會(huì)被掛起,僅是被告知失敗,并且允許再次嘗試,當(dāng)然允許失敗的線程放棄操作。

簡單的說,CAS需要你額外給出一個(gè)預(yù)期值,也就是你認(rèn)為現(xiàn)在這個(gè)變量應(yīng)該是什么樣子的。如果變量不是你想象的那樣,則說明已經(jīng)被別人修改過了。你就重新讀取,再次嘗試修改就好了。

在硬件層面大部分的處理器都已支持原子化的CAS指令。在JDK5后,虛擬機(jī)便可以使用這個(gè)指令來進(jìn)行原子化操作。

13.3、CAS實(shí)現(xiàn)計(jì)數(shù)器

package 自旋鎖計(jì)數(shù);

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class Counter<psvm> {

	/**
     * 線程安全方式計(jì)數(shù), 內(nèi)部的value子段聲明為 private volatile int value;所以保證了內(nèi)存可見性
     */
	private AtomicInteger atomic = new AtomicInteger(0);

	/**
     * 線程不安全,非原子性操作計(jì)數(shù)
     */
	private int i = 0;

	public void count(){
		i++;
	}

	public void safeCount(){

		while (true){
			// 1、多核處理器可能會(huì)同時(shí)運(yùn)行到這行代碼,單核處理器由于時(shí)間片分配算法T1執(zhí)行到這行代碼后CPU執(zhí)行權(quán)被T2獲取了, 線程T1、T2均通過get()方法返回0
			// 2、假如T1先執(zhí)行atomic.compareAndSet(currentValue, ++currentValue)這行代碼,
			//    由于currentValue和atomic的值一致,cas操作成功,atomic變成1,退出循環(huán),
			// 3、然后T2繼續(xù)執(zhí)行atomic.compareAndSet(currentValue, ++currentValue);
			//    這行代碼會(huì)發(fā)現(xiàn)atomic內(nèi)部維護(hù)的value值1已經(jīng)與currentValue的值0不相等,不會(huì)進(jìn)行設(shè)置值操作
			//    T2繼續(xù)下次循環(huán), 又執(zhí)行atomic.get();獲取到的currentValue為1, 再次執(zhí)行compareAndSet時(shí),
			//    atomic為1和currentValue為1相等,成功進(jìn)行cas操作,然后退出循環(huán)
			int currentValue = atomic.get();
			boolean success = atomic.compareAndSet(currentValue, ++currentValue);
			if (success){
				break;
			}
		}

	}


	public static void main(String[] args) {
		Counter counter = new Counter();

		List<thread> threadList = new ArrayList<>(500);
		for (int j = 0; j < 500; j++){
			Thread thread = new Thread(() ->{
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				counter.count();

				counter.safeCount();
			});

			threadList.add(thread);
		}

		threadList.stream().forEach(thread -> thread.start());

		threadList.forEach(thread -> {
			try {
				thread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		});

		System.out.println("count: " + counter.i);
		System.out.println("safeCount:" + counter.atomic);

	}
}

13.3、使用CAS實(shí)現(xiàn)無鎖同步

package atomic;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 使用cas實(shí)現(xiàn)無鎖同步
 *
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/23 19:46
 */
public class CasLock {

    private AtomicInteger lock = new AtomicInteger(0);

    //記錄當(dāng)前獲取到鎖的線程ID
    private Long getLockThreadId;


    public void tryLock(){
        while (true){
            boolean success = lock.compareAndSet(0, 1);
            if (success){
                getLockThreadId = Thread.currentThread().getId();
                break;
            }

            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void unLock(){
        long currentThreadId = Thread.currentThread().getId();
        if (currentThreadId != getLockThreadId){
            throw new IllegalStateException("未獲取到鎖,無需解鎖");
        }

        int value = lock.get();
        if (value == 1){
            lock.set(0);
        }
    }
}

測試示例:

package atomic;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Random;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/23 19:50
 */
public class CasLockDemo {

    private static int i = 0;

    public static void main(String[] args) {
        BitSet bitSet = new BitSet();

        CasLock lock = new CasLock();

        List<thread> threadList = new ArrayList<>(500);
        Random random = new Random();

        for (int  j =0 ; j < 50000; j++){
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(random.nextInt(20) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock.tryLock();
                i++;
                if(bitSet.get(i)){
                    throw new RuntimeException("lock有問題");
                }

                bitSet.set(i);
                System.out.println(Thread.currentThread().getName() + " get lock, i=" + i);

                lock.unLock();
            }, "thread-" + j);

            threadList.add(t);
        }

        threadList.forEach(thread -> thread.start());
    }
}

13、atomic包原子操作類(無鎖CAS)

13.1、AtomicInteger

主要方法:

java并發(fā)編程的入門過程

示例:

package atomic;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 15:07
 */
public class AtomicIntegerDemo {

    /**
     * 線程安全方式計(jì)數(shù), 內(nèi)部的value子段聲明為 private volatile int value;所以保證了內(nèi)存可見性
     */
    private AtomicInteger atomic = new AtomicInteger(0);

    /**
     * 線程不安全,非原子性操作計(jì)數(shù)
     */
    private int i = 0;

    public void count(){
        i++;
    }

    public void safeCount(){
        atomic.incrementAndGet();
    }

    public static void main(String[] args) {
        AtomicIntegerDemo counter = new AtomicIntegerDemo();

        List<thread> threadList = new ArrayList<>(500);

        for (int j = 0; j < 500; j++){
            Thread thread = new Thread(() ->{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                counter.count();

                counter.safeCount();
            });

            threadList.add(thread);
        }

        threadList.stream().forEach(thread -> thread.start());

        threadList.forEach(thread -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("count: " + counter.i);
        System.out.println("safeCount:" + counter.atomic);
    }
}

java并發(fā)編程的入門過程

13.2、AtomicReference

AtomicReference,顧名思義,就是以原子方式更新對(duì)象引用

可以看到,AtomicReference持有一個(gè)對(duì)象的引用——value,并通過Unsafe類來操作該引用:

java并發(fā)編程的入門過程

為什么需要AtomicReference?難道多個(gè)線程同時(shí)對(duì)一個(gè)引用變量賦值也會(huì)出現(xiàn)并發(fā)問題? 引用變量的賦值本身沒有并發(fā)問題,也就是說對(duì)于引用變量var ,類似下面的賦值操作本身就是原子操作: Foo var = ... ; AtomicReference的引入是為了可以用一種類似樂觀鎖的方式操作共享資源,在某些情景下以提升性能。

我們知道,當(dāng)多個(gè)線程同時(shí)訪問共享資源時(shí),一般需要以加鎖的方式控制并發(fā):

volatile Foo sharedValue = value;
Lock lock = new ReentrantLock();

lock.lock();
try{
    // 操作共享資源sharedValue
}
finally{
    lock.unlock();
}

上述訪問方式其實(shí)是一種對(duì)共享資源加悲觀鎖的訪問方式。 而AtomicReference提供了以無鎖方式訪問共享資源的能力,看看如何通過AtomicReference保證線程安全,來看個(gè)具體的例子:

package atomic;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/24 20:50
 */
public class AtomicReferenceCounter {

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<integer> ref = new AtomicReference<>(new Integer(0));

        List<thread> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread t = new Thread(new Task(ref), "Thread-" + i);
            list.add(t);
        }

        for (Thread t : list) {
            t.start();
            t.join();
        }

        // 打印2000
        System.out.println(ref.get());
    }
}

class Task implements Runnable{

    private AtomicReference<integer> reference;

    public Task(AtomicReference<integer> reference){
        this.reference = reference;
    }

    @Override
    public void run() {
        while (true){
            Integer oldValue = reference.get();
            boolean success = reference.compareAndSet(oldValue, oldValue + 1);
            if (success){
                break;
            }
        }
    }
}

java并發(fā)編程的入門過程

該示例并沒有使用鎖,而是使用自旋+CAS的無鎖操作保證共享變量的線程安全。1000個(gè)線程,每個(gè)線程對(duì)金額增加1,最終結(jié)果為2000,如果線程不安全,最終結(jié)果應(yīng)該會(huì)小于2000。

通過示例,可以總結(jié)出AtomicReference的一般使用模式如下

AtomicReference<object> ref = new AtomicReference<>(new Object());
Object oldCache = ref.get();

// 對(duì)緩存oldCache做一些操作
Object newCache  =  someFunctionOfOld(oldCache); 

// 如果期間沒有其它線程改變了緩存值,則更新
boolean success = ref.compareAndSet(oldCache , newCache);

上面的代碼模板就是AtomicReference的常見使用方式,看下compareAndSet方法: java并發(fā)編程的入門過程

該方法會(huì)將入?yún)⒌?strong>expect變量所指向的對(duì)象和AtomicReference中的引用對(duì)象進(jìn)行比較,如果兩者指向同一個(gè)對(duì)象,則將AtomicReference中的引用對(duì)象重新置為update,修改成功返回true,失敗則返回false。也就是說,AtomicReference其實(shí)是比較對(duì)象的引用。

13.2、CAS操作可能存在的ABA問題

13.2.1、介紹

CAS操作可能存在ABA的問題,就是說: 假如一個(gè)值原來是A,變成了B,又變成了A,那么CAS檢查時(shí)會(huì)發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實(shí)際上卻變化了。

一般來講這并不是什么問題,比如數(shù)值運(yùn)算,線程其實(shí)根本不關(guān)心變量中途如何變化,只要最終的狀態(tài)和預(yù)期值一樣即可。

但是,有些操作會(huì)依賴于對(duì)象的變化過程,此時(shí)的解決思路一般就是使用版本號(hào)。在變量前面追加上版本號(hào),每次變量更新的時(shí)候把版本號(hào)加一,那么A-B-A 就會(huì)變成1A - 2B - 3A。

13.2.3、貴賓充值卡問題(ABA示例)

舉例:有一家蛋糕店為了挽留客戶,決定為貴賓卡里小于20元的客戶一次性充值20元,刺激客戶充值和消費(fèi),但條件是:每位客戶只能被贈(zèng)送一次。

package atomic;

import java.util.concurrent.atomic.AtomicReference;

/**
 * cas ABA問題
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 15:30
 */
public class CasABAPromblem {

    private static AtomicReference<integer> money = new AtomicReference<>(19);

    public static void main(String[] args) {

        ChargeMoneyWorker chargeMoneyWorker = new ChargeMoneyWorker(money);
        for (int i = 0; i < 3; i++){
            new Thread(chargeMoneyWorker).start();
        }

        ConsumeMoneyWorker consumeMoneyWorker = new ConsumeMoneyWorker(money);
        new Thread(consumeMoneyWorker).start();
    }

}

class ChargeMoneyWorker implements Runnable{

    private AtomicReference<integer> money;

    public ChargeMoneyWorker(AtomicReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){

            while (true){
                Integer m = money.get();
                if (m < 20){
                    boolean success  = money.compareAndSet(m, m +20);
                    if (success){
                        System.out.println("余額小于20元,充值成功, 充值后余額:" + money.get());
                        break;
                    }
                }
                else {
//                    System.out.println("余額大于20元,無需充值");
                    break;
                }
            }

        }

    }
}

class ConsumeMoneyWorker implements Runnable{
    private AtomicReference<integer> money;

    public ConsumeMoneyWorker(AtomicReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){

            while (true){
                Integer m = money.get();
                if (m > 10){
                    boolean success  = money.compareAndSet(m, m - 10);
                    if (success){
                        System.out.println("成功消費(fèi)10元, 余額:" + money.get());
                        break;
                    }
                }
                else {
                    System.out.println("余額不足10元");
                }
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
 }

java并發(fā)編程的入門過程

從上面的輸出可以到用戶的賬戶被先后反復(fù)充值,其原因是用戶的賬戶余額被反復(fù)修改,導(dǎo)致修改后又滿足的原充值條件,使得充值線程無法正確判斷該用戶是否已經(jīng)充值過。

雖然這種情況出現(xiàn)的概率不大,但是依然也是由可能出現(xiàn)的,因此當(dāng)業(yè)務(wù)中確實(shí)出現(xiàn)這種問題,我們需要注意是否是我們的業(yè)務(wù)本身就不合理。JDK為我們考慮到了這種情況,使用AtomicStampedReference可以很好的解決這個(gè)問題。

13.3、AtomicStampedReference

AtomicStampedReference就是上面所說的加了版本號(hào)的AtomicReference。

13.3.1、AtomicStampedReference原理

先來看下如何構(gòu)造一個(gè)AtomicStampedReference對(duì)象,AtomicStampedReference只有一個(gè)構(gòu)造器:

java并發(fā)編程的入門過程

可以看到,除了傳入一個(gè)初始的引用變量initialRef外,還有一個(gè)initialStamp變量,initialStamp其實(shí)就是版本號(hào)(或者說時(shí)間戳),用來唯一標(biāo)識(shí)引用變量。

在構(gòu)造器內(nèi)部,實(shí)例化了一個(gè)Pair對(duì)象,Pair對(duì)象記錄了對(duì)象引用和時(shí)間戳信息,采用int作為時(shí)間戳,實(shí)際使用的時(shí)候,要保證時(shí)間戳唯一(一般做成自增的),如果時(shí)間戳如果重復(fù),還會(huì)出現(xiàn)ABA的問題。

AtomicStampedReference的所有方法,其實(shí)就是Unsafe類針對(duì)這個(gè)Pair對(duì)象的操作。 和AtomicReference相比,AtomicStampedReference中的每個(gè)引用變量都帶上了pair.stamp這個(gè)版本號(hào),這樣就可以解決CAS中的ABA問題了。

13.3.2、AtomicStampedReference使用示例

// 創(chuàng)建AtomicStampedReference對(duì)象,持有Foo對(duì)象的引用,初始為null,版本為0
AtomicStampedReference<foo>  asr = new AtomicStampedReference<>(null,0);  

int[] stamp=new  int[1];
Foo  oldRef = asr.get(stamp);   // 調(diào)用get方法獲取引用對(duì)象和對(duì)應(yīng)的版本號(hào)

int oldStamp=stamp[0]; // stamp[0]保存版本號(hào)

asr.compareAndSet(oldRef, null, oldStamp, oldStamp + 1) //嘗試以CAS方式更新引用對(duì)象,并將版本號(hào)+1

上述模板就是AtomicStampedReference的一般使用方式,注意下compareAndSet方法: java并發(fā)編程的入門過程

我們知道,AtomicStampedReference內(nèi)部保存了一個(gè)pair對(duì)象,該方法的邏輯如下:

  1. 如果AtomicStampedReference內(nèi)部pair的引用變量、時(shí)間戳 與 入?yún)?strong>expectedReference、expectedStamp都一樣,說明期間沒有其它線程修改過AtomicStampedReference,可以進(jìn)行修改。此時(shí),會(huì)創(chuàng)建一個(gè)新的Pair對(duì)象(casPair方法,因?yàn)镻air是Immutable類)。

但這里有段優(yōu)化邏輯,就是如果 newReference == current.reference && newStamp == current.stamp,說明用戶修改的新值和AtomicStampedReference中目前持有的值完全一致,那么其實(shí)不需要修改,直接返回true即可。

13.3.3、AtomicStampedReference解決貴賓卡多次充值問題

package atomic;

import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * cas ABA問題解決方案
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 15:30
 */
public class ResolveCasABAProblem {

    private static AtomicStampedReference<integer> money = new AtomicStampedReference<>(19, 0);

    public static void main(String[] args) {

        ResolveChargeMoneyWorker chargeMoneyWorker = new ResolveChargeMoneyWorker(money);
        for (int i = 0; i < 3; i++){
            new Thread(chargeMoneyWorker).start();
        }

        ResolveConsumeMoneyWorker consumeMoneyWorker = new ResolveConsumeMoneyWorker(money);
        new Thread(consumeMoneyWorker).start();
    }

}

class ResolveChargeMoneyWorker implements Runnable{

    private AtomicStampedReference<integer> money;

    public ResolveChargeMoneyWorker(AtomicStampedReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while (true){
                Integer m = money.getReference();
                if (m < 20){
                    boolean success  = money.compareAndSet(m, m +20, 0 , 1);
                    if (success){
                        System.out.println("余額小于20元,充值成功, 充值后余額:" + money.getReference());
                        break;
                    }
                }
                else {
//                    System.out.println("余額大于20元,無需充值");
                    break;
                }
            }

        }

    }
}

class ResolveConsumeMoneyWorker implements Runnable{
    private AtomicStampedReference<integer> money;

    public ResolveConsumeMoneyWorker(AtomicStampedReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){

            while (true){
                Integer m = money.getReference();
                int stamp = money.getStamp();
                if (m > 10){
                    // 這里為什么不是給版本號(hào)加1呢?
                    // 假如這里變成  boolean success  = money.compareAndSet(m, m - 10, stamp, stamp + 1);
                    // 考慮一種情況,用戶賬戶本身有19元錢, 初始充值狀態(tài)為0表示為充值, 用戶先消費(fèi)了10元,stamp變?yōu)?
                    // 這時(shí)用戶還沒有充值,賬戶金額也確實(shí)少于20元,這會(huì)導(dǎo)致充值線程掃描是發(fā)現(xiàn)stamp已變?yōu)?,就不會(huì)充值了
                    // 根本原因是用戶是否消費(fèi)與是否充值過無關(guān),充值的狀態(tài)不能由于其他因素改變
                    // 這個(gè)例子是《實(shí)戰(zhàn)高并發(fā)程序設(shè)計(jì)》一書中的例子,書中例子沒有考慮到這點(diǎn),
                    // 書中假想的情況是先充值后消費(fèi),但如果是先消費(fèi)再充值就有問題了
                    boolean success  = money.compareAndSet(m, m - 10, stamp, stamp);
                    if (success){
                        System.out.println("成功消費(fèi)10元, 余額:" + money.getReference());
                        break;
                    }
                }
                else {
                    System.out.println("余額不足10元");
                }
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

情況1:先充值后消費(fèi)

java并發(fā)編程的入門過程

情況2:先消費(fèi),后充值

java并發(fā)編程的入門過程

13.4、AtomicMarkableReference

AtomicMarkableReference是AtomicStampedReference的特殊化形式AtomicMarkableReference用于無需知道數(shù)據(jù)目前具體是哪個(gè)版本,只需要知道數(shù)據(jù)是否被更改過。

前面的客戶賬戶充值例子ABA問題使用AtomicMarkableReference解決的代碼如下:

package atomic;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 17:45
 */

import java.util.concurrent.atomic.AtomicMarkableReference;

/**
 * cas ABA問題
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 15:30
 */
public class AtomicMarkableReferenceDemo {

    private static AtomicMarkableReference<integer> money = new AtomicMarkableReference<>(19, false);

    public static void main(String[] args) {

        MarkableChargeMoneyWorker chargeMoneyWorker = new MarkableChargeMoneyWorker(money);
        for (int i = 0; i < 3; i++){
            new Thread(chargeMoneyWorker).start();
        }

        MarkableConsumeMoneyWorker consumeMoneyWorker = new MarkableConsumeMoneyWorker(money);
        new Thread(consumeMoneyWorker).start();
    }

}

class MarkableChargeMoneyWorker implements Runnable{

    private AtomicMarkableReference<integer> money;

    public MarkableChargeMoneyWorker(AtomicMarkableReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            while (true){
                Integer m = money.getReference();
                if (m < 20){
                    boolean success  = money.compareAndSet(m, m +20, false , true);
                    if (success){
                        System.out.println("余額小于20元,充值成功, 充值后余額:" + money.getReference());
                        break;
                    }
                }
                else {
//                    System.out.println("余額大于20元,無需充值");
                    break;
                }
            }

        }

    }
}

class MarkableConsumeMoneyWorker implements Runnable{
    private AtomicMarkableReference<integer> money;

    public MarkableConsumeMoneyWorker(AtomicMarkableReference<integer> money){
        this.money = money;
    }

    @Override
    public void run() {
        while (true){

            while (true){
                Integer m = money.getReference();
                boolean isMarked = money.isMarked();
                if (m > 10){
                    // 消費(fèi)不更改充值標(biāo)識(shí)
                    boolean success  = money.compareAndSet(m, m - 10, isMarked, isMarked);
                    if (success){
                        System.out.println("成功消費(fèi)10元, 余額:" + money.getReference());
                        break;
                    }
                }
                else {
                    System.out.println("余額不足10元");
                }
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

java并發(fā)編程的入門過程

13.4、AtomicIntegerArray

AtomicIntegerArray本質(zhì)上是對(duì)int[]類型的封裝,使用Unsafe類通過CAS的方式控制int[]在多線程下的安全性,它提供了以下幾個(gè)核心API:

java并發(fā)編程的入門過程

13.4.1、如果沒有AtomicIntegerArray的錯(cuò)誤示例

package atomic;

/**
 * Increment任務(wù):這個(gè)類使用vector[i]++方法增加數(shù)組中所有元素的值
 * Decrement任務(wù):這個(gè)類使用vector[i]--方法減少數(shù)組中所有元素的值
 *
 * 在main方法中創(chuàng)建了1000個(gè)元素的int[]數(shù)組,
 * 執(zhí)行了1000個(gè)Increment任務(wù)和1000個(gè)Decrement任務(wù),在任務(wù)的結(jié)尾如果沒有不一致的錯(cuò)誤,
 * 數(shù)組中所有元素的值不全為0,執(zhí)行程序后會(huì)看到程序輸出了一些不全為0的數(shù)值
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 12:22
 */
public class BadAtomiceIntegerArrayDemo {

    public static void main(String[] args) throws InterruptedException {
        int[] vector = new int[1000];
        for (int  i = 0; i < vector.length; i++){
            vector[i] = 0;
        }

        BadIncrement increment = new BadIncrement(vector);
        BadDecrement decrement = new BadDecrement(vector);

        Thread[] badThreadIncrements = new Thread[1000];
        Thread[] badThreadDecrements = new Thread[1000];

        for (int i =0 ; i < badThreadIncrements.length; i++){
            badThreadIncrements[i] = new Thread(increment);
            badThreadDecrements[i] = new Thread(decrement);
        }

        for (int i =0 ; i < badThreadIncrements.length; i++){
            badThreadIncrements[i].start();
            badThreadDecrements[i].start();
        }

        for (int i =0 ; i < badThreadIncrements.length; i++){
            badThreadIncrements[i].join();
            badThreadDecrements[i].join();
        }

        for (int i =0 ; i < vector.length; i++){
           if (vector[i] != 0){
               System.out.println("Vector["+i+"] : " + vector[i]);
           }
        }

        System.out.println("main end");
    }
}

class BadIncrement implements Runnable{

    private int[] vector;

    public BadIncrement(int[] vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length; i++){
            vector[i]++;
        }
    }
}

class BadDecrement implements Runnable{

    private int[] vector;

    public BadDecrement(int[] vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length; i++){
            vector[i]--;
        }
    }
}

java并發(fā)編程的入門過程

13.4.2、atomicatintegerarray的正確使用

package atomic;

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomiceIntegerArrayDemo {

    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerArray vector = new AtomicIntegerArray(1000);

        Increment increment = new Increment(vector);
        Decrement decrement = new Decrement(vector);

        Thread[] threadIncrements = new Thread[1000];
        Thread[] threadDecrements = new Thread[1000];

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i] = new Thread(increment);
            threadDecrements[i] = new Thread(decrement);
        }

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i].start();
            threadDecrements[i].start();
        }

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i].join();
            threadDecrements[i].join();
        }

        for (int i =0 ; i < vector.length(); i++){
           if (vector.get(i) != 0){
               System.out.println("Vector["+i+"] : " + vector.get(i));
           }
        }

        System.out.println("main end");
    }
}

class Increment implements Runnable{

    private AtomicIntegerArray vector;

    public Increment(AtomicIntegerArray vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length(); i++){
            vector.getAndIncrement(i);
        }
    }
}

class Decrement implements Runnable{

    private AtomicIntegerArray vector;

    public Decrement(AtomicIntegerArray vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length(); i++){
            vector.getAndDecrement(i);
        }
    }
}

工作原理: Increment任務(wù):這個(gè)類使用getAndIncrement方法增加數(shù)組中所有元素的值 Decrement任務(wù):這個(gè)類使用getAndDecrement方法減少數(shù)組中所有元素的值

在main方法中創(chuàng)建了1000個(gè)元素的AtomicIntegerArray數(shù)組, 執(zhí)行了1000個(gè)Increment任務(wù)和1000個(gè)Decrement任務(wù),在任務(wù)的結(jié)尾如果沒有不一致的錯(cuò)誤, 數(shù)組中所有元素的值都應(yīng)該是0,執(zhí)行程序后會(huì)看到程序只將最后的main end消息打印到控制臺(tái),因?yàn)樗性刂禐?

13.5、atomicreferencearray

atomicreferencearray是針對(duì)普通自定義對(duì)象的數(shù)組的原子更新類的通用形式 下面的類實(shí)現(xiàn)了atomicintegerarray的功能

package atomic;

import java.util.concurrent.atomic.AtomicReferenceArray;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/25 17:26
 */
public class AtomicReferenceArrayDemo {


    public static void main(String[] args) throws InterruptedException {
        AtomicReferenceArray<Integer> vector = new AtomicReferenceArray(1000);
        for (int  i =0; i < vector.length(); i++){
            vector.set(i, 0);
        }

        ReferenceIncrement increment = new ReferenceIncrement(vector);
        ReferenceDecrement decrement = new ReferenceDecrement(vector);

        Thread[] threadIncrements = new Thread[1000];
        Thread[] threadDecrements = new Thread[1000];

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i] = new Thread(increment);
            threadDecrements[i] = new Thread(decrement);
        }

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i].start();
            threadDecrements[i].start();
        }

        for (int i =0 ; i < threadIncrements.length; i++){
            threadIncrements[i].join();
            threadDecrements[i].join();
        }

        for (int i =0 ; i < vector.length(); i++){
            if (vector.get(i) != 0){
                System.out.println("Vector["+i+"] : " + vector.get(i));
            }
        }

        System.out.println("main end");
    }
}


class ReferenceIncrement implements Runnable{

    private AtomicReferenceArray<Integer> vector;

    public ReferenceIncrement(AtomicReferenceArray<Integer> vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length(); i++){


            // 不能用set, 用了set后就是不管內(nèi)存現(xiàn)有真實(shí)值是多少,直接設(shè)置為新值, 在cpu時(shí)間片切換時(shí)會(huì)有問題
            // 正確的應(yīng)該是在原有值基礎(chǔ)上加1
//            vector.set(i, current + 1);

            while (true){
                int current = vector.get(i);
                if(vector.compareAndSet(i, current, current + 1)){
                    break;
                }
            }

        }
    }
}

class ReferenceDecrement implements Runnable{

    private AtomicReferenceArray<Integer> vector;

    public ReferenceDecrement(AtomicReferenceArray<Integer> vector){
        this.vector = vector;
    }

    @Override
    public void run() {
        for (int i = 0; i <vector.length(); i++){
            while (true){
                int current = vector.get(i);
                if(vector.compareAndSet(i, current, current - 1)){
                    break;
                }
            }
        }
    }
}

java并發(fā)編程的入門過程

13.6、atomicintegerfieldupdater

根據(jù)數(shù)據(jù)類型不同, updater有3種, 分別是atomicintegerfieldupdater、atomiclongfieldupdater、atomicreferencefieldupdater

示例場景: 假設(shè)某地要進(jìn)行一次選舉。現(xiàn)在模擬這個(gè)投票場景,如果選民投了候選人1票,就記為1,否則記為0,最終就是要統(tǒng)計(jì)某個(gè)選擇被投票的次數(shù)。

package atomic; 
import java.util.arrays; 
java.util.concurrent.atomic.atomicinteger;
java.util.concurrent.atomic.atomicintegerfieldupdater; 
/** 
* @author calebzhao<9 3 9 4 7 5 0 @ qq.com>
 * 2019/8/25 18:01
 */
public class AtomicIntegerFieldUpdaterDemo {

    private static AtomicIntegerFieldUpdater<candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    //作用是為了來驗(yàn)證AtomicIntegerFieldUpdater計(jì)算的正確性
    private static AtomicInteger allScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Candidate candidate = new Candidate();

        Thread[] threads = new Thread[1000];

        for (int i = 0; i < 1000; i++){
            threads[i] = new Thread(() -> {
                // 模擬投票過程隨機(jī)
                if (Math.random() > 0.5){
                    scoreUpdater.incrementAndGet(candidate);
                    allScore.incrementAndGet();
                }
            });
        }

        Arrays.stream(threads).forEach(thread -> thread.start());

        for (int i =0 ; i < threads.length; i++){
            threads[i].join();
        }

        System.out.println("scoreUpdater:" + scoreUpdater.get(candidate));
        System.out.println("allScore:" +allScore.get());
    }
}

class Candidate{

    int id;

     volatile int score;
}

java并發(fā)編程的入門過程

上述代碼模擬了這個(gè)場景,候選人的得票數(shù)量記錄在Candidate.score中,注意它是一個(gè)普通的volatile 變量,而volatile 變量并不會(huì)保證線程安全性,只會(huì)保證內(nèi)存可見性和禁止重排序,代碼中通過Math.random()來模擬隨機(jī)投票過程, AtomicInteger allScore = new AtomicInteger(0);這一行代碼用來驗(yàn)證 AtomicIntegerFieldUpdater計(jì)算的正確性, 運(yùn)行這段程序會(huì)發(fā)現(xiàn) allScore的值總是和scoreUpdater的值相等。

AtomicIntegerFieldUpdater使用注意事項(xiàng):

  1. Updater只能修改它可見范圍內(nèi)的變量,因?yàn)閁pdater使用反射獲得這個(gè)變量,如果變量不可見就會(huì)報(bào)錯(cuò),比如score聲明為private的,就不行。

  2. 為了保證變量在多線程環(huán)境被正確的讀取,它必須是volatile修飾的,如果不修飾也會(huì)報(bào)錯(cuò)。

  3. 由于CAS操作通過對(duì)象實(shí)例中的偏移量直接進(jìn)行賦值,因此它不支持static字段(U.objectFieldOffset(field)不支持靜態(tài)變量),如果被static修飾了也會(huì)報(bào)錯(cuò)

14、Unsafe

14.1、獲取unsafe

package 自旋鎖計(jì)數(shù);

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
 * @author calebzhao<9 3 9 3 4 7 5 0 7 @ qq.com>
 * 2019/8/26 20:17
 */
public class UnsafeUtil
{
    public static Unsafe getUnsafe(){
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            
            // 這里是null是因?yàn)閠heUnsafe屬性是static靜態(tài)屬性
            return (Unsafe) field.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException();
        }
    }
}

15、幾種Couter計(jì)數(shù)的性能比較

package 自旋鎖計(jì)數(shù);

import sun.misc.Unsafe;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterDemo {

    private  final int THREAD_COUNT = 1000;

    public static void main(String[] args) {
        CounterDemo counterDemo =  new CounterDemo();

        counterDemo.atomicCouterTest();

        counterDemo.casCouterTest();

        counterDemo.synchronizedCouterTest();

        counterDemo.badCouterTest();
    }

    public void atomicCouterTest(){
        try{
            AtomicIntegerCouter atomicIntegerCouter = new AtomicIntegerCouter();
            CouterThread couterThread = new CouterThread(atomicIntegerCouter);
            List<thread> threadList = new ArrayList<>();
            for (int j = 0; j < THREAD_COUNT; j++){
                Thread thread = new Thread(couterThread);
                threadList.add(thread);
            }

            long start = System.currentTimeMillis();
            threadList.stream().forEach(thread -> thread.start());

            threadList.forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            long time = System.currentTimeMillis() - start;

            System.out.println("atomicIntegerCouter: " + atomicIntegerCouter.getValue() + " 耗時(shí): " + time);
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }

    public void casCouterTest(){
        try{
            CasCounter casCounter = new CasCounter();
            CouterThread couterThread = new CouterThread(casCounter);
            List<thread> threadList = new ArrayList<>();
            for (int j = 0; j < THREAD_COUNT; j++){
                Thread thread = new Thread(couterThread);
                threadList.add(thread);
            }

            long start = System.currentTimeMillis();
            threadList.stream().forEach(thread -> thread.start());

            threadList.forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            long time = System.currentTimeMillis() - start;
            System.out.println("casCounter: " + casCounter.getValue() + " 耗時(shí): " + time);
        }
        catch (Error e){
            e.printStackTrace();
        }
    }

    public void synchronizedCouterTest(){
        try{
            SynchronizedCouter synchronizedCouter = new SynchronizedCouter();
            CouterThread couterThread = new CouterThread(synchronizedCouter);
            List<thread> threadList = new ArrayList<>();
            for (int j = 0; j < THREAD_COUNT; j++){
                Thread thread = new Thread(couterThread);
                threadList.add(thread);
            }

            long start = System.currentTimeMillis();
            threadList.stream().forEach(thread -> thread.start());

            threadList.forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            long time = System.currentTimeMillis() - start;

            System.out.println("synchronizedCounter: " + synchronizedCouter.getValue() + " 耗時(shí): " + time);
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }

    public void badCouterTest(){
        try{
            BadCouter badCouter = new BadCouter();
            CouterThread couterThread = new CouterThread(badCouter);
            List<thread> threadList = new ArrayList<>();
            for (int j = 0; j < THREAD_COUNT; j++){
                Thread thread = new Thread(couterThread);
                threadList.add(thread);
            }

            long start = System.currentTimeMillis();
            threadList.stream().forEach(thread -> thread.start());

            threadList.forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            long time = System.currentTimeMillis() - start;
            System.out.println("badCouter: " + badCouter.getValue() + " 耗時(shí): " + time);
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }

}

class CouterThread implements Runnable{

    private Counter counter;

    public CouterThread(Counter counter){
        this.counter = counter;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        counter.incrementAndGet();
    }
}

interface Counter{

    int incrementAndGet();

    int getAndIncrement();

    int getValue();
}

class AtomicIntegerCouter implements Counter{
    /**
     * 線程安全方式計(jì)數(shù), 內(nèi)部的value子段聲明為 private volatile int value;所以保證了內(nèi)存可見性
     */
    private AtomicInteger atomic = new AtomicInteger(0);

    @Override
    public int incrementAndGet(){

        while (true){
            // 1、多核處理器可能會(huì)同時(shí)運(yùn)行到這行代碼,單核處理器由于時(shí)間片分配算法T1執(zhí)行到這行代碼后CPU執(zhí)行權(quán)被T2獲取了, 線程T1、T2均通過get()方法返回0
            // 2、假如T1先執(zhí)行atomic.compareAndSet(currentValue, ++currentValue)這行代碼,
            //    由于currentValue和atomic的值一致,cas操作成功,atomic變成1,退出循環(huán),
            // 3、然后T2繼續(xù)執(zhí)行atomic.compareAndSet(currentValue, ++currentValue);
            //    這行代碼會(huì)發(fā)現(xiàn)atomic內(nèi)部維護(hù)的value值1已經(jīng)與currentValue的值0不相等,不會(huì)進(jìn)行設(shè)置值操作
            //    T2繼續(xù)下次循環(huán), 又執(zhí)行atomic.get();獲取到的currentValue為1, 再次執(zhí)行compareAndSet時(shí),
            //    atomic為1和currentValue為1相等,成功進(jìn)行cas操作,然后退出循環(huán)
            int currentValue = atomic.get();
            boolean success = atomic.compareAndSet(currentValue, ++currentValue);
            if (success){
                return atomic.get();
            }
        }
    }

    @Override
    public int getAndIncrement() {
        while (true){
            int currentValue = atomic.get();
            boolean success = atomic.compareAndSet(currentValue, ++currentValue);
            if (success){
                return currentValue;
            }
        }
    }

    @Override
    public int getValue() {
        return atomic.get();
    }
}

class CasCounter implements Counter{

    private volatile int count = 0;

    private static final Unsafe unsafe = UnsafeUtil.getUnsafe();

    private static long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset(CasCounter.class.getDeclaredField("count"));
        }
        catch (NoSuchFieldException e) {
            throw new Error();
        }
    }

    @Override
    public int incrementAndGet() {
        while (true){
            boolean success = unsafe.compareAndSwapInt(this, valueOffset, count, count + 1);
            if (success){
                return count;
            }
        }
    }

    @Override
    public int getAndIncrement() {
        while (true){
            boolean success = unsafe.compareAndSwapInt(this, valueOffset, count, count + 1);
            if (success){
                return count;
            }
        }
    }

    @Override
    public int getValue() {
        return count;
    }
}

class BadCouter implements Counter{
    private volatile int count = 0;

    @Override
    public int incrementAndGet() {
        return ++count;
    }

    @Override
    public int getAndIncrement() {
        return count++;
    }

    @Override
    public int getValue() {
        return count;
    }
}

class SynchronizedCouter implements Counter{
    private volatile int count = 0;

    @Override
    public synchronized int incrementAndGet() {
        return ++count;
    }

    @Override
    public synchronized int getAndIncrement() {
        return count++;
    }

    @Override
    public int getValue() {
        return count;
    }
}

java并發(fā)編程的入門過程

多次運(yùn)行示例代碼,會(huì)發(fā)現(xiàn)基本上每次synchronizedCouter的耗時(shí)最短,這也說明了synchronized使用鎖的方式性能并不一定低。

以上就是java并發(fā)編程的入門過程,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI