溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java中怎么實(shí)現(xiàn)線程間的通信功能

發(fā)布時(shí)間:2021-09-07 14:10:23 來(lái)源:億速云 閱讀:171 作者:chen 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Java中怎么實(shí)現(xiàn)線程間的通信功能”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

目錄
  • 前言

  • 1. 如何讓兩個(gè)線程依次執(zhí)行?

  • 2. 如何讓兩個(gè)線程按照指定的方式有序相交?

  • 3. 線程 D 在A、B、C都同步執(zhí)行完畢后執(zhí)行

  • 4. 三個(gè)運(yùn)動(dòng)員分開準(zhǔn)備同時(shí)開跑

  • 5. 子線程將結(jié)果返回給主線程


前言

雖然通常每個(gè)子線程只需要完成自己的任務(wù),但是有時(shí)我們希望多個(gè)線程一起工作來(lái)完成一個(gè)任務(wù),這就涉及到線程間通信。
關(guān)于線程間通信本文涉及到的方法和類包括:thread.join()、object.wait()、object.notify()、CountdownLatch、CyclicBarrier、FutureTask、Callable。

接下來(lái)將用幾個(gè)例子來(lái)介紹如何在Java中實(shí)現(xiàn)線程間通信:

  1. 如何讓兩個(gè)線程依次執(zhí)行,即一個(gè)線程等待另一個(gè)線程執(zhí)行完成后再執(zhí)行?

  2. 如何讓兩個(gè)線程以指定的方式有序相交執(zhí)行?

  3. 有四個(gè)線程:A、B、C、D,如何實(shí)現(xiàn) D 在 A、B、C 都同步執(zhí)行完畢后執(zhí)行?

  4. 三個(gè)運(yùn)動(dòng)員分開準(zhǔn)備,然后在每個(gè)人準(zhǔn)備好后同時(shí)開始跑步。

  5. 子線程完成任務(wù)后,將結(jié)果返回給主線程。

1. 如何讓兩個(gè)線程依次執(zhí)行?

假設(shè)有兩個(gè)線程:A 和 B,這兩個(gè)線程都可以按照順序打印數(shù)字,代碼如下:

public class Test01 {

    public static void main(String[] args) throws InterruptedException {
        demo1();
    }

    public static void demo1() {
        Thread a = new Thread(() -> {
            printNumber("A");
        });

        Thread b = new Thread(() -> {
            printNumber("B");
        });

        a.start();
        b.start();
    }

    public static void printNumber(String threadName) {
        int i = 0;
        while (i++ < 3) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(threadName + " print: " + i);
        }
    }

}

得到的結(jié)果如下:

A print: 1
B print: 1
B print: 2
A print: 2
A print: 3
B print: 3

可以看到 A 和 B 同時(shí)打印數(shù)字,如果我們希望 B 在 A 執(zhí)行完成之后開始執(zhí)行,那么可以使用 thread.join() 方法實(shí)現(xiàn),代碼如下:

public static void demo2() {
    Thread a = new Thread(() -> {
        printNumber("A");
    });

    Thread b = new Thread(() -> {
        System.out.println("B 等待 A 執(zhí)行");
        try {
            a.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        printNumber("B");
    });

    a.start();
    b.start();
}

得到的結(jié)果如下:

B 等待 A 執(zhí)行
A print: 1
A print: 2
A print: 3
B print: 1
B print: 2
B print: 3

我們可以看到該 a.join() 方法會(huì)讓 B 等待 A 完成打印。

thread.join() 方法的作用就是阻塞當(dāng)前線程,等待調(diào)用 join() 方法的線程執(zhí)行完畢后再執(zhí)行后面的代碼。

查看 join() 方法的源碼,內(nèi)部是調(diào)用了 join(0) ,如下:

public final void join() throws InterruptedException {
    join(0);
}

查看 join(0) 的源碼如下:

// 注意這里使用了 sychronized 加鎖,鎖對(duì)象是線程的實(shí)例對(duì)象
public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }
	// 調(diào)用 join(0) 執(zhí)行下面的代碼
    if (millis == 0) {
        // 這里使用 while 循環(huán)的目的是為了避免虛假喚醒
        // 如果當(dāng)前線程存活則調(diào)用 wait(0), 0 表示永久等待,直到調(diào)用 notifyAll() 或者 notify() 方法
        // 當(dāng)線程結(jié)束的時(shí)候會(huì)調(diào)用 notifyAll() 方法
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

從源碼中可以看出 join(long millis) 方法是通過(guò) wait(long timeout) (Object 提供的方法)方法實(shí)現(xiàn)的,調(diào)用 wait 方法之前,當(dāng)前線程必須獲得對(duì)象的鎖,所以此 join 方法使用了 synchronized 加鎖,鎖對(duì)象是線程的實(shí)例對(duì)象。其中 wait(0)方法會(huì)讓當(dāng)前線程阻塞等待,直到另一個(gè)線程調(diào)用此對(duì)象的 notify() 或者 notifyAll() 方法才會(huì)繼續(xù)執(zhí)行。當(dāng)調(diào)用 join 方法的線程結(jié)束的時(shí)候會(huì)調(diào)用 notifyAll() 方法,所以 join() 方法可以實(shí)現(xiàn)一個(gè)線程等待另一個(gè)調(diào)用 join() 的線程結(jié)束后再執(zhí)行。

虛假喚醒:一個(gè)線程在沒(méi)有被通知、中斷、超時(shí)的情況下被喚醒;
虛假喚醒可能導(dǎo)致條件不成立的情況下執(zhí)行代碼,破壞被鎖保護(hù)的約束關(guān)系;
為什么使用 while 循環(huán)來(lái)避免虛假喚醒:
在 if 塊中使用 wait 方法,是非常危險(xiǎn)的,因?yàn)橐坏┚€程被喚醒,并得到鎖,就不會(huì)再判斷 if 條件而執(zhí)行 if 語(yǔ)句塊外的代碼,所以建議凡是先要做條件判斷,再 wait 的地方,都使用 while 循環(huán)來(lái)做,循環(huán)會(huì)在等待之前和之后對(duì)條件進(jìn)行測(cè)試。

2. 如何讓兩個(gè)線程按照指定的方式有序相交?

如果現(xiàn)在我們希望 B線程在 A 線程打印 1 后立即打印 1,2,3,然后 A 線程繼續(xù)打印 2,3,那么我們需要更細(xì)粒度的鎖來(lái)控制執(zhí)行順序。

在這里,我們可以利用 object.wait() 和 object.notify() 方法,代碼如下:

public static void demo3() {
    Object lock = new Object();
    Thread A = new Thread(() -> {
        synchronized (lock) {
            System.out.println("A 1");
            try {
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("A 2");
            System.out.println("A 3");
        }
    });

    Thread B = new Thread(() -> {
        synchronized (lock) {
            System.out.println("B 1");
            System.out.println("B 2");
            System.out.println("B 3");
            lock.notify();
        }
    });

    A.start();
    B.start();
}

得到的結(jié)果如下:

A 1
B 1
B 2
B 3
A 2
A 3

上述代碼的執(zhí)行流程如下:

  1. 首先我們創(chuàng)建一個(gè)由 A 和 B 共享的對(duì)象鎖: lock = new Object();

  2. 當(dāng)A拿到鎖時(shí),先打印1,然后調(diào)用lock.wait()方法進(jìn)入等待狀態(tài),然后交出鎖的控制權(quán);

  3. B 不會(huì)被執(zhí)行,直到 A 調(diào)用該lock.wait()方法釋放控制權(quán)并且 B 獲得鎖;

  4. B拿到鎖后打印1,2,3,然后調(diào)用lock.notify()方法喚醒正在等待的A;

  5. A 喚醒后繼續(xù)打印剩余的 2,3。

為了便于理解,我將上面的代碼添加了日志,代碼如下:

public static void demo3() {
    Object lock = new Object();
    Thread A = new Thread(() -> {
        System.out.println("INFO:A 等待獲取鎖");
        synchronized (lock) {
            System.out.println("INFO:A 獲取到鎖");
            System.out.println("A 1");
            try {
                System.out.println("INFO:A 進(jìn)入 waiting 狀態(tài),放棄鎖的控制權(quán)");
                lock.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("INFO:A 被 B 喚醒繼續(xù)執(zhí)行");
            System.out.println("A 2");
            System.out.println("A 3");
        }
    });

    Thread B = new Thread(() -> {
        System.out.println("INFO:B 等待獲取鎖");
        synchronized (lock) {
            System.out.println("INFO:B 獲取到鎖");
            System.out.println("B 1");
            System.out.println("B 2");
            System.out.println("B 3");
            System.out.println("INFO:B 執(zhí)行結(jié)束,調(diào)用 notify 方法喚醒 A");
            lock.notify();
        }
    });

    A.start();
    B.start();
}

得到的結(jié)果如下:

INFO:A 等待獲取鎖
INFO:A 獲取到鎖
A 1
INFO:A 進(jìn)入 waiting 狀態(tài),放棄鎖的控制權(quán)
INFO:B 等待獲取鎖
INFO:B 獲取到鎖
B 1
B 2
B 3
INFO:B 執(zhí)行結(jié)束,調(diào)用 notify 方法喚醒 A
INFO:A 被 B 喚醒繼續(xù)執(zhí)行
A 2
A 3

3. 線程 D 在A、B、C都同步執(zhí)行完畢后執(zhí)行

thread.join() 前面介紹的方法允許一個(gè)線程在等待另一個(gè)線程完成運(yùn)行后繼續(xù)執(zhí)行。但是如果我們將A、B、C依次加入到D線程中,就會(huì)讓A、B、C依次執(zhí)行,而我們希望它們?nèi)齻€(gè)同步運(yùn)行。

我們要實(shí)現(xiàn)的目標(biāo)是:A、B、C三個(gè)線程可以同時(shí)開始運(yùn)行,各自獨(dú)立運(yùn)行完成后通知D;D 不會(huì)開始運(yùn)行,直到 A、B 和 C 都運(yùn)行完畢。所以我們 CountdownLatch 用來(lái)實(shí)現(xiàn)這種類型的通信。它的基本用法是:

  1. 創(chuàng)建一個(gè)計(jì)數(shù)器,并設(shè)置一個(gè)初始值, CountdownLatch countDownLatch = new CountDownLatch(3);

  2. 調(diào)用countDownLatch.await()進(jìn)入等待狀態(tài),直到計(jì)數(shù)值變?yōu)?;

  3. 在其他線程調(diào)用countDownLatch.countDown(),該方法會(huì)將計(jì)數(shù)值減一;

  4. 當(dāng)計(jì)數(shù)器的值變?yōu)?0 時(shí),countDownLatch.await()等待線程中的方法會(huì)繼續(xù)執(zhí)行下面的代碼。

實(shí)現(xiàn)代碼如下:

public static void runDAfterABC() {
    int count = 3;
    CountDownLatch countDownLatch = new CountDownLatch(count);
    new Thread(() -> {
        System.out.println("INFO: D 等待 A B C 運(yùn)行完成");
        try {
            countDownLatch.await();
            System.out.println("INFO: A B C 運(yùn)行完成,D 開始運(yùn)行");
            System.out.println("D is working");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

    for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
        final String name = String.valueOf(threadName);
        new Thread(() -> {
            System.out.println(name + " is working");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " finished");
            countDownLatch.countDown();
        }).start();
    }
}

得到的結(jié)果如下:

INFO: D 等待 A B C 運(yùn)行完成
A is working
B is working
C is working
C finished
B finished
A finished
INFO: A B C 運(yùn)行完成,D 開始運(yùn)行
D is working

其實(shí)CountDownLatch它本身就是一個(gè)倒數(shù)計(jì)數(shù)器,我們把初始的count值設(shè)置為3。D運(yùn)行的時(shí)候,首先調(diào)用該countDownLatch.await()方法檢查計(jì)數(shù)器的值是否為0,如果不是0則保持等待狀態(tài). A、B、C 運(yùn)行完畢后,分別使用countDownLatch.countDown()方法將倒數(shù)計(jì)數(shù)器減1。計(jì)數(shù)器將減為 0,然后通知await()方法結(jié)束等待,D開始繼續(xù)執(zhí)行。
因此,CountDownLatch適用于一個(gè)線程需要等待多個(gè)線程的情況。

4. 三個(gè)運(yùn)動(dòng)員分開準(zhǔn)備同時(shí)開跑

這一次,A、B、C這三個(gè)線程都需要分別準(zhǔn)備,等三個(gè)線程都準(zhǔn)備好后開始同時(shí)運(yùn)行,我們應(yīng)該如何做到這一點(diǎn)?
CountDownLatch可以用來(lái)計(jì)數(shù),但完成計(jì)數(shù)的時(shí)候,只有一個(gè)線程的一個(gè)await()方法會(huì)得到響應(yīng),所以多線程不能在同一時(shí)間被觸發(fā)。為了達(dá)到線程相互等待的效果,我們可以使用該CyclicBarrier,其基本用法為:

  1. 首先創(chuàng)建一個(gè)公共對(duì)象CyclicBarrier,并設(shè)置同時(shí)等待的線程數(shù),CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

  2. 這些線程同時(shí)開始準(zhǔn)備,準(zhǔn)備好后,需要等待別人準(zhǔn)備好,所以調(diào)用cyclicBarrier.await()方法等待別人;

  3. 當(dāng)指定的需要同時(shí)等待的線程都調(diào)用了該cyclicBarrier.await()方法時(shí),意味著這些線程準(zhǔn)備好了,那么這些線程就會(huì)開始同時(shí)繼續(xù)執(zhí)行。

想象一下有三個(gè)跑步者需要同時(shí)開始跑步,所以他們需要等待其他人都準(zhǔn)備好,實(shí)現(xiàn)代碼如下:

public static void runABCWhenAllReady() {
    int count = 3;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
    Random random = new Random();
    for (char threadName = 'A'; threadName <= 'C' ; threadName++) {
        final String name = String.valueOf(threadName);
        new Thread(() -> {
            int prepareTime = random.nextInt(10000);
            System.out.println(name + " 準(zhǔn)備時(shí)間:" + prepareTime);
            try {
                Thread.sleep(prepareTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(name + " 準(zhǔn)備好了,等待其他人");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(name + " 開始跑步");
        }).start();
    }
}

得到結(jié)果如下:

A 準(zhǔn)備時(shí)間:1085
B 準(zhǔn)備時(shí)間:7729
C 準(zhǔn)備時(shí)間:8444
A 準(zhǔn)備好了,等待其他人
B 準(zhǔn)備好了,等待其他人
C 準(zhǔn)備好了,等待其他人
C 開始跑步
A 開始跑步
B 開始跑步

CyclicBarrier 的作用就是等待多個(gè)線程同時(shí)執(zhí)行。

5. 子線程將結(jié)果返回給主線程

在實(shí)際開發(fā)中,往往我們需要?jiǎng)?chuàng)建子線程來(lái)做一些耗時(shí)的任務(wù),然后將執(zhí)行結(jié)果傳回主線程。那么如何在 Java 中實(shí)現(xiàn)呢?

一般在創(chuàng)建線程的時(shí)候,我們會(huì)把 Runnable 對(duì)象傳遞給 Thread 執(zhí)行,Runable 的源碼如下:

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

可以看到 Runable 是一個(gè)函數(shù)式接口,該接口中的 run 方法沒(méi)有返回值,那么如果要返回結(jié)果,可以使用另一個(gè)類似的接口 Callable。

函數(shù)式接口:只有一個(gè)方法的接口

Callable 接口的源碼如下:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看出,最大的區(qū)別Callable在于它返回的是泛型。

那么接下來(lái)的問(wèn)題是,如何將子線程的結(jié)果傳回去呢?Java 有一個(gè)類,F(xiàn)utureTask,它可以與 一起工作Callable,但請(qǐng)注意,get用于獲取結(jié)果的方法會(huì)阻塞主線程。FutureTask 本質(zhì)上還是一個(gè) Runnable,所以可以直接傳到 Thread 中。

Java中怎么實(shí)現(xiàn)線程間的通信功能

比如我們想讓子線程計(jì)算1到100的總和,并將結(jié)果返回給主線程,代碼如下:

public static void getResultInWorker() {
    Callable<Integer> callable = () -> {
        System.out.println("子任務(wù)開始執(zhí)行");
        Thread.sleep(1000);
        int result = 0;
        for (int i = 0; i <= 100; i++) {
            result += i;
        }
        System.out.println("子任務(wù)執(zhí)行完成并返回結(jié)果");
        return result;
    };
    FutureTask<Integer> futureTask = new FutureTask<>(callable);
    new Thread(futureTask).start();

    try {
        System.out.println("開始執(zhí)行 futureTask.get()");
        Integer result = futureTask.get();
        System.out.println("執(zhí)行的結(jié)果:" + result);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

得到的結(jié)果如下:

開始執(zhí)行 futureTask.get()
子任務(wù)開始執(zhí)行
子任務(wù)執(zhí)行完成并返回結(jié)果
執(zhí)行的結(jié)果:5050

可以看出在主線程調(diào)用futureTask.get()方法時(shí)阻塞了主線程;然后Callable開始在內(nèi)部執(zhí)行并返回操作的結(jié)果;然后futureTask.get()得到結(jié)果,主線程恢復(fù)運(yùn)行。

在這里我們可以了解到,F(xiàn)utureTask和Callable可以直接在主線程中獲取子線程的結(jié)果,但是它們會(huì)阻塞主線程。當(dāng)然,如果你不希望阻塞主線程,可以考慮使用ExecutorService把FutureTask到線程池來(lái)管理執(zhí)行。

“Java中怎么實(shí)現(xiàn)線程間的通信功能”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI