您好,登錄后才能下訂單哦!
學習spark的任何技術前請先正確理解spark,可以參考: 正確理解Spark
我們知道spark的RDD支持scala api、java api以及python api,我們分別對scala api與java api做了詳細的介紹,本文我們將探討rdd python api是怎么使用py4j來調用scala/java的api的,從而來實現python api的功能。首先我們先介紹下py4j。
一、py4j
py4j是一個使得python可以調用jvm中的對象的類庫。看一個py4j官網上的例子:
首先編寫一個java程序
package com.twq.javaapi; import py4j.GatewayServer; /** * Created by tangweiqun on 2017/9/22. */ public class AdditionApplication { public int addition(int first, int second) { return first + second; } public static void main(String[] args) { AdditionApplication app = new AdditionApplication(); // app is now the gateway.entry_point //啟動一個py4j的服務端,python可以連接到這個服務監(jiān)聽的端口,然后調用java的對象及其方法 GatewayServer server = new GatewayServer(app); server.start(); } }
上面的java代碼依賴一個jar包,我們可以通過maven引進來,如下:
<dependency> <groupId>net.sf.py4j</groupId> <artifactId>py4j</artifactId> <version>0.10.4</version> </dependency>
我們可以先在ide中啟動上面的類AdditionApplication的main方法,將GatewayServer啟動起來
然后,我們打開python解釋器,執(zhí)行下面的代碼:
>>> from py4j.java_gateway import JavaGateway>>> gateway = JavaGateway() # connect to the JVM>>> random = gateway.jvm.java.util.Random() # create a java.util.Random instance>>> number1 = random.nextInt(10) # call the Random.nextInt method>>> number2 = random.nextInt(10)>>> print(number1,number2)(2, 7)>>> addition_app = gateway.entry_point # get the AdditionApplication instance>>> addition_app.addition(number1,number2) # call the addition method9
上面的python代碼依賴py4j,我們可以根據http://www.py4j.org/install.html#install-instructions的方法來安裝py4j。從上面可以看出,我們在python中可以很簡單的調用jvm中的Random以及AdditionApplication對象的方法
二、py4j在spark中實現python api調用java/scala api
首先,我們編寫一個很簡單的python版本的spark應用,如下:
if __name__ == "__main__": conf = SparkConf().setAppName("appName") sc = SparkContext(conf=conf) sourceDataRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt") wordsRDD = sourceDataRDD.flatMap(lambda line: line.split()) keyValueWordsRDD = wordsRDD.map(lambda s: (s, 1)) wordCountRDD = keyValueWordsRDD.reduceByKey(lambda a, b: a + b) wordCountRDD.saveAsTextFile("hdfs://master:9999" + output_path_service.get_output_path()) print utils.get_rdd_result("wordCountRDD", wordCountRDD)
上面是一個很簡單的python版的spark wordcount應用,我們通過下面的spark-submit命令,提交到spark集群中執(zhí)行:
spark-submit \ --name "PythonWordCount" \ --master yarn \ --deploy-mode client \ --driver-memory 512m \ --executor-memory 512m \ --num-executors 2 \ --executor-cores 1 \ --py-files word_count_python.zip \ /home/hadoop-twq/spark-course/spark_word_count.py
(對于spark-submit每一個參數的含義以及spark-submit的原理是怎么樣的,可以參考:正確提交spark應用)
提交到集群運行后,會在driver端程序啟動一個org.apache.spark.deploy.PythonRunner的類,這個里面做了兩件事情
1、初始化并啟動GatewayServer,如下代碼:
// Launch a Py4J gateway server for the process to connect to; this will let it see our // Java system properties and such //用于python的代碼訪問當前jvm中的對象的 val gatewayServer = new py4j.GatewayServer(null, 0) val thread = new Thread(new Runnable() { override def run(): Unit = Utils.logUncaughtExceptions { gatewayServer.start() } }) thread.setName("py4j-gateway-init") thread.setDaemon(true) thread.start()
2、利用ProcessBuilder來啟動執(zhí)行上面的spark_word_count.py python文件,如下:
// Launch Python process val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) val env = builder.environment() env.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort) // pass conf spark.pyspark.python to python process, the only way to pass info to // python process is through environment variable. sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _)) sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _)) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
這邊需要注意的是PYSPARK_GATEWAY_PORT這個環(huán)境變量,這個環(huán)境變量的值是第一步啟動起來的GatewayServer的監(jiān)聽端口,我們將這個端口以環(huán)境變量的方式傳遞給啟動的python進程。
然后,當上面的第2步啟動了spark_word_count.py python進程后,開始執(zhí)行spark_word_count.py中的內容,當執(zhí)行到sc=SparkContext(conf),即初始化SparkContext,這個時候在SparkContext初始化的時候,會啟動一個py4j的Gateway來和上面啟動的GatewayServer進行通訊,如下代碼(在context.py文件中):
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
def _ensure_initialized(cls, instance=None, gateway=None, conf=None): """ Checks whether a SparkContext is initialized or not. Throws error if a SparkContext is already running. """ with SparkContext._lock: if not SparkContext._gateway: #這里是啟動一個Gateway,并且將Gateway的jvm賦值給成員變量_jvm,這樣我們就可以通過這個_jvm變量來訪問jvm中的java對象及其方法 SparkContext._gateway = gateway or launch_gateway(conf) SparkContext._jvm = SparkContext._gateway.jvm if instance: if (SparkContext._active_spark_context and SparkContext._active_spark_context != instance): currentMaster = SparkContext._active_spark_context.master currentAppName = SparkContext._active_spark_context.appName callsite = SparkContext._active_spark_context._callsite # Raise error if there is already a running Spark context raise ValueError( "Cannot run multiple SparkContexts at once; " "existing SparkContext(app=%s, master=%s)" " created by %s at %s:%s " % (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum)) else: SparkContext._active_spark_context = instance
在launch_gateway(conf)(源代碼在java_gateway.py中)方法中會初始化一個Gateway,如下:
#從環(huán)境變量中拿到環(huán)境變量PYSPARK_GATEWAY_PORT,這個就是我們在PythonRunner中設置的環(huán)境變量 if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) # Connect to the gateway # 啟動一個JavaGateway同GatewayServer進行通訊 gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True) #將python api中需要的java/scala的類引入引進來 # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.ml.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") # TODO(davies): move into sql java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, "org.apache.spark.sql.hive.*") java_import(gateway.jvm, "scala.Tuple2") return gateway
這樣,python中的SparkContext就可以訪問RDD java api了,如下是在python文件context.py中訪問java api的JavaSparkContext:
def _initialize_context(self, jconf): """ Initialize SparkContext in function to allow subclass specific initialization """ return self._jvm.JavaSparkContext(jconf)
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。