溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

第17課:RDD案例(join、cogroup等實(shí)戰(zhàn))

發(fā)布時(shí)間:2020-06-07 04:57:58 來(lái)源:網(wǎng)絡(luò) 閱讀:1205 作者:Spark_2016 欄目:大數(shù)據(jù)

本節(jié)課通過(guò)代碼實(shí)戰(zhàn)演示RDD中最重要的兩個(gè)算子,join和cogroup


join算子代碼實(shí)戰(zhàn):

//通過(guò)代碼演示join算子
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)


val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)


運(yùn)行結(jié)果:

(1,(Spark,100))

(3,(Tachyon,90))

(2,(Hadoop,70))


cogroup算子代碼實(shí)戰(zhàn):

首先通過(guò)java的方式編寫(xiě):

        SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");

        JavaSparkContext sc = new JavaSparkContext(conf);


        List<Tuple2<Integer, String>> nameList = Arrays.asList(new Tuple2<Integer, String>(1, "Spark"),

                new Tuple2<Integer, String>(2, "Tachyon"), new Tuple2<Integer, String>(3, "Hadoop"));

        List<Tuple2<Integer, Integer>> ScoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),

                new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 80),

                new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 110),

                new Tuple2<Integer, Integer>(2, 90));


        JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);

        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(ScoreList);


        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);

        nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {

            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {

                System.out.println("ID:" + t._1);

                System.out.println("Name:" + t._2._1);

                System.out.println("Score:" + t._2._2);

            }

        });

       sc.close();


運(yùn)行結(jié)果:

ID:1

Name:[Spark]

Score:[100, 80]

ID:3

Name:[Hadoop]

Score:[80]

ID:2

Name:[Tachyon]

Score:[95, 110, 90]


通過(guò)Scala的方式:

val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()


運(yùn)行結(jié)果:

(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))

(3,(CompactBuffer(Tachyon),CompactBuffer(90)))

(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))


備注:

資料來(lái)源于:DT_大數(shù)據(jù)夢(mèng)工廠(chǎng)(Spark發(fā)行版本定制)

更多私密內(nèi)容,請(qǐng)關(guān)注微信公眾號(hào):DT_Spark

如果您對(duì)大數(shù)據(jù)Spark感興趣,可以免費(fèi)聽(tīng)由王家林老師每天晚上20:00開(kāi)設(shè)的Spark永久免費(fèi)公開(kāi)課,地址YY房間號(hào):68917580

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI