溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java Spring boot 整合RabbitMQ(五):主題(Topics)-B2B2C小程序電子商務(wù)

發(fā)布時(shí)間:2020-08-08 09:13:15 來源:ITPUB博客 閱讀:162 作者:gung123 欄目:軟件技術(shù)

在我們的日志系統(tǒng)中,我們不只希望訂閱基于嚴(yán)重程度的日志,同時(shí)還希望訂閱基于發(fā)送來源的日志。Unix 工具 syslog 就是同時(shí)基于嚴(yán)重程度 -severity (info/warn/crit…) 和 設(shè)備 -facility (auth/cron/kern…) 來路由日志的。

如果這樣的話,將會(huì)給予我們非常大的靈活性,我們既可以監(jiān)聽來源于”cron” 的嚴(yán)重程度為”critical errors” 的日志,也可以監(jiān)聽來源于”kern” 的所有日志。

為了實(shí)現(xiàn)這個(gè)目的,接下來我們學(xué)習(xí)如何使用另一種更復(fù)雜的交換器 ——topic exchange。

Topic exchange

topic exchange 與 direct exchange 類似,也是將消息路由到 binding key 與 routing key 相匹配的 Queue 中,但這里的匹配規(guī)則有些不同,它約定:

routing key 為一個(gè)句點(diǎn)號(hào). 分隔的字符串(我們將被句點(diǎn)號(hào). 分隔開的每一段獨(dú)立的字符串稱為一個(gè)單詞),如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”

binding key 與 routing key 一樣也是句點(diǎn)號(hào). 分隔的字符串

binding key 中可以存在兩種特殊字符 * 與#,用于做模糊匹配,其中 * 用于匹配一個(gè)單詞,#用于匹配多個(gè)單詞(可以是零個(gè))

這個(gè)例子里,我們發(fā)送的所有消息都是用來描述小動(dòng)物的。發(fā)送的消息所攜帶的路由鍵是由三個(gè)單詞所組成的,這三個(gè)單詞被兩個(gè)。分割開。路由鍵里的第一個(gè)單詞描述的是動(dòng)物的速度,第二個(gè)單詞是動(dòng)物的顏色,第三個(gè)是動(dòng)物的種類。所以它看起來是這樣的:” 速度。顏色。種類”。

我們創(chuàng)建了三個(gè)綁定:Q1 的 binding key 為”.orange.“,Q2 的 binding key 為”…rabbit” 和”lazy.#”。

這三個(gè) binding key 被可以總結(jié)為:

Q1 對(duì)所有的桔黃色動(dòng)物都感興趣。

Q2 則是對(duì)所有的兔子和所有懶惰的動(dòng)物感興趣。

以上圖中的配置為例:

routingKey=”quick.orange.rabbit” 的消息會(huì)同時(shí)路由到 Q1 與 Q2

routingKey=”lazy.orange.fox” 的消息會(huì)路由到 Q1 與 Q2

routingKey=”lazy.brown.fox” 的消息會(huì)路由到 Q2

routingKey=”lazy.pink.rabbit” 的消息會(huì)路由到 Q2(只會(huì)投遞給 Q2 一次,雖然這個(gè) routingKey 與 Q2 的兩個(gè) bindingKey 都匹配)

routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息將會(huì)被丟棄,因?yàn)樗鼈儧]有匹配任何 bindingKey

如果我們違反約定,發(fā)送了一個(gè) routing key 為一個(gè)單詞或者四個(gè)單詞(”orange” or “quick.orange.male.rabbit”)的消息時(shí),該消息不會(huì)投遞給任何一個(gè)隊(duì)列,而且會(huì)丟失掉。

了解springcloud架構(gòu)可以加求求:三五三六二四七二五九

但是,即使”lazy.orange.male.rabbit” 有四個(gè)單詞,他還是會(huì)匹配最后一個(gè)綁定,并且被投遞到第二個(gè)隊(duì)列中。

topic exchange

topic exchange 是強(qiáng)大的,它可以表現(xiàn)出跟其他 exchange 類似的行為。

當(dāng)一個(gè)隊(duì)列的 binding key 為 “#”(井號(hào)) 的時(shí)候,它會(huì)接收所有消息,而不考慮 routing key,就像 fanout exchange。

當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí) topic exchange 會(huì)表現(xiàn)得像 direct exchange 一樣。

代碼整合

生產(chǎn)者

public class Tut5Sender {
   @Autowird
    private AmqpTemplate template;
    
   @Autowird
    private TopicExchange topic;
    private int index;
    private int count;
    private final String[] keys = {"quick.orange.rabbit",
            "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (++this.index == keys.length) {
            this.index = 0;
        }
        String key = keys[this.index];
        builder.append(key).append(' ');
        builder.append(Integer.toString(++this.count));
        String message = builder.toString();
        template.convertAndSend(topic.getName(), key, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

消費(fèi)者

public class Tut5Receiver {
    @RabbitListener(queues = "#{autoDeleteQueue1}")
    public void receiver1(String in) throws InterruptedException {
        receive(in, 1);
    }
    @RabbitListener(queues = "#{autoDeleteQueue2}")
    public void receiver2(String in) throws InterruptedException {
        receive(in, 2);
    }
    private void receive(String in, int instance) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        System.out.println("instance " + instance + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        System.out.println("instance " + instance + " [x] Done in "
                + watch.getTotalTimeSeconds() + "s");
    }
    private void doWork(String in) throws InterruptedException {
        for (char c : in.toCharArray()) {
            if (c == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

配置類

@Profile({"tut5", "topics"})
@Configuration
public class Tut5Config {
    @Bean
    public TopicExchange topic() {
        return new TopicExchange("tut.topic");
    }
    @Profile("receiver")
    private static class ReceiverConfig {
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }
        @Bean
        public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                    .to(topic)
                    .with("*.orange.*");
        }
        @Bean
        public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(topic)
                    .with("*.*.rabbit");
        }
        @Bean
        public Binding binding2b(TopicExchange topic, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(topic)
                    .with("lazy.#");
        }
        @Bean
        public Tut5Receiver receiver() {
            return new Tut5Receiver();
        }
    }
    @Profile("sender")
    @Bean
    public Tut5Sender sender() {
        return new Tut5Sender();
    }
}

運(yùn)行

maven 編譯

mvn clean package -Dmaven.test.skip=true

運(yùn)行

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,receiver --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,sender --tutorial.client.duration=60000

輸出

// Sender

Ready … running for 60000ms

[x] Sent ‘Hello to lazy.orange.elephant 1’

[x] Sent ‘Hello to quick.orange.fox 2’

[x] Sent ‘Hello to lazy.brown.fox 3’

[x] Sent ‘Hello to lazy.pink.rabbit 4’

[x] Sent ‘Hello to quick.brown.fox 5’

[x] Sent ‘Hello to quick.orange.rabbit 6’

[x] Sent ‘Hello to lazy.orange.elephant 7’

[x] Sent ‘Hello to quick.orange.fox 8’

[x] Sent ‘Hello to lazy.brown.fox 9’

[x] Sent ‘Hello to lazy.pink.rabbit 10’

// Receiver

Ready … running for 60000ms

instance 1 [x] Received ‘Hello to lazy.orange.elephant 1’

instance 2 [x] Received ‘Hello to lazy.orange.elephant 1’

instance 2 [x] Done in 2.004s

instance 1 [x] Done in 2.004s

instance 2 [x] Received ‘Hello to lazy.brown.fox 3’

instance 1 [x] Received ‘Hello to quick.orange.fox 2’

instance 1 [x] Done in 2.006s

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to lazy.pink.rabbit 4’

instance 1 [x] Received ‘Hello to quick.orange.rabbit 6’

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to quick.orange.rabbit 6’

instance 1 [x] Done in 2.007s

instance 1 [x] Received ‘Hello to lazy.orange.elephant 7’

instance 2 [x] Done in 2.006s

instance 2 [x] Received ‘Hello to lazy.orange.elephant 7’

instance 1 [x] Done in 2.003s

instance 1 [x] Received ‘Hello to quick.orange.fox 8’

instance 2 [x] Done in 2.005s

instance 2 [x] Received ‘Hello to lazy.brown.fox 9’

instance 1 [x] Done in 2.005s

instance 2 [x] Done in 2.004s

instance 2 [x] Received ‘Hello to lazy.pink.rabbit 10’

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI