溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang中tinyrpc框架怎么使用

發(fā)布時間:2023-01-16 09:42:45 來源:億速云 閱讀:103 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Golang中tinyrpc框架怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

    tinyrpc功能

    tinyrpc基于TCP協(xié)議,支持各種壓縮格式,基于protocol buffer的序列化協(xié)議。其rpc是基于golang原生的net/rpc開發(fā)而成。

    tinyrpc項目結(jié)構(gòu)

    tinyrpc基于net/rpc開發(fā)而成,在此基礎上集成了額外的能力。項目結(jié)構(gòu)如圖:

    Golang中tinyrpc框架怎么使用

    功能目錄如下:

    • codec 編碼模塊

    • compressor 壓縮模塊

    • header 請求/響應頭模塊

    • protoc-gen-tinyrpc 代碼生成插件

    • serializer 序列化模塊

    tinyrpc源碼解讀

    客戶端和服務端構(gòu)建

    客戶端是以net/rpcrpc.Client為基礎構(gòu)建,在此基礎上定義了Option以配置壓縮方式和序列化方式:

    type Option func(o *options)
    
    type options struct {
    	compressType compressor.CompressType
    	serializer   serializer.Serializer
    }

    在創(chuàng)建客戶端的時候?qū)⑴渲煤玫膲嚎s算法和序列化方式作為創(chuàng)建客戶端的參數(shù):

    func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
    	options := options{
    		compressType: compressor.Raw,
    		serializer:   serializer.Proto,
    	}
    	for _, option := range opts {
    		option(&options)
    	}
    	return &Client{rpc.NewClientWithCodec(
    		codec.NewClientCodec(conn, options.compressType, options.serializer))}
    }

    服務端是以net/rpcrpc.Server為基礎構(gòu)建,在此基礎上擴展了Server的定義:

    type Server struct {
    	*rpc.Server
    	serializer.Serializer
    }

    在創(chuàng)建客戶端和開啟服務時傳入序列化方式:

    func NewServer(opts ...Option) *Server {
    	options := options{
    		serializer: serializer.Proto,
    	}
    	for _, option := range opts {
    		option(&options)
    	}
    
    	return &Server{&rpc.Server{}, options.serializer}
    }
    
    func (s *Server) Serve(lis net.Listener) {
    	log.Printf("tinyrpc started on: %s", lis.Addr().String())
    	for {
    		conn, err := lis.Accept()
    		if err != nil {
    			continue
    		}
    		go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
    	}
    }

    壓縮算法compressor

    壓縮算法的實現(xiàn)中首先是定義了壓縮的接口:

    type Compressor interface {
    	Zip([]byte) ([]byte, error)
    	Unzip([]byte) ([]byte, error)
    }

    壓縮的接口包含壓縮和解壓方法。

    壓縮算法使用的是uint類型,使用iota來初始化,并且使用map來進行所有壓縮算法實現(xiàn)的管理:

    type CompressType uint16
    
    const (
    	Raw CompressType = iota
    	Gzip
    	Snappy
    	Zlib
    )
    
    // Compressors which supported by rpc
    var Compressors = map[CompressType]Compressor{
    	Raw:    RawCompressor{},
    	Gzip:   GzipCompressor{},
    	Snappy: SnappyCompressor{},
    	Zlib:   ZlibCompressor{},
    }

    序列化 serializer

    序列化部分代碼非常簡單,提供了一個接口:

    type Serializer interface {
    	Marshal(message interface{}) ([]byte, error)
    	Unmarshal(data []byte, message interface{}) error
    }

    目前只有ProtoSerializer一個實現(xiàn),ProtoSerializer內(nèi)部的實現(xiàn)是基于"google.golang.org/protobuf/proto"來實現(xiàn)的,并沒有什么特殊的處理,因此就不花費筆墨詳述了。

    請求/響應頭 header

    tinyrpc定義了自己的請求頭和響應頭:

    // RequestHeader request header structure looks like:
    // +--------------+----------------+----------+------------+----------+
    // | CompressType |      Method    |    ID    | RequestLen | Checksum |
    // +--------------+----------------+----------+------------+----------+
    // |    uint16    | uvarint+string |  uvarint |   uvarint  |  uint32  |
    // +--------------+----------------+----------+------------+----------+
    type RequestHeader struct {
    	sync.RWMutex
    	CompressType compressor.CompressType
    	Method       string
    	ID           uint64
    	RequestLen   uint32
    	Checksum     uint32
    }

    請求頭由壓縮類型,方法,id,請求長度和校驗碼組成。

    // ResponseHeader request header structure looks like:
    // +--------------+---------+----------------+-------------+----------+
    // | CompressType |    ID   |      Error     | ResponseLen | Checksum |
    // +--------------+---------+----------------+-------------+----------+
    // |    uint16    | uvarint | uvarint+string |    uvarint  |  uint32  |
    // +--------------+---------+----------------+-------------+----------+
    type ResponseHeader struct {
    	sync.RWMutex
    	CompressType compressor.CompressType
    	ID           uint64
    	Error        string
    	ResponseLen  uint32
    	Checksum     uint32
    }

    響應頭由壓縮類型,id,錯誤信息,返回長度和校驗碼組成。

    為了實現(xiàn)頭的重用,tinyrpc為頭構(gòu)建了緩存池:

    var (
    	RequestPool  sync.Pool
    	ResponsePool sync.Pool
    )
    
    func init() {
    	RequestPool = sync.Pool{New: func() interface{} {
    		return &RequestHeader{}
    	}}
    	ResponsePool = sync.Pool{New: func() interface{} {
    		return &ResponseHeader{}
    	}}
    }

    在使用時get出來,生命周期結(jié)束后放回池子,并且在put之前需要進行重置:

        h := header.RequestPool.Get().(*header.RequestHeader)
    	defer func() {
    		h.ResetHeader()
    		header.RequestPool.Put(h)
    	}()
    // ResetHeader reset request header
    func (r *RequestHeader) ResetHeader() {
    	r.Lock()
    	defer r.Unlock()
    	r.ID = 0
    	r.Checksum = 0
    	r.Method = ""
    	r.CompressType = 0
    	r.RequestLen = 0
    }
    
    // ResetHeader reset response header
    func (r *ResponseHeader) ResetHeader() {
    	r.Lock()
    	defer r.Unlock()
    	r.Error = ""
    	r.ID = 0
    	r.CompressType = 0
    	r.Checksum = 0
    	r.ResponseLen = 0
    }

    搞清楚了頭的結(jié)構(gòu)以及對象池的復用邏輯,那么具體的頭的編碼與解碼就是很簡單的拆裝工作,就不在此一行一行解析了,大家有興趣可以自行去閱讀。

    編碼 codec

    由于tinyrpc是基于net/rpc開發(fā),那么其codec模塊自然也是依賴于net/rpcClientCodecServerCodec接口來實現(xiàn)的。

    客戶端實現(xiàn)

    客戶端是基于ClientCodec實現(xiàn)的能力:

    type ClientCodec interface {
    	WriteRequest(*Request, any) error
    	ReadResponseHeader(*Response) error
    	ReadResponseBody(any) error
    
    	Close() error
    }

    client定義了一個clientCodec類型,并且實現(xiàn)了ClientCodec的接口方法:

    type clientCodec struct {
    	r io.Reader
    	w io.Writer
    	c io.Closer
    
    	compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
    	serializer serializer.Serializer
    	response   header.ResponseHeader // rpc response header
    	mutex      sync.Mutex            // protect pending map
    	pending    map[uint64]string
    }

    WriteRequest實現(xiàn):

    // WriteRequest Write the rpc request header and body to the io stream
    func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
    	c.mutex.Lock()
    	c.pending[r.Seq] = r.ServiceMethod
    	c.mutex.Unlock()
    
    	if _, ok := compressor.Compressors[c.compressor]; !ok {
    		return NotFoundCompressorError
    	}
    	reqBody, err := c.serializer.Marshal(param)
    	if err != nil {
    		return err
    	}
    	compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
    	if err != nil {
    		return err
    	}
    	h := header.RequestPool.Get().(*header.RequestHeader)
    	defer func() {
    		h.ResetHeader()
    		header.RequestPool.Put(h)
    	}()
    	h.ID = r.Seq
    	h.Method = r.ServiceMethod
    	h.RequestLen = uint32(len(compressedReqBody))
    	h.CompressType = compressor.CompressType(c.compressor)
    	h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
    
    	if err := sendFrame(c.w, h.Marshal()); err != nil {
    		return err
    	}
    	if err := write(c.w, compressedReqBody); err != nil {
    		return err
    	}
    
    	c.w.(*bufio.Writer).Flush()
    	return nil
    }

    可以看到代碼的實現(xiàn)還是比較清晰的,主要分為幾個步驟:

    • 將數(shù)據(jù)進行序列化構(gòu)成請求體

    • 選擇相應的壓縮算法進行壓縮

    • 從Pool中獲取請求頭實例將數(shù)據(jù)全部填入其中構(gòu)成最后的請求頭

    • 分別通過io操作發(fā)送處理過的請求頭和請求體

    ReadResponseHeader實現(xiàn):

    // ReadResponseHeader read the rpc response header from the io stream
    func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
    	c.response.ResetHeader()
    	data, err := recvFrame(c.r)
    	if err != nil {
    		return err
    	}
    	err = c.response.Unmarshal(data)
    	if err != nil {
    		return err
    	}
    	c.mutex.Lock()
    	r.Seq = c.response.ID
    	r.Error = c.response.Error
    	r.ServiceMethod = c.pending[r.Seq]
    	delete(c.pending, r.Seq)
    	c.mutex.Unlock()
    	return nil
    }

    此方法作用是讀取返回的響應頭,并解析成具體的結(jié)構(gòu)體

    ReadResponseBody實現(xiàn):

    func (c *clientCodec) ReadResponseBody(param interface{}) error {
    	if param == nil {
    		if c.response.ResponseLen != 0 {
    			if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
    				return err
    			}
    		}
    		return nil
    	}
    
    	respBody := make([]byte, c.response.ResponseLen)
    	err := read(c.r, respBody)
    	if err != nil {
    		return err
    	}
    
    	if c.response.Checksum != 0 {
    		if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
    			return UnexpectedChecksumError
    		}
    	}
    
    	if c.response.GetCompressType() != c.compressor {
    		return CompressorTypeMismatchError
    	}
    
    	resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
    	if err != nil {
    		return err
    	}
    
    	return c.serializer.Unmarshal(resp, param)
    }

    此方法是用于讀取返回的響應結(jié)構(gòu)體,流程如下:

    • 讀取流獲取響應體

    • 根據(jù)響應頭中的校驗碼來比對響應體是否完整

    • 根據(jù)壓縮算法來解壓具體的結(jié)構(gòu)體

    • 進行反序列化

    服務端實現(xiàn)

    服務端是基于ServerCodec實現(xiàn)的能力:

    type ServerCodec interface {
    	ReadRequestHeader(*Request) error
    	ReadRequestBody(any) error
    	WriteResponse(*Response, any) error
    
    	// Close can be called multiple times and must be idempotent.
    	Close() error
    }

    和客戶端類似,server定義了一個serverCodec類型,并且實現(xiàn)了ServerCodec的接口方法:

    type serverCodec struct {
    	r io.Reader
    	w io.Writer
    	c io.Closer
    
    	request    header.RequestHeader
    	serializer serializer.Serializer
    	mutex      sync.Mutex // protects seq, pending
    	seq        uint64
    	pending    map[uint64]*reqCtx
    }

    ReadRequestHeader實現(xiàn):

    // ReadRequestHeader read the rpc request header from the io stream
    func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
    	s.request.ResetHeader()
    	data, err := recvFrame(s.r)
    	if err != nil {
    		return err
    	}
    	err = s.request.Unmarshal(data)
    	if err != nil {
    		return err
    	}
    	s.mutex.Lock()
    	s.seq++
    	s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
    	r.ServiceMethod = s.request.Method
    	r.Seq = s.seq
    	s.mutex.Unlock()
    	return nil
    }

    此方法用于讀取請求頭并解析成結(jié)構(gòu)體

    ReadRequestBody實現(xiàn):

    // ReadRequestBody read the rpc request body from the io stream
    func (s *serverCodec) ReadRequestBody(param interface{}) error {
    	if param == nil {
    		if s.request.RequestLen != 0 {
    			if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
    				return err
    			}
    		}
    		return nil
    	}
    
    	reqBody := make([]byte, s.request.RequestLen)
    
    	err := read(s.r, reqBody)
    	if err != nil {
    		return err
    	}
    
    	if s.request.Checksum != 0 {
    		if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
    			return UnexpectedChecksumError
    		}
    	}
    
    	if _, ok := compressor.
    		Compressors[s.request.GetCompressType()]; !ok {
    		return NotFoundCompressorError
    	}
    
    	req, err := compressor.
    		Compressors[s.request.GetCompressType()].Unzip(reqBody)
    	if err != nil {
    		return err
    	}
    
    	return s.serializer.Unmarshal(req, param)
    }

    此方法用于讀取請求體,流程和讀取響應體差不多,大致如下:

    • 讀取流并解析成請求體

    • 根據(jù)請求頭中的校驗碼進行校驗

    • 根據(jù)壓縮算法進行解壓

    • 反序列化

    WriteResponse實現(xiàn):

    // WriteResponse Write the rpc response header and body to the io stream
    func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
    	s.mutex.Lock()
    	reqCtx, ok := s.pending[r.Seq]
    	if !ok {
    		s.mutex.Unlock()
    		return InvalidSequenceError
    	}
    	delete(s.pending, r.Seq)
    	s.mutex.Unlock()
    
    	if r.Error != "" {
    		param = nil
    	}
    	if _, ok := compressor.
    		Compressors[reqCtx.compareType]; !ok {
    		return NotFoundCompressorError
    	}
    
    	var respBody []byte
    	var err error
    	if param != nil {
    		respBody, err = s.serializer.Marshal(param)
    		if err != nil {
    			return err
    		}
    	}
    
    	compressedRespBody, err := compressor.
    		Compressors[reqCtx.compareType].Zip(respBody)
    	if err != nil {
    		return err
    	}
    	h := header.ResponsePool.Get().(*header.ResponseHeader)
    	defer func() {
    		h.ResetHeader()
    		header.ResponsePool.Put(h)
    	}()
    	h.ID = reqCtx.requestID
    	h.Error = r.Error
    	h.ResponseLen = uint32(len(compressedRespBody))
    	h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
    	h.CompressType = reqCtx.compareType
    
    	if err = sendFrame(s.w, h.Marshal()); err != nil {
    		return err
    	}
    
    	if err = write(s.w, compressedRespBody); err != nil {
    		return err
    	}
    	s.w.(*bufio.Writer).Flush()
    	return nil
    }

    此方法用于寫入響應體,大致與寫入請求體差不多,流程如下:

    • 將響應體序列化

    • 使用壓縮算法將響應體進行壓縮

    • 使用Pool管理響應頭

    • 分別發(fā)送返回頭和返回體

    “Golang中tinyrpc框架怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI