溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

發(fā)布時間:2021-12-30 11:03:34 來源:億速云 閱讀:235 作者:小新 欄目:開發(fā)技術(shù)

小編給大家分享一下Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

Prometheus是什么

Prometheus是一套開源監(jiān)控系統(tǒng)和告警為一體,由go語言(golang)開發(fā),是監(jiān)控+報警+時間序列數(shù)

據(jù)庫的組合。適合監(jiān)控docker容器。因為kubernetes(k8s)的流行帶動其發(fā)展。

Prometheus的主要特點

  • 多維度數(shù)據(jù)模型,由指標(biāo)名稱和鍵/值對標(biāo)識的時間序列數(shù)據(jù)。

  • 作為一個時間序列數(shù)據(jù)庫,其采集的數(shù)據(jù)會以文件的形式存儲在本地中。

  • 靈活的查詢語言,PromQL(Prometheus Query Language)函數(shù)式查詢語言。

  • 不依賴分布式存儲,單個服務(wù)器節(jié)點是自治的。

  • 以HTTP方式,通過pull模型拉取時間序列數(shù)據(jù)。

  • 也可以通過中間網(wǎng)關(guān)支持push模型。

  • 通過服務(wù)發(fā)現(xiàn)或者靜態(tài)配置,來發(fā)現(xiàn)目標(biāo)服務(wù)對象。

  • 支持多種多樣的圖表和界面展示。

Prometheus原理架構(gòu)圖

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

Prometheus基礎(chǔ)概念

什么是時間序列數(shù)據(jù)

時間序列數(shù)據(jù)(TimeSeries Data) : 按照時間順序記錄系統(tǒng)、設(shè)備狀態(tài)變化的數(shù)據(jù)被稱為時序數(shù)據(jù)。

應(yīng)用的場景很多,如:

  • 無人駕駛運行中記錄的經(jīng)度,緯度,速度,方向,旁邊物體距離等。

  • 某一個地區(qū)的各車輛的行駛軌跡數(shù)據(jù)。

  • 傳統(tǒng)證券行業(yè)實時交易數(shù)據(jù)。

  • 實時運維監(jiān)控數(shù)據(jù)等。

時間序列數(shù)據(jù)特點:

  • 性能好、存儲成本低

什么是targets(目標(biāo))

Prometheus 是一個監(jiān)控平臺,它通過抓取監(jiān)控目標(biāo)(targets)上的指標(biāo) HTTP 端點來從這些目標(biāo)收集指標(biāo)。

安裝完P(guān)rometheus Server端之后,第一個targets就是它本身。

具體可以參考官方文檔

什么是metrics(指標(biāo))

Prometheus存在多種不同的監(jiān)控指標(biāo)(Metrics),在不同的場景下應(yīng)該要選擇不同的Metrics。

Prometheus的merics類型有四種,分別為Counter、Gauge、Summary、Histogram。

  • Counter:只增不減的計數(shù)器

  • Gauge:可增可減的儀表盤

  • Histogram:分析數(shù)據(jù)分布情況

  • Summary:使用較少

簡單了解即可,暫不需要深入理解。

通過瀏覽器訪問http://被監(jiān)控端IP:9100(被監(jiān)控端口)/metrics

就可以查到node_exporter在被監(jiān)控端收集的監(jiān)控信息

什么是PromQL(函數(shù)式查詢語言)

Prometheus內(nèi)置了一個強大的數(shù)據(jù)查詢語言PromQL。 通過PromQL可以實現(xiàn)對監(jiān)控數(shù)據(jù)的查詢、聚合。

同時PromQL也被應(yīng)用于數(shù)據(jù)可視化(如Grafana)以及告警當(dāng)中。

通過PromQL可以輕松回答以下問題:

  • 在過去一段時間中95%應(yīng)用延遲時間的分布范圍?

  • 預(yù)測在4小時后,磁盤空間占用大致會是什么情況?

  • CPU占用率前5位的服務(wù)有哪些?(過濾)

具體查詢細節(jié)可以參考官方。

如何監(jiān)控遠程Linux主機

安裝Prometheus組件其實很簡單,下載包--解壓--后臺啟動運行即可,不做具體演示。

在遠程linux主機(被監(jiān)控端)上安裝node_exporter組件,可看下載地址

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

下載解壓后,里面就一個啟動命令node_exporter,直接啟動即可。

nohup /usr/local/node_exporter/node_exporter >/dev/null 2>&1 &
lsof -i:9100

nohup:如果直接啟動node_exporter的話,終端關(guān)閉進程也會隨之關(guān)閉,這個命令幫你解決問題。

Prometheus HTTP API

Prometheus 所有穩(wěn)定的 HTTP API 都在 /api/v1 路徑下。當(dāng)我們有數(shù)據(jù)查詢需求時,可以通過查詢 API 請求監(jiān)控數(shù)據(jù),提交數(shù)據(jù)可以使用 remote write 協(xié)議或者 Pushgateway 的方式。

支持的 API

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

認證方法

默認開啟認證,因此所有的接口都需要認證,且所有的認證方式都支持 Bearer Token和 Basic Auth。

調(diào)用接口的時候,我們需要攜帶Basic Auth請求頭的認證,否則會出現(xiàn)401。

Bearer Token

Bearer Token 隨著實例產(chǎn)生而生成,可以通過控制臺進行查詢。了解 Bearer Token 更多信息,請參見 Bearer Authentication。

Basic Auth

Basic Auth 兼容原生 Prometheus Query 的認證方式,用戶名為用戶的 APPID,密碼為 bearer token(實例產(chǎn)生時生成),可以通過控制臺進行查詢。了解 Basic Auth 更多信息,請參見 Basic Authentication。

數(shù)據(jù)返回格式

所有 API 的響應(yīng)數(shù)據(jù)格式都為 JSON。每一次成功的請求會返回 2xx 狀態(tài)碼。

無效的請求會返回一個包含錯誤對象的 JSON 格式數(shù)據(jù),同時也將包含一個如下表格的狀態(tài)碼:

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

無效請求響應(yīng)返回模板如下:

{
"status": "success" | "error",
"data": <data>,
 // 當(dāng) status 狀態(tài)為 error 時,下面的數(shù)據(jù)將被返回
"errorType": "<string>",
"error": "<string>",
 // 當(dāng)執(zhí)行請求時有警告信息時,該字段將被填充返回
"warnings": ["<string>"]
}

數(shù)據(jù)寫入

運維過程不需要對數(shù)據(jù)進行寫入,所以暫時不深入理解。

有興趣的同學(xué)可以看看官方文檔

監(jiān)控數(shù)據(jù)查詢

當(dāng)我們有數(shù)據(jù)查詢需求時,可以通過查詢 API 請求監(jiān)控數(shù)據(jù)。

查詢 API 接口

GET /api/v1/query
POST /api/v1/query

查詢參數(shù):

query= : Prometheus:查詢表達式。

time= <rfc3339 | unix_timestamp>: 時間戳, 可選。

timeout= :檢測超時時間, 可選。 默認由 -query.timeout 參數(shù)指定。

簡單的查詢

查詢當(dāng)前狀態(tài)為up的監(jiān)控主機:

curl -u "appid:token" 'http://IP:PORT/api/v1/query?query=up'

范圍查詢

GET /api/v1/query_range
POST /api/v1/query_range

根據(jù)時間范圍查詢需要的數(shù)據(jù),這也是我們用得最多的場景,

這時我們需要用到 /api/v1/query_range 接口,示例如下:

$ curl 'http://localhost:9090/api/v1/query_range?query=up&start=2015-07-01T20:10:30.781Z&end=2015-07-01T20:11:00.781Z&step=15s'
{
   "status" : "success",
   "data" : {
      "resultType" : "matrix",
      "result" : [
         {
            "metric" : {
               "__name__" : "up",
               "job" : "prometheus",
               "instance" : "localhost:9090"
            },
            "values" : [
               [ 1435781430.781, "1" ],
               [ 1435781445.781, "1" ],
               [ 1435781460.781, "1" ]
            ]
         },
         {
            "metric" : {
               "__name__" : "up",
               "job" : "node",
               "instance" : "localhost:9091"
            },
            "values" : [
               [ 1435781430.781, "0" ],
               [ 1435781445.781, "0" ],
               [ 1435781460.781, "1" ]
            ]
         }
      ]
   }
}

什么是Grafana

Grafana是一個開源的度量分析和可視化工具,可以通過將采集的數(shù)據(jù)分析、查詢,

然后進行可視化的展示,并能實現(xiàn)報警。

網(wǎng)址: https://grafana.com/

使用Grafana連接Prometheus

連接不再做具體演示,操作思路如下:

1.在Grafana服務(wù)器上安裝,下載地址:https://grafana.com/grafana/download

2.瀏覽器http://grafana服務(wù)器IP:3000登錄,默認賬號密碼都是admin,就可以登陸了。

3.把Prometheus服務(wù)器收集的數(shù)據(jù)做為一個數(shù)據(jù)源添加到Grafana,得到Prometheus數(shù)據(jù)。

4.然后為添加好的數(shù)據(jù)源做圖形顯示,最后在dashboard就可以查看到。

操作流程不難,就不講解重點,后面正式開始上查詢腳本。

工作使用場景

工作中需要通過CPU、內(nèi)存生成資源利用率報表,可以通過Prometheus的API寫一個Python腳本。

Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算

可通過API獲取數(shù)據(jù),然后再進行數(shù)據(jù)排序、過濾、運算、聚合,最后寫入Mysql數(shù)據(jù)庫。

CPU峰值計算

取最近一周CPU數(shù)值,再排序取最高的值。

def get_cpu_peak(self):
    """
        CPU取最近一周所有數(shù)值,再排序取最高的值,TOP1
        :return: {'IP' : value}
        """
    # 拼接URL
    pre_url = self.server_ip + '/api/v1/query_range?query='
    expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
    '&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    url = pre_url + expr
    # print(url)

    result = {}
    # 請求URL后將Json數(shù)據(jù)轉(zhuǎn)為字典對象
    res = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
    # print(data)

    # 循環(huán)取出字典里每個IP的values,排序取最高值,最后存入result字典
    for da in res.get('data').get('result'):
        values = da.get('values')
        cpu_values = [float(v[1]) for v in values]  # 取出數(shù)值并存入列表
        # 取出IP并消除端口號
        ip = da.get('metric').get('instance')
        ip = ip[:ip.index(':')] if ':' in ip else ip
        # if ip == '10.124.58.181':
        #     print (ip)
        # cpu_peak = round(sorted(cpu_values, reverse=True)[0], 2)
        cpu_peak = sorted(cpu_values, reverse=True)[0]
        # 取出IP和最高值之后,寫入字典
        result[ip] = cpu_peak

        # print(result)
        return result

CPU均值計算

取最近一周CPU每一天的TOP20除以20得到當(dāng)時忙時平均值,

再將7天平均值的和除以n,得到時間范圍內(nèi)忙時平均值。

def get_cpu_average(self):
    """
        CPU忙時平均值:取最近一周CPU數(shù)據(jù),每一天的TOP20除以20得到忙時平均值;
        再將一周得到的忙時平均值相加,再除以7,得到時間范圍內(nèi)一周的忙時平均值。
        :return:
        """
    cpu_average = {}
    for t in range(len(self.time_list)):
        if t + 1 < len(self.time_list):
            start_time = self.time_list[t]
            end_time = self.time_list[t + 1]
            # print(start_time, end_time)
            # 拼接URL
            pre_url = server_ip + '/api/v1/query_range?query='
            expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
            '&start=%s&end=%s&step=300' % (start_time, end_time - 1)
            url = pre_url + expr
            # print(url)
            # 請求接口數(shù)據(jù)
            data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))

            for da in data.get('data').get('result'):   # 循環(huán)拿到result數(shù)據(jù)
                values = da.get('values')
                cpu_load = [float(v[1]) for v in values]    # 循環(huán)拿到values里面的所有值
                ip = da.get('metric').get('instance')       # 拿到instance里面的ip
                ip = ip[:ip.index(':')] if ':' in ip else ip    # 去除個別后面帶的端口號
                # avg_cup_load = sum(sorted(cpu_load, reverse=True)[:20]) / 20
                # 取top20% 再除以20%,得出top20%的平均值
                # avg_cup_load = round(sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2), 2)
                # 倒序后取前面20%除以個數(shù),得到前20%的平均值
                avg_cup_load = sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2)
                # print(avg_cup_load)
                # 將計算后的數(shù)據(jù)以ip為key寫入字典
                if cpu_average.get(ip):
                    cpu_average[ip].append(avg_cup_load)
                    else:
                        cpu_average[ip] = [avg_cup_load]

                        # 每日top20的平均值累加,共7天的再除以7
                        for k, v in cpu_average.items():
                            # cpu_average[k] = round(sum(v) / 7, 2)
                            cpu_average[k] = sum(v)

                            # print(cpu_average)
                            return cpu_average

內(nèi)存峰值計算

取7天內(nèi)存數(shù)值,排序后取最高峰值TOP1

def get_mem_peak(self):
    """
        內(nèi)存單臺峰值:取7天內(nèi)存最高峰值TOP1
        :return: 7天內(nèi)存使用率最高峰值
        """
    pre_url = self.server_ip + '/api/v1/query_range?query='
    # expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
    # 字符太長會導(dǎo)致報錯,所以這里進行拆分字段計算
    expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
    expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)

    result = {}
    # 循環(huán)分別取出總內(nèi)存、可用內(nèi)存、Buffer塊、緩存塊四個字段
    for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
        url = pre_url + ur
        data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
        ip_dict = {}
        # 循環(huán)單個字段所有值
        for da in data.get('data').get('result'):
            ip = da.get('metric').get('instance')
            ip = ip[:ip.index(':')] if ':' in ip else ip
            # if ip != '10.124.53.12':
            #     continue
            if ip_dict.get(ip):     # 過濾重復(fù)的ip,重復(fù)ip會導(dǎo)致計算多次
                # print("重復(fù)ip:%s" % (ip))
                continue
                values = da.get('values')
                # 將列表里的值轉(zhuǎn)為字典方便計算
                values_dict = {}
                for v in values:
                    values_dict[str(v[0])] = v[1]
                    # 標(biāo)記ip存在
                    ip_dict[ip] = True
                    # 建立列表追加字典
                    if result.get(ip):
                        result[ip].append(values_dict)
                        else:
                            result[ip] = [values_dict]

                            # print(result)
                            # 對取出的四個值進行計算,得出峰值
                            for ip, values in result.items():
                                values_list = []
                                for k, v in values[0].items():
                                    try:
                                        values_MenTotal = float(v)
                                        values_MemFree = float(values[1].get(k, 0))
                                        values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
                                        values_Cached = float(values[3].get(k, 0)) if values[3] else 0
                                        # 如果是0,不參與計算
                                        if values_MemFree==0.0 or values_Buffers==0.0 or values_Cached==0.0:
                                            continue
                                            # values_list.append(round((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100, 2))
                                            # 合并后計算,得出列表
                                            values_list.append((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100)
                                            # 對得出結(jié)果進行排序
                                            result[ip] = sorted(values_list, reverse=True)[0]
                                            except Exception as e:
                                                # print(values[0])
                                                logging.exception(e)

                                                # print(result)
                                                return result

內(nèi)存均值計算

先取出7天的日期,根據(jù)多條鏈接循環(huán)取出每天數(shù)據(jù),排序value取top20除以20,最終7天數(shù)據(jù)再除以7

def get_mem_average(self):
    """
        內(nèi)存忙時平均值:先取出7天的日期,根據(jù)多條鏈接循環(huán)取出每天數(shù)據(jù),排序value取top20除以20,最終7天數(shù)據(jù)再除以7
        :return:
        """
    avg_mem_util = {}
    for t in range(len(self.time_list)):
        if t + 1 < len(self.time_list):
            start_time = self.time_list[t]
            end_time = self.time_list[t + 1]
            # 根據(jù)多條鏈接循環(huán)取出每天數(shù)據(jù)
            pre_url = self.server_ip + '/api/v1/query_range?query='
            # expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
            expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
            expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)

            result = {}
            # 循環(huán)取出四個字段
            for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
                url = pre_url + ur
                data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
                ip_dict = {}
                # 循環(huán)單個字段所有值
                for da in data.get('data').get('result'):
                    ip = da.get('metric').get('instance')
                    ip = ip[:ip.index(':')] if ':' in ip else ip
                    if ip_dict.get(ip):
                        # print("重復(fù)ip:%s" % (ip))
                        continue
                        values = da.get('values')
                        # 將列表里的值轉(zhuǎn)為字典方便計算
                        values_dict = {}
                        for v in values:
                            values_dict[str(v[0])] = v[1]
                            # 標(biāo)記ip存在
                            ip_dict[ip] = True
                            # 建立列表追加字典
                            if result.get(ip):
                                result[ip].append(values_dict)
                                else:
                                    result[ip] = [values_dict]

                                    # print(result)
                                    for ip, values in result.items():
                                        values_list = []

                                        for k, v in values[0].items():
                                            try:
                                                values_MenTotal = float(v)
                                                values_MemFree = float(values[1].get(k, 0)) if values[1] else 0
                                                values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
                                                values_Cached = float(values[3].get(k, 0)) if values[3] else 0
                                                if values_MemFree == 0.0 or values_Buffers == 0.0 or values_Cached == 0.0:
                                                    continue
                                                    value_calc = (values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100
                                                    if value_calc != float(0):
                                                        values_list.append(value_calc)
                                                        except Exception as e:
                                                            print(values[0])
                                                            # logging.exception(e)
                                                            continue
                                                            # 排序value取top20除以20
                                                            # avg_mem = round(sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2), 2)
                                                            try:
                                                                avg_mem = sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2)
                                                                except Exception as e:
                                                                    avg_mem = 0
                                                                    logging.exception(e)

                                                                    if avg_mem_util.get(ip):
                                                                        avg_mem_util[ip].append(avg_mem)
                                                                        else:
                                                                            avg_mem_util[ip] = [avg_mem]

                                                                            # 最終7天數(shù)據(jù)再除以7
                                                                            for k, v in avg_mem_util.items():
                                                                                # avg_mem_util[k] = round(sum(v) / 7, 2)
                                                                                avg_mem_util[k] = sum(v)

                                                                                return avg_mem_util

導(dǎo)出excel

將采集到的數(shù)據(jù)導(dǎo)出excel

def export_excel(self, export):
    """
        將采集到的數(shù)據(jù)導(dǎo)出excel
        :param export: 數(shù)據(jù)集合
        :return:
        """
    try:
        # 將字典列表轉(zhuǎn)換為DataFrame
        pf = pd.DataFrame(list(export))
        # 指定字段順序
        order = ['ip', 'cpu_peak', 'cpu_average', 'mem_peak', 'mem_average', 'collector']
        pf = pf[order]
        # 將列名替換為中文
        columns_map = {
            'ip': 'ip',
            'cpu_peak': 'CPU峰值利用率',
            'cpu_average': 'CPU忙時平均峰值利用率',
            'mem_peak': '內(nèi)存峰值利用率',
            'mem_average': '內(nèi)存忙時平均峰值利用率',
            'collector': '來源地址'
        }
        pf.rename(columns=columns_map, inplace=True)
        # 指定生成的Excel表格名稱
        writer_name = self.Host + '.xlsx'
        writer_name.replace(':18600', '')
        # print(writer_name)
        file_path = pd.ExcelWriter(writer_name.replace(':18600', ''))
        # 替換空單元格
        pf.fillna(' ', inplace=True)
        # 輸出
        pf.to_excel(file_path, encoding='utf-8', index=False)
        # 保存表格
        file_path.save()
        except Exception as e:
            print(e)
            logging.exception(e)

因為機房需要保留數(shù)據(jù)方便展示,后面改造成采集直接入庫mysql。

以上是“Python如何調(diào)用Prometheus監(jiān)控數(shù)據(jù)并計算”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI