您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)消息隊列 Kafka 的基本知識及 .NET Core 客戶端是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
最新項目中要用到消息隊列來做消息的傳輸,之所以選著 Kafka 是因為要配合其他 java 項目中,所以就對 Kafka 了解了一下,也算是做個筆記吧。
下面不談?wù)?Kafka 和其他的一些消息隊列的區(qū)別,包括性能及其使用方式。
Kafka 是一個實現(xiàn)了分布式的、具有分區(qū)、以及復(fù)制的日志的一個服務(wù)。它通過一套獨特的設(shè)計提供了消息系統(tǒng)中間件的功能。它是一種發(fā)布訂閱功能的消息系統(tǒng)。
如果要使用 Kafka ,那么在 Kafka 中有一些名詞需要知道,文本不討論這些名詞是否在其他消息隊列中具有相同的含義。所有名詞均是針對于 Kafka。
消息,就是要發(fā)送的內(nèi)容,一般包裝成一個消息對象。
通俗來講的話,就是放置“消息”的地方,也就是說消息投遞的一個容器。假如把消息看作是信封的話,那么 Topic 就是一個郵筒。
Partition 分區(qū),可以理解為一個邏輯上的分區(qū),像是我們電腦的磁盤 C:, D:, E: 盤一樣,
Kafka 為每個分區(qū)維護著一份日志Log文件。
每個分區(qū)是一個有序的,不可修改的,消息組成的隊列。 當(dāng)消息過來的時候,會被追加到日志文件中,這個追加是根據(jù) commit 命令來執(zhí)行的。
分區(qū)中的每一條消息都有一個編號,叫做 offset id,這個 id 在當(dāng)前分區(qū)中是唯一的,并且是遞增的。
日志,就是用來記錄分區(qū)中接收到的消息,因為每一個 Topic 可以同時向一個或者多個分區(qū)投遞消息,所以實際在存儲日志的時候,每個分區(qū)會對應(yīng)一個日志目錄,其命名規(guī)則一般為 <topic_name>-<partition_id>
, 目錄中就是一個分區(qū)的一份 commit log 日志文件。
Kafka 集群會保存一個時間段內(nèi)所有被發(fā)布出來的信息,無論這個消息是否已經(jīng)被消費過,這個時間段是可以配置的。比如日志保存時間段被設(shè)置為2天,那么2天以內(nèi)發(fā)布的消息都是可以消費的;而之前的消息為了釋放空間將會拋棄掉。Kafka的性能與數(shù)據(jù)量不相干,所以保存大量的消息數(shù)據(jù)不會造成性能問題。
對日志進(jìn)行分區(qū)主要是為了以下幾個目的:第一、這可以讓log的伸縮能力超過單臺服務(wù)器上線,每個獨立的partition的大小受限于單臺服務(wù)器的容積,但是一個topic可以有很多partition從而使得它有能力處理任意大小的數(shù)據(jù)。第二、在并行處理方面這可以作為一個獨立的單元。
和其他消息隊列一樣,生產(chǎn)者通常都是消息的產(chǎn)生方。
在 Kafka 中它決定消息發(fā)送到指定Topic的哪個分區(qū)上。
消費者就是消息的使用者,在消費者端也有幾個名詞需要區(qū)分一下。
一般消息隊列有兩種模式的消費方式,分別是 隊列模式 和 訂閱模式。
隊列模式:一對一,就是一個消息只能被一個消費者消費,不能重復(fù)消費。一般情況隊列支持存在多個消費者,但是對于一個消息,只會有一個消費者可以消費它。
訂閱模式:一對多,一個消息可能被多次消費,消息生產(chǎn)者將消息發(fā)布到Topic中,只要是訂閱改Topic的消費者都可以消費。
Group: 組,是一個消費者的集合,每一組都有一個或者多個消費者,Kafka 中在一個組內(nèi),消息只能被消費一次。
在發(fā)布訂閱模式中,消費者是以組的方式進(jìn)行訂閱的,就是Consumer Group,他們的關(guān)系如下圖:
每個發(fā)布到Topic上的消息都會被投遞到每個訂閱了此Topic的消費者組中的某一個消費者,也就是每個組都會被投遞,但是每個組都只會有一個消費者消費這個消息。
開頭介紹了Kafka 是 發(fā)布-訂閱 功能的消息隊列,所以在Kafka中,隊列模式是通過單個消費者組實現(xiàn)的,也就是整個結(jié)構(gòu)中只有一個消費者組,消費者之間負(fù)載均衡。
Borker: Kafka 集群有多個服務(wù)器組成,每個服務(wù)器稱做一個 Broker。同一個Topic的消息按照一定的key和算法被分區(qū)存儲在不同的Broker上。
上圖引用自:http://blog.csdn.net/lizhitao
因為 Kafka 的集群它是通過將分區(qū)散布到各個Server的實現(xiàn)的,也就是說集群中每個服務(wù)器他們都是彼此共享分區(qū)的數(shù)據(jù)和請求,每個分區(qū)的日志文件被復(fù)制成指定分?jǐn)?shù),分散在各個集群機器,這樣來實現(xiàn)的故障轉(zhuǎn)移。
對于每一個分區(qū)都會有一個服務(wù)器作為它的 "leader" 并且有零個或者多個服務(wù)器作為"followers" 。leader 服務(wù)器負(fù)責(zé)處理關(guān)于這個 partition 所有的讀寫請求, followers 服務(wù)器則被動的復(fù)制 leader 服務(wù)器。如果有 leader 服務(wù)器失效,那么 followers 服務(wù)器將有一臺被自動選舉成為新的 leader 。每個服務(wù)器作為某些 partition 的 leader 的同時也作為其它服務(wù)器的 follower ,從而實現(xiàn)了集群的負(fù)載均衡。
在 .NET Core 中,有相對應(yīng)的開源 kafka sdk 項目,就是 Rdkafka。它同時支持 .NET 4.5,并且支持跨平臺,可以運行于Linux,macOS 和 Windows。
RdKafka Github :https://github.com/ah-/rdkafka-dotnet
RdKafka Nuget :Install-Package RdKafka
// Producer 接受一個或多個 BrokerList
using (Producer producer = new Producer("127.0.0.1:9092"))//發(fā)送到一個名為 testtopic 的Topic,如果沒有就會創(chuàng)建一個u
sing (Topic topic = producer.Topic("testtopic")) {
//將message轉(zhuǎn)為一個 byte[]
byte[] data = Encoding.UTF8.GetBytes("Hello RdKafka");
DeliveryReport deliveryReport = await topic.Produce(data);
Console.WriteLine($"發(fā)送到分區(qū):{deliveryReport.Partition}, Offset 為: {deliveryReport.Offset}");
}
由于 Kafka 是以消費者組的形式進(jìn)行消費的,所以需要指定一個GroupId。
在內(nèi)部實現(xiàn)上,消費者是通過一個輪詢機制來實現(xiàn)的對 Topic 消息的監(jiān)控,這也是Kafka推薦的方式,在 Rdkafka 中輪詢的間隔為 1 秒鐘。
//配置消費者組
var config = new Config()
{
GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{ //注冊一個事件
consumer.OnMessage += (obj, msg) =>
{
string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
}; //訂閱一個或者多個Topic
consumer.Subscribe(new[] { "testtopic" });
//啟動
consumer.Start();
Console.WriteLine("Started consumer, press enter to stop consuming");
Console.ReadLine();
}
看完上述內(nèi)容,你們對消息隊列 Kafka 的基本知識及 .NET Core 客戶端是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。