溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用spark-core實現(xiàn)廣度優(yōu)先搜索

發(fā)布時間:2021-12-17 10:13:11 來源:億速云 閱讀:145 作者:柒染 欄目:大數(shù)據

如何使用spark-core實現(xiàn)廣度優(yōu)先搜索,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

需求描述

數(shù)據源是一批網絡日志數(shù)據,每條數(shù)據都有兩個字段srcip和dstip,字段之間以逗號分隔,問題的需求是給定一個srcip和dstip,在給定的搜索深度下檢索這兩個ip之間所有的通聯(lián)路徑。這個問題是網絡日志處理中的一個實際需求,之前在單機的程序中實現(xiàn)過,但是需要將所有的ip對加載到內存中。考慮到如果數(shù)據量太大的情況,可能單節(jié)點的內存無法支撐這樣的操作,但是如果不將ip對全加載內存中,使用深度優(yōu)先遍歷的方法,搜索過程又會很慢。最近在學習spark框架,剛接觸RDD,就是這用RDD來解決這個問題。以下是scala代碼

package com.pxu.spark.core

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 *  pxu
 *  2021-01-29 16:57
 */
object FindIpRel {


  def main(args: Array[String]): Unit = {

    val srcIp = args(0) // 源ip
    val dstIp = args(1) // 目標ip
    val depth = args(2).toInt //搜索深度
    val resPath = args(3) //搜索結果的輸出位置

    val conf = new SparkConf().setAppName("findIpRel")
    val sc = new SparkContext(conf)


    /**
     * 從數(shù)據源中構建原始rdd,每一行的數(shù)據形式為a,b
     */

    val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv")

    /**
     * 對原始Rdd進行元組形式轉化,現(xiàn)在每一行的數(shù)據形式為(a,b)
     * 除此之外還對數(shù)據進行了去重處理,并顯示使用hash分區(qū)器對RDD中的數(shù)據進行分區(qū)
     * 為后面的join操作,做一些優(yōu)化
     */
    val base = ori.map(a => {
      val tmpArr = a.split(",")
      (tmpArr(0), tmpArr(1))
    }).distinct().partitionBy(new HashPartitioner(10))


    /**
     * 這是一個用于保存結果的RDD,其中每一行的形式為(dstIp,List(ip on path))
     * 在查找過程中,發(fā)現(xiàn)了搜索結果后,就會將其并入到res中
     */
    var res = sc.makeRDD[(String,List[String])](List())

    /**
     * 這是一個用于迭代的RDD,其初始化的內容是,首先從baseRdd中過濾出元組第一個元素a是參數(shù)SrcIp的,
     * 然后將其轉化成(b,List(a))的格式,其中b總是代表當前搜索路徑上的尾ip,list中的其他內容代表搜索
     * 路徑上其他的ip
     */
    var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1)))

    for(i <- 2 to depth){

      /**
       * 1.首先iteration和base按照key進行join,這個操作的意義就是更深一層的搜索,結果RDD的格式是(b,(List(ip on path),c))
       * 2.對數(shù)據進行一次過濾,過去掉那些路徑已經形成環(huán)的元素,成環(huán)的判據就是List(ip on path)中的數(shù)據已經包含c了
       * 3.進行map操作,b并入到List(ip on path),將c作為新的key,因此此時更深一層的搜索,導致c成為了當前搜索路徑中的尾節(jié)點,
       *   此時RDD中的每一個元素的格式應該是(c,(List(ip on path))
       */
      val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1))

      /**
       * 將tmp中已經成功搜索的路徑篩選出來,成功搜索的判據是(c,(List(ip on path)),c與dstIp相等
       */
      val success = tmp.filter(a => a._1.equals(dstIp))

      /**
       * 將成功搜索的數(shù)據合并到res中
       */
      res = res.union(success)
      
      /**
       * 更新iteration
       */
      iteration = tmp.subtract(success)

    }
    
    /**
     * 將成功搜索的路徑并入到res中
     */
    res.union(iteration.filter(a => a._1.equals(dstIp)))

    /**
     * 執(zhí)行一次轉換操作,將res中的元素從(c,(List(ip on path))格式轉換成List(all ip on path)
     */
    val finalResult = res.map(a => a._2 :+ a._1)

    finalResult.saveAsTextFile(resPath)

  }

}

關于如何使用spark-core實現(xiàn)廣度優(yōu)先搜索問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業(yè)資訊頻道了解更多相關知識。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI