溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

spark通過(guò)combineByKey算子實(shí)現(xiàn)條件性聚合的方法

發(fā)布時(shí)間:2020-07-20 05:50:18 來(lái)源:網(wǎng)絡(luò) 閱讀:965 作者:sirius_kb 欄目:大數(shù)據(jù)

實(shí)際開(kāi)發(fā)過(guò)程中遇到了需要實(shí)現(xiàn)選擇性聚合的場(chǎng)景,即對(duì)于某一個(gè)key對(duì)應(yīng)的數(shù)據(jù),將滿足條件的記錄進(jìn)行聚合,不滿足條件的則不進(jìn)行聚合。

使用spark處理這種計(jì)算場(chǎng)景時(shí),想到了使用combineByKey算子,先將輸入數(shù)據(jù)中的value映射成含一個(gè)元素的ArrayBuffer(scala中相當(dāng)于java中的ArrayList),然后在聚合時(shí)對(duì)滿足聚合條件的記錄聚合后覆蓋這一個(gè)ArrayBuffer,不滿足條件的待聚合的兩條記錄都填入ArrayBuffer。最后調(diào)用flatMap將ArrayBuffer中的元素分拆。

比如下面的代碼實(shí)現(xiàn)了對(duì)某個(gè)字段聚合時(shí),按照時(shí)間條件進(jìn)行選擇性的聚合:

val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir))

    .map(line => line.split("\\|"))

    .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true)

    .map(arr => (arr(0), arr))

    .reduceByKey( (pure, after) => reduceSession(pure, after))

    .map(tup => (tup._2(13), tup._2))

    .combineByKey( x => ArrayBuffer(x),

    (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y),

    (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y))

    .flatMap(tup => arrToStr(tup._2))

def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String])

                    : ArrayBuffer[Array[String]] = {

    var outList = x.clone()

    var outarr = y.clone()

    var flag = true

    for(i <- 0 until outList.length){

        if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) {

            outarr = reduceSession(outList(i), y)

            outList(i) = outarr

            flag = false

        }

    }

    if(flag) {

        outList += y

    }

    outList

}

def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]])

                : ArrayBuffer[Array[String]] = {

    var outList = x.clone();

    for(i <- 0 until y.length){

    var outarr = y(i).clone()

    var flag = true

    for(j <- 0 until outList.length){

        if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) {

            outarr = reduceSession(outList(j), y(i))

            outList(j) = outarr

            flag = false

        }

    }

    if(flag) {

        outList += y(i)

    }

    }

    outList

}
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI