溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

rabbitmq的基本概念以及amqp-client的使用

發(fā)布時間:2021-07-02 17:04:12 來源:億速云 閱讀:383 作者:chen 欄目:大數據

本篇內容介紹了“rabbitmq的基本概念以及amqp-client的使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

package com.rabbitmq.demo;

import com.rabbitmq.client.*;

public class Produce {

    public static void main(String args[]) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setPassword("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("192.168.1.141");
        connectionFactory.setPort(5672);
        Connection connection=connectionFactory.newConnection();
        Channel channel= connection.createChannel();
        String exchangeName="test";
        String queueName="test";
        String routingkey="routingkey";
        String bindingkey=routingkey;
        //設置交換機,直連,持久化,非自動刪除,參數
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,null);
        //channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null);
        /**
         * BuiltinExchangeType.FANOUT  不需要綁定 bindingkey與routingkey exchange直接將消息投遞到queue
         * BuiltinExchangeType.DIRECT  需要綁定  且bindingkey與routingkey一致時 exchange投遞消息到queue
         * BuiltinExchangeType.TOPIC   需要綁定  模糊匹配,規(guī)則如下
             * RoutingKey 為一個點號"."分隔的字符串(被點號"。"分隔開的每段獨立的字符,如com.rabbit.demo,com.rabbit.test)
             * BindingKey RoutingKey 樣也是點號"."分隔的字符串;
             * BindingKey 中可以存在兩種特殊字符串"*"和"#",用于做模糊匹配,其中"#"用于匹配多個(零個到多個),* 匹配任意一個單詞。
         *    如果binding_key 是 “#” - 它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。
         *    如果 “*” and “#” 沒有被使用,那么topic exchange就變成了direct exchange。
         * BuiltinExchangeType.HEADERS 不需要綁定
         *   消費者arguments指定“x-match”,這個鍵的Value可以是any或者all,這代表消息攜帶的Hash是需要全部匹配(all),還是僅匹配一個鍵(any)就可以了
         */
        /**
         * 持久化:數據會保存到磁盤,重啟rabbitmq數據依舊存在
         *
         * 排他:該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。
         * 需要注意三點:排他隊列是基于連接( Connection) 可見的,同一個連接的不同信道 (Channel)
         * 是可以同時訪問同一連接創(chuàng)建的排他隊列; "首次"是指如果一個連接己經聲明了
         * 排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:即使該隊
         * 列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列
         * 適用于一個客戶端同時發(fā)送和讀取消息的應用場景。
         *
         *
         * 自動刪除:設置是否自動刪除。為 true 則設置隊列為自動刪除。自動刪除的前提是:
         * 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會
         * 自動刪除。不能把這個參數錯誤地理解為: "當連接到此隊列的所有客戶端斷開時,這
         * 個隊列自動刪除",因為生產者客戶端創(chuàng)建這個隊列,或者沒有消費者客戶端與這個隊
         * 列連接時,都不會自動刪除這個隊列。
         *
         * 根據業(yè)務數據,最好提前建好exchange,queue,及綁定關系,生產端,消費端可以避免很多錯誤,如exchang創(chuàng)建失敗,
         * 綁定關系不確定導致消息投遞失敗
         *
         * rabbitmq  clinet 屬性集
         * props 消息的基本屬性集,其包含 14 個屬性成員,分別有 contentType
         * content ncoding headers Map<String Object>) deliveryMode priority
         * correlationld replyTo expiration messageld timestamp type userld
         * appld cluster 。
         *
         *   mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵的隊列,
         *   那么 RabbitM 會調用 Basic.Return 命令將消息返回給生產者 數設置為 false 時,出現上述情形,
         *   則消息直接被丟棄 那么生產者如何獲取到沒有被正確路由到合適隊列的消息呢?這時channel addReturnListener 來添加 ReturnListener 監(jiān)昕器實現。          *   通過rabbitmq備份隊列,當消息發(fā)送不到主隊列,將自動發(fā)送到備份隊列,設置了備份隊列,mandatory參數將無效 */


        //設置隊列,持久化,非排他,非自動刪除
        channel.queueDeclare(queueName,true,false,false,null);
        //channel.queueBind(queueName,exchangeName, bindingkey);
        for (int i = 0; i <10 ; i++) {
            channel.basicPublish(exchangeName,routingkey,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,"hi,Rabbitmq".getBytes() );
        }
        channel.close();
        connection.close();
    }
}
package com.rabbitmq.demo;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {

    public static void main(java.lang.String args[]) throws Exception{
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setPassword("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("192.168.1.141");
        connectionFactory.setPort(5672);
        Connection connection=connectionFactory.newConnection();
        Channel channel= connection.createChannel();
        /*while(true){
            //單個獲取消息,當沒有消息返回null,pull模式
            GetResponse response = channel.basicGet("test001", true);
            if(response!=null){
                System.out.println(new String(response.getBody()));
            }
        }*/
        //從queue中手動確認獲取消息,push模式
        /**
         * 隊列名,自動簽收,默認消費者
         * queue 隊列的名稱:
         * autoAck 設置是否自動確認。建議設成 fa se ,即不自動確認:
         *      utoAck 參數置為 false ,對于 RabbitMQ 服務端而 ,隊列中的消息分成了兩個部分
                 * 部分是等待投遞給消費者的消息:一部分是己經投遞給消費者,但是還沒有收到消費者確認
                 * 信號的消息。 如果 RabbitMQ 直沒有收到消費者的確認信號,并且消費此消息的消費者己經
                 * 斷開連接,則 RabbitMQ 會安排該消息重新進入隊列,等待投遞給下 個消費者,當然也有可
                 * 能還是原來的那個消費者。
                 * RabbitMQ 會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的
                 * 依據是消費該消息的消費者連接是否己經斷開,這么設計的原因是 RabbitMQ 允許消費者
                 * 消費 條消息的時間可以很久很久。
         * consumerTag: 消費者標簽,用來區(qū)分多個消費者:
         * noLocal 設置為 true 則表示不能將同一個 Connectio口中生產者發(fā)送的消息傳送給
         * 這個 Connection 中的消費者:
         * exclusive 設置是否排他
         * arguments 設置消費者的其他參數:
         * callback 設置消費者的回調函數。用來處理 Rabb itM 推送過來的消息,比如
         * DefaultConsumer 使用時需要客戶端重寫 (overr e) 其中的方法。
         */
        channel.basicConsume("test",false,new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者1:"+new java.lang.String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        //創(chuàng)建多個消費者,默認為平均分攤round-robin
        //不支持隊列層面的廣播消費
        /**
         * 上面代碼中顯式地設置 autoAck false 然后在接收到消息之后 行顯式 ack
         * (channel basicAck ), 對于消費者來說這 設置是非常 可以防止
         * 丟失。
         *
         *
         */
        channel.basicConsume("queue",false,new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消費者2:"+new java.lang.String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        //單個拒絕接受消息
        //channel.basicReject(111, false);
        /**
         * 多個拒絕消息
         */
       // channel.basicNack();
        /**
         * channel.basicReject 或者 channel.basicNack 中的 requeue 設直為 false ,可
         * 以啟用"死信隊列"的功能。死信隊列可以通過檢測被拒絕或者未送達的消息來追蹤問題
         */
        /***
         * Basic Consume 將信道 (Channel) 直為接收模式,直到取消隊列的訂閱為止。在接收
         * 模式期間, RabbitMQ 會不斷地推送消息給消費者,當然推送消息的個數還是會受到 Basic.Qos
         * 的限制.如果只想從隊列獲得單條消息而不是持續(xù)訂閱,建議還是使用 Basic.Get 進行消費.但
         * 是不能將 Basic.Get 放在一個循環(huán)里來代替 Basic.Consume ,這樣做會嚴重影響 RabbitMQ
         * 的性能.如果要實現高吞吐量,消費者理應使用 Basic.Consume 方法。
         *
         *
         */
    }
}

“rabbitmq的基本概念以及amqp-client的使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI