溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

BlockKeeper的邏輯是什么

發(fā)布時(shí)間:2021-12-20 16:37:00 來源:億速云 閱讀:156 作者:iii 欄目:互聯(lián)網(wǎng)科技

這篇文章主要講解了“BlockKeeper的邏輯是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“BlockKeeper的邏輯是什么”吧!

我們要發(fā)送什么樣的數(shù)據(jù)請(qǐng)求,才能讓比原節(jié)點(diǎn)把它持有的區(qū)塊數(shù)據(jù)發(fā)給我?

找到發(fā)送請(qǐng)求的代碼

首先我們先要在代碼中定位到,比原到底是在什么時(shí)候來向?qū)Ψ焦?jié)點(diǎn)發(fā)送請(qǐng)求的。

在前一篇講的是如何建立連接并驗(yàn)證身份,那么發(fā)出數(shù)據(jù)請(qǐng)求的操作,一定在上次的代碼之后。按照這個(gè)思路,我們?cè)?code>SyncManager類中Switch啟動(dòng)之后,找到了一個(gè)叫BlockKeeper的類,相關(guān)的操作是在它里面完成的。

下面是老規(guī)矩,還是從啟動(dòng)開始,但是會(huì)更簡(jiǎn)化一些:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    n := node.NewNode(config)
    if _, err := n.Start(); err != nil {
        // ...
}

node/node.go#L169

func (n *Node) OnStart() error {
    // ...
    n.syncManager.Start()
    // ...
}

netsync/handle.go#L141

func (sm *SyncManager) Start() {
    go sm.netStart()
    // ...
    go sm.syncer()
}

注意sm.netStart(),我們?cè)谝黄薪⑦B接并驗(yàn)證身份的操作,就是在它里面完成的。而這次的這個(gè)問題,是在下面的sm.syncer()中完成的。

另外注意,由于這兩個(gè)函數(shù)調(diào)用都使用了goroutine,所以它們是同時(shí)進(jìn)行的。

sm.syncer()的代碼如下:

netsync/sync.go#L46

func (sm *SyncManager) syncer() {
    sm.fetcher.Start()
    defer sm.fetcher.Stop()

    // ...
    for {
        select {
        case <-sm.newPeerCh:
            log.Info("New peer connected.")
            // Make sure we have peers to select from, then sync
            if sm.sw.Peers().Size() < minDesiredPeerCount {
                break
            }
            go sm.synchronise()
            // ..
    }
}

這里混入了一個(gè)叫fetcher的奇怪的東西,名字看起來好像是專門去抓取數(shù)據(jù)的,我們要找的是它嗎?

可惜不是,fetcher的作用是從多個(gè)peer那里拿到了區(qū)塊數(shù)據(jù)之后,對(duì)數(shù)據(jù)進(jìn)行整理,把有用的放到本地鏈上。我們?cè)谝院髸?huì)研究它,所以這里不展開討論。

接著是一個(gè)for循環(huán),當(dāng)發(fā)現(xiàn)通道newPeerCh有了新數(shù)據(jù)(也就是有了新的節(jié)點(diǎn)連接上了),會(huì)判斷一下當(dāng)前自己連著的節(jié)點(diǎn)是否夠多(大于等于minDesiredPeerCount,值為5),夠多的話,就會(huì)進(jìn)入sm.synchronise(),進(jìn)行數(shù)據(jù)同步。

這里為什么要多等幾個(gè)節(jié)點(diǎn),而不是一連上就馬上同步呢?我想這是希望有更多選擇的機(jī)會(huì),找到一個(gè)數(shù)據(jù)夠多的節(jié)點(diǎn)。

sm.synchronise()還是屬于SyncManager的方法。在真正調(diào)用到BlockKeeper的方法之前,它還做了一些比如清理已經(jīng)斷開的peer,找到最適合同步數(shù)據(jù)的peer等。其中“清理peer”的工作涉及到不同的對(duì)象持有的peer集合間的同步,略有些麻煩,但對(duì)當(dāng)前問題幫助不大,所以我打算把它們放在以后的某個(gè)問題中回答(比如“當(dāng)一個(gè)節(jié)點(diǎn)斷開了,比原會(huì)有什么樣的處理”),這里就先省略。

sm.synchronise()代碼如下:

netsync/sync.go#L77

func (sm *SyncManager) synchronise() {
    log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
    // ...
    peer, bestHeight := sm.peers.BestPeer()
    // ...
    if bestHeight > sm.chain.BestBlockHeight() {
        // ...
        sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
    }
}

可以看到,首先是從眾多的peers中,找到最合適的那個(gè)。什么叫Best呢?看一下BestPeer()的定義:

netsync/peer.go#L266

func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
    // ...
    for _, p := range ps.peers {
        if bestPeer == nil || p.height > bestHeight {
            bestPeer, bestHeight = p.swPeer, p.height
        }
    }
    return bestPeer, bestHeight
}

其實(shí)就是持有區(qū)塊鏈數(shù)據(jù)最長(zhǎng)的那個(gè)。

找到了BestPeer之后,就調(diào)用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)方法,從這里,正式進(jìn)入BlockKeeper -- 也就是本文的主角 -- 的世界。

BlockKeeper

blockKeeper.BlockRequestWorker的邏輯比較復(fù)雜,它包含了:

  1. 根據(jù)自己持有的區(qū)塊數(shù)據(jù)來計(jì)算需要同步的數(shù)據(jù)

  2. 向前面找到的最佳節(jié)點(diǎn)發(fā)送數(shù)據(jù)請(qǐng)求

  3. 拿到對(duì)方發(fā)過來的區(qū)塊數(shù)據(jù)

  4. 對(duì)數(shù)據(jù)進(jìn)行處理

  5. 廣播新狀態(tài)

  6. 處理各種出錯(cuò)情況,等等

由于本文中只關(guān)注“發(fā)送請(qǐng)求”,所以一些與之關(guān)系不大的邏輯我會(huì)忽略掉,留待以后再講。

在“發(fā)送請(qǐng)求”這里,實(shí)際也包含了兩種情形,一種簡(jiǎn)單的,一種復(fù)雜的:

  1. 簡(jiǎn)單的:假設(shè)不存在分叉,則直接檢查本地高度最高的區(qū)塊,然后請(qǐng)求下一個(gè)區(qū)塊

  2. 復(fù)雜的:考慮分叉的情況,則當(dāng)前本地的區(qū)塊可能就存在分叉,那么到底應(yīng)該請(qǐng)求哪個(gè)區(qū)塊,就需要慎重考慮

由于第2種情況對(duì)于本文來說過于復(fù)雜(因?yàn)樾枰羁汤斫獗仍溨蟹植娴奶幚磉壿嫞栽诒疚闹袑褑栴}簡(jiǎn)化,只考慮第1種。而分叉的處理,將放在以后講解。

下面是把blockKeeper.BlockRequestWorker中的代碼簡(jiǎn)化成了只包含第1種情況:

netsync/block_keeper.go#L72

func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
    num := bk.chain.BestBlockHeight() + 1
    reqNum := uint64(0)
    reqNum = num
    // ...
    bkPeer, ok := bk.peers.Peer(peerID)
    swPeer := bkPeer.getPeer()
    // ...
    block, err := bk.BlockRequest(peerID, reqNum)
    // ...
}

在這種情況下,我們可以認(rèn)為bk.chain.BestBlockHeight()中的Best,指的是本地持有的不帶分叉的區(qū)塊鏈高度最高的那個(gè)。(需要提醒的是,如果存在分叉情況,則Best不一定是高度最高的那個(gè))

那么我們就可以直接向最佳peer請(qǐng)求下一個(gè)高度的區(qū)塊,它是通過bk.BlockRequest(peerID, reqNum)實(shí)現(xiàn)的:

netsync/block_keeper.go#L152

func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
    var block *types.Block

    if err := bk.blockRequest(peerID, height); err != nil {
        return nil, errReqBlock
    }

    // ...

    for {
        select {
        case pendingResponse := <-bk.pendingProcessCh:
            block = pendingResponse.block
            // ...
            return block, nil
        // ...
        }
    }
}

在上面簡(jiǎn)化后的代碼中,主要分成了兩個(gè)部分。一個(gè)是發(fā)送請(qǐng)求bk.blockRequest(peerID, height),這是本文的重點(diǎn);它下面的for-select部分,已經(jīng)是在等待并處理對(duì)方節(jié)點(diǎn)的返回?cái)?shù)據(jù)了,這部分我們今天先略過不講。

bk.blockRequest(peerID, height)這個(gè)方法,從邏輯上又可以分成兩部分:

  1. 構(gòu)造出請(qǐng)求的信息

  2. 把信息發(fā)送給對(duì)方節(jié)點(diǎn)

構(gòu)造出請(qǐng)求的信息

bk.blockRequest(peerID, height)經(jīng)過一連串的方法調(diào)用之后,使用height構(gòu)造出了一個(gè)BlockRequestMessage對(duì)象,代碼如下:

netsync/block_keeper.go#L148

func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
    return bk.peers.requestBlockByHeight(peerID, height)
}

netsync/peer.go#L332

func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
    peer, ok := ps.Peer(peerID)
    // ...
    return peer.requestBlockByHeight(height)
}

netsync/peer.go#L73

func (p *peer) requestBlockByHeight(height uint64) error {
    msg := &BlockRequestMessage{Height: height}
    p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
    return nil
}

到這里,終于構(gòu)造出了所需要的BlockRequestMessage,其實(shí)主要就是把height告訴peer。

然后,通過PeerTrySend()把該信息發(fā)出去。

發(fā)送請(qǐng)求

TrySend中,主要是通過github.com/tendermint/go-wire庫將其序列化,再發(fā)送給對(duì)方??雌饋響?yīng)該是很簡(jiǎn)單的操作吧,先預(yù)個(gè)警,還是挺繞的。

當(dāng)我們進(jìn)入TrySend()后:

p2p/peer.go#L242

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}

發(fā)現(xiàn)它把鍋丟給了p.mconn.TrySend方法,那么mconn是什么?chID又是什么?

mconnMConnection的實(shí)例,它是從哪兒來的?它應(yīng)該在之前的某個(gè)地方初始化了,否則我們沒法直接調(diào)用它。所以我們先來找到它初始化的地方。

經(jīng)過一番尋找,發(fā)現(xiàn)原來是在前一篇之后,即比原節(jié)點(diǎn)與另一個(gè)節(jié)點(diǎn)完成了身份驗(yàn)證之后,具體的位置在Switch類啟動(dòng)的地方。

我們這次直接從SwtichOnStart作為起點(diǎn):

p2p/switch.go#L186

func (sw *Switch) OnStart() error {
    //...
    // Start listeners
    for _, listener := range sw.listeners {
        go sw.listenerRoutine(listener)
    }
    return nil
}

p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        // ...
        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }
}

p2p/switch.go#L645

func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
    // ...
    peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
    // ...
}

p2p/peer.go#L87

func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}

p2p/peer.go#L91

func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    conn := rawConn
    // ...
    if config.AuthEnc {
        // ...
        conn, err = MakeSecretConnection(conn, ourNodePrivKey)
        // ...
    }

    // Key and NodeInfo are set after Handshake
    p := &Peer{
        outbound: outbound,
        conn:     conn,
        config:   config,
        Data:     cmn.NewCMap(),
    }

    p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)

    p.BaseService = *cmn.NewBaseService(nil, "Peer", p)

    return p, nil
}

終于找到了。上面方法中的MakeSecretConnection就是與對(duì)方節(jié)點(diǎn)交換公鑰并進(jìn)行身份驗(yàn)證的地方,下面的p.mconn = createMConnection(...)就是創(chuàng)建mconn的地方。

繼續(xù)進(jìn)去:

p2p/peer.go#L292

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

原來mconnMConnection的實(shí)例,它是通過NewMConnectionWithConfig創(chuàng)建的。

看了上面的代碼,發(fā)現(xiàn)這個(gè)MConnectionWithConfig與普通的net.Conn并沒有太大的區(qū)別,只不過是當(dāng)收到了對(duì)方發(fā)來的數(shù)據(jù)后,會(huì)根據(jù)指定的chID調(diào)用相應(yīng)的ReactorReceive方法來處理。所以它起到了將數(shù)據(jù)分發(fā)給Reactor的作用。

為什么需要這樣的分發(fā)操作呢?這是因?yàn)?,在比原中,?jié)點(diǎn)之間交換數(shù)據(jù),有多種不同的方式:

  1. 一種是規(guī)定了詳細(xì)的數(shù)據(jù)交互協(xié)議(比如有哪些信息類型,分別代表什么意思,什么情況下發(fā)哪個(gè),如何應(yīng)答等),在ProtocolReactor中實(shí)現(xiàn),它對(duì)應(yīng)的chIDBlockchainChannel,值為byte(0x40)

  2. 另一種使用了與BitTorrent類似的文件共享協(xié)議,叫PEX,在PEXReactor中實(shí)現(xiàn),它對(duì)應(yīng)的chIDPexChannel,值為byte(0x00)

所以節(jié)點(diǎn)之間發(fā)送信息的時(shí)候,需要知道對(duì)方發(fā)過來的數(shù)據(jù)對(duì)應(yīng)的是哪一種方式,然后轉(zhuǎn)交給相應(yīng)的Reactor去處理。

在比原中,前者是主要的方式,后者起到輔助作用。我們目前的文章中涉及到的都是前者,后者將在以后專門研究。

p.mconn.TrySend

當(dāng)我們知道了p.mconn.TrySend中的mconn是什么,并且在什么時(shí)候初始化以后,下面就可以進(jìn)入它的TrySend方法了。

p2p/connection.go#L243

func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
    // ...
    channel, ok := c.channelsIdx[chID]
    // ...
    ok = channel.trySendBytes(wire.BinaryBytes(msg))
    if ok {
        // Wake up sendRoutine if necessary
        select {
        case c.send <- struct{}{}:
        default:
        }
    }

    return ok
}

可以看到,它找到相應(yīng)的channel后(在這里應(yīng)該是ProtocolReactor對(duì)應(yīng)的channel),調(diào)用channel的trySendBytes方法。在發(fā)送數(shù)據(jù)的時(shí)候,使用了github.com/tendermint/go-wire庫,將msg序列化為二進(jìn)制數(shù)組。

p2p/connection.go#L602

func (ch *Channel) trySendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    default:
        return false
    }
}

原來它是把要發(fā)送的數(shù)據(jù),放到了該channel對(duì)應(yīng)的sendQueue中,交由別人來發(fā)送。具體是由誰來發(fā)送,我們馬上要就找到它。

細(xì)心的同學(xué)會(huì)發(fā)現(xiàn),Channel除了trySendBytes方法外,還有一個(gè)sendBytes(在本文中沒有用上):

p2p/connection.go#L589

func (ch *Channel) sendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    case <-time.After(defaultSendTimeout):
        return false
    }
}

它們兩個(gè)的區(qū)別是,前者嘗試把待發(fā)送數(shù)據(jù)bytes放入ch.sendQueue時(shí),如果能放進(jìn)去,則返回true,否則馬上失敗,返回false,所以它是非阻塞的。而后者,如果放不進(jìn)去(sendQueue已滿,那邊還沒處理完),則等待defaultSendTimeout(值為10秒),然后才會(huì)失敗。另外,sendQueue的容量默認(rèn)為1

感謝各位的閱讀,以上就是“BlockKeeper的邏輯是什么”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)BlockKeeper的邏輯是什么這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI