您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Ray-Handler消息訂閱器編寫(xiě)方法是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Ray-Handler消息訂閱器編寫(xiě)方法是什么”吧!
Ray是基于Event Sourcing設(shè)計(jì)的ES/Actor框架,消息發(fā)布后需要訂閱處理,訂閱器主要有以下兩類(lèi):
CoreHandler消息訂閱器=RabbitSub+SubHandler
ToReadHandler消息訂閱器=RabbitSub+SQLToReadHandler(ToReadHandler的子類(lèi))
RabbitSub特性是RabbitMQ消息隊(duì)列訂閱器。
RabbitSub特性有兩個(gè)構(gòu)造函數(shù),常用的是這個(gè):
public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)
group:通常用于分類(lèi)。示例中,X-CoreHandler的group是Core,X-ToReadHandler是Read。
exchange:RabbitMQ中的exchange名稱(chēng)。
queue:RabbitMQ中的queue名稱(chēng)。
queueCount:消息隊(duì)列數(shù)。用于消息的負(fù)載均衡。
示例:
[RabbitSub("Core", "Account", "account")] public sealed class AccountCoreHandler : SubHandler<string, MessageInfo> { …… }
RabbitSub可以單獨(dú)使用,用于訂閱消息。
Ray中的ESActor通過(guò)RaiseEvent方法發(fā)布事件,傳遞消息。Ray默認(rèn)使用RabbitMQ傳遞消息。ESActor發(fā)起事件后,CoreHandler訂閱事件,以處理事件。
實(shí)現(xiàn)方式是:
繼承SubHandler。
添加RabbitSub特性。 exchange名稱(chēng)、queue名稱(chēng)與ESGrain上RabbitPub特性的標(biāo)識(shí)一致。
添加構(gòu)造函數(shù)(必須)。
public AccountCoreHandler(IServiceProvider svProvider) : base(svProvider) { }
事件被訂閱后會(huì)流轉(zhuǎn)到Tell方法中,data是要處理的事件。
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg) { switch (data) { case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value)); default: return task; } }
SQLToReadHandler
ESActor發(fā)起事件后,X-ToReadHandler訂閱事件,以處理事件。X-ToReadHandler繼承自X-SQLToReadHandler,X-SQLToReadHandler繼承自ToReadHandler。
X-SQLToReadHandler需要使用者繼承PartSubHandler,根據(jù)使用的關(guān)系型數(shù)據(jù)庫(kù)自己實(shí)現(xiàn)。Ray默認(rèn)提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他關(guān)系型數(shù)據(jù)庫(kù),請(qǐng)自定義實(shí)現(xiàn)。
X-SQLToReadHandler實(shí)現(xiàn)細(xì)節(jié):
修改對(duì)應(yīng)關(guān)系型數(shù)據(jù)庫(kù)的Integrity Constraint Violation(違反完整性約束)
的異常。
可以將實(shí)例中PSQLToReadHandler當(dāng)做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))
即可。
說(shuō)明:
當(dāng)X-ToReadHandler訂閱消息,消息有重放的場(chǎng)景,如果該消息已經(jīng)得到處理,數(shù)據(jù)庫(kù)中已經(jīng)存在其處理后的結(jié)果,這是可能會(huì)報(bào)Integrity Constraint Violation(違反完整性約束)
異常,默認(rèn)不做處理,其他異常將其拋出,這是這段代碼的作用。
示例模板:
public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo> { public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider) { } public override Task Notice(byte[] data) { return base.Notice(data).ContinueWith(t => { if (t.Exception != null) { //根據(jù)使用數(shù)據(jù)庫(kù),修改這個(gè)if判斷 if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505")) { throw t.Exception; } } }); } }
2. X-ToReadHandler
X-ToReadHandler訂閱器主要用于訂閱感興趣的消息,將數(shù)據(jù)寫(xiě)入到數(shù)據(jù)庫(kù)中。
實(shí)現(xiàn)方式是:
實(shí)現(xiàn)SQLToReadHandler
ToReadHandler繼承SQLToReadHandler(ToReadHandler的子類(lèi))
添加RabbitSub特性。
添加構(gòu)造函數(shù)(必須),在構(gòu)造函數(shù)中注冊(cè)關(guān)注的事件。
public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider) { Register<AmountAddEvent>(); Register<AmountTransferEvent>(); }
代碼如下所示:
[RabbitSub("Read", "Account", "account")] public sealed class AccountToReadHandler : PSQLToReadHandler<string> { public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider) { Register<AmountAddEvent>(); Register<AmountTransferEvent>(); } }
X-ToReadHandler消息訂閱器使用時(shí),需要在構(gòu)造函數(shù)中注冊(cè)關(guān)心的事件,而X-CoreHandler中不需要,原因是事件在處理中需要反序列化,X-CoreHandler會(huì)對(duì)RabbitSub參數(shù)指定訂閱的所有的消息反序列化,X-ToReadHandler在此基礎(chǔ)上做了進(jìn)一步的控制,在訂閱的消息中只對(duì)Register的事件處理。這樣做的原因:1.反序列化會(huì)消耗一定的性能,進(jìn)一步控制有助于提高性能;2.Ray提供兩種實(shí)現(xiàn)方式,為開(kāi)發(fā)者擴(kuò)展自定義源碼提供借鑒。
到此,相信大家對(duì)“Ray-Handler消息訂閱器編寫(xiě)方法是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢(xún),關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。