您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關(guān)ActiveMQ中如何實現(xiàn)消息存儲,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
一、隊列和topic
概述
ActiveMQ不僅支持persistent和non-persistent兩種方式,還支持消息的恢復(fù)(recovery)方式
PTP
Queue的存儲方式很簡單,就是一個FIFO(先進先出)的Queue
PUB/SUB
對于持久化訂閱主題,每一個消費者將獲得一個消息的復(fù)制
有效的消息存儲
ActiveMQ提供了一個插件式的消息存儲,類似于消息的多點傳播,主要實現(xiàn)了如下幾種:
1:AMQ消息存儲一基于文件的存儲方式,是以前的默認消息存儲
2:KahaDB消息存儲一提供了容量的提升和恢復(fù)能力,是現(xiàn)在的默認存儲方式
3:JDBC消息存儲一消息基于JDBC存儲的
4:Memory消息存儲一基于內(nèi)存的消息存儲
二、KahaDB
KahaDB Message Store概述
KahaDB是目前默認的存儲方式,可用于任何場景,提高了性能和恢復(fù)能力。消息存儲使用一個事務(wù)日志和僅僅用一個索引文件來存儲它所有的地址。
KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優(yōu)化。在Kaha中,數(shù)據(jù)被追加到datalogs中。當(dāng)不再需要log文件中的數(shù)的時候,log文件會被丟棄。
KahaDB基本配置例子
<persistenceAdapter> <kahaDB directory="${actlvemq.data}/kahadb"/> </persistenceAdapter>
可用的屬性有:
1:director:KahaDB存放的路徑,默認值activemq-data
2:indexWriteBatchSize:批量寫入磁盤的索引page數(shù)量,默認值1000
3:indexCacheSize:內(nèi)存中緩存索引page的數(shù)量,默認值10000
4:enableIndexWriteAsync:是否異步寫出索引,默認false
5:journalMaxFi1eLength:設(shè)置每個消息data log的大小,默認是32M
6:enab1eJournalDiskSyncs:設(shè)置是否保證每個沒有事務(wù)的內(nèi)容,被同步寫入磁盤,JMS持久化的時候需要,默認為true
7:cleanupInterval:在檢查到不再使用的消后,在具體刪除消息前的時間,默認30000
8:checkpointInterval:checkpoint的間隔時間,默認5000
9:ignoreMissingJournalfiles:是否忽略丟失的消息日志文件,默認false
10:checkForCourruptJournalFiles:在啟動的時候,將會驗證消息文件是否損壞,默認為false
11:checksumJournalFiles:是否為每個消息日志文件提供checksum,默認false
12:archiveDataLogs:是否移動文件到特定的路徑,而不是刪除它們,默認false
13:directoryArchive:定義消息已經(jīng)被消費后,移動data log到的路徑,默認為null
14:databaseLockedWaitDelay:獲得數(shù)據(jù)庫鎖的等待時間,默認10000
15:maxAsyncJobs:設(shè)置最大的可以存儲的異步消息隊列,默認10000,可以和concurrent MessageProducers設(shè)置成一樣的值
16:concurrentStoreAndDispatchTransactions:是否分發(fā)消息到客戶端,同時事務(wù)存儲消息,默認true
17:concurrentStoreAndDispatchTopics:是否分發(fā)Topic消息到客戶端,同時進行存儲,默認true
18:concurrentStoreAndDispatchQueues:是否分發(fā)queue消息到客戶端,同時進行存儲,默認true
在Java中內(nèi)嵌使用Broker,使用KahaDB的例子
public class EmbeddedBrokerUsingKahaDBStoreExample { public BrokerService createEmbeddedBroker()throws Exception{ BrokerService broker = new BrokerService(); File dataFileDir = new File("target/amq-in-action/kahadb"); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFileDir); kaha.setJournalMaxFileLength(1024*1000); kaha.setIndexWriteBatchSize(100); kaha.setEnableIndexWriteAsync(true); broker.setPersistenceAdapter(kaha); broker.addConnector("tcp://localhost:61616"); broker.start(); return broker; } }
三、AMQ
AMQ Message Store概述
AMQ Message Store 是ActiveMQ5.0缺省的持久化存儲,它是一個基于文件、事務(wù)存儲設(shè)計為快速消息存儲的一個結(jié)構(gòu),該結(jié)構(gòu)是以流的形式來進行消息交互的。
這種方式中,Messages被保存到data logs中,同時被reference store進行索引以提高存取速度。Data logs由一些單獨的data log文件組成,缺省的文件大小是32M,如果某個消息的大小超過了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某個data log文件中所有的消息都被成功消費了,那么這個data log文件將會被標(biāo)記,以便在下一輪的清理中被刪除或者歸檔。
AMQ Message Store配置示例
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter driectory="${activemq.base}/data" maxFileLength="32mb"/> </persistenceAdapter> </broker>
四、 JDBC
使用JDBC來持久化消息(此步驟不需要手工跑腳本)
ActiveMQ支持使用JDBC來持久化消息,預(yù)定義的表如下:
1:消息表,缺省表明為ACTIVEMQ_MSGS,quue和topic都存在里面,結(jié)構(gòu)如下:
2:ACTIVEMQ_ACKS表存儲持久訂閱的信息和最后一個持久訂閱接收的消息ID,結(jié)構(gòu)如下:
3:鎖定表,缺省表明為ACTIVEMQ_LOCK,用來確保在某一時刻,只能有一個Act broker實例來訪問數(shù)據(jù)庫,結(jié)構(gòu)如下:
使用JDBC來持久化消息的配置示例
注意:
(1)數(shù)據(jù)庫需要字符集設(shè)置為latin1。
(2)需要把mysql-connector-java.jar包放入lib中。
(3)啟動成功之后會出現(xiàn)三張表。
示例
一、(queue模式):
運行發(fā)送者(數(shù)據(jù)庫中有三條未接收的消息):
運行接收者(消息成功接收的同時,數(shù)據(jù)庫中的消息也會被刪除)
二、persistence模式
消息接收者接收完成之后,數(shù)據(jù)庫中的消息不會被刪除。
如圖所示:
JDBC Message Store with ActiveMQ Journal(日志)
這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術(shù),大大提高了性能。配置示例如下:
<beans> <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core"> <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#derby-ds" dataDirectory="activemq-data"/> </persistenceFactory> </broker> </beans>
JDBC Store和JDBC Message Store with ActiveMQ Journal的區(qū)別
1:jdbc with journal的性能優(yōu)于jdbc
2:jdbc用于master/slave模式的數(shù)據(jù)庫分享
3:jdbc with journal不能用于master/slave模式
4:一般情況下(非集群狀態(tài)下),推薦使用jdbc with journal
五、 MMS
Memory Message Store
內(nèi)存消息存儲主要是存儲所有的持久化的消息在內(nèi)存中。這里沒有動態(tài)的緩存存在,所以必須要注意設(shè)置broker所在的JVM和內(nèi)存限制。
Memory Message Store 配置示例
<beans> <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors uri="tcp://localhost:61616"/> </transportConnectors> </broker> </beans>
在Java中內(nèi)嵌使用Broker,使用Memory的例子
public void createEmbeddedBroker()throws Exception{ BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.addConnector("tcp://localhost:61616"); broker.start(); }
關(guān)于“ActiveMQ中如何實現(xiàn)消息存儲”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。