溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播

發(fā)布時(shí)間:2021-11-23 21:35:13 來源:億速云 閱讀:469 作者:柒染 欄目:云計(jì)算

如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

Pulsar 作為 Apache 社區(qū)的相對(duì)新的成員,在業(yè)界受到非常大量的關(guān)注。新產(chǎn)品的文檔相對(duì)不齊全也是非常能夠理解的。今天客戶問過來廣播怎么實(shí)現(xiàn)的,我解釋了半天,又找了很多介紹產(chǎn)品的 PPT,最終也沒有找到“官方”的文檔說明這個(gè)事情。于是我就寫了這篇文章,方便大家 copy/paste 。


Pulsar訂閱模型分類


Pulsar 支持的幾種模式如下,依次是 獨(dú)占模式 / 高可用模式 / 分享模式 / 基于鍵值 的分享模式。


如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播  


 

Pulsar 廣播模式


Pulsar 的訂閱模式和很多 MQ 不太一樣。比如 RabbitMQ/Kafka 等,一般消費(fèi)端(Consumer)是直接去對(duì)接 Topic 的,然后 Consumer 自己又有個(gè)組的概念在配置中心去設(shè)置 offset,以此來決定是一起分享 Topic 的數(shù)據(jù),還是每個(gè)人都接收同樣的數(shù)據(jù)。在 Pulsar 的消費(fèi)訂閱模型里,添加了一個(gè) Subscription 的邏輯,Subscription 的 Type 決定了消費(fèi)是獨(dú)享還是分享。


于是廣播模式可以用不同 Subscription 獨(dú)享的模式來實(shí)現(xiàn),具體架構(gòu)可以參照下圖:


如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播  


 

代碼實(shí)現(xiàn)


1. Full-mesh 的形創(chuàng)建 Java 項(xiàng)目(比如:Springboot - 這個(gè)應(yīng)該是相對(duì)簡(jiǎn)單的 IDE 集成開發(fā)組件)


畫重點(diǎn)


  • pulsar-client-api 和 tdmq-client 需要2.6.0
  • tdmq-client 需要在騰訊的repo里才能拿到,需要使用介紹鏈接介紹的方式進(jìn)行maven的配置(gradle方法類似)
  • 介紹鏈接:https://cloud.tencent.com/document/product/1179/44914


<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.4.3</version>    <relativePath /> <!-- lookup parent from repository -->  </parent>  <groupId>com.examble.demo</groupId>  <artifactId>tdmq-demo</artifactId>  <version>0.0.1-SNAPSHOT</version>  <name>tdmq-demo</name>  <description>demo project to test tdmq</description>  <properties>    <java.version>1.8</java.version>  </properties>  <dependencies>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <dependency>      <groupId>com.tencent.tdmq</groupId>      <artifactId>tdmq-client</artifactId>      <version>2.6.0</version>    </dependency>    <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client-api -->    <dependency>      <groupId>org.apache.pulsar</groupId>      <artifactId>pulsar-client-api</artifactId>      <version>2.6.0</version>    </dependency>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-test</artifactId>      <scope>test</scope>    </dependency>  </dependencies>
 <build>    <plugins>      <plugin>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-maven-plugin</artifactId>      </plugin>    </plugins>  </build>
</project>
 


2. 創(chuàng)建一個(gè) Component 用來全局使用 Producer 和 Consumers


這里創(chuàng)建了1個(gè) Producer 和3個(gè)擁有 exclusive subscription 的 consumers(廣播模式 - 我們期待他們3個(gè)每次都收到一樣的信息)


package com.example.demo.tdmq.instance;
import javax.annotation.PostConstruct;
import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageListener;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionType;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Scope;import org.springframework.stereotype.Component;
@Component@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)public class Global {  PulsarClient client;  public Producer<byte[]> producer;  public Consumer<byte[]> consumer01;  public Consumer<byte[]> consumer02;  public Consumer<byte[]> consumer03;
 public Global() {
 }
 @PostConstruct  public void init() {    try {      client = PulsarClient.builder().serviceUrl("pulsar://<Your TDMQ Pulsar Service URL>:6000/")          .listenerName("custom:<TDMQ Pulsar Instance ID>/<TDMQ VPC ID>/<TDMQ Subnet ID>")          .authentication(AuthenticationFactory.token(              "<Your Credential Token from TDMQ>"))          .build();      producer = client.newProducer().topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>").create();      consumer01 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer01" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription01").subscribe();      consumer02 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer02" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription02").subscribe();      consumer03 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)          .topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")          .messageListener(new MessageListener<byte[]>() {
           /**             *             */            private static final long serialVersionUID = 1L;
           @Override            public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {              System.out.println("Consumer03" + " - " + System.currentTimeMillis() + " - "                  + new String(msg.getData()));              try {                consumer.acknowledge(msg);              } catch (PulsarClientException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }
           }          }).subscriptionName("my-subscription03").subscribe();
   } catch (PulsarClientException e) {      // TODO Auto-generated catch block      e.printStackTrace();    }  }
}
 


3. 最外層的測(cè)試代碼和簡(jiǎn)單的 Message 模型


public class MessageModel {
 private String messageText = null;
 public String getMessageText() {    return messageText;  }
 public void setMessageText(String messageText) {    this.messageText = messageText;  }}
 


跑起來測(cè)試一下,果然3個(gè)一起接收一樣的消息


如何實(shí)現(xiàn)TDMQ中的Pulsar 廣播    

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI