How to Use Kafka with .NET Core

Published: 2020-11-03 16:37:49 | Source: Yisu Cloud | Reads: 237 | Author: Leah | Category: Development

Today we will look at how to use Kafka from .NET Core. Since many readers may not be familiar with it, this article walks through installation, the core concepts, and working producer and consumer code; hopefully you will take something away from it.

Installation

Installing Kafka on CentOS

Download and extract

# Download and extract kafka (note: move the extracted directory, not the tarball)
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1 /data/kafka

# Download and extract zookeeper
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper

Start ZooKeeper

# Copy the sample config (zoo_sample.cfg lives under the ZooKeeper directory, not Kafka's)
$ cd /data/zookeeper/conf
$ cp zoo_sample.cfg zoo.cfg

# Review the config and adjust if needed
$ vim zoo.cfg

# Service commands
$ cd /data/zookeeper
$ ./bin/zkServer.sh start   # start
$ ./bin/zkServer.sh status  # status
$ ./bin/zkServer.sh stop    # stop
$ ./bin/zkServer.sh restart # restart

# Test with the client
$ ./bin/zkCli.sh -server localhost:2181
$ quit

Start Kafka

# Back up the config
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy

# Edit the config
$ vim /data/kafka/config/server.properties

# In a cluster, each broker must have a unique id
# broker.id=0

# Listener address (internal network)
# listeners=PLAINTEXT://ip:9092

# IP and port advertised to clients
# advertised.listeners=PLAINTEXT://106.75.84.97:9092

# Default number of partitions per topic (num.partitions, default 1);
# choose a value that suits your server (UCloud ukafka, for example, uses 3)
# num.partitions=3

# ZooKeeper connection
# zookeeper.connect=localhost:2181

# Start kafka with this config, in the background
$ ./bin/kafka-server-start.sh config/server.properties &

# Check that it is running
$ ps -ef | grep kafka
$ jps
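With the broker running, it is worth a quick smoke test from Kafka's own CLI before touching any .NET code. A minimal sketch, assuming the paths above, a local broker on port 9092, and an arbitrary topic name `test` (Kafka 2.1 still manages topics through ZooKeeper):

```shell
cd /data/kafka

# Create a topic with 3 partitions, then verify it exists
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 3 --topic test
./bin/kafka-topics.sh --list --zookeeper localhost:2181

# Terminal 1: type lines to produce messages
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# Terminal 2: consume them from the beginning
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```

If lines typed into the console producer appear in the console consumer, the broker is working.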

Installing Kafka with Docker

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
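To check the containerized setup, the same CLI tools can be run inside the kafka container. A sketch, assuming the container names above and that the wurstmeister image puts the Kafka scripts on its PATH:

```shell
# Create and list a topic from inside the kafka container
docker exec -it kafka kafka-topics.sh --create --zookeeper zookeeper:2181 \
  --replication-factor 1 --partitions 1 --topic docker-test
docker exec -it kafka kafka-topics.sh --list --zookeeper zookeeper:2181
```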

Concepts

  • Broker: a message-processing node. One Kafka server is one broker, and multiple brokers form a Kafka cluster.
  • Topic: a category of messages. Page-view logs and click logs, for example, can each be distributed as their own topic, and a Kafka cluster serves many topics at once.
  • Partition: a physical subdivision of a topic. A topic can be split into multiple partitions, each of which is an ordered queue.
  • Segment: a partition physically consists of multiple segment files.
  • Offset: each partition is an ordered, immutable sequence of messages that is continually appended to. Every message in a partition carries a sequential id called its offset, which uniquely identifies that message within the partition.


How partition count relates to consumer count

  • If there are more consumers than partitions, the extra consumers are wasted: Kafka never allows concurrent consumption within a single partition, so do not run more consumers than partitions.
  • If there are fewer consumers than partitions, each consumer handles several partitions. Balance the two counts carefully, or partitions will be drained unevenly; ideally the partition count is an integer multiple of the consumer count. This is why the partition count matters: with 24 partitions, for example, it is easy to pick a consumer count that divides evenly.
  • A consumer reading from several partitions gets no ordering guarantee across them. Kafka only guarantees order within a single partition; across partitions, the interleaving depends on the order of reads.
  • Adding or removing consumers, brokers, or partitions triggers a rebalance, after which a consumer's assigned partitions may change.

Quick start

Install the client package in the .NET Core project

Install-Package Confluent.Kafka

開(kāi)源地址: https://github.com/confluentinc/confluent-kafka-dotnet

Define the IKafkaService interface

public interface IKafkaService
{
  /// <summary>
  /// Publish a message to the specified topic
  /// </summary>
  /// <typeparam name="TMessage"></typeparam>
  /// <param name="topicName"></param>
  /// <param name="message"></param>
  /// <returns></returns>
  Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;

  /// <summary>
  /// Subscribe to messages from the specified topics
  /// </summary>
  /// <typeparam name="TMessage"></typeparam>
  /// <param name="topics"></param>
  /// <param name="messageFunc"></param>
  /// <param name="cancellationToken"></param>
  /// <returns></returns>
  Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}

Implement IKafkaService

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

public class KafkaService : IKafkaService
{
  public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
  {
    var config = new ProducerConfig
    {
      BootstrapServers = "127.0.0.1:9092"
    };
    using var producer = new ProducerBuilder<string, string>(config).Build();
    await producer.ProduceAsync(topicName, new Message<string, string>
    {
      Key = Guid.NewGuid().ToString(),
      Value = JsonConvert.SerializeObject(message) // serialize the payload as JSON (Json.NET, matching the consumer side)
    });
  }

  public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
  {
    var config = new ConsumerConfig
    {
      BootstrapServers = "127.0.0.1:9092",
      GroupId = "crow-consumer",
      EnableAutoCommit = false,
      StatisticsIntervalMs = 5000,
      SessionTimeoutMs = 6000,
      AutoOffsetReset = AutoOffsetReset.Earliest,
      EnablePartitionEof = true
    };
    //const int commitPeriod = 5;
    using var consumer = new ConsumerBuilder<Ignore, string>(config)
               .SetErrorHandler((_, e) =>
               {
                 Console.WriteLine($"Error: {e.Reason}");
               })
               .SetStatisticsHandler((_, json) =>
               {
                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
               })
               .SetPartitionsAssignedHandler((c, partitions) =>
               {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - assigned kafka partitions: {partitionsStr}");
               })
               .SetPartitionsRevokedHandler((c, partitions) =>
               {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - revoked kafka partitions: {partitionsStr}");
               })
               .Build();
    consumer.Subscribe(topics);
    try
    {
      while (true)
      {
        try
        {
          var consumeResult = consumer.Consume(cancellationToken);
          Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
          if (consumeResult.IsPartitionEOF)
          {
            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
            continue;
          }
          TMessage messageResult = null;
          try
          {
            messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
          }
          catch (Exception ex)
          {
            var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to deserialize message, Value: {consumeResult.Message.Value}]: {ex.StackTrace}";
            Console.WriteLine(errorMessage);
            messageResult = null;
          }
          if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
          {
            messageFunc(messageResult);
            try
            {
              consumer.Commit(consumeResult);
            }
            catch (KafkaException e)
            {
              Console.WriteLine(e.Message);
            }
          }
        }
        catch (ConsumeException e)
        {
          Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
      }
    }
    catch (OperationCanceledException)
    {
      Console.WriteLine("Closing consumer.");
      consumer.Close();
    }
    await Task.CompletedTask;
  }
}

Register IKafkaService with your container, then inject it and call it wherever it is needed.

public class MessageService : IMessageService, ITransientDependency
{
  private readonly IKafkaService _kafkaService;
  public MessageService(IKafkaService kafkaService)
  {
    _kafkaService = kafkaService;
  }

  public async Task RequestTraceAdded(XxxEventData eventData)
  {
    await _kafkaService.PublishAsync(eventData.TopicName, eventData);
  }
}
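The sample above uses an `ITransientDependency` marker (an ABP-framework convention) for automatic registration. With plain Microsoft.Extensions.DependencyInjection you would register the services explicitly; a minimal sketch, assuming a standard generic host and the `IMessageService`/`MessageService` names from the sample:

```csharp
// In ConfigureServices (ASP.NET Core) or when building a generic host
services.AddSingleton<IKafkaService, KafkaService>();
services.AddTransient<IMessageService, MessageService>();
```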

The code above acts as the producer. Once messages have been published, a consumer is needed to process them, so a console project works well for receiving messages and handling the business logic.

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
  e.Cancel = true;
  cts.Cancel();
};

await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{
  // Your logic

  Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} [{eventData.TopicName}] -> processed");
}, cts.Token);

IKafkaService already exposes the subscription method, so here too you simply inject the service and call it.

Producer and consumer example

Producer

static async Task Main(string[] args)
{
  if (args.Length != 2)
  {
    Console.WriteLine("Usage: .. brokerList topicName");
    // 127.0.0.1:9092 helloTopic
    return;
  }

  var brokerList = args.First();
  var topicName = args.Last();

  var config = new ProducerConfig { BootstrapServers = brokerList };

  using var producer = new ProducerBuilder<string, string>(config).Build();

  Console.WriteLine("\n-----------------------------------------------------------------------");
  Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
  Console.WriteLine("-----------------------------------------------------------------------");
  Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
  Console.WriteLine("> key value<Enter>");
  Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
  Console.WriteLine("> value<enter>");
  Console.WriteLine("Ctrl-C to quit.\n");

  var cancelled = false;

  Console.CancelKeyPress += (_, e) =>
  {
    e.Cancel = true;
    cancelled = true;
  };

  while (!cancelled)
  {
    Console.Write("> ");

    var text = string.Empty;

    try
    {
      text = Console.ReadLine();
    }
    catch (IOException)
    {
      break;
    }

    if (string.IsNullOrWhiteSpace(text))
    {
      break;
    }

    var key = string.Empty;
    var val = text;

    var index = text.IndexOf(" ");
    if (index != -1)
    {
      key = text.Substring(0, index);
      val = text.Substring(index + 1);
    }

    try
    {
      var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
      {
        Key = key,
        Value = val
      });

      Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> e)
    {
      Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
    }
  }
}
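To try the producer sample (assuming it is the project's entry point, and using the broker address and topic from the usage comment):

```shell
dotnet run -- 127.0.0.1:9092 helloTopic
```

The consumer sample takes the same two arguments.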

Consumer

static void Main(string[] args)
{
  if (args.Length != 2)
  {
    Console.WriteLine("Usage: .. brokerList topicName");
    // 127.0.0.1:9092 helloTopic
    return;
  }

  var brokerList = args.First();
  var topicName = args.Last();

  Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

  var cts = new CancellationTokenSource();
  Console.CancelKeyPress += (_, e) =>
  {
    e.Cancel = true;
    cts.Cancel();
  };

  var config = new ConsumerConfig
  {
    BootstrapServers = brokerList,
    GroupId = "consumer",
    EnableAutoCommit = false,
    StatisticsIntervalMs = 5000,
    SessionTimeoutMs = 6000,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnablePartitionEof = true
  };

  const int commitPeriod = 5;

  using var consumer = new ConsumerBuilder<Ignore, string>(config)
             .SetErrorHandler((_, e) =>
             {
               Console.WriteLine($"Error: {e.Reason}");
             })
             .SetStatisticsHandler((_, json) =>
             {
               Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
               //Console.WriteLine($"Statistics: {json}");
             })
             .SetPartitionsAssignedHandler((c, partitions) =>
             {
               Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
             })
             .SetPartitionsRevokedHandler((c, partitions) =>
             {
               Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
             })
             .Build();
  consumer.Subscribe(topicName);

  try
  {
    while (true)
    {
      try
      {
        var consumeResult = consumer.Consume(cts.Token);

        if (consumeResult.IsPartitionEOF)
        {
          Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");

          continue;
        }

        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

        if (consumeResult.Offset % commitPeriod == 0)
        {
          try
          {
            consumer.Commit(consumeResult);
          }
          catch (KafkaException e)
          {
            Console.WriteLine($"Commit error: {e.Error.Reason}");
          }
        }
      }
      catch (ConsumeException e)
      {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
      }
    }
  }
  catch (OperationCanceledException)
  {
    Console.WriteLine("Closing consumer.");
    consumer.Close();
  }
}


Having read the above, do you feel you have a better grasp of how to use Kafka from .NET Core? For more articles like this, follow the Yisu Cloud industry news channel. Thanks for your support.
