您好,登錄后才能下訂單哦!
如何進(jìn)行ThreadLocal源碼分析,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。
ThreadLocal是線程本地變量,ThreadLocal為每一個(gè)線程創(chuàng)建一個(gè)單獨(dú)的變量副本ThreadLocalMap,所以每個(gè)線程修改自己變量副本不會(huì)影響其它的線程。區(qū)別于線程同步,我們知道線程同步是為了解決多線程下共享變量的安全問題,而ThreadLocal是為了解決線程內(nèi)部數(shù)據(jù)傳遞問題。一個(gè)線程內(nèi)部可以有多個(gè)ThreadLocal,但是它門維護(hù)線程的同一個(gè)ThreadLocalMap變量,共用同一個(gè)Entry數(shù)組。
ThreadLocal數(shù)據(jù)結(jié)構(gòu):
每個(gè)線程內(nèi)部有一個(gè)ThreadLocalMap屬性,ThreadLocal通過維護(hù)該屬性來保證單個(gè)線程內(nèi)部數(shù)據(jù)共享。ThreadLocalMap內(nèi)部有一個(gè)entry數(shù)組,該數(shù)組是key,value型結(jié)構(gòu),key為當(dāng)前ThreadLocal的弱引用,value用于存放具體的值,類型為一個(gè)泛型結(jié)構(gòu),支持各種數(shù)據(jù)變量。ThredLocalMap內(nèi)Entry數(shù)組的下標(biāo)值也是通過 key.threadLocalHashCode & (數(shù)組長(zhǎng)度 - 1)來確定的,只不過這個(gè)threadLocalHashCode 是通過AutomicLong每次遞增0x61c88647來確定的,這可以盡量減少hash碰撞。不同于HashMap,ThreadLocalMap內(nèi)部只維護(hù)了一個(gè)Entry數(shù)組,所以當(dāng)發(fā)生hash沖突的時(shí)候,ThreadLocalMap會(huì)將發(fā)生hash沖突的Entry放在當(dāng)前key對(duì)應(yīng)數(shù)組下標(biāo)后面第一個(gè)為空的數(shù)組槽位內(nèi)。ThreadLocal的擴(kuò)容閾值默認(rèn)為數(shù)組大小的 2/3。因?yàn)镋ntry的key為當(dāng)前threadlcoal的弱引用,所以在發(fā)生gc的時(shí)候容易導(dǎo)致key被回收,但是此時(shí)value為強(qiáng)引用,所以這種情況會(huì)導(dǎo)致內(nèi)存溢出。但是,當(dāng)我們調(diào)用threadlocal的set,get,remove方法的時(shí)候,ThreadLocalMap內(nèi)都會(huì)發(fā)生回收過期key的操作,不過這種回收是一種抽樣回收,可能并不能回收所有的過期key。而且在執(zhí)行set方法回收的時(shí)候,可能發(fā)生擴(kuò)容,這時(shí)候的擴(kuò)容判斷是當(dāng)前數(shù)組的長(zhǎng)度的1/2。Entry數(shù)組默認(rèn)初始化長(zhǎng)度為16。
public class ThreadLocalTest { private static final ThreadLocal<String> threadLocal = new ThreadLocal(); private static String str = null; public static void print1() { System.out.println("打印方法1輸出:" + threadLocal.get()); } public static void print2() { System.out.println("打印方法2輸出:" + str); } public static void main(String[] args) { //線程1 new Thread(() -> { threadLocal.set("線程1設(shè)置的str1"); str = "線程1設(shè)置的str2"; //睡5秒鐘 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //睡5秒鐘后打印,此時(shí)第2個(gè)線程早已執(zhí)行完 print1(); print2(); }).start(); //線程2 new Thread(() -> { threadLocal.set("線程2設(shè)置的str1"); str = "線程2設(shè)置的str2"; //直接打印 print1(); print2(); }).start(); }}
運(yùn)行結(jié)果:
打印方法1輸出:線程2設(shè)置的str1打印方法2輸出:線程2設(shè)置的str2打印方法1輸出:線程1設(shè)置的str1打印方法2輸出:線程2設(shè)置的str2
根據(jù)運(yùn)行結(jié)果分析出,使用ThreadLocal的存儲(chǔ)的變量在多線程不存在線程安全問題,常規(guī)創(chuàng)建的屬性在多線程下存在線程安全問題。
ThreadLocal中使用了斐波那契散列法,來保證哈希表的離散度。可以保證 nextHashCode 生成的哈希值,均勻的分布在 2 的冪次方上。具體的數(shù)學(xué)問題不在這里深究。
private final int threadLocalHashCode = nextHashCode();private static AtomicInteger nextHashCode = new AtomicInteger();//十進(jìn)制1640531527=0.618*2^32,這個(gè)值是黃金分割率*2^32private static final int HASH_INCREMENT = 0x61c88647;//每次調(diào)用該方法,hashcode值就會(huì)遞增HASH_INCREMENTprivate static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT);}
//用于計(jì)算數(shù)組下標(biāo)的值,table.length - 1轉(zhuǎn)二進(jìn)制有N個(gè)1,那么//key.threadLocalHashCode & (table.length - 1)的值就是threadLocalHashCode的低N位int i = key.threadLocalHashCode & (table.length - 1);
public void set(T value) { //獲取當(dāng)前線程 Thread t = Thread.currentThread(); //根據(jù)當(dāng)前線程獲取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果map為空則創(chuàng)建一個(gè),否則設(shè)置屬性值 if (map != null) //key為當(dāng)前thread的引用則設(shè)置該值 map.set(this, value); else //map為空則創(chuàng)建當(dāng)前線程的ThreadMap并和當(dāng)前線程綁定 createMap(t, value);}
private void set(ThreadLocal<?> key, Object value) { //將初始化后的當(dāng)前數(shù)組賦值給臨時(shí)數(shù)組tab Entry[] tab = table; //獲取當(dāng)前臨時(shí)tab數(shù)組長(zhǎng)度 int len = tab.length; //計(jì)算當(dāng)前key對(duì)應(yīng)的數(shù)組下標(biāo) int i = key.threadLocalHashCode & (len-1); //從當(dāng)前下標(biāo)開始循環(huán)往后遍歷,如果當(dāng)前數(shù)組槽為空,則直接跳出循環(huán),如果不為空,則進(jìn)行key的判斷 //因?yàn)門hreadLocalMap的結(jié)構(gòu)只是數(shù)組,沒有鏈表,當(dāng)key發(fā)生沖突, //不同的key定位到相同的數(shù)組下標(biāo)的時(shí)候,會(huì)往后尋找第一個(gè)下標(biāo)為null //的槽或者第一個(gè)key位過期key的槽,并將entry放入并賦值 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { //對(duì)應(yīng)下標(biāo)為i的槽位為空的時(shí)候才會(huì)走到循環(huán)里面的邏輯 //獲取key ThreadLocal<?> k = e.get(); //CASE1:如果key相同,替換value并跳出循環(huán) if (k == key) { e.value = value; return; } //CASE2:如果key為空,說明key已經(jīng)過期了,當(dāng)前下標(biāo)對(duì)應(yīng)的槽可以被使用 if (k == null) { //替換過期key的邏輯 replaceStaleEntry(key, value, i); return; } } //如果當(dāng)前下標(biāo)下的數(shù)組槽為空,占用該槽位并賦值 tab[i] = new Entry(key, value); //遞增數(shù)組大小 int sz = ++size; //沒有清理到數(shù)據(jù),且size大小達(dá)到了擴(kuò)容閾值 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash();}
給當(dāng)前key找數(shù)組槽位的時(shí)候,找到的下標(biāo)對(duì)應(yīng)的key為過期的key的時(shí)候,執(zhí)行替換操作
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { //數(shù)組列表 Entry[] tab = table; //數(shù)組長(zhǎng)度 int len = tab.length; //臨時(shí)變量 Entry e; //需要清理的數(shù)據(jù)的開始下標(biāo),默認(rèn)為當(dāng)前staleSlot int slotToExpunge = staleSlot; //從當(dāng)前staleSlot向前查找,找對(duì)應(yīng)數(shù)組槽下的entry,直到碰到空的槽則退出循環(huán) for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) //如果在查找過程中,碰到key為過期key的情況,更新需要清理的數(shù)據(jù)的開始下標(biāo) if (e.get() == null) slotToExpunge = i; //從當(dāng)前staleSlot向后查找,找對(duì)應(yīng)數(shù)組槽下的entry,直到碰到空的槽則退出循環(huán) for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { //獲取當(dāng)前元素的key ThreadLocal<?> k = e.get(); //如果key相同,則替換value,遷移數(shù)據(jù)位置 if (k == key { e.value = value; //將過期的tab[staleSlot]放到找到的i下標(biāo)下 tab[i] = tab[staleSlot]; //當(dāng)前staleSlot下標(biāo)下的槽替換為當(dāng)前的entry,數(shù)據(jù)的位置被優(yōu)化了 tab[staleSlot] = e; //條件成立說明向前過程中并沒有找到過期的key if (slotToExpunge == staleSlot) //修改需要清理數(shù)據(jù)的開始下標(biāo)為替換數(shù)據(jù)后的下標(biāo) slotToExpunge = i; //清理數(shù)據(jù) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } //k==null說明循環(huán)過程中未找到匹配的key //slotToExpunge == staleSlot說明向前遍歷過程中未找到過期的key if (k == null && slotToExpunge == staleSlot) //可以將循環(huán)向后查找的i指向slotToExpunge,因?yàn)樵谙蚝蟛檎业倪^程中沒有找到相同的key //該段期間沒必要處理了 slotToExpunge = i; } //走到這里說明循環(huán)向后查找的過程中,沒有找到相同的key //直接使用當(dāng)前下標(biāo)并賦值 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); //條件成立,說明在向前向后遍歷中,slotToExpunge被改變了 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);}
為什么有while ( (n >>>= 1) != 0),這樣不是可能清理不了所有數(shù)據(jù)嗎?是的,ThreadLocal的設(shè)計(jì)行就是部分清除,類似于抽樣,避免清理所有影響性能。
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; //執(zhí)行清理,可能會(huì)遷移數(shù)據(jù) i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed;}
擴(kuò)容之前,進(jìn)行一次全面的清理操作
private void rehash() { expungeStaleEntries(); if (size >= threshold - threshold / 4) resize();}
擴(kuò)容邏輯,比較簡(jiǎn)單,數(shù)組變大兩倍,舊數(shù)據(jù)遷移到新數(shù)組,如果key已經(jīng)過期的,則直接將value也設(shè)置為空。這里需要注意的時(shí)候,清理過程中擴(kuò)容的閾值是原數(shù)組容量的 1/2, size >= threshold - threshold / 4,我們直到threashold = 2 / 3 * length, 所以轉(zhuǎn)化后size >= 3 / 4 * (2 / 3) * length。
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); //如果對(duì)應(yīng)的key已經(jīng)回收 if (k == null) { //value設(shè)置為空 e.value = null; // Help the GC } else { //進(jìn)行數(shù)據(jù)遷移,如果存在沖突,則放到計(jì)算出來的下標(biāo)的后方第一個(gè)不為null的槽 int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } //重新設(shè)置擴(kuò)容閾值 setThreshold(newLen); size = count; table = newTab;}
當(dāng)我們調(diào)用threadLocal的get方法的時(shí)候,首先會(huì)調(diào)用getMap方法,該方法根據(jù)當(dāng)前線程獲取當(dāng)前線程的ThreadLocal.ThreadLocalMap threadLocals屬性,如果非空,再獲取對(duì)應(yīng)的ThreadLocal的ThreadLocalMap 里面的entry,根據(jù)entry獲取對(duì)應(yīng)的value,這個(gè)過程會(huì)調(diào)用expungestaleEntry方法,清空key為空的hash槽的值,并將key不為空的且通過key的hash值計(jì)算出來的下標(biāo)發(fā)生過向后偏移的entry移動(dòng)到更靠近計(jì)算出來的下標(biāo)值的后面的某個(gè)空的槽內(nèi)。如果getMap返回空,說明我們可能沒用調(diào)用ThreadLocal的set方法的情況下調(diào)用了get方法,那么創(chuàng)建一個(gè)ThreadLocalMap,初始化entry數(shù)組,設(shè)置擴(kuò)容閾值,并設(shè)置對(duì)應(yīng)的ThreadLocal的hash槽的值為空。
public T get() { //獲取當(dāng)前線程 Thread t = Thread.currentThread(); //取出當(dāng)前線程的ThreadLocalMap屬性 ThreadLocalMap map = getMap(t); //如果當(dāng)前線程的ThreadLocalMap不為空 if (map != null) { //獲取ThreadLocalMap的Entry數(shù)組 ThreadLocalMap.Entry e = map.getEntry(this); //如果數(shù)組不為空,取出value值返回 if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue();}
//獲取thread的threadLocals屬性ThreadLocalMap getMap(Thread t) { return t.threadLocals;}
//獲取ThreadLocalMap的entry數(shù)組對(duì)應(yīng)下標(biāo)的數(shù)據(jù)private Entry getEntry(ThreadLocal<?> key) { //計(jì)算下標(biāo) int i = key.threadLocalHashCode & (table.length - 1); //獲取對(duì)應(yīng)下標(biāo)數(shù)據(jù) Entry e = table[i]; if (e != null && e.get() == key) return e; //如果取不到,為什么有這種情況? //從put方法中我們知道,threadlocalMap不同于hashMap //內(nèi)部只有數(shù)組,數(shù)組的每個(gè)hash槽下只有一個(gè)entry值 //如果在put的時(shí)候發(fā)現(xiàn)對(duì)應(yīng)hash槽的值不為空,且key不相同 //則往后找第一個(gè)為空的hash槽,講entry放入該hash槽 else return getEntryAfterMiss(key, i, e);}
//從對(duì)應(yīng)下標(biāo)往后循環(huán)查找,這里有個(gè)特殊的地方nextIndex//該方法:從對(duì)應(yīng)下標(biāo)往后循環(huán)返回下標(biāo),如果超出數(shù)組長(zhǎng)度,//則從0下標(biāo)開始繼續(xù)往后循環(huán)返回下標(biāo)private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; //循環(huán)遍歷 while (e != null) { ThreadLocal<?> k = e.get(); //case1:key值相同,返回對(duì)應(yīng)的entry if (k == key) return e; //case2:發(fā)現(xiàn)對(duì)應(yīng)entry數(shù)組下標(biāo)下的key為空,清理 if (k == null) expungeStaleEntry(i); //case3:key不為空但key不相同,數(shù)組下標(biāo)往后推進(jìn) else i = nextIndex(i, len); //返回下一個(gè)下標(biāo)值對(duì)應(yīng)的entry e = tab[i]; } return null;}
//從對(duì)應(yīng)下標(biāo)往后循環(huán),如果超出數(shù)組長(zhǎng)度,則從0下標(biāo)開始繼續(xù)往后循環(huán)//返回具體下標(biāo)值private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0);}
從當(dāng)前staleSlot開始循環(huán)清理過期key對(duì)應(yīng)的entry數(shù)組內(nèi)的值;如果key不為空且當(dāng)前線程對(duì)應(yīng)的threadlocal的hash值計(jì)算出來的下標(biāo)發(fā)生過遷移,說明之前在put的時(shí)候,在對(duì)應(yīng)下標(biāo)下發(fā)生過hash沖突,將當(dāng)前下標(biāo)下的entry數(shù)組對(duì)應(yīng)的值置為null,并將當(dāng)前下標(biāo)下的entry值移動(dòng)到更接近通過hash值計(jì)算出來的下標(biāo)之后的某個(gè)空的槽中。循環(huán)在進(jìn)行下標(biāo)右移的過程中,如果碰到對(duì)應(yīng)下標(biāo)下的槽數(shù)據(jù)為空,則退出循環(huán)。該方法在執(zhí)行的時(shí)候會(huì)將本該在staleSlot位置的key對(duì)應(yīng)的變量移動(dòng)到該位置或更靠近該位置的后方。避免remove方法遍歷的時(shí)候出現(xiàn)null導(dǎo)致清理不到的情況。
private int expungeStaleEntry(int staleSlot) { //將全局entry數(shù)組賦值給臨時(shí)tab Entry[] tab = table; //臨時(shí)entry數(shù)組當(dāng)前長(zhǎng)度 int len = tab.length; //設(shè)置對(duì)應(yīng)數(shù)組下標(biāo)下的entry的value為空 tab[staleSlot].value = null; //設(shè)置對(duì)應(yīng)entry為空 tab[staleSlot] = null; //entry數(shù)組全局長(zhǎng)度-1 size--; Entry e; int i; //從當(dāng)前下標(biāo)往后循環(huán)遍歷,直到對(duì)應(yīng)的下標(biāo)下槽內(nèi)數(shù)據(jù)為空跳出循環(huán) for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { //獲取對(duì)應(yīng)下標(biāo)下當(dāng)前entry對(duì)應(yīng)的key ThreadLocal<?> k = e.get(); //如果key為空則清理entry的value和設(shè)置當(dāng)前數(shù)組對(duì)應(yīng)entry為空 if (k == null) { e.value = null; tab[i] = null; size--; //如果key不為空 } else { //計(jì)算獲取對(duì)應(yīng)的下標(biāo),這個(gè)本該是存放entry的位置,但是可能由于hash沖突,put的時(shí)候向后偏移了 int h = k.threadLocalHashCode & (len - 1); //條件成立說明在put的時(shí)候計(jì)算出來的下標(biāo)發(fā)生過hash沖突 //數(shù)據(jù)向后偏移過,而且 h < i if (h != i) { //將當(dāng)前下標(biāo)下entry設(shè)置為空 tab[i] = null; //從計(jì)算出來的下標(biāo)h循環(huán)向后獲取一個(gè)對(duì)應(yīng)entry為空的下標(biāo)值 //該下標(biāo)下存放當(dāng)前entry while (tab[h] != null) //這個(gè)新計(jì)算出來的h的值更靠近計(jì)算獲取的下標(biāo) h = nextIndex(h, len); //將entry放在對(duì)應(yīng)下標(biāo) tab[h] = e; } } //返回進(jìn)行處理過后的起點(diǎn)下標(biāo)i return i;}
private T setInitialValue() { //獲取一個(gè)空值 T value = initialValue(); Thread t = Thread.currentThread(); //獲取當(dāng)前線程的ThreadMap ThreadLocalMap map = getMap(t); //如果不為空,則將當(dāng)前空值注入 if (map != null) map.set(this, value); else //否則創(chuàng)建這個(gè)ThreadMap并和當(dāng)前Thread綁定 createMap(t, value); return value;}
remove方法也很簡(jiǎn)單,就是將key的引用設(shè)置為null,然后找到key所對(duì)應(yīng)的數(shù)組槽位,執(zhí)行清理操作。
在ThreadLocal使用完畢后,執(zhí)行remove方法防止內(nèi)存溢出。
public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this);}
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } }}
public void clear() { this.referent = null;}
上面說完了ThreadLocal的問題,可以看出,ThreadLocal只能在單個(gè)線程內(nèi)部傳遞參數(shù),無法在子父線程間傳遞參數(shù)。
但是InheritableThreadLocal的出現(xiàn)解決了這個(gè)問題。
public class InheriTableThreadLocalTest { private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) { threadLocal.set("主線程設(shè)置值"); new Thread(() -> { System.out.println(threadLocal.get()); }).start(); }}
分析InheritableThreadLocal類,發(fā)現(xiàn)繼承于ThreadLocal,但是在createMap,getMap的時(shí)候維護(hù)的是inheritableThreadLocals
public class InheritableThreadLocal<T> extends ThreadLocal<T> { protected T childValue(T parentValue) { return parentValue; } ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); }}
在線程初始化的代碼init方法中,有這么一段邏輯:
如果父線程的inheritThreadLocals不為空,則調(diào)用ThreadLocal.createInheritedMap方法,該方法傳遞了父線程的inheritableThreadLocals
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
再看看ThreadLocal.createInheritedMap方法,子線程在創(chuàng)建的時(shí)候,將父線程的inheritableThreadLocals復(fù)制了過來保存在了自己的inheritableThreadLocals中。
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { return new ThreadLocalMap(parentMap);}
private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } }}
看完上述內(nèi)容,你們掌握如何進(jìn)行ThreadLocal源碼分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。