溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java并發(fā)編程之CAS的案例分析

發(fā)布時(shí)間:2020-08-25 09:33:53 來源:億速云 閱讀:196 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹Java并發(fā)編程之CAS的案例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

在Java并發(fā)編程的世界里,synchronized 和 Lock 是控制多線程并發(fā)環(huán)境下對(duì)共享資源同步訪問的兩大手段。其中 Lock 是 JDK 層面的鎖機(jī)制,是輕量級(jí)鎖,底層使用大量的自旋+CAS操作實(shí)現(xiàn)的。

那什么是CAS呢?CAS,compare and swap,即比較并交換,什么是比較并交換呢?在Lock鎖的理念中,采用的是一種樂觀鎖的形式,即多線程去修改共享資源時(shí),不是在修改之前就加鎖,而是樂觀的認(rèn)為沒有別的線程和自己爭(zhēng)鎖,就是通過CAS的理念去保障共享資源的安全性的。CAS的基本思想是,拿變量的原值和內(nèi)存中的值進(jìn)行比較,如果相同,則原值沒有被修改過,那么就將原值修改為新值,這兩步是原子的,能夠保證同一時(shí)間只有一個(gè)線程修改成功。這就是CAS的理念。

Java中要想使用CAS原子的修改某值,怎么做呢?幸運(yùn)的是Java提供了這樣的API,就是在sun.misc.Unsafe.java類中。Unsafe,中文名不安全的,也被稱為魔術(shù)類,魔法類。

Unsafe類介紹

Unsafe類使Java擁有了像C語言的指針一樣操作內(nèi)存空間的能力,一旦能夠直接操作內(nèi)存,這也就意味著

(1)不受JVM管理,意思就是使用Unsafe操作內(nèi)存無法被JVM GC,需要我們手動(dòng)GC,稍有不慎就會(huì)出現(xiàn)內(nèi)存泄漏。
(2)Unsafe的不少方法中必須提供原始地址(內(nèi)存地址)和被替換對(duì)象的地址,并且偏移量要自己計(jì)算(其提供的有計(jì)算偏移量的方法),所以一旦出現(xiàn)問題就是JVM崩潰級(jí)別的異常,會(huì)導(dǎo)致整個(gè)JVM實(shí)例崩潰,表現(xiàn)為應(yīng)用程序直接crash掉。
(3)直接操作內(nèi)存,所以速度更快,在高并發(fā)的條件之下能夠很好地提高效率。

因此,從上面三個(gè)角度來看,雖然在一定程度上提升了效率但是也帶來了指針的不安全性。這也是它被取名為Unsafe的原因吧。

下面我們深入到源碼中看看,提供了什么方法直接操作內(nèi)存。

打開Unsafe這個(gè)類,我們會(huì)發(fā)現(xiàn)里面有大量的被native關(guān)鍵字修飾的方法,這意味著這些方法是C語言提供的實(shí)現(xiàn),底層調(diào)的是C語言的庫函數(shù),我們無法直接看到他的源碼實(shí)現(xiàn),需要去從OpenJDK去看了。另外還有一些基于native方法封裝的其他方法,整個(gè)Unsafe中的方法大致可以歸結(jié)為以下幾類:

(1)初始化操作
(2)操作對(duì)象屬性
(3)操作數(shù)組元素
(4)線程掛起和恢復(fù)
(5)CAS機(jī)制

CAS的使用

如果你學(xué)過java并發(fā)編程的話,稍微閱讀過JUC并發(fā)包里面的源碼的話,對(duì)這個(gè)Unsafe類一定不陌生,因?yàn)檎麄€(gè)java并發(fā)包底層實(shí)現(xiàn)的核心就是靠它。JUC并發(fā)包中主要使用它提供的CAS(compare and swap,比較并交換)操作,原子的修改鎖的狀態(tài)和一些隊(duì)列元素。

沒看過JUC源碼的讀者也不用擔(dān)心,今天我們就是簡(jiǎn)單介紹Unsafe類中的CAS操作,那么我們接下來就會(huì)通過一個(gè)簡(jiǎn)單的例子來看看Unsafe的CAS是怎么使用的。

首先,使用這個(gè)類我們第一個(gè)要做的事情就是拿到這個(gè)類的實(shí)例,下面我們自定義了一個(gè)Util類用來獲取Unsafe的實(shí)例

import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class UnsafeUtil {
 public static Unsafe reflectGetUnsafe() {
  try {
   Field field = Unsafe.class.getDeclaredField("theUnsafe");
   field.setAccessible(true);
   return (Unsafe) field.get(null);
  } catch (Exception e) {
   e.printStackTrace();
  }
  return null;
 }
}

這個(gè)工具類通過反射的方式拿到Unsafe類中的一個(gè)名為theUnsafe字段,該字段是Unsafe類型,并在static塊中new一個(gè)Unsafe對(duì)象初始化這個(gè)字段(單例模式)。

然后我們定義了一個(gè)AtomicState類,這個(gè)類很簡(jiǎn)單,有一個(gè)int型的state字段,還有一個(gè)Unsafe的常量,以及int型的offsetState,用來記錄state字段在AtomicState對(duì)象中的偏移量。具體代碼如下:

import com.walking.juc.util.UnsafeUtil;
import sun.misc.Unsafe;
public class AtomicState {
 private volatile int state = 0;
 public int getState() {
  return state;
 }

 private static final Unsafe UNSAFE = UnsafeUtil.reflectGetUnsafe();
 private static final long offsetState;
 static {
  try {
   offsetState = UNSAFE.objectFieldOffset(AtomicState.class.getDeclaredField("state"));
  } catch (NoSuchFieldException e) {
   throw new Error(e);
  }
 }
 public final boolean compareAndSetState(int oldVal, int newVal) {
  return UNSAFE.compareAndSwapInt(this, offsetState, oldVal, newVal);
 }
}

我們定義了一個(gè)compareAndSetState方法,需要傳兩個(gè)參數(shù),分別是state的舊值和新值,也就是讀到的state的之前的值,以及想要把它修改成什么值,該方法內(nèi)部調(diào)用的是Unsafe類的compareAndSwapInt方法,它有四個(gè)參數(shù),分別是要修改的類實(shí)例對(duì)象、要修改的值的偏移量、舊值、新值。解釋一下偏移量,剛才我們提到Unsafe提供給我們直接訪問內(nèi)存的能力,那么訪問內(nèi)存肯定是要知道內(nèi)存的地址在哪才能去修改其相應(yīng)的值吧,我們看,第一個(gè)參數(shù)是對(duì)象實(shí)例引用,也就是說,已經(jīng)知道這個(gè)對(duì)象的地址了,那么我們想修改這個(gè)對(duì)象里的state的值,就只需要計(jì)算出state在這個(gè)對(duì)象的偏移量就能找到state所在的內(nèi)存地址,那就可以修改它了。

然后,我們通過一個(gè)測(cè)試類來驗(yàn)證Unsafe的CAS操作。這個(gè)測(cè)試類我來解釋下大致的思想,我們弄5個(gè)線程,讓這個(gè)5個(gè)線程一個(gè)個(gè)啟動(dòng),我們無法保證線程同時(shí)開始啟動(dòng),那么我們有辦法保證這個(gè)5個(gè)線程同時(shí)執(zhí)行我們的代碼,就是使用JUC包里的CyclicBarrier工具來實(shí)現(xiàn)的,這個(gè)工具初始化時(shí)需要傳入一個(gè)int值n,我們?cè)诰€程的run方法內(nèi)部在業(yè)務(wù)代碼執(zhí)行之前調(diào)用CyclicBarrier的await方法,當(dāng)指定數(shù)量n的線程都調(diào)用了這個(gè)方法那么這n個(gè)線程將同時(shí)往下執(zhí)行,就像設(shè)置了一個(gè)屏障,所有人都達(dá)到這個(gè)屏障后,一起通過屏障,依次來模擬多線程并發(fā)

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@Slf4j
public class TestAtomicState {

 static int tNum = 5;//線程數(shù) 我們開10個(gè)線程模擬多線程并發(fā)
 static CyclicBarrier cyclicBarrier = new CyclicBarrier(tNum);//柵欄
 static CountDownLatch countDownLatch = new CountDownLatch(tNum);//計(jì)數(shù)器
 static AtomicState atomicState = new AtomicState();

 public static void main(String[] args) throws InterruptedException {
  for (int i = 1; i <= tNum; i++) {
   new Thread(new MyTask(),"t-"+i).start();
  }
  countDownLatch.await();//為的是讓主線程在這句阻塞住,等待所有線程執(zhí)行完畢(計(jì)數(shù)器減到0)再往下走
  log.info("state最后的值:" + atomicState.getState());
 }

 static class MyTask implements Runnable{
  @Override
  public void run() {
   try {
    log.info(Thread.currentThread().getName() + "到達(dá)起跑線");
    String name = Thread.currentThread().getName();
    String substring = name.substring(name.indexOf("-") + 1);
    int i1 = Integer.parseInt(substring);

    cyclicBarrier.await();//設(shè)置一個(gè)屏障,所有線程達(dá)到這后開始一起往下執(zhí)行 模擬并發(fā)
    boolean b = atomicState.compareAndSetState(0, i1);
    if (b) {
     log.info("修改成功,tName:{}" ,Thread.currentThread().getName());
    } else {
     log.info("修改失敗,tName:{}" ,Thread.currentThread().getName());
    }
   } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
   } finally {
    countDownLatch.countDown();//線程執(zhí)行完畢計(jì)數(shù)器減一
   }
  }
 }
}

在cyclicBarrier.await();之后我們調(diào)用AtomicState的compareAndSetState方法傳入舊值0和新值,新值就是線程名t-n中的n,哪個(gè)線程修改成功,最后state值就是線程名中的數(shù)字。
至于CountDownLatch使用它的目的是讓mian線程等到t-1到t-5的線程全部執(zhí)行完后打印state的值。我們的重點(diǎn)不是CyclicBarrier和CountDownLatch,知道它們是干什么的就行。

然后我們運(yùn)行這個(gè)測(cè)試程序:

13:57:46.619 [t-2] INFO com.walking.castest.TestAtomicState - t-2到達(dá)起跑線
13:57:46.619 [t-3] INFO com.walking.castest.TestAtomicState - t-3到達(dá)起跑線
13:57:46.619 [t-5] INFO com.walking.castest.TestAtomicState - t-5到達(dá)起跑線
13:57:46.619 [t-1] INFO com.walking.castest.TestAtomicState - t-1到達(dá)起跑線
13:57:46.619 [t-4] INFO com.walking.castest.TestAtomicState - t-4到達(dá)起跑線
13:57:46.628 [t-1] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-1
13:57:46.628 [t-4] INFO com.walking.castest.TestAtomicState - 修改成功,tName:t-4
13:57:46.628 [t-2] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-2
13:57:46.628 [t-5] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-5
13:57:46.628 [t-3] INFO com.walking.castest.TestAtomicState - 修改失敗,tName:t-3
13:57:46.636 [main] INFO com.walking.castest.TestAtomicState - state最后的值:4

可以看到只有一個(gè)線程執(zhí)行成功,這就是CAS的基本使用。

CAS的ABA問題

何為ABA問題呢?舉個(gè)例子,小明和小花合伙賣煎餅,不就后攢了10萬元,他們一起去銀行把錢存在他們公共的賬戶里,但是小明聽說最近牛市來了,就偷偷的把錢轉(zhuǎn)移到了股票市場(chǎng),公共賬戶余額是0。1個(gè)月后股票賺了一筆錢,然后小明把之前轉(zhuǎn)移的10萬元又存到他們的公共賬戶。小明和小花一個(gè)月后又去存錢,去查賬戶余額是10萬。這就是ABA問題,簡(jiǎn)單來說就是一個(gè)值本來是A,兩個(gè)線程同時(shí)都看到是A,然后線程1把A改成B后又改成A,線程1結(jié)束了。然后線程2去修改時(shí),看到的是A,無法感知到這個(gè)過程中值發(fā)生過變化,對(duì)于線程2來說就發(fā)生了ABA的問題。

模擬ABA問題:

import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CAS_ABA_Stampe {
 static AtomicInteger atomicInteger = new AtomicInteger(10);
 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   try {
    log.info("{}拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get());
    log.info("{}第一次修改", Thread.currentThread().getName());
    atomicInteger.getAndSet(0);
    Thread.sleep(2000);
    log.info("{}第二次修改", Thread.currentThread().getName());
    atomicInteger.getAndSet(10);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t1");
  t1.start();

  Thread t2 = new Thread(() -> {
   try {
    log.info("{}第一次拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get());
    Thread.sleep(2500);
    log.info("{}第二次拿到state的值為:{}", Thread.currentThread().getName(), atomicInteger.get());
    log.info("{}開始修改state的值為2", Thread.currentThread().getName());
    atomicInteger.getAndSet(20);
    log.info("{}修改成功", Thread.currentThread().getName());
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t2");
  t2.start();
  t1.join();
  t2.join();
  log.info("最終state的值:{}", atomicInteger.get());
 }
}

//結(jié)果t2也能修改成功,并沒有發(fā)現(xiàn)這種變化
15:12:35.999 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1拿到state的值為:10
15:12:35.999 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第一次拿到state的值為:10
15:12:36.014 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第一次修改
15:12:38.015 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1第二次修改
15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2第二次拿到state的值為:10
15:12:38.515 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2開始修改state的值為2
15:12:38.516 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2修改成功
15:12:38.516 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最終state的值:20

怎么解決CAS的ABA問題呢?

那就是基于版本號(hào)去解決,增加一個(gè)版本號(hào)的概念,每次被修改這個(gè)版本號(hào)就加1,版本號(hào)是一直向前的,版本號(hào)變了,就說明被修改過。

JUC包中提供了解決ABA問題的工具:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;

@Slf4j
public class CAS_ABA_Stampe {
 static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(10, 1);

 public static void main(String[] args) throws InterruptedException {
  Thread t1 = new Thread(() -> {
   try {
    int stamp = stampedReference.getStamp();
    int intValue = stampedReference.getReference().intValue();
    log.info("{}私挪公款拿到stamp的值為:{},余額:{}", Thread.currentThread().getName(), stamp,intValue);
    stampedReference.compareAndSet(10, 0, stamp, stamp + 1);
    Thread.sleep(2000);
    stamp = stampedReference.getStamp();
    intValue = stampedReference.getReference().intValue();
    log.info("{}還回公款拿到stamp的值為:{},余額:{}", Thread.currentThread().getName(), stamp,intValue);
    stampedReference.compareAndSet(0, 10, stamp, stamp + 1);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t1");
  t1.start();

  Thread t2 = new Thread(() -> {
   try {
    int stamp = stampedReference.getStamp();
    int intValue = stampedReference.getReference().intValue();
    log.info("{}拿到stamp的值為:{},余額:{}", Thread.currentThread().getName(), stamp, intValue);
    Thread.sleep(3000);

    log.info("{}開始存款", Thread.currentThread().getName());
    if (stampedReference.compareAndSet(10, 20, stamp, stamp + 1)) {
     log.info("{}款款成功", Thread.currentThread().getName());
    }else {
     log.info("{}存款失敗,發(fā)現(xiàn)賬戶異常!!oldStamp:{},currentStamp:{}", Thread.currentThread().getName(),stamp,stampedReference.getStamp());
    }
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }, "t2");
  t2.start();
  t1.join();
  t2.join();
  log.info("最終賬戶余額:{}W", stampedReference.getReference().intValue());
 }
}

運(yùn)行結(jié)果:

15:32:37.488 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1私挪公款拿到stamp的值為:1,余額:10
15:32:37.476 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2拿到stamp的值為:1,余額:10
15:32:39.500 [t1] INFO com.walking.castest.CAS_ABA_Stampe - t1還回公款拿到stamp的值為:2,余額:0
15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2開始存款
15:32:40.498 [t2] INFO com.walking.castest.CAS_ABA_Stampe - t2存款失敗,發(fā)現(xiàn)賬戶異常!!oldStamp:1,currentStamp:3
15:32:40.498 [main] INFO com.walking.castest.CAS_ABA_Stampe - 最終賬戶余額:10W

t2存款時(shí)就發(fā)現(xiàn)賬戶異常,因?yàn)榘姹咎?hào)已經(jīng)變成了3,和t2剛開始拿到的不一樣,說明已經(jīng)被別人修改過,從而解決ABA問題。

以上是Java并發(fā)編程之CAS的案例分析的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI