溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么使用StampLock

發(fā)布時間:2021-12-24 10:32:44 來源:億速云 閱讀:170 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容介紹了“怎么使用StampLock”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

主要成員變量

public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {

    // 實際存儲數(shù)據(jù)的位置
    private final EntryLogger entryLogger;

    // -----------------
    // index 相關(guān)
    // -----------------
 
    // 記錄fence,exist,masterKey等信息
    private final LedgerMetadataIndex ledgerIndex;
    // 關(guān)于位置的index
    private final EntryLocationIndex entryLocationIndex;
  
    // 臨時的ledgerCache
    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
  
    // -----------------
    // 寫入相關(guān)
    // -----------------
  
  
    // 用來寫入的memtable,2個互相swap
    private final StampedLock writeCacheRotationLock = new StampedLock();
    // Write cache where all new entries are inserted into
    protected volatile WriteCache writeCache;
    // Write cache that is used to swap with writeCache during flushes
    protected volatile WriteCache writeCacheBeingFlushed;
  

    // Cache where we insert entries for speculative reading
    private final ReadCache readCache;

  
    // checkpoint 相關(guān)
    private final CheckpointSource checkpointSource;
    private Checkpoint lastCheckpoint = Checkpoint.MIN;
}
主要作用
  1. 可以讀寫ledger,維護ledger的位置(index)

  2. 保存ledger相關(guān)的metadata

  3. 支持checkpoint

寫入Entry

寫入會直接寫入到WriteCache里面,這里面使用了StampLock,將swap cache的操作進行了保護,StampLock是一個樂觀讀的讀寫鎖,并發(fā)更高。

public long addEntry(ByteBuf entry) throws IOException, BookieException {
        long startTime = MathUtils.nowInNano();

        long ledgerId = entry.getLong(entry.readerIndex());
        long entryId = entry.getLong(entry.readerIndex() + 8);
        long lac = entry.getLong(entry.readerIndex() + 16);

        // 這里的模板是StampLock樂觀讀取的通用模板
        // 相對的互斥操作實際上是swap cache的操作
  
        // First we try to do an optimistic locking to get access to the current write cache.
        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
        // rest of the time, we can have multiple thread using the optimistic lock here without interfering.
       
        // 樂觀讀鎖
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        boolean inserted = false;

        inserted = writeCache.put(ledgerId, entryId, entry);
        // 如果插入過程中發(fā)生了cache swap 則再次插入
        if (!writeCacheRotationLock.validate(stamp)) {
            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
            // the operation because we might have inserted in a write cache that was already being flushed and cleared,
            // without being sure about this last entry being flushed or not.
          
            // 說明插入到被swap的那個cache里面了
            // 如果insert是true TODO
            // 如果是false的話沒啥影響
            stamp = writeCacheRotationLock.readLock();
            try {
                inserted = writeCache.put(ledgerId, entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // 如果這里寫入到writeCache失敗的話,觸發(fā)Flush WriteCache
        // 走到這里說明可能2個buffer都滿了?
        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, entry);
        }

        // 更新LAC的緩存
        // after successfully insert the entry, update LAC and notify the watchers
        updateCachedLacIfNeeded(ledgerId, lac);
  
        return entryId;
}
writeCache滿了,觸發(fā)flush的流程

這里的邏輯比較容易,一直不斷循環(huán)插入到writeCache 里面,如果超時的話就跳出循環(huán)標記,這個寫入失敗。

如果沒有觸發(fā)flush動作的話,會提交一個flush task。

private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
            throws IOException, BookieException {
        // metric 打點
        dbLedgerStorageStats.getThrottledWriteRequests().inc();
        ...
        // 最大等待寫入時間,超時之前不斷重試
        while (System.nanoTime() < absoluteTimeoutNanos) {
            // Write cache is full, we need to trigger a flush so that it gets rotated
            // If the flush has already been triggered or flush has already switched the
            // cache, we don't need to trigger another flush
          
            // 提交一個flush任務(wù),如果之前有了就不提交了
            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
                // Trigger an early flush in background
                log.info("Write cache is full, triggering flush");
                executor.execute(() -> {
                        try {
                            flush();
                        } catch (IOException e) {
                            log.error("Error during flush", e);
                        }
                    });
            }

            long stamp = writeCacheRotationLock.readLock();
            try {
                if (writeCache.put(ledgerId, entryId, entry)) {
                    // We succeeded in putting the entry in write cache in the
                    return;
                }
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }

            // Wait some time and try again
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
            }
        }

        // Timeout expired and we weren't able to insert in write cache
        dbLedgerStorageStats.getRejectedWriteRequests().inc();
        throw new OperationRejectedException();
}
flush 流程

實際上flush流程是觸發(fā)checkpoint的邏輯,

主要動作

  • 交換2個writeCache,正在寫入的cache會被交換成flush的batch

  • 遍歷writeCache,將內(nèi)容寫到EntryLogger里面

  • sync EntryLogger將上一步寫入的內(nèi)容落盤

  • 更新ledgerLocationIndex,同時flush這個index到rocksDb里面

public void flush() throws IOException {
    // journal
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(cp, true);
}

public void checkpoint(Checkpoint checkpoint) throws IOException {
        // journal
        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
  
        // 這里檢查是否在這個點之前做過checkpoint了
        if (lastCheckpoint.compareTo(checkpoint) > 0) {
            return;
        }

        long startTime = MathUtils.nowInNano();

        // Only a single flush operation can happen at a time
        flushMutex.lock();

        try {
            // Swap the write cache so that writes can continue to happen while the flush is
            // ongoing
            // 這里邏輯比較容易,交換當前的writeCache和后備的writeCache
            // 獲取的是StampLock的writeLock
            swapWriteCache();

            long sizeToFlush = writeCacheBeingFlushed.size();
            

            // Write all the pending entries into the entry logger and collect the offset
            // position for each entry
            
            // 刷cache到實際的保存位置上、
          
            // 構(gòu)建一個rocksDb的batch
            Batch batch = entryLocationIndex.newBatch();
            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                try {
                    // 把寫入的entry刷到entryLogger里面
                    // 這里返回的這個entry的offset
                    long location = entryLogger.addEntry(ledgerId, entry, true);
                  
                    // 這里的邏輯實際上就是把3個long 拆分成k/v 寫入到RocksDb的batch 里面
                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

            // 這里不展開說了,實際上會把剛才寫入的entryLogger進行flush && fsync 到磁盤上。
            entryLogger.flush();

            // 這里觸發(fā)RocksDb的batch flush
            // 這個寫入是sync的
            long batchFlushStarTime = System.nanoTime();
            batch.flush();
            batch.close();
            
          
            // flush ledgerIndex
            // 這里的內(nèi)容變化比較少,因為記錄的是metadata
            ledgerIndex.flush();

            // 調(diào)度一個cleanUp的邏輯
            cleanupExecutor.execute(() -> {
                // There can only be one single cleanup task running because the cleanupExecutor
                // is single-threaded
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }

                    entryLocationIndex.removeOffsetFromDeletedLedgers();
                    ledgerIndex.removeDeletedLedgers();
                } catch (Throwable t) {
                    log.warn("Failed to cleanup db indexes", t);
                }
            });
            
            // 保存checkpoint 
            lastCheckpoint = thisCheckpoint;

            // 清空這個cache
          
            // Discard all the entry from the write cache, since they're now persisted
            writeCacheBeingFlushed.clear();

            
        } catch (IOException e) {
            // Leave IOExecption as it is
            throw e;
        } catch (RuntimeException e) {
            // Wrap unchecked exceptions
            throw new IOException(e);
        } finally {
            try {
                isFlushOngoing.set(false);
            } finally {
                flushMutex.unlock();
            }
        }
}

這樣寫入就完成了

讀取Entry

這里會從3個位置開始讀取

  1. writeCache,包括正在刷新的和正在寫入的

  2. readCache,預(yù)讀的緩存

  3. entryLogger,讀文件,這部分已經(jīng)落盤了

讀取成功之后會嘗試增加預(yù)讀的buffer


如果正在flush這個時候有觸發(fā)讀取會怎么樣?

上面的flush流程是在所有內(nèi)容已經(jīng)落盤之后才把刷新的writeCache 清空的

即使有并發(fā)讀,如果最后還是落到了讀文件這一步,那怎么都能讀到


還有個問題就是這個先后順序,不確定是否有相同ledgerId,entry,但是內(nèi)容不同的請求出現(xiàn)。

這樣的話感覺可能有問題

public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
        long startTime = MathUtils.nowInNano();
        
        // 讀LAC的情況
        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
            return getLastEntry(ledgerId);
        }

        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
        // write caches are already thread safe on their own, here we just need to make sure we get references to both
        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        WriteCache localWriteCache = writeCache;
        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        if (!writeCacheRotationLock.validate(stamp)) {
            // Fallback to regular read lock approach
            stamp = writeCacheRotationLock.readLock();
            try {
                localWriteCache = writeCache;
                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // First try to read from the write cache of recent entries
        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // If there's a flush going on, the entry might be in the flush buffer
        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Try reading from read-ahead cache
        entry = readCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Read from main storage
        long entryLocation;
        try {
            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
            if (entryLocation == 0) {
                throw new NoEntryException(ledgerId, entryId);
            }
            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
        } catch (NoEntryException e) {
            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            throw e;
        }

        readCache.put(ledgerId, entryId, entry);

        // Try to read more entries
        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
}

“怎么使用StampLock”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI