核心概念:
(1)使用JNDI查询管理对象ConnectionFactory和Destination
(2)使用管理对象ConnectionFactory建立连接Connection
(3)使用连接Connection建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessagerProducer
(5)使用消息生产者MessagerPriducer发送消息
2.接收消息的客户端使用JMS的过程
(1)使用JNDI查询管理对象ConnectionFactory和Destination
(2)使用管理对象ConnectionFactory建立连接Connection
(3)使用连接Connection建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessagerProducer
(5)使用消息生产者MessagerConsumer接收消息
3. JMS中支持两种事务方式:事务性会话和JTA事务
a)创建事务性会话的代码:qsession=qcon.createQueueSession(true; //在PTP方式下创建事务性会话
Session.AUTO_ACKNOWLEDGE
);
tsession=tcon.createTopicSession(
true; //在Pub/Sub方式下创建事务性会话
Session.AUTO_ACKNOWLEDGE
);
b)JTA事务:JTA支持跨数据源的事务,步骤如下:
(1)创建非事务性会话
QueuesSession session=connection.createQueueSession(
false,Session.AUTO_ACKNOWLEGE);
(2)使用JNDI查询JTA事务引用
Context ctx = new InitialContext();
UserTansaction ux=(UserTansaction)ctx.lookup
(“javax.transaction.UserTansaction”);
(3)开始事务
ux.begin();
(4)执行业务操作
(5)提交或回滚事务
提交事务:ux.commit();
回滚事务:ux.rollback();
服务区端代码
package test.jms;
import javax.jms.*;import javax.naming.*;public class Server { private static Server instance = new Server(); private TopicSession tsession = null; private Topic topic = null; private TopicPublisher tpub = null; private TopicConnectionFactory tcf = null; private TopicConnection tconn = null; private Context ctx = null; private Server() { init(); } public static Server getInstance(){ return instance; } public void sendMessage(MessageInfo msgInfo){ int tryTimes = 0; while(true){ try { if(msgInfo == null){ break; } Message msg = tsession.createMessage(); msg.setStringProperty("xxxx", msgInfo.getxxxx()); msg.setStringProperty("xxxx", msgInfo.getxxxx()); msg.setStringProperty("xxxx", msgInfo.getxxxx()); msg.setStringProperty("xxxx", msgInfo.getxxxx()); msg.setStringProperty("xxxx", msgInfo.getxxxx()); msg.setIntProperty("xxxx", msgInfo.getxxxx()); tpub.publish(msg); break; } catch (Exception e) { if (!init()) { if (tryTimes < 20) { //默认15分钟可以恢复数据库连接,这里冗余一部分 tryTimes++; try{ Thread.sleep(60000); //一分种后重试 } catch(Exception ex){ Logger.log(Logger.DEBUG_TYPE,ex); } continue; } else{ System.out.println("系统消息机制异常,系统将自动退出!请进行系统恢复!"); SysTool.exit(0); return; } } } } } private boolean init() { try { try{ tpub.close(); } catch(Exception ex){ } try{ tsession.close(); } catch(Exception ex){ } try{ tconn.close(); } catch(Exception ex){ } try{ ctx.close(); } catch(Exception ex){ } ctx = new InitialContext(); tcf = (TopicConnectionFactory)ctx.lookup( "ConnectionFactory"); tconn = tcf.createTopicConnection(); tsession = tconn.createTopicSession(false,TopicSession.AUTO_ACKNOWLEDGE); topic = (Topic)ctx.lookup("topic/xxxxTopic"); tpub = tsession.createPublisher(topic); tconn.start(); return true; } catch (Exception e) { Logger.log(Logger.DEBUG_TYPE, e); return false; } }}客户端代码
package test;
import java.util.Hashtable;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.Context;import javax.naming.InitialContext;public class JMSClinet { public static void main(String[] args) { while(true){ try { TopicSession tsession = null; TopicSubscriber tsub = null; Hashtable ht = new Hashtable(); ht.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); ht.put(Context.PROVIDER_URL, "ip地址:1099"); ht.put("java.naming.rmi.security.manager", "yes"); ht.put(Context.URL_PKG_PREFIXES, "org.jboss.naming"); Context ctx = new InitialContext(ht); TopicConnectionFactory factory = (TopicConnectionFactory) ctx .lookup("ConnectionFactory"); TopicConnection connection = factory.createTopicConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic) ctx.lookup("topic/logInAndOutTopic"); tsub = session.createSubscriber(topic); connection.start(); Message msg = tsub.receive(); String xxxx= msg.getStringProperty("xxxx"); String xxxx= msg.getStringProperty("xxxx"); String xxxx= msg.getStringProperty("xxxx"); String xxxx= msg.getStringProperty("xxxx"); String xxxx= msg.getStringProperty("xxxx"); int xxxx= msg.getIntProperty("xxxx"); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }}配置文件
<mbean code="org.jboss.mq.server.jmx.Topic"
name="jboss.mq.destination:service=Topic,name=xxxxTopic"> <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> <depends optional-attribute-name="SecurityManager">jboss.mq:service=SecurityManager</depends> <attribute name="SecurityConf"> <security> <role name="xxxx" read="true" write="true"/> <role name="xxxx" read="true" write="true" create="false"/> <role name="xxxx" read="true" write="true" create="true"/> </security> </attribute> </mbean>