您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Kafka Java客戶端代碼的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)
kafka是linkedin用于日志處理的分布式消息隊(duì)列,linkedin的日志數(shù)據(jù)容量大,但對(duì)可靠性要求不高,其日志數(shù)據(jù)主要包括用戶行為(登錄、瀏覽、點(diǎn)擊、分享、喜歡)以及系統(tǒng)運(yùn)行日志(CPU、內(nèi)存、磁盤、網(wǎng)絡(luò)、系統(tǒng)及進(jìn)程狀態(tài))
當(dāng)前很多的消息隊(duì)列服務(wù)提供可靠交付保證,并默認(rèn)是即時(shí)消費(fèi)(不適合離線)。
高可靠交付對(duì)linkedin的日志不是必須的,故可通過降低可靠性來(lái)提高性能,同時(shí)通過構(gòu)建分布式的集群,允許消息在系統(tǒng)中累積,使得kafka同時(shí)支持離線和在線日志處理
測(cè)試環(huán)境
kafka_2.10-0.8.1.1 3個(gè)節(jié)點(diǎn)做的集群
zookeeper-3.4.5 一個(gè)實(shí)例節(jié)點(diǎn)
代碼示例
消息生產(chǎn)者代碼示例
import java.util.Collections; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example * @author Fung * */ public class ProducerDemo { public static void main(String[] args) { Random rnd = new Random(); int events=100; // 設(shè)置配置屬性 Properties props = new Properties(); props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // key.serializer.class默認(rèn)為serializer.class props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // 可選配置,如果不配置,則使用默認(rèn)的partitioner props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo"); // 觸發(fā)acknowledgement機(jī)制,否則是fire and forget,可能會(huì)引起數(shù)據(jù)丟失 // 值為0,1,-1,可以參考 // http://kafka.apache.org/08/configuration.html props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); // 創(chuàng)建producer Producer<String, String> producer = new Producer<String, String>(config); // 產(chǎn)生并發(fā)送消息 long start=System.currentTimeMillis(); for (long i = 0; i < events; i++) { long runtime = new Date().getTime(); String ip = "192.168.2." + i;//rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; //如果topic不存在,則會(huì)自動(dòng)創(chuàng)建,默認(rèn)replication-factor為1,partitions為0 KeyedMessage<String, String> data = new KeyedMessage<String, String>( "page_visits", ip, msg); producer.send(data); } System.out.println("耗時(shí):" + (System.currentTimeMillis() - start)); // 關(guān)閉producer producer.close(); } }
消息消費(fèi)者代碼示例
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; /** * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example * * @author Fung * */ public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) { consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerMsgTask(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] arg) { String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" }; String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic); demo.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } demo.shutdown(); } }
消息處理類
import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerMsgTask implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerMsgTask(KafkaStream stream, int threadNumber) { m_threadNumber = threadNumber; m_stream = stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); } }
Partitioner類示例
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class PartitionerDemo implements Partitioner { public PartitionerDemo(VerifiableProperties props) { } @Override public int partition(Object obj, int numPartitions) { int partition = 0; if (obj instanceof String) { String key=(String)obj; int offset = key.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions; } }else{ partition = obj.toString().length() % numPartitions; } return partition; } }
關(guān)于Kafka Java客戶端代碼的示例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。