溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

基于Golang如何實(shí)現(xiàn)Redis協(xié)議解析器

發(fā)布時(shí)間:2023-03-24 14:31:44 來源:億速云 閱讀:118 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了基于Golang如何實(shí)現(xiàn)Redis協(xié)議解析器的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇基于Golang如何實(shí)現(xiàn)Redis協(xié)議解析器文章都會(huì)有所收獲,下面我們一起來看看吧。

RESP協(xié)議

RESP是客戶端與服務(wù)端通信的協(xié)議,格式有五種:

正?;貜?fù):以“+”開頭,以“\r\n”結(jié)尾的字符串形式

錯(cuò)誤回復(fù):以“-”開頭,以“\r\n”結(jié)尾的字符串形式

整數(shù):以“:”開頭,以“\r\n”結(jié)尾的字符串形式

多行字符串:以“$”開頭,后跟實(shí)際發(fā)送字節(jié)數(shù),再以“\r\n”開頭和結(jié)尾

$3\r\nabc\r\n

數(shù)組:以“*”開頭,后跟成員個(gè)數(shù)

SET key value
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n (CRLF)作為換行符。

當(dāng)我們輸入*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n這樣一串命令,服務(wù)端接收到的是如下的命令:
*3\r\n
$3\r\n
SET\r\n
$3\r\n
key\r\n
$5\r\n
value\r\n

interface/resp/conn.go

type Connection interface {
   Write([]byte) error
   GetDBIndex() int
   SelectDB(int)
}

interface/resp/reply.go
type Reply interface {
	ToBytes() []byte
}
  • Connection接口:Redis客戶端的一個(gè)連接

  • Write:給客戶端回復(fù)消息

  • GetDBIndex:Redis有16個(gè)DB

  • Reply接口:響應(yīng)接口

resp/reply/consts.go

type PongReply struct{}

var pongBytes = []byte("+PONG\r\n")

func (r *PongReply) ToBytes() []byte {
    return pongBytes
}

var thePongReply = new(PongReply)

func MakePongReply() *PongReply {
    return thePongReply
}

type OkReply struct{}

var okBytes = []byte("+OK\r\n")

func (r *OkReply) ToBytes() []byte {
    return okBytes
}

var theOkReply = new(OkReply)

func MakeOkReply() *OkReply {
    return theOkReply
}

var nullBulkBytes = []byte("$-1\r\n")

type NullBulkReply struct{}

func (r *NullBulkReply) ToBytes() []byte {
    return nullBulkBytes
}

func MakeNullBulkReply() *NullBulkReply {
    return &NullBulkReply{}
}

var emptyMultiBulkBytes = []byte("*0\r\n")

type EmptyMultiBulkReply struct{}

func (r *EmptyMultiBulkReply) ToBytes() []byte {
    return emptyMultiBulkBytes
}

type NoReply struct{}

var noBytes = []byte("")

func (r *NoReply) ToBytes() []byte {
    return noBytes
}

定義五種回復(fù):回復(fù)pong,ok,null,空數(shù)組,空

resp/reply/reply.go

type ErrorReply interface {
   Error() string
   ToBytes() []byte
}

ErrorReply:定義錯(cuò)誤接口

resp/reply/errors.go

type UnknownErrReply struct{}

var unknownErrBytes = []byte("-Err unknown\r\n")

func (r *UnknownErrReply) ToBytes() []byte {
   return unknownErrBytes
}

func (r *UnknownErrReply) Error() string {
   return "Err unknown"
}

type ArgNumErrReply struct {
   Cmd string
}

func (r *ArgNumErrReply) ToBytes() []byte {
   return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}

func (r *ArgNumErrReply) Error() string {
   return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}

func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
   return &ArgNumErrReply{
      Cmd: cmd,
   }
}

type SyntaxErrReply struct{}

var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}

func MakeSyntaxErrReply() *SyntaxErrReply {
   return theSyntaxErrReply
}

func (r *SyntaxErrReply) ToBytes() []byte {
   return syntaxErrBytes
}

func (r *SyntaxErrReply) Error() string {
   return "Err syntax error"
}

type WrongTypeErrReply struct{}

var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")

func (r *WrongTypeErrReply) ToBytes() []byte {
   return wrongTypeErrBytes
}

func (r *WrongTypeErrReply) Error() string {
   return "WRONGTYPE Operation against a key holding the wrong kind of value"
}

type ProtocolErrReply struct {
   Msg string
}

func (r *ProtocolErrReply) ToBytes() []byte {
   return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}

func (r *ProtocolErrReply) Error() string {
   return "ERR Protocol error: '" + r.Msg
}

errors定義5種錯(cuò)誤:UnknownErrReply 未知錯(cuò)誤,ArgNumErrReply 參數(shù)個(gè)數(shù)錯(cuò)誤,SyntaxErrReply 語法錯(cuò)誤,WrongTypeErrReply 數(shù)據(jù)類型錯(cuò)誤,ProtocolErrReply 協(xié)議錯(cuò)誤

resp/reply/reply.go

var (
   nullBulkReplyBytes = []byte("$-1")
   // 協(xié)議的結(jié)尾
   CRLF = "\r\n"
)

type BulkReply struct {
   Arg []byte
}

func MakeBulkReply(arg []byte) *BulkReply {
   return &BulkReply{
      Arg: arg,
   }
}

func (r *BulkReply) ToBytes() []byte {
   if len(r.Arg) == 0 {
      return nullBulkReplyBytes
   }
   return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}

type MultiBulkReply struct {
   Args [][]byte
}

func (r *MultiBulkReply) ToBytes() []byte {
   argLen := len(r.Args)
   var buf bytes.Buffer
   buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
   for _, arg := range r.Args {
      if arg == nil {
         buf.WriteString("$-1" + CRLF)
      } else {
         buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
      }
   }
   return buf.Bytes()
}

func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
   return &MultiBulkReply{
      Args: args,
   }
}

type StatusReply struct {
   Status string
}

func MakeStatusReply(status string) *StatusReply {
   return &StatusReply{
      Status: status,
   }
}

func (r *StatusReply) ToBytes() []byte {
   return []byte("+" + r.Status + CRLF)
}

type IntReply struct {
   Code int64
}

func MakeIntReply(code int64) *IntReply {
   return &IntReply{
      Code: code,
   }
}

func (r *IntReply) ToBytes() []byte {
   return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}

type StandardErrReply struct {
   Status string
}

func (r *StandardErrReply) ToBytes() []byte {
   return []byte("-" + r.Status + CRLF)
}

func (r *StandardErrReply) Error() string {
   return r.Status
}

func MakeErrReply(status string) *StandardErrReply {
   return &StandardErrReply{
      Status: status,
   }
}

func IsErrorReply(reply resp.Reply) bool {
   return reply.ToBytes()[0] == '-'
}
  • BulkReply:回復(fù)一個(gè)字符串

  • MultiBulkReply:回復(fù)字符串?dāng)?shù)組

  • StatusReply:狀態(tài)回復(fù)

  • IntReply:數(shù)字回復(fù)

  • StandardErrReply:標(biāo)準(zhǔn)錯(cuò)誤回復(fù)

  • IsErrorReply:判斷是否為錯(cuò)誤回復(fù)

  • ToBytes:將字符串轉(zhuǎn)成RESP協(xié)議規(guī)定的格式

resp/parser/parser.go

type Payload struct {
   Data resp.Reply
   Err  error
}

type readState struct {
   readingMultiLine  bool     
   expectedArgsCount int     
   msgType           byte    
   args              [][]byte 
   bulkLen           int64    
}

func (s *readState) finished() bool {
   return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}

func ParseStream(reader io.Reader) <-chan *Payload {
   ch := make(chan *Payload)
   go parse0(reader, ch)
   return ch
}

func parse0(reader io.Reader, ch chan<- *Payload) {
	 ......
}

Payload結(jié)構(gòu)體:客服端給我們發(fā)的數(shù)據(jù)

Reply:客戶端與服務(wù)端互相發(fā)的數(shù)據(jù)都稱為Reply

readState結(jié)構(gòu)體:

  • readingMultiLine:解析單行還是多行數(shù)據(jù)

  • expectedArgsCount:應(yīng)該讀取的參數(shù)個(gè)數(shù)

  • msgType:消息類型

  • args:消息內(nèi)容

  • bulkLen:數(shù)據(jù)長(zhǎng)度

finished方法:判斷解析是否完成

ParseStream方法:異步解析數(shù)據(jù)后放入管道,返回管道數(shù)據(jù)

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
   var msg []byte
   var err error
   if state.bulkLen == 0 {
      msg, err = bufReader.ReadBytes('\n')
      if err != nil {
         return nil, true, err
      }
      if len(msg) == 0 || msg[len(msg)-2] != '\r' {
         return nil, false, errors.New("protocol error: " + string(msg))
      }
   } else {
      msg = make([]byte, state.bulkLen+2)
      _, err = io.ReadFull(bufReader, msg)
      if err != nil {
         return nil, true, err
      }
      if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
         return nil, false, errors.New("protocol error: " + string(msg))
      }
      state.bulkLen = 0
   }
   return msg, false, nil
}

readLine:一行一行的讀取。讀正常的行,以\n分隔。讀正文中包含\r\n字符的行時(shí),state.bulkLen加上換行符\r\n(state.bulkLen+2)

func parseMultiBulkHeader(msg []byte, state *readState) error {
   var err error
   var expectedLine uint64
   expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   }
   if expectedLine == 0 {
      state.expectedArgsCount = 0
      return nil
   } else if expectedLine > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = int(expectedLine)
      state.args = make([][]byte, 0, expectedLine)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))
   }
}

func parseBulkHeader(msg []byte, state *readState) error {
   var err error
   state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
   if err != nil {
      return errors.New("protocol error: " + string(msg))
   }
   if state.bulkLen == -1 { // null bulk
      return nil
   } else if state.bulkLen > 0 {
      state.msgType = msg[0]
      state.readingMultiLine = true
      state.expectedArgsCount = 1
      state.args = make([][]byte, 0, 1)
      return nil
   } else {
      return errors.New("protocol error: " + string(msg))
   }
}

parseMultiBulkHeader:解析數(shù)組的頭部,設(shè)置期望的行數(shù)和相關(guān)參數(shù)。

parseBulkHeader:解析多行字符串的頭部。

func parseSingleLineReply(msg []byte) (resp.Reply, error) {
   str := strings.TrimSuffix(string(msg), "\r\n")
   var result resp.Reply
   switch msg[0] {
   case '+': // status reply
      result = reply.MakeStatusReply(str[1:])
   case '-': // err reply
      result = reply.MakeErrReply(str[1:])
   case ':': // int reply
      val, err := strconv.ParseInt(str[1:], 10, 64)
      if err != nil {
         return nil, errors.New("protocol error: " + string(msg))
      }
      result = reply.MakeIntReply(val)
   }
   return result, nil
}

func readBody(msg []byte, state *readState) error {
   line := msg[0 : len(msg)-2]
   var err error
   if line[0] == '$' {
      // bulk reply
      state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
      if err != nil {
         return errors.New("protocol error: " + string(msg))
      }
      if state.bulkLen <= 0 { // null bulk in multi bulks
         state.args = append(state.args, []byte{})
         state.bulkLen = 0
      }
   } else {
      state.args = append(state.args, line)
   }
   return nil
}

parseSingleLineReply:解析單行命令

readBody:讀取多行的命令,如果是$開頭,設(shè)置bulkLen,讀取下一行時(shí)根據(jù)這個(gè)+2,不是$開頭則直接添加到args

func parse0(reader io.Reader, ch chan<- *Payload) {
    defer func() {
       if err := recover(); err != nil {
          logger.Error(string(debug.Stack()))
      }
   }()
    bufReader := bufio.NewReader(reader)
    var state readState
    var err error
    var msg []byte
    for {
       var ioErr bool
       msg, ioErr, err = readLine(bufReader, &state)
       if err != nil {
          if ioErr {
             ch <- &Payload{
                Err: err,
            }
             close(ch)
             return
         }
          ch <- &Payload{
             Err: err,
         }
          state = readState{}
          continue
      }

       if !state.readingMultiLine {
          if msg[0] == '*' {
             // multi bulk reply
             err = parseMultiBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
               }
                state = readState{}
                continue
            }
             if state.expectedArgsCount == 0 {
                ch <- &Payload{
                   Data: &reply.EmptyMultiBulkReply{},
               }
                state = readState{}
                continue
            }
         } else if msg[0] == '$' { // bulk reply
             err = parseBulkHeader(msg, &state)
             if err != nil {
                ch <- &Payload{
                   Err: errors.New("protocol error: " + string(msg)),
               }
                state = readState{} // reset state
                continue
            }
             if state.bulkLen == -1 { // null bulk reply
                ch <- &Payload{
                   Data: &reply.NullBulkReply{},
               }
                state = readState{} // reset state
                continue
            }
         } else {
             // single line reply
             result, err := parseSingleLineReply(msg)
             ch <- &Payload{
                Data: result,
                Err:  err,
            }
             state = readState{} // reset state
             continue
         }
      } else {
          // read bulk reply
          err = readBody(msg, &state)
          if err != nil {
             ch <- &Payload{
                Err: errors.New("protocol error: " + string(msg)),
            }
             state = readState{} // reset state
             continue
         }
          // if sending finished
          if state.finished() {
             var result resp.Reply
             if state.msgType == '*' {
                result = reply.MakeMultiBulkReply(state.args)
            } else if state.msgType == '$' {
                result = reply.MakeBulkReply(state.args[0])
            }
             ch <- &Payload{
                Data: result,
                Err:  err,
            }
             state = readState{}
         }
      }
   }
}

parse0:解析指令,解析完成后通過channel發(fā)出去

resp/connection/conn.go

type Connection struct {
    conn net.Conn
    waitingReply wait.Wait
    mu sync.Mutex // 避免多個(gè)協(xié)程往客戶端中寫
    selectedDB int
}

func NewConn(conn net.Conn) *Connection {
    return &Connection{
        conn: conn,
    }
}

func (c *Connection) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

func (c *Connection) Close() error {
    c.waitingReply.WaitWithTimeout(10 * time.Second)
    _ = c.conn.Close()
    return nil
}

func (c *Connection) Write(b []byte) error {
    if len(b) == 0 {
        return nil
    }
    c.mu.Lock()
    c.waitingReply.Add(1)
    defer func() {
        c.waitingReply.Done()
        c.mu.Unlock()
    }()

    _, err := c.conn.Write(b)
    return err
}

func (c *Connection) GetDBIndex() int {
    return c.selectedDB
}

func (c *Connection) SelectDB(dbNum int) {
    c.selectedDB = dbNum
}

之前寫的EchoHandler是用戶傳過來什么,我們傳回去什么?,F(xiàn)在要寫一個(gè)RespHandler來代替EchoHandler,讓解析器來解析。RespHandler中要有一個(gè)管理客戶端連接的結(jié)構(gòu)體Connection。

Connection:客戶端連接,在協(xié)議層的handler中會(huì)用到

resp/handler/handler.go

var (
   unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

type RespHandler struct {
   activeConn sync.Map
   db         databaseface.Database
   closing    atomic.Boolean
}

func MakeHandler() *RespHandler {
   var db databaseface.Database
   db = database.NewEchoDatabase()
   return &RespHandler{
      db: db,
   }
}

func (h *RespHandler) closeClient(client *connection.Connection) {
   _ = client.Close()
   h.db.AfterClientClose(client)
   h.activeConn.Delete(client)
}

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
   if h.closing.Get() {
      // closing handler refuse new connection
      _ = conn.Close()
   }

   client := connection.NewConn(conn)
   h.activeConn.Store(client, 1)

   ch := parser.ParseStream(conn)
   for payload := range ch {
      if payload.Err != nil {
         if payload.Err == io.EOF ||
            payload.Err == io.ErrUnexpectedEOF ||
            strings.Contains(payload.Err.Error(), "use of closed network connection") {
            // connection closed
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         // protocol err
         errReply := reply.MakeErrReply(payload.Err.Error())
         err := client.Write(errReply.ToBytes())
         if err != nil {
            h.closeClient(client)
            logger.Info("connection closed: " + client.RemoteAddr().String())
            return
         }
         continue
      }
      if payload.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := payload.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      result := h.db.Exec(client, r.Args)
      if result != nil {
         _ = client.Write(result.ToBytes())
      } else {
         _ = client.Write(unknownErrReplyBytes)
      }
   }
}

func (h *RespHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   // TODO: concurrent wait
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*connection.Connection)
      _ = client.Close()
      return true
   })
   h.db.Close()
   return nil
}

RespHandler:和之前的echo類似,加了核心層的db.exec執(zhí)行解析的指令

interface/database/database.go

type CmdLine = [][]byte

type Database interface {
	Exec(client resp.Connection, args [][]byte) resp.Reply
	AfterClientClose(c resp.Connection)
	Close()
}

type DataEntity struct {
	Data interface{}
}

Exec:核心層的執(zhí)行

AfterClientClose:關(guān)閉之后的善后方法

CmdLine:二維字節(jié)數(shù)組的指令別名

DataEntity:表示Redis的數(shù)據(jù),包括string, list, set等等

database/echo_database.go

type EchoDatabase struct {
}

func NewEchoDatabase() *EchoDatabase {
   return &EchoDatabase{}
}

func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
   return reply.MakeMultiBulkReply(args)
}

func (e EchoDatabase) AfterClientClose(c resp.Connection) {
   logger.Info("EchoDatabase AfterClientClose")
}

func (e EchoDatabase) Close() {
   logger.Info("EchoDatabase Close")
}

echo_database:測(cè)試協(xié)議層

Exec:指令解析后,再使用MakeMultiBulkReply包裝一下返回去

main.go

err := tcp.ListenAndServeWithSignal(
   &tcp.Config{
      Address: fmt.Sprintf("%s:%d",
         config.Properties.Bind,
         config.Properties.Port),
   },
   handler.MakeHandler())
if err != nil {
   logger.Error(err)
}

main改成剛才寫的:handler.MakeHandler()

關(guān)于“基于Golang如何實(shí)現(xiàn)Redis協(xié)議解析器”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“基于Golang如何實(shí)現(xiàn)Redis協(xié)議解析器”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI