溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

以太坊 p2p Server 原理及實(shí)現(xiàn)是怎樣的

發(fā)布時(shí)間:2021-12-03 19:08:49 來源:億速云 閱讀:219 作者:柒染 欄目:互聯(lián)網(wǎng)科技

今天就跟大家聊聊有關(guān)以太坊 p2p Server 原理及實(shí)現(xiàn)是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

以太坊p2p原理與實(shí)現(xiàn)

區(qū)塊鏈技術(shù)的去中心依賴于底層組網(wǎng)技術(shù),以太坊的底層實(shí)現(xiàn)了p2pServer,大約可以分為這樣三層。

  • 底層路由表。封裝了kad路由,節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)以及計(jì)算記錄,節(jié)點(diǎn)搜索,驗(yàn)證等功能。

  • 中層peer抽象,message開放發(fā)送接口,server對(duì)外提供peer檢測(cè),初始化,事件訂閱,peer狀態(tài)查詢,啟動(dòng),停止等功能

  • 以太坊最上層peer,peerset再封裝,通過協(xié)議的Run函數(shù),在中層啟動(dòng)peer時(shí),獲取peer,最終通過一個(gè)循環(huán)截取穩(wěn)定peer,包裝在peerset中使用。

底層路由表

這里簡化問題僅討論Node Discovery Protocol。 這一層維護(hù)了一個(gè)buckets桶,總共有17個(gè)桶,每個(gè)桶有16個(gè)節(jié)點(diǎn)和10個(gè)替換節(jié)點(diǎn)。 Node放入時(shí)先要計(jì)算hash和localNode的距離。再按距離選擇一個(gè)桶放進(jìn)去,取的時(shí)候逐個(gè)計(jì)算target和每個(gè)桶中對(duì)象的舉例,詳細(xì)參考closest函數(shù),后面會(huì)貼出來。

距離公式滿足:f(x,y)=256-8*n-map(x[n+1]^y[n+1]) 注:n為相同節(jié)點(diǎn)數(shù)量 map為一個(gè)負(fù)相關(guān)的映射關(guān)系。

簡單來說就是相似越多,值越小。細(xì)節(jié)參考Node.go的logdist函數(shù)。 這里需要了解算法Kademlia,

.
├── database.go         //封裝node數(shù)據(jù)庫相關(guān)操作
├── node.go             //節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)
├── ntp.go              //同步時(shí)間  
├── table.go            //路由表
├── udp.go              //網(wǎng)絡(luò)相關(guān)操作

其中最重要的就是table對(duì)象,table公共方法有:

  • newTable 實(shí)例創(chuàng)建

  • Self local節(jié)點(diǎn)獲取

  • ReadRandomNodes 隨機(jī)讀取幾個(gè)節(jié)點(diǎn)

  • Close 關(guān)閉

  • Resolve 在周邊查找某個(gè)節(jié)點(diǎn)

  • Lookup 查找某個(gè)節(jié)點(diǎn)的鄰近節(jié)點(diǎn)

逐個(gè)來分析這些方法:

newTable

  • 1:生成對(duì)象實(shí)例(獲取數(shù)據(jù)庫客戶端,LocalNode etc)

    // If no node database was given, use an in-memory one
    db, err := newNodeDB(nodeDBPath, Version, ourID)
    if err != nil {
        return nil, err
    }
    tab := &Table{
        net:        t,
        db:         db,
        self:       NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
        bonding:    make(map[NodeID]*bondproc),
        bondslots:  make(chan struct{}, maxBondingPingPongs),
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
    }
  • 2:載入引導(dǎo)節(jié)點(diǎn),初始化k桶。

    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    for i := 0; i < cap(tab.bondslots); i++ {
        tab.bondslots <- struct{}{}
    }
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
  • 3:將節(jié)點(diǎn)放入到桶里,生成一條協(xié)程用于刷新,驗(yàn)證節(jié)點(diǎn)。

    tab.seedRand()
    tab.loadSeedNodes(false)  //載入種子節(jié)點(diǎn)
    // Start the background expiration goroutine after loading seeds so that the search for
    // seed nodes also considers older nodes that would otherwise be removed by the
    // expiration.
    tab.db.ensureExpirer()
    go tab.loop()

載入種子節(jié)點(diǎn)

    func (tab *Table) loadSeedNodes(bond bool) {
        seeds := tab.db.querySeeds(seedCount, seedMaxAge)
        //數(shù)據(jù)庫中的種子節(jié)點(diǎn)和引導(dǎo)節(jié)點(diǎn)合并
        seeds = append(seeds, tab.nursery...) 
        if bond {
            seeds = tab.bondall(seeds)   //節(jié)點(diǎn)驗(yàn)證
        }
        for i := range seeds {
            seed := seeds[i]
            age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
            log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
            tab.add(seed)               //節(jié)點(diǎn)入桶
        }
    }

節(jié)點(diǎn)入桶,同時(shí)也要檢查ip等限制。

    func (tab *Table) add(new *Node) {
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        b := tab.bucket(new.sha)   //獲取當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的桶
        if !tab.bumpOrAdd(b, new) {
            // Node is not in table. Add it to the replacement list.
            tab.addReplacement(b, new)
        }
    }

桶的選擇

    func (tab *Table) bucket(sha common.Hash) *bucket {
        d := logdist(tab.self.sha, sha)  //計(jì)算hash舉例
        if d <= bucketMinDistance {
            //這里按算法來看,只要hash前三位相等就會(huì)到第一個(gè)buckets
            return tab.buckets[0]
        }
        return tab.buckets[d-bucketMinDistance-1]
    }

Resolve

根據(jù)Node的Id查找Node,先在當(dāng)前的桶里面查找,查找一遍之后沒找到就在周邊的節(jié)點(diǎn)里面搜索一遍再找。

    // Resolve searches for a specific node with the given ID.
    // It returns nil if the node could not be found.
    func (tab *Table) Resolve(targetID NodeID) *Node {
        // If the node is present in the local table, no
        // network interaction is required.
        hash := crypto.Keccak256Hash(targetID[:])
        tab.mutex.Lock()
        //查找最近節(jié)點(diǎn)
        cl := tab.closest(hash, 1)
        tab.mutex.Unlock()
        if len(cl.entries) > 0 && cl.entries[0].ID == targetID {
            return cl.entries[0]
        }
        // Otherwise, do a network lookup.
        //不存在 搜索鄰居節(jié)點(diǎn)
        result := tab.Lookup(targetID)
        for _, n := range result {
            if n.ID == targetID {
                return n
            }
        }
        return nil
    }

這里需要理解的函數(shù)是 closest,遍歷所有桶的所有節(jié)點(diǎn),查找最近的一個(gè)

    // closest returns the n nodes in the table that are closest to the
    // given id. The caller must hold tab.mutex.
    func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
        // This is a very wasteful way to find the closest nodes but
        // obviously correct. I believe that tree-based buckets would make
        // this easier to implement efficiently.
        close := &nodesByDistance{target: target}
        for _, b := range tab.buckets {
            for _, n := range b.entries {
                close.push(n, nresults)
            }
        }
        return close
    }

    func (h *nodesByDistance) push(n *Node, maxElems int) {
        ix := sort.Search(len(h.entries), func(i int) bool {
            return distcmp(h.target, h.entries[i].sha, n.sha) > 0
        })
        if len(h.entries) < maxElems {
            h.entries = append(h.entries, n)
        }
        if ix == len(h.entries) {
            // farther away than all nodes we already have.
            // if there was room for it, the node is now the last element.
        } else {
            // slide existing entries down to make room
            // this will overwrite the entry we just appended.
            //近的靠前邊
            copy(h.entries[ix+1:], h.entries[ix:])
            h.entries[ix] = n
        }
    }

ReadRandomNodes

整體思路是先拷貝出來,再逐個(gè)桶的抽最上面的一個(gè),剩下空桶移除,剩下的桶合并后,下一輪再抽桶的第一個(gè)節(jié)點(diǎn),直到填滿給定數(shù)據(jù)或者桶全部空掉。最后返回填到數(shù)組里面的數(shù)量。

    // ReadRandomNodes fills the given slice with random nodes from the
    // table. It will not write the same node more than once. The nodes in
    // the slice are copies and can be modified by the caller.
    func (tab *Table) ReadRandomNodes(buf []*Node) (n int) {
        if !tab.isInitDone() {
            return 0
        }
        tab.mutex.Lock()
        defer tab.mutex.Unlock()

        // Find all non-empty buckets and get a fresh slice of their entries.
        var buckets [][]*Node
        //拷貝節(jié)點(diǎn)
        for _, b := range tab.buckets {
            if len(b.entries) > 0 {
                buckets = append(buckets, b.entries[:])
            }
        }
        if len(buckets) == 0 {
            return 0
        }
        // Shuffle the buckets.
        for i := len(buckets) - 1; i > 0; i-- {
            j := tab.rand.Intn(len(buckets))
            buckets[i], buckets[j] = buckets[j], buckets[i]
        }
        // Move head of each bucket into buf, removing buckets that become empty.
        var i, j int
        for ; i < len(buf); i, j = i+1, (j+1)%len(buckets) {
            b := buckets[j]
            buf[i] = &(*b[0])  //取第一個(gè)節(jié)點(diǎn)
            buckets[j] = b[1:] //移除第一個(gè)
            if len(b) == 1 {
                //空桶移除
                buckets = append(buckets[:j], buckets[j+1:]...)  
            }
            if len(buckets) == 0 {
                break          
            }
        }
        return i + 1
    }

Lookup

lookup會(huì)要求已知節(jié)點(diǎn)查找鄰居節(jié)點(diǎn),查找的鄰居節(jié)點(diǎn)又遞歸的找它周邊的節(jié)點(diǎn)

    for {
        // ask the alpha closest nodes that we haven't asked yet
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {
                asked[n.ID] = true
                pendingQueries++   
                go func() {
                    // Find potential neighbors to bond with
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    reply <- tab.bondall(r)
                }()
            }
        }
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        for _, n := range <-reply {    //此處會(huì)阻塞請(qǐng)求
            if n != nil && !seen[n.ID] {
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }

桶的維護(hù)

桶初始化完成后會(huì)進(jìn)入一個(gè)循環(huán)邏輯,其中通過三個(gè)timer控制調(diào)整周期。

  • 驗(yàn)證timer 間隔 10s左右

  • 刷新timer 間隔 30 min

  • 持久化timer 間隔 30s

    revalidate     = time.NewTimer(tab.nextRevalidateTime())
    refresh        = time.NewTicker(refreshInterval)
    copyNodes      = time.NewTicker(copyNodesInterval)

刷新邏輯:重新加載種子節(jié)點(diǎn),查找周邊節(jié)點(diǎn),隨機(jī)三個(gè)節(jié)點(diǎn),并查找這三個(gè)節(jié)點(diǎn)的周圍節(jié)點(diǎn)。

    func (tab *Table) doRefresh(done chan struct{}) {
        defer close(done)

        tab.loadSeedNodes(true)

        tab.lookup(tab.self.ID, false)

        for i := 0; i < 3; i++ {
            var target NodeID
            crand.Read(target[:])
            tab.lookup(target, false)
        }
    }

驗(yàn)證邏輯:驗(yàn)證每個(gè)桶的最末尾節(jié)點(diǎn),如果該節(jié)點(diǎn)通過驗(yàn)證則放到隊(duì)首(驗(yàn)證過程是本地節(jié)點(diǎn)向它發(fā)送ping請(qǐng)求,如果回應(yīng)pong則通過)

    last, bi := tab.nodeToRevalidate()  //取最后一個(gè)節(jié)點(diǎn)
    if last == nil {
        // No non-empty bucket found.
        return
    }

    // Ping the selected node and wait for a pong.
    err := tab.ping(last.ID, last.addr())   //通信驗(yàn)證

    tab.mutex.Lock()
    defer tab.mutex.Unlock()
    b := tab.buckets[bi]
    if err == nil {
        // The node responded, move it to the front.
        log.Debug("Revalidated node", "b", bi, "id", last.ID)
        b.bump(last)    //提到隊(duì)首
        return
    }

Peer/Server

相關(guān)文件

.
├── dial.go          //封裝一個(gè)任務(wù)生成處理結(jié)構(gòu)以及三種任務(wù)結(jié)構(gòu)中(此處命名不太精確)
├── message.go       //定義一些數(shù)據(jù)的讀寫接口,以及對(duì)外的Send/SendItem函數(shù)
├── peer.go          //封裝了Peer 包括消息讀取  
├── rlpx.go          //內(nèi)部的握手協(xié)議
├── server.go        //初始化,維護(hù)Peer網(wǎng)絡(luò),還有一些對(duì)外的接口

這一層會(huì)不斷的從路由中提取節(jié)點(diǎn),提取出來的節(jié)點(diǎn)要經(jīng)過身份驗(yàn)證,協(xié)議檢查之后加入到peer里面,緊接著如果沒有人使用這個(gè)peer,這個(gè)peer就會(huì)被刪除,再重新選擇一些節(jié)點(diǎn)出來繼續(xù)這個(gè)流程,peer再其中是隨生隨銷,這樣做是為了平均的使用所有的節(jié)點(diǎn),而不是僅僅依賴于特定的幾個(gè)節(jié)點(diǎn)。因而這里從Server開始入手分析整個(gè)流程

    Peers()                             //peer對(duì)象
    PeerCount()                         //peer數(shù)量
    AddPeer(node *discover.Node)        //添加節(jié)點(diǎn)
    RemovePeer(node *discover.Node)     //刪除節(jié)點(diǎn)
    SubscribeEvents(ch chan *PeerEvent) //訂閱內(nèi)部的事件(節(jié)點(diǎn)的增加,刪除)
    //以上四個(gè)屬于對(duì)外的接口,不影響內(nèi)部邏輯
    Start()                             //server開始工作
    SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node)  //啟動(dòng)一個(gè)連接,經(jīng)過兩次驗(yàn)證之后,如果通過則加入到peer之中。

Start初始化

Start做了三件事,生成路由表于建立底層網(wǎng)絡(luò)。生成DialState用于驅(qū)動(dòng)維護(hù)本地peer的更新與死亡,監(jiān)聽本地接口用于信息應(yīng)答。這里主要分析peer的維護(hù)過程。函數(shù)是run函數(shù)。

    func (srv *Server) Start() (err error) {
        
        //**************初始化代碼省略
        if !srv.NoDiscovery && srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }

        // node table
        if !srv.NoDiscovery {
            //路由表生成
            cfg := discover.Config{
                PrivateKey:   srv.PrivateKey,
                AnnounceAddr: realaddr,
                NodeDBPath:   srv.NodeDatabase,
                NetRestrict:  srv.NetRestrict,
                Bootnodes:    srv.BootstrapNodes,
                Unhandled:    unhandled,
            }
            ntab, err := discover.ListenUDP(conn, cfg)
            if err != nil {
                return err
            }
            srv.ntab = ntab
        }

        if srv.DiscoveryV5 {
            //路由表生成
            var (
                ntab *discv5.Network
                err  error
            )
            if sconn != nil {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            } else {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            }
            if err != nil {
                return err
            }
            if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
                return err
            }
            srv.DiscV5 = ntab
        }

        dynPeers := srv.maxDialedConns()
        //newDialState 對(duì)象生成,這個(gè)對(duì)象包含Peer的實(shí)際維護(hù)代碼
        dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

        // handshake  協(xié)議加載
        srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
        for _, p := range srv.Protocols {
            srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
        }
        // listen/dial
        //監(jiān)聽本地端口
        if srv.ListenAddr != "" {
            if err := srv.startListening(); err != nil {
                return err
            }
        }
        if srv.NoDial && srv.ListenAddr == "" {
            srv.log.Warn("P2P server will be useless, neither dialing nor listening")
        }

        srv.loopWG.Add(1)
        //重要的一句,開個(gè)協(xié)程,在其中做peer的維護(hù)
        go srv.run(dialer)
        srv.running = true
        return nil
    }

run 開始peer的生成

該函數(shù)中定義了兩個(gè)隊(duì)列

    runningTasks []task //正在執(zhí)行的任務(wù)
    queuedTasks  []task //尚未執(zhí)行的任務(wù)

定義了三個(gè)匿名函數(shù)

    //從正在執(zhí)行任務(wù)中刪除任務(wù)
	delTask := func(t task) {
		for i := range runningTasks {
			if runningTasks[i] == t {
				runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
				break
			}
		}
	}
	//開始一批任務(wù)
	startTasks := func(ts []task) (rest []task) {
		i := 0
		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
			t := ts[i]
			srv.log.Trace("New dial task", "task", t)
			go func() {
				 t.Do(srv); taskdone <- t  
			}()
			runningTasks = append(runningTasks, t)
		}
		return ts[i:]
	}
    //啟動(dòng)開始一批任務(wù)再調(diào)用dialstate的newTasks函數(shù)生成一批任務(wù),加載到任務(wù)隊(duì)列里面
	scheduleTasks := func() {
		// Start from queue first.
		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
		// Query dialer for new tasks and start as many as possible now.
		if len(runningTasks) < maxActiveDialTasks {
			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
			queuedTasks = append(queuedTasks, startTasks(nt)...)
		}
	}

定義了一個(gè)循環(huán),分不同的chanel執(zhí)行對(duì)應(yīng)的邏輯

	for {
        //調(diào)度開始找生成任務(wù)
		scheduleTasks()

		select {
		case <-srv.quit://退出
			break running
		case n := <-srv.addstatic: 
            //增加一個(gè)節(jié)點(diǎn)  該節(jié)點(diǎn)最終會(huì)生成一個(gè)dialTask 
            //并在newTasks的時(shí)候加入到讀列
			srv.log.Debug("Adding static node", "node", n)
			dialstate.addStatic(n)
		case n := <-srv.removestatic:
            //直接刪除該節(jié)點(diǎn) 節(jié)點(diǎn)不再參與維護(hù),很快就會(huì)死掉了
			dialstate.removeStatic(n)
			if p, ok := peers[n.ID]; ok {
				p.Disconnect(DiscRequested)
			}
		case op := <-srv.peerOp:
			//  Peers 和 PeerCount 兩個(gè)外部接口,只是讀取peer信息
			op(peers)
			srv.peerOpDone <- struct{}{}
		case t := <-taskdone:
		    //task完成后會(huì)根據(jù)不同的任務(wù)類型進(jìn)行相應(yīng)的處理
			srv.log.Trace("Dial task done", "task", t)
			dialstate.taskDone(t, time.Now())
			delTask(t)
		case c := <-srv.posthandshake:
			//身份驗(yàn)證通過 
			if trusted[c.id] {
				// Ensure that the trusted flag is set before checking against MaxPeers.
				c.flags |= trustedConn
			}
			select {
			case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
			case <-srv.quit:
				break running
			}
		case c := <-srv.addpeer:
			//身份協(xié)議驗(yàn)證通過 加入隊(duì)列
			err := srv.protoHandshakeChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := newPeer(c, srv.Protocols)
				// If message events are enabled, pass the peerFeed
				// to the peer
				if srv.EnableMsgEvents {
					p.events = &srv.peerFeed
				}
				name := truncateName(c.name)
				srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
				go srv.runPeer(p)   //觸發(fā)事件 此處是最上層截取peer的位置,如果此物沒有外部影響,那么這個(gè)peer很快就被銷毀了
				peerAdd++
				fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
				
				peers[c.id] = p
				if p.Inbound() {
					inboundCount++
				}
			}
			// The dialer logic relies on the assumption that
			// dial tasks complete after the peer has been added or
			// discarded. Unblock the task last.
			select {
			case c.cont <- err:
			case <-srv.quit:
				break running
			}
		case pd := <-srv.delpeer:
			//移除peer
			d := common.PrettyDuration(mclock.Now() - pd.created)
			pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
			delete(peers, pd.ID())
			peerDel++
			fmt.Printf("--count %d--- add %d-- del %d--\n",len(peers),peerAdd,peerDel)
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

記住上面的代碼,再來逐個(gè)的看:

scheduleTasks

scheduleTasks調(diào)度生成任務(wù),生成的任務(wù)中有一種dialTask的任務(wù),該任務(wù)結(jié)構(gòu)如下

    type dialTask struct {
        flags        connFlag
        dest         *discover.Node
        lastResolved time.Time
        resolveDelay time.Duration
    }

    func (t *dialTask) Do(srv *Server) {
        if t.dest.Incomplete() {
            if !t.resolve(srv) {
                return
            }
        }
        err := t.dial(srv, t.dest)  //此處會(huì)調(diào)用到setupConn函數(shù)
        if err != nil {
            log.Trace("Dial error", "task", t, "err", err)
            // Try resolving the ID of static nodes if dialing failed.
            if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
                if t.resolve(srv) {
                    t.dial(srv, t.dest)
                }
            }
        }
    }

dial最終回調(diào)用到setupConn函數(shù),函數(shù)只保留重點(diǎn)的幾句,篇幅有點(diǎn)長了

    func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {

        //身份驗(yàn)證碼 獲取設(shè)備,標(biāo)識(shí)等信息
        if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != 
        //此處會(huì)往chanel中添加連接對(duì)象,最終觸發(fā)循環(huán)中的posthandshake分支
        err = srv.checkpoint(c, srv.posthandshake)  
        //協(xié)議驗(yàn)證
        phs, err := c.doProtoHandshake(srv.ourHandshake)
        c.caps, c.name = phs.Caps, phs.Name
        //此處會(huì)往chanel中添加連接對(duì)象 最終觸發(fā)循環(huán)中的addpeer分支
        err = srv.checkpoint(c, srv.addpeer)
    }

posthandshake 分支僅僅做了驗(yàn)證,addpeer做的事情就比較多了,重要的就是執(zhí)行runPeer函數(shù)

    func (srv *Server) runPeer(p *Peer) {
        // 廣播 peer add
        srv.peerFeed.Send(&PeerEvent{
            Type: PeerEventTypeAdd,
            Peer: p.ID(),
        })

        // run the protocol
        remoteRequested, err := p.run() //

        // 廣播 peer drop
        srv.peerFeed.Send(&PeerEvent{
            Type:  PeerEventTypeDrop,
            Peer:  p.ID(),
            Error: err.Error(),
        })
        //移除peer
        srv.delpeer <- peerDrop{p, err, remoteRequested}
    }

    func (p *Peer) run() (remoteRequested bool, err error) {
        //*************
        writeStart <- struct{}{}
        p.startProtocols(writeStart, writeErr)
        //*************
        //這一句阻塞性確保了peer的存活
        p.wg.Wait()  
    }

    func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
        p.wg.Add(len(p.running))
        for _, proto := range p.running {
            proto := proto
            proto.closed = p.closed
            proto.wstart = writeStart
            proto.werr = writeErr
            var rw MsgReadWriter = proto
            if p.events != nil {
                rw = newMsgEventer(rw, p.events, p.ID(), proto.Name)
            }
            p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
            go func() {
                //其他的都是為這一句做準(zhǔn)備的,在以太坊中p2p就是靠這一句對(duì)上層暴露peer對(duì)象
                err := proto.Run(p, rw)
                if err == nil {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
                    err = errProtocolReturned
                } else if err != io.EOF {
                    p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
                }
                p.protoErr <- err
                p.wg.Done()  
            }()
        }
    }

這樣就可以可理出一條思路 scheduleTasks執(zhí)行生成dialTask任務(wù) dialTask任務(wù)執(zhí)行過程中逐個(gè)填充posthandshake,addPeer這兩個(gè)chanel。 addPeer執(zhí)行時(shí)對(duì)上層暴露了Peer對(duì)象,完成后填充了delpeer,最后刪除了Peer。

任務(wù)的生成

具體看代碼中的注釋

    func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
        if s.start.IsZero() {
            s.start = now
        }

        var newtasks []task
        //這里聲明了一個(gè)添加任務(wù)的函數(shù)  
        addDial := func(flag connFlag, n *discover.Node) bool {
            if err := s.checkDial(n, peers); err != nil {
                log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
                return false
            }
            s.dialing[n.ID] = flag  //排除掉已經(jīng)再測(cè)試的
            newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
            return true
        }

        // Compute number of dynamic dials necessary at this point.
        needDynDials := s.maxDynDials     //當(dāng)前系統(tǒng)中最大連接數(shù)目
        for _, p := range peers {        //扣除已建立鏈接的peer
            if p.rw.is(dynDialedConn) {
                needDynDials--
            }
        }
        for _, flag := range s.dialing {  //扣除已建立鏈接的peer
            if flag&dynDialedConn != 0 {
                needDynDials--
            }
        }

        //外部命令添加的節(jié)點(diǎn) 這種節(jié)點(diǎn)不占用needDynDials數(shù)目,
        //是為了保證手動(dòng)加的節(jié)點(diǎn)能夠起效
        for id, t := range s.static {
            err := s.checkDial(t.dest, peers)
            switch err {
            case errNotWhitelisted, errSelf:
                log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
                delete(s.static, t.dest.ID)
            case nil:
                s.dialing[id] = t.flags
                newtasks = append(newtasks, t)
            }
        }
        // If we don't have any peers whatsoever, try to dial a random bootnode. This
        // scenario is useful for the testnet (and private networks) where the discovery
        // table might be full of mostly bad peers, making it hard to find good ones.
        if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && 
        //檢查引導(dǎo)節(jié)點(diǎn)  因?yàn)橐龑?dǎo)節(jié)點(diǎn)比搜索到的節(jié)點(diǎn)更大概率靠譜 因而比較靠前
        now.Sub(s.start) > fallbackInterval {
            bootnode := s.bootnodes[0]
            s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
            s.bootnodes = append(s.bootnodes, bootnode)

            if addDial(dynDialedConn, bootnode) {
                needDynDials--
            }
        }
        //隨機(jī)的從路由中抽取最大節(jié)點(diǎn)的二分之一
        randomCandidates := needDynDials / 2
        if randomCandidates > 0 {
            n := s.ntab.ReadRandomNodes(s.randomNodes)
            for i := 0; i < randomCandidates && i < n; i++ {
                if addDial(dynDialedConn, s.randomNodes[i]) {
                    needDynDials--
                }
            }
        }
        // 從lookupbuf中抽取
        i := 0
        for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
            if addDial(dynDialedConn, s.lookupBuf[i]) {
                needDynDials--
            }
        }
        s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
        // 如果還是不夠,路由再去搜索節(jié)點(diǎn)
        if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
            s.lookupRunning = true
            newtasks = append(newtasks, &discoverTask{})
        }

        // wait
        if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
            t := &waitExpireTask{s.hist.min().exp.Sub(now)}
            newtasks = append(newtasks, t)
        }
        return newtasks
    }

消息發(fā)送

另一個(gè)是message中的Send,SendItem函數(shù) 實(shí)現(xiàn)了MsgWriter的對(duì)象都可以調(diào)用這個(gè)函數(shù)寫入,覺得這里沒什么必要,完全可以封裝到peer里面去,不過它上層做廣播的時(shí)候確實(shí)是調(diào)用的這兩個(gè)函數(shù)。

    func Send(w MsgWriter, msgcode uint64, data interface{}) error {
        size, r, err := rlp.EncodeToReader(data)
        if err != nil {
            return err
        }
        return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
    }

    func SendItems(w MsgWriter, msgcode uint64, elems ...interface{}) error {
        return Send(w, msgcode, elems)
    }

以太坊上層調(diào)用

Peer/PeerSet

文件:go-ethereum/eth/peer.go

定義了兩個(gè)struct,Peer和PeerSet。Peer封裝了底層的p2p.Peer,集成了一些和業(yè)務(wù)相關(guān)的方法,比如SendTransactions,SendNewBlock等。PeerSet是Peer的集合

    type peer struct {
        id string

        *p2p.Peer
        rw p2p.MsgReadWriter

        version  int         // Protocol version negotiated
        forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time

        head common.Hash
        td   *big.Int
        lock sync.RWMutex

        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
    }

    type peerSet struct {
        peers  map[string]*peer
        lock   sync.RWMutex
        closed bool
    }

Peer注冊(cè)/注銷

文件:go-ethereum/eth/handler.go manager.handle在檢查了peer后會(huì)把這個(gè)peer注冊(cè)到peerset中,表示此peer可用,發(fā)生錯(cuò)誤后peerset注銷該peer,返回錯(cuò)誤,最后再Server中銷毀。

	manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
	for i, version := range ProtocolVersions {
		// Skip protocol version if incompatible with the mode of operation
		if mode == downloader.FastSync && version < eth73 {
			continue
		}
		// Compatible; initialise the sub-protocol
		version := version // Closure for the run
		manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
			Name:    ProtocolName,
			Version: version,
			Length:  ProtocolLengths[i],
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				peer := manager.newPeer(int(version), p, rw)
				select {
				case manager.newPeerCh <- peer:
					manager.wg.Add(1)
					defer manager.wg.Done()
                    //此處如果順利會(huì)進(jìn)入for循環(huán) 如果失敗返回錯(cuò)誤我會(huì)銷毀掉這個(gè)peer
					return manager.handle(peer)  
				case <-manager.quitSync:
					return p2p.DiscQuitting
				}
			},
			NodeInfo: func() interface{} {
				return manager.NodeInfo()
			},
			PeerInfo: func(id discover.NodeID) interface{} {
				if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
					return p.Info()
				}
				return nil
			},
		})
	}

看完上述內(nèi)容,你們對(duì)以太坊 p2p Server 原理及實(shí)現(xiàn)是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI