
Spark RDD's collect action is not suitable for oversized individual elements: an example analysis

發(fā)布時(shí)間:2021-12-17 10:03:31 來源:億速云 閱讀:135 作者:柒染 欄目:大數(shù)據(jù)

This article demonstrates, with a concrete example, why Spark RDD's collect action is not suitable when individual elements are very large.

collect is a very convenient Spark RDD action: it lets you easily fetch all of an RDD's elements. When those elements are Strings, the whole RDD can be turned into a List&lt;String&gt; in a single call.
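For instance, a minimal sketch of collect (assuming a local master and a made-up three-element dataset, purely for illustration):

// Assumes imports of org.apache.spark.SparkConf,
// org.apache.spark.api.java.JavaSparkContext, org.apache.spark.api.java.JavaRDD,
// java.util.Arrays and java.util.List.
SparkConf conf = new SparkConf().setAppName("collect-demo").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "bb", "ccc"));
// collect() ships every element of the RDD back to the driver.
List<String> all = rdd.collect();
System.out.println(all); // prints [a, bb, ccc]
sc.stop();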

But wait: this handy action has a weakness. It is not suitable for elements with a large size. Consider the following code:

... ...

// Create a direct stream from Kafka; both keys and values are plain Strings.
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);

// Keep only the message payload from each (key, value) tuple.
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> strJavaRDD) throws Exception {
        // Pull every raw message of this batch back to the driver.
        List<String> messages = strJavaRDD.collect();
        List<String> sizeStrs = new ArrayList<String>();
        for (String message : messages) {
            if (message == null)
                continue;
            String logStr = "message size is " + message.length();
            sizeStrs.add(logStr);
        }
        saveToLog(outputLogPath, sizeStrs);
        return null;
    }
});

... ...

The code above runs fine when each single Kafka message (that is, each RDD element) is small, say 500 bytes. But once a single message grows past a certain point (for example 10 MB), the following exception is thrown:

sparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobScheduler - Error running job streaming job 1444971120000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID 421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The reason is simple: collect() ships every element of the RDD back to the driver in one go, so with 10 MB messages the task results quickly overwhelm executor and driver memory, and executors get lost along the way. For single messages of that size, the trick is to compute the small size strings on the executors first and collect only those. We can replace the last part of the code above with:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> strJavaRDD) throws Exception {
        // Turn each (possibly huge) message into a short size string on the executors.
        JavaRDD<String> sizeRDD = strJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String message) throws Exception {
                if (message == null)
                    return null;
                String logStr = "Message size is " + message.length();
                return logStr;
            }
        });
        // Only the short size strings cross the wire to the driver.
        List<String> sizeStrs = sizeRDD.collect();
        saveToLog(outputLogPath, sizeStrs);
        return null;
    }
});
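If even the list of size strings could grow too large to collect, collect() can be avoided altogether by aggregating on the executors and bringing back only a couple of numbers. Here is a sketch under the same assumptions as above; it reuses the saveToLog helper and outputLogPath from the original code (which are not shown there), and the aggregation itself is my substitution, not the author's code:

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> strJavaRDD) throws Exception {
        // Map each message to its length; nothing large ever leaves the executors.
        JavaRDD<Integer> sizes = strJavaRDD
                .filter(new Function<String, Boolean>() {
                    @Override
                    public Boolean call(String message) {
                        return message != null;
                    }
                })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String message) {
                        return message.length();
                    }
                });
        long count = sizes.count();
        if (count == 0)
            return null; // reduce() would throw on an empty RDD
        // Only one small integer is sent back to the driver.
        int max = sizes.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return Math.max(a, b);
            }
        });
        saveToLog(outputLogPath, Arrays.asList(
                "message count is " + count,
                "max message size is " + max));
        return null;
    }
});

As a side note, Spark (since version 1.2) also has a spark.driver.maxResultSize setting that caps the total size of task results a collect() may bring back; exceeding it aborts the job with an explicit error instead of losing executors, but the real fix is still to shrink the data before collecting it.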

上述內(nèi)容就是Spark RDD的collect action 不適用于單個(gè)element size過大的示例分析,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI