溫馨提示×

kafka怎么往topic中發(fā)送消息

小億
205
2024-01-18 14:56:00
欄目: 編程語言

使用Kafka的ProducerAPI可以向一個(gè)或多個(gè)Topic發(fā)送消息。

首先,需要創(chuàng)建一個(gè)Producer實(shí)例,并將Kafka集群的地址傳遞給它??梢酝ㄟ^指定一個(gè)或多個(gè)bootstrap.servers屬性來指定Kafka集群的地址。

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

接下來,可以使用Producer的send方法將消息發(fā)送到Topic。需要創(chuàng)建一個(gè)ProducerRecord實(shí)例,指定要發(fā)送的消息的Topic、消息的鍵和值。

String topic = "my-topic";
String key = "my-key";
String value = "Hello, Kafka!";

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

producer.send(record);

最后,使用完P(guān)roducer后需要調(diào)用close方法來關(guān)閉Producer并釋放相關(guān)資源。

producer.close();

以上代碼片段展示了如何向一個(gè)Topic發(fā)送一條消息??梢愿鶕?jù)需求進(jìn)行調(diào)整,例如發(fā)送多條消息、指定消息的分區(qū)、添加消息的回調(diào)函數(shù)等。

0