溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析

發(fā)布時(shí)間:2021-12-17 14:13:49 來源:億速云 閱讀:144 作者:柒染 欄目:大數(shù)據(jù)

本篇文章給大家分享的是有關(guān)怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

Spark WordCount開發(fā)

創(chuàng)建的是maven工程,使用的依賴如下:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

spark wc之Java版本

package cn.xpleaf.bigdata.spark.java.core.p1;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * spark Core 開發(fā)
 *
 * 基于Java
 * 計(jì)算國際慣例
 *
 * Spark程序的入口:
 *      SparkContext
 *          Java:JavaSparkContext
 *          scala:SparkContext
 *
 * D:/data\spark\hello.txt
 *
 * spark RDD的操作分為兩種,第一為Transformation,第二為Action
 * 我們將Transformation稱作轉(zhuǎn)換算子,Action稱作Action算子
 * Transformation算子常見的有:map flatMap reduceByKey groupByKey filter...
 * Action常見的有:foreach collect count save等等
 *
 * Transformation算子是懶加載的,其執(zhí)行需要Action算子的觸發(fā)
 * (可以參考下面的代碼,只要foreach不執(zhí)行,即使中間RDD的操作函數(shù)有異常也不會(huì)報(bào)錯(cuò),因?yàn)槠渲皇羌虞d到內(nèi)存中,并沒有真正執(zhí)行)
 */
public class _01SparkWordCountOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName(_01SparkWordCountOps.class.getSimpleName());
        /**
         * sparkConf中設(shè)置的master選擇,
         * local
         *      local
         *          spark作業(yè)在本地執(zhí)行,為該spark作業(yè)分配一個(gè)工作線程
         *      local[N]
         *          spark作業(yè)在本地執(zhí)行,為該spark作業(yè)分配N個(gè)工作線程
         *      local[*]
         *          spark作業(yè)在本地執(zhí)行,根據(jù)機(jī)器的硬件資源,為spark分配適合的工作線程,一般也就2個(gè)
         *      local[N, M]
         *          local[N, M]和上面最大的區(qū)別就是,當(dāng)spark作業(yè)啟動(dòng)或者提交失敗之后,可以有M次重試的機(jī)會(huì),上面幾種沒有
         * standalone模式:
         *      就是spark集群中master的地址,spark://uplooking01:7077
         * yarn
         *      yarn-cluster
         *          基于yarn的集群模式,sparkContext的構(gòu)建和作業(yè)的運(yùn)行都在yarn集群中執(zhí)行
         *      yarn-client
         *          基于yarn的client模式,sparkContext的構(gòu)建在本地,作業(yè)的運(yùn)行在集群
         *
         * mesos
         *      mesos-cluster
         *      mesos-client
         */
        String master = "local[*]";
        conf.setMaster(master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Integer defaultParallelism = jsc.defaultParallelism();
        System.out.println("defaultParallelism=" + defaultParallelism);
        /**
         * 下面的操作代碼,其實(shí)就是spark中RDD的DAG圖
         */
        JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/hello.txt");
        System.out.println("linesRDD's partition size is: " + linesRDD.partitions().size());
        JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                // int i = 1 / 0;  // 用以驗(yàn)證Transformation算子的懶加載
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println("retRDD's partition size is: " + retRDD.partitions().size());
        retRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println(tuple._1 + "---" + tuple._2);
            }
        });
        jsc.close();
    }
}

本地執(zhí)行,輸出結(jié)果如下:

defaultParallelism=20
......
linesRDD's partition size is: 2
retRDD's partition size is: 2
......
hello---3
you---1
me---1
he---1

spark wc之Java lambda版本

package cn.xpleaf.bigdata.spark.java.core.p1;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * spark Core 開發(fā)
 *
 * 基于Java
 * 計(jì)算國際慣例
 *
 * Spark程序的入口:
 *      SparkContext
 *          Java:JavaSparkContext
 *          scala:SparkContext
 *
 * D:/data\spark\hello.txt
 *
 * lambda表達(dá)式的版本
 */
public class _02SparkWordCountOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName(_02SparkWordCountOps.class.getSimpleName());
        String master = "local";
        conf.setMaster(master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        /**
         * 下面的操作代碼,其實(shí)就是spark中RDD的DAG圖
         * 現(xiàn)在使用lambda表達(dá)式,更加簡單清晰
         */
        JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/hello.txt");
        JavaRDD<String> wordsRDD = linesRDD.flatMap(line -> {return Arrays.asList(line.split(" "));});
        JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(word -> {return new Tuple2<String, Integer>(word, 1);});
        JavaPairRDD<String, Integer> retRDD = pairRDD.reduceByKey((v1, v2) -> {return v1 + v2;});
        retRDD.foreach(tuple -> {
            System.out.println(tuple._1 + "---" + tuple._2);
        });
        jsc.close();
    }
}

本地執(zhí)行,輸出結(jié)果如下:

you---1
he---1
hello---3
me---1

spark wc之scala版本

package cn.xpleaf.bigdata.spark.scala.core.p1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 基于Scala的WordCount統(tǒng)計(jì)
  *
  * java.net.UnknownHostException: ns1
  *
  * spark系統(tǒng)不認(rèn)識(shí)ns1
  * 在spark的配置文件spark-defaults.conf中添加:
  *     spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml
  */
object _01SparkWordCountOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName(s"${_01SparkWordCountOps.getClass().getSimpleName}")
            .setMaster("local")
        val sc = new SparkContext(conf)

        val linesRDD:RDD[String] = sc.textFile("D:/data/spark/hello.txt")
        /*val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split(" "))
        val parsRDD:RDD[(String, Int)] = wordsRDD.map(word => new Tuple2[String, Int](word, 1))
        val retRDD:RDD[(String, Int)] = parsRDD.reduceByKey((v1, v2) => v1 + v2)
        retRDD.collect().foreach(t => println(t._1 + "..." + t._2))*/

        // 更簡潔的方式
        linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(t => println(t._1 + "..." + t._2))
        sc.stop()
    }
}

本地執(zhí)行,輸出結(jié)果如下:

you...1
he...1
hello...3
me...1

應(yīng)用部署

部署說明

上面的方式其實(shí)都是本地執(zhí)行的,可以把我們的應(yīng)用部署到Spark集群或Yarn集群上,前面的代碼注釋也有提到這一點(diǎn),就是關(guān)于Spark作業(yè)執(zhí)行的問題:

/**
 * sparkConf中設(shè)置的master選擇,
 * local
 *      local
 *          spark作業(yè)在本地執(zhí)行,為該spark作業(yè)分配一個(gè)工作線程
 *      local[N]
 *          spark作業(yè)在本地執(zhí)行,為該spark作業(yè)分配N個(gè)工作線程
 *      local[*]
 *          spark作業(yè)在本地執(zhí)行,根據(jù)機(jī)器的硬件資源,為spark分配適合的工作線程,一般也就2個(gè)
 *      local[N, M]
 *          local[N, M]和上面最大的區(qū)別就是,當(dāng)spark作業(yè)啟動(dòng)或者提交失敗之后,可以有M次重試的機(jī)會(huì),上面幾種沒有
 * standalone模式:
 *      就是spark集群中master的地址,spark://uplooking01:7077
 * yarn
 *      yarn-cluster
 *          基于yarn的集群模式,sparkContext的構(gòu)建和作業(yè)的運(yùn)行都在yarn集群中執(zhí)行
 *      yarn-client
 *          基于yarn的client模式,sparkContext的構(gòu)建在本地,作業(yè)的運(yùn)行在集群
 *
 * mesos
 *      mesos-cluster
 *      mesos-client
 */

local的多種情況可以自己測試一下。

這里只測試部署standalone和yarn-cluster兩種模式,實(shí)際上yarn-client也測試了,不過報(bào)異常,沒去折騰。注意用的是Scala的代碼。

其實(shí)很顯然,這里使用的是Spark離線計(jì)算的功能(Spark Core)。

程序打包

將前面的scala版本的代碼修改為如下:

package cn.xpleaf.bigdata.spark.scala.core.p1

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 基于Scala的WordCount統(tǒng)計(jì)
  *
  * java.net.UnknownHostException: ns1
  *
  * spark系統(tǒng)不認(rèn)識(shí)ns1
  * 在spark的配置文件spark-defaults.conf中添加:
  *     spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml
  */
object _01SparkWordCountOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName(s"${_01SparkWordCountOps.getClass().getSimpleName}")
            //.setMaster("local")
        val sc = new SparkContext(conf)

        val linesRDD:RDD[String] = sc.textFile("hdfs://ns1/hello")
        /*val wordsRDD:RDD[String] = linesRDD.flatMap(line => line.split(" "))
        val parsRDD:RDD[(String, Int)] = wordsRDD.map(word => new Tuple2[String, Int](word, 1))
        val retRDD:RDD[(String, Int)] = parsRDD.reduceByKey((v1, v2) => v1 + v2)
        retRDD.collect().foreach(t => println(t._1 + "..." + t._2))*/

        // 更簡潔的方式
        linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect().foreach(t => println(t._1 + "..." + t._2))
        // collect不是必須要加的,但是如果在standalone的運(yùn)行模式下,不加就看不到控制臺(tái)的輸出
        // 而在yarn運(yùn)行模式下,是看不到輸出的
        sc.stop()
    }
}

主要是做了兩處的修改,一是注釋掉setMaster("local"),因?yàn)楝F(xiàn)在不是本地跑了,另外是數(shù)據(jù)來源,選擇的是HDFS上的數(shù)據(jù)文件。

需要注意的是,要想讓Spark集群認(rèn)識(shí)ns1(我的Hadoop集群是HA部署方式),其實(shí)有兩種方式,一種設(shè)置環(huán)境變量HADOOP_CONF_DIR,但我測試的時(shí)候不生效,依然是無法識(shí)別ns1;另外一種是需要在Spark的配置文件spark-defaults.conf中添加spark.files /home/uplooking/app/hadoop/etc/hadoop/hdfs-site.xml,/home/uplooking/app/hadoop/etc/hadoop/core-site.xml,即指定Hadoop的配置文件地址,Hadoop HA的配置,就是在這兩個(gè)文件中進(jìn)行的配置。我采用第二種方式有效。

上面準(zhǔn)備工作完成后就可以將程序打包了,使用普通的打包或者maven打包都可以,注意不需要將依賴一起打包,因?yàn)槲覀兊腟park集群環(huán)境中已經(jīng)存在這些依賴了。

部署到Spark集群上

關(guān)于應(yīng)用的部署,準(zhǔn)確來說是submit,官方文檔有很詳細(xì)的說明,可以參考:http://spark.apache.org/docs/latest/submitting-applications.html

先編寫下面一個(gè)腳本:

[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh 
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking01:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \

然后執(zhí)行下面的命令:

[uplooking@uplooking01 spark]$ ./spark-submit-standalone.sh spark-wc.jar cn.xpleaf.bigdata.spark.scala.core.p1._01SparkWordCountOps

因?yàn)樵诔绦虼a中已經(jīng)添加了collect Action算子,所以運(yùn)行成功后可以直接在控制臺(tái)中看到輸出結(jié)果:

hello...3
me...1
you...1
he...1

然后也可以在spark提供的UI界面中看到其提交的作業(yè)以及執(zhí)行結(jié)果:

怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析

部署到Y(jié)arn集群上

先編寫下面一個(gè)腳本:

[uplooking@uplooking01 spark]$ cat spark-submit-yarn.sh 
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--num-executors 1 \
$1 \

執(zhí)行如下命令:

[uplooking@uplooking01 spark]$ ./spark-submit-yarn.sh spark-wc.jar cn.xpleaf.bigdata.spark.scala.core.p1._01SparkWordCountOps      
18/04/25 17:47:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/25 17:47:39 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
18/04/25 17:47:39 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
18/04/25 17:47:39 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/04/25 17:47:39 INFO yarn.Client: Setting up container launch context for our AM
18/04/25 17:47:39 INFO yarn.Client: Setting up the launch environment for our AM container
18/04/25 17:47:39 INFO yarn.Client: Preparing resources for our AM container
18/04/25 17:47:40 INFO yarn.Client: Uploading resource file:/home/uplooking/app/spark/lib/spark-assembly-1.6.2-hadoop2.6.0.jar -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/spark-assembly-1.6.2-hadoop2.6.0.jar
18/04/25 17:47:42 INFO yarn.Client: Uploading resource file:/home/uplooking/jars/spark/spark-wc.jar -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/spark-wc.jar
18/04/25 17:47:42 INFO yarn.Client: Uploading resource file:/tmp/spark-ae34fa23-5166-4fd3-a4ec-8e5115691801/__spark_conf__6834084285342234312.zip -> hdfs://ns1/user/uplooking/.sparkStaging/application_1524552224611_0005/__spark_conf__6834084285342234312.zip
18/04/25 17:47:43 INFO spark.SecurityManager: Changing view acls to: uplooking
18/04/25 17:47:43 INFO spark.SecurityManager: Changing modify acls to: uplooking
18/04/25 17:47:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(uplooking); users with modify permissions: Set(uplooking)
18/04/25 17:47:43 INFO yarn.Client: Submitting application 5 to ResourceManager
18/04/25 17:47:43 INFO impl.YarnClientImpl: Submitted application application_1524552224611_0005
18/04/25 17:47:44 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:44 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1524649663869
         final status: UNDEFINED
         tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
         user: uplooking
18/04/25 17:47:45 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:46 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:47 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:48 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:49 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:50 INFO yarn.Client: Application report for application_1524552224611_0005 (state: ACCEPTED)
18/04/25 17:47:51 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:51 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.168.43.103
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1524649663869
         final status: UNDEFINED
         tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
         user: uplooking
18/04/25 17:47:52 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:53 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:54 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:55 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:56 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:57 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:58 INFO yarn.Client: Application report for application_1524552224611_0005 (state: RUNNING)
18/04/25 17:47:59 INFO yarn.Client: Application report for application_1524552224611_0005 (state: FINISHED)
18/04/25 17:47:59 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.168.43.103
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1524649663869
         final status: SUCCEEDED
         tracking URL: http://uplooking02:8088/proxy/application_1524552224611_0005/
         user: uplooking
18/04/25 17:47:59 INFO util.ShutdownHookManager: Shutdown hook called
18/04/25 17:47:59 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-ae34fa23-5166-4fd3-a4ec-8e5115691801

可以通過yarn提供的Web界面來查看其提交的作業(yè)情況:

怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析

但是找了日志也沒有找到輸出的統(tǒng)計(jì)結(jié)果,所以這種情況下,數(shù)據(jù)結(jié)果的落地就不應(yīng)該只是輸出而已了,可以考慮其它的持久化存儲(chǔ)。

總體而言,對(duì)比MapReduce,僅僅從Spark Core來看,速度真的是有非常大的提高。

關(guān)于wc執(zhí)行過程的說明

參考下面的圖示:

怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析

然后,下面是我跑的一個(gè)wordcount任務(wù),在spark history server中查看其詳細(xì)信息,就很容易理解上面所說的stage劃分、寬依賴、窄依賴,相信會(huì)有一個(gè)相對(duì)比較清晰的認(rèn)識(shí):
怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析

以上就是怎么進(jìn)行Spark WC開發(fā)與應(yīng)用部署的分析,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI