溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

laravel源碼分析隊(duì)列Queue方法怎么用

發(fā)布時(shí)間:2022-04-06 10:21:41 來源:億速云 閱讀:113 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“l(fā)aravel源碼分析隊(duì)列Queue方法怎么用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

隊(duì)列任務(wù)的創(chuàng)建

先通過命令創(chuàng)建一個(gè) Job 類,成功之后會(huì)創(chuàng)建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php。

> php artisan make:job DemoJob
 
> Job created successfully.

下面我們來分析一下 Job 類的具體生成過程。

執(zhí)行 php artisan make:job DemoJob 后,會(huì)觸發(fā)調(diào)用如下方法。

laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php

/**
 * Register the command.
 * [A] make:job 時(shí)觸發(fā)的方法
 * @return void
 */
protected function registerJobMakeCommand()
{
    $this->app->singleton('command.job.make', function ($app) {
        return new JobMakeCommand($app['files']);
    });
}

接著我們來看下 JobMakeCommand 這個(gè)類,這個(gè)類里面沒有過多的處理邏輯,處理方法在其父類中。

 class JobMakeCommand extends GeneratorCommand

我們直接看父類中的處理方法,GeneratorCommand->handle(),以下是該方法中的主要方法。

public function handle()
{
    // 獲取類名
    $name = $this->qualifyClass($this->getNameInput());
    // 獲取文件路徑
    $path = $this->getPath($name);
    // 創(chuàng)建目錄和文件
    $this->makeDirectory($path);
    // buildClass() 通過模板獲取新類文件的內(nèi)容
    $this->files->put($path, $this->buildClass($name));
    // $this->type 在子類中定義好了,例如 JobMakeCommand 中 type = 'Job'
    $this->info($this->type.' created successfully.');
}

方法就是通過目錄和文件,創(chuàng)建對(duì)應(yīng)的類文件,至于新文件的內(nèi)容,都是基于已經(jīng)設(shè)置好的模板來創(chuàng)建的,具體的內(nèi)容在 buildClass($name) 方法中。

 protected function buildClass($name)
{
    // 得到類文件模板,getStub() 在子類中有實(shí)現(xiàn),具體看 JobMakeCommand 
    $stub = $this->files->get($this->getStub());
    // 用實(shí)際的name來替換模板中的內(nèi)容,都是關(guān)鍵詞替換
    return $this->replaceNamespace($stub, $name)->replaceClass($stub, $name);
}

獲取模板文件

protected function getStub()
{
    return $this->option('sync')
                    ? __DIR__.'/stubs/job.stub'
                    : __DIR__.'/stubs/job-queued.stub';
}

job.stub

 <?php
/**
* job 類的生成模板
*/
namespace DummyNamespace;
 
use Illuminate\Bus\Queueable;
use Illuminate\Foundation\Bus\Dispatchable;
 
class DummyClass
{
    use Dispatchable, Queueable;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

job-queued.stub

<?php
/**
* job 類的生成模板
*/
namespace DummyNamespace;
 
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
 
class DummyClass implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

下面看一下前面我們創(chuàng)建的一個(gè)Job類,DemoJob.php,就是來源于模板 job-queued.stub。

<?php
/**
* job 類的生成模板
*/
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class DemoJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
 
    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }
 
    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        //
    }
}

至此,我們已經(jīng)大致明白了隊(duì)列任務(wù)類是如何創(chuàng)建的了。下面我們來分析下其是如何生效運(yùn)行的。

隊(duì)列任務(wù)的分發(fā)

任務(wù)類創(chuàng)建后,我們就可以在需要的地方進(jìn)行任務(wù)的分發(fā),常見的方法如下:

DemoJob::dispatch(); // 任務(wù)分發(fā)
DemoJob::dispatchNow(); // 同步調(diào)度,隊(duì)列任務(wù)不會(huì)排隊(duì),并立即在當(dāng)前進(jìn)程中進(jìn)行

下面先以 dispatch() 為例分析下分發(fā)過程。

trait Dispatchable
{
    public static function dispatch()
    {
        return new PendingDispatch(new static(...func_get_args()));
    }
}
 class PendingDispatch
{
    protected $job;
 
    public function __construct($job)
    {   echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL;
        $this->job = $job;
    }
    public function __destruct()
    {   echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL;
        app(Dispatcher::class)->dispatch($this->job);
    }
}

重點(diǎn)是 app(Dispatcher::class)->dispatch($this->job) 這部分。

我們先來分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自帶的 BusServiceProvider 中向 $app 中注入的。

class BusServiceProvider extends ServiceProvider implements DeferrableProvider
{
    public function register()
    {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });
    }
}

看一下 Dispatcher 的構(gòu)造方法,至此,我們已經(jīng)知道前半部分 app(Dispatcher::class) 是如何來的了。

class Dispatcher implements QueueingDispatcher
{
    protected $container;
    protected $pipeline;
    protected $queueResolver;
 
    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        /**
         * Illuminate/Bus/BusServiceProvider.php->register()中
         * $queueResolver 傳入的是一個(gè)閉包
         * function ($connection = null) use ($app) {
         *   return $app[QueueFactoryContract::class]->connection($connection);
         * }
         */
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }
    public function dispatch($command)
    {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        		// 將 $command 存入隊(duì)列
            return $this->dispatchToQueue($command);
        }
        return $this->dispatchNow($command);
    }
}

BusServiceProvider 中注冊(cè)了 Dispatcher::class ,然后 app(Dispatcher::class)->dispatch($this->job) 調(diào)用的即是 Dispatcher->dispatch()。

 public function dispatchToQueue($command)
{
    // 獲取任務(wù)所屬的 connection
    $connection = $command->connection ?? null;
    /*
     * 獲取隊(duì)列實(shí)例,根據(jù)config/queue.php中的配置
     * 此處我們配置 QUEUE_CONNECTION=redis 為例,則獲取的是RedisQueue
     * 至于如何通過 QUEUE_CONNECTION 的配置獲取 queue ,此處先跳過,本文后面會(huì)具體分析。
     */
    $queue = call_user_func($this->queueResolver, $connection);
    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }
    // 我們創(chuàng)建的DemoJob無queue方法,則不會(huì)調(diào)用
    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    }
    // 將 job 放入隊(duì)列
    return $this->pushCommandToQueue($queue, $command);
}
protected function pushCommandToQueue($queue, $command)
{
    // 在指定了 queue 或者 delay 時(shí)會(huì)調(diào)用不同的方法,基本大同小異
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }
    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }
    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }
    // 此處我們先看最簡單的無參數(shù)時(shí)的情況,調(diào)用push()
    return $queue->push($command);
}

筆者的配置是 QUEUE_CONNECTION=redis ,估以此來分析,其他類型的原理基本類似。

配置的是 redis 時(shí), $queue 是 RedisQueue 實(shí)例,下面我們看下 RedisQueue->push() 的內(nèi)容。

Illuminate/Queue/RedisQueue.php

public function push($job, $data = '', $queue = null)
{
    /**
     * 獲取隊(duì)列名稱
     * var_dump($this->getQueue($queue));
     * 創(chuàng)建統(tǒng)一的 payload,轉(zhuǎn)成 json
     * var_dump($this->createPayload($job, $this->getQueue($queue), $data));
     */
    // 將任務(wù)和數(shù)據(jù)存入隊(duì)列
    return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue);
}
public function pushRaw($payload, $queue = null, array $options = [])
{
    // 寫入redis中
    $this->getConnection()->eval(
        LuaScripts::push(), 2, $this->getQueue($queue),
        $this->getQueue($queue).':notify', $payload
    );
    // 返回id
    return json_decode($payload, true)['id'] ?? null;
}

“l(fā)aravel源碼分析隊(duì)列Queue方法怎么用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI