溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行Spark 1.6.0 新手的快速入門

發(fā)布時(shí)間:2021-12-17 10:55:33 來(lái)源:億速云 閱讀:161 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)如何進(jìn)行Spark 1.6.0 新手的快速入門,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Spark交互式Shell的使用

基礎(chǔ)

Spark的交互式Shell提供了一個(gè)簡(jiǎn)單的方式來(lái)學(xué)習(xí)Spark的API,同時(shí)也提供了強(qiáng)大的交互式數(shù)據(jù)處理能力。Spark  Shell支持Scala和Python兩種語(yǔ)言。啟動(dòng)支持Scala的Spark Shell方式為

./bin/spark-shell

Spark最重要的一個(gè)抽象概念是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset)簡(jiǎn)稱RDD。RDDs可以通過(guò)Hadoop  InputFormats(例如HDFS文件)創(chuàng)建,也可以由其它RDDs轉(zhuǎn)換而來(lái)。下面的例子是通過(guò)加載Spark目錄下的README.md文件生成  RDD的例子:

scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

RDDs有兩種操作:

  • actions:返回計(jì)算值

  • transformations:返回一個(gè)新RDDs的引用

actions示例如下:

scala> textFile.count() // Number of items in this RDD
res0: Long = 126

scala> textFile.first() // First item in this RDD
res1: String = # Apache Spark

如下transformations示例,使用filter操作返回了一個(gè)新的RDD,該RDD為文件中數(shù)據(jù)項(xiàng)的子集,該子集符合過(guò)濾條件:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09

Spark也支持將actions和transformations一起使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多RDD操作(More on RDD Operations)

RDD的actions和transformations操作可以用于更加復(fù)雜的計(jì)算。下面是查找README.md文件中單詞數(shù)最多的行的單詞數(shù)目:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

上面代碼中,***個(gè)map操作將一行文本按空格分隔,并計(jì)算單詞數(shù)目,將line映射為一個(gè)integer值,并創(chuàng)建了一個(gè)新的RDD保存這些  integer值。RDD調(diào)用reduce計(jì)算***的單詞數(shù)。示例中map和reduce操作的參數(shù)是Scala的函數(shù)式編程風(fēng)格,Spark支持  Scala、Java、Python的編程風(fēng)格,并支持Scala/Java庫(kù)。例如,使用Scala中的Math.max()函數(shù)讓程序變得更加簡(jiǎn)潔易讀:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

隨著Hadoop的流行,MapReduce變?yōu)橐环N常見的數(shù)據(jù)流模式。Spark可以輕松的實(shí)現(xiàn)MapReduce,使用Spark編寫MapReduce程序更加簡(jiǎn)單:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

上面示例中,使用flatMap、map和reduceByKey操作來(lái)計(jì)算每個(gè)單詞在文件中出現(xiàn)的次數(shù),并生成一個(gè)結(jié)構(gòu)為的RDD。可以使用collect操作完成單詞統(tǒng)計(jì)結(jié)果的收集整合:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

緩存

Spark支持將數(shù)據(jù)緩存到集群的分布式內(nèi)存中。在數(shù)據(jù)會(huì)被重復(fù)訪問(wèn)的情況下,將數(shù)據(jù)緩存到內(nèi)存能減少數(shù)據(jù)訪問(wèn)時(shí)間,從而提高運(yùn)行效率。尤其是在數(shù)據(jù)分布在幾十或幾百個(gè)節(jié)點(diǎn)上時(shí),效果更加明顯。下面為將數(shù)據(jù)linesWithSpark緩存到內(nèi)存的示例:

scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082

scala> linesWithSpark.count()
res8: Long = 19

scala> linesWithSpark.count()
res9: Long = 19

獨(dú)立應(yīng)用

假設(shè)我們想使用Spark  API編寫?yīng)毩?yīng)用程序。我們可以使用Scala、Java和Python輕松的編寫Spark應(yīng)用。下面示例為一個(gè)簡(jiǎn)單的應(yīng)用示例:

  • Scala

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

上面程序分別統(tǒng)計(jì)了README中包含字符‘a’以及‘b’的行數(shù)。與前面Spark shell例子不同的是,我們需要初始化SparkContext。

我們通過(guò)SparkContext創(chuàng)建了一個(gè)SparkConf對(duì)象,SparkConf對(duì)象包含應(yīng)用的基本信息。

我們基于Spark  API編寫應(yīng)用,所以我們需要編寫一個(gè)名為“simple.sbt”的sbt配置文件,用于指明Spark為該應(yīng)用的一個(gè)依賴。下面的sbt配置文件示例中,還增加了Spark的一個(gè)依賴庫(kù)“spark-core”:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"

為了讓sbt正確執(zhí)行,我們需要對(duì)SimpleApp.scala和simple.sbt根據(jù)sbt要求的目錄結(jié)構(gòu)布局。如果布局正確,就可以生成該應(yīng)用的JAR包,使用spark-submit命令即可運(yùn)行該程序。

  • Javaga

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDDlogData = sc.textFile(logFile).cache();      long numAs = logData.filter(new Function

該示例的代碼邏輯同上一段Scala示例代碼。與Scala示例類似,首先初始化了SparkContext,通過(guò)SparkContext創(chuàng)建了JavaSparkContext對(duì)象。并創(chuàng)建了RDDs以及執(zhí)行transformations操作。***,通過(guò)繼承了spark.api.java.function.Function的類將函數(shù)傳給Spark。

在這里,使用Maven進(jìn)行編譯,Maven的pom.xml如下:

<project>   <groupId>edu.berkeley</groupId>   <artifactId>simple-project</artifactId>   <modelVersion>4.0.0</modelVersion>   <name>Simple Project</name>   <packaging>jar</packaging>   <version>1.0</version>   <dependencies>     <dependency> <!-- Spark dependency -->       <groupId>org.apache.spark</groupId>       <artifactId>spark-core_2.10</artifactId>       <version>1.6.0</version>     </dependency>   </dependencies> </project>

按照Maven的要求架構(gòu)配置文件位置:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

現(xiàn)在,就可以使用Maven打包應(yīng)用,以及使用命令./bin/spark-submit.執(zhí)行該應(yīng)用程序。示例如下:

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

上述就是小編為大家分享的如何進(jìn)行Spark 1.6.0 新手的快速入門了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI