您好,登錄后才能下訂單哦!
項目情況介紹:
基于Python 3.6.6 ,實現(xiàn)對nginx訪問的日志分析代碼,實現(xiàn)了對日志中code的占比統(tǒng)計和瀏覽器類型和訪問情況統(tǒng)計
實現(xiàn)的代碼段有:
1.編寫窗戶函數(shù),實現(xiàn)在一定的時間內(nèi)對數(shù)據(jù)進(jìn)行分析
2.通過正則表達(dá)式對日志進(jìn)行匹配,加載日志文件,提取出文本里每行的日志信息
3.編寫消費端代碼,即使得提取到的數(shù)據(jù)能夠按照消費端的代碼進(jìn)行處理
4.消息分發(fā)代碼實現(xiàn),通過queue,將提取的的文本放到隊列里,供消費端代碼處理
項目代碼如下
import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path
from user_agents import parse
"""
這段代碼,實現(xiàn)了再一段時間內(nèi)獲得數(shù)據(jù),通過不同的handler(即消費端函數(shù))
對獲取到的同一份數(shù)據(jù)進(jìn)行處理,主要是兩段消費函數(shù),網(wǎng)頁返回的code的統(tǒng)計和瀏覽器的分析
這段代碼,窗口函數(shù)中,data = src.get(),使得沒有新的數(shù)據(jù)產(chǎn)生時,該代碼會阻塞,直到有新的數(shù)據(jù)生成,再次進(jìn)行處理
"""
pattern = '''(?P<remote>[\d.]{7,}\s-\s-\s\[(?P<datetime>[^\[\]]+)\])\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''
#編譯
regex = re.compile(pattern)
#構(gòu)造字典
ops = {
'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'size': int,
'useragent': lambda ua: parse(ua)
}
#提取信息
def extract(line: str) -> dict:
matcher = regex.match(line)
if matcher:
return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()}
# 打開文件
def openfile(path: str):
"""裝載日志文件"""
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
#裝載文件,判斷文件類型已經(jīng)是否存在
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
yield from openfile(str(file))
elif p.is_file():
yield from openfile(str(p))
# 隨機生成100以內(nèi)的數(shù)字
def source(second=1):
"""生成數(shù)據(jù)"""
while True:
yield {
'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
'value': random.randint(1, 100)
}
time.sleep(second)
# 滑動窗口函數(shù)
def window(src: Queue, handler, width: int, interval: int):
'''
窗口函數(shù),表示間隔一段時間取出一定的數(shù)據(jù)進(jìn)行處理
:param src:數(shù)據(jù)源,這里是緩存隊列,用于獲取數(shù)據(jù)
:param handler:數(shù)據(jù)處理的函數(shù)
:param width:時間窗口函數(shù),秒
:param interval:處理時間間隔,秒
'''
start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z')
current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z')
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
# 從數(shù)據(jù)源獲取數(shù)據(jù)
data = src.get() # 這個代碼會阻塞,等待數(shù)據(jù)輸入,沒有數(shù)據(jù)輸入就阻塞
if data:
buffer.append(data)
current = data['datetime'] # 存入臨時緩沖等待計算
# 每隔interval重新計算buffer中的一次數(shù)據(jù)
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
start = current
# 清除超出width的數(shù)據(jù)
buffer = [x for x in buffer if x['datetime'] > current - delta]
# 隨機數(shù)平均的測算函數(shù)
source()
def handler(iterable):
#return sum(map(lambda x: x['value'], iterable)) / len(iterable)
print(sum(map(lambda x:x['value'],iterable))/len(iterable))
# 測試函數(shù)
def donothing_handler(iterable):
#return iterable
print(iterable)
# 狀態(tài)碼占比
def status_handler(iterable):
# 時間窗口內(nèi)的一批數(shù)據(jù)
status = {}
for item in iterable:
key = item['status']
status[key] = status.get(key, 0) + 1
total = len(iterable)
print({k:float( "{:.2f}".format(status[k] / total)) for k, v in status.items()})
return {k: status[k] / total for k, v in status.items()}
# 瀏覽器分析
allbrowsers = {}
def browser_handler(iterable):
browsers = {}
for item in iterable:
ua = item['useragent']
key = (ua.browser.family, ua.browser.version_string)
browsers[key] = browsers.get(key, 0) + 1
allbrowsers[key] = allbrowsers.get(key, 0) + 1
print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
return browsers
# 分發(fā)器
def dispatcher(src):
# 分發(fā)器中記錄handler,同時保存各自的隊列
handlers = []
queues = []
def reg(handler, width: int, interval: int):
"""
注冊窗口處理函數(shù)
:param handler:注冊數(shù)據(jù)處理函數(shù)
:param width:時間窗口寬度
:param interval:時間間隔
"""
q = Queue()
queues.append(q)
# 多線程,數(shù)據(jù)并行
h = threading.Thread(target=window, args=(q, handler, width, interval))
handlers.append(h)
def run():
# 啟動線程處理數(shù)據(jù)
for t in handlers:
t.start()
# 將獲取到的數(shù)據(jù)分發(fā)到所有的隊列中
for item in src:
for q in queues:
q.put(item)
# print(q.get())
return reg, run
if __name__ == "__main__":
import sys
path = '/tmp/test.log'
"""
以下的代碼為測試用的,用于統(tǒng)計每隔5s統(tǒng)計10s內(nèi)的隨機數(shù)字的平均值
reg, run = dispatcher(source())
reg(handler, 10, 5)
"""
reg, run = dispatcher(load(path))
#每隔5s返回過去10s的數(shù)據(jù),但是不做處理
reg(donothing_handler, 10, 5)
#每隔5s統(tǒng)計10s內(nèi)的返回狀態(tài)碼的占比情況
reg(status_handler, 10, 5)
# 每隔5s統(tǒng)計10s內(nèi)的瀏覽器類型占比情況,展示排行10s內(nèi)訪問量前十的瀏覽器
reg(browser_handler,10,5)
run()
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。