溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

發(fā)布時(shí)間:2021-12-16 18:15:36 來源:億速云 閱讀:128 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn),小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

一、前言

共享內(nèi)存廣泛用于Redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個(gè)共享內(nèi)存在廣告埋點(diǎn)數(shù)據(jù)采集的實(shí)戰(zhàn)場景。

二、共享內(nèi)存原理

1、原理

在Linux中,每個(gè)進(jìn)程都有屬于自己的進(jìn)程控制塊(PCB)和地址空間(Addr Space),并且都有一個(gè)與之對應(yīng)的頁表,負(fù)責(zé)將進(jìn)程的虛擬地址與物理地址進(jìn)行映射,通過內(nèi)存管理單元(MMU)進(jìn)行管理。兩個(gè)不同的虛擬地址通過頁表映射到物理空間的同一區(qū)域,它們所指向的這塊區(qū)域即共享內(nèi)存。

當(dāng)兩個(gè)進(jìn)程通過頁表將虛擬地址映射到物理地址時(shí),在物理地址中有一塊共同的內(nèi)存區(qū),即共享內(nèi)存,這塊內(nèi)存可以被兩個(gè)進(jìn)程同時(shí)看到。這樣當(dāng)一個(gè)進(jìn)程進(jìn)行寫操作,另一個(gè)進(jìn)程讀操作就可以實(shí)現(xiàn)進(jìn)程間通信。但是,我們要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,因此我們使用信號(hào)量來實(shí)現(xiàn)同步與互斥。

對于一個(gè)共享內(nèi)存,實(shí)現(xiàn)采用的是引用計(jì)數(shù)的原理,當(dāng)進(jìn)程脫離共享存儲(chǔ)區(qū)后,計(jì)數(shù)器減一,掛架成功時(shí),計(jì)數(shù)器加一,只有當(dāng)計(jì)數(shù)器變?yōu)榱銜r(shí),才能被刪除。當(dāng)進(jìn)程終止時(shí),它所附加的共享存儲(chǔ)區(qū)都會(huì)自動(dòng)脫離。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

2、與傳統(tǒng)文件對比

共享內(nèi)存可以說是最有用的進(jìn)程間通信方式,也是最快的IPC形式, 因?yàn)檫M(jìn)程可以直接讀寫內(nèi)存,而不需要任何 數(shù)據(jù)的拷貝。對于像管道和消息隊(duì)列等通信方式,則需要在內(nèi)核和用戶空間進(jìn)行四次的數(shù)據(jù)拷貝 共享內(nèi)存則只拷貝兩次數(shù)據(jù): 一次從輸入文件到共享內(nèi)存區(qū),另一次從共享內(nèi)存區(qū)到輸出文件。

實(shí)際上,進(jìn)程之間在共享內(nèi) 存時(shí),并不總是讀寫少量數(shù)據(jù)后就解除映射,有新的通信時(shí),再重新建立共享內(nèi)存區(qū)域。而是保持共享區(qū)域,直 到通信完畢為止,這樣,數(shù)據(jù)內(nèi)容一直保存在共享內(nèi)存中,并沒有寫回文件。共享內(nèi)存中的內(nèi)容往往是在解除映 射時(shí)才寫回文件的。因此,采用共享內(nèi)存的通信方式效率是非常高的。

傳統(tǒng)文件

UNIX 訪問文件的傳統(tǒng)方法是用 open 打開它們,如果有多個(gè)進(jìn)程訪問同一個(gè)文件,則每一個(gè)進(jìn)程在自己的地址空間都包含有該文件的副本,這不必要地浪費(fèi)了存儲(chǔ)空間。

下圖說明了兩個(gè)進(jìn)程同時(shí)讀一個(gè)文件的同一頁的情形。系統(tǒng)要將該頁從磁盤讀到高速緩沖區(qū)中,每個(gè)進(jìn)程再執(zhí)行一個(gè)存儲(chǔ)器內(nèi)的復(fù)制操作將數(shù)據(jù)從高速緩沖區(qū)讀到自己的地址空間。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

共享存儲(chǔ)映射

現(xiàn)在考慮另一種處理方法:進(jìn)程 A 和進(jìn)程 B 都將該頁映射到自己的地址空間,當(dāng)進(jìn)程 A 第一次訪問該頁中的數(shù)據(jù)時(shí), 它生成一個(gè)缺頁中斷。內(nèi)核此時(shí)讀入這一頁到內(nèi)存并更新頁表使之指向它。以后,當(dāng)進(jìn)程B訪問同一頁面而出現(xiàn)缺頁中斷時(shí),該頁已經(jīng)在內(nèi)存,內(nèi)核只需要將進(jìn)程 B 的頁表登記項(xiàng)指向次頁即可。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

3、mmap()

(1)mmap()系統(tǒng)調(diào)用

mmap()系統(tǒng)調(diào)用使得進(jìn)程之間通過映射同一個(gè)普通文件實(shí)現(xiàn)共享內(nèi)存。普通文件被映射到進(jìn)程地址空間后,進(jìn)程可以向訪問普通內(nèi)存一樣對文件進(jìn)行訪問,不必再調(diào)用read(),write()等操作。

mmap()系統(tǒng)調(diào)用形式如下:

void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )

mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區(qū)域至調(diào)用進(jìn)程的[addr, addr + len]的內(nèi)存區(qū)域:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

  • 數(shù)fd為即將映射到進(jìn)程空間的文件描述字,一般由open()返回,同時(shí),fd可以指定為-1,此時(shí)須指定flags參數(shù)中的,MAP_ANON,表明進(jìn)行的是匿名映射(不涉及具體的文件名,避免了文件的創(chuàng)建及打開,很顯然只能用于具有親緣關(guān)系的進(jìn)程間通信)。

  • len是映射到調(diào)用進(jìn)程地址空間的字節(jié)數(shù),它從被映射文件開頭offset個(gè)字節(jié)開始算起。

  • prot 參數(shù)指定共享內(nèi)存的訪問權(quán)限??扇∪缦聨讉€(gè)值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執(zhí)行), PROT_NONE(不可訪問)。

  • flags由以下幾個(gè)常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。

  • offset參數(shù)一般設(shè)為0,表示從文件頭開始映射。

  • 參數(shù)addr指定文件應(yīng)被映射到進(jìn)程空間的起始地址,一般被指定一個(gè)空指針,此時(shí)選擇起始地址的任務(wù)留給內(nèi)核來完成。函數(shù)的返回值為最后文件映射到進(jìn)程空間的地址,進(jìn)程可直接操作起始地址為該值的有效地址。

(2)mmap()返回地址的訪問

對mmap()返回地址的訪問,linux采用的是頁式管理機(jī)制。

對于用mmap()映射普通文件來說,進(jìn)程會(huì)在自己的地址空間新增一塊空間,空間大小由mmap()的len參數(shù)指定,注意,進(jìn)程并不一定能夠?qū)θ啃略隹臻g都能進(jìn)行有效訪問。

進(jìn)程能夠訪問的有效地址大小取決于文件被映射部分的大小。

簡單的說,能夠容納文件被映射部分大小的最少頁面?zhèn)€數(shù)決定了進(jìn)程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。

超過這個(gè)空間大小,內(nèi)核會(huì)根據(jù)超過的嚴(yán)重程度返回發(fā)送不同的信號(hào)給進(jìn)程??捎萌缦聢D示說明:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

2、分區(qū)讀寫

 為了要確保一個(gè)進(jìn)程在寫的時(shí)候不能被讀,我們使用idx來標(biāo)記可讀塊。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

3、規(guī)則,指標(biāo)和值

下圖描述的是從連續(xù)內(nèi)存空間轉(zhuǎn)化成【規(guī)則,維度,值】語義的過程:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

4、源碼分析

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn)

5、general.proto 

通用監(jiān)控上報(bào)協(xié)議:

general.proto
 
syntax = "proto2";
package general;
message Data {
    map<string, string> kv = 1;
}
message GeneralData {
    optional string rule_id = 1;
    repeated Data data = 2;
    optional int64 count = 3;
    optional int64 left_size = 4;
    optional int32 version = 5;
}

6、constant.go 配置參數(shù)

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長度 | magincNum2(4byte) | 4k protect |

 

package moni_shm
 
const (
   OssShmId           uint32 = 0x3eeff00
   MagicNum1          uint32 = 0x650a218
   MagicNum2          uint32 = 0x138a4f2
   CreateShmLock             = "/var/run/.oss_shm_lock"
   OssMapOneAttrCnt          = 1024 * 128      //1024 個(gè)規(guī)則
   OssOneAttrEntryCnt        = 128             //每個(gè)規(guī)則有128個(gè)指標(biāo)
   EntrySz                   = 4
   OssMapCnt                 = 2
 
   OneAttrSz = OssOneAttrEntryCnt * EntrySz
   OssMapSz  = OssMapOneAttrCnt * OneAttrSz
   OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
 
   defaultIntervalSec = 60
   defaultTopic       = "moni_general_shared_memory"
)

7、util.go 工具類

內(nèi)存清零工具和"整頁"分配:

cd package moni_shm
import (
    "unsafe"
)
//取整分配
func align(actual, to uint64) uint64 {
    return (actual + to - 1) / to * to
}
//連續(xù)空間清0
func zero(ptr uintptr, bts uint64) {
    if 0 == bts {
        return
    }
    const sz = 4096
    var next uint64
    cnt := 0
    for ; next+sz <= bts; {  //按頁清零
        arr := (*[sz]byte)(unsafe.Pointer(ptr))
        for i := range *arr {
            (*arr)[i] = 0
        }
        next += sz
        ptr += uintptr(sz)
        cnt++
    }
    if next == bts {
        return
    }
    var i uintptr
    for i = 0; i < uintptr(bts-next); i++ { //剩余空間清零
        *(*byte)(unsafe.Pointer(ptr + i)) = 0
    }
}

8、mgr.go 采集邏輯

共享內(nèi)存采集邏輯對應(yīng) “規(guī)則指標(biāo)和值”:

var (
    _basePtr     uintptr = 0
    _shmUtil             = NewShmUtil(OssShmId, OssAttrSz)
    _intervalSec         = defaultIntervalSec
    _topic               = defaultTopic
    _on          bool    = false
)
func Stat(on bool) {
    _on = on
}
func Start() {
    go collect() //開始采集
}
func tryInitBaseptr() error {
    var err error
    if _basePtr == 0 {
        _basePtr, err = _shmUtil.GetData() //獲取當(dāng)前共享內(nèi)存數(shù)據(jù)塊首地址
        if nil != err {
            logrus.Warnf("init base ptr failed, retrying: %v", err)
        }
    }
    return err
}
func collect() {
    var (
        cost  time.Duration
        start time.Time
        first = true
    )
    for {
        if !first {
            time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對齊
        }
        first = false
        start = time.Now()
        if !_on {
            cost = time.Since(start)
            continue
        }
        if _basePtr == 0 {
            if err := tryInitBaseptr(); nil != err {
                cost = time.Since(start)
                continue
            }
        }
        d := collectOnce()
        for _, v := range d {
            moni_report.ProductReportData(*v)
        }
        cost = time.Since(start)
    }
}
 
func collectOnce() []*moni_report.ReportData {
   now := time.Now()
   var ret []*moni_report.ReportData
   data := make(map[uint32]*general.GeneralData)
 
   d := SwitchAndFetch(_basePtr)
   logrus.Infof("sending %d data from shm", len(d))
 
   for _, v := range d {
      ruleId := strconv.FormatUint(uint64(v[0]), 10)
      dim := strconv.FormatUint(uint64(v[1]), 10)
      value := strconv.FormatUint(uint64(v[2]), 10)
 
      if _, ok := data[v[0]]; !ok {
         data[v[0]] = &general.GeneralData{
            RuleId: proto.String(ruleId),
            Data:   []*general.Data{},
         }
      }
 
      data[v[0]].Data = append(data[v[0]].Data, &general.Data{
         Kv: map[string]string{
            dim:         value,
            "timestamp": strconv.FormatInt(now.Unix()*1000, 10),
            "ip":        viper.GetString("host.inner_ip"),
         },
      })
   }
   logrus.Infof("collect format shm data:%v", data)
   for _, v := range data {
      bts, err := proto.Marshal(v)
      if nil != err {
         logrus.Errorf("marshal shm data failed: %v", err)
         continue
      }
      ret = append(ret, &moni_report.ReportData{
         DataBytes: bts,
         Topic:     _topic,
      })
   }
 
   return ret
}

9、shmutil.go 共享內(nèi)存操作

每60秒根據(jù)idx值切換可讀區(qū),采集后上報(bào)后,清零,切換到下一區(qū)。

package moni_shm
import (
    "fmt"
    "log"
    "os"
    "syscall"
    "unsafe"
    "github.com/sirupsen/logrus"
)
const (
    IpcCreate = 00001000
)
var (
    ErrNotCreated   = fmt.Errorf("shm not created")
    ErrCreateFailed = fmt.Errorf("shm create failed")
)
type shmOpt func(*ShmUtil)
func WithCreate(b bool) shmOpt {
    return func(u *ShmUtil) {
        u.create = b
    }
}
/*共享內(nèi)存數(shù)據(jù)結(jié)構(gòu)
     |1page mprotect|page align data|1page mprotect|
     | 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預(yù)留長度 | magincNum2(4byte) | 4k protect |
*/
type ShmUtil struct {
    pageSz int
    dataSz uint64
    total  uint64
    shmKey uint32
    create bool
    base uintptr
    data uintptr
}
func NewShmUtil(key uint32, sz uint64, cfgs ...shmOpt) *ShmUtil {
    if key == 0 {
        panic("invalid shm key: 0")
    }
    ret := &ShmUtil{
        dataSz: sz,
        shmKey: key,
    }
    ret.pageSz = os.Getpagesize() //獲取頁大小
    ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁分配“包體”大小
    ret.total = ret.dataSz + uint64(ret.pageSz)*2     // 總空間大小=包體大小 + 頭尾各2頁保護(hù)地址
    for _, c := range cfgs {
        c(ret)
    }
    return ret
}
func (s *ShmUtil) attachShm(flag int) error {
    created := false
    shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag))     //使用已存在的共享內(nèi)存,返回共享內(nèi)存標(biāo)識(shí)符
    if 0 != errno {
        return errno
    }
    if shmid < 0 {
        if !s.create {  //不允創(chuàng)建,直接返回
            return ErrNotCreated
        }
        shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate))  //新創(chuàng)建共享內(nèi)存
        if 0 != errno {
            return fmt.Errorf("shm create: %v", errno)
        }
        if shmid < 0 {
            return ErrCreateFailed
        }
        created = true
    }
    addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)  //掛接共享內(nèi)存到當(dāng)前進(jìn)程
    if 0 != errno {
        return fmt.Errorf("shmat: %v", errno)
    }
    if created {
        zero(addr, s.total)//新創(chuàng)建的共享內(nèi)存,初始化共享內(nèi)存數(shù)據(jù)
    }
    s.base = addr //記錄共享內(nèi)存首地址 用于之后的釋放
    s.data = s.base + uintptr(s.pageSz) //寫數(shù)據(jù)的起始地址
     
    _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
    if 0 != errno { //鎖定共享內(nèi)存頭,鎖指定的內(nèi)存區(qū)間必須包含整個(gè)內(nèi)存頁(4K)
        s.detach()
        return fmt.Errorf("mprotect head: %v", errno)
    }
    _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內(nèi)存尾,區(qū)間開始的地址start必須是一個(gè)內(nèi)存頁的起始地址,并且區(qū)間長度len必須是頁大小的整數(shù)倍。
    if 0 != errno {
        s.detach()
        return fmt.Errorf("mprotect tail: %v", errno)
    }
    return nil
}
func (s *ShmUtil) detach() { //進(jìn)程去關(guān)聯(lián)共享內(nèi)存
    if 0 != s.base {
        syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)
        s.base = 0
        s.data = 0
    }
}
/*
  獲取內(nèi)存并且返回?cái)?shù)據(jù)段起始位置
  s.create 決定是否新申請共享內(nèi)存
*/
func (s *ShmUtil) GetData() (uintptr, error) {
    if s.data != 0 {
        return s.data, nil
    }
    if err := s.attachShm(0666); nil != err { //初始化共享內(nèi)存,并關(guān)聯(lián)到進(jìn)程
        return 0, err
    }
    return s.data, nil
}
func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內(nèi)存讀取 [][3]uint32{ossid,key,value}
    if ptr == 0 {
        return nil
    }
    m1 := (*uint32)(unsafe.Pointer(ptr))
    m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))   
    if MagicNum1 != *m1 || MagicNum2 != *m2 {
        logrus.Errorf("magic 1 in header: wrote:%v\tread:%v\n", MagicNum1, *m1)
        logrus.Errorf("magic 2 in tail:   wrote:%v\tread:%v\n", MagicNum2, *m2)
        return nil
    }
    idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標(biāo)志
    old := *idx
    *idx = 1 - *idx
    ret := PartialRead(ptr, old)  //讀取當(dāng)前idx塊數(shù)據(jù)
    zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0
    return ret
}
//根據(jù)idx輪流讀數(shù)據(jù)區(qū)域
func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據(jù)idx獲取塊起始地址
    startPtr := ptr + 8 + uintptr(idx)*OssMapSz
    ret := ReadOssMap(startPtr)
    log.Printf("result: %v\n", ret)
    return ret
}
func ReadOssMap(ptr uintptr) [][3]uint32 { //1個(gè)周期內(nèi)的指標(biāo)總?cè)萘繛?nbsp;128*1024 = 128k  = 13W
    var ret [][3]uint32
    var i uint32 = 0
    for i = 0; i < OssMapOneAttrCnt; i++ {  //1個(gè)周期最多支持1024個(gè)業(yè)務(wù)
        for _, v := range ReadOneAttr(ptr) {
            ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]
        }
        ptr += OneAttrSz  // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4
    }
    return ret
}
func ReadOneAttr(ptr uintptr) [][2]uint32 {
    var ret [][2]uint32
    var i uint32 = 0
    for i = 0; i < OssOneAttrEntryCnt; i++ { //目前默認(rèn)一個(gè)業(yè)務(wù)下最多有128單維度指標(biāo), OssOneAttrEntryCnt = 128
        v := *(*uint32)(unsafe.Pointer(ptr))
        if v != 0 {
            ret = append(ret, [2]uint32{i, v}) // [keyID, value]
        }
        ptr += EntrySz  // 4yte 讀取一個(gè)指標(biāo)
    }
    return ret
}

以上就是怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實(shí)戰(zhàn),小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

vcs
AI