溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

使用spark分析mysql慢日志

發(fā)布時間:2020-07-09 04:04:15 來源:網(wǎng)絡 閱讀:12362 作者:橡皮高 欄目:MySQL數(shù)據(jù)庫

熟悉oracle的童鞋都知道,在oracle中,有很多視圖記錄著sql執(zhí)行的各項指標,我們可以根據(jù)自己的需求編寫相應腳本,從oracle中獲取sql的性能開銷。作為開源數(shù)據(jù)庫,mysql不比oracle,分析慢sql只能通過slow.log。slow.log看起來不夠直觀,而且同一條慢sql執(zhí)行多次的話就會在slow.log中被記錄多次,可閱讀性較差。
最近,部門開發(fā)的數(shù)據(jù)庫審計平臺上線mysql審計模塊,需要為客戶提供一鍵化提取slow.log中慢sql的功能。由于本人之前研究過spark,在分析慢日志的文本結構后,使用scala語言,利用spark core相關技術,編寫了能夠去重slow.log中重復sql,并將按執(zhí)行時間排序的top sql輸入到hive表中的小程序。
話不多說,上菜!

開發(fā)環(huán)境:
1、CentOS 6.5
2、JDK 1.7
3、Hadoop 2.4.1
4、Hive 0.13
5、Spark 1.5.1
6、scala 2.11.4
hadoop及spark集群環(huán)境的搭建方法就不多說了哈,網(wǎng)上資料很多,對大數(shù)據(jù)感興趣的童鞋可以嘗試搭建。

step 1 使用scala ide for eclipse編寫應用程序
analyzeSlowLog.scala:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.util.matching.Regex
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext

object SlowLogAnalyze {
  def main(args: Array[String]): Unit = {
    //創(chuàng)建SparkConf,SparkContext和HiveContext
    val conf=new SparkConf()
      .setAppName("SlowLogAnalyze");
    val sc=new SparkContext(conf)
    val hiveContext=new HiveContext(sc)

    //讀取hdfs文件,獲取logRDD
    val logRDD=sc.textFile("hdfs://spark1:9000/files/slow.log", 5)

    //創(chuàng)建正則表達式,用來過濾slow.log中的無效信息
    val pattern1="# Time:".r
    val pattern2="# User@Host:".r
    val pattern3="SET timestamp=".r 

    //對logRDD進行filter,過濾無效信息
    val filteredLogRDD=logRDD.filter { str => 
          //正則返回的是option類型,只有Some和None兩種類型
          if(pattern1.findFirstIn(str)!=None){
            false
          }else if(pattern2.findFirstIn(str)!=None){
            false
          }else if(pattern3.findFirstIn(str)!=None){
            false
          }else{
            true
          }
         }
    /**
     * 將filteredLogRDD轉(zhuǎn)換為格式為(execute_time,sql_text)的tuple類RDD KV_RDD
     */

    //將filteredLogRDD轉(zhuǎn)換為數(shù)組
    val logArray=filteredLogRDD.toArray()

    //定義正則表達式pattern,用于識別Query_time
    val pattern="# Query_time:".r 

    //定義數(shù)組KV_Array,用于存放循環(huán)映射后的tuple,tuple為(query_time所在行,sql_text)
    val KV_Array=ArrayBuffer[(String,String)]()
          for (i<-0 until logArray.length){
             if(pattern.findFirstIn(logArray(i))!=None){
               val key=logArray(i)
               var flag=true 
               var value=""
               if(i<logArray.length-1){
                 for(k<-i+1 until logArray.length if flag ){
                   if(pattern.findFirstIn(logArray(k))!=None){
                     flag=false
                   }else{
                     value=value+logArray(k)
                   }
                 } 
               }
               KV_Array+=((key,value))
             }
           }

     //并行化集合獲取KV_RDD
     val KV_RDD=sc.parallelize(KV_Array, 1)

     //執(zhí)行map,將KV_RDD映射為(execute_time,sql_text)的tuple類RDD time_sql_RDD
     val sql_time_RDD=KV_RDD
         .map{tuple=>
             val timeSplit=tuple._1.split(" ")
             //注意這里是toDouble,不是toInt?。。。∫驗槿罩局械臅r間是Double類型?。。。?             (tuple._2,timeSplit(2).toDouble)
         }

     /**
      * 由于慢日志中保存了較多相同sql,需進行去重處理
      * 對相同的sql的execute_time取均值,最后輸出unique的(sql_text,execute_time)
      */

     val groupBySqlRDD=sql_time_RDD.groupByKey()
         .map{tuple=>
             val timeArray=tuple._2.toArray
             var totalTime=0.0
             for(i<-0 until timeArray.length){
               totalTime=totalTime + timeArray(i)
             }
             val avgTime=totalTime/timeArray.length
             (tuple._1,avgTime)
         }

     val sortedRowRDD=groupBySqlRDD
         .map{tuple=>(tuple._2,tuple._1)}
         .sortByKey(false, 1)
         .map{tuple=>Row(tuple._2,tuple._1)}
     val top10Array=sortedRowRDD.take(10)
     val top10RDD=sc.parallelize(top10Array, 1)
     //將sortedRDD轉(zhuǎn)換為dataframe 
     val structType=new StructType(Array(
           StructField("sql_text",StringType,true),
           StructField("executed_time",DoubleType,true)
           )
         )
     val top10DF=hiveContext.createDataFrame(top10RDD, structType) 
     hiveContext.sql("drop table if exists sql_top10")
     top10DF.saveAsTable("sql_top10")
  }
}

將代碼打成jar包并上傳至linux。
step 2 編寫執(zhí)行腳本
analyzeSlowLog.sh:

/var/software/spark-1.5.1-bin-hadoop2.4/bin/spark-submit \
--class cn.spark.study.sql.SlowLogAnalyze \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /var/software/hive/conf/hive-site.xml \
--driver-class-path /var/software/hive/lib/mysql-connector-java-5.1.17.jar \
/var/software/spark_study/scala/SlowLogAnalyze.jar

step 3 執(zhí)行analyzeSlowLog.sh,并進入hive查看分析結果:
hive> show tables;
OK
daily_top3_keywords_uvs
good_students
sql_top10 -- 這張表就是scala程序中定義的表名,程序運行時會在hive中創(chuàng)建
student_infos
student_scores
Time taken: 0.042 seconds, Fetched: 5 row(s)

查看sql_top10中的內(nèi)容:
這里由于長度限制,截斷了sql文本,所以看起來部分sql是一樣的,實際是兩條不同的sql(where 條件不同)。
hive> select substr(sql_text,1,50),executed_time from sql_top10;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
...
Execution completed successfully
MapredLocal task succeeded
OK
select 'true' as QUERYID, ID_GARAG 0.0252804
select count() from pms_garage_vitri_info 0.0048902
select count(
) from information_schema.PROCESSLIS 3.626E-4
select 'true' as QUERYID, e_survey 2.39E-4
select 'true' as QUERYID, e_survey 2.34E-4
SELECT account_code AS um 2.2360000000000001E-4
select 'true' as QUERYID, e_survey 2.19E-4
select 'true' as QUERYID, e_survey 2.18E-4
select 'true' as QUERYID, e_survey 2.15E-4
SELECT account_code AS um 2.1419999999999998E-4
Time taken: 8.501 seconds, Fetched: 10 row(s)

至此,對mysql slow.log的提取完畢!

關于在mysql中創(chuàng)建相關視圖的思考:
hadoop和spark一般用于處理大數(shù)據(jù),這里用來處理mysql的慢日志實在是大材小用。不過,要想在mysql中提供查看數(shù)據(jù)庫top sql的v$Topsql視圖,對slow.log的實時分析是必須的,此時,spark streaming便可派上用場。
思路如下:
1.編寫crontab定時任務以定時拷貝slow.log至hdfs
2.編寫crontab定時任務以調(diào)用spark streaming程序分析hdfs上的最新slow.log ->通過jdbc將將top sql輸出到對應mysql數(shù)據(jù)庫中的v$Topsql視圖中,并覆蓋之前的數(shù)據(jù)。
ps:在分析slow.log時,可在程序中executor,timestamp等字段(本文中并未提取這兩個字段),以提供更詳細的信息。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI