您好,登錄后才能下訂單哦!
這篇文章主要介紹PHP實現(xiàn)生產者與消費者的案例,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
PHP中使用Kafka需要RdKafka擴展,而RdKafka依賴于librdkafka,所以這兩個我們都需要安裝,具體安裝方法自行百度,本篇不做說明了。
創(chuàng)建消費者需要步驟:
生產者配置參數(shù)
創(chuàng)建生產者實例
創(chuàng)建主題實例(依賴生產者)
生產主題消息
推送消息
具體代碼如下:
$conf = new \RdKafka\Conf(); // 綁定服務節(jié)點 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 創(chuàng)建生產者 $kafka = new \RdKafka\Producer($conf); // 創(chuàng)建主題實例 $topic = $kafka->newTopic('p1r1'); // 生產主題數(shù)據(jù),此時消息在緩沖區(qū)中,并沒有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞時間(毫秒), 0為非阻塞 $kafka->poll(0); // 推送消息,如果不調用此函數(shù),消息不會被發(fā)送且會丟失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); }
創(chuàng)建一個消費者需要幾個步驟:
消費者配置參數(shù)
應用配置參數(shù)創(chuàng)建消費者實例
訂閱對應主題
拉取數(shù)據(jù)
提交位移
具體代碼如下:
$conf = new \RdKafka\Conf(); // 綁定消費者組 $conf->set('group.id', 'ceshi'); // 綁定服務節(jié)點,多個用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 設置自動提交為false $conf->set('enable.auto.commit', 'false'); // 設置當前消費者拉取數(shù)據(jù)時的偏移量, 可選參數(shù): // earliest: 如果消費者組是新創(chuàng)建的,從頭開始消費,否則從消費者組當前消費位移開始。 // latest:如果消費者組是新創(chuàng)建的,從最新偏移量開始,否則從消費者組當前消費位移開始。 $conf->set('auto.offset.reset', 'earliest'); // 創(chuàng)建消費者實例 $consumer = new \RdKafka\KafkaConsumer($conf); // 消費者訂閱主題,數(shù)組形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消費數(shù)據(jù),阻塞5秒(5秒內有數(shù)據(jù)就消費,沒有數(shù)據(jù)等待5秒進入下一輪循環(huán)) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 業(yè)務邏輯 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } // 關閉消費者(一般用在腳本中,不需要關閉) $conumser->close();
只消費指定分區(qū)中的數(shù)據(jù):
// 對消費者指定分區(qū),注意此方式不能與subscribe一同使用 $consumer->assign([ new RdKafka\TopicPartition("topic", 0), new RdKafka\TopicPartition("topic", 1), ]);
以上是“PHP實現(xiàn)生產者與消費者的案例”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業(yè)資訊頻道!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。