您好,登錄后才能下訂單哦!
今天小編給大家分享一下Kafka常用命令之kafka-console-consumer.sh怎么使用的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
kafka-console-consumer.sh 腳本是一個簡易的消費者控制臺。
該 shell 腳本的功能通過調用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數(shù)全部傳給該類實現(xiàn)。
注意:Kafka 從 2.2 版本開始將 kafka-topic.sh 腳本中的 −−zookeeper 參數(shù)標注為 “過時”,推薦使用 −−bootstrap-server 參數(shù)。
若讀者依舊使用的是 2.1 及以下版本,請將下述的 --bootstrap-server 參數(shù)及其值手動替換為 --zookeeper zk1:2181,zk2:2181,zk:2181。
一定要注意兩者參數(shù)值所指向的集群地址是不同的。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
表示從 latest 位移位置開始消費該主題的所有分區(qū)消息,即僅消費正在寫入的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName
表示從指定主題中有效的起始位移位置開始消費所有分區(qū)的消息。
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
消費出的消息結果將打印出消息體的 key 和 value。
若還需要為你的消息添加其他屬性
請參考下述列表
參數(shù) | 值類型 | 說明 | 有效值 |
---|---|---|---|
--topic | string | 被消費的topic | |
--whitelist | string | 正則表達式,指定要包含以供使用的主題的白名單 | |
--partition | integer | 指定分區(qū) 除非指定’–offset’,否則從分區(qū)結束(latest)開始消費 | |
--offset | string | 執(zhí)行消費的起始offset位置 默認值:latest | latest earliest <offset> |
--consumer-property | string | 將用戶定義的屬性以key=value的形式傳遞給使用者 | |
--consumer.config | string | 消費者配置屬性文件 請注意,[consumer-property]優(yōu)先于此配置 | |
--formatter | string | 用于格式化kafka消息以供顯示的類的名稱 默認值:kafka.tools.DefaultMessageFormatter | kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter |
--property | string | 初始化消息格式化程序的屬性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
--from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
--max-messages | integer | 消費的最大數(shù)據(jù)量,若不指定,則持續(xù)消費下去 | |
--timeout-ms | integer | 在指定時間間隔內沒有消息可用時退出 | |
--skip-message-on-error | 如果處理消息時出錯,請?zhí)^它而不是暫停 | ||
--bootstrap-server | string | 必需(除非使用舊版本的消費者),要連接的服務器 | |
--key-deserializer | string | ||
--value-deserializer | string | ||
--enable-systest-events | 除記錄消費的消息外,還記錄消費者的生命周期 (用于系統(tǒng)測試) | ||
--isolation-level | string | 設置為read_committed以過濾掉未提交的事務性消息 設置為read_uncommitted以讀取所有消息 默認值:read_uncommitted | |
--group | string | 指定消費者所屬組的ID | |
--blacklist | string | 要從消費中排除的主題黑名單 | |
--csv-reporter-enabled | 如果設置,將啟用csv metrics報告器 | ||
--delete-consumer-offsets | 如果指定,則啟動時刪除zookeeper中的消費者信息 | ||
--metrics-dir | string | 輸出csv度量值 需與[csv-reporter-enable]配合使用 | |
--zookeeper | string | 必需(僅當使用舊的使用者時)連接zookeeper的字符串。 可以給出多個URL以允許故障轉移 |
以上就是“Kafka常用命令之kafka-console-consumer.sh怎么使用”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。