溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java Spring boot整合RabbitMQ如何實(shí)現(xiàn)B2B2C小程序電子商務(wù)

發(fā)布時(shí)間:2021-12-24 09:33:23 來源:億速云 閱讀:103 作者:小新 欄目:軟件技術(shù)

小編給大家分享一下Java Spring boot整合RabbitMQ如何實(shí)現(xiàn)B2B2C小程序電子商務(wù),希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

之前我們發(fā)送和接收到隊(duì)列中的消息,現(xiàn)在是時(shí)候在 RabbitMQ 中引入完整的消息傳遞模式了。

讓我們快速回顧一下之前了解的內(nèi)容:

生產(chǎn)者(producer):發(fā)送消息的程序

隊(duì)列(queue):存儲消息的緩沖器

消費(fèi)者(consumer):接收消息的程序

RabbitMQ 消息模型的核心理念是:發(fā)布者(producer)不會直接發(fā)送任何消息給隊(duì)列。事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。

發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)交換器(exchange)。交換器非常簡單,它一邊從發(fā)布者方接收消息,一邊把消息推送到隊(duì)列。交換器必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是多個(gè)隊(duì)列,或者是直接忽略消息。這些規(guī)則是通過交換器類型(exchange type)來定義的。

有幾個(gè)可供選擇的交換器類型:direct, topic, headers 和 fanout。我們在這里主要說明最后一個(gè) ——fanout。fanout exchange 很簡單,你可能從名字上就能猜測出來,它把消息發(fā)送給它所知道的所有隊(duì)列。這正是我們所需要的。

先創(chuàng)建一個(gè) fanout 類型的交換器,命名為”tut.fanout”

@Profile({"tut3", "pub-sub"})
@Configuration
public class Tut3Config {
 /**
     * 定義一個(gè) Exchange
     *
     * @return FanoutExchange
     */
     @Bean
    public FanoutExchange queue() {
        return new FanoutExchange("tut.fanout");
    }
    /**
     * 消費(fèi)者一端的配置:queues、bindings
     */
    @Profile("receiver")
    private static class ReceiverConfig {
        @Bean 
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }
        @Bean 
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }
        @Bean 
        public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
        }
        @Bean 
        public Binding binding2(FanoutExchange fanout, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
        }
        @Bean 
        public Tut3Receiver receiver() {
            return new Tut3Receiver();
        }
    }
    
@Bean 
@Profile("sender")
    public Tut3Sender sender() {
        return new Tut3Sender();
    }
}

和之前的兩個(gè)教程一樣,我們定義了兩個(gè) profiles(tut3、pub-sub)以保證我們能運(yùn)行指定的示例。然后創(chuàng)建了一個(gè)消費(fèi)者端的配置,并在其中定義了兩個(gè) AnonymousQueue 和兩個(gè) Binding 以便將相應(yīng)的隊(duì)列和交換器綁定起來。

交換器列表

rabbitmqctl 能夠列出服務(wù)器上所有的交換器。這個(gè)列表中有一些叫做 amq.* 的交換器。這些都是默認(rèn)創(chuàng)建的,不過這時(shí)候你還不需要使用他們。

$ sudo rabbitmqctl list_exchanges

Listing exchanges …

logs fanout

amq.direct direct

amq.topic topic

amq.fanout fanout

amq.headers headers

…done.

未命名交換器(Nameless exchange)

在之前,我們對交換一無所知,但依然能夠?qū)⑾l(fā)送到隊(duì)列。這是怎么回事?因?yàn)槲覀兪褂昧四J(rèn)的交換器,它是用空字符串(””)來標(biāo)識的。

我們之前是這樣發(fā)送消息的:

template.convertAndSend(queue.getName(), message);

而具體源碼如下,可以看出之前我們使用的是默認(rèn)的空字符定義的交換器

// RabbitTemplate
@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {
convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}
private volatile String exchange = DEFAULT_EXCHANGE;
/\*_ Alias for amq.direct default exchange. _/
private static final String DEFAULT_EXCHANGE = "";

臨時(shí)隊(duì)列(Temporary queues)

我們之前使用的是具有指定名稱的隊(duì)列(hello 和 work-queues)。能夠命名隊(duì)列對我們而言至關(guān)重要 —— 我們需要將工作進(jìn)程指向同一個(gè)隊(duì)列。當(dāng)我們想要在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列時(shí),給隊(duì)列一個(gè)名字很重要。

但是我們的日志記錄器并不是這樣。我們希望記錄到所有的日志消息,而不僅僅是它們的一部分。我們也只對當(dāng)前的消息感興趣,而對舊的消息不感興趣。為了解決這個(gè)問題,我們需要做兩件事情。

首先,每當(dāng)我們連接到 RabbitMQ,我們需要一個(gè)新的空的隊(duì)列。我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱的隊(duì)列,或者最好讓服務(wù)器為我們選擇一個(gè)隨機(jī)的隊(duì)列名。

其次,當(dāng)與消費(fèi)者(consumer)斷開連接的時(shí)候,這個(gè)隊(duì)列應(yīng)當(dāng)被立即刪除。

在 Spring AMQP 中,我們可以使用 AnonymousQueue 來作為臨時(shí)隊(duì)列。它是一個(gè)非持久化的、獨(dú)占的、可自動刪除的隊(duì)列。

@Bean
public Queue autoDeleteQueue1() {
    return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2() {
    return new AnonymousQueue();
}

此時(shí)我們的隊(duì)列名字看起來會像這樣:amq.gen-JzTY20BRgKO-HjmUJj0wLg

綁定(Bindings)

我們已經(jīng)創(chuàng)建了一個(gè) fanout exchange 和兩個(gè)隊(duì)列,現(xiàn)在我們需要告訴交換所將消息發(fā)送到我們的隊(duì)列。交換器和隊(duì)列之間的關(guān)系稱為綁定。在上面的 Tut3Config 中,您可以看到我們有兩個(gè)綁定,每個(gè) AnonymousQueue 都有一個(gè)綁定。

@Bean
public Binding binding1(FanoutExchange fanout, Queue autoDeleteQueue1) {
    return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

綁定列表

你可以使用 rabbitmqctl list_bindings 列出所有現(xiàn)存的綁定。

rabbitmqctl list_bindings

代碼整合

生產(chǎn)者

發(fā)出消息的生產(chǎn)者程序與以前的沒有多大區(qū)別。最重要的變化是我們現(xiàn)在要發(fā)布消息給我們的 fanout exchange,而不是默認(rèn)的交換器。發(fā)送時(shí)我們需要提供一個(gè) routingKey,但是對于 fanout exchange,這個(gè)值將被忽略。

public class Tut3Sender {
@Autowird
private AmqpTemplate template;
@Override
private FanoutExchange fanout;
 private int dots = 0;
 private int count = 0;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots++ == 3) {
            dots = 1;
        }
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(Integer.toString(++count));
        String message = builder.toString();
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

說明:

禁止發(fā)布到不存在的交換器
如果沒有任何隊(duì)列綁定到交換器,消息將丟失
消費(fèi)者 了解springcloud架構(gòu)可以加求求:三五三六二四七二五九
我們定義兩個(gè)消費(fèi)者分別監(jiān)聽兩個(gè)隊(duì)列

public class Tut3Receiver {
    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receiver1(String in) throws InterruptedException {
        receive(in, 1);
    }
    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receiver2(String in) throws InterruptedException {
        receive(in, 2);
    }
    private void receive(String in, int instance) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + instance + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + instance + " [x] Done in "
                + watch.getTotalTimeSeconds() + "s");
    }
    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

運(yùn)行

maven 編譯

mvn clean package -Dmaven.test.skip=true

運(yùn)行(建議先運(yùn)行消費(fèi)者,再運(yùn)行生產(chǎn)者)

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut3,receiver  --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut3,sender  --tutorial.client.duration=60000

輸出

// Sender

Ready … running for 60000ms

[x] Sent ‘Hello.1’

[x] Sent ‘Hello…2’

[x] Sent ‘Hello…3’

// Receiver

Ready … running for 60000ms

instance 2 [x] Received ‘Hello.1’

instance 1 [x] Received ‘Hello.1’

instance 2 [x] Done in 1.002s

instance 1 [x] Done in 1.002s

instance 2 [x] Received ‘Hello…2’

instance 1 [x] Received ‘Hello…2’

instance 2 [x] Done in 2.003s

instance 1 [x] Done in 2.003s

instance 1 [x] Received ‘Hello…3’

instance 2 [x] Received ‘Hello…3’

instance 1 [x] Done in 3.011s

instance 2 [x] Done in 3.011s

看完了這篇文章,相信你對“Java Spring boot整合RabbitMQ如何實(shí)現(xiàn)B2B2C小程序電子商務(wù)”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI