溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎樣建立連接Zookeeper中的服務(wù)端

發布時間:2021-09-13 11:55:30 來源:億速云 閱讀:203 作者:柒染 欄目:大數據

這篇文章給大家介紹Zookeeper之怎樣建立連接服務端,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

服務端處理請求的代碼有兩種NIOServerCnxnFactory和NettyServerCnxnFactory,默認是NIOServerCnxnFactory,可以通過指定zookeeper.serverCnxnFactory參數來修改。

這兩個類邏輯是一樣的,只是一個用的java原生的NIO,一個用的netty,這里我們就分析下NIOServerCnxnFactory。

NIOServerCnxnFactory實現了Runnable接口,看下它的run方法,循環處理請求

//NIOServerCnxnFactory.java
//第200行
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                //如果是連接請(qǐng)求
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k
                                        .channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    //獲取IP地址對(duì)應(yīng)的客戶端連接數(shù)
                    int cnxncount = getClientCnxnCount(ia);
                    //如果超出則關(guān)閉
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                        LOG.warn("Too many connections from " + ia
                                 + " - max is " + maxClientCnxns );
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from "
                                 + sc.socket().getRemoteSocketAddress());
                        sc.configureBlocking(false);
                        SelectionKey sk = sc.register(selector,
                                                      SelectionKey.OP_READ);
                        //每一個(gè)連接都是一個(gè)NIOServerCnxn
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        addCnxn(cnxn);
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    //在第二個(gè)循環(huán)的時(shí)候,會(huì)進(jìn)入這里,處理真正的連接請(qǐng)求
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "
                                  + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

//NIOServerCnxn.java
//第237行
/**
 * Performs the I/O that the selector reported as ready for this connection.
 * Only the read path is shown in this excerpt; the remainder of the method
 * body is omitted.
 *
 * @param k the selection key that became ready for this connection
 * @throws InterruptedException declared by the signature; presumably
 *         propagated from request processing - TODO confirm
 */
void doIO(SelectionKey k) throws InterruptedException {
    try {
        // Nothing to do if the socket is no longer open; log and bail out.
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                     + Long.toHexString(sessionId));

            return;
        }
        if (k.isReadable()) {
            // Read whatever is available into the current target buffer.
            int rc = sock.read(incomingBuffer);
            // A negative count means end-of-stream on the socket.
            if (rc < 0) {
                throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
            }
            // Act only once the current buffer has been filled completely.
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                // Identity check: we were filling the length-prefix buffer,
                // i.e. this is the start of the next request.
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    // NOTE(review): readLength is not shown here; it presumably
                    // repoints incomingBuffer at a payload-sized buffer, which
                    // would make the clear() below act on that new buffer.
                    // Statement order matters - confirm before reordering.
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation: we were already filling a payload buffer
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                }
                else {
                    // Four-letter-word commands need no further processing
                    // in this method.
                    return;
                }
            }
        }
        // part of the code omitted
    } catch (CancelledKeyException e) {
        // handler body is empty in this excerpt
    } catch (CloseRequestException e) {
        // handler body is empty in this excerpt
    } catch (EndOfStreamException e) {
        // handler body is empty in this excerpt
    } catch (IOException e) {
        // handler body is empty in this excerpt
    }
}

//NIOServerCnxn.java
//第194行
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from client sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

//NIOServerCnxn.java
//第434行
private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

//ZookeeperServer.java
//第886行
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request from client "
                  + cnxn.getRemoteSocketAddress()
                  + " client's lastZxid is 0x"
                  + Long.toHexString(connReq.getLastZxidSeen()));
    }
    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn("Connection request from old client "
                 + cnxn.getRemoteSocketAddress()
                 + "; will be dropped if server is in r-o mode");
    }
    //如果客戶端沒(méi)有設(shè)置readOnly,但是服務(wù)端是只讀的,直接拋出異常關(guān)閉連接
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client "
            + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    //協(xié)商session超時(shí)時(shí)間
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {
        //如果sessionId不是0,說(shuō)明是之前已經(jīng)連接過(guò)的客戶端因?yàn)榈艟€等原因重新連接的情況
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                 + Long.toHexString(clientSessionId)
                 + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    } else {
        LOG.info("Client attempting to establish new session at "
                 + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
}

//ZookeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    //創(chuàng)建一個(gè)session,zookeeper的session管理比較復(fù)雜,具體情況下一章分析
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    //響應(yīng)客戶端
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

//ZookeeperServer.java
//第728行
public void submitRequest(Request si) {
    //省略部分代碼
    
    try {
        //刷新session的超時(shí)時(shí)間
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            //提交給PrepRequestProcessor進(jìn)一步處理
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

//PrepRequestProcessor.java
//第294行
/**
 * Builds the transaction header for a request and, depending on the request
 * type, the transaction record itself. Only the createSession case is shown
 * in this excerpt; the other cases are omitted.
 *
 * @param type the request's op code, used both in the header and to select the case
 * @param zxid the transaction id written into the header
 * @param request the request being prepared; its hdr and txn fields are set here
 * @param record not used in the portion shown
 * @param deserialize not used in the portion shown
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
    // Every transaction gets a header stamped with the current wall-clock time.
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);

    switch (type) {
        // part of the code omitted
        
        case OpCode.createSession:
            // Rewind before reading so the int is taken from the start of the
            // payload; presumably this is the 4-byte timeout the server wrote
            // when it created the session - TODO confirm against the caller.
            request.request.rewind();
            int to = request.request.getInt();
            request.txn = new CreateSessionTxn(to);
            // Rewind again so the payload can be re-read from the start.
            request.request.rewind();
            // NOTE(review): addSession is called here although the session was
            // already created earlier in the connect path; the article's
            // author notes the reason for the second call is unclear.
            zks.sessionTracker.addSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
            
        // part of the code omitted
            
        default:
            LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
    }
}

關于Zookeeper之怎樣建立連接服務端就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI