溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何分析Spark中大數(shù)據(jù)產(chǎn)品的測試方法與實(shí)現(xiàn)

發(fā)布時(shí)間:2021-12-17 11:14:58 來源:億速云 閱讀:153 作者:柒染 欄目:大數(shù)據(jù)

如何分析Spark中大數(shù)據(jù)產(chǎn)品的測試方法與實(shí)現(xiàn),很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Spark作為現(xiàn)在主流的分布式計(jì)算框架,已經(jīng)融入到了很多的產(chǎn)品中作為ETL的解決方案。  而我們?nèi)绻胍y試這樣的產(chǎn)品就要對(duì)分布式計(jì)算的原理有個(gè)清晰的認(rèn)知并且也要熟悉分布式計(jì)算框架的使用來針對(duì)各種ETL場景設(shè)計(jì)不同的測試數(shù)據(jù)。  而一般來說我們需要從以下兩個(gè)角度來進(jìn)行測試。

ETL能兼容各種不同的數(shù)據(jù)(不同的數(shù)據(jù)規(guī)模,數(shù)據(jù)分布和數(shù)據(jù)類型)

ETL處理數(shù)據(jù)的正確性

測試數(shù)據(jù)兼容

ETL是按一定規(guī)則針對(duì)數(shù)據(jù)進(jìn)行清洗,抽取,轉(zhuǎn)換等一系列操作的簡寫。那么一般來說他要能夠處理很多種不同的數(shù)據(jù)類型。  我們在生產(chǎn)上遇見的bug有很大一部分占比是生產(chǎn)環(huán)境遇到了比較極端的數(shù)據(jù)導(dǎo)致我們的ETL程序無法處理。 比如:

數(shù)據(jù)擁有大量分片

在分布式計(jì)算中,一份數(shù)據(jù)是由多個(gè)散落在HDFS上的文件組成的, 這些文件可能散落在不同的機(jī)器上,  只不過HDFS會(huì)給使用者一個(gè)統(tǒng)一的視圖,讓使用者以為自己在操作的是一個(gè)文件,而不是很多個(gè)文件。 這是HDFS這種分布式文件系統(tǒng)的存儲(chǔ)方式。  而各種分布式計(jì)算框架, 比如hadoop的MapReduce,或者是spark。  就會(huì)利用這種特性,直接讀取散落在各個(gè)機(jī)器上文件并保存在那個(gè)節(jié)點(diǎn)的內(nèi)存中(理想狀態(tài)下,如果資源不夠可能還是會(huì)發(fā)生數(shù)據(jù)在節(jié)點(diǎn)間遷移)。

而讀取到內(nèi)存中的數(shù)據(jù)也是分片的(partition)。  spark默認(rèn)以128M為單位讀取數(shù)據(jù),如果數(shù)據(jù)小于這個(gè)值會(huì)按一個(gè)分片存儲(chǔ),如果大于這個(gè)值就繼續(xù)往上增長分片。 比如一個(gè)文件的大小是130M,  spark讀取它的時(shí)候會(huì)在內(nèi)存中分成兩個(gè)partition(1個(gè)128M,1個(gè)2M)。  如果這個(gè)文件特別小,只有10M,那它也會(huì)被當(dāng)做一個(gè)partition存在內(nèi)存中。  所以如果一份數(shù)據(jù)存放在HDFS中,這個(gè)數(shù)據(jù)是由10個(gè)散落在各個(gè)節(jié)點(diǎn)的文件組成的。 那么spark在讀取的時(shí)候,就會(huì)至少在內(nèi)存中有10個(gè)partition,  如果每個(gè)文件的大小都超過了128M,partition的數(shù)量會(huì)繼續(xù)增加。

而在執(zhí)行計(jì)算的時(shí)候,這些存儲(chǔ)在多個(gè)節(jié)點(diǎn)內(nèi)存中的數(shù)據(jù)會(huì)并發(fā)的執(zhí)行數(shù)據(jù)計(jì)算任務(wù)。 也就是說我們的數(shù)據(jù)是存放在多個(gè)節(jié)點(diǎn)中的內(nèi)存中的,  我們?yōu)槊恳粋€(gè)partition都執(zhí)行一個(gè)計(jì)算任務(wù)。 所以我們針對(duì)一個(gè)特別大的數(shù)據(jù)的計(jì)算任務(wù), 會(huì)首先把數(shù)據(jù)按partition讀取到不同節(jié)點(diǎn)的不同的內(nèi)存中,  也就是把數(shù)據(jù)拆分成很多小的分片放在不同機(jī)器的內(nèi)存中。 然后分別在這些小的分片上執(zhí)行計(jì)算任務(wù)。 最后再聚合每個(gè)計(jì)算任務(wù)的結(jié)果。  這就是分布式計(jì)算的基本原理。

那么這個(gè)時(shí)候問題就來了, 這種按partition為單位的分布式計(jì)算框架。partition的數(shù)量決定著并發(fā)的數(shù)量。  可以理解為,如果數(shù)據(jù)有100個(gè)partition,就會(huì)有100個(gè)線程針對(duì)這份數(shù)據(jù)做計(jì)算任務(wù)。所以partition的數(shù)量代表著計(jì)算的并行程度。  但是不是說partition越多越好,如果明明數(shù)據(jù)就很小, 我們卻拆分了大量的partition的話,反而是比較慢的。  而且所有分片的計(jì)算結(jié)果最后是要聚合在一個(gè)地方的。 這些都會(huì)造成網(wǎng)絡(luò)IO的開銷(因?yàn)閿?shù)據(jù)是在不同的節(jié)點(diǎn)之前傳輸?shù)?。  尤其是在分布式計(jì)算中,我們有shuffle這個(gè)性能殺手(不熟悉這個(gè)概念的同學(xué)請(qǐng)看我之前的文章)。  在大量的分片下執(zhí)行shuffle將會(huì)是一個(gè)災(zāi)難,因?yàn)榇罅康木W(wǎng)絡(luò)IO會(huì)導(dǎo)致集群處于很高的負(fù)載甚至癱瘓。  我們曾經(jīng)碰見過只有500M但是卻有7000個(gè)分片的數(shù)據(jù),那一次的結(jié)果是針對(duì)這個(gè)數(shù)據(jù)并行執(zhí)行了多個(gè)ETL程序后,整個(gè)hadoop集群癱瘓了。  這是在數(shù)據(jù)預(yù)處理的時(shí)候忘記做reparation(重新分片)的結(jié)果。

數(shù)據(jù)傾斜

如何分析Spark中大數(shù)據(jù)產(chǎn)品的測試方法與實(shí)現(xiàn)

在上面的任務(wù)處理中出現(xiàn)了shuffle的操作。shuffle也叫洗牌,  在上面講partition和分布式計(jì)算原理的時(shí)候,我們知道分布式計(jì)算就是把數(shù)據(jù)劃分很多個(gè)數(shù)據(jù)片存放在很多個(gè)不同的節(jié)點(diǎn)上,  然后在這些數(shù)據(jù)片上并發(fā)執(zhí)行同樣的計(jì)算任務(wù)來達(dá)到分布式計(jì)算的目的,這些任務(wù)互相是獨(dú)立的, 比如我們執(zhí)行一個(gè)count操作, 也就是計(jì)算這個(gè)數(shù)據(jù)的行數(shù)。  實(shí)際的操作其實(shí)是針對(duì)每個(gè)數(shù)據(jù)分片,也就是partition分別執(zhí)行count的操作。 比如我們有3個(gè)分片分別是A,B,C,  那執(zhí)行count的時(shí)候其實(shí)是并發(fā)3個(gè)線程,每個(gè)線程去計(jì)算一個(gè)partition的行數(shù), 他們都計(jì)算完畢后,再匯總到driver程序中,  也就是A,B,C這三個(gè)計(jì)算任務(wù)的計(jì)算過程是彼此獨(dú)立互不干擾的,只在計(jì)算完成后進(jìn)行聚合。

但并不是所有的計(jì)算任務(wù)都可以這樣獨(dú)立的,比如你要執(zhí)行一個(gè)groupby的sql操作。 就像上面的圖中,我要先把數(shù)據(jù)按單詞分組,之后才能做其他的統(tǒng)計(jì)計(jì)算,  比如統(tǒng)計(jì)詞頻或者其他相關(guān)操作。 那么首先spark要做的是根據(jù)groupby的字段做哈希,相同值的數(shù)據(jù)傳送到一個(gè)固定的partition上。  這樣就像上圖一樣,我們把數(shù)據(jù)中擁有相同key值的數(shù)分配到一個(gè)partition, 這樣從數(shù)據(jù)分片上就把數(shù)據(jù)進(jìn)行分組隔離。

然后我們要統(tǒng)計(jì)詞頻的話,只需要才來一個(gè)count操作就可以了。 shuffle的出現(xiàn)是為了計(jì)算能夠高效的執(zhí)行下去,  把相似的數(shù)據(jù)聚合到相同的partition上就可以方便之后的計(jì)算任務(wù)依然是獨(dú)立隔離的并且不會(huì)觸發(fā)網(wǎng)絡(luò)IO。  這是方便后續(xù)計(jì)算的設(shè)計(jì)模式,也就是節(jié)省了后續(xù)一系列計(jì)算的開銷。 但代價(jià)是shuffle本身的開銷,而且很多情況下shuffle本身的開銷也是很大的。  尤其是shuffle會(huì)因?yàn)閿?shù)據(jù)傾斜而出現(xiàn)著名的長尾現(xiàn)象。

根據(jù)shuffle的理論,相似的數(shù)據(jù)會(huì)聚合到同一個(gè)partition上。 但是如果我們的數(shù)據(jù)分布不均勻會(huì)出現(xiàn)什么情況呢?  比如我們要針對(duì)職業(yè)這個(gè)字段做groupby的操作, 但是如果100W行數(shù)據(jù)中有90W行的數(shù)據(jù)都是程序員這個(gè)職業(yè)的話, 會(huì)出現(xiàn)什么情況?  你會(huì)發(fā)現(xiàn)有90W行的數(shù)據(jù)都跑到了同一個(gè)partition上造成一個(gè)巨大的partition。這樣就違背了分布式計(jì)算的初衷,  分布式計(jì)算的初衷就是把數(shù)據(jù)切分成很多的小數(shù)據(jù)分布在不同的節(jié)點(diǎn)內(nèi)存中,利用多個(gè)節(jié)點(diǎn)的并行計(jì)算能力來加速計(jì)算過程。

但是現(xiàn)在我們絕大部分的數(shù)據(jù)都匯聚到了一個(gè)partition中,這樣就又變成了單點(diǎn)計(jì)算。 而且這里還有一個(gè)特別大的問題, 就是我們在提交任務(wù)到hadoop  yarn上的時(shí)候,申請(qǐng)的資源是固定且平均分配的。  比如我申請(qǐng)10個(gè)container去計(jì)算這份數(shù)據(jù),那這10個(gè)container的資源是相等的,哪個(gè)也不多,哪個(gè)也不少。 但是我們的數(shù)據(jù)分片的大小卻是不一樣的,  比如90W行的分片需要5個(gè)G的內(nèi)存,但是其他的數(shù)據(jù)分片可能1個(gè)G就夠了。 所以如果我們不知道有數(shù)據(jù)傾斜的情況出現(xiàn)而導(dǎo)致申請(qǐng)的資源教少,就會(huì)導(dǎo)致任務(wù)OOM而掛掉。  而如果我們?yōu)榱司薮蟮臄?shù)據(jù)分片為每個(gè)container都申請(qǐng)了5G的資源, 那又造成了資源浪費(fèi)。

數(shù)據(jù)傾斜和shuffle是業(yè)界經(jīng)典難題,很難處理。 在很多大數(shù)據(jù)產(chǎn)品中都會(huì)有根據(jù)數(shù)據(jù)大小自動(dòng)調(diào)整申請(qǐng)資源的功能。而數(shù)據(jù)傾斜就是這種功能絕對(duì)的天敵。  處理不好的話,要不會(huì)變成申請(qǐng)過大資源承包集群,要不會(huì)申請(qǐng)過小資源導(dǎo)致任務(wù)掛掉。 而我們在測試階段要做的,就是模擬出這種數(shù)據(jù)傾斜的數(shù)據(jù),  然后驗(yàn)證ETL程序的表現(xiàn)。

寬表

列數(shù)太多的表就是寬表。比如我見過的最寬的表是1W列的, 尤其在機(jī)器學(xué)習(xí)系統(tǒng)中, 由于要抽取高維特征,  所以在ETL階段經(jīng)常會(huì)把很多的表拼接成一個(gè)很大的寬表。這種寬表是數(shù)據(jù)可視化的天敵,比如我們的功能是可以隨機(jī)預(yù)覽一份數(shù)據(jù)的100行。  那100*1W這樣的數(shù)據(jù)量要傳輸?shù)角岸瞬秩揪褪莻€(gè)很費(fèi)事的操作了。尤其是預(yù)覽本身也是要執(zhí)行一些計(jì)算的。如果加上這份數(shù)據(jù)本來就有海量分片的話,  要在后臺(tái)打開這么多的文件,再加上讀取這么寬的表的數(shù)據(jù)。 甚至有可能OOM, 實(shí)際上我也確實(shí)見過因?yàn)檫@個(gè)原因OOM的。  所以這個(gè)測試點(diǎn)就是我們故意去造這樣的寬表進(jìn)行測試。

其他的數(shù)據(jù)類型不一一解釋了, 都跟字面的意思差不多。

造數(shù)

之所以也使用spark這種分布式框架來造數(shù),而不是單獨(dú)使用parquet或者h(yuǎn)dfs的client是因?yàn)槲覀冊斓臄?shù)據(jù)除了要符合一些極端場景外,也要保證要有足夠的數(shù)據(jù)量,  畢竟ETL都是面對(duì)大數(shù)據(jù)場景的。 所以利用spark的分布式計(jì)算的優(yōu)勢可以在短時(shí)間內(nèi)創(chuàng)建大量數(shù)據(jù)。  比如我前兩天造過一個(gè)1億行,60個(gè)G的數(shù)據(jù),只用了20分鐘。

技術(shù)細(xì)節(jié)

RDD是spark的分布式數(shù)據(jù)結(jié)構(gòu)。 一份數(shù)據(jù)被spark讀取后會(huì)就生成一個(gè)RDD,當(dāng)然RDD就包含了那些partition。  我們創(chuàng)建RDD的方式有兩種, 一種是從一個(gè)已有的文件中讀取RDD,當(dāng)然這不是我們想要的效果。 所以我們使用第二種, 從內(nèi)存中的一個(gè)List中生成RDD。  如下:

public class Demo {  public static void main(String[] args) {  SparkConf conf = new SparkConf().setAppName("data produce")  .setMaster("local");  JavaSparkContext sc = new JavaSparkContext(conf);  SparkSession spark = SparkSession  .builder()  .appName("Java Spark SQL basic example")  .getOrCreate();  List data = new XRange(1000);  JavaRDD distData = sc.parallelize(data, 100);

上面是我寫的一個(gè)demo,前面初始化spark conf和spark session的代碼可以先忽略不用管。 主要看最后兩行,  XRange是我仿照python的xrange設(shè)計(jì)的類。 可以幫我用類似生成器的原理創(chuàng)建一個(gè)帶有index序列的List。  其實(shí)這里我們手動(dòng)創(chuàng)建一個(gè)list也行。  而最后一行就是我們通過spark的API把一個(gè)List轉(zhuǎn)換成一個(gè)RDD。sc.parallelize的第一個(gè)參數(shù)是List,而第二個(gè)參數(shù)就是你要設(shè)置的并行度,  也可以理解為你要生成這個(gè)數(shù)據(jù)的partition的數(shù)量。 其實(shí)如果我們現(xiàn)在想生成這一千行的只有index的數(shù)據(jù)的話,  再調(diào)用這樣一個(gè)API就可以了:distData.saveAsTextFile("path"); 通過這樣一個(gè)API就可以直接保存文件了。  當(dāng)然這樣肯定不是我們想要的,因?yàn)槔锩孢€沒有我們要的數(shù)據(jù)。 所以這個(gè)時(shí)候我們要出動(dòng)spark的一個(gè)高級(jí)接口,dataframe。  dataframe是spark仿照pandas的dataframe的設(shè)計(jì)開發(fā)的高級(jí)API。 功能跟pandas很像,  我們可以把一個(gè)dataframe就當(dāng)做一個(gè)表來看, 而它也有很多好用的API。  最重要的是我們有一個(gè)DataframeWriter類專門用來將dataframe保存成各種各樣格式和分區(qū)的數(shù)據(jù)的。  比如可以很方便的保存為scv,txt這種傳統(tǒng)數(shù)據(jù), 可以很方便保存成parquet和orc這種列式存儲(chǔ)的文件格式。 也提供partition  by的操作來保存成分區(qū)表或者是分桶表??傊軌驇臀覀冊斐龈鞣N我們需要的數(shù)據(jù)。  那么我們?nèi)绾伟岩粋€(gè)RDD轉(zhuǎn)換成我們需要的dataframe并填充進(jìn)我們需要的數(shù)據(jù)呢。 往下看:

List fields = new ArrayList<>();  String schemaString = "name,age";  fields.add(DataTypes.createStructField("name",  DataTypes.StringType, true));  fields.add(DataTypes.createStructField("age",  DataTypes.IntegerType, true));  StructType schema = DataTypes.createStructType(fields);  // Convert records of the RDD (people) to Rows  JavaRDD rowRDD = distData.map( record ->{  RandomStringField randomStringField = new  RandomStringField();  randomStringField.setLength(10); BinaryIntLabelField  binaryIntLabelField = new  BinaryIntLabelField();  return RowFactory.create(randomStringField.gen(),  binaryIntLabelField.gen());  });  Dataset dataset =spark.createDataFrame(rowRDD, schema);  dataset.persist();  dataset.show();  DataFrameWriter writer = new DataFrameWriter(dataset);  writer.mode(SaveMode.Overwrite).partitionBy("age").  parquet("/Users/sungaofei/gaofei");

dataframe中每一個(gè)數(shù)據(jù)都是一行,也就是一個(gè)Row對(duì)象,而且dataframe對(duì)于每一列也就是每個(gè)schema有著嚴(yán)格的要求。  因?yàn)樗且粋€(gè)表么。所以跟數(shù)據(jù)庫的表或者pandas中的表是一樣的。要規(guī)定好每一列的schema以及每一行的數(shù)據(jù)。 所以首先我們先定義好schema,  定義每個(gè)schema的列名和數(shù)據(jù)類型。 然后通過DataTypes的API創(chuàng)建schema。 這樣我們的列信息就有了。  然后是關(guān)鍵的我們?nèi)绾伟岩粋€(gè)RDD轉(zhuǎn)換成dataframe需要的Row并且填充好每一行的數(shù)據(jù)。 這里我們使用RDD的map方法,  其實(shí)dataframe也是一個(gè)特殊的RDD, 這個(gè)RDD里的每一行都是一個(gè)ROW對(duì)象而已。  所以我們使用RDD的map方法來填充我們每一行的數(shù)據(jù)并把這一行數(shù)據(jù)轉(zhuǎn)換成Row對(duì)象。

JavaRDD rowRDD = distData.map( record ->{  RandomStringField randomStringField = new RandomStringField();  randomStringField.setLength(10);  BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();  return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());  });

因?yàn)橹岸xschema的時(shí)候只定義了兩列, 分別是name和age。  所以在這里我分別用一個(gè)隨機(jī)生成String類型的類和隨機(jī)生成int類型的類來填充數(shù)據(jù)。  最后使用RowFactory.create方法來把這兩個(gè)數(shù)據(jù)生成一個(gè)Row。 map方法其實(shí)就是讓使用者處理每一行數(shù)據(jù)的方法,  record這個(gè)參數(shù)就是把行數(shù)據(jù)作為參數(shù)給我們使用。 當(dāng)然這個(gè)例子里原始RDD的每一行都是當(dāng)初生成List的時(shí)候初始化的index序號(hào)。 而我們現(xiàn)在不需要它,  所以也就沒有使用。 直接返回隨機(jī)字符串和int類型的數(shù)。 然后我們有了這個(gè)每一行數(shù)據(jù)都是Row對(duì)象的RDD后。  就可以通過調(diào)用下面的API來生成dataframe。

Dataset dataset =spark.createDataFrame(rowRDD, schema);

分別把row和schema傳遞進(jìn)去,生成dataframe的表。 最后利用DataFrameWriter保存數(shù)據(jù)。

好了, 這就是造數(shù)的基本原理了, 其實(shí)也是蠻簡單的。 當(dāng)然要做到嚴(yán)格控制數(shù)據(jù)分布,數(shù)據(jù)類型,特征維度等等就需要做很多特殊的處理。  這里就不展開細(xì)節(jié)了。

測試ETL處理的正確性

輸入一份數(shù)據(jù),然后判斷輸出的數(shù)據(jù)是否是正確的。 只不過我們這是在大數(shù)據(jù)量下的處理和測試,輸入的數(shù)據(jù)是大數(shù)據(jù),ELT輸出的也是大數(shù)據(jù),  所以就需要一些新的測試手段。 其實(shí)這個(gè)測試手段也沒什么新奇的了, 是我們剛才一直在講的技術(shù),也就是spark這種分布式計(jì)算框架。  我們以spark任務(wù)來測試這些ETL程序,這同樣也是為了測試自身的效率和性能。 如果單純使用hdfs client來讀取文件的話,  掃描那么大的數(shù)據(jù)量是很耗時(shí)的,這是我們不能接受的。 所以我們利用大數(shù)據(jù)技術(shù)來測試大數(shù)據(jù)功能就成為了必然。  當(dāng)然也許有些同學(xué)會(huì)認(rèn)為我只是測試功能么,又不是測試算法的處理性能,沒必要使用那么大的數(shù)據(jù)量。 我們用小一點(diǎn)的數(shù)據(jù),比如一百行的數(shù)據(jù)就可以了。  但其實(shí)這也是不對(duì)的, 因?yàn)樵诜植际接?jì)算中, 大數(shù)量和小數(shù)據(jù)量的處理結(jié)果可能不是完全一致的, 比如隨機(jī)拆分?jǐn)?shù)據(jù)這種場景在大數(shù)據(jù)量下可能才能測試出bug。  而且大數(shù)據(jù)測試還有另外一種場景就是數(shù)據(jù)監(jiān)控, 定期的掃描線上數(shù)據(jù),驗(yàn)證線上數(shù)據(jù)是否出現(xiàn)異常。 這也是一種測試場景,而且線上的數(shù)據(jù)一定是海量的。

廢話不多說,直接看下面的代碼片段。

@Features(Feature.ModelIde)  @Stories(Story.DataSplit)  @Description("使用pyspark驗(yàn)證隨機(jī)拆分中的分層拆分")  @Test  public void dataRandomFiledTest(){  String script = "# coding: UTF-8\n" +  "# input script according to definition of \"run\" interface\n" +  "from trailer import logger\n" +  "from pyspark import SparkContext\n" +  "from pyspark.sql import SQLContext\n" +  "\n" +  "\n" +  "def run(t1, t2, context_string):\n" +  " # t2為原始數(shù)據(jù), t1為經(jīng)過數(shù)據(jù)拆分算子根據(jù)字段分層拆分后的數(shù)據(jù)\n" +  " # 由于數(shù)據(jù)拆分是根據(jù)col_20這一列進(jìn)行的分層拆分, 所以在這里分別\n" +  " # 對(duì)這2份數(shù)據(jù)進(jìn)行分組并統(tǒng)計(jì)每一個(gè)分組的計(jì)數(shù)。由于這一列是label\n" +  " # 所以其實(shí)只有兩個(gè)分組,分別是0和1\n" +  " t2_row = t2.groupby(t2.col_20).agg({\"*\" : \"count\"}).cache()\n" +  " t1_row = t1.groupby(t1.col_20).agg({\"*\" : \"count\"}).cache()\n" +  " \n" +  " \n" +  " t2_0 = t2_row.filter(t2_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +  " t2_1 = t2_row.filter(t2_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +  " \n" +  " t1_0 = t1_row.filter(t1_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +  " t1_1 = t1_row.filter(t1_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +  " \n" +  " # 數(shù)據(jù)拆分算子是根據(jù)字段按照1:1的比例進(jìn)行拆分的。所以t1和t2的每一個(gè)分組\n" +  " # 都應(yīng)該只有原始數(shù)據(jù)量的一半\n" +  " if t2_0/2 - t1_0 >1:\n" +  " raise RuntimeError(\"the 0 class is not splited correctly\")\n" +  " \n" +  " if t2_1/2 - t1_1 >1:\n" +  " raise RuntimeError(\"the 1 class is not splited correctly\")\n" +  "\n" +  " return [t1]";

我們用來掃描數(shù)據(jù)表的API仍然是我們之前提到的dataframe。上面的代碼片段是我們嵌入spark任務(wù)的腳本。 里面t1和t2都是dataframe,  分別代表原始數(shù)據(jù)和經(jīng)過數(shù)據(jù)拆分算法拆分后的數(shù)據(jù)。 測試的功能是分層拆分。 也就是按某一列按比例抽取數(shù)據(jù)。 比如說100W行的數(shù)據(jù),我按job這個(gè)字段分層拆分,  我要求的比例是30%。 也即是說每種職業(yè)抽取30%的數(shù)據(jù)出來,相當(dāng)于這是一個(gè)數(shù)據(jù)采樣的功能。 OK,  所以在測試腳本中,我們分別先把原始表和經(jīng)過采樣的表按這一列進(jìn)行分組操作, 也就是groupby(col_20)。 這里我選擇的是按col_20進(jìn)行分層拆分。  根據(jù)剛才講的這樣的分組操作后會(huì)觸發(fā)shuffle,把有相同職業(yè)的數(shù)據(jù)傳到一個(gè)數(shù)據(jù)分片上。 然后我們做count這種操作統(tǒng)計(jì)每一個(gè)組的行數(shù)。  因?yàn)檫@個(gè)算法我是按1:1拆分的,也就是按50%采樣。 所以最后我要驗(yàn)證拆分后的數(shù)據(jù)的每一組的行數(shù)都是原始數(shù)據(jù)中該組的一半。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI