您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)如何理解zk-client通信層ClientCnxnSocket,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
ClientCnxnSocket抽象類結(jié)構(gòu) 定義了底層Socket通信接口,默認(rèn)實(shí)現(xiàn)是ClientCnxnSocketNIO
readConnectResult 讀取server的connnect的response
readLength 方法 讀取buffer長度并給incomingBuffer分配對應(yīng)大小空間
ClientCnxnSocketNIO 實(shí)現(xiàn)
findSendablePacket函數(shù) 從outgoingQueue中讀取發(fā)送的packet
doIO函數(shù) 處理讀寫
doTransport函數(shù)
如果連接就緒,調(diào)用sendThread連接操作
若讀寫就緒,調(diào)用doIO函數(shù)
ClientCnxnSocket
屬性
ClientCnxnSocketNIO子類
屬性
//nio中的selector private final Selector selector = Selector.open(); /** * nio中的selectionKey */ private SelectionKey sockKey; private SocketAddress localSocketAddress; private SocketAddress remoteSocketAddress; 方法 @Override void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } //已經(jīng)連接,但是沒有收到resposne initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; } client和server主要交互函數(shù) @Override void doTransport( int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } } selected.clear(); }
主要分為讀寫兩種 讀:沒有初始化完成初始化 讀取len再改incomingbuffer分配對應(yīng)空間 讀取對應(yīng)response 寫: 找到可以發(fā)送的packet 如果packet的bytebuffer沒有創(chuàng)建,那就進(jìn)行屬性添加 bytebuffer寫入socketChannel 把Packet從outgingQueue中取出,放入pendingQueue中 /** * @throws InterruptedException * @throws IOException */ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } if (sockKey.isWritable()) { Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } } } 查詢待發(fā)送隊(duì)列中可以發(fā)送的packet private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) { //沒有要發(fā)送的,返回null if (outgoingQueue.isEmpty()) { return null; } // If we've already starting sending the first packet, we better finish if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) { //取隊(duì)列第一個進(jìn)行發(fā)送 return outgoingQueue.getFirst(); } // Since client's authentication with server is in progress, // send only the null-header packet queued by primeConnection(). // This packet must be sent so that the SASL authentication process // can proceed, but all other packets should wait until // SASL authentication completes. //有正在認(rèn)證處理,發(fā)送空請求頭包給服務(wù)端 Iterator<Packet> iter = outgoingQueue.iterator(); while (iter.hasNext()) { Packet p = iter.next(); if (p.requestHeader == null) { // We've found the priming-packet. Move it to the beginning of the queue. iter.remove(); outgoingQueue.addFirst(p); return p; } else { // Non-priming packet: defer it until later, leaving it in the queue // until authentication completes. LOG.debug("deferring non-priming packet {} until SASL authentation completes.", p); } } return null; }
發(fā)送接收packet圖
關(guān)于如何理解zk-client通信層ClientCnxnSocket就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。