溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

急中生智~利用Spark core完成"ETL"!

發(fā)布時(shí)間:2020-07-27 17:54:28 來源:網(wǎng)絡(luò) 閱讀:12772 作者:橡皮高 欄目:MySQL數(shù)據(jù)庫

背景介紹:
今天接到老板分配的一個(gè)小任務(wù):開發(fā)一個(gè)程序,實(shí)現(xiàn)從數(shù)據(jù)庫中抽取數(shù)據(jù)并生成報(bào)表(這是我們數(shù)據(jù)庫審計(jì)平臺準(zhǔn)備上線的一個(gè)功能)。既然是要生成報(bào)表,那么首先得有數(shù)據(jù),于是便想到從該業(yè)務(wù)系統(tǒng)的測試環(huán)境抽取業(yè)務(wù)表的數(shù)據(jù),然后裝載至自己云主機(jī)上的Mysql中。
本來以為只要"select ...into outfile"和"load data infile..."兩個(gè)命令就可以搞定的,可是還是出了意外。測試環(huán)境導(dǎo)出的
txt文件在云主機(jī)load時(shí),報(bào)了"Row 1 doesn't contain data for all columns"這樣的warning,表中的數(shù)據(jù)自然也是凌亂且不完整的。
仔細(xì)分析,感覺可能是兩個(gè)方面出了問題:
1.由于測試環(huán)境的網(wǎng)段是隔離的,所以為了拿到"select ...into outfile"時(shí)生成的數(shù)據(jù),我是打開CRT的日志,然后執(zhí)行
"cat xxx.txt",變相地將數(shù)據(jù)獲取到了本地,然后上傳至云主機(jī)的;
2.測試環(huán)境的Mysql和云主機(jī)上Mysql的小版本不一致。
這兩個(gè)問題看似都沒法解決,現(xiàn)在只有文本文件,怎么辦?使用Spark不就得了!
之前也寫過一篇使用Spark分析Mysql慢日志的博文,自己對Spark core的各種算子比較熟悉,所以決定試一試。

實(shí)戰(zhàn)演練:
表結(jié)構(gòu)如下:

mysql> desc claims_case_loss_document;
+---------------+-------------+------+-----+---------+----------------+
| Field         | Type        | Null | Key | Default | Extra          |
+---------------+-------------+------+-----+---------+----------------+
| id            | int(11)     | NO   | PRI | NULL    | auto_increment |
| case_id       | varchar(22) | NO   |     | NULL    |                |
| case_times    | varchar(2)  | NO   |     | NULL    |                |
| document_list | text        | NO   |     | NULL    |                |
| create_time   | timestamp   | YES  |     | NULL    |                |
| update_time   | timestamp   | YES  |     | NULL    |                |
+---------------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)

文本結(jié)構(gòu)如下:

1147    90100002700021437455    1       100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書     2017-11-16 12:08:08     2017-11-16 12:08:08

觀察文本結(jié)構(gòu)可知,每個(gè)字段間都有數(shù)個(gè)空格,而且兩兩字段間的空格數(shù)并不一致,所以得先使用Spark core將文本中字段提取出來,以便后續(xù)插入。
閑話少說,直接上程序?。ㄒ韵鲁绦蚓褂胹cala在eclipse ide for scala中編寫和執(zhí)行)

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager

object insert2Mysql {
  def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insert2Mysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    //textFile方法只能讀取字符集為utf-8的文件,否則中文會(huì)亂碼。windows下,將文件另存為時(shí),可以選擇utf-8字符集
    //也可在代碼中實(shí)施轉(zhuǎn)換,但比較繁瑣
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    wordsNotNull.foreach { word =>
                           Class.forName("com.mysql.cj.jdbc.Driver")
                           val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
                           try {
                                val statement = conn.createStatement()
                                val sql="insert into claims_case_loss_document values ("+
                                        word(0)+","+
                                        "'"+word(1)+"'"+","+
                                        "'"+word(2)+"'"+","+
                                        "'"+word(3)+"'"+","+
                                        "'"+word(4)+"'"+","+
                                        "'"+word(5)+"'"+")"
                                //執(zhí)行插入
                                //println(sql)
                                statement.executeUpdate(sql)
                                } catch{
                                        case e:Exception =>e.printStackTrace
                                       }
                                  finally {
                                          conn.close
                                          }
                         }
    val t2=System.nanoTime()
        //打印程序運(yùn)行時(shí)間
    println((t2-t1)/1000000000 +"s")
  }
}

在插入的過程中,第一條記錄總是會(huì)報(bào)錯(cuò)(后續(xù)語句插入正常),將eclipse中打印出的報(bào)錯(cuò)的insert語句手工粘貼至mysql執(zhí)行時(shí),仍報(bào)相同錯(cuò)誤:
急中生智~利用Spark core完成"ETL"!
從報(bào)錯(cuò)看是遇到了bug,并且1147這個(gè)值有問題,將相鄰語句放入Notepad對比:
急中生智~利用Spark core完成"ETL"!
從圖中可看出,1147的千位上的1確實(shí)發(fā)生了異常改變,而第二條語句中的1148是正常的,猜測可能是某個(gè)未知bug導(dǎo)致了第一條記錄發(fā)生了異常改變。這個(gè)猜測在后續(xù)得到了證實(shí):當(dāng)把1147所在行從文本中刪除后(此時(shí)1148所在行為第一條記錄),1148所在行也報(bào)出同樣的錯(cuò)誤,而后續(xù)語句均可正常插入。
由于數(shù)據(jù)是作分析用的,所以丟失一條無傷大雅,而且這個(gè)bug實(shí)在詭異,這里就不再深究了。

細(xì)心的童鞋在看了代碼后應(yīng)該會(huì)問:數(shù)據(jù)插入的效率如何?實(shí)不相瞞,效率很差!5000條的數(shù)據(jù)足足用了近半個(gè)小時(shí),即使是在這樣的OLAP場景下,這樣的效率也是不可容忍的!
仔細(xì)研究代碼可發(fā)現(xiàn),在對RDD調(diào)用foreach方法進(jìn)行插入的時(shí)候,每一條記錄都要?jiǎng)?chuàng)建一個(gè)連接,并且每一次insert都會(huì)在Mysql中觸發(fā)一次commit操作(autocommit參數(shù)默認(rèn)是打開的),這些都是很消耗資源的操作,插入效率自然很差。
發(fā)現(xiàn)這些問題后,針對代碼進(jìn)行了修改:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

object insert2Mysql {
    def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insert2Mysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    //textFile方法只能讀取字符集為utf-8的文件,否則中文會(huì)亂碼。windows下,將文件另存為時(shí),可以選擇utf-8字符集
    //也可在代碼中實(shí)施轉(zhuǎn)換,但比較繁瑣
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    val sqlRDD=wordsNotNull.map{ word =>  
                                    val sql="insert into claims_case_loss_document values ("+
                                             word(0)+","+
                                             "'"+word(1)+"'"+","+
                                             "'"+word(2)+"'"+","+
                                             "'"+word(3)+"'"+","+
                                             "'"+word(4)+"'"+","+
                                             "'"+word(5)+"'"+")"
                                    sql
                                  }

    val sqlArray=sqlRDD.toArray()

    //加載驅(qū)動(dòng)
    Class.forName("com.mysql.cj.jdbc.Driver") 
    val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")                               
    try {
        conn.setAutoCommit(false)
        val statement = conn.createStatement() 
        //這里有bug,處理出來的第一行格式都會(huì)報(bào)ERROR 1054 (42S22): Unknown column '?1147' in 'field list'
        //為了避免程序跳出循環(huán),所以循環(huán)從1開始,即從第2條開始插入
        for(i<-1 until sqlArray.length){
           //執(zhí)行插入
          println(sqlArray(i))
          statement.executeUpdate(sqlArray(i))
          }
        conn.commit()
        } 
    catch{
          case e:Exception =>e.printStackTrace
          }   
    finally{
            conn.close
            }

    val t2=System.nanoTime()
    println((t2-t1)/1000000000 +"s")
  }
}

修改后的代碼規(guī)避了上述缺陷,在同樣插入5000條數(shù)據(jù)的情況下,只用了221s!效率大大提升!
到Mysql驗(yàn)證數(shù)據(jù):

mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
|     4999 |  --插入時(shí)跳過了第一條,所以為4999條
+----------+
1 row in set (0.00 sec)

mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
           id: 1148
      case_id: 90100002700021437450
   case_times: 1
document_list: 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書
  create_time: 2017-11-16 12:08:08
  update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)

至此,問題圓滿解決!整個(gè)過程和數(shù)據(jù)倉庫領(lǐng)域的ETL很接近,抽取-轉(zhuǎn)換-裝載,三個(gè)環(huán)節(jié)都有涉及,只是沒有使用
kettle之類的工具罷了。

總結(jié):
在大數(shù)據(jù)時(shí)代,DBA應(yīng)該積極做出改變,掌握一定開發(fā)技能,以便更好地適應(yīng)時(shí)代變化,切不可固守自己的一畝三分地!

最后,給我們上海分組自研的數(shù)據(jù)庫審計(jì)平臺打個(gè)廣告 ^.^
數(shù)據(jù)庫審計(jì)平臺是我們分組歷時(shí)兩年打造的產(chǎn)品,可用于Mysql、Oracle、Postgres等多種數(shù)據(jù)庫,具備以下核心工能:
1.審計(jì)違規(guī)sql,前端一鍵生成報(bào)告
2.對相同功能點(diǎn)的sql可實(shí)現(xiàn)自動(dòng)歸類,方便后續(xù)統(tǒng)一整改
3.內(nèi)嵌Percona toolkit,前端一鍵調(diào)用
4.一鍵抓取低效sql,并自動(dòng)給出優(yōu)化建議
還有很多很酷的功能就不一一介紹了,總之,誰用誰說好!感興趣的DBA童鞋可以留言,可免費(fèi)試用哦!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI