溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Zookeeper源碼中session管理的示例分析

發(fā)布時(shí)間:2021-09-10 17:44:10 來源:億速云 閱讀:110 作者:柒染 欄目:大數(shù)據(jù)

Zookeeper源碼中session管理的示例分析,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

一、session的創(chuàng)建

//ZookeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);
    
    //省略部分代碼
}

//SessionTrackerImpl.java
//第236行
synchronized public long createSession(int sessionTimeout) {
    addSession(nextSessionId, sessionTimeout);
    return nextSessionId++;
}

//SessionTrackerImpl.java
//第241行
synchronized public void addSession(long id, int sessionTimeout) {
    //保存sessionId和過期時(shí)間的關(guān)系
    sessionsWithTimeout.put(id, sessionTimeout);
	//如果session不存在,就新建一個(gè)
    if (sessionsById.get(id) == null) {
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
        sessionsById.put(id, s);
        //省略日志打印
    } else {
        //省略日志打印
    }
    //將session按照一定規(guī)則聚合
    touchSession(id, sessionTimeout);
}

//SessionTrackerImpl.java
//第166行
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        //省略日志打印
    }
    SessionImpl s = sessionsById.get(sessionId);
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }
    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
    //如果當(dāng)前session的過期時(shí)間大于這個(gè)值,不需要操作
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        return true;
    }
    //將session從舊的桶中移出,并放入(剩余超時(shí)時(shí)間更長的)新的桶
    SessionSet set = sessionSets.get(s.tickTime);
    if (set != null) {
        set.sessions.remove(s);
    }
    s.tickTime = expireTime;
    set = sessionSets.get(s.tickTime);
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    set.sessions.add(s);
    return true;
}

//SessionTrackerImpl.java
//第89行
private long roundToInterval(long time) {
    //expirationInterval就是zookeeper的心跳周期(tickTime),默認(rèn)值是3000
    //這段計(jì)算的意思是將過期時(shí)間每3000ms分一個(gè)段
    //比如200ms、500ms、3000ms返回0,3001ms、5000ms返回3000
    //由于這里的time是加了Time.currentElapsedTime()的,所以不會(huì)出現(xiàn)0的情況
    return (time / expirationInterval + 1) * expirationInterval;
}

二、session激活

//ZookeeperServer.java
//第728行
public void submitRequest(Request si) {
    //省略部分代碼
    
    try {
        touch(si.cnxn);
        
        //省略部分代碼
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

//ZookeeperServer.java
//第368行
void touch(ServerCnxn cnxn) throws MissingSessionException {
    //省略部分代碼
    if (!sessionTracker.touchSession(id, to)) {
        throw new MissingSessionException(
            "No session with sessionid 0x" + Long.toHexString(id)
            + " exists, probably expired and removed");
    }
}

zookeeper服務(wù)端響應(yīng)客戶端的請(qǐng)求時(shí),都會(huì)調(diào)用submitRequest方法,最終會(huì)調(diào)用到touchSession方法,這里會(huì)將session移動(dòng)到新的桶中

三、session的過期

//SessionTrackerImpl.java
//第142行
synchronized public void run() {
    try {
        while (running) {
            currentTime = Time.currentElapsedTime();
            //在SessionTrackerImpl初始化的時(shí)候,會(huì)給nextExpirationTime賦一個(gè)初值
            //nextExpirationTime = roundToInterval(Time.currentElapsedTime());
            if (nextExpirationTime > currentTime) {
                this.wait(nextExpirationTime - currentTime);
                continue;
            }
            SessionSet set;
            //如果到達(dá)了過期時(shí)間,則移除對(duì)應(yīng)桶中的所有session
            set = sessionSets.remove(nextExpirationTime);
            if (set != null) {
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
            nextExpirationTime += expirationInterval;
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

//ZookeeperServer.java
//第353行
public void expire(Session session) {
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
             + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}

//ZookeeperServer.java
//第329行
private void close(long sessionId) {
    submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}

//PrepRequestProcessor.java
//第294行
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);

    switch (type) {
        //省略代碼
            
        case OpCode.closeSession:
            // We don't want to do this check since the session expiration thread
            // queues up this operation without being the session owner.
            // this request is the last of the session so it should be ok
            //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            HashSet<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
            //刪除session關(guān)聯(lián)的所有臨時(shí)節(jié)點(diǎn)
            synchronized (zks.outstandingChanges) {
                //zookeeper的大部分操作都會(huì)記錄并放入列表
                for (ChangeRecord c : zks.outstandingChanges) {
                    //c.stat == null表示這是刪除操作
                    if (c.stat == null) {
                        es.remove(c.path);
                    } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                        es.add(c.path);
                    }
                }
                for (String path3Delete : es) {
                    addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                                     path3Delete, null, 0, null));
                }

                zks.sessionTracker.setSessionClosing(request.sessionId);
            }

            LOG.info("Processed session termination for sessionid: 0x"
                     + Long.toHexString(request.sessionId));
            break;
        
         //省略代碼
    }
}

看完上述內(nèi)容,你們掌握Zookeeper源碼中session管理的示例分析的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI