溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

TransmittableThreadLocal怎么解決池化復用線程的傳值問題

發(fā)布時間:2021-10-23 17:07:31 來源:億速云 閱讀:194 作者:iii 欄目:編程語言

這篇文章主要介紹“TransmittableThreadLocal怎么解決池化復用線程的傳值問題”,在日常操作中,相信很多人在TransmittableThreadLocal怎么解決池化復用線程的傳值問題問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”TransmittableThreadLocal怎么解決池化復用線程的傳值問題”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

TransmittableThreadLocal

一般情況下,ThreadLocal都可以滿足我們的需求,當我們出現需要 在使用線程池等會池化復用線程的執(zhí)行組件情況下傳遞ThreadLocal ,

這個場景就是TransmittableThreadLocal解決的問題。

Github地址:https://github.com/alibaba/transmittable-thread-local

感興趣的可以去下載的玩一玩,接下來我們來介紹一下這個組件的神奇之處。

首先看個demo, 通過demo,我們先了解了解怎么用

demo
/**
 * ttl測試
 *
 * @author zhangyunhe
 * @date 2020-04-23 12:47
 */
public class Test {

    // 1. 初始化一個TransmittableThreadLocal,這個是繼承了InheritableThreadLocal的
    static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();

    // 初始化一個長度為1的線程池
    static ExecutorService poolExecutor = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Test test = new Test();
        test.test();
    }
    private void test() throws ExecutionException, InterruptedException {

        // 設置初始值
        local.set("天王老子");
        //!?。?! 注意:這個地方的Task是使用了TtlRunnable包裝的
        Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務1")));
        future.get();

        Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任務2")));
        future2.get();
      
        System.out.println("父線程的值:"+local.get());
        poolExecutor.shutdown();
    }

    class Task implements Runnable{

        String str;
        Task(String str){
            this.str = str;
        }
        @Override
        public void run() {
            // 獲取值
            System.out.println(Thread.currentThread().getName()+":"+local.get());
            // 重新設置一波
            local.set(str);
        }
    }
}

輸出結果:

pool-1-thread-1:天王老子
pool-1-thread-1:天王老子
父線程的值:天王老子

原理分析

我們首先看一下TransmittableThreadLocal的源碼,

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
  
  // 1. 此處的holder是他的主要設計點,后續(xù)在構建TtlRunnable
  private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }

                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

    @SuppressWarnings("unchecked")
    private void addThisToHolder() {
        if (!holder.get().containsKey(this)) {
            holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
        }
    }
  
  @Override
    public final T get() {
        T value = super.get();
        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
        return value;
    }
    /**
     * see {@link InheritableThreadLocal#set}
     */
    @Override
    public final void set(T value) {
        if (!disableIgnoreNullValueSemantics && null == value) {
            // may set null to remove value
            remove();
        } else {
            super.set(value);
            addThisToHolder();
        }
    }
    /**
     * see {@link InheritableThreadLocal#remove()}
     */
    @Override
    public final void remove() {
        removeThisFromHolder();
        super.remove();
    }

    private void superRemove() {
        super.remove();
    }
}

步驟說明:

  1. 在代碼中,作者構建了一個holder對象,這個對象是一個InheritableThreadLocal , 里面的類型是一個弱引用的WeakHashMap , 這個map的va lu就是TransmittableThreadLocal , 至于value永遠都是空的

holder里面存儲的是這個應用里面,所有關于TransmittableThreadLocal的引用。

  1. 從上面可以看到,每次get, set ,remove都會操作holder對象,這樣做的目的是為了保持TransmittableThreadLocal所有的這個引用都在holder里面存一份。

TtlRunnable

回到我們上面的代碼

Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務1")));

細心的朋友可能已經發(fā)現了,我們調用了TtlRunnable 對象的get方法,下面看一下這個方法有啥作用吧

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) return null;

        if (runnable instanceof TtlEnhanced) {
            // avoid redundant decoration, and ensure idempotency
            if (idempotent) return (TtlRunnable) runnable;
            else throw new IllegalStateException("Already TtlRunnable!");
        }
  			// 重點在這里
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }

看上面的代碼,細節(jié)上我們不看,我們看大致的思路, 這個地方主要就是根據傳入的runnable構建了一個TtlRunnable對象。

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
  			//重點在這里
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

下面這行代碼,運行到這里的時候,還是在主線程里面,調用了capture方法

this.capturedRef = new AtomicReference<Object>(capture());
capture
public static Object capture() {
    // 構建一個臨時對象,主要看captureTtlValues方法
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
  // 構建一個WeakHashMap方法,
  WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  // 在主線程里面,調用holder變量,循環(huán)獲取里面所有的key和value
  for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
    ttl2Value.put(threadLocal, threadLocal.copyValue());
  }
  // 返回出去
  return ttl2Value;
}

步驟說明:

1.調用靜態(tài)變量holder, 循環(huán)獲取里面所有的key和value, value的獲取就比較巧妙一點。

private T copyValue() {
      // 這里的get方法,調用的是父類的方法,可以在父類里面最終獲取到當前TransmittableThreadLocal所對應的value
      return copy(get());
}

2.組裝好一個WeakHashMap出去,最終就會到了我們上面的構造方法里面,針對capturedRef 的賦值操作。

run
@Override
public void run() {
  //1. 獲取到剛剛構造TtlRunnable對象的時候初始化的capturedRef對象。包含了從submit丟任務進來的時候父線程的數據
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after run!");
  }
	// 清除不在captured里面的key,同時在這個子線程中,對所有的ThreadLocal進行重新設置值
  Object backup = replay(captured);
  try {
    // 執(zhí)行實際的線程方法
    runnable.run();
  } finally {
    // 做好還原工作,根據backup
    restore(backup);
  }
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
            WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      // 做好當前線程的local備份
      backup.put(threadLocal, threadLocal.get());

      // 清除數據,不在captured里面的。
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // 這里就是把值設置到當前線程的TransmittableThreadLocal里面。
    setTtlValuesTo(captured);

    // 一個鉤子
    doExecuteCallback(true);

    return backup;
}

到此,關于“TransmittableThreadLocal怎么解決池化復用線程的傳值問題”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI