您好,登錄后才能下訂單哦!
背景介紹:
今天接到老板分配的一個(gè)小任務(wù):開發(fā)一個(gè)程序,實(shí)現(xiàn)從數(shù)據(jù)庫中抽取數(shù)據(jù)并生成報(bào)表(這是我們數(shù)據(jù)庫審計(jì)平臺準(zhǔn)備上線的一個(gè)功能)。既然是要生成報(bào)表,那么首先得有數(shù)據(jù),于是便想到從該業(yè)務(wù)系統(tǒng)的測試環(huán)境抽取業(yè)務(wù)表的數(shù)據(jù),然后裝載至自己云主機(jī)上的Mysql中。
本來以為只要"select ...into outfile"和"load data infile..."兩個(gè)命令就可以搞定的,可是還是出了意外。測試環(huán)境導(dǎo)出的
txt文件在云主機(jī)load時(shí),報(bào)了"Row 1 doesn't contain data for all columns"這樣的warning,表中的數(shù)據(jù)自然也是凌亂且不完整的。
仔細(xì)分析,感覺可能是兩個(gè)方面出了問題:
1.由于測試環(huán)境的網(wǎng)段是隔離的,所以為了拿到"select ...into outfile"時(shí)生成的數(shù)據(jù),我是打開CRT的日志,然后執(zhí)行
"cat xxx.txt",變相地將數(shù)據(jù)獲取到了本地,然后上傳至云主機(jī)的;
2.測試環(huán)境的Mysql和云主機(jī)上Mysql的小版本不一致。
這兩個(gè)問題看似都沒法解決,現(xiàn)在只有文本文件,怎么辦?使用Spark不就得了!
之前也寫過一篇使用Spark分析Mysql慢日志的博文,自己對Spark core的各種算子比較熟悉,所以決定試一試。
實(shí)戰(zhàn)演練:
表結(jié)構(gòu)如下:
mysql> desc claims_case_loss_document;
+---------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| case_id | varchar(22) | NO | | NULL | |
| case_times | varchar(2) | NO | | NULL | |
| document_list | text | NO | | NULL | |
| create_time | timestamp | YES | | NULL | |
| update_time | timestamp | YES | | NULL | |
+---------------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)
文本結(jié)構(gòu)如下:
1147 90100002700021437455 1 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書 2017-11-16 12:08:08 2017-11-16 12:08:08
觀察文本結(jié)構(gòu)可知,每個(gè)字段間都有數(shù)個(gè)空格,而且兩兩字段間的空格數(shù)并不一致,所以得先使用Spark core將文本中字段提取出來,以便后續(xù)插入。
閑話少說,直接上程序?。ㄒ韵鲁绦蚓褂胹cala在eclipse ide for scala中編寫和執(zhí)行)
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager
object insert2Mysql {
def main(args: Array[String]): Unit = {
val t1=System.nanoTime()
val conf = new SparkConf()
.setAppName("insert2Mysql")
.setMaster("local")
val sc = new SparkContext(conf)
//textFile方法只能讀取字符集為utf-8的文件,否則中文會(huì)亂碼。windows下,將文件另存為時(shí),可以選擇utf-8字符集
//也可在代碼中實(shí)施轉(zhuǎn)換,但比較繁瑣
val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1);
val words = lines.map { line => line.split(" ") }
val wordsNotNull = words.map{ word =>
val wordArray_raw = new ArrayBuffer[String]()
val wordArray = new ArrayBuffer[String]()
for(i<-0 until word.length){
if (word(i)!=""){
wordArray_raw+=word(i)
}
}
for(i<-0 until wordArray_raw.length-4){
wordArray+=wordArray_raw(i)
}
wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
wordArray
}
wordsNotNull.foreach { word =>
Class.forName("com.mysql.cj.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
try {
val statement = conn.createStatement()
val sql="insert into claims_case_loss_document values ("+
word(0)+","+
"'"+word(1)+"'"+","+
"'"+word(2)+"'"+","+
"'"+word(3)+"'"+","+
"'"+word(4)+"'"+","+
"'"+word(5)+"'"+")"
//執(zhí)行插入
//println(sql)
statement.executeUpdate(sql)
} catch{
case e:Exception =>e.printStackTrace
}
finally {
conn.close
}
}
val t2=System.nanoTime()
//打印程序運(yùn)行時(shí)間
println((t2-t1)/1000000000 +"s")
}
}
在插入的過程中,第一條記錄總是會(huì)報(bào)錯(cuò)(后續(xù)語句插入正常),將eclipse中打印出的報(bào)錯(cuò)的insert語句手工粘貼至mysql執(zhí)行時(shí),仍報(bào)相同錯(cuò)誤:
從報(bào)錯(cuò)看是遇到了bug,并且1147這個(gè)值有問題,將相鄰語句放入Notepad對比:
從圖中可看出,1147的千位上的1確實(shí)發(fā)生了異常改變,而第二條語句中的1148是正常的,猜測可能是某個(gè)未知bug導(dǎo)致了第一條記錄發(fā)生了異常改變。這個(gè)猜測在后續(xù)得到了證實(shí):當(dāng)把1147所在行從文本中刪除后(此時(shí)1148所在行為第一條記錄),1148所在行也報(bào)出同樣的錯(cuò)誤,而后續(xù)語句均可正常插入。
由于數(shù)據(jù)是作分析用的,所以丟失一條無傷大雅,而且這個(gè)bug實(shí)在詭異,這里就不再深究了。
細(xì)心的童鞋在看了代碼后應(yīng)該會(huì)問:數(shù)據(jù)插入的效率如何?實(shí)不相瞞,效率很差!5000條的數(shù)據(jù)足足用了近半個(gè)小時(shí),即使是在這樣的OLAP場景下,這樣的效率也是不可容忍的!
仔細(xì)研究代碼可發(fā)現(xiàn),在對RDD調(diào)用foreach方法進(jìn)行插入的時(shí)候,每一條記錄都要?jiǎng)?chuàng)建一個(gè)連接,并且每一次insert都會(huì)在Mysql中觸發(fā)一次commit操作(autocommit參數(shù)默認(rèn)是打開的),這些都是很消耗資源的操作,插入效率自然很差。
發(fā)現(xiàn)這些問題后,針對代碼進(jìn)行了修改:
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer
object insert2Mysql {
def main(args: Array[String]): Unit = {
val t1=System.nanoTime()
val conf = new SparkConf()
.setAppName("insert2Mysql")
.setMaster("local")
val sc = new SparkContext(conf)
//textFile方法只能讀取字符集為utf-8的文件,否則中文會(huì)亂碼。windows下,將文件另存為時(shí),可以選擇utf-8字符集
//也可在代碼中實(shí)施轉(zhuǎn)換,但比較繁瑣
val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
val words = lines.map { line => line.split(" ") }
val wordsNotNull = words.map{ word =>
val wordArray_raw = new ArrayBuffer[String]()
val wordArray = new ArrayBuffer[String]()
for(i<-0 until word.length){
if (word(i)!=""){
wordArray_raw+=word(i)
}
}
for(i<-0 until wordArray_raw.length-4){
wordArray+=wordArray_raw(i)
}
wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
wordArray
}
val sqlRDD=wordsNotNull.map{ word =>
val sql="insert into claims_case_loss_document values ("+
word(0)+","+
"'"+word(1)+"'"+","+
"'"+word(2)+"'"+","+
"'"+word(3)+"'"+","+
"'"+word(4)+"'"+","+
"'"+word(5)+"'"+")"
sql
}
val sqlArray=sqlRDD.toArray()
//加載驅(qū)動(dòng)
Class.forName("com.mysql.cj.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
try {
conn.setAutoCommit(false)
val statement = conn.createStatement()
//這里有bug,處理出來的第一行格式都會(huì)報(bào)ERROR 1054 (42S22): Unknown column '?1147' in 'field list'
//為了避免程序跳出循環(huán),所以循環(huán)從1開始,即從第2條開始插入
for(i<-1 until sqlArray.length){
//執(zhí)行插入
println(sqlArray(i))
statement.executeUpdate(sqlArray(i))
}
conn.commit()
}
catch{
case e:Exception =>e.printStackTrace
}
finally{
conn.close
}
val t2=System.nanoTime()
println((t2-t1)/1000000000 +"s")
}
}
修改后的代碼規(guī)避了上述缺陷,在同樣插入5000條數(shù)據(jù)的情況下,只用了221s!效率大大提升!
到Mysql驗(yàn)證數(shù)據(jù):
mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
| 4999 | --插入時(shí)跳過了第一條,所以為4999條
+----------+
1 row in set (0.00 sec)
mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
id: 1148
case_id: 90100002700021437450
case_times: 1
document_list: 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書
create_time: 2017-11-16 12:08:08
update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)
至此,問題圓滿解決!整個(gè)過程和數(shù)據(jù)倉庫領(lǐng)域的ETL很接近,抽取-轉(zhuǎn)換-裝載,三個(gè)環(huán)節(jié)都有涉及,只是沒有使用
kettle之類的工具罷了。
總結(jié):
在大數(shù)據(jù)時(shí)代,DBA應(yīng)該積極做出改變,掌握一定開發(fā)技能,以便更好地適應(yīng)時(shí)代變化,切不可固守自己的一畝三分地!
最后,給我們上海分組自研的數(shù)據(jù)庫審計(jì)平臺打個(gè)廣告 ^.^
數(shù)據(jù)庫審計(jì)平臺是我們分組歷時(shí)兩年打造的產(chǎn)品,可用于Mysql、Oracle、Postgres等多種數(shù)據(jù)庫,具備以下核心工能:
1.審計(jì)違規(guī)sql,前端一鍵生成報(bào)告
2.對相同功能點(diǎn)的sql可實(shí)現(xiàn)自動(dòng)歸類,方便后續(xù)統(tǒng)一整改
3.內(nèi)嵌Percona toolkit,前端一鍵調(diào)用
4.一鍵抓取低效sql,并自動(dòng)給出優(yōu)化建議
還有很多很酷的功能就不一一介紹了,總之,誰用誰說好!感興趣的DBA童鞋可以留言,可免費(fèi)試用哦!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。