溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何在Jboss中使用JMS

發(fā)布時間:2021-12-30 17:15:15 來源:億速云 閱讀:121 作者:iii 欄目:編程語言

這篇文章主要介紹“如何在Jboss中使用JMS”,在日常操作中,相信很多人在如何在Jboss中使用JMS問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何在Jboss中使用JMS”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

1、介紹
為了能建立和運(yùn)行此例子。我們使用兩種方式來進(jìn)行:一是使用Ant命令,二是使用JAR和JAVA基本命令。必須有下面的環(huán)境變量:
w JAVA_HOME 安裝JDK1.4的目錄。
w JBOSS_DIST 安裝JBoss的目錄。
你必須安裝JDK, 如果使用Ant必須安裝 Ant。可以參考我前面文檔的介紹。
2、概述
1) 什么是JMS
JMS是Java API, 允許應(yīng)用程序來建立、接收和讀取消息。程序依靠這些API, 在運(yùn)行時需要一個JMS實(shí)現(xiàn)接口,來提供管理和控制,這被稱為JMS provider, 現(xiàn)在有幾種不同的JMS Provider; 在JBoss中的叫做JbossMQ。
2) JMS 和J2EE
JMS是在EJB和J2EE框架開發(fā)之前進(jìn)行開發(fā)的,所以在JMS說明書中沒有涉及到EJB或J2EE。
EJB 和J2EE第一代版本中也沒有涉及到JMS,一直到EJB1.1,在生成一個可用Beand的容器provider中JMS也不是必須的API。在J2EE1.2中JMS接口是必需的情況,但并不是非得要包含一個JMS Provider;在EJB2.0和J2EE1.3中又進(jìn)行改變,應(yīng)用服務(wù)器包含了一個JMS Provider,自從J2EE1。3需要EJB2.0,增加了以下兩個JMS特性:
w 一種新Bean類型定義, 也就是消息驅(qū)動Beam (MDB), 這種bean做為JMS消息監(jiān)聽者,可以異步地處理JMS消息。
w JMS處理作為資源,來自一個Bean 的JMD 發(fā)布(發(fā)送)必須能和其他bean的全局事務(wù)環(huán)境共享。這個需要把JMS認(rèn)為是一個容器管理資源,象JDBC的連接。
3) JMS 和JBoss
JBoss從2.0版本以后都支持JMS。 在2.1中增加了MDB,從2.4版本開始JMS作為一個事務(wù)資源。
JBoss中JMS的體系結(jié)構(gòu)如下:
w JMS Provider, 叫做JbossMQ 。 是JBoss實(shí)現(xiàn)JMS 1.0.2規(guī)范的一部分,包括可選部分,象ASF(Application Service Facvility)。 JbossMQ處理和普遍JMS一樣:建立 queues (隊(duì)列)或topic(標(biāo)題),持久性等。
w MDB (Message Driven Beans),
w 資源適配器。

3、JMS Provider
JBoss有它自己的JMS Provider 叫做JbossMQ。 適合與JMS 1.0.2 的JMS Provider,并且能用在所有平常的JMS程序中。為了清楚在JBoss中JMS是如何工作的,首先要清楚在JMS中涉及到的概念和術(shù)語,最好的辦法是閱讀JMS規(guī)范,下面給出了簡單的JMS介紹。
1) JMS的簡單介紹
當(dāng)你發(fā)送一個消息,你不能直接發(fā)送到對此消息感興趣的接受者。而是你發(fā)送到一個目的地。對此消息感興趣的接受者必須連接到目的地,得到此消息或在目的地設(shè)置訂閱。
在JMS中有兩種域:topics 和queues 。
w 一個消息發(fā)送到一個topics ,可以有多個客戶端。用topic發(fā)布允許一對多,或多對多通訊通道。消息的產(chǎn)生者被叫做publisher, 消息接受者叫做subscriber。
w queue 是另外一種方式,僅僅允許一個消息傳送給一個客戶。一個發(fā)送者將消息放在消息隊(duì)列中,接受者從隊(duì)列中抽取并得到消息,消息就會在隊(duì)列中消失。第一個接受者抽取并得到消息后,其他人就不能在得到它。
為了能發(fā)送和接收消息,必須得到一個JMS連接。該連接是使用JMS Provider得到連接的,在得到連接之后,建立一個會話(Session)。然后再建立publisher/sender 來發(fā)送消息或subscriber/receiver來接收消息。
運(yùn)行時,如果使用topic 那么publisher 或subscriber 通過一個topic來關(guān)聯(lián),如果使用queue ,則sender 或receiver通過queue來關(guān)聯(lián)起來。
通常,在JMS框架中運(yùn)轉(zhuǎn)的方法如下:
(1) 得到一個JNDI初始化上下文(Context);
(2) 根據(jù)上下文來查找一個連接工廠TopicConnectFactory/ QueueConnectionFactory (有兩種連接工廠,根據(jù)是topic/queue來使用相應(yīng)的類型);
(3) 從連接工廠得到一個連接(Connect 有兩種[TopicConnection/ QueueConnection]);
(4) 通過連接來建立一個會話(Session);
(5) 查找目的地(Topic/ Queue);
(6) 根據(jù)會話以及目的地來建立消息制造者(TopicPublisher/QueueSender)和消費(fèi)者(TopicSubscriber/ QueueReceiver).
為了得到一個連接和得到一個目的地(用來關(guān)聯(lián)publisher/sender 或subscriber/receiver),必須用provider-specific參數(shù)。
通過JNDI來查找相應(yīng)的連接工廠或目的地,JNDI適合于任何JMS Provider。但是查找用的名字是provider使用的。因此,在你使用的JMS Provider(其中包括JBossMQ),必須學(xué)會如何進(jìn)行指定。JMS Provider中的任何設(shè)置,象連接特性,用到目的地等,在用到的Provider都有明確描述。
2) 配置
當(dāng)使用一個JMS Provider時,有三個Provider-specific因素:
w 得到一個JNDI初始化上下文
w 用到的連接工廠的名字。
w 對目的地的管理和命名協(xié)定。
JBoss同它的JNDI一起執(zhí)行。為了簡單的JMS client, 配置和查找初始化上下文,同實(shí)現(xiàn)其他J2EE客戶端一樣。
JMS-specific 來約束JBoss 的JMS provider (JBossMQ)。JbossMQ是通過xml 文件jbossmq-service.xml進(jìn)行配置的,該文件放在在serverdefaultdeploy下。
在xml文件中最基本的三件事情:
w 增加一個新的目的地
w 配置用戶
w 獲得可用連接工廠的名字。
(1) 增加新的目的地
w 在目的地的xml文件在jboss 3.x中是jbossmq-destinations-service.xml(server/../deploy)。在文件中已經(jīng)存在幾個缺省的目的地,所以你比較容易明白怎樣增加到文件中。在例子中你需要一個topic目的地 spool,所以增加下面的語句到j(luò)bossmq-destinations-service.xml中。這種方式是長久存在的,不隨著JBoss服務(wù)器關(guān)閉而消失。
<mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=spool">

jboss.mq:service=DestinationManager



w 另外一種方法是可以通過JMX HTML管理界面。通過http://localhost:8080/jmx-console 來訪問。在jboss.mq 下查找service=DestinationManager 的連接。然后在createTopic()或createQueue()來建立,這種方法建立的目的地是臨時性的,隨著服務(wù)器開始存在,當(dāng)當(dāng)JBoss 服務(wù)器重新啟動時,動態(tài)建立的目的地將會不存在。在JbossMQ中所有目的地都有一個目的地類型的前綴。對于topic前綴是topic ,對于queues前綴是queue。例如查找一個testTopic目的地,需要查找名字為“topic/testTopic”。
在此種方法中有createTopic()或createQueue()分別有兩種方法:一是有兩個參數(shù),二是有一個參數(shù)的。兩個參數(shù)分別是:建立的目的地名稱和JNDI名稱。一個參數(shù)的只是目的地名稱,對于JNDI名稱默認(rèn)是:[目的地類型(topic/queue) ]/目的地名稱。
在這里我們使用的是第一種方法。直接修改jbossmq-destinations-service.xml文件。
(2) 管理用戶
在JMS中可能關(guān)聯(lián)一個連接和一個用戶,不幸的是沒有明確的方法來限制訪問JbossMQ或訪問特殊的目的地到一個給定的用戶。為了給大部分角色,在JbossMQ中不需要建立用戶,除非想有一個持久topic訂閱者。在這個例子中,用戶是必須的。
用戶可以直接在文件jbossmq-state.xml(server/../conf)中添加。同樣也可以使用JMX HTML管理界面來增加(jboss.mq->service=StateManager->addUser())。


jacky
jacky
DurableSubscriberExample

>

(3) 連接工廠
JBossMQ 包括為topic和queue幾個不同的連接工廠,每個連接工廠有自己特性。當(dāng)通過JNDI來查找一個連接工廠時,需要知道此連接工廠的名稱。所有可用連接工廠和它們的屬性,名稱都會在文件jbossmq-service.xml中。
有三種類型連接工廠,依賴的通訊協(xié)議如下:
OIL
快速雙向scoket通訊協(xié)議。它是缺省的。
UIL
超過一個socket協(xié)議,可以使用在通過防火墻訪問,或者當(dāng)客戶端不能正確的查找到服務(wù)器的IP地址。
RMI
最早的協(xié)議,是穩(wěn)定的,但是比較慢。
JVM
在JBoss 2.4之后增加的一個新的連接工廠類型。不需要用socket,當(dāng)客戶端和JbossMQ使用同樣虛擬機(jī)時,可以使用。
在JBoss2.4.1以后版本中,對于topic- 和 queue-目的地,連接工廠使用同樣的名字。下表列出了在JBoss中JMS連接工廠:
目的地類型 JNDI名字 連接工廠類型
Topic/Queue java:/ConnectionFactory JVM
Topic/Queue java:/XAConnectionFactory JVM支持XA事務(wù)
Topic/Queue RMIConnectionFactory RMI
Topic/Queue RMIXAConnectionFactory RMI支持XA事務(wù)
Topic/Queue ConnectionFactory OIL
Topic/Queue XAConnectionFactory OIL支持XA事務(wù)
Topic/Queue UILConnectionFactory UIL
Topic/Queue UILXAConnectionFactory UIL支持XA事務(wù)

3) JBoss中高級JMS配置
在上邊段落主要描述了和JbossMQ一起實(shí)行的基本配置工作。在本段來描述JMS其他配置。
(1) JMS持久性管理
JMS持久性管理(PM)負(fù)責(zé)存儲消息,并且將消息標(biāo)記為持久,如果服務(wù)器發(fā)生故障時,能保證消息不會丟失,并允許恢復(fù)持久性消息。持久性JMS消息可以使用不同的方法來完成。每個方法有自己優(yōu)點(diǎn)和缺陷:
PM 名字 優(yōu)點(diǎn) 缺點(diǎn)
File 比較穩(wěn)定 速度慢
Rollinglogged 速度比較快 此方法比較新,有些地方需要完善
JDBC 對于穩(wěn)定性和可量測性比較好 必須有JDBC
Logged 速度快 Log files grow without bound, memory management problems during recovery
在JBoss中缺省的持久性消息管理是File持久性管理??梢愿淖兯?,但必須對于一個JMS
Server有且僅有一個持久性管理配置。所以你在JBoss管理界面的jboss.mq ? >
service=PersistenceManager 只是看到一個。
持久性管理的配置文件是jbossmq-service.xml。在server..deploy下。
為了讓大家了解持久性管理的各種方法,我下面來逐個介紹如何配置。
w File持久性管理
File持久性管理的概念就是為每一個持久性消息建立一個文件。消息持久性方法不是全部都能使用,但它是比較穩(wěn)定的。
File持久性管理在JBoss發(fā)布時,作為一個缺省的持久性管理。如果你打開jbossmq-service.xml文件,你會看到下面的XML:
<mbean code="org.jboss.mq.pm.file.PersistenceManager"
name="jboss.mq:service=PersistenceManager">
db/jbossmq/file

jboss.mq:service=MessageCache



當(dāng)設(shè)置Mbean配置時,F(xiàn)ile持久性管理允許你指定下面的屬性:
DataDircetory 是存放持久性消息的路徑,會把生成的數(shù)據(jù)文件放在此目錄下。

w 設(shè)置Rollinglogged持久性管理
Rollinglogged持久性管理是比較新的一種持久性消息管理方法,因?yàn)樗褂萌罩疚募沓掷m(xù)多個消息,所以當(dāng)建立一個文件時,不需要許多的I/O。
定義Rollinglogged持久性管理:
<mbean code="org.jboss.mq.pm.rollinglogged.PersistenceManager"
name="jboss.mq:service=PersistenceManager">
db/jbossmq/file

jboss.mq:service=MessageCache



Rollinglogged持久性管理中DataDirctory 存放持久性消息的路徑,會把生成的數(shù)據(jù)文件放在此目錄下。

w 設(shè)置JDBC持久性管理
JDBC持久性管理使用數(shù)據(jù)庫表來存儲消息。需要一個JBoss配置的數(shù)據(jù)源來訪問數(shù)據(jù)庫。具體內(nèi)容參考jbossmq-service.xml文件中的內(nèi)容。
w 設(shè)置Logged持久性管理
Logged持久性管理是比較早的一個,在JBoss2.4.1以后版本中不在建議使用。現(xiàn)在有其他更好的辦法。
4、舉例說明
當(dāng)我們清楚了以后內(nèi)容后,現(xiàn)在我們來用JBoss實(shí)現(xiàn)一個例子來加深對JBoss和JMS的了解。
在上面敘述中,我們知道明確使用JMS provider有三個基本的事情要做:配置JNDI初始化上下文,連接工廠的名字和使用目的地的名字。
當(dāng)編寫產(chǎn)品的最好的事情是不受provider-specific 影響,使代碼能在不同的JMS provider之間容易移植。在此這個例子沒有聚焦在開發(fā)產(chǎn)品上,而是解釋如何使用JbossMQ來工作。
1) 初始化上下文
w 配置JNDI的一個方法是通過屬性文件jndi.properties。在這個文件中使用正確的值,并且把它所在的路徑包含到classpath中,它比較容獲得正確初始化上下文。
jndi.properties文件的內(nèi)容如下:
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
把該文件放置的路徑成為你的classpath的一部分。如果你使用這種方法,在初始化上下文時,代碼比較簡單: Context context = new IntialContext();1
w 在某些情景下,可能需要手工配置JNDI;例如當(dāng)運(yùn)行的類文件中環(huán)境已經(jīng)配置了一個初始化上下文,但不是你想用的上下文時,需要手工來配置一個上下文。設(shè)置在哈希表中的幾個屬性值,并且使用此哈希表來實(shí)例化一個上下文。定義語法:
Hashtable props = new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");

2) 查找連接工廠
自有了上下文后,需要查找一個連接工廠。為了查找它,使用一個可用的名字。查找連接工廠的代碼如下:
對于一個topic目的地
TopicConnectionFactory topicFactory = (TopicConnectionFactory) context.lookup (“ConnectionFactory”)
Queue 目的地:
QueueConnectionFactory queueFactory = (QueueConnectionFactory ) context.lookup (“ConnectionFactory”)
3) 建立連接和會話
在我們有了連接工廠后,建立一個連接,在此連接中建立一個會話。
對于topic代碼如下:
//建立一個連接
topicConnection = topicFactory.createTopicConnection();
//建立一個會話
topicSession = topicConnection.createTopicSession(false, //不需要事務(wù)
Session.AUTO_ACKNOLEDGE //自動接收消息的收條。
);
對于queue代碼如下:
//建立一個連接
queueConnection = queueFactory.createQueueConnection();
//建立一個會話
queueSession = queueConnection .createQueueSession(false, //不需要事務(wù)
Session.AUTO_ACKNOLEDGE //自動接收消息的收條。
);
一個會話建立時,配置是否調(diào)用事務(wù)
在事務(wù)Session中,當(dāng)事務(wù)被提交后,自動接收,如果事務(wù)回滾,所有的被消費(fèi)的消息將會被重新發(fā)送。
在非事務(wù)Session中,如果沒有調(diào)用事務(wù)處理,消息傳遞的方式有三種:
Session.AUTO_ACKNOWLEDGE :當(dāng)客戶機(jī)調(diào)用的receive方法成功返回,或當(dāng)MessageListenser 成功處理了消息,session將會自動接收消息的收條。
Session.CLIENT_ACKNOWLEDGE :客戶機(jī)通過調(diào)用消息的acknowledge方法來接收消息。接收發(fā)生在session層。接收到一個被消費(fèi)的消息時,將自動接收該session已經(jīng)消費(fèi)的所有消息。例如:如果消息的消費(fèi)者消費(fèi)了10條消息,然后接收15個被傳遞的消息,則前面的10個消息的收據(jù)都會在這15個消息中被接收。
Session.DUPS_ACKNOWLEDGE :指示session緩慢接收消息。

4) 查找目的地
現(xiàn)在我們來介紹建立publishes/sends 或subscribles/receives消息。
下面的代碼列出來查找一個目的地:
對于topic 查找一個testTopic目的地
Topic topic = (Topic) context.lookup(“topic/testTopic”);

對于queue 查找一個testQueue目的地
Queue queue= (Queue) context.lookup(“queue/testQueue”);
注意:JbossM的前綴topic/ (queue/)通常被放在topic (queue)名字前面。
在JMS中,當(dāng)客戶機(jī)扮演每種角色,象對于topic來將的publisher ,subscriber 或?qū)τ趒ueue來將的sender, receiver, 都有自己不同類繼承和不同函數(shù)。
5) 建立一個消息制造者M(jìn)essage Producer (topic publisher/ queue sender)
消息制造者是一個由session創(chuàng)建的對象,主要工作是發(fā)送消息到目的地。
對于一個topic,需要通過TopicSession來創(chuàng)建一個TopicPublisher。代碼如下:
TopicPublisher topicPublisher = TopicSession.createPublisher(topic);

對于一個queue,需要通過QueueSession來創(chuàng)建一個QueueSender。代碼如下:
QueuePublisher queuePublisher = queueSession.createSender(queue);
6) 消息發(fā)送
建立一個TestMessage并且publish 它, 代碼:
TextMessage message = topicSession.createTestMessage();
message.setText(msg);
topicPublishe.publish(topic, message);
建立一個TestMessage并且send它, 代碼:
TextMessage message = queueSession.createTestMessage();
message.setText(msg);
queueSender.send(queue, message);

7) 下面是一個完成的topic publisher 代碼,文件名HelloPublisher.java :
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicPublisher;
import javax.jms.Topic;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.JMSException;
import java.util.Hashtable;
public class HelloPublisher {

TopicConnection topicConnection;
TopicSession topicSession;
TopicPublisher topicPublisher;
Topic topic;

public HelloPublisher(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException {
Hashtable props=new Hashtable();
props.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
props.put(Context.PROVIDER_URL, "localhost:1099");
props.put("java.naming.rmi.security.manager", "yes");
props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming");
Context context = new InitialContext(props);
TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

topic = (Topic)context.lookup(topicJNDI);

topicPublisher = topicSession.createPublisher(topic);

}

public void publish(String msg) throws JMSException {

TextMessage message = topicSession.createTextMessage();
message.setText(msg);
topicPublisher.publish(topic, message);
}

public void close() throws JMSException {
topicSession.close();
topicConnection.close();
}

public static void main(String[] args) {
try {
HelloPublisher publisher = new HelloPublisher(
"ConnectionFactory", "topic/testTopic");
for (int i = 1; i < 11; i++) {
String msg = "Hello World no. " + i;
System.out.println("Publishing message: " + msg);
publisher.publish(msg);
}
publisher.close();
} catch(Exception ex) {
System.err.println(
"An exception occurred while testing HelloPublisher25: " + ex);
ex.printStackTrace();
}
}
}
我們知道,使用JMS不僅能發(fā)送(send)/發(fā)布(publish)消息,也能獲得(send)/發(fā)布(publish)的消息。在時間方式有良種方法來做:
w 同步(Synchronously):需要手工的去得到消息,為了得到一個消息客戶機(jī)調(diào)用方法得到消息,直到消息到達(dá)或在規(guī)定的時間內(nèi)沒有到達(dá)而超時。我們在例子中沒有說明這部分,大家可以實(shí)驗(yàn)一下。
w 異步(Asynchronously):你需要定義一個消息監(jiān)聽器(MessageListener),實(shí)現(xiàn)該接口。當(dāng)消息達(dá)到時,JMS provider通過調(diào)用該對象的 onMessage方法來傳遞消息。
從原則來將,topic和queue都是異步的,但是在這兩種目的地中有不同的類和方法。首先,必須定義一個MessageListener接口。
8) 建立一個MessageListener
在建立了你需要的subscriber/receiver,并且登記了監(jiān)聽器后。就可以調(diào)用連接的start方法得到JMS provider 發(fā)送到的消息了。如果在登記監(jiān)聽器之前調(diào)用start方法,很可能會丟失消息。
public void onMessage(Message m) {
try {
String msg = ((TextMessage)m).getText();
System.out.println("HelloSubscriber got message: " + msg);
} catch(JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
9) 建立消息消費(fèi)者
對于topic來將:
//建立一個訂閱者
topicSubscriber = topicSession.createSubscriber(topic);
//設(shè)置消息監(jiān)聽器,
topicSubscriber.setMessageListener(this)
//連接開始
topicConnection.start();
對于queue來將:
//建立一個訂閱者
queueReceiver = queueSession.createReceiver(queue);
//設(shè)置消息監(jiān)聽器,
queueReceiver .setMessageListener(this)
//連接開始
queueConnection.start();
10) 完整的代碼,放在文件HelloSubscriber.java中,如下:
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import javax.jms.TopicConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.Topic;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.JMSException;

public class HelloSubscriber implements MessageListener {
TopicConnection topicConnection;
TopicSession topicSession;
TopicSubscriber topicSubscriber;
Topic topic;
public HelloSubscriber(String factoryJNDI, String topicJNDI)
throws JMSException, NamingException
{
Context context = new InitialContext();
TopicConnectionFactory topicFactory =
(TopicConnectionFactory)context.lookup(factoryJNDI);
topicConnection = topicFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic)context.lookup(topicJNDI);
topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
System.out.println(
"HelloSubscriber subscribed to topic: " + topicJNDI);
topicConnection.start();
}
public void onMessage(Message m) {
try {
String msg = ((TextMessage)m).getText();
System.out.println("HelloSubscriber got message: " + msg);
} catch(JMSException ex) {
System.err.println("Could not get text message: " + ex);
ex.printStackTrace();
}
}
public void close() throws JMSException {
topicSession.close();
topicConnection.close();
}
public static void main(String[] args) {
try {
HelloSubscriber subscriber = new HelloSubscriber(
"TopicConnectionFactory",
"topic/testTopic");
} catch(Exception ex) {
System.err.println(
"An exception occurred while testing HelloSubscriber: " + ex);
ex.printStackTrace();
}
}
}

11) 編輯、運(yùn)行程序
直接使用命令(java)
w 開啟命令操作符。設(shè)置classpath :
set classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
w 首先運(yùn)行訂閱消息端:java HelloSubscriber
w 再開啟另外一個命令窗口設(shè)置classpath :
set classpath=C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientjboss-j2ee.jar;C:jboss-3.0.6_tomcat-4.1.18clientjnp-client.jar;C:jboss-3.0.6_tomcat-4.1.18clientlog4j.jar;.
w 運(yùn)行發(fā)布消息端:java HelloPublisher
5、補(bǔ)充
在最后我們解釋JBoss-specific特性:如何用代碼來管理目的地。JBoss各個版本可能不同,但是差別不大。我使用的是jboss3.0.6。
實(shí)現(xiàn)這個目的有兩種不同的方法,依賴于是否代碼是在和JBoss同樣的虛擬機(jī)還是獨(dú)立獨(dú)立的。它們都包括調(diào)用一個通過service=DestinationManager 登記的JMX Bean。這個Mbean 有四個方法來管理目的地:createTopic(),createQueue(),destroyTopic(),destroyQueue()。
在代碼中實(shí)現(xiàn)管理目的地在影射怎樣調(diào)用MBean有不同的地方。如果程序虛擬機(jī)和Mbean服務(wù)器一樣,可以直接調(diào)用。
建立一個topic 目的地的代碼如下:
MBeanServer server = (MBeanServer)
MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"),
method,
new Object[] { “myTopic” },
new String[] { "java.lang.String" });

如果程序和Mbean服務(wù)器的虛擬機(jī)不同,需要通過一個JMX adapter。一個JMX adapter是一個HTML GUI。用程序通過URL來調(diào)用Mbean。代碼如下:
import java.io.InputStream;
import java.net.URL;
import java.net.HttpURLConnection;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.jms.Topic;
import javax.jms.Queue;
public class DestinationHelper {
static final String HOST = "localhost";
static final int PORT = 8080;
static final String BASE_URL_ARG = "/jmx-console/HtmlAdaptor?";
public static void createDestination(Class type, String name)
throws Exception
{
String method = null;
if (type == Topic.class) { method = "createTopic"; }
else if (type == Queue.class) { method = "createQueue";}
invoke(method, name);
}

public static void destroyDestination(Class type, String name)
throws Exception
{
String method = null;
if (type == Topic.class) { method = "destroyTopic"; }
else if (type == Queue.class) { method = "destroyQueue";}
invoke(method, name);
}

protected static void invoke(String method, String destName)
throws Exception
{
try {
MBeanServer server = (MBeanServer) MBeanServerFactory.findMBeanServer(null).iterator().next();
invokeViaMBean(method, destName);
}catch(Exception ex) { invokeViaUrl(method, destName);}
}
protected static void invokeViaUrl(String method, String destName)
throws Exception
{
String action = "action=invokeOp&methodIndex=6&name=jboss.mq%3Aservice%3DDestinationManager&arg0=" + destName;
String arg = BASE_URL_ARG + action;
URL url = new URL("http", HOST, PORT, arg);
HttpURLConnection con = (HttpURLConnection)url.openConnection();
con.connect();

InputStream is = con.getInputStream();
java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream();
byte[] buff = new byte[1024];
for(;;) {
int size = is.read( buff );
if (size == -1 ) { break; }
os.write(buff, 0, size);
}
os.flush();

if (con.getResponseCode() != HttpURLConnection.HTTP_OK ) {
throw new Exception ("Could not invoke url: " + con.getResponseMessage() );
} else {
System.out.println("Invoked URL: " + method + " for destination " + destName + "got resonse: " + os.toString());
}
}
protected static void invokeViaMBean(String method, String destName)
throws Exception
{
MBeanServer server = (MBeanServer)MBeanServerFactory.findMBeanServer(null).iterator().next();
server.invoke(new ObjectName("JBossMQ", "service", "DestinationManager"),
method,
new Object[] { destName },
new String[] { "java.lang.String" });
}
public static void main(String[] args) {
try {
if (args.length >0){
destroyDestination(Topic.class,"myCreated");
}else {
createDestination(Topic.class,"myCreated");
}
}catch(Exception ex) {
System.out.println("Error in administering destination: " + ex);
ex.printStackTrace();
}
}

}
編輯命令:
javac -classpath C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;. DestinationHelper.java
運(yùn)行命令
java -classpath C:jboss-3.0.6_tomcat-4.1.18clientjbossall-client.jar;C:jboss-3.0.6_tomcat-4.1.18libjboss-jmx.jar;. DestinationHelper
當(dāng)運(yùn)行完后查看http://localhost:8080/jmx-console下面的jboss.mq.destination中有name=myCreated,service=Topic
表明你建立成功。當(dāng)JBoss關(guān)閉重新啟動時。該目的地不會在存在。

到此,關(guān)于“如何在Jboss中使用JMS”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI