溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Ray-Handler消息訂閱器編寫(xiě)方法是什么

發(fā)布時(shí)間:2021-12-24 16:06:44 來(lái)源:億速云 閱讀:123 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“Ray-Handler消息訂閱器編寫(xiě)方法是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Ray-Handler消息訂閱器編寫(xiě)方法是什么”吧!

消息訂閱器:

Ray是基于Event Sourcing設(shè)計(jì)的ES/Actor框架,消息發(fā)布后需要訂閱處理,訂閱器主要有以下兩類(lèi):

  • CoreHandler消息訂閱器=RabbitSub+SubHandler

  • ToReadHandler消息訂閱器=RabbitSub+SQLToReadHandler(ToReadHandler的子類(lèi))

    RabbitSub特性

RabbitSub特性是RabbitMQ消息隊(duì)列訂閱器。

RabbitSub特性有兩個(gè)構(gòu)造函數(shù),常用的是這個(gè):

public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)
  • group:通常用于分類(lèi)。示例中,X-CoreHandler的group是Core,X-ToReadHandler是Read。

  • exchange:RabbitMQ中的exchange名稱(chēng)。

  • queue:RabbitMQ中的queue名稱(chēng)。

  • queueCount:消息隊(duì)列數(shù)。用于消息的負(fù)載均衡。

示例:

[RabbitSub("Core", "Account", "account")]
public sealed class AccountCoreHandler : SubHandler<string, MessageInfo>
{
    ……
}

RabbitSub可以單獨(dú)使用,用于訂閱消息。


CoreHandler消息訂閱器

Ray中的ESActor通過(guò)RaiseEvent方法發(fā)布事件,傳遞消息。Ray默認(rèn)使用RabbitMQ傳遞消息。ESActor發(fā)起事件后,CoreHandler訂閱事件,以處理事件。

實(shí)現(xiàn)方式是:

  • 繼承SubHandler。

  • 添加RabbitSub特性。 exchange名稱(chēng)、queue名稱(chēng)與ESGrain上RabbitPub特性的標(biāo)識(shí)一致。

  • 添加構(gòu)造函數(shù)(必須)。

    public AccountCoreHandler(IServiceProvider svProvider) : base(svProvider)
    {
    }
  • 事件被訂閱后會(huì)流轉(zhuǎn)到Tell方法中,data是要處理的事件。

    public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
    {
       switch (data)
       {
           case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));
           default: return task;
       }
    }
    ToReadHandler消息訂閱器
  1. SQLToReadHandler

ESActor發(fā)起事件后,X-ToReadHandler訂閱事件,以處理事件。X-ToReadHandler繼承自X-SQLToReadHandler,X-SQLToReadHandler繼承自ToReadHandler。

X-SQLToReadHandler需要使用者繼承PartSubHandler,根據(jù)使用的關(guān)系型數(shù)據(jù)庫(kù)自己實(shí)現(xiàn)。Ray默認(rèn)提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他關(guān)系型數(shù)據(jù)庫(kù),請(qǐng)自定義實(shí)現(xiàn)。

X-SQLToReadHandler實(shí)現(xiàn)細(xì)節(jié):
修改對(duì)應(yīng)關(guān)系型數(shù)據(jù)庫(kù)的Integrity Constraint Violation(違反完整性約束)的異常。
可以將實(shí)例中PSQLToReadHandler當(dāng)做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))即可。


說(shuō)明:

當(dāng)X-ToReadHandler訂閱消息,消息有重放的場(chǎng)景,如果該消息已經(jīng)得到處理,數(shù)據(jù)庫(kù)中已經(jīng)存在其處理后的結(jié)果,這是可能會(huì)報(bào)Integrity Constraint Violation(違反完整性約束)異常,默認(rèn)不做處理,其他異常將其拋出,這是這段代碼的作用。


示例模板:

public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo>
{
    public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider)
    { }
    public override Task Notice(byte[] data)
    {
        return base.Notice(data).ContinueWith(t =>
        {
            if (t.Exception != null)
            {
                //根據(jù)使用數(shù)據(jù)庫(kù),修改這個(gè)if判斷
                if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))
                {
                    throw t.Exception;
                }
            }
        });
    }
}

  2. X-ToReadHandler
X-ToReadHandler訂閱器主要用于訂閱感興趣的消息,將數(shù)據(jù)寫(xiě)入到數(shù)據(jù)庫(kù)中。

實(shí)現(xiàn)方式是:

  • 實(shí)現(xiàn)SQLToReadHandler

  • ToReadHandler繼承SQLToReadHandler(ToReadHandler的子類(lèi))

  • 添加RabbitSub特性。

  • 添加構(gòu)造函數(shù)(必須),在構(gòu)造函數(shù)中注冊(cè)關(guān)注的事件。

    public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider)
    {
      Register<AmountAddEvent>();
      Register<AmountTransferEvent>();
    }

    代碼如下所示:

[RabbitSub("Read", "Account", "account")]
public sealed class AccountToReadHandler : PSQLToReadHandler<string>
{
    public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider)
    {
        Register<AmountAddEvent>();
        Register<AmountTransferEvent>();
    }
}
X-ToReadHandler消息訂閱器與CoreHandler消息訂閱器差異

X-ToReadHandler消息訂閱器使用時(shí),需要在構(gòu)造函數(shù)中注冊(cè)關(guān)心的事件,而X-CoreHandler中不需要,原因是事件在處理中需要反序列化,X-CoreHandler會(huì)對(duì)RabbitSub參數(shù)指定訂閱的所有的消息反序列化,X-ToReadHandler在此基礎(chǔ)上做了進(jìn)一步的控制,在訂閱的消息中只對(duì)Register的事件處理。這樣做的原因:1.反序列化會(huì)消耗一定的性能,進(jìn)一步控制有助于提高性能;2.Ray提供兩種實(shí)現(xiàn)方式,為開(kāi)發(fā)者擴(kuò)展自定義源碼提供借鑒。

到此,相信大家對(duì)“Ray-Handler消息訂閱器編寫(xiě)方法是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI