Kafka Network Layer Explained: Someone Finally Made It Clear

Published: 2020-08-13 00:33:41  Source: Internet  Author: Java筆記丶  Category: Programming Languages

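Kafka communicates over TCP. Unlike many middleware projects, it does not use Netty as its TCP server; instead, it implements its own networking layer on top of Java NIO.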

Key classes

First, here is the network-layer architecture of the Kafka client.


[Figure: Kafka client network-layer architecture]


This article focuses on the Network layer.

The Network layer has two important classes: Selector and KafkaChannel.

They are roughly analogous to java.nio.channels.Selector and Channel in Java NIO.

The key fields of Selector are:

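// the JDK NIO Selector
java.nio.channels.Selector nioSelector;
// all connections managed by this Selector, keyed by connection id
Map<String, KafkaChannel> channels;
// sends that have been fully written
List<Send> completedSends;
// receives that are complete and visible to the upper layer
List<NetworkReceive> completedReceives;
// receives that have been read in full but are not yet visible to the upper layer
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// client side: connections for which connect() returned true immediately
Set<SelectionKey> immediatelyConnectedKeys;
// connections that have finished connecting
List<String> connected;
// maximum size of a single receive
int maxReceiveSize;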

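From the network layer's point of view, Kafka has a client side (producers and consumers; a broker acting as a follower replica is also a client) and a server side (the broker). This article analyzes how the client establishes connections and how it sends and receives data. The server also relies on Selector and KafkaChannel for network transport, and at the Network layer the two sides differ very little.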

Establishing connections

To connect to a node, the Kafka client calls Selector#connect (unless otherwise noted, Selector below always means org.apache.kafka.common.network.Selector).

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // switch it to non-blocking mode
    socketChannel.configureBlocking(false);
    // configure the underlying socket
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // start the TCP handshake with the remote end.
        // The channel is non-blocking, so when connect() returns the connection is not necessarily
        // established (three-way handshake completed): it returns true if already connected and
        // false otherwise. When client and server run on the same machine, it may return true.
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register interest in CONNECT events
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // build a KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
        ...
    }
    // attach the KafkaChannel to the SelectionKey
    key.attach(channel);
    // store it in the map; id identifies the remote node
    this.channels.put(id, channel);
    // connected == true means this connection will never fire a CONNECT event,
    // so it has to be handled separately
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // record it in a dedicated set
        immediatelyConnectedKeys.add(key);
        // stop listening for CONNECT on this key
        key.interestOps(0);
    }
}

The flow is close to standard NIO. The case worth calling out is when SocketChannel#connect returns true, which the method's Javadoc describes:

* <p> If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation.  If the connection
* is established immediately, as can happen with a local connection, then
* this method returns <tt>true</tt>.  Otherwise this method returns
* <tt>false</tt> and the connection operation must later be completed by
* invoking the {@link #finishConnect finishConnect} method.

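In other words, in non-blocking mode a local connection may be established immediately. The method then returns true, and no CONNECT event will be fired afterwards. Kafka therefore records such connections in a separate set, immediatelyConnectedKeys, and handles them specially in a later step.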
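The connect-time logic can be reproduced with plain JDK NIO. This is a minimal sketch, not Kafka's code: ConnectSketch is a hypothetical class, and the caller-supplied immediatelyConnected set plays the role of immediatelyConnectedKeys. It only shows how the return value of SocketChannel#connect decides between waiting for OP_CONNECT and recording the key for special handling.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;

// Hypothetical helper that mirrors the structure of Kafka's Selector#connect.
final class ConnectSketch {
    private ConnectSketch() { }

    static SelectionKey connect(Selector selector, InetSocketAddress address,
                                Set<SelectionKey> immediatelyConnected) throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean connected;
        try {
            channel.configureBlocking(false);      // connect() returns without waiting for the handshake
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            // true only if the connection was established at once (can happen with a local peer)
            connected = channel.connect(address);
        } catch (IOException | RuntimeException e) {
            channel.close();                       // don't leak the socket on failure
            throw e;
        }
        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
        if (connected) {
            // OP_CONNECT will never fire for this channel, so track it separately
            immediatelyConnected.add(key);
            key.interestOps(0);
        }
        return key;
    }
}
```

Whichever branch is taken, the key ends up in exactly one place: either it waits for OP_CONNECT, or it sits in the set with no interest ops until the next poll finishes the connection.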

poll is then called to listen for network events:

public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if any events are ready or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current key, otherwise the next poll would process it again
        iterator.remove();
        // the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...

        try {
            // the key comes from immediatelyConnectedKeys, or this is a CONNECT event
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect also registers interest in READ events
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    ...
                } else
                    continue;
            }

            // SSL connections need extra preparation steps (e.g. the handshake)
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            // WRITE event
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            // the key is no longer valid
            if (!key.isValid())
                close(channel, true);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
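The shape of poll/pollSelectionKeys can be tried out without Kafka. The sketch below is hypothetical (PollSketch and pollOnce are illustrative names) and handles only the connect step: it drains both the selector's ready keys and a separately maintained set of immediately connected keys, removes each key from its set while iterating, and switches finished connections from OP_CONNECT to OP_READ. Calling selectNow() when the extra set is non-empty is a choice made here so those keys are not held back by a blocking select.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical, connect-only version of Selector#poll / #pollSelectionKeys.
final class PollSketch {
    private PollSketch() { }

    static List<SocketChannel> pollOnce(Selector selector, Set<SelectionKey> immediatelyConnected,
                                        long timeoutMs) throws IOException {
        // select(0) would block forever, so use selectNow() for "no wait" and when work is already pending
        int ready = (timeoutMs <= 0 || !immediatelyConnected.isEmpty())
                ? selector.selectNow()
                : selector.select(timeoutMs);
        List<SocketChannel> connected = new ArrayList<>();
        if (ready > 0 || !immediatelyConnected.isEmpty()) {
            handleKeys(selector.selectedKeys(), false, connected);
            handleKeys(immediatelyConnected, true, connected);
        }
        return connected;
    }

    private static void handleKeys(Set<SelectionKey> keys, boolean isImmediatelyConnected,
                                   List<SocketChannel> connected) throws IOException {
        Iterator<SelectionKey> it = keys.iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // the selector never clears its selected-key set by itself
            if (isImmediatelyConnected || key.isConnectable()) {
                SocketChannel channel = (SocketChannel) key.channel();
                if (channel.finishConnect()) {
                    // stop waiting for CONNECT, start waiting for READ
                    key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_READ);
                    connected.add(channel);
                }
            }
        }
    }
}
```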

Because connections in immediatelyConnectedKeys never fire a CONNECT event, poll calls finishConnect on their channels separately. In plaintext mode this ends up in PlaintextTransportLayer#finishConnect, implemented as follows:

public boolean finishConnect() throws IOException {
    // true means the connection is now established
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // stop listening for CONNECT events and start listening for READ events
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
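The expression passed to interestOps relies on Java operator precedence: & binds more tightly than |, so it is evaluated as (ops & ~OP_CONNECT) | OP_READ. The hypothetical InterestOps helper below isolates that arithmetic using the real SelectionKey constants, so it can be checked without a socket.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper isolating the bit manipulation done in PlaintextTransportLayer#finishConnect.
final class InterestOps {
    private InterestOps() { }

    static int afterConnect(int ops) {
        // clear the CONNECT bit, then set the READ bit
        return ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
    }

    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ ");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE ");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT ");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT ");
        return sb.toString().trim();
    }
}
```

For example, a key interested in CONNECT and WRITE ends up interested in READ and WRITE: describe(afterConnect(OP_CONNECT | OP_WRITE)) returns "READ WRITE".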

For more details on immediatelyConnectedKeys, see here.

Sending data

Kafka sends data in two steps:

1. Call Selector#send, which stores the data to be sent in the corresponding KafkaChannel. This method performs no actual network I/O.

// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // if the connection is being closed, record the send in failedSends
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

// KafkaChannel#setSend
public void setSend(Send send) {
    // fail if a previous send has not been fully written yet
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // keep it until the channel becomes writable
    this.send = send;
    // register interest in WRITE events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
2. Call Selector#poll. Step 1 registered interest in WRITE events on the channel, so once the channel becomes writable, pollSelectionKeys actually sends the data.

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current key, otherwise the next poll would process it again
        iterator.remove();
        // the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...

        try {
            ...

            // WRITE event
            if (channel.ready() && key.isWritable()) {
                // the actual network write
                Send send = channel.write();
                // a Send may take several writes; a non-null result means one Send has been fully sent
                if (send != null) {
                    // completedSends collects the fully sent Sends
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
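The first step only records the data and sets an interest bit. Below is a minimal sketch of that contract, assuming one pending buffer per connection; SendSlot is a hypothetical class that keeps its interest ops in a plain int instead of a real SelectionKey so it can run without a socket.

```java
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

// Hypothetical stand-in for KafkaChannel#setSend: record the data, perform no I/O.
final class SendSlot {
    private ByteBuffer pending;                      // at most one in-progress send per connection
    private int interestOps = SelectionKey.OP_READ;  // simulated SelectionKey interest set

    void setSend(ByteBuffer data) {
        if (pending != null)
            throw new IllegalStateException("Prior send operation still in progress");
        pending = data;
        interestOps |= SelectionKey.OP_WRITE;        // ask the selector to report writability
    }

    ByteBuffer pending() { return pending; }

    int interestOps() { return interestOps; }
}
```

Rejecting a second setSend while one is pending forces callers to wait until the previous send has been fully written before staging the next one.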

When the channel is writable, KafkaChannel#write is called, and this is where the real network I/O happens:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // ultimately calls SocketChannel#write to do the actual write
    send.writeTo(transportLayer);
    if (send.completed())
        // fully written: stop listening for WRITE events
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

    return send.completed();
}
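On a non-blocking socket a single write may accept only part of the buffer, which is why write() returns null until the whole Send is out. The sketch below imitates this with a throttled channel; BufferSend and ThrottledChannel are hypothetical stand-ins, not Kafka's Send implementations.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical Send: a buffer that may need several writeTo calls.
final class BufferSend {
    private final ByteBuffer buffer;

    BufferSend(ByteBuffer buffer) { this.buffer = buffer; }

    long writeTo(WritableByteChannel channel) throws IOException {
        return channel.write(buffer);   // may write fewer bytes than remain
    }

    boolean completed() { return !buffer.hasRemaining(); }
}

// Hypothetical channel that accepts at most `limit` bytes per write, like a nearly full socket buffer.
final class ThrottledChannel implements WritableByteChannel {
    private final int limit;
    private int total;

    ThrottledChannel(int limit) { this.limit = limit; }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(limit, src.remaining());
        src.position(src.position() + n);   // pretend n bytes went onto the wire
        total += n;
        return n;
    }

    int total() { return total; }

    @Override
    public boolean isOpen() { return true; }

    @Override
    public void close() { }
}
```

A 10-byte send over a channel that accepts 4 bytes per call needs three writeTo calls (4 + 4 + 2). In the real loop each of those calls happens in a separate poll triggered by a WRITE event, and OP_WRITE is removed only once completed() is true.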

Receiving data

When the remote end sends data, the next call to poll processes what was received.

public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if any events are ready or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the keys
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current key, otherwise the next poll would process it again
        iterator.remove();
        // the KafkaChannel created in connect()
        KafkaChannel channel = channel(key);
        ...

        try {
            ...

            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() pulls bytes from the network but may get only part of a request;
                // it returns non-null only once a complete request has been read
                while ((networkReceive = channel.read()) != null)
                    // stage each complete request in stagedReceives
                    addToStagedReceives(channel, networkReceive);
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
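As the comment in pollSelectionKeys says, channel.read() returns null until a complete request has arrived. Kafka frames every request and response with a 4-byte size prefix, so a receive first fills a 4-byte buffer and then a payload buffer of that size. Here is a simplified sketch, assuming a hypothetical FramedReceive class (no buffer pooling; the size check only mirrors the idea behind maxReceiveSize):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical, simplified analogue of NetworkReceive: 4-byte length prefix, then payload.
final class FramedReceive {
    private final int maxSize;
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    FramedReceive(int maxSize) { this.maxSize = maxSize; }

    /** Reads whatever is available; call again as more bytes arrive. */
    void readFrom(ReadableByteChannel channel) throws IOException {
        if (size.hasRemaining()) {
            if (channel.read(size) < 0) throw new IOException("connection closed");
            if (size.hasRemaining()) return;          // length prefix still incomplete
            size.flip();
            int len = size.getInt();
            if (len < 0 || len > maxSize) throw new IOException("invalid frame size " + len);
            payload = ByteBuffer.allocate(len);
        }
        if (payload.hasRemaining() && channel.read(payload) < 0) throw new IOException("connection closed");
    }

    boolean complete() { return payload != null && !payload.hasRemaining(); }

    /** A read-only view of the payload, positioned at its start (valid once complete). */
    ByteBuffer payload() { return ((ByteBuffer) payload.duplicate().flip()).asReadOnlyBuffer(); }
}
```

Feeding the frame for "hello" in three fragments leaves complete() false after the first two calls and true after the third, which is exactly when a read() modeled on this would stop returning null.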

The stagedReceives map is then processed in addToCompletedReceives:

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // isMute() returns false on the client; the server relies on it to preserve message order
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // move this channel's first staged NetworkReceive into completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

Data that has been read is first placed in stagedReceives. Then, in addToCompletedReceives, Kafka takes one NetworkReceive (if any) per channel from stagedReceives and moves it into completedReceives.
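The staging rule can be modeled without sockets. In this hypothetical sketch, StagedReceives plays the role of the stagedReceives map, strings stand in for NetworkReceive objects, channel ids stand in for KafkaChannel, and a muted set imitates KafkaChannel#isMute:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of stagedReceives -> completedReceives (at most one receive per channel per poll).
final class StagedReceives {
    private final Map<String, Deque<String>> staged = new LinkedHashMap<>();

    void stage(String channelId, String receive) {
        staged.computeIfAbsent(channelId, id -> new ArrayDeque<>()).add(receive);
    }

    /** Moves at most one receive per unmuted channel, preserving per-channel order. */
    List<String> drainOnePerChannel(Set<String> muted) {
        List<String> completed = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> it = staged.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> entry = it.next();
            if (muted.contains(entry.getKey())) continue;   // a muted channel keeps its backlog
            Deque<String> deque = entry.getValue();
            completed.add(deque.poll());
            if (deque.isEmpty()) it.remove();
        }
        return completed;
    }

    boolean isEmpty() { return staged.isEmpty(); }
}
```

Three receives staged on two channels take two polls to surface, and a channel that is muted simply keeps its backlog until it is unmuted.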

Staging receives this way serves two purposes:

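  1. For SSL connections the data on the wire is encrypted, so the exact number of bytes a request needs cannot be determined in advance. The channel reads as much as it can, which may be more than the current request. If no further data then arrives on that channel, the selector never fires for it again, and the extra bytes would never be processed.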

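  2. Kafka must guarantee that requests on a channel are processed in the order in which they were sent. So for each channel, the upper layer sees at most one request per poll, and the next request is handled only after that one is finished. On the server, the channel is muted after each poll, meaning no more data is read from it; once processing completes, the channel is unmuted and reading from the socket resumes. The client achieves the same through InFlightRequests#canSendMore.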
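On the client, ordering is kept by limiting how many requests may be outstanding per connection. The sketch below is a simplified, hypothetical InFlightGate in the spirit of InFlightRequests#canSendMore; Kafka's real method has extra conditions (for example, whether the most recent request has been completely sent), which are left out here. With a limit of 1, as with the producer setting max.in.flight.requests.per.connection=1, a node accepts a new request only after the previous response has arrived.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-node in-flight tracker, loosely modeled on InFlightRequests#canSendMore.
final class InFlightGate {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    InFlightGate(int maxInFlightPerConnection) { this.maxInFlightPerConnection = maxInFlightPerConnection; }

    boolean canSendMore(String node) {
        Deque<String> queue = inFlight.get(node);
        return queue == null || queue.size() < maxInFlightPerConnection;
    }

    void sent(String node, String requestId) {
        if (!canSendMore(node)) throw new IllegalStateException("too many in-flight requests to " + node);
        inFlight.computeIfAbsent(node, n -> new ArrayDeque<>()).addLast(requestId);
    }

    /** Responses on one connection arrive in request order, so the oldest request completes first. */
    String completeNext(String node) {
        Deque<String> queue = inFlight.get(node);
        if (queue == null || queue.isEmpty()) throw new IllegalStateException("nothing in flight for " + node);
        return queue.pollFirst();
    }
}
```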

The Kafka source comment on the stagedReceives logic reads:

/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 *
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */
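The over-read problem described in this comment can be modeled with an already-decrypted buffer. In the hypothetical OverReadDemo below, one read has produced more bytes than a single frame: stageFrames splits out every complete length-prefixed frame and leaves a trailing partial frame buffered, and selectTimeout applies the rule from the comment, returning 0 while anything is staged. No SSLEngine is involved; the decryption step is simply assumed to have happened.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: one oversized read yields several frames that the poll loop must drain.
final class OverReadDemo {
    private OverReadDemo() { }

    /** Splits every complete 4-byte-length-prefixed frame out of an already-read buffer. */
    static Deque<String> stageFrames(ByteBuffer buffered) {
        Deque<String> staged = new ArrayDeque<>();
        while (buffered.remaining() >= 4) {
            int len = buffered.getInt(buffered.position());  // peek at the length prefix
            if (buffered.remaining() < 4 + len) break;      // partial frame: leave it buffered
            buffered.position(buffered.position() + 4);
            byte[] body = new byte[len];
            buffered.get(body);
            staged.add(new String(body, java.nio.charset.StandardCharsets.UTF_8));
        }
        return staged;
    }

    /** Rule from the comment: never block in select while staged receives are waiting. */
    static long selectTimeout(long requestedMs, Deque<String> staged) {
        return staged.isEmpty() ? requestedMs : 0L;
    }
}
```

Since the socket has nothing more to deliver, the selector would not wake up again for this channel; the zero timeout is what lets the following polls surface each staged frame and only then fall back to the caller's timeout.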

End

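This article analyzed how Kafka's Network layer is implemented. Without a clear picture of this layer, parts of the Kafka source are easy to get lost in, such as how request/response ordering is guaranteed, or the fact that the send method is not what actually performs network I/O.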
