您好,登錄后才能下訂單哦!
這篇文章主要介紹了docker啟動(dòng)rabbitmq及使用的方法的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇docker啟動(dòng)rabbitmq及使用的方法文章都會(huì)有所收獲,下面我們一起來看看吧。
docker search rabbitmq:management
docker pull rabbitmq:management
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
docker logs rabbitmq
http://localhost:15672
賬戶密碼默認(rèn):guest
package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * 生成一個(gè)queue隊(duì)列 * 1、隊(duì)列名稱 QUEUE_NAME * 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中) * 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi) * 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開連接后 該隊(duì)列是否自動(dòng)刪除 * 5、其他參數(shù) */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message = "Hello world!"; /** * 發(fā)送一個(gè)消息 * 1、發(fā)送到哪個(gè)exchange交換機(jī) * 2、路由的key * 3、其他的參數(shù)信息 * 4、消息體 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println(" [x] Sent '"+message+"'"); channel.close(); connection.close(); } }
運(yùn)行該方法,可以看到控制臺(tái)的打印
name=hello的隊(duì)列收到Message
package com.xun.rabbitmqdemo.example; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receiver { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setConnectionTimeout(600000);//milliseconds factory.setRequestedHeartbeat(60);//seconds factory.setHandshakeTimeout(6000);//milliseconds factory.setRequestedChannelMax(5); factory.setNetworkRecoveryInterval(500); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println("Waiting for messages. "); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
package com.xun.rabbitmqdemo.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMqUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("接收消息:"+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)...."); /** * 消費(fèi)者消費(fèi)消息 * 1、消費(fèi)哪個(gè)隊(duì)列 * 2、消費(fèi)成功后是否自動(dòng)應(yīng)答 * 3、消費(fèi)的接口回調(diào) * 4、消費(fèi)未成功的接口回調(diào) */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; public class Work02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("接收消息:"+receivedMessage); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)...."); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
啟動(dòng)工作線程
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.Channel; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ try(Channel channel= RabbitMqUtils.getChannel();){ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //從控制臺(tái)接收消息 Scanner scanner = new Scanner(System.in); while(scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("發(fā)送消息完成:"+message); } } } }
啟動(dòng)發(fā)送線程,此時(shí)發(fā)送線程等待鍵盤輸入
發(fā)送4個(gè)消息
可以看到2個(gè)工作線程按照順序分別接收message。
rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。
但消費(fèi)者在處理message過程中宕機(jī),會(huì)導(dǎo)致消息的丟失。
因此需要設(shè)置手動(dòng)應(yīng)答。
import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import java.util.Scanner; public class Task02 { private static final String TASK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ try(Channel channel = RabbitMqUtils.getChannel()){ channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); System.out.println("請(qǐng)輸入信息"); while(scanner.hasNext()){ String message = scanner.nextLine(); channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes()); System.out.println("生產(chǎn)者task02發(fā)出消息"+ message); } } } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work03 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println("Work03 等待接收消息處理時(shí)間較短"); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:"+message); /** * 1、消息的標(biāo)記tag * 2、是否批量應(yīng)答 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; //采用手動(dòng)應(yīng)答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
package com.xun.rabbitmqdemo.workQueue; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.xun.rabbitmqdemo.utils.RabbitMqUtils; import com.xun.rabbitmqdemo.utils.SleepUtils; public class Work04 { private static final String ACK_QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); System.out.println("Work04 等待接收消息處理時(shí)間較長(zhǎng)"); DeliverCallback deliverCallback = (consumerTag,delivery)->{ String message = new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯"); }; //采用手動(dòng)應(yīng)答 boolean autoAck = false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
工具類SleepUtils
package com.xun.rabbitmqdemo.utils; public class SleepUtils { public static void sleep(int second){ try{ Thread.sleep(1000*second); }catch (InterruptedException _ignored){ Thread.currentThread().interrupt(); } } }
work04等待30s后發(fā)出ack
在work04處理message時(shí)手動(dòng)停止線程,可以看到message:dd被rabbitmq交給了work03
上面的輪詢分發(fā),生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息,但當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。
不公平分發(fā):
int prefetchCount = 1; channel.basicQos(prefetchCount);
通過此配置,當(dāng)消費(fèi)者未處理完當(dāng)前消息,rabbitmq會(huì)優(yōu)先將該message分發(fā)給空閑消費(fèi)者。
關(guān)于“docker啟動(dòng)rabbitmq及使用的方法”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“docker啟動(dòng)rabbitmq及使用的方法”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。