溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

發(fā)布時(shí)間:2023-03-07 11:07:21 來(lái)源:億速云 閱讀:271 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇“kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決”文章吧。

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)

    發(fā)送端

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

    接收端

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

    問(wèn)題

    采用內(nèi)置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端能夠接收數(shù)據(jù)

    但是采用外置的zookeeper,發(fā)送端發(fā)送數(shù)據(jù),接收端一直接收不到數(shù)據(jù)

    解決

    先判斷主題是否一致,如果一致就在關(guān)閉kafka

    ./kafka-server-stop.sh ../config/server.properties

    修改一下配置,確保這些配置已加上,不要用localhost,在listeners的ip地址和端口號(hào)要和消費(fèi)者,生產(chǎn)者的的地址端口號(hào)一直

    vim ../config/server.propertiesst3

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

    最后把log.dirs后面的文件刪除或者重新?lián)Q個(gè)地址

    rm -rf /tmp/kafka

    kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決

    重新在前臺(tái)啟動(dòng)kafka,注意查看打印在桌面的日志,有無(wú)報(bào)錯(cuò)信息

    ./kafka-server-start.sh ../config/server.properties

    如果沒(méi)有報(bào)錯(cuò)信息,啟動(dòng)正常,那么就可以在后臺(tái)啟動(dòng)了

    ./kafka-server-start.sh -daemon ../config/server.properties

    創(chuàng)建生產(chǎn)者

    ./kafka-console-producer.sh --broker-list 172.16.193.175:9092 --topic test3

    創(chuàng)建消費(fèi)者

    ./kafka-console-consumer.sh --bootstrap-server 172.16.193.175:9092 --topic test3 --from-beginning

    關(guān)于kafka-console-consumer.sh消費(fèi)者的一些思考

    (人物設(shè)定初步了解kafka的我)

    我司現(xiàn)在有三臺(tái)kafka服務(wù)器作為一個(gè)集群

    需求是我寫(xiě)了一個(gè)監(jiān)聽(tīng)器去監(jiān)聽(tīng)活動(dòng)失敗的情況,如果活動(dòng)失敗則調(diào)用一個(gè)統(tǒng)計(jì)接口 做數(shù)據(jù)統(tǒng)計(jì)

    我需要從失敗事件的隨路數(shù)據(jù)中取一些數(shù)據(jù),做一些判斷.

    現(xiàn)在我想從集群中看一下失敗事件中的隨路數(shù)據(jù)是否完整正確

    于是,我xshell連接上了三臺(tái)服務(wù)器并且運(yùn)行以下命令

    ./kafka-console-consumer.sh --bootstrap-server broker1IP:9092 --topic topicname
     
    ./kafka-console-consumer.sh --bootstrap-server broker2IP:9092 --topic topicname
     
    ./kafka-console-consumer.sh --bootstrap-server broker3IP:9092 --topic topicname

    發(fā)現(xiàn)只要發(fā)送一個(gè)事件三個(gè)服務(wù)器都可以收到事件中的消息

    怪了,為什么三臺(tái)都會(huì)顯示.

    我第一反應(yīng)是:這是否是傳說(shuō)中的leader和follower 同步策略

    我問(wèn)了一下我的leader ,

    leader:.....,你知道你這個(gè)命令是什么意思嗎?

    這個(gè)命令就是相當(dāng)于創(chuàng)建了一個(gè)消費(fèi)者去消費(fèi)了隊(duì)列中的消息!

    你這個(gè)3個(gè)服務(wù)器相當(dāng)于啟動(dòng)了3個(gè)消費(fèi)者去消費(fèi)了,同一個(gè)消息三次!

    我:不對(duì)啊,同一個(gè)消息不能被消費(fèi)三次啊!?

    leader:........,你知道什么是消費(fèi)者組嗎?你這相當(dāng)于三個(gè)消費(fèi)者組 不信你看看

    ./kafka-console-consumer.sh -help
     
    ...
     
    --group <String: consumer group id>      The consumer group id of the consumer. 
     
    ...

    kafka-console-consumer.sh相關(guān)知識(shí)拓展

    kafka-console-consumer.sh腳本是一個(gè)簡(jiǎn)易的消費(fèi)者控制臺(tái)。該 shell 腳本的功能通過(guò)調(diào)用 kafka.tools 包下的 ConsoleConsumer 類(lèi),并將提供的命令行參數(shù)全部傳給該類(lèi)實(shí)現(xiàn)。

    ./kafka-console-consumer.sh --bootstrap-server node:9092 --topic topicName
     
    //表示從 latest 位移位置開(kāi)始消費(fèi)該主題的所有分區(qū)消息,即僅消費(fèi)正在寫(xiě)入的消息。
      
     
    bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName
     
    //?表示從指定主題中有效的起始位移位置開(kāi)始消費(fèi)所有分區(qū)的消息。
     
     
    bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
     
    //?消費(fèi)出的消息結(jié)果將打印出消息體的 key 和 value。
    參數(shù)值類(lèi)型說(shuō)明有效值
    --topicstring被消費(fèi)的topic
    --whiteliststring正則表達(dá)式,指定要包含以供使用的主題的白名單
    --partitioninteger指定分區(qū)
    除非指定&rsquo;&ndash;offset&rsquo;,否則從分區(qū)結(jié)束(latest)開(kāi)始消費(fèi)

    --offsetstring執(zhí)行消費(fèi)的起始o(jì)ffset位置
    默認(rèn)值:latest

    --from-beginning
    從存在的最早消息開(kāi)始,而不是從最新消息開(kāi)始
    --max-messagesinteger消費(fèi)的最大數(shù)據(jù)量,若不指定,則持續(xù)消費(fèi)下去
    --timeout-msinteger在指定時(shí)間間隔內(nèi)沒(méi)有消息可用時(shí)退出
    --bootstrap-serverstring必需(除非使用舊版本的消費(fèi)者),要連接的服務(wù)器
    --key-deserializerstring

    --value-deserializerstring

    --groupstring指定消費(fèi)者所屬組的ID
    --zookeeperstring必需(僅當(dāng)使用舊的使用者時(shí))連接zookeeper的字符串。
    可以給出多個(gè)URL以允許故障轉(zhuǎn)移

    以上就是關(guān)于“kafka消費(fèi)者kafka-console-consumer接收不到數(shù)據(jù)如何解決”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI