溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

GO怎么實(shí)現(xiàn)Redis的AOF持久化

發(fā)布時(shí)間:2023-03-27 10:58:06 來(lái)源:億速云 閱讀:95 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要介紹“GO怎么實(shí)現(xiàn)Redis的AOF持久化”的相關(guān)知識(shí),小編通過(guò)實(shí)際案例向大家展示操作過(guò)程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“GO怎么實(shí)現(xiàn)Redis的AOF持久化”文章能幫助大家解決問(wèn)題。

GO實(shí)現(xiàn)Redis的AOF持久化

將用戶發(fā)來(lái)的指令以RESP協(xié)議的形式存儲(chǔ)在本地的AOF文件,重啟Redis后執(zhí)行此文件恢復(fù)數(shù)據(jù)

本文涉及以下文件:redis.conf:配置文件

aof:實(shí)現(xiàn)aof

redis.conf

appendonly yes
appendfilename appendonly.aof

aof/aof.go

type CmdLine = [][]byte

const (
   aofQueueSize = 1 << 16
)

type payload struct {
   cmdLine CmdLine
   dbIndex int
}

type AofHandler struct {
   db          databaseface.Database
   aofChan     chan *payload
   aofFile     *os.File
   aofFilename string
   currentDB   int
}

func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
   handler := &AofHandler{}
   handler.aofFilename = config.Properties.AppendFilename
   handler.db = db
   handler.LoadAof()
   aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
   if err != nil {
      return nil, err
   }
   handler.aofFile = aofFile
   handler.aofChan = make(chan *payload, aofQueueSize)
   go func() {
      handler.handleAof()
   }()
   return handler, nil
}

func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
   if config.Properties.AppendOnly && handler.aofChan != nil {
      handler.aofChan <- &payload{
         cmdLine: cmdLine,
         dbIndex: dbIndex,
      }
   }
}

func (handler *AofHandler) handleAof() {
   handler.currentDB = 0
   for p := range handler.aofChan {
      if p.dbIndex != handler.currentDB {
         // select db
         data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
         _, err := handler.aofFile.Write(data)
         if err != nil {
            logger.Warn(err)
            continue
         }
         handler.currentDB = p.dbIndex
      }
      data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
      _, err := handler.aofFile.Write(data)
      if err != nil {
         logger.Warn(err)
      }
   }
}

func (handler *AofHandler) LoadAof() {
   file, err := os.Open(handler.aofFilename)
   if err != nil {
      logger.Warn(err)
      return
   }
   defer file.Close()
   ch := parser.ParseStream(file)
   fakeConn := &connection.Connection{}
   for p := range ch {
      if p.Err != nil {
         if p.Err == io.EOF {
            break
         }
         logger.Error("parse error: " + p.Err.Error())
         continue
      }
      if p.Data == nil {
         logger.Error("empty payload")
         continue
      }
      r, ok := p.Data.(*reply.MultiBulkReply)
      if !ok {
         logger.Error("require multi bulk reply")
         continue
      }
      ret := handler.db.Exec(fakeConn, r.Args)
      if reply.IsErrorReply(ret) {
         logger.Error("exec err", err)
      }
   }
}
  • AofHandler:1.從管道中接收數(shù)據(jù) 2.寫(xiě)入AOF文件

  • AddAof:用戶的指令包裝成payload放入管道

  • handleAof:將管道中的payload寫(xiě)入磁盤(pán)

  • LoadAof:重啟Redis后加載aof文件

database/database.go

type Database struct {
   dbSet []*DB
   aofHandler *aof.AofHandler
}

func NewDatabase() *Database {
   mdb := &Database{}
   if config.Properties.Databases == 0 {
      config.Properties.Databases = 16
   }
   mdb.dbSet = make([]*DB, config.Properties.Databases)
   for i := range mdb.dbSet {
      singleDB := makeDB()
      singleDB.index = i
      mdb.dbSet[i] = singleDB
   }
   if config.Properties.AppendOnly {
      aofHandler, err := aof.NewAOFHandler(mdb)
      if err != nil {
         panic(err)
      }
      mdb.aofHandler = aofHandler
      for _, db := range mdb.dbSet {
         singleDB := db
         singleDB.addAof = func(line CmdLine) {
            mdb.aofHandler.AddAof(singleDB.index, line)
         }
      }
   }
   return mdb
}

將AOF加入到database里

使用singleDB的原因:因?yàn)樵谘h(huán)中獲取返回變量的地址都完全相同,因此當(dāng)我們想要訪問(wèn)數(shù)組中元素所在的地址時(shí),不應(yīng)該直接獲取 range 返回的變量地址 db,而應(yīng)該使用 singleDB := db

database/db.go

type DB struct {
   index int
   data   dict.Dict
   addAof func(CmdLine)
}

func makeDB() *DB {
	db := &DB{
		data:   dict.MakeSyncDict(),
		addAof: func(line CmdLine) {},
	}
	return db
}

由于分?jǐn)?shù)據(jù)庫(kù)db引用不到aof,所以添加一個(gè)addAof匿名函數(shù),在NewDatabase中用這個(gè)匿名函數(shù)調(diào)用AddAof

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {
   ......
   if deleted > 0 {
      db.addAof(utils.ToCmdLine2("del", args...))
   }
   return reply.MakeIntReply(int64(deleted))
}

func execFlushDB(db *DB, args [][]byte) resp.Reply {
	db.Flush()
	db.addAof(utils.ToCmdLine2("flushdb", args...))
	return &reply.OkReply{}
}

func execRename(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("rename", args...))
	return &reply.OkReply{}
}

func execRenameNx(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("renamenx", args...))
	return reply.MakeIntReply(1)
}

database/string.go

func execSet(db *DB, args [][]byte) resp.Reply {
   ......
   db.addAof(utils.ToCmdLine2("set", args...))
   return &reply.OkReply{}
}

func execSetNX(db *DB, args [][]byte) resp.Reply {
   ......
   db.addAof(utils.ToCmdLine2("setnx", args...))
   return reply.MakeIntReply(int64(result))
}

func execGetSet(db *DB, args [][]byte) resp.Reply {
   key := string(args[0])
   value := args[1]

   entity, exists := db.GetEntity(key)
   db.PutEntity(key, &database.DataEntity{Data: value})
   db.addAof(utils.ToCmdLine2("getset", args...))
   ......
}

添加addAof方法

測(cè)試命令

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n

關(guān)于“GO怎么實(shí)現(xiàn)Redis的AOF持久化”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI