溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Giraph源碼分析(三)—— 消息通信

發(fā)布時間:2020-07-19 16:24:07 來源:網(wǎng)絡(luò) 閱讀:267 作者:數(shù)瀾 欄目:大數(shù)據(jù)

由前文知道每個BSPServiceWorker有一個WorkerServer對象,WorkerServer對象里面又有ServerData對象,作為數(shù)據(jù)實。ServerData中包含該Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore對象為MessageStoreByPartition(接口)類型,也就是說消息時按照分區(qū)來存儲的。MessageStoreByPartition接口的關(guān)系圖如下:

Giraph源碼分析(三)—— 消息通信cdn.xitu.io/2019/7/25/16c27c8bc6000f2d?w=640&h=241&f=png&s=196139">

在SimpleMessageStore抽象類中,有一個ConcurrentMap<Integer,ConcurrentMap<I,T>>類型的變量map,用來存儲消息。第一層是pairtitionID到發(fā)送到該partition消息的映射;第二層是VertexID 到發(fā)送給該Vertex的消息隊列。

Giraph源碼分析(三)—— 消息通信

《Giraph通信模塊分析》:http://my.oschina.net/skyaugust/blog/95182

每個頂點的消息列表具體為ExtendedDataOutput類型,它繼承DataOutput接口,增加了幾個方法而已。每個消息是以字節(jié)形式寫入到ExtendedDataOutput對象中的。

發(fā)送消息時,采用異步式通信。

圖頂點的計算處理與消息通信并發(fā)執(zhí)行,在計算過程中就可以發(fā)送消息,將大規(guī)模消息發(fā)送分散在不同的時間段,避免瞬時網(wǎng)絡(luò)通信阻塞,但是接受端需要額外的空間,存儲臨時接收到的消息,相當于空間換時間。而集中式通信,圖頂點的計算處理與消息通信串行進行,在計算完畢后,統(tǒng)一發(fā)送消息,控制和實現(xiàn)方式簡單,可在發(fā)送端對消息進行最大程度優(yōu)化,但容易造成瞬時間的網(wǎng)絡(luò)通信阻塞以及增加發(fā)送端的消息存儲開銷。

不同Worker間的消息通信使用RPC方式,具體為Netty。同一Worker內(nèi),連續(xù)兩次迭代的消息直接通過內(nèi)存操作,把要發(fā)送的消息直接復制到Worker的incomingMessageStore中。下面詳述消息的存儲格式和發(fā)送機制。

Giraph使用Cache來緩存消息,當消息達到一定閾值后,一次性發(fā)送。

既按照bulk模式進行,不會一條一條信息發(fā)送。向某個頂點發(fā)送的消息是按照<destVertexId,Message> pair存儲在ByteArrayVertexIdData<I,T>中(實際為ByteArrayVertexIdMessages<I,M>類型)。介紹如下: org.apache.giraph.utils.ByteArrayVertexIdData<I,T>

功能:把<頂點ID,data> Pair 存儲在一個 byte數(shù)組中。里面有 ExtendedDataOutput對象用來存儲數(shù)據(jù)。

Giraph源碼分析(三)—— 消息通信
該類中還有一個內(nèi)部類:VertexIdDataIterator,該內(nèi)部類繼承 VertexIdIterator類。

Giraph源碼分析(三)—— 消息通信

org.apache.giraph.comm.SendCache用來緩存發(fā)送的信息,然后以“Bulk”模式發(fā)送。在Giraph中,每個Worker上可以對應(yīng)多個分區(qū)。消息緩存的閾值是以Worker為單位計算,而不是Partition。

Giraph源碼分析(三)—— 消息通信

SendCache中有ByteArrayVertexIdData<I,T>[ ] dataCache數(shù)組用來存儲發(fā)送給每個Partition的消息;有int[ ] dataSizes數(shù)組用于記錄向每個Worker發(fā)送的消息大小,若大于MAX_MSG_REQUEST_SIZE(默認為512KB)就把此Worker上的所有Partition緩存的消息發(fā)送到給該Worker,同一Worker內(nèi)消息也是如此緩存;有int[ ] initBufferSizes數(shù)組用于記錄每個Worker上的每個Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput對象的大小,同一Worker上的所有Partition初始值相同,該值為平均值。記MAX_MSG_REQUEST_SIZE(message request size)值為M, 該Worker上有P個 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)默認為0.2f,記為A。則每個Partition的初始大小為:M*(1+A) / P .

由前文知道,每個Worker都有一個NettyWorkerClientRequestProcessor<I,V,E,M>用來發(fā)送消息。該類中有SendMessageCache對象用來緩存向外發(fā)送的信息。NettyWorkerClientRequestProcessor類中的sendMessageRequest(I,M)

方法如下,用于向某個頂點destVertexId發(fā)送消息message。

Giraph源碼分析(三)—— 消息通信

方法解釋:首先根據(jù)destVertexId得到對應(yīng)的partitionId和WorkerInfo,然后把消息add到SendMessageCache中,并返回向該頂點所屬Worker發(fā)送的消息大小workerMessageSize。若該值大于默認值512KB,則把此Worker對應(yīng)的所有Partition消息從SendMessageCache中刪除,把刪除的消息賦值給workerMessages,其類型為PairList<Integer,ByteArrayVertexIdMessages<I,M>> ,key為partitionId,value為發(fā)送給該partition的消息列表,最后調(diào)用doRequest()方法發(fā)送信息。doRequest()方法如下:

Giraph源碼分析(三)—— 消息通信

可以看到在發(fā)送消息時,先判斷是否在同一Worker上。如果是的話,調(diào)用SendWorkerMessagesRequest<T,M>的doRequest發(fā)送消息;否則使用WorkerClient(底層使用Netty)進行消息發(fā)送。下面著重討論同一Worker內(nèi)的機制。

org.apache.giraph.comm.requests.SendWorkerMessagesRequest類中的doRequest方法如下:

Giraph源碼分析(三)—— 消息通信

參數(shù)為該Worker的ServerData,代碼中的partitionVertexData實際為PairList<Integer,ByteArrayVertexIdMessages<I,M>>workerMessages。遍歷<partitionID,對應(yīng)的消息列表>來添加到ServerData中的incomingMessageStore中。

ByteArrayMessagesPerVertexStore類中的addPartitionMessages()方法如下:

Giraph源碼分析(三)—— 消息通信

當用戶使用了Combiner,incomingMessageStore對應(yīng)的類型則為OneMessagePerVertexStore,該類為每個頂點只存儲一個消息,而非消息隊列。 結(jié)構(gòu)如下圖:

Giraph源碼分析(三)—— 消息通信

當添加一條消息時,會把頂點已對應(yīng)的消息和要添加的消息調(diào)用combine()方法進行合并,然后存儲在上述結(jié)構(gòu)圖中。addPartitionMessages()方法如下:

Giraph源碼分析(三)—— 消息通信

在ComputeCallable中的call()方法調(diào)用computePartition(Partition)計算完所有Partition上的頂點后,調(diào)用WorkerClientRequestProcessor.flush()方法把所有剩余的消息發(fā)送出去。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI