溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Scala Actor多線程怎么理解

發(fā)布時(shí)間:2021-12-09 09:06:58 來(lái)源:億速云 閱讀:133 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“Scala Actor多線程怎么理解”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

Scala Actor是Scala里多線程的基礎(chǔ),核心思想是用消息傳遞來(lái)進(jìn)行線程間的信息共享和同步。

Scala Actor線程模型可以這樣理解:所有Actor共享一個(gè)線程池,總的線程個(gè)數(shù)可以配置,也可以根據(jù)CPU個(gè)數(shù)決定;當(dāng)一個(gè)Actor啟動(dòng)之后,Scala分配一個(gè)線程給它使用,如果使用receive模型,這個(gè)線程就一直為該Actor所有,如果使用react模型,Scala執(zhí)行完react方法后拋出異常,則該線程就可以被其它Actor使用。

下面看一些核心代碼。

 def start(): Actor = synchronized {    // Reset various flags.    //    // Note that we do *not* reset `trapExit`. The reason is that    // users should be able to set the field in the constructor    // and before `act` is called.     exitReason = 'normal    exiting = false   shouldExit = false    scheduler execute {      ActorGC.newActor(Actor.this)      (new Reaction(Actor.this)).run()    }     this }

其中Reaction實(shí)現(xiàn)Runnable接口,scheduler基本相當(dāng)于是一個(gè)線程池,所以調(diào)用start方法之后會(huì)有一個(gè)線程來(lái)為該Actor服務(wù)。

使用receive模型。

def receive[R](f: PartialFunction[Any, R]): R = {   assert(Actor.self == this, "receive from channel belonging to other actor")   this.synchronized {     if (shouldExit) exit() // links     val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))     if (null eq qel) {       waitingFor = f.isDefinedAt       isSuspended = true      suspendActor()     } else {       received = Some(qel.msg)       sessions = qel.session :: sessions     }     waitingFor = waitingForNone     isSuspended = false  }   val result = f(received.get)   sessions = sessions.tail   result

如果當(dāng)前mailbox里面沒(méi)有可以處理的消息,調(diào)用suspendActor,該方法會(huì)調(diào)用wait;如果有消息,這調(diào)用PartialFunction進(jìn)行處理。

使用react模型。

def react(f: PartialFunction[Any, Unit]): Nothing = {   assert(Actor.self == this, "react on channel belonging to other actor")   this.synchronized {     if (shouldExit) exit() // links     val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))     if (null eq qel) {       waitingFor = f.isDefinedAt       continuation = f       isDetached = true    } else {       sessions = List(qel.session)       scheduleActor(f, qel.msg)     }     throw new SuspendActorException   }

如果當(dāng)前mailbox沒(méi)有可以處理的消息,設(shè)置waitingFor和continuation,這兩個(gè)變量會(huì)在接收到消息的時(shí)候使用;如果有消息,則調(diào)用scheduleActor,該方法會(huì)在線程池里選擇一個(gè)新的線程來(lái)處理,具體的處理方法也是由PartialFunction決定。不管是哪條路徑,react都會(huì)立即返回,或者說(shuō)是立即拋出異常,結(jié)束該線程的執(zhí)行,這樣該線程就可以被其它Actor使用。

再來(lái)看看接收消息的處理代碼。

def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {   if (waitingFor(msg)) {     received = Some(msg)      if (isSuspended)       sessions = replyTo :: sessions     else      sessions = List(replyTo)      waitingFor = waitingForNone      if (!onTimeout.isEmpty) {       onTimeout.get.cancel()       onTimeout = None     }      if (isSuspended)       resumeActor()     else // assert continuation != null       scheduler.execute(new Reaction(this, continuation, msg))   } else {     mailbox.append(msg, replyTo)   }

如果當(dāng)前沒(méi)有在等待消息或者接收到的消息不能處理,就丟到mailbox里去;相反,則進(jìn)行消息的處理。這里對(duì)于receive模型和react模型就有了分支:如果isSuspended為true,表示是receive模型,并且線程在wait,就調(diào)用resumeActor,該方法會(huì)調(diào)用notify;否則就是react模型,同樣在線程池里選擇一個(gè)線程進(jìn)行處理。

“Scala Actor多線程怎么理解”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI