溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PHP實現(xiàn)生產者與消費者的案例

發(fā)布時間:2021-03-26 10:25:34 來源:億速云 閱讀:202 作者:小新 欄目:編程語言

這篇文章主要介紹PHP實現(xiàn)生產者與消費者的案例,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

PHP中使用Kafka需要RdKafka擴展,而RdKafka依賴于librdkafka,所以這兩個我們都需要安裝,具體安裝方法自行百度,本篇不做說明了。

生產者(測試)

創(chuàng)建消費者需要步驟:

  • 生產者配置參數(shù)

  • 創(chuàng)建生產者實例

  • 創(chuàng)建主題實例(依賴生產者)

  • 生產主題消息

  • 推送消息

具體代碼如下:

        $conf = new \RdKafka\Conf();
        // 綁定服務節(jié)點
        $conf->set('metadata.broker.list', '127.0.0.1:32772');

        // 創(chuàng)建生產者
        $kafka = new \RdKafka\Producer($conf);

        // 創(chuàng)建主題實例
        $topic = $kafka->newTopic('p1r1');
        // 生產主題數(shù)據(jù),此時消息在緩沖區(qū)中,并沒有真正被推送
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
        // 阻塞時間(毫秒), 0為非阻塞
        $kafka->poll(0); 

        // 推送消息,如果不調用此函數(shù),消息不會被發(fā)送且會丟失
        $result = $kafka->flush(5000);

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }

消費者

創(chuàng)建一個消費者需要幾個步驟:

  • 消費者配置參數(shù)

  • 應用配置參數(shù)創(chuàng)建消費者實例

  • 訂閱對應主題

  • 拉取數(shù)據(jù)

  • 提交位移

具體代碼如下:

        $conf = new \RdKafka\Conf();
        // 綁定消費者組
        $conf->set('group.id', 'ceshi');
        // 綁定服務節(jié)點,多個用,分隔
        $conf->set('metadata.broker.list', '127.0.0.1:32787');
        // 設置自動提交為false
        $conf->set('enable.auto.commit', 'false');
        // 設置當前消費者拉取數(shù)據(jù)時的偏移量, 可選參數(shù):
        // earliest: 如果消費者組是新創(chuàng)建的,從頭開始消費,否則從消費者組當前消費位移開始。
        // latest:如果消費者組是新創(chuàng)建的,從最新偏移量開始,否則從消費者組當前消費位移開始。
        $conf->set('auto.offset.reset', 'earliest');

        // 創(chuàng)建消費者實例
        $consumer = new \RdKafka\KafkaConsumer($conf);
        // 消費者訂閱主題,數(shù)組形式
        $consumer->subscribe(['topic1','topic2']);
        while (true) {
            // 消費數(shù)據(jù),阻塞5秒(5秒內有數(shù)據(jù)就消費,沒有數(shù)據(jù)等待5秒進入下一輪循環(huán))
            $message = $consumer->consume(5000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    // 業(yè)務邏輯
                    var_dump($message);

                    // 提交位移
                    $consumer->commit($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
        // 關閉消費者(一般用在腳本中,不需要關閉)
        $conumser->close();

只消費指定分區(qū)中的數(shù)據(jù):

    // 對消費者指定分區(qū),注意此方式不能與subscribe一同使用
    $consumer->assign([
        new RdKafka\TopicPartition("topic", 0),
        new RdKafka\TopicPartition("topic", 1),
    ]);

以上是“PHP實現(xiàn)生產者與消費者的案例”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

php
AI