溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何進行Kafka 1.0.0 d代碼示例分析

發(fā)布時間:2021-12-15 15:50:26 來源:億速云 閱讀:133 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)如何進行Kafka 1.0.0 d代碼示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

package kafka.demo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 *  <p>Description: kafka 1.0.0</p> 
 * @author guangshihao
 * @date 2018年9月19日 
 *
 */
public class KafkaProduderDemo {
	public static void main(String[] args) {
		Map<String,Object> props = new HashMap<>();
		/*
         * acks,設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個值0,1,-1
		 * 0,意味著producer永遠(yuǎn)不會等待一個來自broker的ack,這就是0.7版本的行為。
		 * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當(dāng)server掛掉的時候會丟失一些數(shù)據(jù)。
		 * 1,意味著在leader replica已經(jīng)接收到數(shù)據(jù)后,producer會得到一個ack。
		 * 這個選項提供了更好的持久性,因為在server確認(rèn)請求成功處理后,client才會返回。
		 * 如果剛寫到leader上,還沒來得及復(fù)制leader就掛了,那么消息才可能會丟失。
		 * -1,意味著在所有的ISR都接收到數(shù)據(jù)后,producer才得到一個ack。
		 * 這個選項提供了最好的持久性,只要還有一個replica存活,那么數(shù)據(jù)就不會丟失
		 */
		props.put("acks", "1");
		//配置默認(rèn)的分區(qū)方式
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		//配置topic的序列化類
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//配置value的序列化類
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*
		 * kafka broker對應(yīng)的主機,格式為host1:port1,host2:port2
		 */
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		//topic
		String topic = "test7";
		KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props);
		for(int i = 1 ;i <= 100 ; i++) {
			String line = i+" this is a test ";
			ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,line );
			producer.send(record);
		}
		producer.close();
	}
	
	
}
//---------------------------------------------------------------------------------------------------------------------------
package kafka.demo;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
props.put("group.id", "group_test7");
//配置topic的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//配置value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//自動同步offset
        props.put("enable.auto.commit","true");
        //自動同步offset的時間間隔
        props.put("auto.commit.intervals.ms", "2000");
        //當(dāng)在zookeeper中發(fā)現(xiàn)要消費的topic沒有或者topic的offset不合法時自動設(shè)置為最小值,可以設(shè)的值為 latest, earliest, none,默認(rèn)為largest
        props.put("auto.offset.reset", "earliest ");
        
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("test7"));
//consumer.beginningOffsets("");
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record: records) {
System.out.println("partition:"+record.partition()+"  "+record.value());
}
//consumer.commitSync();
if((new Random(10)).nextInt()>5) {
consumer.wakeup();
}
}
}catch(WakeupException e) {
e.printStackTrace();
}finally {
consumer.close();
}
}
}

關(guān)于如何進行Kafka 1.0.0 d代碼示例分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI