您好,登錄后才能下訂單哦!
這篇文章主要介紹“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”activemq特性是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
activemq特點:用通配符訂閱多個destination,用組合發(fā)布多重destionation
activemq支持destination的層次結(jié)構(gòu)【topic和queen】便于歸類和管理。
通配符有三個:
. 用來分隔路徑
* 用來匹配路徑中的一節(jié)
> 用來匹配任意節(jié)的路徑
opics: <sport><League>.<team>
。例如: football.division.leeds。 如果leeds 參加兩種運動--Scccer 和 Rugby,為了方便,我們希望通過一個消息消費者而看到Leeds兩種運動的最新戰(zhàn)績,這個時候,通配符就有用武之地了
. : used to separate elements in the destination name
* : used to match one element
> : match one or all trailing elements
所以,對于上面的例子, 你可以訂閱這樣的主題: *.*.Leeds
如果你想知道division1 這個賽區(qū)的所有分?jǐn)?shù), 你可以訂閱這個: soccer.division1.*
如果你想知道Rugby的分?jǐn)?shù): 你可以訂閱這個: rugby.>.
然而, 通配符中是為消費者服務(wù)的,如果你發(fā)送了這樣的一個主題: rugby.>., 這個消息僅會發(fā)送到命名了rugby.>.的主題,并不是所有的主題都是以rugby開頭的。
這里有一種 方法,使消息生產(chǎn)者能將一條
消息發(fā)送到多個目的地。通過使用 composite destination。
將同一條消息發(fā)送到不同的目的地是很有用的。 比如一個用來存儲信息的應(yīng)用,會發(fā)送一條消息給隊列
同時也要將這條消息廣播給監(jiān)控的所有系統(tǒng)。通常,你會通過用兩個producer 發(fā)送兩次消息來達(dá)到這個目的。composite destination就是用來解決這種情況的
例如,如果你創(chuàng)建了名子為: store.order.backoffice,store.order.warehouse 的 Queue,這樣 就會發(fā)送同時兩個Queue。
訂閱信息 解釋
PRICE.> Any price for any product on any exchange
PRICE.STOCK.> Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
PRICE.STOCK.*.IBM Any IBM stock price on any exchange
從5.5 版本以后,可以自定義路徑分隔符:
<plugins>
.....
<destinationPathSeparatorPlugin/>
</plugins>
此時FOO.BAR.* 可以表示為 FOO/BAR/*
也可以通過pathSeparator 屬性定義其他符號位路徑分隔符。
public void subscribeToLeeds() throws JMSException {
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic allLeeds = session.createTopic("*.*.Leeds");
MessageConsumer consumer = session.createConsumer(allLeeds);
Message result = consumer.receive();
}
11.1.2發(fā)送一個message到多重destinations
發(fā)送相同的message到不同的destination上:案列發(fā)送一個[queen,opic]組合模式,默認(rèn)的組合destination用,分隔
列如store.order.backoffice,store.order.warehouse
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
MessageProducer producer = session.createProducer(ordersDestination);
Message order = session.createObjectMessage();
producer.send(order);
11.2通知消息
單的說就是實現(xiàn)了ActiveMQ的broker上各種操作的記錄跟蹤和通知。
使用這個功能,你可以實時的知道broker上
創(chuàng)建或銷毀了連接,
添加或刪除了生存者或消費者,
添加或刪除了主題或隊列,
有消息發(fā)送和接收,
什么時候有慢消費者,
什么時候有快生產(chǎn)者
什么時候什么消息被丟棄
什么時候broker被添加到集群(主從或是網(wǎng)絡(luò)連接)
這個機(jī)制是ActiveMQ對JMS協(xié)議的重要補充,也是基于JMS實現(xiàn)的ActiveMQ的可管理性的一部分。多個ActiveMQ的相互協(xié)調(diào)和互操作的基礎(chǔ)設(shè)置。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
MessageConsumer consumer = session.createConsumer(connectionAdvisory);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
ConnectionInfo connectionInfo = (ConnectionInfo) data;
System.out.println("Connection started: " + connectionInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
大多數(shù)advisor消息都是完整的對于destiation,但是呢advisorysupport類有一些方法來決定監(jiān)聽哪個advisorytopic,你也能使用通配符-
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
// Lets first create a Consumer to listen too
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Lets first create a Consumer to listen too
Queue queue = session.createQueue("test.Queue");
MessageConsumer testConsumer = session.createConsumer(queue);
// so lets listen for the Consumer starting/stoping
Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
ActiveMQMessage message = (ActiveMQMessage) m;
try {
System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
ConsumerInfo consumerInfo = (ConsumerInfo) data;
System.out.println("Consumer started: " + consumerInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Consumer stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
testConsumer.close();
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForSlowConsumers="true">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當(dāng)于一個持久化的queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:
1. 同一應(yīng)用內(nèi)consumer端負(fù)載均衡的問題:同一個應(yīng)用上的一個持久訂閱不能使用多個consumer來共同承擔(dān)消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker
端又不能將消息發(fā)送到多個應(yīng)用端。所以,既要發(fā)布訂閱,又要讓消費者分組,這個功能jms規(guī)范本身是沒有的。
2. 同一應(yīng)用內(nèi)consumer端failover的問題:由于只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應(yīng)用就無法處理消息了,系統(tǒng)的健壯性不高,為了解決這兩個問題,ActiveMQ中實現(xiàn)了虛擬
Topic的功能。使用起來非常簡單。對于消息發(fā)布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對于消息接收端來說,是個隊列,不同應(yīng)用里使用不同的前綴作為
隊列的名稱,即可表明自己的身份即可實現(xiàn)消費端應(yīng)用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。
可以在同一個應(yīng)用里使用多個consumer消費此queue,則可以實現(xiàn)上面兩個功能。又因為不同應(yīng)用使用的queue名稱不同(前綴不同),所以不同的應(yīng)用中都可以接收到全部的消息。每個客戶端相當(dāng)于一個持久訂
閱者,而且這個客戶端可以使用多個消費者共同來承擔(dān)消費任務(wù)。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
//setup the sender
Connection senderConnection = connectionFactory.createConnection();
senderConnection.start();
Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
MessageProducer producer = senerSession.createProducer(ordersDestination);
同樣queue名稱的消費者會平分所有消息。
從queue接收到的消息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。消息的persistent屬性為true,即每個相當(dāng)于一個持久訂閱。
Virtual Topic這個功能特性在broker上有個總開關(guān),useVirtualTopics屬性,默認(rèn)為true,設(shè)置為false即可關(guān)閉此功能。
當(dāng)此功能開啟,并且使用了持久化的存儲時,broker啟動的時候會從持久化存儲里拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們添加到系統(tǒng)的Virtual Topics列表中去。
當(dāng)然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統(tǒng)會自動創(chuàng)建對應(yīng)的實際topic。
當(dāng)有consumer訪問此VirtualTopics時,系統(tǒng)會自動創(chuàng)建持久化的queue,并在每次Topic收到消息時,分發(fā)到具體的queue。
可追溯”消費者,只對Topic有效,如果consumer是可追溯的,那么它可以獲取實例創(chuàng)建之前的消息。通常而言,訂閱者不可能獲取實例創(chuàng)建之前的消息,因為broker根本不知道它的存在。對于broker而言,如果
一個Topic通道創(chuàng)建,且有發(fā)布者發(fā)布消息(Publisher),那么broker將會在內(nèi)存中(非持久化)或者磁盤中(持久化)保存已經(jīng)發(fā)布的消息,直到所有的訂閱者都消費者,才會清除原始消息內(nèi)容。那么retroactive
類型的訂閱者,就可以獲取這些原本不屬于自己但broker上還保存的舊消息,就像我們訂閱一種Feed,可以立即獲取舊的內(nèi)容列表一樣。如果此訂閱者不是durable(耐久的),它可以獲取最近發(fā)布的一些消息;如果是durable,它可以獲取存儲器中尚未刪除的所有的舊消息。[下文會詳細(xì)介紹Topic的數(shù)據(jù)轉(zhuǎn)發(fā)模型]
//在destinationUrl中設(shè)置,默認(rèn)為false
feedTopic?consumer.retroactive=true
在broker端,可以配置當(dāng)前Topic默認(rèn)為“可追溯的”,不過Topic并不會在此種情況下額外的保存消息,只不過表示訂閱者默認(rèn)都是可追溯的而已。
<!-- 只對topic有效,默認(rèn)為false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();
redeliveryPolicy
consumer使用的重發(fā)策略,當(dāng)消息在client端處理失敗(比如onMessage方法拋出異常,事務(wù)回滾等),將會觸發(fā)消息重發(fā)。對于Broker端,需要重發(fā)的消息將會被立即發(fā)送(如果broker端使用異步發(fā)送,
且發(fā)送隊列中還有其他消息,那么重發(fā)的消息可能不會被立即到達(dá)Consumer)。我們通過此Policy配置最大重發(fā)次數(shù)、重發(fā)頻率等,如果你的Consumer客戶端處于不良網(wǎng)絡(luò)環(huán)境中,可以適當(dāng)調(diào)整相關(guān)參數(shù)。參數(shù)列表,
請參見(RedeliveryPolicy)
//在brokerUrl中設(shè)置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
. redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)
DLQ-死信隊列(Dead Letter Queue)用來保存處理失敗或者過期的消息。
出現(xiàn)以下情況時,消息會被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
當(dāng)一個消息被redelivered超過maximumRedeliveries(缺省為6次,具體設(shè)置請參考后面的鏈接)次數(shù)時,會給broker發(fā)送一個"Poison ack",這個消息被認(rèn)為是a poison pill,這時broker會將這
消息發(fā)送到DLQ,以便后續(xù)處理。缺省的死信隊列是ActiveMQ.DLQ,如果沒有特別指定,死信都會被發(fā)送到這個隊列。缺省持久消息過期,會被送到DLQ,非持久消息不會送到DLQ可以通過配置文件(activemq.xml)
來調(diào)整死信發(fā)送策略
<destinationPolicy>
<policyMap>
<policyEntries>
<!— 設(shè)置所有隊列,使用 '>' ,否則用隊列名稱 -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
queuePrefix:設(shè)置死信隊列前綴
useQueueForQueueMessages: 設(shè)置使用隊列保存死信,還可以設(shè)置useQueueForTopicMessages,使用Topic來保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
...
</broker>
在一個電子系統(tǒng)中可能接受來自不同供應(yīng)商的各種訂單信息,不同類型的訂單走的流程不盡相同,為了快速處理各種不同的訂單完成不同的業(yè)務(wù)。特定義不同的路由 信息。根據(jù)路由信息的不同,將消息進(jìn)行不同的處理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根據(jù)不同的流程自動 處理到不同的隊列中去。
<beans>
<broker brokerName="testBroker">
<transportConnectors>
<transportConnector uri="tcp://localhos:61616">
</transportConnectors>
<import resource="camel.xml">
</beans>
到此,關(guān)于“activemq特性是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。