您好,登錄后才能下訂單哦!
這篇文章主要介紹“kafka客戶端的使用方法”,在日常操作中,相信很多人在kafka客戶端的使用方法問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”kafka客戶端的使用方法”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
record(消息)
到kafka集群。新的生產(chǎn)者是線程安全的,在線程之間共享單個(gè)生產(chǎn)者實(shí)例,通常單例比多個(gè)實(shí)例要快。
一個(gè)簡(jiǎn)單的例子,使用producer發(fā)送一個(gè)有序的key/value(鍵值對(duì)),放到j(luò)ava的main
方法里就能直接運(yùn)行,
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
生產(chǎn)者的緩沖空間池保留尚未發(fā)送到服務(wù)器的消息,后臺(tái)I/O線程負(fù)責(zé)將這些消息轉(zhuǎn)換成請(qǐng)求發(fā)送到集群。如果使用后不關(guān)閉生產(chǎn)者,則會(huì)泄露這些資源。
send()
方法是異步的,添加消息到緩沖區(qū)等待發(fā)送,并立即返回。生產(chǎn)者將單個(gè)的消息批量在一起發(fā)送來(lái)提高效率。
ack
是判別請(qǐng)求是否為完整的條件(就是是判斷是不是成功發(fā)送了)。我們指定了“all”將會(huì)阻塞消息,這種設(shè)置性能最低,但是是最可靠的。
retries
,如果請(qǐng)求失敗,生產(chǎn)者會(huì)自動(dòng)重試,我們指定是0次,如果啟用重試,則會(huì)有重復(fù)消息的可能性。
producer
(生產(chǎn)者)緩存每個(gè)分區(qū)未發(fā)送的消息。緩存的大小是通過(guò) batch.size
配置指定的。值較大的話將會(huì)產(chǎn)生更大的批。并需要更多的內(nèi)存(因?yàn)槊總€(gè)“活躍”的分區(qū)都有1個(gè)緩沖區(qū))。
默認(rèn)緩沖可立即發(fā)送,即便緩沖空間還沒(méi)有滿,但是,如果你想減少請(qǐng)求的數(shù)量,可以設(shè)置linger.ms
大于0。這將指示生產(chǎn)者發(fā)送請(qǐng)求之前等待一段時(shí)間,希望更多的消息填補(bǔ)到未滿的批中。這類似于TCP的算法,例如上面的代碼段,可能100條消息在一個(gè)請(qǐng)求發(fā)送,因?yàn)槲覀冊(cè)O(shè)置了linger(逗留)時(shí)間為1毫秒,然后,如果我們沒(méi)有填滿緩沖區(qū),這個(gè)設(shè)置將增加1毫秒的延遲請(qǐng)求以等待更多的消息。需要注意的是,在高負(fù)載下,相近的時(shí)間一般也會(huì)組成批,即使是 linger.ms=0
。在不處于高負(fù)載的情況下,如果設(shè)置比0大,以少量的延遲代價(jià)換取更少的,更有效的請(qǐng)求。
buffer.memory
控制生產(chǎn)者可用的緩存總量,如果消息發(fā)送速度比其傳輸?shù)椒?wù)器的快,將會(huì)耗盡這個(gè)緩存空間。當(dāng)緩存空間耗盡,其他發(fā)送調(diào)用將被阻塞,阻塞時(shí)間的閾值通過(guò)max.block.ms
設(shè)定,之后它將拋出一個(gè)TimeoutException。
key.serializer
和value.serializer
示例,將用戶提供的key和value對(duì)象ProducerRecord轉(zhuǎn)換成字節(jié),你可以使用附帶的ByteArraySerializaer或StringSerializer處理簡(jiǎn)單的string或byte類型。
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
異步發(fā)送一條消息到topic,并調(diào)用callback
(當(dāng)發(fā)送已確認(rèn))。
send是異步的,并且一旦消息被保存在等待發(fā)送的消息緩存
中,此方法就立即返回。這樣并行發(fā)送多條消息而不阻塞去等待每一條消息的響應(yīng)。
發(fā)送的結(jié)果是一個(gè)RecordMetadata,它指定了消息發(fā)送的分區(qū),分配的offset和消息的時(shí)間戳。如果topic使用的是CreateTime,則使用用戶提供的時(shí)間戳或發(fā)送的時(shí)間(如果用戶沒(méi)有指定指定消息的時(shí)間戳)如果topic使用的是LogAppendTime,則追加消息時(shí),時(shí)間戳是broker的本地時(shí)間。
由于send調(diào)用是異步的,它將為分配消息的此消息的RecordMetadata
返回一個(gè)Future。如果future調(diào)用get(),則將阻塞,直到相關(guān)請(qǐng)求完成并返回該消息的metadata,或拋出發(fā)送異常。
如果要模擬一個(gè)簡(jiǎn)單的阻塞調(diào)用,你可以調(diào)用get()
方法。
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) producer.send(record).get();
完全無(wú)阻塞的話,可以利用回調(diào)參數(shù)提供的請(qǐng)求完成時(shí)將調(diào)用的回調(diào)通知。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
發(fā)送到同一個(gè)分區(qū)的消息回調(diào)保證按一定的順序執(zhí)行,也就是說(shuō),在下面的例子中 callback1
保證執(zhí)行 callback2
之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback一般在生產(chǎn)者的I/O線程中執(zhí)行,所以是相當(dāng)?shù)目斓?,否則將延遲其他的線程的消息發(fā)送。如果你需要執(zhí)行阻塞或計(jì)算昂貴(消耗)的回調(diào),建議在callback主體中使用自己的Executor來(lái)并行處理。
pecified by:
send in interface Producer<K,V>
Parameters:
record - 發(fā)送的記錄(消息)
callback - 用戶提供的callback,服務(wù)器來(lái)調(diào)用這個(gè)callback來(lái)應(yīng)答結(jié)果(null表示沒(méi)有callback)。
Throws:
InterruptException - 如果線程在阻塞中斷。
SerializationException - 如果key或value不是給定有效配置的serializers。
TimeoutException - 如果獲取元數(shù)據(jù)或消息分配內(nèi)存話費(fèi)的時(shí)間超過(guò)max.block.ms。
KafkaException - Kafka有關(guān)的錯(cuò)誤(不屬于公共API的異常)。
到此,關(guān)于“kafka客戶端的使用方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。