溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java并發(fā)編程在各主流框架中怎么應(yīng)用

發(fā)布時(shí)間:2021-11-30 14:06:24 來(lái)源:億速云 閱讀:125 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Java并發(fā)編程在各主流框架中怎么應(yīng)用”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Java并發(fā)編程在各主流框架中怎么應(yīng)用”吧!

Java 內(nèi)存模型

JVM 規(guī)范定義了 Java 內(nèi)存模型來(lái)屏蔽掉各種操作系統(tǒng)、虛擬機(jī)實(shí)現(xiàn)廠商和硬件的內(nèi)存訪問(wèn)差異,以確保 Java 程序在所有操作系統(tǒng)和平臺(tái)上能夠達(dá)到一致的內(nèi)存訪問(wèn)效果。

工作內(nèi)存和主內(nèi)存

Java 內(nèi)存模型規(guī)定所有的變量都存儲(chǔ)在主內(nèi)存中,每個(gè)線程都有自己獨(dú)立的工作內(nèi)存,工作內(nèi)存保存了對(duì)應(yīng)該線程使用的變量的主內(nèi)存副本拷貝。 線程對(duì)這些變量的操作都在自己的工作內(nèi)存中進(jìn)行,不能直接操作主內(nèi)存和其他工作內(nèi)存中存儲(chǔ)的變量或者變量副本。線程間的變量傳遞需通過(guò)主內(nèi)存來(lái)完成,三者的關(guān)系如下圖所示。

Java并發(fā)編程在各主流框架中怎么應(yīng)用

Java 內(nèi)存操作協(xié)議

Java 內(nèi)存模型定義了 8 種操作來(lái)完成主內(nèi)存和工作內(nèi)存的變量訪問(wèn),具體如下。

Java并發(fā)編程在各主流框架中怎么應(yīng)用

?read:把一個(gè)變量的值從主內(nèi)存?zhèn)鬏數(shù)骄€程的工作內(nèi)存中,以便隨后的 load 動(dòng)作使用。?load:把從主內(nèi)存中讀取的變量值載入工作內(nèi)存的變量副本中。?use:把工作內(nèi)存中一個(gè)變量的值傳遞給 Java 虛擬機(jī)執(zhí)行引擎。?assign:把從執(zhí)行引擎接收到的變量的值賦值給工作內(nèi)存中的變量。?store:把工作內(nèi)存中一個(gè)變量的值傳送到主內(nèi)存中,以便隨后的 write 操作。?write:工作內(nèi)存?zhèn)鬟f過(guò)來(lái)的變量值放入主內(nèi)存中。?lock:把主內(nèi)存的一個(gè)變量標(biāo)識(shí)為某個(gè)線程獨(dú)占的狀態(tài)。?unlock:把主內(nèi)存中 一個(gè)處于鎖定狀態(tài)的變量釋放出來(lái),被釋放后的變量才可以被其他線程鎖定。

內(nèi)存模型三大特性

1、原子性

這個(gè)概念與事務(wù)中的原子性大概一致,表明此操作是不可分割,不可中斷的,要么全部執(zhí)行,要么全部不執(zhí)行。Java 內(nèi)存模型直接保證的原子性操作包括 read、load、use、assign、store、write、lock、unlock 這八個(gè)。

2、可見(jiàn)性

可見(jiàn)性是指當(dāng)一個(gè)線程修改了共享變量的值,其他線程能夠立即得知這個(gè)修改。 Java 內(nèi)存模型是通過(guò)在變量修改后將新值同步回主內(nèi)存,在變量讀取前從主內(nèi)存刷新變量值這種依賴主內(nèi)存作為傳遞媒介的方式來(lái)實(shí)現(xiàn)可見(jiàn)性的,無(wú)論是普通變量還是 volatile 變量都是如此,普通變量與 volatile 變量的區(qū)別是,volatile 的特殊規(guī)則保證了新值能立即同步到主內(nèi)存,以及每次使用前立即從主內(nèi)存刷新。因此,可以說(shuō) volatile 保證了多線程操作時(shí)變量的可見(jiàn)性,而普通變量則不能保證這一點(diǎn)。除了 volatile 外,synchronized 也提供了可見(jiàn)性,synchronized 的可見(jiàn)性是由 “對(duì)一個(gè)變量執(zhí)行 unlock 操作 之前,必須先把此變量同步回主內(nèi)存中(執(zhí)行 store、write 操作)” 這條規(guī)則獲得。

3、有序性

單線程環(huán)境下,程序會(huì) “有序的”執(zhí)行,即:線程內(nèi)表現(xiàn)為串行語(yǔ)義。但是在多線程環(huán)境下,由于指令重排,并發(fā)執(zhí)行的正確性會(huì)受到影響。在 Java 中使用 volatile 和 synchronized 關(guān)鍵字,可以保證多線程執(zhí)行的有序性。volatile 通過(guò)加入內(nèi)存屏障指令來(lái)禁止內(nèi)存的重排序。synchronized 通過(guò)加鎖,保證同一時(shí)刻只有一個(gè)線程來(lái)執(zhí)行同步代碼。

volatile 的應(yīng)用

打開(kāi) NioEventLoop 的代碼中,有一個(gè)控制 IO 操作 和 其他任務(wù)運(yùn)行比例的,用 volatile 修飾的 int 類型字段 ioRatio,代碼如下。

    private volatile int ioRatio = 50;

這里為什么要用 volatile 修飾呢?我們首先對(duì) volatile 關(guān)鍵字進(jìn)行說(shuō)明,然后再結(jié)合 Netty 的代碼進(jìn)行分析。

關(guān)鍵字 volatile 是 Java 提供的最輕量級(jí)的同步機(jī)制,Java 內(nèi)存模型對(duì) volatile 專門定義了一些特殊的訪問(wèn)規(guī)則。下面我們就看它的規(guī)則。當(dāng)一個(gè)變量被 volatile 修飾后,它將具備以下兩種特性。

?線程可見(jiàn)性:當(dāng)一個(gè)線程修改了被 volatile 修飾的變量后,無(wú)論是否加鎖,其他線程都可以立即看到最新的修改,而普通變量卻做不到這點(diǎn)。?禁止指令重排序優(yōu)化:普通的變量?jī)H僅保證在該方法的執(zhí)行過(guò)程中所有依賴賦值結(jié)果的地方都能獲取正確的結(jié)果,而不能保證變量賦值操作的順序與程序代碼的執(zhí)行順序一致。舉個(gè)簡(jiǎn)單的例子說(shuō)明下指令重排序優(yōu)化問(wèn)題,代碼如下。

public class ThreadStopExample {    private static boolean stop;    public static void main(String[] args) throws InterruptedException {        Thread workThread = new Thread(new Runnable() {            public void run() {                int i= 0;                while (!stop) {                    i++;                    try{                        TimeUnit.SECONDS.sleep(1);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        });        workThread.start();        TimeUnit.SECONDS.sleep(3);        stop = true;    }}

我們預(yù)期程序會(huì)在 3s 后停止,但是實(shí)際上它會(huì)一直執(zhí)行下去,原因就是虛擬機(jī)對(duì)代碼進(jìn)行了指令重排序和優(yōu)化,優(yōu)化后的指令如下。

    if (!stop)    While(true)        ......

workThread 線程在執(zhí)行重排序后的代碼時(shí),是無(wú)法發(fā)現(xiàn)變量 stop 被其它線程修改的,因此無(wú)法停止運(yùn)行。要解決這個(gè)問(wèn)題,只要將 stop 前增加 volatile 修飾符即可。volatile 解決了如下兩個(gè)問(wèn)題。第一,主線程對(duì) stop 的修改在 workThread 線程 中可見(jiàn),也就是說(shuō) workThread 線程 立即看到了其他線程對(duì)于 stop 變量 的修改。第二,禁止指令重排序,防止因?yàn)橹嘏判驅(qū)е碌牟l(fā)訪問(wèn)邏輯混亂。

一些人認(rèn)為使用 volatile 可以代替?zhèn)鹘y(tǒng)鎖,提升并發(fā)性能,這個(gè)認(rèn)識(shí)是錯(cuò)誤的。volatile 僅僅解決了可見(jiàn)性的問(wèn)題,但是它并不能保證互斥性,也就是說(shuō)多個(gè)線程并發(fā)修改某個(gè)變量時(shí),依舊會(huì)產(chǎn)生多線程問(wèn)題。因此,不能靠 volatile 來(lái)完全替代傳統(tǒng)的鎖。根據(jù)經(jīng)驗(yàn)總結(jié),volatile 最適用的場(chǎng)景是 “ 一個(gè)線程寫(xiě),其他線程讀 ”,如果有多個(gè)線程并發(fā)寫(xiě)操作,仍然需要使用鎖或者線程安全的容器或者原子變量來(lái)代替。下面我們繼續(xù)對(duì) Netty 的源碼做分析。上面講到了 ioRatio 被定義成 volatile,下面看看代碼為什么要這樣定義。

    final long ioTime = System.nanoTime() - ioStartTime;    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

通過(guò)代碼分析我們發(fā)現(xiàn),在 NioEventLoop 線程 中,ioRatio 并沒(méi)有被修改,它是只讀操作。既然沒(méi)有修改,為什么要定義成 volatile 呢?繼續(xù)看代碼,我們發(fā)現(xiàn) NioEventLoop 提供了重新設(shè)置 IO 執(zhí)行時(shí)間比例的公共方法。

    public void setIoRatio(int ioRatio) {        if (ioRatio <= 0 || ioRatio > 100) {            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");        }        this.ioRatio = ioRatio;    }

首先,NioEventLoop 線程 沒(méi)有調(diào)用該 set 方法,說(shuō)明調(diào)整 IO 執(zhí)行時(shí)間比例 是外部發(fā)起的操作,通常是由業(yè)務(wù)的線程調(diào)用該方法,重新設(shè)置該參數(shù)。這樣就形成了一個(gè)線程寫(xiě)、一個(gè)線程讀。根據(jù)前面針對(duì) volatile 的應(yīng)用總結(jié),此時(shí)可以使用 volatile 來(lái)代替?zhèn)鹘y(tǒng)的 synchronized 關(guān)鍵字,以提升并發(fā)訪問(wèn)的性能。

ThreadLocal 的應(yīng)用及源碼解析

ThreadLocal 又稱為線程本地存儲(chǔ)區(qū)(Thread Local Storage,簡(jiǎn)稱為 TLS),每個(gè)線程都有自己的私有的本地存儲(chǔ)區(qū)域,不同線程之間彼此不能訪問(wèn)對(duì)方的 TLS 區(qū)域。使用 ThreadLocal 變量 的 set(T value) 方法 可以將數(shù)據(jù)存入該線程本地存儲(chǔ)區(qū),使用 get() 方法 可以獲取到之前存入的值。

ThreadLocal 的常見(jiàn)應(yīng)用

不使用 ThreadLocal。

public class SessionBean {    public static class Session {        private String id;        public String getId() {            return id;        }        public void setId(String id) {            this.id = id;        }    }    public Session createSession() {        return new Session();    }    public void setId(Session session, String id) {        session.setId(id);    }    public String getId(Session session) {        return session.getId();    }    public static void main(String[] args) {    //沒(méi)有使用ThreadLocal,在方法間共享session需要進(jìn)行session在方法間的傳遞        new Thread(() -> {            SessionBean bean = new SessionBean();            Session session = bean.createSession();            bean.setId(session, "susan");            System.out.println(bean.getId(session));        }).start();    }}

上述代碼中,session 需要在方法間傳遞才可以修改和讀取,保證線程中各方法操作的是一個(gè)。下面看一下使用 ThreadLocal 的代碼。

public class SessionBean {//定義一個(gè)靜態(tài)ThreadLocal變量session,就能夠保證各個(gè)線程有自己的一份,并且方法可以方便獲取,不用傳遞    private static ThreadLocal<Session> session = new ThreadLocal<>();    public static class Session {        private String id;        public String getId() {            return id;        }        public void setId(String id) {            this.id = id;        }    }    public void createSession() {        session.set(new Session());    }    public void setId(String id) {        session.get().setId(id);    }    public String getId() {        return session.get().getId();    }    public static void main(String[] args) {        new Thread(() -> {            SessionBean bean = new SessionBean();            bean.createSession();            bean.setId("susan");            System.out.println(bean.getId());        }).start();    }}

在方法的內(nèi)部實(shí)現(xiàn)中,直接可以通過(guò) session.get() 獲取到當(dāng)前線程的 session,省掉了參數(shù)在方法間傳遞的環(huán)節(jié)。

ThreadLocal 的實(shí)現(xiàn)原理

一般,類屬性中的數(shù)據(jù)是多個(gè)線程共享的,但 ThreadLocal 類型的數(shù)據(jù) 聲明為類屬性,卻可以為每一個(gè)使用它(通過(guò) set(T value)方法)的線程存儲(chǔ)線程私有的數(shù)據(jù),通過(guò)其源碼我們可以發(fā)現(xiàn)其中的原理。

public class ThreadLocal<T> {    /**     * 下面的 getMap()方法 傳入當(dāng)前線程,獲得一個(gè)ThreadLocalMap對(duì)象,說(shuō)明每一個(gè)線程維護(hù)了     * 自己的一個(gè) map,保證讀取出來(lái)的value是自己線程的。     *     * ThreadLocalMap 是ThreadLocal靜態(tài)內(nèi)部類,存儲(chǔ)value的鍵值就是ThreadLocal本身。     *     * 因此可以斷定,每個(gè)線程維護(hù)一個(gè)ThreadLocalMap的鍵值對(duì)映射Map。不同線程的Map的 key值 是一樣的,     * 都是ThreadLocal,但 value 是不同的。     */    public T get() {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null) {            ThreadLocalMap.Entry e = map.getEntry(this);            if (e != null) {                @SuppressWarnings("unchecked")                T result = (T)e.value;                return result;            }        }        return setInitialValue();    }    public void set(T value) {        Thread t = Thread.currentThread();        ThreadLocalMap map = getMap(t);        if (map != null)            map.set(this, value);        else            createMap(t, value);    }}

ThreadLocal 在 Spring 中的使用

Spring 事務(wù)處理的設(shè)計(jì)與實(shí)現(xiàn)中大量使用了 ThreadLocal 類,比如,TransactionSynchronizationManager 維護(hù)了一系列的 ThreadLocal 變量,用于存儲(chǔ)線程私有的 事務(wù)屬性及資源。源碼如下。

/** * 管理每個(gè)線程的資源和事務(wù)同步的中心幫助程序。供資源管理代碼使用,但不供典型應(yīng)用程序代碼使用。 * * 資源管理代碼應(yīng)該檢查線程綁定的資源,如,JDBC連接 或 Hibernate Sessions。 * 此類代碼通常不應(yīng)該將資源綁定到線程,因?yàn)檫@是事務(wù)管理器的職責(zé)。另一個(gè)選項(xiàng)是, * 如果事務(wù)同步處于活動(dòng)狀態(tài),則在首次使用時(shí)延遲綁定,以執(zhí)行跨任意數(shù)量資源的事務(wù)。 */public abstract class TransactionSynchronizationManager {    /**     *  一般是一個(gè)線程持有一個(gè) 獨(dú)立的事務(wù),以相互隔離地處理各自的事務(wù)。     *  所以這里使用了很多 ThreadLocal對(duì)象,為每個(gè)線程綁定 對(duì)應(yīng)的事務(wù)屬性及資源,     *  以便后續(xù)使用時(shí)能直接獲取。     */    private static final ThreadLocal<Map<Object, Object>> resources =            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =            new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");    private static final ThreadLocal<String> currentTransactionName =            new NamedThreadLocal<String>("Current transaction name");    private static final ThreadLocal<Boolean> currentTransactionReadOnly =            new NamedThreadLocal<Boolean>("Current transaction read-only status");    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =            new NamedThreadLocal<Integer>("Current transaction isolation level");    private static final ThreadLocal<Boolean> actualTransactionActive =            new NamedThreadLocal<Boolean>("Actual transaction active");    /**     * 為當(dāng)前線程 綁定 對(duì)應(yīng)的resource資源     */    public static void bindResource(Object key, Object value) throws IllegalStateException {        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);        Assert.notNull(value, "Value must not be null");        Map<Object, Object> map = resources.get();        // 如果當(dāng)前線程的 resources中,綁定的數(shù)據(jù)map為空,則為 resources 綁定 map        if (map == null) {            map = new HashMap<Object, Object>();            resources.set(map);        }        Object oldValue = map.put(actualKey, value);        if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {            oldValue = null;        }        if (oldValue != null) {            throw new IllegalStateException("Already value [" + oldValue + "] for key [" +                    actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");        }        if (logger.isTraceEnabled()) {            logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +                    Thread.currentThread().getName() + "]");        }    }    /**     * 返回當(dāng)前線程綁定的所有資源     */    public static Map<Object, Object> getResourceMap() {        Map<Object, Object> map = resources.get();        return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap());    }}

ThreadLocal 在 Mybatis 中的使用

Mybatis 的 SqlSession 對(duì)象 也是各線程私有的資源,所以對(duì)其的管理也使用到了 ThreadLocal 類。源碼如下。

public class SqlSessionManager implements SqlSessionFactory, SqlSession {  private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<>();  public void startManagedSession() {    this.localSqlSession.set(openSession());  }  public void startManagedSession(boolean autoCommit) {    this.localSqlSession.set(openSession(autoCommit));  }  public void startManagedSession(Connection connection) {    this.localSqlSession.set(openSession(connection));  }  public void startManagedSession(TransactionIsolationLevel level) {    this.localSqlSession.set(openSession(level));  }  public void startManagedSession(ExecutorType execType) {    this.localSqlSession.set(openSession(execType));  }  public void startManagedSession(ExecutorType execType, boolean autoCommit) {    this.localSqlSession.set(openSession(execType, autoCommit));  }  public void startManagedSession(ExecutorType execType, TransactionIsolationLevel level) {    this.localSqlSession.set(openSession(execType, level));  }  public void startManagedSession(ExecutorType execType, Connection connection) {    this.localSqlSession.set(openSession(execType, connection));  }  public boolean isManagedSessionStarted() {    return this.localSqlSession.get() != null;  }  @Override  public Connection getConnection() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot get connection.  No managed session is started.");    }    return sqlSession.getConnection();  }  @Override  public void clearCache() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot clear the cache.  No managed session is started.");    }    sqlSession.clearCache();  }  @Override  public void commit() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");    }    sqlSession.commit();  }  @Override  public void commit(boolean force) {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot commit.  No managed session is started.");    }    sqlSession.commit(force);  }  @Override  public void rollback() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    sqlSession.rollback();  }  @Override  public void rollback(boolean force) {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    sqlSession.rollback(force);  }  @Override  public List<BatchResult> flushStatements() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot rollback.  No managed session is started.");    }    return sqlSession.flushStatements();  }  @Override  public void close() {    final SqlSession sqlSession = localSqlSession.get();    if (sqlSession == null) {      throw new SqlSessionException("Error:  Cannot close.  No managed session is started.");    }    try {      sqlSession.close();    } finally {      localSqlSession.set(null);    }  }}

J.U.C 包的實(shí)際應(yīng)用

線程池 ThreadPoolExecutor

首先通過(guò) ThreadPoolExecutor 的源碼 看一下線程池的主要參數(shù)及方法。

public class ThreadPoolExecutor extends AbstractExecutorService {    /**     * 核心線程數(shù)     * 當(dāng)向線程池提交一個(gè)任務(wù)時(shí),若線程池已創(chuàng)建的線程數(shù)小于corePoolSize,即便此時(shí)存在空閑線程,     * 也會(huì)通過(guò)創(chuàng)建一個(gè)新線程來(lái)執(zhí)行該任務(wù),直到已創(chuàng)建的線程數(shù)大于或等于corePoolSize     */    private volatile int corePoolSize;    /**     * 最大線程數(shù)     * 當(dāng)隊(duì)列滿了,且已創(chuàng)建的線程數(shù)小于maximumPoolSize,則線程池會(huì)創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。     * 另外,對(duì)于無(wú)界隊(duì)列,可忽略該參數(shù)     */    private volatile int maximumPoolSize;    /**     * 線程存活保持時(shí)間     * 當(dāng)線程池中線程數(shù) 超出核心線程數(shù),且線程的空閑時(shí)間也超過(guò) keepAliveTime時(shí),     * 那么這個(gè)線程就會(huì)被銷毀,直到線程池中的線程數(shù)小于等于核心線程數(shù)     */    private volatile long keepAliveTime;    /**     * 任務(wù)隊(duì)列     * 用于傳輸和保存等待執(zhí)行任務(wù)的阻塞隊(duì)列     */    private final BlockingQueue<Runnable> workQueue;    /**     * 線程工廠     * 用于創(chuàng)建新線程。threadFactory 創(chuàng)建的線程也是采用 new Thread() 方式,threadFactory     * 創(chuàng)建的線程名都具有統(tǒng)一的風(fēng)格:pool-m-thread-n(m為線程池的編號(hào),n為線程池中線程的編號(hào)     */    private volatile ThreadFactory threadFactory;    /**     * 線程飽和策略     * 當(dāng)線程池和隊(duì)列都滿了,再加入的線程會(huì)執(zhí)行此策略     */    private volatile RejectedExecutionHandler handler;    /**     * 構(gòu)造方法提供了多種重載,但實(shí)際上都使用了最后一個(gè)重載 完成了實(shí)例化     */    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             threadFactory, defaultHandler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }    /**     * 執(zhí)行一個(gè)任務(wù),但沒(méi)有返回值     */    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }    /**     * 提交一個(gè)線程任務(wù),有返回值。該方法繼承自其父類 AbstractExecutorService,有多種重載,這是最常用的一個(gè)。     * 通過(guò)future.get()獲取返回值(阻塞直到任務(wù)執(zhí)行完)     */    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }    /**     * 關(guān)閉線程池,不再接收新的任務(wù),但會(huì)把已有的任務(wù)執(zhí)行完     */    public void shutdown() {        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(SHUTDOWN);            interruptIdleWorkers();            onShutdown(); // hook for ScheduledThreadPoolExecutor        } finally {            mainLock.unlock();        }        tryTerminate();    }    /**     * 立即關(guān)閉線程池,已有的任務(wù)也會(huì)被拋棄     */    public List<Runnable> shutdownNow() {        List<Runnable> tasks;        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            checkShutdownAccess();            advanceRunState(STOP);            interruptWorkers();            tasks = drainQueue();        } finally {            mainLock.unlock();        }        tryTerminate();        return tasks;    }    public boolean isShutdown() {        return ! isRunning(ctl.get());    }}

線程池執(zhí)行流程,如下圖所示。

Java并發(fā)編程在各主流框架中怎么應(yīng)用
Executors 提供的 4 種線程池

Executors 類 通過(guò) ThreadPoolExecutor 封裝了 4 種常用的線程池:CachedThreadPool,F(xiàn)ixedThreadPool,ScheduledThreadPool 和 SingleThreadExecutor。其功能如下。

1.CachedThreadPool:用來(lái)創(chuàng)建一個(gè)幾乎可以無(wú)限擴(kuò)大的線程池(最大線程數(shù)為 Integer.MAX_VALUE),適用于執(zhí)行大量短生命周期的異步任務(wù)。2.FixedThreadPool:創(chuàng)建一個(gè)固定大小的線程池,保證線程數(shù)可控,不會(huì)造成線程過(guò)多,導(dǎo)致系統(tǒng)負(fù)載更為嚴(yán)重。3.SingleThreadExecutor:創(chuàng)建一個(gè)單線程的線程池,可以保證任務(wù)按調(diào)用順序執(zhí)行。4.ScheduledThreadPool:適用于執(zhí)行 延時(shí) 或者 周期性 任務(wù)。

如何配置線程池

?CPU 密集型任務(wù)
盡量使用較小的線程池,一般為 CPU 核心數(shù)+1。因?yàn)?CPU 密集型任務(wù) 使得 CPU 使用率 很高,若開(kāi)過(guò)多的線程數(shù),會(huì)造成 CPU 過(guò)度切換。?IO 密集型任務(wù)
可以使用稍大的線程池,一般為 2*CPU 核心數(shù)。IO 密集型任務(wù) CPU 使用率 并不高,因此可以讓 CPU 在等待 IO 的時(shí)候有其他線程去處理別的任務(wù),充分利用 CPU 時(shí)間。

線程池的實(shí)際應(yīng)用

Tomcat 在分發(fā) web 請(qǐng)求時(shí)使用了線程池來(lái)處理。

BlockingQueue

核心方法
public interface BlockingQueue<E> extends Queue<E> {    // 將給定元素設(shè)置到隊(duì)列中,如果設(shè)置成功返回true, 否則返回false。如果是往限定了長(zhǎng)度的隊(duì)列中設(shè)置值,推薦使用offer()方法。    boolean add(E e);    // 將給定的元素設(shè)置到隊(duì)列中,如果設(shè)置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常。    boolean offer(E e);    // 將元素設(shè)置到隊(duì)列中,如果隊(duì)列中沒(méi)有多余的空間,該方法會(huì)一直阻塞,直到隊(duì)列中有多余的空間。    void put(E e) throws InterruptedException;    // 將給定元素在給定的時(shí)間內(nèi)設(shè)置到隊(duì)列中,如果設(shè)置成功返回true, 否則返回false.    boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException;    // 從隊(duì)列中獲取值,如果隊(duì)列中沒(méi)有值,線程會(huì)一直阻塞,直到隊(duì)列中有值,并且該方法取得了該值。    E take() throws InterruptedException;    // 在給定的時(shí)間里,從隊(duì)列中獲取值,時(shí)間到了直接調(diào)用普通的 poll()方法,為null則直接返回null。    E poll(long timeout, TimeUnit unit)        throws InterruptedException;    // 獲取隊(duì)列中剩余的空間。    int remainingCapacity();    // 從隊(duì)列中移除指定的值。    boolean remove(Object o);    // 判斷隊(duì)列中是否擁有該值。    public boolean contains(Object o);    // 將隊(duì)列中值,全部移除,并發(fā)設(shè)置到給定的集合中。    int drainTo(Collection<? super E> c);    // 指定最多數(shù)量限制將隊(duì)列中值,全部移除,并發(fā)設(shè)置到給定的集合中。    int drainTo(Collection<? super E> c, int maxElements);}
主要實(shí)現(xiàn)類

?ArrayBlockingQueue

基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn),在 ArrayBlockingQueue 內(nèi)部,維護(hù)了一個(gè)定長(zhǎng)數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,這是一個(gè)常用的阻塞隊(duì)列,除了一個(gè)定長(zhǎng)數(shù)組外,ArrayBlockingQueue 內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue 在生產(chǎn)者放入數(shù)據(jù) 和 消費(fèi)者獲取數(shù)據(jù)時(shí),都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無(wú)法真正并行運(yùn)行,這點(diǎn)尤其不同于 LinkedBlockingQueue。ArrayBlockingQueue 和 LinkedBlockingQueue 間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的 Node 對(duì)象。這在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)于 GC 的影響還是存在一定的區(qū)別。而在創(chuàng)建 ArrayBlockingQueue 時(shí),我們還可以控制對(duì)象的內(nèi)部鎖是否采用公平鎖,默認(rèn)采用非公平鎖。

?LinkedBlockingQueue

基于鏈表的阻塞隊(duì)列,同 ArrayListBlockingQueue 類似,其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue 可以通過(guò)構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。而 LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來(lái)控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
需要注意的是,如果構(gòu)造一個(gè) LinkedBlockingQueue 對(duì)象,而沒(méi)有指定其容量大小,LinkedBlockingQueue 會(huì)默認(rèn)一個(gè)類似無(wú)限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度,也許還沒(méi)有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。

?PriorityBlockingQueue

基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過(guò)構(gòu)造函數(shù)傳入的 Compator 對(duì)象來(lái)決定),但需要注意的是 PriorityBlockingQueue 并不會(huì)阻塞數(shù)據(jù)生產(chǎn)者,而只會(huì)在沒(méi)有可消費(fèi)的數(shù)據(jù)時(shí),阻塞數(shù)據(jù)的消費(fèi)者。因此使用的時(shí)候要特別注意,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度,否則時(shí)間一長(zhǎng),會(huì)最終耗盡所有的可用堆內(nèi)存空間。在實(shí)現(xiàn) PriorityBlockingQueue 時(shí),內(nèi)部控制線程同步的鎖采用的是公平鎖。

CAS 指令和原子類(應(yīng)用比較多的就是計(jì)數(shù)器)

互斥同步最主要的問(wèn)題就是進(jìn)行線程阻塞和喚醒所帶來(lái)的性能的額外損耗,因此這種同步被稱為阻塞同步,它屬于一種悲觀的并發(fā)策略,我們稱之為悲觀鎖。隨著硬件和操作系統(tǒng)指令集的發(fā)展和優(yōu)化,產(chǎn)生了非阻塞同步,被稱為樂(lè)觀鎖。簡(jiǎn)單地說(shuō),就是先進(jìn)行操作,操作完成之后再判斷操作是否成功,是否有并發(fā)問(wèn)題,如果有則進(jìn)行失敗補(bǔ)償,如果沒(méi)有就算操作成功,這樣就從根本上避免了同步鎖的弊端。

目前,在 Java 中應(yīng)用最廣泛的非阻塞同步就是 CAS。從 JDK1.5 以后,可以使用 CAS 操作,該操作由 sun.misc.Unsafe 類里的 compareAndSwapInt() 和 compareAndSwapLong() 等方法實(shí)現(xiàn)。通常情況下 sun.misc.Unsafe 類 對(duì)于開(kāi)發(fā)者是不可見(jiàn)的,因此,JDK 提供了很多 CAS 包裝類 簡(jiǎn)化開(kāi)發(fā)者的使用,如 AtomicInteger。使用 Java 自帶的 Atomic 原子類,可以避免同步鎖帶來(lái)的并發(fā)訪問(wèn)性能降低的問(wèn)題,減少犯錯(cuò)的機(jī)會(huì)。

到此,相信大家對(duì)“Java并發(fā)編程在各主流框架中怎么應(yīng)用”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI