您好,登錄后才能下訂單哦!
這篇文章主要講解了“java分布式流處理組件Producer怎么使用”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“java分布式流處理組件Producer怎么使用”吧!
首先, 在了解生產(chǎn)者發(fā)送消息的原理之前,我們應(yīng)該先學(xué)會如何去發(fā)送消息。
Kafka為我們提供了很多項目可以操作的API客戶端,包括:
C/C++
GO
Python
...
通過官網(wǎng)查看API菜單,官方文檔上也是Java的版本。我們根據(jù)提示一步步操作即可~
先新建maven項目,并且引入對應(yīng)的****kafka-clients依賴
建議:Kafka-clients依賴版本,最好和安裝的kafka版本一致
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.3.1</version> </dependency>
Kafka生產(chǎn)者主要靠KafkaProducer來進行操作。點擊到對應(yīng)的文檔頁面,我們可以看到關(guān)于KafkaProducer<K,V> 的詳細信息。
一個好的組件是非常貼心的, 甚至我們都不用去網(wǎng)上搜任何相關(guān)的資料,只需要通過查看對應(yīng)的注釋就可以知道這個東西該怎么用。
Properties config = new Properties(); // --bootstrap-server config.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092" ); // key 序列化器 config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化器 config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); try(Producer<String, String> producer = new KafkaProducer<>(config)) { ProducerRecord<String, String> record = new ProducerRecord<>( "newTopic001", "key01", "data from " + KafkaQuickProducer.class.getName() ); RecordMetadata recordMetadata = producer.send(record).get(); System.out.println( MessageFormat.format("{0}\t{1}\t{2}\t{3}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp() ) ); } catch (Exception e) { e.printStackTrace(); }
以上代碼就是同步發(fā)送的過程,這已經(jīng)是在開發(fā)過程中需要配置的最小單元,而其他關(guān)于生產(chǎn)者的配置,我們可以通過ProducerConfig來進行查看
** 與命令行上的參數(shù),基本上是一模一樣的**
而關(guān)于序列化器的問題,我們在下面原理的部分說明
我們在調(diào)用同步send的時候,發(fā)現(xiàn)有兩個參數(shù)的方法, 而這個方法實現(xiàn)的就是****異步發(fā)送
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
異步發(fā)送會將發(fā)送結(jié)果以事件驅(qū)動的形式傳遞,那么這里,我們就需要注意一點:
程序調(diào)用完成之后,不能讓他立即執(zhí)行,否則我們無法查看到具體的發(fā)送結(jié)果
接下來我們看具體的程序?qū)崿F(xiàn)。理論上:我們只需要改最后發(fā)送的部分
Properties config = new Properties(); // --bootstrap-server config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092"); // key 序列化器 config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化器 config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); try(Producer<String, String> producer = new KafkaProducer<>(config)) { ProducerRecord<String, String> record = new ProducerRecord<>( "newTopic001", "key01", "data from " + KafkaQuickProducer.class.getName() ); async(producer, record); } catch (Exception e) { e.printStackTrace(); } // 異步發(fā)送 private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) { producer.send(record, (recordMetadata, exception) -> { if (null != exception) { exception.printStackTrace(); return; } System.out.println( MessageFormat.format("{0}\t{1}\t{2}\t{3}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), recordMetadata.timestamp() ) ); }); try { // 將程序進行阻塞,防止由于消息發(fā)送成功之后進程停止而無法接收到事件反饋 System.in.read(); } catch (IOException e) { throw new RuntimeException(e); } }
這屬于整個生產(chǎn)者發(fā)送消息方式的最小單元,本文屬于Producer入門階段。
在ProducerConfig中還包含了非常多的配置項,更多的配置信息我們會在優(yōu)化章節(jié)中說明。
在第一部分,我們已經(jīng)了解到,關(guān)于生產(chǎn)者最基本的使用方式,到這里,其實我想跟大家聊一聊:
生產(chǎn)者在發(fā)送消息的時候中間到底經(jīng)歷了什么?
大家應(yīng)該已經(jīng)看到上面的那張原理圖,我們可以從中找出答案!
**這里我們分為兩個線程塊來說明, 第一部分是Main主線程, 也就是生產(chǎn)者在調(diào)用****send()**方法時所在的線程
在這里,我們可以看到:
外部數(shù)據(jù)首先被封裝為ProducerRecord**,然后調(diào)用**send()**方法。
在send()過程中,經(jīng)過攔截器、序列化器、分區(qū)器等處理之后進入到RecordAccumulator中。
接下來我們仔細聊一聊攔截器、序列化器、分區(qū)器的作用
攔截器很類似于我們在SpringMVC中Interceptor的功能,而且在Producer中我們是可以自定義攔截器的。
我們可以在發(fā)送之前對數(shù)據(jù)進行攔截處理,比如說:統(tǒng)計生產(chǎn)者發(fā)送數(shù)據(jù)的總量等等。
當(dāng)然目前來講,我們?nèi)绻婚_發(fā)Kafka監(jiān)控平臺的話,這里攔截器的用處并不大。我們忽略不計即可
后續(xù)如果有機會的話,我們可以專門寫篇文章,用來介紹如何開發(fā)一個攔截器
而序列化器,主要對兩個部分的數(shù)據(jù)進行處理:
Key
Value
byte[] serializedKey = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
從本質(zhì)上來講,外部數(shù)據(jù)屬于屬于對象,而對象不能直接通過網(wǎng)絡(luò)進行傳輸。 所以我們就需要一個序列化器,將它轉(zhuǎn)換成字節(jié)數(shù)組,進而進行傳輸
Kafka本身為我們提供了很多可用的序列化器,不過我們能用到最多的還是StringSerializer。
在生產(chǎn)端將消息進行序列話,那么在消費端必然會進行反序列化操作
我們知道Kafka是以Topic為消息發(fā)送的主體,不過由于Topic是一個虛擬的概念, 所以我們沒有辦法在實際中查看到關(guān)于Topic的相關(guān)信息。 但是前面我們也說過, 當(dāng)前Topic下的消息數(shù)據(jù)都是通過Partition進行存儲的。
發(fā)送出去的消息需要存儲在哪個分區(qū)中就是通過分區(qū)器來進行指定的,在我們沒有指定分區(qū)策略的情況下,生產(chǎn)者會通過默認(rèn)的分區(qū)策略指定當(dāng)前消息應(yīng)該存儲在哪個分區(qū)下
分區(qū)的內(nèi)容還是比較多的,我們會在下一節(jié)做詳細的說明
此時,在主線程的區(qū)域中,當(dāng)消息進入到默認(rèn)大小為32m的記錄緩沖區(qū)時, 本區(qū)的工作就到此結(jié)束。
緩沖區(qū)中有多個雙端隊列,分別對應(yīng)Topic不同的分區(qū)。每一個分區(qū)就會創(chuàng)建一個雙端隊列。
此時的消息將會被按照批次的方式存放在隊列中, 默認(rèn)一批為16k大小。當(dāng)緩沖區(qū)達到指定條件之后,****sender線程將會被喚醒,Sender程序?qū)_隊列中不斷拉出消息進行下一步的發(fā)送
想要喚醒Sender線程有兩個因素,但不是說這兩個條件都必須滿足,他們是或的關(guān)系。
batch.size
是一個條件,這也是后期針對生產(chǎn)者優(yōu)化的主要參數(shù)之一。
當(dāng)發(fā)送消息之后,生產(chǎn)者會將消息進行整合。將其按照一批一批的方式發(fā)送給Broker,從而減少網(wǎng)絡(luò)間的傳輸請求次數(shù)。默認(rèn)情況下為16k。
而如果一批數(shù)據(jù)的大小累計達到了設(shè)置的batch.size
之后,sender才會做發(fā)送數(shù)據(jù)的操作
這是第一個限制
下面再來介紹一個非常強勢的參數(shù):liner.ms
。生產(chǎn)者優(yōu)化的主要參數(shù)之二。
這么說吧,如果你設(shè)置的liner.ms=0,表示不延遲直接發(fā)送。那么batch.size就不會生效了
而liner.ms=0屬于默認(rèn)配置
如果數(shù)據(jù)一直沒有達到設(shè)置的batch.size
大小,數(shù)據(jù)也不能不發(fā)對吧。所以Kafka也就為我們提供了這樣的參數(shù):
當(dāng)sender等待liner.ms設(shè)置的時間之后【單位ms】,不管數(shù)據(jù)如何都會將消息進行發(fā)送
如未設(shè)置當(dāng)前參數(shù),表示沒有延遲,直接發(fā)送
下面舉個小例子
config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");
將RecordAccumulator
內(nèi)存儲的數(shù)據(jù)拉取出來之后,開始將其創(chuàng)建為一個個的Request請求。這里需要注意的是:
NetworkClient并非一股腦的將全部可發(fā)送數(shù)據(jù)進行傳輸請求
正相反,為了能夠保證不同分區(qū)所對應(yīng)DQueue的數(shù)據(jù)進入到對應(yīng)的Broker所在的分區(qū)內(nèi),Kafka將按照<BrokerId, Request>的形式對請求進行傳輸。如果傳輸?shù)竭_Broker之后沒有acks應(yīng)答,那么當(dāng)前節(jié)點下最多能夠保存5個未響應(yīng)的請求。
這里簡單聊一下它的應(yīng)答方式。在ProducerConfig.ACKS_DOC
下我們也可以看到相關(guān)的說明:
acks=0: 生產(chǎn)者不會等待Broker的應(yīng)答,直接表示消息已經(jīng)發(fā)送成功。而消息有沒有真正達到Broker,不關(guān)心。
當(dāng)然了,這種方式在性能上來講是最好的,適合一些數(shù)據(jù)不重要的場景
acks=1: 生產(chǎn)者將消息發(fā)送到Broker之后,由Leader在本地將消息進行存儲之后,返回發(fā)送成功的應(yīng)答。
如果Follower還沒有同步到消息,Leader就已經(jīng)掛了。那么此時就會出現(xiàn)消息丟失的情況
acks=all:生產(chǎn)者將消息發(fā)送到Broker之后,由Leader在本地將消息進行存儲,并且Follower同步完消息之后才會返回發(fā)送成功的應(yīng)答。
這種方式是最能保證數(shù)據(jù)安全的情況,但是性能也是最低的~
最后:
當(dāng)Broker返回成功應(yīng)答之后,RecordAccumulator中的數(shù)據(jù)將會被清理
如果失敗,可以嘗試重試等操作
感謝各位的閱讀,以上就是“java分布式流處理組件Producer怎么使用”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對java分布式流處理組件Producer怎么使用這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。