您好,登錄后才能下訂單哦!
這篇文章主要講解了“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”吧!
拉取鏡像
docker pull rabbitmq:3.8.8-management
查看鏡像
docker images rabbitmq
啟動(dòng)鏡像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虛擬機(jī)記得開(kāi)放5672端口或者關(guān)閉防火墻,在window通過(guò) 主機(jī)ip:15672 訪問(wèn)rabbitmq控制臺(tái)
用戶名密碼默認(rèn)為guest
<!--指定 jdk 編譯版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依賴客戶端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!--操作文件流的一個(gè)依賴--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
工作原理
Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
Connection:publisher/consumer 和 broker 之間的 TCP 連接
Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷
Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最終被送到這里等待 consumer 取走
我們需要先獲取連接(Connection),然后通過(guò)連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過(guò)交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)
public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置主機(jī)ip factory.setHost("182.92.234.71"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉 Connection connection = factory.newConnection(); // 獲取信道 Channel channel = connection.createChannel(); /* * 生成一個(gè)隊(duì)列 * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 1.隊(duì)列名稱 * 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中 * 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi) * 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除 * 5.其他參數(shù) **/ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello rabbitmq"; /* * 發(fā)送一個(gè)消息 * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 1.發(fā)送到哪個(gè)交換機(jī) * 2.路由的key是哪個(gè) * 3.其他的參數(shù)信息 * 4.發(fā)送消息的消息體 * **/ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("發(fā)送成功"); } }
public class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //創(chuàng)建一個(gè)連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置主機(jī)ip factory.setHost("182.92.234.71"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉 Connection connection = factory.newConnection(); // 獲取信道 Channel channel = connection.createChannel(); // 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; // 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消費(fèi)被中斷"); }; /* * 消費(fèi)者消費(fèi)消息 * basicConsume(String queue, boolean autoAck, * DeliverCallback deliverCallback, CancelCallback cancelCallback) * 1.消費(fèi)哪個(gè)隊(duì)列 * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答 * 3.消費(fèi)者未成功消費(fèi)的回調(diào) **/ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化
public class RabbitMqUtils { private static Channel channel; static { ConnectionFactory factory = new ConnectionFactory(); // 設(shè)置ip地址 factory.setHost("192.168.23.100"); // 設(shè)置用戶名 factory.setUsername("guest"); // 設(shè)置密碼 factory.setPassword("guest"); try { // 創(chuàng)建連接 Connection connection = factory.newConnection(); // 獲取信道 channel = connection.createChannel(); } catch (Exception e) { System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage()); } } public static Channel getChannel() { return channel; } }
相當(dāng)于前面的消費(fèi)者,我們只需要寫(xiě)一個(gè)類,通過(guò)ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程
public class Worker01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { System.out.println("接受到消息:" + new String(message.getBody())); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; // 啟動(dòng)兩次,第一次為C1, 第二次為C2 System.out.println("C2消費(fèi)者等待消費(fèi)消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback); } }
public class Test01 { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 通過(guò)控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息發(fā)送完成:" + message); } } }
結(jié)果
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。
自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)衡,因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。
當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。
手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除
Channel.basicAck(用于肯定確認(rèn))
RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
Channel.basicNack(用于否定確認(rèn))
Channel.basicReject(用于否定確認(rèn))
與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple
multiple 的 true 和 false 代表不同意思
true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
false 同上面相比
只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答
不處理該消息了直接拒絕,可以將其丟棄了
如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。
public class Test01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws IOException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME,null, message.getBytes() ); System.out.println("消息發(fā)送完成:" + message); } } }
public class SleepUtils { public static void sleep(int second) { try { Thread.sleep(1000 * second); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
public class Worker01 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C1,業(yè)務(wù)時(shí)間短"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(1); // 模擬業(yè)務(wù)執(zhí)行1秒 System.out.println("接受到消息:" + new String(message.getBody())); /* * 1、消息標(biāo)識(shí) * 2、是否啟動(dòng)批量確認(rèn),false:否。 * 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息 * 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); } } ============================================================================== public class Worker02 { private final static String QUEUE_NAME = "ack"; public static void main(String[] args) throws Exception { System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)"); Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = ( consumerTag, message) -> { SleepUtils.sleep(15); // 模擬業(yè)務(wù)執(zhí)行15秒 System.out.println("接受到消息:" + new String(message.getBody())); /* * 1、消息標(biāo)識(shí) * 2、是否啟動(dòng)批量確認(rèn),false:否。 * 啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息 * 時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (cunsumerTag) -> { System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback); } }
worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。
注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)
最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。
感謝各位的閱讀,以上就是“Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Docker怎么啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。