Good Programmer (好程序員) Big Data: Spark job submission and cluster startup flows
Spark cluster startup flow
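1. The start-all.sh script is invoked and starts the Master.
2. Once the Master is up, its preStart method starts a timer that periodically checks for timed-out Workers and removes them.
3. The startup script parses the slaves configuration file to find the nodes that should run a Worker, then starts a Worker on each of them.
4. Once a Worker service is up, its preStart method registers the Worker with all Masters.
5. The Master receives the registration message from the Worker, saves the registration information, and replies to the Worker with its own URL.
6. The Worker receives the Master's URL and stores it, then starts a timer that periodically sends heartbeats to the Master.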
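The bookkeeping behind steps 2, 5 and 6 can be tried out without Akka. The following is a minimal sketch, assuming a hypothetical MasterState class and explicit timestamps passed in by the caller; none of these names come from Spark or from the code later in this article.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of the Master's bookkeeping (illustrative names).
final case class WorkerRecord(id: String, var lastHeartbeat: Long)

class MasterState(timeoutMs: Long) {
  private val workers = mutable.HashMap.empty[String, WorkerRecord]

  // Step 5: store the registration; returns false if the id is already known.
  def register(id: String, now: Long): Boolean =
    if (workers.contains(id)) false
    else { workers(id) = WorkerRecord(id, now); true }

  // Step 6: a heartbeat refreshes the timestamp; unknown ids are ignored.
  def heartbeat(id: String, now: Long): Unit =
    workers.get(id).foreach(w => w.lastHeartbeat = now)

  // Step 2: drop every Worker whose last heartbeat is older than the timeout.
  def removeTimedOut(now: Long): Set[String] = {
    val dead = workers.values.filter(w => now - w.lastHeartbeat > timeoutMs).map(_.id).toSet
    workers --= dead
    dead
  }

  def aliveIds: Set[String] = workers.keySet.toSet
}

object StartupFlowDemo {
  def main(args: Array[String]): Unit = {
    val master = new MasterState(timeoutMs = 15000L)
    master.register("w1", now = 0L)
    master.register("w2", now = 0L)
    master.heartbeat("w1", now = 10000L)          // only w1 keeps reporting
    println(master.removeTimedOut(now = 20000L))  // w2 is 20000 ms stale, w1 only 10000 ms
    println(master.aliveIds)                      // w1 is the only Worker left
  }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; the actor version reads System.currentTimeMillis() instead.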
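Job submission flow
1. On the Driver side, the spark-submit script starts the SparkSubmit process, which creates a very important object (SparkContext) and begins sending messages to the Master.
2. On receiving the message, the Master generates the job information and puts it into a queue.
3. The Master selects all valid Workers and sorts them by free resources.
4. The Master notifies the valid Workers to fetch the job information and launch the corresponding Executors.
5. Each Worker launches its Executor, which then registers back with the Driver (reverse registration).
6. The Driver sends the generated tasks to the corresponding Executors, which begin executing them.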
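Step 3 (select the valid Workers, then sort by free resources) can be illustrated in a few lines. This is a sketch under stated assumptions rather than Spark's actual scheduling code: WorkerResources, its fields, and the "must fit one Executor" rule are hypothetical choices made for the example.

```scala
// Hypothetical resource snapshot of a Worker; field names are illustrative.
final case class WorkerResources(id: String, alive: Boolean, freeCores: Int, freeMemoryMb: Int)

object WorkerSelection {
  // Keep Workers that are alive and can fit one Executor, most free resources first.
  def candidates(workers: Seq[WorkerResources],
                 coresPerExecutor: Int,
                 memoryPerExecutorMb: Int): Seq[WorkerResources] =
    workers
      .filter(w => w.alive && w.freeCores >= coresPerExecutor && w.freeMemoryMb >= memoryPerExecutorMb)
      .sortBy(w => (-w.freeCores, -w.freeMemoryMb))

  def main(args: Array[String]): Unit = {
    val cluster = Seq(
      WorkerResources("w1", alive = true, freeCores = 2, freeMemoryMb = 4096),
      WorkerResources("w2", alive = true, freeCores = 8, freeMemoryMb = 8192),
      WorkerResources("w3", alive = false, freeCores = 16, freeMemoryMb = 16384),
      WorkerResources("w4", alive = true, freeCores = 1, freeMemoryMb = 512)
    )
    val chosen = candidates(cluster, coresPerExecutor = 2, memoryPerExecutorMb = 1024)
    println(chosen.map(_.id)) // w2 first, then w1; w3 is down and w4 is too small
  }
}
```

Sorting on a tuple of negated values puts the Worker with the most free cores first and breaks ties by free memory.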
Implementing the cluster startup flow
1. First, create the Master class
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable
import scala.concurrent.duration._

class Master(val masterHost: String, val masterPort: Int) extends Actor {

  // Registered Workers, keyed by Worker ID
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()

  // The set of registered Workers
  val workers = new mutable.HashSet[WorkerInfo]()

  // Worker timeout, also used as the check interval (milliseconds)
  val checkInterval: Long = 15000

  // Lifecycle method: called once, after the constructor and before receive
  override def preStart(): Unit = {
    // Start a timer that periodically checks for timed-out Workers
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, checkInterval.millis, self, CheckTimeOutWorker)
  }

  // Called after preStart, once for every incoming message
  override def receive: Receive = {
    // Worker -> Master
    case RegisterWorker(id, host, port, memory, cores) =>
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkerInfo(id, host, port, memory, cores)
        // Count registration as the first heartbeat, so a timeout check that runs
        // before the first real heartbeat does not evict the new Worker
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker += (id -> workerInfo)
        workers += workerInfo

        println("a worker registered")

        sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" +
          s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")
      }

    case HeartBeat(workerId) =>
      // Look up the WorkerInfo for this workerId and record the time of this heartbeat;
      // heartbeats from unknown (for example already removed) Workers are ignored
      idToWorker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      val toRemove: mutable.HashSet[WorkerInfo] =
        workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval)

      // Remove the timed-out Workers from both idToWorker and workers
      toRemove.foreach { deadWorker =>
        idToWorker -= deadWorker.id
        workers -= deadWorker
      }

      println(s"num of workers: ${workers.size}")
  }
}

object Master {
  val MASTER_SYSTEM = "MasterSystem"
  val MASTER_ACTOR = "Master"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    // Build the configuration needed to create the ActorSystem
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config)

    // Create the Master actor inside the ActorSystem
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR)

    // Block the main thread until the ActorSystem terminates
    actorSystem.awaitTermination()
  }
}
2. Create the RemoteMsg trait
trait RemoteMsg extends Serializable

// Master -> self (Master)
case object CheckTimeOutWorker

// Worker -> Master
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg

// Master -> Worker
case class RegisteredWorker(masterUrl: String) extends RemoteMsg

// Worker -> self
case object SendHeartBeat

// Worker -> Master (heartbeat)
case class HeartBeat(workerId: String) extends RemoteMsg
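Messages that travel between the Worker and Master processes extend RemoteMsg, and therefore Serializable, while the two timer messages stay inside one actor and do not need it. A quick way to convince yourself that a message survives the trip is a Java serialization round trip. The sketch below is self-contained, so it repeats two of the definitions above; SerializationCheck and roundTrip are hypothetical helper names, and the assumption that the remoting layer uses Java serialization depends on the Akka version and configuration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Copies of the message definitions, so that the sketch compiles on its own.
trait RemoteMsg extends Serializable
case class RegisterWorker(id: String, host: String,
                          port: Int, memory: Int, cores: Int) extends RemoteMsg

object SerializationCheck {
  // Serialize to bytes and read back, roughly what happens when a message crosses JVMs.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val msg = RegisterWorker("worker-1", "127.0.0.1", 9999, 2048, 4)
    val copy = roundTrip(msg)
    println(copy == msg) // case classes compare by value, so the copy equals the original
  }
}
```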
3. Create the Worker class
import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.duration._

class Worker(val host: String, val port: Int, val masterHost: String,
             val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Generate a Worker ID
  val workerId = UUID.randomUUID().toString

  // Stores the Master's URL
  var masterUrl: String = _

  // Heartbeat interval (milliseconds)
  val heartBeat_interval: Long = 10000

  // Reference to the Master actor
  var master: ActorSelection = _

  override def preStart(): Unit = {
    // Look up the Master actor
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +
      s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")

    master ! RegisterWorker(workerId, host, port, memory, cores)
  }

  override def receive: Receive = {
    // The Master confirmed the registration and sent back its URL
    case RegisteredWorker(masterUrl) =>
      this.masterUrl = masterUrl
      // Start a timer that periodically triggers a heartbeat to the Master
      import context.dispatcher
      context.system.scheduler.schedule(0.millis, heartBeat_interval.millis, self, SendHeartBeat)

    case SendHeartBeat =>
      // Send a heartbeat to the Master
      master ! HeartBeat(workerId)
  }
}

object Worker {
  val WORKER_SYSTEM = "WorkerSystem"
  val WORKER_ACTOR = "Worker"

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    // Build the configuration needed to create the ActorSystem
    val config: Config = ConfigFactory.parseString(configStr)

    // Create the ActorSystem
    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)

    // Create the Worker actor inside the ActorSystem
    val worker: ActorRef = actorSystem.actorOf(
      Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)

    // Block the main thread until the ActorSystem terminates
    actorSystem.awaitTermination()
  }
}
4. Create the WorkerInfo class, which holds each Worker's registration data
class WorkerInfo(val id: String, val host: String, val port: Int,
                 val memory: Int, val cores: Int) {

  // Time of the last heartbeat in milliseconds; defaults to 0 until the Master sets it
  var lastHeartbeatTime: Long = _

}
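5. Local testing requires program arguments, in the order each main method reads them. Master: host port. Worker: host port masterHost masterPort memory cores.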
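Worker.main reads six positional arguments and fails with an exception if one is missing or not a number. A minimal sketch of validating them up front follows, assuming a hypothetical WorkerArgs helper that is not part of the original code; the sample values are illustrative only, since the article does not list the ones it used.

```scala
import scala.util.Try

// Hypothetical holder for the six Worker arguments, in the order Worker.main reads them.
final case class WorkerArgs(host: String, port: Int, masterHost: String,
                            masterPort: Int, memory: Int, cores: Int)

object WorkerArgs {
  // Returns Right(parsed arguments) or Left(error message) instead of throwing.
  def parse(args: Array[String]): Either[String, WorkerArgs] = args match {
    case Array(host, port, masterHost, masterPort, memory, cores) =>
      Try(WorkerArgs(host, port.toInt, masterHost, masterPort.toInt, memory.toInt, cores.toInt))
        .toOption
        .toRight("port, masterPort, memory and cores must be integers")
    case _ =>
      Left("usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
  }
}

object ArgsDemo {
  def main(args: Array[String]): Unit = {
    // Example values only: a Worker on port 9999 talking to a Master on port 8888.
    println(WorkerArgs.parse(Array("127.0.0.1", "9999", "127.0.0.1", "8888", "2048", "4")))
    // Too few arguments yields the usage message rather than an exception.
    println(WorkerArgs.parse(Array("127.0.0.1", "8888")))
  }
}
```

With matching values, the Master would be started first with its own host and port, and each Worker would then be given that same pair as masterHost and masterPort.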