溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

知識點-Spark小節(jié)

發(fā)布時間:2020-10-04 09:56:01 來源:網(wǎng)絡(luò) 閱讀:959 作者:知了小巷 欄目:大數(shù)據(jù)

主頁博客地址:Spark小節(jié)
https://blog.icocoro.me

Spark處理字符串日期的max和min的方式
Spark處理數(shù)據(jù)存儲到Hive的方式
Spark處理新增列的方式map和udf、functions
Spark處理行轉(zhuǎn)列pivot的使用
Python 3.5.3
Spark1.6.2

Spark處理字符串日期的max和min的方式

一般是字符串類型的日期在使用Spark的agg求max時,是不正確的,API顯示只支持?jǐn)?shù)值型的max、min
hive的SQL查詢引擎是支持字符串日期的max和min的

字符串日期轉(zhuǎn)為時間戳再聚合

unix_timestamp

public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters:
s - (undocumented)
Returns:
(undocumented)
Since:
1.5.0
from pyspark.sql import functions as F

df.withColumn('startuptime_stamp', F.unix_timestamp('startuptime'))
使用HiveSQL
select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id
Spark處理數(shù)據(jù)存儲到Hive的方式

通常Spark任務(wù)處理后的結(jié)果數(shù)據(jù)會存儲到Hive表中,可以先保存至HDFS目錄再load、最方便還是直接使用臨時表和HiveContext插入數(shù)據(jù)

saveAsTextFile & load data

repartition根據(jù)實際文件大小進(jìn)行調(diào)整,數(shù)據(jù)比較小時,保存成一個文件

df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)

先刪除分區(qū),如果已經(jīng)存在的話
再覆蓋原來的數(shù)據(jù)【方便重新重復(fù)跑或修復(fù)數(shù)據(jù)】
此處使用shell,也可使用HiveContext的sql

alter table app_table drop if exists partition(datestr='$day_01');
load data inpath 'hdfs://xx/out/$day_01' overwrite into table app_table partition(datestr='$day_01');
hivectx.sql & insert
app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition(datestr='%s')" % daystr)
hivectx.sql("insert overwrite table app_table partition(datestr='%s') select * from app_table1_tmp" % daystr)
hivectx.sql("insert into app_table partition(datestr='%s') select * from app_table2_tmp" % daystr)
Spark處理新增列的方式map和udf、functions

Spark在處理數(shù)據(jù)轉(zhuǎn)換時,通常需要使用map、flatmap等操作,其中使用map會產(chǎn)生新的列或修改某列字段的值
Spark同樣支持自定義函數(shù)UDF以及提供了類似Hive內(nèi)置函數(shù)的各種各樣的處理函數(shù)

map

需要定義函數(shù)和StructType
忽略數(shù)值判斷細(xì)節(jié)和精度等

from pyspark.sql.types import *

def a_func(_):
    return _['id'], _['cnt1'], _['cnt2'], _['cnt1'] / (_['cnt1'] + _['cnt1'])

a_schema = StructType([
    StructField('id', StringType(), True),
    StructField('cnt1', IntegerType(), True),
    StructField('cnt2', IntegerType(), True),
    StructField('cnt1_rate', IntegerType(), True)
])

a_new_df = sqlctx.createDataFrame(df.select('id', 'cnt1', 'cnt2').map(a_func), a_schema)
udf

需要定義函數(shù)和UDF
忽略數(shù)值判斷細(xì)節(jié)和精度等

def a_func(cnt1, cnt2):
    return cnt1 / (cnt1 + cnt2)

a_udf = F.udf(a_func, IntegerType())

a_new_df = df.withColumn('cnt1_rate', a_udf(df['cnt1'], df['cnt2'])
functions

處理類似日期字符串的格式轉(zhuǎn)換、等等等
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

Spark處理行轉(zhuǎn)列pivot的使用

在使用SQL查詢數(shù)據(jù)時,很多情況下需要將行轉(zhuǎn)為列,以有利于數(shù)據(jù)的展示和不同維度需求的利用
一般可采用子查詢case when、連續(xù)join、字段補全union的形式
Spark的DataFrame中可以通過GroupedData的pivot函數(shù)來實現(xiàn)

df.groupBy(['course_name']).pivot('daystr').sum('score')

df.groupBy(['course_name']).pivot('daystr').count()

轉(zhuǎn)換前

daystr course_name score
2017-11-15 yuwen 1
2017-11-15 yuwen 1
2017-11-15 shuxue 1
2017-11-15 yingyu 2
2017-11-16 yuwen 1
2017-11-16 shuxue 1
2017-11-16 yingyu 2

轉(zhuǎn)換后

course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 2 2
course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 1 1
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI