溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

25日志分析項(xiàng)目

發(fā)布時(shí)間:2020-07-26 12:33:09 來源:網(wǎng)絡(luò) 閱讀:357 作者:chaijowin 欄目:編程語言

?

?

?

生產(chǎn)中會生成大量的系統(tǒng)日志、應(yīng)用程序日志、安全日志等等,通過對日志的分析,可了解服務(wù)器的負(fù)載、健康狀態(tài),可分析客戶的分布情況、客戶的行為,甚至基于這些分析可做出預(yù)測;

?

一般采集流程:

日志產(chǎn)出-->采集-->存儲-->分析-->存儲-->可視化;

采集(logstashflumeapache)、scribefacebook));

?

開源實(shí)時(shí)日志分析,ELK平臺:

logstash收集日志,存放到ES集群中,kibanaES中查詢數(shù)據(jù)生成圖表,返回browser;

?

離線分析;

在線分析,一份生成日志,一份傳給大數(shù)據(jù)實(shí)時(shí)處理服務(wù);

實(shí)時(shí)處理技術(shù):storm、spark;

?

?

分析的前提:

半結(jié)構(gòu)化數(shù)據(jù):日志是半結(jié)構(gòu)化數(shù)據(jù),是有組織的,有格式的數(shù)據(jù),可分割成行和列,可當(dāng)作表來處理,也可分析里面的數(shù)據(jù);

?

文本分析:日志是文本文件,需要依賴文件io、字符串操作、正則等技術(shù),通過這些技術(shù)能把日志中需要的數(shù)據(jù)提取出來;

?

例:

123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"

?

提取數(shù)據(jù):

1、用空格分割;

1

25日志分析項(xiàng)目

2:先空格分割,遇""[]特殊處理;

?

2、用正則提取;

?

1、

import datetime

?

logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800]

"GET / HTTP/1.1" 200 8642 "-"

"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

?

names = ('remote','','','datetime','request','status','length','','useragent')

?

ops = (None,None,None,lambda timestr: datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),

?????? lambda request: dict(zip(['method','url','protocol'],request.split())),int,int,None,None)

?

def extract(line):

??? fields = []

??? flag = False

??? tmp = ''

?

??? for field in line.split():

??? #???? print(field)

??????? if not flag and (field.startswith('[') or field.startswith('"')):

??????????? if field.endswith(']') or field.endswith('"'):

??????????????? fields.append(field.strip())

??????????? else:

??????????????? tmp += field[1:]

??? #???????????? print(tmp)

??????????????? flag = True

??????????? continue

?

??????? if flag:

??????????? if field.endswith(']') or field.endswith('"'):

??????????????? tmp += ' ' + field[:-1]

??????????????? fields.append(tmp)

??????????????? flag = False

??????????????? tmp = ''

??????????? else:

?????????? ?????tmp += ' ' + field

??????????? continue

?

??????? fields.append(field)

??? print(fields)

???

??? info = {}

??? for i,field in enumerate(fields):

#???????? print(i,field)

??????? name = names[i]

??????? op = ops[i]

??????? if op:

??????????? info[name] = (op(field),op)

??? return info

?

print(extract(logs))

輸出:

['123.125.71.36', '-', '-', '06/Apr/2017:18:09:25 +0800', 'GET / HTTP/1.1', '200', '8642', '"-"', 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)']

Out[16]:

{'datetime': (datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))),

? <function __main__.<lambda>>),

?'length': (8642, int),

?'request': ({'method': 'GET', 'protocol': 'HTTP/1.1', 'url': '/'},

? <function __main__.<lambda>>),

?'status': (200, int)}

?

2、

25日志分析項(xiàng)目

((?:\d{1,3}\.){3}\d{1,3}) - - \[([/:+ \w]+)\] "(\w+) (\S+) ([/\.\w\d]+)" (\d+) (\d+) .+ "(.+)"

?

import datetime

import re

?

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

?

ops = {

??? 'datetime': lambda timestr: datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),

??? 'status': int,

??? 'length': int

}

?

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

?

regex = re.compile(pattern)

?

def extract(line)->dict:

??? matcher = regex.match(line)

??? info = None

??? if matcher:

??????? info = {k:ops.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}

??? return info

?

# print(extract(logs))

?

def load(path:str):?? #裝載日志文件

??? with open(path) as f:

??????? for line in f:

??????????? d = extract(line)

??????????? if d:

??????????????? yield d?? #生成器函數(shù)

??????????? else:

??????????????? continue?? #不合格數(shù)據(jù),pycharm中左下角TODO(view-->Status Bar)

?

g = load('access.log')

print(next(g))

print(next(g))

print(next(g))

?

# for i in g:

#???? print(i)

輸出:

{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}

{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}

注:

代碼若在jupyter下,注意logs中內(nèi)容不能換行;

?

?

?

滑動(dòng)窗口:

或叫時(shí)間窗口,時(shí)間窗口函數(shù),在數(shù)據(jù)分析領(lǐng)域極其重要;

很多數(shù)據(jù),如日志,都是和時(shí)間相關(guān)的,都是按時(shí)間順序產(chǎn)生的,在數(shù)據(jù)分析時(shí),要按照時(shí)間來求值;

interval,表示每一次求值的時(shí)間間隔;

width,時(shí)間窗口寬度,指一次求值的時(shí)間窗口寬度,每個(gè)時(shí)間窗口的數(shù)據(jù)不均勻;

?

當(dāng)width > interval

25日志分析項(xiàng)目


有重疊;

?

當(dāng)width = interval

25日志分析項(xiàng)目25日志分析項(xiàng)目

數(shù)據(jù)求值沒有重疊;

?

當(dāng)width < interval

一般不采納這種方案,會有數(shù)據(jù)缺失;

如業(yè)務(wù)數(shù)據(jù)有1000萬條,要求每次漏幾個(gè),這不影響統(tǒng)計(jì)趨勢;

25日志分析項(xiàng)目25日志分析項(xiàng)目

c2 = c1 - delta

delta = width - interval

delta = 0時(shí),width = interval

?

時(shí)序數(shù)據(jù),運(yùn)維環(huán)境中,日志、監(jiān)控等產(chǎn)生的數(shù)據(jù)是按時(shí)間先后產(chǎn)生并記錄下來的,與時(shí)間相關(guān)的數(shù)據(jù),一般按時(shí)間對數(shù)據(jù)進(jìn)行分析;

數(shù)據(jù)分析基本程序結(jié)構(gòu):

?

例:

一函數(shù),無限的生成隨機(jī)數(shù)函數(shù),產(chǎn)生時(shí)間相關(guān)的數(shù)據(jù),返回->時(shí)間+隨機(jī)數(shù);

每次取3個(gè)數(shù)據(jù),求平均值;

import random

import datetime

?

# def source():

#???? while True:

#???????? yield datetime.datetime.now(),random.randint(1,100)

?

# i = 0

# for x in source():

#???? print(x)

#???? i += 1

#???? if i > 100:

# ????????break

?

# for _ in range(100):

#???? print(next(source()))

?

def source():

??? while True:

??????? yield {'value': random.randint(1,100),'datetime':datetime.datetime.now()}

?

src = source()

# lst = []

# lst.append(next(src))

# lst.append(next(src))

# lst.append(next(src))

lst = [next(src) for _ in range(3)]

?

def handler(iterable):

??? values = [x['value'] for x in iterable]

??? return sum(values) // len(values)

?

print(lst)

print(handler(lst))

?

?

?

窗口函數(shù):

import datetime

import re

?

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

?

ops = {

??? 'datetime': lambda timestr: datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),

??? 'status': int,

??? 'length': int

}

?

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

?

regex = re.compile(pattern)

?

def extract(line)->dict:

??? matcher = regex.match(line)

??? info = None

??? if matcher:

??????? info = {k:ops.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}

??? return info

?

# print(extract(logs))

?

def load(path:str):

??? with open(path) as f:

??????? for line in f:

??????????? d = extract(line)

??????????? if d:

??????????????? yield d

??????????? else:

??????????????? continue

?

# g = load('access.log')

# print(next(g))

# print(next(g))

# print(next(g))

?

# for i in g:

#???? print(i)

?

def window(src,handler,width:int,interval:int):

??? # src = {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

??? start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800','%Y/%m/%d %H:%M:%S %z')

??? current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800','%Y/%m/%d %H:%M:%S %z')

??? seconds = width - interval

??? delta = datetime.timedelta(seconds)

??? buffer = []

?

??? for x in src:

??????? if x:

??????????? buffer.append(x)

??????????? current = x['datetime']

??????? if (current-start).total_seconds() >= interval:

??????????? ret = handler(buffer)

??????????? # print(ret)

??????????? start = current

??????????? # tmp = []

??????????? # for i in buffer:

??????????? #???? if i['datetime'] > current - delta:

??????????? #???? ????tmp.append(i)

??????????? buffer = [i for i in buffer if i['datetime'] > current - delta]

?

def donothing_handler(iterable:list):

??? print(iterable)

??? return iterable

?

def handler(iterable:list):

??? pass?? #TODO

?

def size_handler(iterable:list):

??? pass?? #TODO

?

# window(load('access.log'),donothing_handler,8,5)

# window(load('access.log'),donothing_handler,10,5)

window(load('access.log'),donothing_handler,5,5)

輸出:

[{'remote': '123.125.71.36', 'datetime': datetime.datetime(2017, 4, 6, 18, 9, 25, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 8642, 'useragent': 'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'}]

[{'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}]

[{'remote': '119.123.183.219', 'datetime': datetime.datetime(2017, 4, 6, 20, 59, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}]

?

?

?

分發(fā):

生產(chǎn)者消費(fèi)者模型:

對于一個(gè)監(jiān)控系統(tǒng),需要處理很多數(shù)據(jù),包括日志;

要有數(shù)據(jù)的采集、分析;

被監(jiān)控對象,即數(shù)據(jù)的producer生產(chǎn)者,數(shù)據(jù)的處理程序,即數(shù)據(jù)的consumer消費(fèi)者;

傳統(tǒng)的生產(chǎn)者消費(fèi)者模型,生產(chǎn)者生產(chǎn),消費(fèi)者消費(fèi),這種模型有些問題,開發(fā)的代碼耦合太高,如果生產(chǎn)規(guī)模擴(kuò)大,不易擴(kuò)展,生產(chǎn)和消費(fèi)的速度難匹配;

?

queue隊(duì)列,食堂打飯;

producer-consumer,賣包子;消費(fèi)速度 >= 生產(chǎn)速度;解決辦法:queue,作用:解耦(在程序間實(shí)現(xiàn)解耦(服務(wù)間解耦))、緩沖;

?

注:

zeromq,底層通信協(xié)議用;

大多數(shù)*mq,都是消費(fèi)隊(duì)列;

kafka,性能極高;

FIFO,先進(jìn)先出;

LIFO,后進(jìn)先出;

?

數(shù)據(jù)的生產(chǎn)是不穩(wěn)定的,會造成短時(shí)間數(shù)據(jù)的潮涌,需要緩沖;

消費(fèi)者消費(fèi)能力不一樣,有快有慢,消費(fèi)者可以自己決定消費(fèi)緩沖區(qū)中的數(shù)據(jù);

單機(jī)可用queue(內(nèi)建模塊)構(gòu)建進(jìn)程內(nèi)的隊(duì)列,滿足多個(gè)線程間的生產(chǎn)消費(fèi)需要;

大型系統(tǒng)可使用第三方消息中間件,rabbitmqrocketmq、kafka

?

queue模塊:

queue.Queue(maxsize=0),queue提供了一個(gè)FIFO先進(jìn)先出的隊(duì)列Queue,創(chuàng)建FIFO隊(duì)列,返回Queue對象;maxsize <= 0,隊(duì)列長度沒有限制;

?

q = queue.Queue()

?

q.get(block=True,timeout=None),從隊(duì)列中移除元素并返回這個(gè)元素,只要get過即拿走就沒了;

block阻塞,timeout超時(shí);

block=True,是阻塞,timeout=None,就是一直阻塞,timeout有值,即阻塞到一定秒數(shù)拋Empty異常;

blcok=False,是非阻塞,timeout將被忽略,要么成功返回一個(gè)元素,要么拋Empty異常;

?

q.get_nowait(),等價(jià)于q.get(block=False)q.get(False),即要么成功返回一個(gè)元素,要么拋Empty異常;這種阻塞效果,要多線程中舉例;

?

q.put(item,block=True,timeout=None),把一個(gè)元素加入到隊(duì)列中去,

block=True,timeout=None,一直阻塞直至有空位放元素;

block=Truetimeout=5,阻塞5秒拋Full異常;

block=False,timeout失效,立即返回,能塞進(jìn)去就塞,不能則拋Full異常;

?

q.put_nowait(item),等價(jià)于q.put(item,False);

?

注:

Queue的長度是個(gè)近似值,不準(zhǔn)確,因?yàn)樯a(chǎn)消費(fèi)一直在進(jìn)行;

q.get(),只要get過,即拿走,數(shù)據(jù)就沒了;而kafka中,拿走數(shù)據(jù)后,kafka中仍保留有,由consumer來清理;

?

例:

from queue import Queue

import random

?

q = Queue()

?

q.put(random.randint(1,100))

q.put(random.randint(1,100))

?

print(q.get())

print(q.get())

# print(q.get())?? #block

print(q.get(timeout=3))

輸出:

2

35

Traceback (most recent call last):

? File "/home/python/magedu/projects/cmdb/queue_Queue.py", line 12, in <module>

??? print(q.get(timeout=3))

? File "/ane/python3.6/lib/python3.6/queue.py", line 172, in get

? ??raise Empty

queue.Empty

?

分發(fā)器的實(shí)現(xiàn):

生產(chǎn)者(數(shù)據(jù)源)生產(chǎn)數(shù)據(jù),緩沖到消息隊(duì)列中;

數(shù)據(jù)處理流程:數(shù)據(jù)加載-->提取-->分析(滑動(dòng)窗口函數(shù));

?

處理大量數(shù)據(jù)時(shí),對于一個(gè)數(shù)據(jù)源來說,需要多個(gè)消費(fèi)者處理,但如何分配數(shù)據(jù)?

需要一個(gè)分發(fā)器(調(diào)度器),把數(shù)據(jù)分發(fā)給不同的消費(fèi)者處理;

每一個(gè)消費(fèi)者拿到數(shù)據(jù)后,有自己的處理函數(shù),所以要有一種注冊機(jī)制;

數(shù)據(jù)加載-->提取-->分發(fā)-->分析函數(shù)1|分析函數(shù)2,一個(gè)數(shù)據(jù)通過分發(fā)器,發(fā)送給n個(gè)消費(fèi)者,分析函數(shù)1|分析函數(shù)2為不同的handler,不同的窗口寬度,間隔時(shí)間;

?

如何分發(fā)?

一對多,副本發(fā)送(一個(gè)數(shù)據(jù)通過分發(fā)器,發(fā)送到n個(gè)消費(fèi)者),用輪詢;

?

MQ

在生產(chǎn)者和消費(fèi)者之間用消息隊(duì)列,那么所有的消費(fèi)者共用一個(gè)消息隊(duì)列?(這需要解決爭搶的問題);還是各自擁有一個(gè)消息隊(duì)列?(較容易);

?

注冊?

在調(diào)度器內(nèi)部記錄有哪些消費(fèi)者,記錄消費(fèi)者自己的隊(duì)列;

?

線程?

由于一條數(shù)據(jù)會被多個(gè)不同的注冊過的handler處理,所以最好的方式是多線程;

注:

import threading

t = threading.Thread(target=window,args=(src,handler,width,interval))?? #target,線程中運(yùn)行的函數(shù),args,這個(gè)函數(shù)運(yùn)行時(shí)需要的實(shí)參用tuple

t.start()

?

分析功能:

分析日志很重要,通過海量數(shù)據(jù)的分析就能知道是否遭受了***,是否是爬取的高峰期,是否有盜鏈;

分析的邏輯放到handler中;

window僅通過時(shí)間窗口挪動(dòng)取數(shù)據(jù),不要將其的功能做的豐富全面,若需統(tǒng)一處理,獨(dú)立出單獨(dú)的函數(shù);

?

注:

爬蟲:baiduspidergooglebot,SEOhttp,requestresponse;

?

狀態(tài)碼分析:

狀態(tài)碼中包含了很多信息;

304,服務(wù)器收到客戶端提交的請求數(shù),發(fā)現(xiàn)資源未變化,要求browser使用靜態(tài)資源的緩存;

404,server找不到請求的資源;

304占比大,說明靜態(tài)緩存效果明顯;

404占比大,說明出現(xiàn)了錯(cuò)誤鏈接,或深度嗅探網(wǎng)站資源;

400,500占比突然開始增大,網(wǎng)站一定出問題了;

?

import datetime

import re

from queue import Queue

import threading

?

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

?

ops = {

??? 'datetime': lambda timestr: datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),

??? 'status': int,

??? 'length': int

}

?

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

?

regex = re.compile(pattern)

?

def extract(line)->dict:

??? matcher = regex.match(line)

??? info = None

??? if matcher:

??????? info = {k:ops.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}

??? return info

?

# print(extract(logs))

?

def load(path:str):

??? with open(path) as f:

??????? for line in f:

??????????? d = extract(line)

??????????? if d:

??????????????? yield d

??????????? else:

??????????????? continue

?

# g = load('access.log')

# print(next(g))

# print(next(g))

# print(next(g))

?

# for i in g:

#???? print(i)

?

# def window(src,handler,width:int,interval:int):

#???? # src = {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

#???? start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800','%Y/%m/%d %H:%M:%S %z')

#???? current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800','%Y/%m/%d %H:%M:%S %z')

#???? seconds = width - interval

#???? delta = datetime.timedelta(seconds)

#???? buffer = []

#

#???? for x in src:

#???????? if x:

#???????????? buffer.append(x)

#??????????? ?current = x['datetime']

#???????? if (current-start).total_seconds() >= interval:

#???????????? ret = handler(buffer)

#???????????? # print(ret)

#???????????? start = current

#???????????? # tmp = []

#???????????? # for i in buffer:

#???????????? #???? if i['datetime'] > current - delta:

#???????????? #???????? tmp.append(i)

#???????????? buffer = [i for i in buffer if i['datetime'] > current - delta]

?

# window(load('access.log'),donothing_handler,8,5)

# window(load('access.log'),donothing_handler,10,5)

# window(load('access.log'),donothing_handler,5,5)

?

def window(src:Queue,handler,width:int,interval:int):

??? # src = {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

??? start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800','%Y/%m/%d %H:%M:%S %z')

??? current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800','%Y/%m/%d %H:%M:%S %z')

??? delta = datetime.timedelta(width-interval)

??? buffer = []

?

??? while True:

??????? data = src.get()

??????? if data:

??????????? buffer.append(data)

??????????? current = data['datetime']

??????? if (current-start).total_seconds() >= interval:

??????????? ret = handler(buffer)

??????????? # print(ret)

??????????? start = current

??????????? buffer = [i for i in buffer if i['datetime'] > current - delta]

?

def donothing_handler(iterable:list):

??? print(iterable)

??? return iterable

?

def handler(iterable:list):

??? pass?? #TODO

?

def size_handler(iterable:list):

??? pass?? #TODO

?

def status_handler(iterable:list):

??? d = {}

??? for item in iterable:

??????? key = item['status']

??????? if key not in d.keys():

??????????? d[key] = 0

??????? d[key] += 1

??? total = sum(d.values())

??? print({k:v/total*100 for k,v in d.items()})?? #return

?

def dispatcher(src):

??? queues = []

??? threads = []

??? def reg(handler,width,interval):

??????? q = Queue()

??????? queues.append(q)

??????? t = threading.Thread(target=window,args=(q,handler,width,interval))

??????? threads.append(t)

??? def run():

??????? for t in threads:

??????????? t.start()

??????? for x in src:

??????????? for q in queues:

??????????????? q.put(x)

??? return reg,run

?

reg,run = dispatcher(load('access.log'))

reg(status_handler,8,5)

run()

?

?

?

日志文件加載:

改為接受一批;

如果一批路徑,迭代每一個(gè)路徑;

如果路徑是一個(gè)普通文件,按行讀取內(nèi)容(假設(shè)是日志文件);

如果路徑是一個(gè)目錄,就遍歷路徑下的所有普通文件,每一個(gè)文件按行處理,不遞歸處理子目錄;

?

def openfile(path:str):

??? with open(path) as f:

??????? for line in f:

??????????? d = extract(line)

??????????? if d:

??????????????? yield d

??????????? else:

??????????????? continue

?

def load(*paths):

??? for file in paths:

??????? p = Path(file)

??????? if not p.exists():

??????????? continue

??????? if p.is_dir():

??????????? for x in p.iterdir():

??????????????? if x.is_file():

??????????????????? # for y in openfile(str(x)):

??????????????????? #?? ??yield y

??????????????????? yield from openfile(str(x))

??????? elif p.is_file():

??????????? # for y in openfile(str(p)):

??????????? #???? yield y

??????????? yield from openfile(str(p))

?

離線日志分析項(xiàng)目:

可指定文件或目錄,對日志進(jìn)行數(shù)據(jù)分析;

分析函數(shù)可動(dòng)態(tài)注冊;

數(shù)據(jù)可分發(fā)給不同的分析處理程序處理;

?

關(guān)鍵步驟:

數(shù)據(jù)源處理(處理一行行數(shù)據(jù));

拿到數(shù)據(jù)后的處理(作為分析,一小批一小批處理,窗口函數(shù));

分發(fā)器(生產(chǎn)者和消費(fèi)者間作為橋梁作用);

?

?

?

瀏覽器分析:

useragent,指軟件按一定的格式向遠(yuǎn)端服務(wù)器提供一個(gè)標(biāo)記自己的字符串;

http協(xié)議中,使用user-agent字段傳送一這個(gè)字符串,這個(gè)值可被修改(想偽裝誰都可以);

格式:([platform details]) [extensions]

例如:"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36"

?

注:

chrome-->console,navigator.userAgent,將內(nèi)容復(fù)制粘貼到傲游的自定義UserAgent中;

?

信息提取模塊:

user-agents、pyyaml、ua-parser;

]$ pip install user-agents pyyaml ua-parser

?

例:

from user_agents import parse

?

u = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.72 Safari/537.36'

ua = parse(u)

?

print(ua.browser)

print(ua.browser.family)

print(ua.browser.version_string)

輸出:

Browser(family='Chrome', version=(28, 0, 1500), version_string='28.0.1500')

Chrome

28.0.1500

?

整合,完整代碼:

25日志分析項(xiàng)目

import datetime

import re

from queue import Queue

import threading

from pathlib import Path

from user_agents import parse

from collections import defaultdict

?

# logs = '''123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"'''

?

ops = {

??? 'datetime': lambda timestr: datetime.datetime.strptime(timestr,'%d/%b/%Y:%H:%M:%S %z'),

??? 'status': int,

??? 'length': int,

??? 'request': lambda request: dict(zip(('method','url','protocol'),request.split())),

??? 'useragent': lambda useragent: parse(useragent)

}

?

# pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<request>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

pattern = '''(?P<remote>(?:\d{1,3}\.){3}\d{1,3}) - - \[(?P<datetime>[/:+ \w]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[/\.\w\d]+)" (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

?

regex = re.compile(pattern)

?

def extract(line)->dict:

??? matcher = regex.match(line)

??? info = None

??? if matcher:

??????? info = {k:ops.get(k,lambda x:x)(v) for k,v in matcher.groupdict().items()}

??? # print(info)

??? return info

?

# print(extract(logs))

?

# def load(path:str):

#???? with open(path) as f:

#???????? for line in f:

#???????????? d = extract(line)

#???????????? if d:

#???????????????? yield d

#???????????? else:

#???????????????? continue

?

def openfile(path:str):

??? with open(path) as f:

??????? for line in f:

??????????? d = extract(line)

??????????? if d:

??????????????? yield d

??????????? else:

??????????????? continue

?

def load(*paths):

??? for file in paths:

??????? p = Path(file)

??????? if not p.exists():

??????????? continue

??????? if p.is_dir():

??????????? for x in p.iterdir():

??????????????? if x.is_file():

??????????????????? # for y in openfile(str(x)):

??????????????????? #?? ??yield y

??????????????????? yield from openfile(str(x))

??????? elif p.is_file():

??????????? # for y in openfile(str(p)):

??????????? #???? yield y

??????????? yield from openfile(str(p))

?

# g = load('access.log')

# print(next(g))

# print(next(g))

# print(next(g))

?

# for i in g:

#???? print(i)

?

# def window(src,handler,width:int,interval:int):

#???? # src = {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

#???? start = datetime.datetime.strptime('1970/01/01 01:01:01 +0800','%Y/%m/%d %H:%M:%S %z')

#???? current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800','%Y/%m/%d %H:%M:%S %z')

#???? seconds = width - interval

#???? delta = datetime.timedelta(seconds)

#???? buffer = []

#

#???? for x in src:

#???????? if x:

#???????????? buffer.append(x)

#???????????? current = x['datetime']

#???????? if (current-start).total_seconds() >= interval:

#???????????? ret = handler(buffer)

#???????????? # print(ret)

#???????????? start = current

#???????????? # tmp = []

#???????????? # for i in buffer:

#?? ??????????#???? if i['datetime'] > current - delta:

#???????????? #???????? tmp.append(i)

#???????????? buffer = [i for i in buffer if i['datetime'] > current - delta]

?

# window(load('access.log'),donothing_handler,8,5)

# window(load('access.log'),donothing_handler,10,5)

# window(load('access.log'),donothing_handler,5,5)

?

def window(src:Queue,handler,width:int,interval:int):

??? # src = {'remote': '112.64.118.97', 'datetime': datetime.datetime(2017, 4, 6, 19, 13, 59, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'request': '/favicon.ico', 'protocol': 'HTTP/1.1', 'status': 200, 'length': 4101, 'useragent': 'Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)'}

??? start = datetime.datetime.strptime('1970/01/01 00:01:01 +0800','%Y/%m/%d %H:%M:%S %z')

??? current = datetime.datetime.strptime('1970/01/01 01:01:02 +0800','%Y/%m/%d %H:%M:%S %z')

??? delta = datetime.timedelta(width-interval)

??? buffer = []

?

??? while True:

??????? data = src.get()

??????? if data:

??????????? buffer.append(data)

??????????? current = data['datetime']

??????? if (current-start).total_seconds() >= interval:

?????? ?????ret = handler(buffer)

??????????? # print(ret)

??????????? start = current

??????????? buffer = [i for i in buffer if i['datetime'] > current - delta]

?

def donothing_handler(iterable:list):

??? print(iterable)

??? return iterable

?

def handler(iterable:list):

??? pass?? #TODO

?

def size_handler(iterable:list):

??? pass?? #TODO

?

def status_handler(iterable:list):

??? d = {}

??? for item in iterable:

??????? key = item['status']

??????? if key not in d.keys():

??????????? d[key] = 0

??????? d[key] += 1

??? total = sum(d.values())

??? print({k:v/total*100 for k,v in d.items()})?? #return

?

browsers = defaultdict(lambda :0)

def browser_handler(iterable:list):

??? # browsers = {}

??? for item in iterable:

??????? ua = item['useragent']

??????? key = (ua.browser.family,ua.browser.version_string)

??????? # browsers[key] = browsers.get(key,0) + 1

??????? browsers[key] += 1

??? return browsers

?

def dispatcher(src):

??? queues = []

??? threads = []

??? def reg(handler,width,interval):

??????? q = Queue()

??????? queues.append(q)

??????? t = threading.Thread(target=window,args=(q,handler,width,interval))

??????? threads.append(t)

??? def run():

??????? for t in threads:

??????????? t.start()

??????? for x in src:

??????????? for q in queues:

??????????????? q.put(x)

??? return reg,run

?

reg,run = dispatcher(load('access.log'))

reg(status_handler,8,5)

reg(browser_handler,5,5)

run()

print(browsers)

?

?

?

?

?

?

?

?


向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI