溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

發(fā)布時(shí)間:2020-06-22 13:13:35 來(lái)源:網(wǎng)絡(luò) 閱讀:334 作者:eflypro小普 欄目:開(kāi)發(fā)技術(shù)

Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

理論
定義
消息隊(duì)列:在消息的傳輸過(guò)程中保存消息的的容器。

這是一個(gè)較為經(jīng)典的消費(fèi)-生產(chǎn)者模型,說(shuō)起來(lái)比較抽象,打個(gè)比方:A線程需要給B線程發(fā)送消息(A、B線程不一定是在同一臺(tái)機(jī)器上的),A線程先把消息發(fā)送到消息隊(duì)列服務(wù)器上,然后B線程去讀取或是訂閱消息服務(wù)器上消息隊(duì)列中的消息,線程A和B之間并沒(méi)有進(jìn)行直接通信。MQ服務(wù)器在中間起到中繼的作用。

適用的應(yīng)用場(chǎng)景
比較適合異步傳輸,這里解釋一下什么是異步和同步。

異步:發(fā)送方不關(guān)心消息有沒(méi)有發(fā)送成功,只發(fā)送消息,不去獲取消息是否發(fā)送成功。

同步:發(fā)送方關(guān)心消息是否發(fā)送成功,發(fā)送消息后,會(huì)等待接收方返回狀態(tài)碼,根據(jù)狀態(tài)碼來(lái)判斷是否發(fā)送成功,然后執(zhí)行相對(duì)于的動(dòng)作。

下邊以Http中的同步和異步為例:

如:普通的B/S架構(gòu)客戶端和服務(wù)器端之間的通信就是同步的,即提交請(qǐng)求 ---> 等待服務(wù)器處理完畢返回消息 ---> 拿到服務(wù)器返回的消息,處理完畢。

如:Ajax技術(shù)就是異步的,請(qǐng)求通過(guò)事件觸發(fā) ---> 服務(wù)器處理(瀏覽器不用等待,仍可以做其他的事情) ---> 處理完畢。

有人可能會(huì)好奇說(shuō)應(yīng)用場(chǎng)景怎么說(shuō)到了同步和異步,那說(shuō)明你還不是很理解技術(shù)和應(yīng)用場(chǎng)景之間的緊密聯(lián)系。


工作過(guò)程

生產(chǎn)者客戶端:
客戶端連接到RabbitMQ服務(wù)器上,打開(kāi)一個(gè)消息通道(channel);
客戶端聲明一個(gè)消息交換機(jī)(exchange),并設(shè)置相關(guān)屬性。
客戶端聲明一個(gè)消息隊(duì)列(queue),并設(shè)置相關(guān)屬性。
客戶端使用routing key在消息交換機(jī)(exchange)和消息隊(duì)列(queue)中建立好綁定關(guān)系。
客戶端投遞消息都消息交換機(jī)(exchange)上
客戶端關(guān)閉消息通道(channel)以及和服務(wù)器的連接。
服務(wù)器端:
exchange接收到消息后,根據(jù)消息的key(這個(gè)key的產(chǎn)生規(guī)則暫時(shí)沒(méi)研究,有知道的小伙伴可以留言告訴我)和以及設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)消息隊(duì)列中。

安裝
由于rabbitMq需要erlang語(yǔ)言的支持,在安裝rabbitMq之前需要安裝erlang,執(zhí)行命令:

sudo apt-get install erlang
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

這樣就安裝完了。
接下來(lái),安裝rabbitMq:
sudo apt-get install rabbitmq-server

安裝完之后啟動(dòng)rabbitMQ

Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

創(chuàng)建用戶
sudo rabbitmqctl add_user lsl 123456
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

將用戶設(shè)置為管理員(只有管理員才能遠(yuǎn)程登錄)

sudo rabbitmqctl set_user_tags 用戶名 administrator

同時(shí)為用戶設(shè)置讀寫(xiě)等權(quán)限
sudo rabbitmqctl set_permissions -p / lsl ".*" ".*" ".*"

跟著訪問(wèn) http://192.168.136.130:15672/
就能看到配置頁(yè)面了,這也說(shuō)明已經(jīng)成功安裝rabbitmq
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

實(shí)戰(zhàn)消費(fèi)者和生產(chǎn)者(PHP版本
生產(chǎn)者:生產(chǎn)消息,發(fā)送消息。類似工廠。
消費(fèi)者:接受消息,使用消息。類似顧客。
隊(duì)列:存儲(chǔ)消息。類似倉(cāng)庫(kù)、中轉(zhuǎn)站。隊(duì)列可以存儲(chǔ)很多的消息,因?yàn)樗旧鲜且粋€(gè)無(wú)限制的緩沖區(qū),前提是你的機(jī)器有足夠的存儲(chǔ)空間。多個(gè)生產(chǎn)者可以將消息發(fā)送到同一個(gè)隊(duì)列中,多個(gè)消費(fèi)者也可以只從同一個(gè)隊(duì)列接收數(shù)據(jù)。這就是隊(duì)列的特性。

下面寫(xiě)一個(gè)demo來(lái)實(shí)現(xiàn)rabbitmq的消費(fèi)者和生產(chǎn)者
send.php

<?php
//配置信息 
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'lsl',
    'password' => '123456',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機(jī)名 
//$q_name = 'q_linvo'; //無(wú)需隊(duì)列名 
$k_route = 'key_1'; //路由key 

//創(chuàng)建連接和channel 
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創(chuàng)建交換機(jī)對(duì)象    
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
date_default_timezone_set("Asia/Shanghai");
//發(fā)送消息 
//$channel->startTransaction(); //開(kāi)始事務(wù)  
for($i=0; $i<5; ++$i){
    sleep(2);//每個(gè)兩秒發(fā)送一條消息
    //消息內(nèi)容 
    $message = "HelloWorld!".date("h:i:sa");
    echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事務(wù) 

$conn->disconnect();
?>

rec.php

<?php
//配置信息 
$conn_args = array(
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'lsl',
    'password' => '123456',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交換機(jī)名 
$q_name = 'q_linvo'; //隊(duì)列名 
$k_route = 'key_1'; //路由key 

//創(chuàng)建連接和channel 
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//創(chuàng)建交換機(jī)    
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型  
$ex->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$ex->declare()."\n";

//創(chuàng)建隊(duì)列    
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化  
echo "Message Total:".$q->declare()."\n";

//綁定交換機(jī)與隊(duì)列,并指定路由鍵 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息 
echo "Message:\n";
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答  
}
$conn->disconnect();

先讓消費(fèi)者接收消息
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

再調(diào)用生產(chǎn)者發(fā)送消息
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

再查看消費(fèi)者收到的消息
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探

假設(shè)消費(fèi)者掛掉了,看看消息是怎么樣的
Rabbitmq學(xué)習(xí)(一) Rabbitmq初探
看一下之前部署的,你會(huì)發(fā)現(xiàn)linvo這個(gè)隊(duì)列里有5條消息,這意味著沒(méi)有消費(fèi)者去讀取它,把消息堆積在隊(duì)列里了

當(dāng)你打開(kāi)消費(fèi)者,頁(yè)面的Total值就馬上變?yōu)?了,這意味著消息已經(jīng)被接收。
這樣就模擬了隊(duì)列對(duì)消息的處理。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI