溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

java多線(xiàn)程Callable跟Future對(duì)比

發(fā)布時(shí)間:2021-07-05 15:48:54 來(lái)源:億速云 閱讀:155 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“java多線(xiàn)程Callable跟Future對(duì)比”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

1、首先說(shuō)一下創(chuàng)建線(xiàn)程的方式

  • new Thread跟實(shí)現(xiàn)Runnable接口的弊端
    (1)、每次new Thread新建對(duì)象性能差。
    (2)、線(xiàn)程缺乏統(tǒng)一管理,可能無(wú)限制新建線(xiàn)程,相互之間競(jìng)爭(zhēng),及可能占用過(guò)多系統(tǒng)資源導(dǎo)致死機(jī)或oom。
    (3)、缺乏更多功能,如定時(shí)執(zhí)行、定期執(zhí)行、線(xiàn)程中斷。
    (4)、最大的一個(gè)弊端就是這兩種方式在執(zhí)行完任務(wù)之后無(wú)法獲取執(zhí)行結(jié)果。
    (5)、如果需要獲取執(zhí)行結(jié)果,就必須通過(guò)共享變量或者使用線(xiàn)程通信的方式來(lái)達(dá)到效果,這樣使用起來(lái)就比較麻煩。

2、Callable和Future出現(xiàn)的原因

而自從Java 1.5開(kāi)始,就提供了Callable和Future,通過(guò)它們可以在任務(wù)執(zhí)行完畢之后得到任務(wù)執(zhí)行結(jié)果。

Callable和Future介紹

(1)、Callable接口代表一段可以調(diào)用并返回結(jié)果的代碼。
(2)、Future接口表示異步任務(wù),是還沒(méi)有完成的任務(wù)給出的未來(lái)結(jié)果。
(3)、所以說(shuō)Callable用于產(chǎn)生結(jié)果,F(xiàn)uture用于獲取結(jié)果。

Callable接口使用泛型去定義它的返回類(lèi)型。Executors類(lèi)提供了一些有用的方法在線(xiàn)程池中執(zhí)行Callable內(nèi)的任務(wù)。由于Callable任務(wù)是并行的(并行就是整體看上去是并行的,其實(shí)在某個(gè)時(shí)間點(diǎn)只有一個(gè)線(xiàn)程在執(zhí)行),我們必須等待它返回的結(jié)果。

Callable與Runnable的對(duì)比

Runnable它是一個(gè)接口,在它里面只聲明了一個(gè)run()方法:

public interface Runnable {
    public abstract void run();
}

由于run()方法返回值為void類(lèi)型,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。

Callable位于java.util.concurrent包下,它也是一個(gè)接口,在它里面也只聲明了一個(gè)方法,只不過(guò)這個(gè)方法叫做call():

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * [@return](https://my.oschina.net/u/556800) computed result
     * [@throws](https://my.oschina.net/throws) Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看到,這是一個(gè)泛型接口,call()函數(shù)返回的類(lèi)型就是傳遞進(jìn)來(lái)的V類(lèi)型。

那么怎么使用Callable呢?

一般情況下是配合ExecutorService來(lái)使用的,在ExecutorService接口中聲明了若干個(gè)submit方法的重載版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一個(gè)submit方法里面的參數(shù)類(lèi)型就是Callable

暫時(shí)只需要知道Callable一般是和ExecutorService配合來(lái)使用的,具體的使用方法講在后面講述。 一般情況下我們使用第一個(gè)submit方法和第三個(gè)submit方法,第二個(gè)submit方法很少使用。

Future的介紹

Future就是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢(xún)是否完成、獲取結(jié)果。必要時(shí)可以通過(guò)get方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
Future類(lèi)位于java.util.concurrent包下,它是一個(gè)接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中聲明了5個(gè)方法,下面依次解釋每個(gè)方法的作用:

  • cancel:
    用來(lái)取消任務(wù),如果取消任務(wù)成功則返回true,如果取消任務(wù)失敗則返回false。參數(shù)mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒(méi)有執(zhí)行完畢的任務(wù),如果設(shè)置true,則表示可以取消正在執(zhí)行過(guò)程中的任務(wù)。如果任務(wù)已經(jīng)完成,則無(wú)論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經(jīng)完成的任務(wù)會(huì)返回false;如果任務(wù)正在執(zhí)行,若mayInterruptIfRunning設(shè)置為true,則返回true,若mayInterruptIfRunning設(shè)置為false,則返回false;如果任務(wù)還沒(méi)有執(zhí)行,則無(wú)論mayInterruptIfRunning為true還是false,肯定返回true

  • isCancelled:
    表示任務(wù)是否被取消成功,如果在任務(wù)正常完成前被取消成功,則返回 true。

  • isDone:
    表示任務(wù)是否已經(jīng)完成,若任務(wù)完成,則返回true;

  • get():
    用來(lái)獲取執(zhí)行結(jié)果,這個(gè)方法會(huì)產(chǎn)生阻塞,會(huì)一直等到任務(wù)執(zhí)行完畢才返回。

  • get(long timeout, TimeUnit unit): 用來(lái)獲取執(zhí)行結(jié)果,如果在指定時(shí)間內(nèi),還沒(méi)獲取到結(jié)果,就直接返回null。

也就是說(shuō)Future提供了三種功能:

  • 判斷任務(wù)是否完成;

  • 能夠中斷任務(wù);

  • 能夠獲取任務(wù)執(zhí)行結(jié)果。

因?yàn)镕uture只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了下面的FutureTask。

FutureTask介紹

FutureTask實(shí)現(xiàn)了RunnableFuture接口,這個(gè)接口的定義如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {  
    void run();  
}

可以看到這個(gè)接口實(shí)現(xiàn)了Runnable和Future接口,接口中的具體實(shí)現(xiàn)由FutureTask來(lái)實(shí)現(xiàn)。這個(gè)類(lèi)的兩個(gè)構(gòu)造方法如下 :

    public FutureTask(Callable<V> callable) {  
        if (callable == null)  
            throw new NullPointerException();  
        sync = new Sync(callable);  
    }  
    public FutureTask(Runnable runnable, V result) {  
        sync = new Sync(Executors.callable(runnable, result));  
    }

如上提供了兩個(gè)構(gòu)造函數(shù),一個(gè)以Callable為參數(shù),另外一個(gè)以Runnable為參數(shù)。這些類(lèi)之間的關(guān)聯(lián)對(duì)于任務(wù)建模的辦法非常靈活,允許你基于FutureTask的Runnable特性(因?yàn)樗鼘?shí)現(xiàn)了Runnable接口),把任務(wù)寫(xiě)成Callable,然后封裝進(jìn)一個(gè)由執(zhí)行者調(diào)度并在必要時(shí)可以取消的FutureTask。

FutureTask可以由執(zhí)行者調(diào)度,這一點(diǎn)很關(guān)鍵。它對(duì)外提供的方法基本上就是Future和Runnable接口的組合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都是由執(zhí)行者調(diào)用,我們基本上不需要直接調(diào)用它。

3、FutureTask實(shí)例

public class MyCallable implements Callable<String> {  
    private long waitTime;   
    public MyCallable(int timeInMillis){   
        this.waitTime=timeInMillis;  
    }  
    [@Override](https://my.oschina.net/u/1162528)  
    public String call() throws Exception {  
        Thread.sleep(waitTime);  
        //return the thread name executing this callable task  
        return Thread.currentThread().getName();  
    }  

}  
public class FutureTaskExample {  
     public static void main(String[] args) {  
        ExecutorService executor = Executors.newFixedThreadPool(2);          // 創(chuàng)建線(xiàn)程池并返回ExecutorService實(shí)例  
        MyCallable callable1 = new MyCallable(1000);                         // 要執(zhí)行的任務(wù)  
        MyCallable callable2 = new MyCallable(2000);  
        FutureTask<String> futureTask1 = new FutureTask<String>(callable1);// 將Callable寫(xiě)的任務(wù)封裝到一個(gè)由執(zhí)行者調(diào)度的FutureTask對(duì)象  
        FutureTask<String> futureTask2 = new FutureTask<String>(callable2);  
        executor.execute(futureTask1);  // 執(zhí)行任務(wù)  
        executor.execute(futureTask2);    


        或者執(zhí)行下面的方法
        ExecutorService executor = Executors.newFixedThreadPool(2);        // 創(chuàng)建線(xiàn)程池并返回ExecutorService實(shí)例  (單例的)
        MyCallable callable1 = new MyCallable(1000);//線(xiàn)程1
        MyCallable callable2 = new MyCallable(2000);線(xiàn)程2
        Future<String> s1 = executor.submit(callable1);//會(huì)創(chuàng)建FutureTask 并且會(huì)執(zhí)行execute方法
        Future<String> s2 = executor.submit(callable2);//會(huì)創(chuàng)建FutureTask 并且會(huì)執(zhí)行execute方法
	while (true) {  
            try {  
                if(futureTask1.isDone() && futureTask2.isDone()){//  兩個(gè)任務(wù)都完成  
                    System.out.println("Done");  
                    executor.shutdown();                          // 關(guān)閉線(xiàn)程池和服務(wù)   
                    return;  
                }  

                if(!futureTask1.isDone()){ // 任務(wù)1沒(méi)有完成,會(huì)等待,直到任務(wù)完成  
                    System.out.println("FutureTask1 output="+futureTask1.get());  
                }  

                System.out.println("Waiting for FutureTask2 to complete");  
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);  
                if(s !=null){  
                    System.out.println("FutureTask2 output="+s);  
                }  
            } catch (InterruptedException | ExecutionException e) {  
                e.printStackTrace();  
            }catch(TimeoutException e){  
                //do nothing  
            }  
        }  
    }  
}

運(yùn)行如上程序后,可以看到一段時(shí)間內(nèi)沒(méi)有輸出,因?yàn)間et()方法等待任務(wù)執(zhí)行完成然后才輸出內(nèi)容.

輸出結(jié)果如下:

FutureTask1 output=pool-1-thread-1
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
FutureTask2 output=pool-1-thread-2
Done

綜上所述,Callable和Future的出現(xiàn)是為了讓線(xiàn)程變得可以控制,并且可以返回線(xiàn)程執(zhí)行的結(jié)果

項(xiàng)目中的使用實(shí)例,多線(xiàn)程導(dǎo)入學(xué)員信息

1.創(chuàng)建線(xiàn)程池

/**
 * Created by ds on 2017/6/29.
 */
[@Component](https://my.oschina.net/u/3907912)
public class ThreadPool4ImportStudent {

    volatile private static ExecutorService instance = null;

    private ThreadPool4ImportStudent(){}

    public static ExecutorService getInstance() {
        try {
            if(instance != null){

            }else{
                Thread.sleep(300);
                synchronized (ThreadPool4ImportStudent.class) {
                    if(instance == null){
                        instance = Executors.newFixedThreadPool(10);
                    }
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return instance;
    }
}

上面的方法 創(chuàng)建了線(xiàn)程池,并且注入了相關(guān)的mapper類(lèi)

2.創(chuàng)建線(xiàn)程

public class ThreadImportStudents implements Callable<List<Integer>> {

    public static final String IMPORT_TYPE_STUDENT = "IMPORT_TYPE_STUDENT";
    public static final String IMPORT_TYPE_VIP = "IMPORT_TYPE_VIP";
    Log log_student = LogFactory.getLog("student");
    List<Integer> ids = new ArrayList<>();
    private String type;
    private Map<String, List<String>> allStudents;
    private List<ImportRowVo> students;
    private Integer userId;
    private Integer vipRecordId;
    private boolean is_custom;
    private StudentImportRecord record;

    private Date date = new Date();
	private Map<String, ImportFieldVo> importTemplateMap;
	private Integer companyId;
	private Integer schoolId;

	public ThreadImportStudents(String type, Map<String, List<String>> allStudents, Map<String, ImportFieldVo> importTemplateMap, List<ImportRowVo> students,
			Integer userId, StudentImportRecord record, Integer vipRecordId, boolean is_custom, Integer companyId, Integer schoolId) {
		this.type = type;
		this.allStudents = allStudents;
		this.importTemplateMap = importTemplateMap;
		this.students = students;
		this.userId = userId;
		this.record = record;
		this.vipRecordId = vipRecordId;
		this.is_custom = is_custom;
		this.companyId = companyId;
		this.schoolId = schoolId;
	}

    [@Override](https://my.oschina.net/u/1162528)
    public List<Integer> call() throws Exception {
    	try {
    	      return insertOrUpdate();
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
            return null;
    }
}

上面的類(lèi)是屬于批量添加學(xué)員的線(xiàn)程類(lèi),實(shí)現(xiàn)了Callable接口并重寫(xiě)了call方法,返回所有插入學(xué)員的id集合。

3、業(yè)務(wù)類(lèi)通過(guò)Future控制線(xiàn)程

List<Integer> studentIds = new ArrayList<Integer>();
List<Future<List<Integer>>> ids = new ArrayList<Future<List<Integer>>>();
Future<List<Integer>> stuId = ThreadPool4ImportStudent.getInstance().submit(new ThreadImportStudents(type, allStudents, importTemplateMap,
					student_list, userId, record, vo.getId(), is_custom, students.getCompanyId(), students.getSchoolId()));
ids.add(stuId);
for (Future<List<Integer>> id:ids) {
	try {
            List<Integer> idl=id.get();
            if(idl!=null){
                 for(Integer idi:idl){
                        if(idi==null){
                            error++;
                        }
                    }
                    studentIds.addAll(idl);
             }
	    }catch (Exception e){}
}

“java多線(xiàn)程Callable跟Future對(duì)比”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI