溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Netty分布式pipeline管道傳播outBound事件的示例分析

發(fā)布時(shí)間:2022-03-28 13:56:23 來(lái)源:億速云 閱讀:166 作者:小新 欄目:開(kāi)發(fā)技術(shù)

這篇文章將為大家詳細(xì)講解有關(guān)Netty分布式pipeline管道傳播outBound事件的示例分析,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。

outbound事件傳輸流程

在我們業(yè)務(wù)代碼中, 有可能使用wirte方法往寫(xiě)數(shù)據(jù):

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.channel().write("test data");
}

當(dāng)然, 直接調(diào)用write方法是不能往對(duì)方channel中寫(xiě)入數(shù)據(jù)的, 因?yàn)檫@種方式只能寫(xiě)入到緩沖區(qū), 還要調(diào)用flush方法才能將緩沖區(qū)數(shù)據(jù)刷到channel中, 或者直接調(diào)用writeAndFlush方法, 有關(guān)邏輯, 我們會(huì)在后面章節(jié)中詳細(xì)講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程

這里我們同樣給出兩種寫(xiě)法

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫(xiě)法1
    ctx.channel().write("test data");
    //寫(xiě)法2
    ctx.write("test data");
}

這兩種寫(xiě)法有什么區(qū)別, 我們首先跟到第一種寫(xiě)法中去:

ctx.channel().write("test data");

這里獲取ctx所綁定的channel

我們跟到AbstractChannel的write方法中:

public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

這里pipeline是DefaultChannelPipeline

跟到其write方法中:
public final ChannelFuture write(Object msg) {
    //從tail節(jié)點(diǎn)開(kāi)始(從最后的節(jié)點(diǎn)往前寫(xiě))
    return tail.write(msg);
}

這里調(diào)用tail節(jié)點(diǎn)write方法, 這里我們應(yīng)該能分析到, outbound事件, 是通過(guò)tail節(jié)點(diǎn)開(kāi)始往上傳播的, 帶著這點(diǎn)猜想, 我們繼往下看

其實(shí)tail節(jié)點(diǎn)并沒(méi)有重寫(xiě)write方法, 最終會(huì)調(diào)用其父類AbstractChannelHandlerContext的write方法

AbstractChannelHandlerContext的write方法:

public ChannelFuture write(Object msg) { 
    return write(msg, newPromise());
}

我們看到這里有個(gè)newPromise()這個(gè)方法, 這里是創(chuàng)建一個(gè)Promise對(duì)象, 有關(guān)Promise的相關(guān)知識(shí)我們會(huì)在以后的章節(jié)剖析

我們繼續(xù)跟write:

public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    //代碼省略
    write(msg, false, promise);
    return promise;
}

繼續(xù)跟write:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒(méi)有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

這里跟我們上一小節(jié)剖析過(guò)channelRead方法有點(diǎn)類似, 但是事件傳輸?shù)姆较蛴兴煌? 這里findContextOutbound()是獲取上一個(gè)標(biāo)注outbound事件的HandlerContext

跟到findContextOutbound中
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

這里的邏輯我們似曾相識(shí), 跟我們上一小節(jié)的findContextInbound()方法有點(diǎn)像, 只是過(guò)程是反過(guò)來(lái)的

在這里, 會(huì)找到當(dāng)前context的上一個(gè)節(jié)點(diǎn), 如果標(biāo)注的事件不是outbound事件, 則繼續(xù)往上找, 意思就是找到上一個(gè)標(biāo)注outbound事件的節(jié)點(diǎn)

回到write方法:
AbstractChannelHandlerContext next = findContextOutbound();

這里將找到節(jié)點(diǎn)賦值到next屬性中

因?yàn)槲覀冎胺治龅膚rite事件是從tail節(jié)點(diǎn)傳播的, 所以上一個(gè)節(jié)點(diǎn)就有可能是用戶自定的handler所屬的context

然后判斷是否為當(dāng)前eventLoop線程, 如果是不是, 則封裝成task異步執(zhí)行, 如果不是, 則繼續(xù)判斷是否調(diào)用了flush方法, 因?yàn)槲覀冞@里沒(méi)有調(diào)用, 所以會(huì)執(zhí)行到next.invokeWrite(m, promise),

我們繼續(xù)跟invokeWrite

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else {
        write(msg, promise);
    }
}

這里會(huì)判斷當(dāng)前handler的狀態(tài)是否是添加狀態(tài), 這里返回的是true, 將會(huì)走到invokeWrite0(msg, promise)這一步

繼續(xù)跟invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        //調(diào)用當(dāng)前handler的wirte()方法
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

這里的邏輯也似曾相識(shí), 調(diào)用了當(dāng)前節(jié)點(diǎn)包裝的handler的write方法, 如果用戶沒(méi)有重寫(xiě)write方法, 則會(huì)交給其父類處理

我們跟到ChannelOutboundHandlerAdapter的write方法中看:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

這里調(diào)用了當(dāng)前ctx的write方法, 這種寫(xiě)法和我們小節(jié)開(kāi)始的寫(xiě)法是相同的, 我們回顧一下:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //寫(xiě)法1
    ctx.channel().write("test data");
    //寫(xiě)法2
    ctx.write("test data");
}

我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法:

private void write(Object msg, boolean flush, ChannelPromise promise) { 
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            //沒(méi)有調(diào)flush
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

又是我們所熟悉邏輯, 找到當(dāng)前節(jié)點(diǎn)的上一個(gè)標(biāo)注事件為outbound事件的節(jié)點(diǎn), 繼續(xù)執(zhí)行invokeWrite方法, 根據(jù)之前的剖析, 我們知道最終會(huì)執(zhí)行到上一個(gè)handler的write方法中

走到這里已經(jīng)不難理解, ctx.channel().write("test data")其實(shí)是從tail節(jié)點(diǎn)開(kāi)始傳播寫(xiě)事件, 而ctx.write("test data")是從自身開(kāi)始傳播寫(xiě)事件

所以, 在handler中如果重寫(xiě)了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因?yàn)闀?huì)造成每次事件傳輸?shù)竭@里都會(huì)從tail節(jié)點(diǎn)重新傳輸, 導(dǎo)致不可預(yù)知的錯(cuò)誤

如果用代碼中沒(méi)有重寫(xiě)handler的write方法, 則事件會(huì)一直往上傳輸, 當(dāng)傳輸完所有的outbound節(jié)點(diǎn)之后, 最后會(huì)走到head節(jié)點(diǎn)的wirte方法中

我們跟到HeadContext的write方法中
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

我們看到write事件最終會(huì)流向這里, 通過(guò)unsafe對(duì)象進(jìn)行最終的寫(xiě)操作

有關(guān)inbound事件和outbound事件的傳輸, 可通過(guò)下圖進(jìn)行說(shuō)明:

Netty分布式pipeline管道傳播outBound事件的示例分析

關(guān)于“Netty分布式pipeline管道傳播outBound事件的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI