溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Transformation和Action怎么使用

發(fā)布時(shí)間:2021-12-22 15:37:15 來源:億速云 閱讀:153 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Transformation和Action怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

一、Transformation算子演示

val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)

    //通過并行化生成rdd
    val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))

    //map:對(duì)rdd里面每一個(gè)元乘以2然后排序
    val rdd2: RDD[Int] = rdd.map(_ * 2)
    //collect以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素(是Action算子)
    println(rdd2.collect().toBuffer)

    //filter:該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成
    val rdd3: RDD[Int] = rdd2.filter(_ > 10)
    println(rdd3.collect().toBuffer)

    val rdd4 = sc.parallelize(Array("a b c","b c d"))
    //flatMap:將rdd4中的元素進(jìn)行切分后壓平
    val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
    println(rdd5.collect().toBuffer)
    //假如: List(List(" a,b" ,"b c"),List("e c"," i o"))
    //壓平 flatMap(_.flatMap(_.split(" ")))
    
    //sample隨機(jī)抽樣
    //withReplacement表示是抽出的數(shù)據(jù)是否放回,true為有放回的抽樣,false為無放回的抽樣
    //fraction抽樣比例例如30% 即0.3 但是這個(gè)值是一個(gè)浮動(dòng)的值不準(zhǔn)確
    //seed用于指定隨機(jī)數(shù)生成器種子 默認(rèn)參數(shù)不傳
    val rdd5_1 = sc.parallelize(1 to 10)
    val sample = rdd.sample(false,0.5)
    println(sample.collect().toBuffer)

    //union:求并集
    val rdd6 = sc.parallelize(List(5,6,7,8))
    val rdd7 = sc.parallelize(List(1,2,5,6))
    val rdd8 = rdd6 union rdd7
    println(rdd8.collect.toBuffer)

    //intersection:求交集
    val rdd9 = rdd6 intersection rdd7
    println(rdd9.collect.toBuffer)

    //distinct:去重出重復(fù)
    println(rdd8.distinct.collect.toBuffer)

    //join相同的key會(huì)被合并
    val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    val rdd10_3 = rdd10_1 join rdd10_2
    println(rdd10_3.collect().toBuffer)
    
    //左連接和右連接
    //除基準(zhǔn)值外是Option類型,因?yàn)榭赡艽嬖诳罩邓允褂肙ption
    val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //以左邊為基準(zhǔn)沒有是null
    val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //以右邊為基準(zhǔn)沒有是null
    println(rdd10_4.collect().toList)
    println(rdd10_5.collect().toBuffer)

    val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2)))
    val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10)))
    //笛卡爾積
    val rdd11_3 = rdd11_1 cartesian rdd11_2
    println(rdd11_3.collect.toBuffer)
  
   //根據(jù)傳入的參數(shù)進(jìn)行分組
    val rdd11_5_1 = rdd11_4.groupBy(_._1)
    println(rdd11_5_1.collect().toList)

    //按照相同key進(jìn)行分組,并且可以制定分區(qū)
    val rdd11_5_2 = rdd11_4.groupByKey
    println(rdd11_5_2.collect().toList)

    //根據(jù)相同key進(jìn)行分組[分組的話需要二元組]
    //cogroup 和 groupBykey的區(qū)別
    //cogroup不需要對(duì)數(shù)據(jù)先進(jìn)行合并就以進(jìn)行分組 得到的結(jié)果是 同一個(gè)key 和不同數(shù)據(jù)集中的數(shù)據(jù)集合
    //groupByKey是需要先進(jìn)行合并然后在根據(jù)相同key進(jìn)行分組
    val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
    println(rdd11_6)

二、Action算子演示

val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /* Action 算子*/
    //集合函數(shù)
    val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
    val rdd1_1 = rdd1.reduce(_+_)
    println(rdd1_1)
    //以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
    println(rdd1.collect().toBuffer)
    //返回RDD的元素個(gè)數(shù)
    println(rdd1.count())
    //取出對(duì)應(yīng)數(shù)量的值 默認(rèn)降序, 若輸入0 會(huì)返回一個(gè)空數(shù)組
    println(rdd1.top(3).toBuffer)
    //順序取出對(duì)應(yīng)數(shù)量的值
    println(rdd1.take(3).toBuffer)
    //順序取出對(duì)應(yīng)數(shù)量的值 默認(rèn)生序
    println(rdd1.takeOrdered(3).toBuffer)
    //獲取第一個(gè)值 等價(jià)于 take(1)
    println(rdd1.first())
    //將處理過后的數(shù)據(jù)寫成文件(存儲(chǔ)在HDFS或本地文件系統(tǒng))
    //rdd1.saveAsTextFile("dir/file1")
    //統(tǒng)計(jì)key的個(gè)數(shù)并生成map k是key名 v是key的個(gè)數(shù)
    val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
    val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
    println(rdd2_1)
    //遍歷數(shù)據(jù)
    rdd1.foreach(x => println(x))

    /*其他算子*/
    //統(tǒng)計(jì)value的個(gè)數(shù) 但是會(huì)將集合中的一個(gè)元素看做是一個(gè)vluae
    val value: collection.Map[(String, Int), Long] = rdd2.countByValue
    println(value)
    //filterByRange:對(duì)RDD中的元素進(jìn)行過濾,返回指定范圍內(nèi)的數(shù)據(jù)
    val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
    val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括開始和結(jié)束的
    println(rdd3_1.collect.toList)
    //flatMapValues對(duì)參數(shù)進(jìn)行扁平化操作,是value的值
    val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4")))
    println( rdd3_2.flatMapValues(_.split(" ")).collect.toList)
    //foreachPartition 循環(huán)的是分區(qū)數(shù)據(jù)
    // foreachPartiton一般應(yīng)用于數(shù)據(jù)的持久化,存入數(shù)據(jù)庫(kù),可以進(jìn)行分區(qū)的數(shù)據(jù)存儲(chǔ)
    val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
    rdd4.foreachPartition(x => println(x.reduce(_+_)))
    //keyBy 以傳入的函數(shù)返回值作為key ,RDD中的元素為value 新的元組
    val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3)
    val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
    println(rdd5_1.collect.toList)
    //keys獲取所有的key  values 獲取所有的values
    println(rdd5_1.keys.collect.toList)
    println(rdd5_1.values.collect.toList)
    //collectAsMap  將需要的二元組轉(zhuǎn)換成Map
    val map: collection.Map[String, Int] = rdd2.collectAsMap()
    println(map)

“Transformation和Action怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI