溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

zookeeper(11)源碼分析-請(qǐng)求處理鏈(1)

發(fā)布時(shí)間:2020-08-06 21:45:14 來(lái)源:網(wǎng)絡(luò) 閱讀:239 作者:shayang88 欄目:編程語(yǔ)言

對(duì)于請(qǐng)求處理鏈而言,所有請(qǐng)求處理器的父接口為RequestProcessor。

RequestProcessor內(nèi)部類RequestProcessorException,用來(lái)表示處理過程中的出現(xiàn)的異常,而proceequest和shutdown方法則是核心方法,是子類必須要實(shí)現(xiàn)的方法,處理的主要邏輯在proceequest中,通過proce***equest方法可以將請(qǐng)求傳遞到下個(gè)處理器。而shutdown表示關(guān)閉處理器,其意味著該處理器要關(guān)閉和其他處理器的連接。

public interface RequestProcessor {
    @SuppressWarnings("serial")
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }

    void proce***equest(Request request) throws RequestProcessorException;

    void shutdown();
}

實(shí)現(xiàn)RequestProcessor的processor有很多,PrepRequestProcessor,通常是請(qǐng)求處理鏈的第一個(gè)處理器。

PrepRequestProcessor

1、類的定義

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}

PrepRequestProcessor繼承了ZooKeeperCriticalThread類并實(shí)現(xiàn)了RequestProcessor接口,表示其可以作為線程使用。

2、類核心成員

//已提交的請(qǐng)求隊(duì)列
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
    //下一個(gè)處理器
    private final RequestProcessor nextProcessor;
    // zk服務(wù)器
    ZooKeeperServer zks;

3、核心函數(shù)

3.1、run

while (true) {
                //從隊(duì)列獲取請(qǐng)求
                Request request = submittedRequests.take();
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                //requestOfDeath類型的請(qǐng)求,代表當(dāng)前處理器已經(jīng)關(guān)閉,不再處理請(qǐng)求。
                if (Request.requestOfDeath == request) {
                    break;
                }
                //調(diào)用關(guān)鍵函數(shù)
                pRequest(request);
            }

3.2、pRequest

pRequest會(huì)確定請(qǐng)求類型,并根據(jù)請(qǐng)求類型不同生成不同的請(qǐng)求對(duì)象,我們以創(chuàng)建節(jié)點(diǎn)為例子分析

//設(shè)置消息頭和事務(wù)為空
        request.setHdr(null);
        request.setTxn(null);

        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                //創(chuàng)建節(jié)點(diǎn)請(qǐng)求
                CreateRequest create2Request = new CreateRequest();
                //處理請(qǐng)求
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;

                                //省略其他代碼
                 //給請(qǐng)求的zxid賦值               
                 request.zxid = zks.getZxid();
                 //交給下一個(gè)處理器繼續(xù)處理
         nextProcessor.proce***equest(request);     

pRequest2Txn函數(shù)是實(shí)際的處理請(qǐng)求的函數(shù),對(duì)于創(chuàng)建方法會(huì)調(diào)用pRequest2TxnCreate函數(shù)

//設(shè)置請(qǐng)求頭
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));

        switch (type) {
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer: {
                pRequest2TxnCreate(type, request, record, deserialize);
                break;
            }

pRequest2TxnCreate方法如下:

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
        if (deserialize) {
            //反序列化,將ByteBuffer轉(zhuǎn)化為Record
            ByteBufferInputStream.byteBuffer2Record(request.request, record);
        }

        int flags;
        String path;
        List<ACL> acl;
        byte[] data;
        long ttl;
        if (type == OpCode.createTTL) {
            CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
            flags = createTtlRequest.getFlags();
            path = createTtlRequest.getPath();
            acl = createTtlRequest.getAcl();
            data = createTtlRequest.getData();
            ttl = createTtlRequest.getTtl();
        } else {
            //轉(zhuǎn)換createRequest對(duì)象
            CreateRequest createRequest = (CreateRequest)record;
            flags = createRequest.getFlags();
            path = createRequest.getPath();
            acl = createRequest.getAcl();
            data = createRequest.getData();
            ttl = -1;
        }
        CreateMode createMode = CreateMode.fromFlag(flags);
        validateCreateRequest(path, createMode, request, ttl);
        //獲取父節(jié)點(diǎn)路徑
        String parentPath = validatePathForCreate(path, request.sessionId);

        List<ACL> listACL = fixupACL(path, request.authInfo, acl);
        //獲取父節(jié)點(diǎn)的record
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        //檢查ACL列表
        checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
        int parentCVersion = parentRecord.stat.getCversion();
        //是否創(chuàng)建順序節(jié)點(diǎn)
        if (createMode.isSequential()) {
            //子路徑后追加一串?dāng)?shù)字,順序的
            path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
        }
        validatePath(path, request.sessionId);
        try {
            if (getRecordForPath(path) != null) {
                throw new KeeperException.NodeExistsException(path);
            }
        } catch (KeeperException.NoNodeException e) {
            // ignore this one
        }
        boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
        //父節(jié)點(diǎn)不能是臨時(shí)節(jié)點(diǎn)
        if (ephemeralParent) {
            throw new KeeperException.NoChildrenForEphemeralsException(path);
        }
        //新的子節(jié)點(diǎn)版本號(hào)
        int newCversion = parentRecord.stat.getCversion()+1;
        //新生事務(wù)
        if (type == OpCode.createContainer) {
            request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        } else if (type == OpCode.createTTL) {
            request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        } else {
            request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                    newCversion));
        }
        StatPersisted s = new StatPersisted();
        if (createMode.isEphemeral()) { //是否臨時(shí)節(jié)點(diǎn)
            s.setEphemeralOwner(request.sessionId);
        }
        //拷貝
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        //子節(jié)點(diǎn)數(shù)量+1
        parentRecord.childCount++;
        //設(shè)置新版本號(hào)
        parentRecord.stat.setCversion(newCversion);
        //將parentRecord添加至outstandingChanges和outstandingChangesForPath中
        addChangeRecord(parentRecord);
        // 將新生成的ChangeRecord(包含了StatPersisted信息)添加至outstandingChanges和outstandingChangesForPath中
        addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
    }

addChangeRecord函數(shù)將ChangeRecord添加至ZooKeeperServer的outstandingChanges和outstandingChangesForPath中。

private void addChangeRecord(ChangeRecord c) {
        synchronized (zks.outstandingChanges) {
            zks.outstandingChanges.add(c);
            zks.outstandingChangesForPath.put(c.path, c);
        }
    }

outstandingChanges 位于ZooKeeperServer 中,用于存放剛進(jìn)行更改還沒有同步到ZKDatabase中的節(jié)點(diǎn)信息。

znode節(jié)點(diǎn)會(huì)由于用戶的讀寫操作頻繁發(fā)生變化,為了提升數(shù)據(jù)的訪問效率,ZooKeeper中有一個(gè)三層的數(shù)據(jù)緩沖層用于存放節(jié)點(diǎn)數(shù)據(jù)。

outstandingChanges->ZKDatabase->FileSnap+FileTxnLog

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI