Spark Python Commands, Part 3

Published: 2020-07-05 22:38:10 Source: web Views: 408 Author: zjy1002261870 Category: Big Data

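12 Data formats

  1. [[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']] is the raw data after splitting or slicing. Inside map, x[0] and x[1] return the values of the corresponding columns.
      map can also convert it to the key-value format, for example: df3 = df2.map(lambda x: (x[0], x[1]))

  2. Key-value data format
      [(u'3', u'5'), (u'4', u'6'), (u'4', u'5'), (u'4', u'2')]: each () is one record; the first element is the key and the second is the value.

  3. PipelinedRDD is the type PySpark reports for an RDD returned by a transformation such as map, so key-value data produced this way is a PipelinedRDD.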
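
As a rough illustration of the two formats above, the sketch below applies the same lambdas to a plain Python list instead of an RDD, so it runs without a Spark installation. The names `rows`, `first_column`, and `pairs` are made up for this example; with a real RDD the same lambda would be passed to `map` as in item 1.

```python
# Stand-in for the split data; in Spark this would be an RDD, not a list.
rows = [[u'3', u'5'], [u'4', u'6'], [u'4', u'5'], [u'4', u'2']]

# x[0] picks the first column of every record.
first_column = list(map(lambda x: x[0], rows))

# Turning each two-element list into a tuple gives the key-value format.
pairs = list(map(lambda x: (x[0], x[1]), rows))

print(first_column)
print(pairs)
```
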
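    13 RDD type conversion
    from pyspark.sql import Row

    # A raw string keeps the Windows backslashes literal
    userRdd = sc.textFile(r"D:\data\people.json")
    # Split each line on spaces into a list of fields
    userRdd = userRdd.map(lambda x: x.split(" "))

    # Convert each field list to a Row with named, typed columns
    userRows = userRdd.map(lambda p:
                            Row(
                                userName = p[0],
                                userAge = int(p[1]),
                                userAdd = p[2],
                                userSalary = int(p[3])
                                )
                            )
    print(userRows.take(4))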

    Result: [Row(userAdd='shanghai', userAge=20, userName='zhangsan', userSalary=13), Row(userAdd='beijin', userAge=30, userName='lisi', userSalary=15)]
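
The conversion above raises an error on any line that has fewer than four fields or a non-numeric age or salary. One possible way to guard against that is to parse each line in a small helper and drop the lines it rejects. The sketch below uses a `namedtuple` called `User` as a stand-in for `Row` so that it runs without Spark; `User`, `parse_user_line`, and the sample lines are assumptions for illustration, not part of the original code.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with the same field names as above.
User = namedtuple("User", ["userName", "userAge", "userAdd", "userSalary"])

def parse_user_line(line):
    """Return a User for a well-formed line, or None if it cannot be parsed."""
    p = line.split(" ")
    if len(p) < 4:
        return None
    try:
        return User(userName=p[0], userAge=int(p[1]),
                    userAdd=p[2], userSalary=int(p[3]))
    except ValueError:
        # Age or salary was not an integer.
        return None

# Hypothetical input: two good lines and two malformed ones.
sample_lines = [
    "zhangsan 20 shanghai 13",
    "lisi 30 beijin 15",
    "broken line",
    "wangwu abc hangzhou 10",
]
parsed = [parse_user_line(l) for l in sample_lines]
users = [u for u in parsed if u is not None]
print(users)
```

With Spark, a similar helper that builds a `Row` instead of a `User` could be applied to the unsplit lines returned by `sc.textFile`, followed by a `filter` that drops the `None` results.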

    2) Create a DataFrame
      userDF = sqlContext.createDataFrame(userRows)

    1. Query fields with a SQL statement
      from pyspark.conf import SparkConf
      from pyspark.sql.session import SparkSession
      from pyspark.sql.types import Row

      if __name__ == '__main__':
          spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

          sc = spark.sparkContext

          # A raw string keeps the Windows backslashes literal
          rd = sc.textFile(r"D:\data\people.txt")
          rd2 = rd.map(lambda x:x.split(","))
          people = rd2.map(lambda p: Row(name=p[0], age=int(p[1])))

          # Register the DataFrame as a temporary view so SQL can query it
          peopleDF = spark.createDataFrame(people)
          peopleDF.createOrReplaceTempView("people")
          teenagers = spark.sql("SELECT name,age FROM people where name='Andy'")
          teenagers.show(5)

          print(teenagers.rdd.collect())

          # Convert the result back to an RDD and add 100 to each age
          teenNames = teenagers.rdd.map(lambda p: 100 + p.age).collect()

          for name in teenNames:
              print(name)
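
To make the data flow of the script easier to follow, the sketch below traces the same three steps (split, filter on the name, add 100 to the age) over a plain Python list. The three sample lines are an assumption about what people.txt contains, and the sketch does not use Spark at all.

```python
# Assumed contents of people.txt: one "name,age" record per line.
lines = ["Michael,29", "Andy,30", "Justin,19"]

# Same parsing as the script: split on "," and convert the age to int.
people = [(p[0], int(p[1])) for p in (l.split(",") for l in lines)]

# Counterpart of: SELECT name,age FROM people where name='Andy'
selected = [(name, age) for name, age in people if name == 'Andy']

# Counterpart of: teenagers.rdd.map(lambda p: 100 + p.age).collect()
shifted = [100 + age for _, age in selected]

print(selected)
print(shifted)
```
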

15 Detailed example using DataFrame, SQL, and JSON
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
A simple example demonstrating basic Spark SQL features.
Run with:
./bin/spark-submit examples/src/main/python/sql/basic.py
"""
from __future__ import print_function

# $example on:init_session$
from pyspark.sql import SparkSession
# $example off:init_session$

# $example on:schema_inferring$
from pyspark.sql import Row
# $example off:schema_inferring$

# $example on:programmatic_schema$
# Import data types
from pyspark.sql.types import *
# $example off:programmatic_schema$

def basic_df_example(spark):
    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("/data/people.json")
    # Displays the content of the DataFrame to stdout
    df.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:create_df$

    # $example on:untyped_ops$
    # spark, df are from the previous example
    # Print the schema in a tree format
    df.printSchema()
    # root
    # |-- age: long (nullable = true)
    # |-- name: string (nullable = true)

    # Select only the "name" column
    df.select("name").show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+

    # Select everybody, but increment the age by 1
    df.select(df['name'], df['age'] + 1).show()
    # +-------+---------+
    # |   name|(age + 1)|
    # +-------+---------+
    # |Michael|     null|
    # |   Andy|       31|
    # | Justin|       20|
    # +-------+---------+

    # Select people older than 21
    df.filter(df['age'] > 21).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 30|Andy|
    # +---+----+

    # Count people by age
    df.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |  19|    1|
    # |null|    1|
    # |  30|    1|
    # +----+-----+
    # $example off:untyped_ops$

    # $example on:run_sql$
    # Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")

    sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:run_sql$

    # $example on:global_temp_view$
    # Register the DataFrame as a global temporary view
    df.createGlobalTempView("people")

    # Global temporary view is tied to a system preserved database `global_temp`
    spark.sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+

    # Global temporary view is cross-session
    spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # +----+-------+
    # | age|   name|
    # +----+-------+
    # |null|Michael|
    # |  30|   Andy|
    # |  19| Justin|
    # +----+-------+
    # $example off:global_temp_view$

def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$

def programmatic_schema_example(spark):
    # $example on:programmatic_schema$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    # Each line is converted to a tuple.
    people = parts.map(lambda p: (p[0], p[1].strip()))

    # The schema is encoded in a string.
    schemaString = "name age"

    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaPeople = spark.createDataFrame(people, schema)

    # Creates a temporary view using the DataFrame
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    results = spark.sql("SELECT name FROM people")

    results.show()
    # +-------+
    # |   name|
    # +-------+
    # |Michael|
    # |   Andy|
    # | Justin|
    # +-------+
    # $example off:programmatic_schema$

if __name__ == "__main__":
    # $example on:init_session$
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    # $example off:init_session$

    basic_df_example(spark)
    # schema_inference_example(spark)
    # programmatic_schema_example(spark)

    spark.stop()
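
By default, the `spark.read.json` call in `basic_df_example` expects one JSON object per line rather than a single JSON array. The sketch below shows that layout with Python's standard `json` module and no Spark at all. The three records are an assumption reconstructed from the output tables in the script, and a missing `age` becomes `None`, which corresponds to the `null` printed by `df.show()`.

```python
import json

# Assumed contents of people.json, one JSON object per line.
json_lines = [
    '{"name": "Michael"}',
    '{"name": "Andy", "age": 30}',
    '{"name": "Justin", "age": 19}',
]

records = []
for line in json_lines:
    obj = json.loads(line)
    # A key that is absent in the JSON is treated as a missing value.
    records.append({"age": obj.get("age"), "name": obj["name"]})

# Plain-Python counterpart of df.filter(df['age'] > 21): rows with a
# missing age are dropped, as the null row is in the script's output.
older = [r for r in records if r["age"] is not None and r["age"] > 21]

print(records)
print(older)
```
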