溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Golang中的緩存庫freecache怎么用

發(fā)布時間:2022-02-21 13:47:31 來源:億速云 閱讀:192 作者:iii 欄目:編程語言

這篇文章主要講解了“Golang中的緩存庫freecache怎么用”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“Golang中的緩存庫freecache怎么用”吧!

Golang中的緩存庫freecache怎么用

go開發(fā)緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。

緩存場景中如果數(shù)據(jù)量大于百萬級別,需要特別考慮數(shù)據(jù)類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了。

但實際應(yīng)用場景中,key和value是(包含)指針類型數(shù)據(jù)是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:

  • 零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個數(shù);

  • 有GC開銷:直接基于Map來實現(xiàn)的緩存框架。

對于map而言,gc時會掃描所有key/value鍵值對,如果其都是基本類型,那么gc便不會再掃描。

下面以freecache為例分析下其實現(xiàn)原理,代碼示例如下:

func main() {
   cacheSize := 100 * 1024 * 1024
   cache := freecache.NewCache(cacheSize)

   for i := 0; i < N; i++ {
      str := strconv.Itoa(i)
      _ = cache.Set([]byte(str), []byte(str), 1)
   }

   now := time.Now()
   runtime.GC()
   fmt.Printf("freecache, GC took: %s\n", time.Since(now))
   _, _ = cache.Get([]byte("aa"))

   now = time.Now()
   for i := 0; i < N; i++ {
      str := strconv.Itoa(i)
      _, _ = cache.Get([]byte(str))
   }
   fmt.Printf("freecache, Get took: %s\n\n", time.Since(now))
}

1 初始化

freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個segment,每個segment是獨立的存儲單元,freecache加鎖維度也是基于segment的,每個segment有一個ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個,每個segment有2個,分別是rb和slotData(注意切片為指針類型)。

type segment struct {
   rb            RingBuf // ring buffer that stores data
   segId         int
   _             uint32  // 占位
   missCount     int64
   hitCount      int64
   entryCount    int64
   totalCount    int64      // number of entries in ring buffer, including deleted entries.
   totalTime     int64      // used to calculate least recent used entry.
   timer         Timer      // Timer giving current time
   totalEvacuate int64      // used for debug
   totalExpired  int64      // used for debug
   overwrites    int64      // used for debug
   touched       int64      // used for debug
   vacuumLen     int64      // up to vacuumLen, new data can be written without overwriting old data.
   slotLens      [256]int32 // The actual length for every slot.
   slotCap       int32      // max number of entry pointers a slot can hold.
   slotsData     []entryPtr // 索引指針
}

func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) {
    cache = new(Cache)
    for i := 0; i < segmentCount; i++ {
        cache.segments[i] = newSegment(size/segmentCount, i, timer)
    }
}
func newSegment(bufSize int, segId int, timer Timer) (seg segment) {
    seg.rb = NewRingBuf(bufSize, 0)
    seg.segId = segId
    seg.timer = timer
    seg.vacuumLen = int64(bufSize)
    seg.slotCap = 1
    seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個slotData初始化256個單位大小
}

2 讀寫流程

freecache的key和value都是[]byte數(shù)組,使用時需要自行序列化和反序列化,如果緩存復(fù)雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:

_ = cache.Set([]byte(str), []byte(str), 1)

Set流程首先對key進(jìn)行hash,hashVal類型uint64,其低8位segID對應(yīng)segment數(shù)組,低8-15位表示slotId對應(yīng)slotsData下標(biāo),高16位表示slotsData下標(biāo)對應(yīng)的[]entryPtr某個數(shù)據(jù),這里需要查找操作。注意[]entryPtr數(shù)組大小為slotCap(初始為1),當(dāng)擴容時會slotCap倍增。

每個segment對應(yīng)一個lock(sync.Mutex),因此其能夠支持較大并發(fā)量,而不像sync.Map只有一個鎖。

func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
   hashVal := hashFunc(key)
   segID := hashVal & segmentAndOpVal // 低8位
   cache.locks[segID].Lock() // 加鎖
   err = cache.segments[segID].set(key, value, hashVal, expireSeconds)
   cache.locks[segID].Unlock()
}

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
   slotId := uint8(hashVal >> 8)
   hash26 := uint16(hashVal >> 16)
   slot := seg.getSlot(slotId)
   idx, match := seg.lookup(slot, hash26, key)

   var hdrBuf [ENTRY_HDR_SIZE]byte
   hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0]))
   if match { // 有數(shù)據(jù)更新操作
      matchedPtr := &slot[idx]
      seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset)
      hdr.slotId = slotId
      hdr.hash26 = hash26
      hdr.keyLen = uint16(len(key))
      originAccessTime := hdr.accessTime
      hdr.accessTime = now
      hdr.expireAt = expireAt
      hdr.valLen = uint32(len(value))
      if hdr.valCap >= hdr.valLen {
         // 已存在數(shù)據(jù)value空間能存下此次value大小
         atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime))
         seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset)
         seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
         atomic.AddInt64(&seg.overwrites, 1)
         return
      }
      // 刪除對應(yīng)entryPtr,涉及到slotsData內(nèi)存copy,ringbug中只是標(biāo)記刪除
      seg.delEntryPtr(slotId, slot, idx)
      match = false
      // increase capacity and limit entry len.
      for hdr.valCap < hdr.valLen {
         hdr.valCap *= 2
      }
      if hdr.valCap > uint32(maxKeyValLen-len(key)) {
         hdr.valCap = uint32(maxKeyValLen - len(key))
      }
   } else { // 無數(shù)據(jù)
      hdr.slotId = slotId
      hdr.hash26 = hash26
      hdr.keyLen = uint16(len(key))
      hdr.accessTime = now
      hdr.expireAt = expireAt
      hdr.valLen = uint32(len(value))
      hdr.valCap = uint32(len(value))
      if hdr.valCap == 0 { // avoid infinite loop when increasing capacity.
         hdr.valCap = 1
      }
   }
   
   // 數(shù)據(jù)實際長度為 ENTRY_HDR_SIZE=24 + key和value的長度    
   entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap)
   slotModified := seg.evacuate(entryLen, slotId, now)
   if slotModified {
      // the slot has been modified during evacuation, we need to looked up for the 'idx' again.
      // otherwise there would be index out of bound error.
      slot = seg.getSlot(slotId)
      idx, match = seg.lookup(slot, hash26, key)
      // assert(match == false)
   }
   newOff := seg.rb.End()
   seg.insertEntryPtr(slotId, hash26, newOff, idx, hdr.keyLen)
   seg.rb.Write(hdrBuf[:])
   seg.rb.Write(key)
   seg.rb.Write(value)
   seg.rb.Skip(int64(hdr.valCap - hdr.valLen))
   atomic.AddInt64(&seg.totalTime, int64(now))
   atomic.AddInt64(&seg.totalCount, 1)
   seg.vacuumLen -= entryLen
   return
}

seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數(shù)據(jù)的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應(yīng)數(shù)據(jù)已被邏輯deleted或者已過期,那么該塊內(nèi)存可以直接回收,如果不滿足回收條件,則將entry從環(huán)頭調(diào)換到環(huán)尾,再更新entry的索引,如果這樣循環(huán)5次還是不行,那么需要將當(dāng)前oldHdrBuf回收以滿足內(nèi)存需要。

執(zhí)行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數(shù)據(jù)了,insertEntryPtr就是寫入索引操作,當(dāng)[]entryPtr中元素個數(shù)大于seg.slotCap(初始1)時,需要擴容操作,對應(yīng)方法見seg.expand,這里不再贅述。

寫入ringbuf就是執(zhí)行rb.Write即可。

func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) {
   var oldHdrBuf [ENTRY_HDR_SIZE]byte
   consecutiveEvacuate := 0
   for seg.vacuumLen < entryLen {
      oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()
      seg.rb.ReadAt(oldHdrBuf[:], oldOff)
      oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0]))
      oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap)
      if oldHdr.deleted { // 已刪除
         consecutiveEvacuate = 0
         atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
         atomic.AddInt64(&seg.totalCount, -1)
         seg.vacuumLen += oldEntryLen
         continue
      }
      expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now
      leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime)
      if expired || leastRecentUsed || consecutiveEvacuate > 5 {
      // 可以回收
         seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash26, oldOff)
         if oldHdr.slotId == slotId {
            slotModified = true
         }
         consecutiveEvacuate = 0
         atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime))
         atomic.AddInt64(&seg.totalCount, -1)
         seg.vacuumLen += oldEntryLen
         if expired {
            atomic.AddInt64(&seg.totalExpired, 1)
         } else {
            atomic.AddInt64(&seg.totalEvacuate, 1)
         }
      } else {
         // evacuate an old entry that has been accessed recently for better cache hit rate.
         newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen))
         seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash26, oldOff, newOff)
         consecutiveEvacuate++
         atomic.AddInt64(&seg.totalEvacuate, 1)
      }
   }
}

freecache的Get流程相對來說簡單點,通過hash找到對應(yīng)segment,通過slotId找到對應(yīng)索引slot,然后通過二分+遍歷尋找數(shù)據(jù),如果找不到直接返回ErrNotFound,否則更新一些time指標(biāo)。Get流程還會更新緩存命中率相關(guān)指標(biāo)。

func (cache *Cache) Get(key []byte) (value []byte, err error) {
   hashVal := hashFunc(key)
   segID := hashVal & segmentAndOpVal
   cache.locks[segID].Lock()
   value, _, err = cache.segments[segID].get(key, nil, hashVal, false)
   cache.locks[segID].Unlock()
   return
}
func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) {
   hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找
   if err != nil {
      return
   }
   expireAt = hdr.expireAt
   if cap(buf) >= int(hdr.valLen) {
      value = buf[:hdr.valLen]
   } else {
      value = make([]byte, hdr.valLen)
   }

   seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen))
}

定位到數(shù)據(jù)之后,讀取ringbuf即可,注意一般來說讀取到的value是新創(chuàng)建的內(nèi)存空間,因此涉及到[]byte數(shù)據(jù)的復(fù)制操作。

感謝各位的閱讀,以上就是“Golang中的緩存庫freecache怎么用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對Golang中的緩存庫freecache怎么用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI