val conf =
new SparkConf().setAppName("Test").setMaster("local[*]") val sc =
new SparkContext(conf) /* Action
算子*/ //集合函數(shù) val rdd1 = sc.parallelize(List(2,1,3,6,5),2) val rdd1_1 = rdd1.reduce(_+_) println(rdd1_1) //以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素 println(rdd1.collect().toBuffer) //返回RDD的元素個(gè)數(shù) println(rdd1.count()) //取出對(duì)應(yīng)數(shù)量的值 默認(rèn)降序,
若輸入0
會(huì)返回一個(gè)空數(shù)組 println(rdd1.top(3).toBuffer) //順序取出對(duì)應(yīng)數(shù)量的值 println(rdd1.take(3).toBuffer) //順序取出對(duì)應(yīng)數(shù)量的值 默認(rèn)生序 println(rdd1.takeOrdered(3).toBuffer) //獲取第一個(gè)值 等價(jià)于
take(1) println(rdd1.first()) //將處理過后的數(shù)據(jù)寫成文件(存儲(chǔ)在HDFS或本地文件系統(tǒng)) //rdd1.saveAsTextFile("dir/file1") //統(tǒng)計(jì)key的個(gè)數(shù)并生成map k是key名
v是key的個(gè)數(shù) val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2) val rdd2_1: collection.Map[String, Long] = rdd2.countByKey() println(rdd2_1) //遍歷數(shù)據(jù) rdd1.foreach(x =>
println(x))
/*其他算子*/ //統(tǒng)計(jì)value的個(gè)數(shù) 但是會(huì)將集合中的一個(gè)元素看做是一個(gè)vluae val value: collection.Map[(String, Int), Long] = rdd2.countByValue println(value) //filterByRange:對(duì)RDD中的元素進(jìn)行過濾,返回指定范圍內(nèi)的數(shù)據(jù) val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括開始和結(jié)束的 println(rdd3_1.collect.toList) //flatMapValues對(duì)參數(shù)進(jìn)行扁平化操作,是value的值 val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4"))) println( rdd3_2.flatMapValues(_.split(" ")).collect.toList) //foreachPartition
循環(huán)的是分區(qū)數(shù)據(jù) // foreachPartiton一般應(yīng)用于數(shù)據(jù)的持久化,存入數(shù)據(jù)庫(kù),可以進(jìn)行分區(qū)的數(shù)據(jù)存儲(chǔ) val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd4.foreachPartition(x =>
println(x.reduce(_+_))) //keyBy
以傳入的函數(shù)返回值作為key ,RDD中的元素為value
新的元組 val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3) val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length) println(rdd5_1.collect.toList) //keys獲取所有的key values
獲取所有的values println(rdd5_1.keys.collect.toList) println(rdd5_1.values.collect.toList) //collectAsMap 將需要的二元組轉(zhuǎn)換成Map val map: collection.Map[String, Int] = rdd2.collectAsMap() println(map) |