您好,登錄后才能下訂單哦!
通信模型架構圖
master 端代碼 import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory // 需要導入這2個包 封裝一些屬性。 class MasterActor extends Actor { //在開始之前調用一次 override def preStart(): Unit = { } //用于接收消息 override def receive: Receive = { case "started" => { println("Master has been started!") //進入這個分支,說明這個Master線程已經啟動完成 } case "connecting" => { println("Master has been get connect from Worker!") println("a Worker Node has been register!") //返回消息給Worker sender() ! "connected" Thread.sleep(1000) } case "stoped" => { } } } object Demo01MasterActor { def main(args: Array[String]) { //設置MasterIP和端口 val masterHost = "localhost" val masterPort = "1234" //端口和IP封裝到akka架構,獲取一個屬性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$masterHost" |akka.remote.netty.tcp.port = "$masterPort" """.stripMargin val config = ConfigFactory.parseString(conStr) val masterActorSystem = ActorSystem("MasterActorSystem", config) val masterActor = masterActorSystem.actorOf(Props[MasterActor], "MasterActor") masterActor ! "started" masterActorSystem.awaitTermination(); } } worker端代碼 import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory class WorkerActor extends Actor { var masterURL: ActorSelection = null //啟動Actor之前執(zhí)行,做初始化工作 override def preStart(): Unit = { //配置訪問Master的URL //MasterIP:localhost //MasterPort:8888(根據Master配置) //Master的 ActorSystem對象:MasterActorSystem、MasterActor masterURL = context.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/user/MasterActor") } override def receive: Receive = { case "started" => { println("Worker has been started!") //進入這個分支,說明這個Worker線程已經啟動完成 //可以去向Master注冊 //請求和Master建立連接 masterURL ! "connecting" } case "connected" => { println("Worker 收到來自Master確認信息!") } case "stoped" => { } } } object Demo01WorkerActor { def main(args: Array[String]) { //初始化MastereIP和端口、WorkerIP和端口 // val masterHost = args(0) // val masterPort = args(1) // val workerHost = args(2) // val workePort = args(3) val masterHost = "localhost" val masterPort = "8888" val workerHost = "localhost" val workePort = "8889" //端口和IP封裝到akka架構,獲取一個屬性配置文件 val conStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$workerHost" |akka.remote.netty.tcp.port = "$workePort" """.stripMargin val config = ConfigFactory.parseString(conStr) val workerActorSystem = ActorSystem("WorkerActorSystem", config) val workerActor = workerActorSystem.actorOf(Props[WorkerActor], "WorkerActor") workerActor ! "started" workerActorSystem.awaitTermination(); } }
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。