溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在Spring Boot中使用KafkaAdminClient集群管理工具

發(fā)布時間:2021-03-20 16:40:46 來源:億速云 閱讀:694 作者:Leah 欄目:編程語言

怎么在Spring Boot中使用KafkaAdminClient集群管理工具?相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。

原理介紹

在Kafka官網(wǎng)中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準(zhǔn)):

  • 創(chuàng)建Topic:createTopics(Collection<NewTopic> newTopics)

  • 刪除Topic:deleteTopics(Collection<String> topics)

  • 羅列所有Topic:listTopics()

  • 查詢Topic:describeTopics(Collection<String> topicNames)

  • 查詢集群信息:describeCluster()

  • 查詢ACL信息:describeAcls(AclBindingFilter filter)

  • 創(chuàng)建ACL信息:createAcls(Collection<AclBinding> acls)

  • 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)

  • 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)

  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)

  • 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)

  • 查詢節(jié)點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)

  • 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)

  • 增加分區(qū):createPartitions(Map<String, NewPartitions> newPartitions)

其內(nèi)部原理是使用Kafka自定義的一套二進(jìn)制協(xié)議來實現(xiàn),詳細(xì)可以參見Kafka協(xié)議。主要實現(xiàn)步驟:

客戶端根據(jù)方法的調(diào)用創(chuàng)建相應(yīng)的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法,其內(nèi)部就是發(fā)送CreateTopicRequest請求。
客戶端發(fā)送請求至Kafka Broker。

Kafka Broker處理相應(yīng)的請求并回執(zhí),比如與CreateTopicRequest對應(yīng)的是CreateTopicResponse。
客戶端接收相應(yīng)的回執(zhí)并進(jìn)行解析處理。

和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執(zhí)類的兩個基本父類。

代碼如下

@Component
public class KafkaConfig{

   // 配置Kafka
  public Properties getProps(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
/*    props.put("retries", 2); // 重試次數(shù)
    props.put("batch.size", 16384); // 批量發(fā)送大小
    props.put("buffer.memory", 33554432); // 緩存大小,根據(jù)本機(jī)內(nèi)存大小配置
    props.put("linger.ms", 1000); // 發(fā)送頻率,滿足任務(wù)一個條件發(fā)送*/
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

}
@RestController
public class KafkaTopicManager {

  @Autowired
  private KafkaConfig kafkaConfig;

  @GetMapping("createTopic")
  public void createTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    NewTopic newTopic = new NewTopic("test1",4, (short) 1);
    Collection<NewTopic> newTopicList = new ArrayList<>();
    newTopicList.add(newTopic);
    adminClient.createTopics(newTopicList);

    adminClient.close();
  }
  @GetMapping("deleteTopic")
  public void deleteTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    adminClient.deleteTopics(Arrays.asList("test1"));
    adminClient.close();
  }
  @GetMapping("listAllTopic")
  public void listAllTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    ListTopicsResult result = adminClient.listTopics();
    KafkaFuture<Set<String>> names = result.names();
    try {
      names.get().forEach((k)->{
        System.out.println(k);
      });
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    adminClient.close();
  }
  @GetMapping("getTopic")
  public void getTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

    Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

    if(values.isEmpty()){
      System.out.println("找不到描述信息");
    }else{
      for (KafkaFuture<TopicDescription> value : values) {
        System.out.println(value);
      }
    }
    adminClient.close();
  }
}

看完上述內(nèi)容,你們掌握怎么在Spring Boot中使用KafkaAdminClient集群管理工具的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI