溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)

發(fā)布時(shí)間:2020-07-26 03:18:47 來(lái)源:網(wǎng)絡(luò) 閱讀:1120 作者:梁十八 欄目:編程語(yǔ)言

uid是服務(wù)端給客戶端種下的cookie。比如訪問(wèn)百度,同一臺(tái)電腦同一個(gè)瀏覽器,不管是百度哪個(gè)頁(yè)面,都是這個(gè)uid:
打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)


區(qū)分PV、IV、UV如下:
1、pv訪問(wèn)量(Page View),即頁(yè)面訪問(wèn)量,每打開一次頁(yè)面PV計(jì)數(shù)+1,刷新頁(yè)面也是。

2、UV訪問(wèn)數(shù)(Unique Visitor)指獨(dú)立訪客訪問(wèn)數(shù),一臺(tái)電腦終端為一個(gè)訪客。

3、IV是初始向量(IV,Initialization Vector)。


redis數(shù)據(jù)結(jié)構(gòu)HyperLogLog
如果我們要實(shí)現(xiàn)記錄網(wǎng)站每天訪問(wèn)的獨(dú)立IP數(shù)量這樣的一個(gè)功能

集合實(shí)現(xiàn):

使用集合來(lái)儲(chǔ)存每個(gè)訪客的 IP ,通過(guò)集合性質(zhì)(集合中的每個(gè)元素都各不相同)來(lái)得到多個(gè)獨(dú)立 IP ,
然后通過(guò)調(diào)用 SCARD 命令來(lái)得出獨(dú)立 IP 的數(shù)量。
舉個(gè)例子,程序可以使用以下代碼來(lái)記錄 2014 年 8 月 15 日,每個(gè)網(wǎng)站訪客的 IP :
ip = get_vistor_ip()
SADD '2014.8.15::unique::ip' ip
然后使用以下代碼來(lái)獲得當(dāng)天的唯一 IP 數(shù)量:
SCARD '2014.8.15::unique::ip'

集合實(shí)現(xiàn)的問(wèn)題

使用字符串來(lái)儲(chǔ)存每個(gè) IPv4 地址最多需要耗費(fèi) 15 字節(jié)(格式為 'XXX.XXX.XXX.XXX' ,比如
'202.189.128.186')。
下表給出了使用集合記錄不同數(shù)量的獨(dú)立 IP 時(shí),需要耗費(fèi)的內(nèi)存數(shù)量:
獨(dú)立 IP 數(shù)量一天一個(gè)月一年
一百萬(wàn)15 MB 450 MB 5.4 GB
一千萬(wàn)150 MB 4.5 GB 54 GB
一億1.5 GB 45 GB 540 GB
隨著集合記錄的 IP 越來(lái)越多,消耗的內(nèi)存也會(huì)越來(lái)越多。
另外如果要儲(chǔ)存 IPv6 地址的話,需要的內(nèi)存還會(huì)更多一些

為了更好地解決像獨(dú)立 IP 地址計(jì)算這種問(wèn)題,
Redis 在 2.8.9 版本添加了 HyperLogLog 結(jié)構(gòu)。

HyperLogLog介紹

HyperLogLog 可以接受多個(gè)元素作為輸入,并給出輸入元素的基數(shù)估算值:
? 基數(shù):集合中不同元素的數(shù)量。比如 {'apple', 'banana', 'cherry', 'banana', 'apple'} 的基數(shù)就是 3 。
? 估算值:算法給出的基數(shù)并不是精確的,可能會(huì)比實(shí)際稍微多一些或者稍微少一些,但會(huì)控制在合
理的范圍之內(nèi)。
HyperLogLog 的優(yōu)點(diǎn)是,即使輸入元素的數(shù)量或者體積非常非常大,計(jì)算基數(shù)所需的空間總是固定
的、并且是很小的。
在 Redis 里面,每個(gè) HyperLogLog 鍵只需要花費(fèi) 12 KB 內(nèi)存,就可以計(jì)算接近 2^64 個(gè)不同元素的基
數(shù)。這和計(jì)算基數(shù)時(shí),元素越多耗費(fèi)內(nèi)存就越多的集合形成鮮明對(duì)比。
但是,因?yàn)?HyperLogLog 只會(huì)根據(jù)輸入元素來(lái)計(jì)算基數(shù),而不會(huì)儲(chǔ)存輸入元素本身,所以
HyperLogLog 不能像集合那樣,返回輸入的各個(gè)元素。

將元素添加至 HyperLogLog
PFADD key element [element ...]
將任意數(shù)量的元素添加到指定的 HyperLogLog 里面。
這個(gè)命令可能會(huì)對(duì) HyperLogLog 進(jìn)行修改,以便反映新的基數(shù)估算值,如果 HyperLogLog 的基數(shù)估算
值在命令執(zhí)行之后出現(xiàn)了變化, 那么命令返回 1 , 否則返回 0 。
命令的復(fù)雜度為 O(N) ,N 為被添加元素的數(shù)量。

返回給定 HyperLogLog 的基數(shù)估算值
PFCOUNT key [key ...]
當(dāng)只給定一個(gè) HyperLogLog 時(shí),命令返回給定 HyperLogLog 的基數(shù)估算值。
當(dāng)給定多個(gè) HyperLogLog 時(shí),命令會(huì)先對(duì)給定的 HyperLogLog 進(jìn)行并集計(jì)算,得出一個(gè)合并后的
HyperLogLog ,然后返回這個(gè)合并 HyperLogLog 的基數(shù)估算值作為命令的結(jié)果(合并得出的
HyperLogLog 不會(huì)被儲(chǔ)存,使用之后就會(huì)被刪掉)。
當(dāng)命令作用于單個(gè) HyperLogLog 時(shí), 復(fù)雜度為 O(1) , 并且具有非常低的平均常數(shù)時(shí)間。
當(dāng)命令作用于多個(gè) HyperLogLog 時(shí), 復(fù)雜度為 O(N) ,并且常數(shù)時(shí)間也比處理單個(gè) HyperLogLog 時(shí)要
大得多。

PFADD 和 PFCOUNT 的使用示例
redis> PFADD unique::ip::counter '192.168.0.1'
(integer) 1
redis> PFADD unique::ip::counter '127.0.0.1'
(integer) 1
redis> PFADD unique::ip::counter '255.255.255.255'
(integer) 1
redis> PFCOUNT unique::ip::counter
(integer) 3

合并多個(gè) HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]
將多個(gè) HyperLogLog 合并為一個(gè) HyperLogLog ,合并后的 HyperLogLog 的基數(shù)估算值是通過(guò)對(duì)所有
給定 HyperLogLog 進(jìn)行并集計(jì)算得出的。
命令的復(fù)雜度為 O(N) , 其中 N 為被合并的 HyperLogLog 數(shù)量, 不過(guò)這個(gè)命令的常數(shù)復(fù)雜度比較高。

PFMERGE 的使用示例
redis> PFADD str1 "apple" "banana" "cherry"
(integer) 1
redis> PFCOUNT str1
(integer) 3
redis> PFADD str2 "apple" "cherry" "durian" "mongo"
(integer) 1
redis> PFCOUNT str2
(integer) 4
redis> PFMERGE str1&2 str1 str2
OK
redis> PFCOUNT str1&2
(integer) 5

HyperLogLog 實(shí)現(xiàn)獨(dú)立 IP 計(jì)算功能

獨(dú)立 IP 數(shù)量一天一個(gè)月一年一年(使用集合)
一百萬(wàn)12 KB 360 KB 4.32 MB 5.4 GB
一千萬(wàn)12 KB 360 KB 4.32 MB 54 GB
一億12 KB 360 KB 4.32 MB 540 GB
下表列出了使用 HyperLogLog 記錄不同數(shù)量的獨(dú)立 IP 時(shí),需要耗費(fèi)的內(nèi)存數(shù)量:
可以看到,要統(tǒng)計(jì)相同數(shù)量的獨(dú)立 IP ,HyperLogLog 所需的內(nèi)存要比集合少得多。


打點(diǎn)統(tǒng)計(jì)——3(go日志讀取分析寫入)


package main

import (
    "flag"
    "github.com/sirupsen/logrus"
    "time"
    "os"
    "bufio"
    "io"
    "strings"
    "github.com/mgutz/str"
    "net/url"
    "crypto/md5"
    "encoding/hex"
    "github.com/mediocregopher/radix.v2/pool"
    "strconv"
)

const HANDLE_DIG = " /dig?"
const HANDLE_MOVIE = "/movie/"
const HANDLE_LIST = "/list/"
const HANDLE_HTML = ".html"

type cmdParams struct {
    logFilePath string
    routineNum int
}
type digData struct{
    time   string
    url    string
    refer  string
    ua        string
}
type urlData struct {
    data   digData
    uid    string
    unode  urlNode
}
type urlNode struct {
    unType     string // 詳情頁(yè) 或者 列表頁(yè) 或者 首頁(yè)
    unRid  int       // Resource ID 資源ID
    unUrl  string // 當(dāng)前這個(gè)頁(yè)面的url
    unTime  string // 當(dāng)前訪問(wèn)這個(gè)頁(yè)面的時(shí)間
}
type storageBlock struct {
    counterType       string
    storageModel   string
    unode        urlNode
}

var log = logrus.New()

func init() {
    log.Out = os.Stdout //聲明用什么輸出日志
    log.SetLevel( logrus.DebugLevel ) //設(shè)置日志的等級(jí)
}

func main() {
    // 獲取參數(shù)
    logFilePath := flag.String( "logFilePath", "F:/phpStudy/PHPTutorial/nginx/logs/access.log", "log file path" ) //日志文件路徑
    routineNum := flag.Int( "routineNum", 5, "consumer numble by goroutine" ) //routine數(shù)量,默認(rèn)為5
    l := flag.String( "l", "./log.log", "this programe runtime log target file path" ) //go生成的日志存放路徑
    flag.Parse()

    params := cmdParams{ *logFilePath, *routineNum }

    // 打日志
    logFd, err := os.OpenFile( *l, os.O_CREATE|os.O_WRONLY, 0644 ) //打開go生成的日志
    if err == nil {
        log.Out = logFd //打開出錯(cuò),則用日志文件存錯(cuò)誤信息
        defer logFd.Close() //關(guān)閉文件
    }
    log.Infof( "Exec start." ) //提示日志文件啟動(dòng)
    log.Infof( "Params: logFilePath=%s, routineNum=%d", params.logFilePath, params.routineNum ) //提示輸入的/默認(rèn)參數(shù)

    // 初始化一些channel,用于數(shù)據(jù)傳遞
    var logChannel = make(chan string, 3*params.routineNum) //讀取日志文件量更大,設(shè)置為3倍
    var pvChannel = make(chan urlData, params.routineNum)
    var uvChannel = make(chan urlData, params.routineNum)
    var storageChannel = make(chan storageBlock, params.routineNum)

    // Redis Pool
    redisPool, err := pool.New( "tcp", "localhost:6379", 2*params.routineNum ); //連接池,2*params.routineNum是連接池?cái)?shù)
    if err != nil{
        log.Fatalln( "Redis pool created failed." )
        panic(err)
    } else {
        //空閑時(shí)間過(guò)了后,客戶端(也就是連接池和遠(yuǎn)端服務(wù)器會(huì)斷開)。所以以一定的間隔去ping
        go func(){
            for{
                redisPool.Cmd( "PING" )
                time.Sleep( 3*time.Second )
            }
        }()
    }

    // 日志消費(fèi)者
    go readFileLinebyLine( params, logChannel )

    // 創(chuàng)建一組日志處理
    for i:=0; i<params.routineNum; i++ {
        go logConsumer( logChannel, pvChannel, uvChannel )
    }

    // 創(chuàng)建PV UV 統(tǒng)計(jì)器
    go pvCounter( pvChannel, storageChannel )
    go uvCounter( uvChannel, storageChannel, redisPool )
    // 可擴(kuò)展的 xxxCounter(如果還有別的要統(tǒng)計(jì)的,則:go xxCounter(...))

    // 創(chuàng)建 存儲(chǔ)器
    go dataStorage( storageChannel, redisPool )

    time.Sleep( 1000*time.Second )
}

// HBase 劣勢(shì):列簇需要聲明清楚。所以這里用redis來(lái)存儲(chǔ)
func dataStorage( storageChannel chan storageBlock, redisPool *pool.Pool) {
    for block := range storageChannel {
        prefix := block.counterType + "_"

        // 逐層添加,加洋蔥皮的過(guò)程
        // 維度: 天-小時(shí)-分鐘
        // 層級(jí): 定級(jí)-大分類-小分類-終極頁(yè)面
        // 存儲(chǔ)模型: Redis  SortedSet
        setKeys := []string{
            prefix+"day_"+getTime(block.unode.unTime, "day"),
            prefix+"hour_"+getTime(block.unode.unTime, "hour"),
            prefix+"min_"+getTime(block.unode.unTime, "min"),
            prefix+block.unode.unType+"_day_"+getTime(block.unode.unTime, "day"),
            prefix+block.unode.unType+"_hour_"+getTime(block.unode.unTime, "hour"),
            prefix+block.unode.unType+"_min_"+getTime(block.unode.unTime, "min"),
        }

        rowId := block.unode.unRid

        for _,key := range setKeys {
            ret, err := redisPool.Cmd( block.storageModel, key, 1, rowId ).Int()
            if ret<=0 || err!=nil {
                log.Errorln( "DataStorage redis storage error.", block.storageModel, key, rowId )
            }
        }
    }
}

func pvCounter( pvChannel chan urlData, storageChannel chan storageBlock ) {
    for data := range pvChannel {
        sItem := storageBlock{ "pv", "ZINCRBY", data.unode }
        storageChannel <- sItem
    }
}

func uvCounter( uvChannel chan urlData, storageChannel chan storageBlock, redisPool *pool.Pool ) {
    for data := range uvChannel {
        //HyperLoglog redis
        hyperLogLogKey := "uv_hpll_" + getTime(data.data.time, "day") //uv_hpll_ + 天級(jí)別的時(shí)間 組成集合中的鍵
        ret, err := redisPool.Cmd( "PFADD", hyperLogLogKey, data.uid, "EX", 86400 ).Int()
        if err!=nil {
            log.Warningln( "UvCounter check redis hyperloglog failed, ", err )
        }
        if ret!=1 {
            continue
        }

        sItem := storageBlock{ "uv", "ZINCRBY", data.unode }
        storageChannel <- sItem
    }
}

//消費(fèi)一行行讀取到的日志
func logConsumer( logChannel chan string, pvChannel, uvChannel chan urlData ) error {
    for logStr := range logChannel {
        // 切割日志字符串,扣出打點(diǎn)上報(bào)的數(shù)據(jù)
        data := cutLogFetchData( logStr )

        // uid
        // 說(shuō)明: 課程中模擬生成uid(不是現(xiàn)實(shí)環(huán)境中服務(wù)器給瀏覽器種下的cookie中的uid), md5(refer+ua)
        hasher := md5.New()
        hasher.Write( []byte( data.refer+data.ua ) )
        uid := hex.EncodeToString( hasher.Sum(nil) )

        // 很多解析的工作都可以放到這里完成
        // ...
        // ...

        uData := urlData{ data, uid, formatUrl( data.url, data.time ) }

        pvChannel <- uData
        uvChannel <- uData
        /* 如果有其他要塞入的:xxChannel <- uData */

    }
    return nil
}

func cutLogFetchData( logStr string ) digData {
    logStr = strings.TrimSpace( logStr )
    pos1 := str.IndexOf( logStr,  HANDLE_DIG, 0)
    if pos1==-1 {
        return digData{}
    }
    pos1 += len( HANDLE_DIG )
    pos2 := str.IndexOf( logStr, " HTTP/", pos1 )
    d := str.Substr( logStr, pos1, pos2-pos1 )

    urlInfo, err := url.Parse( "http://localhost/?"+d ) //url.Parse只認(rèn)完整的網(wǎng)址,所以 加上:http://localhost/?
    if err != nil {
        return digData{}
    }
    data := urlInfo.Query()
    return digData{
        data.Get("time"),
        data.Get("refer"),
        data.Get("url"),
        data.Get("ua"),
    }
}

func readFileLinebyLine( params cmdParams, logChannel chan string ) error {
    fd, err := os.Open( params.logFilePath ) //打開nginx日志文件
    if err != nil {
        log.Warningf( "ReadFileLinebyLine can't open file:%s", params.logFilePath )
        return err
    }

    defer fd.Close() //關(guān)閉是好習(xí)慣

    count := 0
    bufferRead := bufio.NewReader( fd )
    for {
        line, err := bufferRead.ReadString( '\n' ) //一行行讀
        logChannel <- line //讀出一行寫入一次logChannel
        count++

        if count%(1000*params.routineNum) == 0 { //每1000*params.routineNum行日志輸出一次信息到控制臺(tái)
            log.Infof( "ReadFileLinebyLine line: %d", count )
        }
        if err != nil { //error部位空有兩種情況,一種是錯(cuò)誤,一種是讀到尾部了
            if err == io.EOF { //讀到尾部了(讀完了),休息3秒鐘
                time.Sleep( 3*time.Second )
                log.Infof( "ReadFileLinebyLine wait, raedline:%d", count ) //提醒在等待,已經(jīng)讀到了第n行
            } else {
                log.Warningf( "ReadFileLinebyLine read log error" ) //錯(cuò)誤則打出錯(cuò)誤
            }
        }
    }
    return nil
}

func formatUrl( url, t string ) urlNode{
    // 一定從量大的著手,  詳情頁(yè)>列表頁(yè)≥首頁(yè)
    pos1 := str.IndexOf( url, HANDLE_MOVIE, 0)
    if pos1!=-1 {
        pos1 += len( HANDLE_MOVIE )
        pos2 := str.IndexOf( url, HANDLE_HTML, 0 )
        idStr := str.Substr( url , pos1, pos2-pos1 )
        id, _ := strconv.Atoi( idStr )
        return urlNode{ "movie", id, url, t }
    } else {
        pos1 = str.IndexOf( url, HANDLE_LIST, 0 )
        if pos1!=-1 {
            pos1 += len( HANDLE_LIST )
            pos2 := str.IndexOf( url, HANDLE_HTML, 0 )
            idStr := str.Substr( url , pos1, pos2-pos1 )
            id, _ := strconv.Atoi( idStr )
            return urlNode{ "list", id, url, t }
        } else {
            return urlNode{ "home", 1, url, t}
        } // 如果頁(yè)面url有很多種,就不斷在這里擴(kuò)展
    }
}

//去重需要在一定的時(shí)間內(nèi)
func getTime( logTime, timeType string ) string {
    var item string
    switch timeType {
    case "day":
        item = "2006-01-02"
        break
    case "hour":
        item = "2006-01-02 15"
        break
    case "min":
        item = "2006-01-02 15:04"
        break
    }
    t, _ := time.Parse( item, time.Now().Format(item) )
    return strconv.FormatInt( t.Unix(), 10 ) //將unix時(shí)間戳轉(zhuǎn)換為10位字符串
}
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI