您好,登錄后才能下訂單哦!
最近項目中使用SparkSQL來做數據的統計分析,閑來就記錄下來。直接上代碼:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import scala.util.Try

/**
 * Statistics over customer (A) and transaction (B) data with Spark SQL (Spark 1.x SchemaRDD API):
 * star sign, Chinese zodiac animal, birth decade and spending bucket, each exposed to SQL as a UDF.
 */
object SparkSQL {

  /**
   * Basic customer record: customer id, ID code and sex.
   * id_code is parsed below as an 18-character code whose characters 6..13 are the birth date YYYYMMDD
   * (presumably a PRC resident ID number -- confirm against the data source).
   */
  final case class A(custom_id: String, id_code: String, sex: String)

  /** Transaction record: customer id, amount spent (kept as text, as in the raw file) and status code. */
  final case class B(custom_id: String, money: String, status: Int)

  /** Raw fields are separated by two consecutive U+0001 (Ctrl-A) characters. */
  private val FieldSeparator = "\u0001\u0001"

  /** Only ID codes of exactly this length carry a parseable birth date. */
  private val IdCodeLength = 18

  /** Birth years from 2010 up to (excluding) this year map to "2010以后"; hard-coded by the original analysis -- review. */
  private val BirthYearCutoff = 2014

  /** (first MMDD, last MMDD, sign) -- both ends inclusive; Capricorn wraps the year so it appears twice. */
  private val Constellations: Seq[(Int, Int, String)] = Seq(
    (120, 219, "水瓶座"),
    (220, 320, "雙魚座"),
    (321, 420, "白羊座"),
    (421, 521, "金牛座"),
    (522, 621, "雙子座"),
    (622, 722, "巨蟹座"),
    (723, 823, "獅子座"),
    (824, 923, "處女座"),
    (924, 1023, "天秤座"),
    (1024, 1122, "天蝎座"),
    (1123, 1222, "射手座"),
    (1223, 1231, "摩羯座"),
    (101, 119, "摩羯座"))

  /** Chinese zodiac animals in cycle order, starting from the rat (鼠) year 1900. */
  private val ZodiacAnimals: Vector[String] =
    Vector("鼠", "牛", "虎", "兔", "龍", "蛇", "馬", "羊", "猴", "雞", "狗", "豬")

  /** Consecutive boundaries of the spending buckets; each bucket is [lo, hi). */
  private val MoneyBoundaries: Seq[Long] = Seq(
    10000L, 50000L, 60000L, 70000L, 80000L, 100000L, 150000L,
    200000L, 1000000L, 10000000L, 50000000L, 100000000L)

  private val MoneyBuckets: Seq[(Long, Long)] = MoneyBoundaries.zip(MoneyBoundaries.tail)

  /** Parses `s` as an Int; None for null or non-numeric text (instead of a magic sentinel value). */
  private def parseInt(s: String): Option[Int] = Try(s.toInt).toOption

  /** The ID code itself when it is non-null and has the expected length, otherwise None. */
  private def fullIdCode(idCode: String): Option[String] =
    Option(idCode).filter(_.length == IdCodeLength)

  /**
   * Four-digit birth year at characters 6..9 of an 18-character ID code.
   * Note: substring(begin, end) is Java's -- 0-based, end-exclusive -- unlike Oracle's 1-based SUBSTR(str, pos, len).
   */
  private def birthYear(idCode: String): Option[Int] =
    fullIdCode(idCode).flatMap(id => parseInt(id.substring(6, 10)))

  /** Length of the ID code (0 for null); registered as SQL `fun2` so queries can filter on `fun2(id_code)=18`. */
  private def idCodeLength(idCode: String): Int = Option(idCode).map(_.length).getOrElse(0)

  /**
   * Star sign from the MMDD birth date at characters 10..13 of the ID code (SQL `fun1`).
   *
   * @return "" when the ID code is not 18 characters long; "無效" when the date is not numeric or matches no range
   */
  private def constellation(idCode: String): String =
    fullIdCode(idCode) match {
      case None => ""
      case Some(id) =>
        parseInt(id.substring(10, 14))
          .flatMap(md => Constellations.collectFirst { case (from, to, sign) if md >= from && md <= to => sign })
          .getOrElse("無效")
    }

  /**
   * Birth decade label such as "1980 ~ 1990" (SQL `fun3`).
   *
   * @return the decade for 1910 <= year < 2010, "2010以后" for 2010 <= year < BirthYearCutoff, otherwise ""
   */
  private def birthDecade(idCode: String): String =
    birthYear(idCode) match {
      case Some(year) if year >= 1910 && year < 2010 =>
        val start = year / 10 * 10
        s"$start ~ ${start + 10}"
      case Some(year) if year >= 2010 && year < BirthYearCutoff => "2010以后"
      case _ => ""
    }

  /**
   * Spending bucket "lo ~ hi" (lo inclusive, hi exclusive) for an amount (SQL `fun4`).
   * Amounts are compared numerically: as strings, "300" would sort into "10000 ~ 50000".
   *
   * @return "" when the amount is not numeric or falls outside every bucket
   */
  private def moneyBucket(money: String): String =
    Try(BigDecimal(money.trim)).toOption
      .flatMap { amount =>
        MoneyBuckets.collectFirst {
          case (lo, hi) if amount >= BigDecimal(lo) && amount < BigDecimal(hi) => s"$lo ~ $hi"
        }
      }
      .getOrElse("")

  /**
   * Chinese zodiac animal for the birth year, anchored at 1900 = 鼠 (SQL `fun5`).
   * Uses the Gregorian year only, so it is approximate for births before Chinese New Year.
   *
   * @return "" when the ID code is not 18 characters long or its year is not numeric
   */
  private def zodiacAnimal(idCode: String): String =
    birthYear(idCode).map { year =>
      val n = ZodiacAnimals.length
      // Non-negative modulo, so years before 1900 pick an animal instead of indexing out of bounds.
      ZodiacAnimals(((year - 1900) % n + n) % n)
    }.getOrElse("")

  /**
   * Loads the two inputs, registers them and the UDFs, and runs the statistics.
   *
   * @param args optional overrides: (0) customer file, (1) transaction file, (2) output directory;
   *             defaults are the original hard-coded locations
   */
  def main(args: Array[String]): Unit = {
    def argOr(index: Int, default: String): String = if (args.length > index) args(index) else default
    val clientPath = argOr(0, "hdfs://172.16.30.2:25000/usr/tmpdata/A.dat")
    val transPath = argOr(1, "hdfs://172.16.30.3:25000/usr/tmpdata/B.dat")
    val outputPath = argOr(2, "file:///tmp/age")

    // With small inputs, local[*] measured faster than both plain local and YARN.
    val sc = new SparkContext("local[*]", "SparkSQL")
    try {
      val sqlContext = new SQLContext(sc)
      // Implicit RDD[case class] => SchemaRDD conversion, needed for registerTempTable below.
      import sqlContext.createSchemaRDD

      // split(..., -1) keeps trailing empty fields so every index read below exists on well-formed rows;
      // rows that are still too short, or whose status is not numeric, are skipped rather than failing the job.
      val clients = sc.textFile(clientPath)
        .map(_.split(FieldSeparator, -1))
        .filter(_.length > 13)
        .map(t => A(t(0), t(4), t(13)))
      val transactions = sc.textFile(transPath)
        .map(_.split(FieldSeparator, -1))
        .filter(_.length > 71)
        .flatMap(t => parseInt(t(71)).map(status => B(t(16), t(33), status)))

      clients.registerTempTable("A_RDD")
      transactions.registerTempTable("B_RDD")

      sqlContext.registerFunction("fun1", (x: String) => constellation(x))
      sqlContext.registerFunction("fun2", (x: String) => idCodeLength(x))
      sqlContext.registerFunction("fun3", (x: String) => birthDecade(x))
      sqlContext.registerFunction("fun4", (x: String) => moneyBucket(x))
      sqlContext.registerFunction("fun5", (x: String) => zodiacAnimal(x))

      // Star-sign counts. The fun2(id_code)=18 filter is required: fun1 itself ignores non-18-character
      // codes (returning ""), but without the filter count(*) would still count those rows.
      val result1 = sqlContext.sql(
        "select fun1(id_code),count(*) from A_RDD t where fun2(id_code)=18 group by fun1(id_code)")

      // Zodiac-animal counts.
      val result2 = sqlContext.sql(
        "select fun5(a.id_code),count(*) from A_RDD a where fun2(id_code)=18 group by fun5(a.id_code)")

      // Customers and total amount per spending bucket. This SQL parser rejects `IN (subquery)` in WHERE
      // ("``NOT'' expected but `select' found"), so the valid customers are registered as their own table
      // and joined; DISTINCT keeps a customer listed twice in A from doubling their transactions.
      sqlContext.sql("select distinct custom_id from A_RDD where fun2(id_code)=18")
        .registerTempTable("VALID_CUSTOMERS")
      val result3 = sqlContext.sql(
        "select fun4(a.money),count(distinct a.custom_id),SUM(a.money) " +
          "from B_RDD a join VALID_CUSTOMERS v on a.custom_id = v.custom_id " +
          "where a.status=8 group by fun4(a.money)")

      result1.collect().foreach(println)
      result3.collect().foreach(println)
      // Results can also be saved to the local filesystem or HDFS.
      result2.saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }
}
在測試result3的時候,發現報錯:
Exception in thread "main" java.lang.RuntimeException: [1.101] failure: ``NOT'' expected but `select' found
select fun5(a.id_code),count(*) from A_RDD a where fun2(a.id_code)=18 and a.custom_id IN (select distinct b.custom_id from B_RDD b where b.status=8) group by fun5
(a.id_code)
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:74)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:267)
at SparkSQL$.main(SparkSQL.scala:198)
at SparkSQL.main(SparkSQL.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
目前還在調試階段,目測可能SparkSQL對條件中子查詢的支持做得不是很好(只是猜測)。
如有問題,還望路過的高手不吝賜教。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。