溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

發(fā)布時(shí)間:2022-05-11 10:21:32 來(lái)源:億速云 閱讀:200 作者:iii 欄目:開發(fā)技術(shù)

這篇“PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)”文章吧。

業(yè)務(wù)場(chǎng)景

項(xiàng)目公司是主php做開發(fā)的,框架為thinkphp。眾所周知,php本身的運(yùn)行效率存在一定的缺陷,所以如果有一個(gè)很復(fù)雜很耗時(shí)的業(yè)務(wù)時(shí),必須開發(fā)一個(gè)常駐內(nèi)存的程序。首先我想到了php的workerman與swoole,但是這里應(yīng)上面的標(biāo)題哈,想將耗時(shí)任務(wù)交給另一個(gè)服務(wù)器,同時(shí)列隊(duì)處理。所以這里我想獨(dú)立部署一個(gè)rabbitMQ服務(wù)器用于處理列隊(duì)任務(wù)。

當(dāng)rabbitMQ服務(wù)器我們準(zhǔn)備好了,建立了一個(gè)持久化命名為ceshi的列隊(duì),如下:

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

項(xiàng)目上生產(chǎn)者和消費(fèi)者的開發(fā)我這里全部采用tinkphp6+workerman,為便于管理。這里這么做也是因?yàn)榘l(fā)現(xiàn)workerman中對(duì)rabbitMQ的文檔解釋太少了!

所以開始踩坑!

1、首先部署好thinkphp6框架

過(guò)程去看thinkphp6手冊(cè)

2、安裝workerman擴(kuò)展

過(guò)程去看thinkphp6手冊(cè)

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

3、生產(chǎn)者

配置一個(gè)workerman類

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

創(chuàng)建的Send類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Send extends Server
{
    //websocket地址,一會(huì)用于測(cè)試。
    protected $socket = 'websocket://127.0.0.1:2345';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{
        //websocket發(fā)送過(guò)來(lái)的消息
        $connection->send('我收到你的信息了:'.$data);
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號(hào)
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創(chuàng)建隊(duì)列(Queue)
             * name: ceshi         // 隊(duì)列名稱
             * passive: false      // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
             * durable: true       // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會(huì)丟失,
             *                        設(shè)置true則代表是一個(gè)持久的隊(duì)列,服務(wù)重啟之后也會(huì)存在,因?yàn)榉?wù)會(huì)把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時(shí)候,會(huì)重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項(xiàng)為true則隊(duì)列只對(duì)當(dāng)前連接有效,連接斷開后自動(dòng)刪除
             *  auto_delete: false // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) use($data){
            echo "發(fā)送消息內(nèi)容:".$data."\n";

            /**
             * 發(fā)送消息
             * body 發(fā)送的數(shù)據(jù)
             * headers 數(shù)據(jù)頭,建議 ['content_type' => 'text/plain'],這樣消費(fèi)端是springboot注解接收直接是字符串類型
             * exchange 交換器名稱
             * routingKey 路由key
             * mandatory
             * immediate
             * @return bool|PromiseInterface|int
             */

            return $channel->publish($data, ['content_type' => 'text/plain'], '', 'ceshi')->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            //echo " [x] Sent 'Hello World!'\n";
            $client = $channel->getClient();
            return $channel->close()->then(function () use ($client) {
                return $client;
            });
        })->then(function (Client $client) {
            $client->disconnect();
        });
    }

    /**
     * 當(dāng)連接建立時(shí)觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當(dāng)連接斷開時(shí)觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當(dāng)客戶端的連接上發(fā)生錯(cuò)誤時(shí)觸發(fā)
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個(gè)進(jìn)程啟動(dòng)
     * @param $worker
     */
    public function onWorkerStart($worker)
{


    }
}

上述都OK以后咱們可以項(xiàng)目路徑下通過(guò)命令啟動(dòng)這個(gè)生產(chǎn)者:

php think worker:server

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

測(cè)試發(fā)送數(shù)據(jù):

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

通過(guò)這個(gè)網(wǎng)站

連接【ws://127.0.0.1:2345】后發(fā)送數(shù)據(jù)!

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

前往rabbitMQ控制臺(tái)

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

列隊(duì)中有一條消息產(chǎn)生并且等待了!

這個(gè)時(shí)候你可能問(wèn),如果我發(fā)送數(shù)據(jù)不想通過(guò)ws發(fā)送而是接口發(fā)送怎么辦?

笨思路唄:接口給內(nèi)置服務(wù)器發(fā)消息->內(nèi)置服務(wù)去發(fā)消息給rabbitMQ

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

將協(xié)議改為tcp

然后重新啟動(dòng)服務(wù)

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

然后去tp6創(chuàng)建一個(gè)路由接口

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

接口代碼

<?php
namespace app\controller;

use app\BaseController;

class Index extends BaseController
{
    public function index(string $msg)
{
        //連接本地tcp服務(wù)
        $client = stream_socket_client('tcp://127.0.0.1:2345', $errno, $errmsg, 1);
        //發(fā)送字符串
        fwrite($client, $msg."\n");
        //斷開服務(wù)
        fclose($client);
        return 'OK';
    }

}

執(zhí)行結(jié)果:

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

說(shuō)明接口成功的將數(shù)據(jù)發(fā)送給了本地內(nèi)置的tcp服務(wù)。

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

同時(shí),內(nèi)置服務(wù)將收到的數(shù)據(jù)給了rabbitMQ服務(wù)列隊(duì)中。

生產(chǎn)者完成。

4、消費(fèi)者

同生產(chǎn)者一樣新創(chuàng)建一個(gè)thinkphp6及安裝workerman擴(kuò)展,注意端口別和生產(chǎn)者沖突!這里我設(shè)置的是2346端口

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

創(chuàng)建的Receive類代碼如下:

<?php

namespace app\workerman;
use Bunny\Channel;
use Bunny\Message;
use Workerman\RabbitMQ\Client;
use think\worker\Server;
class Receive extends Server
{
    protected $socket = 'tcp://127.0.0.1:2346';

    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
{

    }

    /**
     * 當(dāng)連接建立時(shí)觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onConnect($connection)
{

    }

    /**
     * 當(dāng)連接斷開時(shí)觸發(fā)的回調(diào)函數(shù)
     * @param $connection
     */
    public function onClose($connection)
{

    }
    /**
     * 當(dāng)客戶端的連接上發(fā)生錯(cuò)誤時(shí)觸發(fā)
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
{
        echo "error $code $msg\n";
    }

    /**
     * 每個(gè)進(jìn)程啟動(dòng)
     * @param $worker
     */
    public function onWorkerStart($worker)
{
        //rabbitMQ配置
        $options = [
            'host'=>'127.0.0.1',//rabbitMQ IP
            'port'=>5672,//rabbitMQ 通訊端口
            'user'=>'admin',//rabbitMQ 賬號(hào)
            'password'=>'123456'//rabbitMQ 密碼
        ];
        (new Client($options))->connect()->then(function (Client $client) {
            return $client->channel();
        })->then(function (Channel $channel) {
            /**
             * 創(chuàng)建隊(duì)列(Queue)
             * name: ceshi         // 隊(duì)列名稱
             * passive: false      // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
             * durable: true       // 是否持久化,設(shè)置false是存放到內(nèi)存中RabbitMQ重啟后會(huì)丟失,
             *                        設(shè)置true則代表是一個(gè)持久的隊(duì)列,服務(wù)重啟之后也會(huì)存在,因?yàn)榉?wù)會(huì)把持久化的Queue存放在硬盤上,當(dāng)服務(wù)重啟的時(shí)候,會(huì)重新加載之前被持久化的Queue
             * exclusive: false    // 是否排他,指定該選項(xiàng)為true則隊(duì)列只對(duì)當(dāng)前連接有效,連接斷開后自動(dòng)刪除
             *  auto_delete: false // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
             */
            return $channel->queueDeclare('ceshi', false, true, false, false)->then(function () use ($channel) {
                return $channel;
            });
        })->then(function (Channel $channel) {
            echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
            $channel->consume(
                function (Message $message, Channel $channel, Client $client) {
                    echo "接收消息內(nèi)容:", $message->content, "\n";
                },
                'ceshi',
                '',
                false,
                true
            );
        });

    }
}

都OK以后咱們可以項(xiàng)目路徑下通過(guò)命令啟動(dòng)這個(gè)消費(fèi)者:

php think worker:server

此時(shí)應(yīng)該會(huì)自動(dòng)消費(fèi)掉rabbitMQ中等待的消息!

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

到這里消費(fèi)者也就結(jié)束啦!

5、整體測(cè)試

接下來(lái)我用cmd來(lái)啟動(dòng)兩個(gè)服務(wù),然后用接口發(fā)送消息和消費(fèi)測(cè)試!

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

至于具體怎么靈活應(yīng)用自行開拓大腦哦~

比如php項(xiàng)目有些業(yè)務(wù)吃力,可以去做個(gè)java的消費(fèi)端,讓java來(lái)完成任務(wù)~

PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)

以上就是關(guān)于“PHP怎么實(shí)現(xiàn)RabbitMQ消息列隊(duì)”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI