溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PHP使用Beanstalkd的案例

發(fā)布時間:2020-10-19 16:33:36 來源:億速云 閱讀:172 作者:小新 欄目:編程語言

這篇文章將為大家詳細講解有關(guān)PHP使用Beanstalkd的案例,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

1.使用Composer安裝Pheanstalk

composer require pda/pheanstalk

2.實現(xiàn)代碼

php查看beanstalkd狀態(tài)腳本Status.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/21
 * Time: 10:32
 */
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
print_r($pheanstalk->stats());

生產(chǎn)者代碼Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:30
 */
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
for ($i=0;$i<50;$i++){
    $data = array(
        'key' => 'testkey'.$i,
        'value' => 'testvalue',
        'time' => time(),
    );
    $ret = $pheanstalk->putInTube('test-tube', json_encode($data), Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR);
    var_dump($ret);
}

消費者代碼Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/20
 * Time: 16:31
 */
set_time_limit(0);
ini_set('default_socket_timeout', 900);
require "../vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('192.168.75.135',11300);
while (true){
    $job = $pheanstalk
        ->watch('test-tube')
        ->ignore('default')
        ->reserve();
    if ($job){
        sleep(2);
        echo $job->getData();
        echo "\n";
        $pheanstalk->delete($job);
    }
}

打開命令行/終端窗口,執(zhí)行生產(chǎn)者,會向tube寫入50條任務

PS E:\repository\work\beanstalk> php .\Producter.php
int(101)
int(102)
int(103)
int(104)
int(105)
int(106)
int(107)
int(108)
int(109)
int(110)
int(111)
int(112)
int(113)
int(114)
......

由此可見,$pheanstalk->putInTube成功后返回的是job的id

查看狀態(tài)

PS E:\repository\work\beanstalk> php Status.php
Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [current-jobs-urgent] => 0
            [current-jobs-ready] => 50
            [current-jobs-reserved] => 0
            [current-jobs-delayed] => 0
            [current-jobs-buried] => 0
            ......

結(jié)果中顯示處于ready待讀取狀態(tài)的job是50個

打開兩個或以上命令行/終端窗口,執(zhí)行消費者,模擬多消費者競爭

消費者1

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey0","value":"testvalue","time":1548039103}
{"key":"testkey1","value":"testvalue","time":1548039103}
{"key":"testkey2","value":"testvalue","time":1548039103}
{"key":"testkey4","value":"testvalue","time":1548039103}
{"key":"testkey6","value":"testvalue","time":1548039103}
{"key":"testkey8","value":"testvalue","time":1548039103}
{"key":"testkey10","value":"testvalue","time":1548039103}
{"key":"testkey12","value":"testvalue","time":1548039103}
{"key":"testkey14","value":"testvalue","time":1548039103}
{"key":"testkey16","value":"testvalue","time":1548039103}
{"key":"testkey18","value":"testvalue","time":1548039103}
{"key":"testkey20","value":"testvalue","time":1548039103}
{"key":"testkey22","value":"testvalue","time":1548039103}
{"key":"testkey24","value":"testvalue","time":1548039103}
{"key":"testkey26","value":"testvalue","time":1548039103}
{"key":"testkey28","value":"testvalue","time":1548039103}
{"key":"testkey30","value":"testvalue","time":1548039103}
{"key":"testkey32","value":"testvalue","time":1548039103}
{"key":"testkey34","value":"testvalue","time":1548039103}
{"key":"testkey36","value":"testvalue","time":1548039103}
{"key":"testkey38","value":"testvalue","time":1548039103}
{"key":"testkey40","value":"testvalue","time":1548039103}
{"key":"testkey42","value":"testvalue","time":1548039103}
{"key":"testkey44","value":"testvalue","time":1548039103}
{"key":"testkey46","value":"testvalue","time":1548039103}
{"key":"testkey48","value":"testvalue","time":1548039103}

消費者2

PS E:\repository\work\beanstalk> php .\Consumer.php
{"key":"testkey3","value":"testvalue","time":1548039103}
{"key":"testkey5","value":"testvalue","time":1548039103}
{"key":"testkey7","value":"testvalue","time":1548039103}
{"key":"testkey9","value":"testvalue","time":1548039103}
{"key":"testkey11","value":"testvalue","time":1548039103}
{"key":"testkey13","value":"testvalue","time":1548039103}
{"key":"testkey15","value":"testvalue","time":1548039103}
{"key":"testkey17","value":"testvalue","time":1548039103}
{"key":"testkey19","value":"testvalue","time":1548039103}
{"key":"testkey21","value":"testvalue","time":1548039103}
{"key":"testkey23","value":"testvalue","time":1548039103}
{"key":"testkey25","value":"testvalue","time":1548039103}
{"key":"testkey27","value":"testvalue","time":1548039103}
{"key":"testkey29","value":"testvalue","time":1548039103}
{"key":"testkey31","value":"testvalue","time":1548039103}
{"key":"testkey33","value":"testvalue","time":1548039103}
{"key":"testkey35","value":"testvalue","time":1548039103}
{"key":"testkey37","value":"testvalue","time":1548039103}
{"key":"testkey39","value":"testvalue","time":1548039103}
{"key":"testkey41","value":"testvalue","time":1548039103}
{"key":"testkey43","value":"testvalue","time":1548039103}
{"key":"testkey45","value":"testvalue","time":1548039103}
{"key":"testkey47","value":"testvalue","time":1548039103}
{"key":"testkey49","value":"testvalue","time":1548039103}

兩個消費者競爭著完成了全部任務,由于我的beanstalkd啟動時開啟了binlog持久,所以beanstalkd重啟后任務也不會丟失

3.需要注意的事項

1.創(chuàng)建job時,設置的超時時間Pheanstalk::DEFAULT_TTR一定要比消費者處理一個job的時間要長,否則job在超時之后會被tube更改為ready狀態(tài),被其他消費者獲取,而此時當前消費者還在處理該job,這就出現(xiàn)了一個job被多個消費者重復執(zhí)行的可怕現(xiàn)象

2.Pheanstalk的維護者發(fā)生了變化,在新版的Pheanstalk中是不支持長連接的,當客戶端socket連接服務器時間超過php.ini中設置的default_socket_timeout時,如果未能從服務端tube獲得job,連接將會被斷開,所以消費者進程需要維護,以便在退出后可以重新開啟進程,推薦使用supervisord維護消費者進程。

判斷socket超時的代碼

public function getLine($length = null)
    {
        $timeout = ini_get('default_socket_timeout');
        $timer   = microtime(true);
        do {
            $data = isset($length) ?
                $this->_wrapper()->fgets($this->_socket, $length) :
                $this->_wrapper()->fgets($this->_socket);
            if ($this->_wrapper()->feof($this->_socket)) {
                throw new Exception\SocketException('Socket closed by server!');
            }
            if (($data === false) && microtime(true) - $timer > $timeout) {
                $this->disconnect();
                throw new Exception\SocketException('Socket timed out!');
            }
        } while ($data === false);
        return rtrim($data);
    }

關(guān)于PHP使用Beanstalkd的案例就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI