一、什么是JMS,为什么需要它
采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;
客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。
整体结构如下:
(1)、ConnectionFactory
package javax.jms; public interface ConnectionFactory { Connection createConnection() throws JMSException;//创建一个连接 Connection createConnection(String userName, String password)//创建一个有密码的连接 throws JMSException; }
(2)、Connection
package javax.jms; public interface Connection { Session createSession(boolean transacted, int acknowledgeMode) throws JMSException; String getClientID() throws JMSException; //唯一客户端ID void setClientID(String clientID) throws JMSException; ConnectionMetaData getMetaData() throws JMSException; ExceptionListener getExceptionListener() throws JMSException; void setExceptionListener(ExceptionListener listener) throws JMSException; void start() throws JMSException; //开启连接 void stop() throws JMSException; //停止连接 void close() throws JMSException; //关闭连接 ConnectionConsumer createConnectionConsumer( Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException; ConnectionConsumer createDurableConnectionConsumer( Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException; }
JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection
(3)、Session:一个发送或接收消息的线程
package javax.jms; import java.io.Serializable; public interface Session extends Runnable { static final int AUTO_ACKNOWLEDGE = 1; //四种模式 static final int CLIENT_ACKNOWLEDGE = 2; static final int DUPS_OK_ACKNOWLEDGE = 3; static final int SESSION_TRANSACTED = 0; BytesMessage createBytesMessage() throws JMSException; //创建消息 MapMessage createMapMessage() throws JMSException; Message createMessage() throws JMSException; ObjectMessage createObjectMessage() throws JMSException; ObjectMessage createObjectMessage(Serializable object) throws JMSException; StreamMessage createStreamMessage() throws JMSException; TextMessage createTextMessage() throws JMSException; TextMessage createTextMessage(String text) throws JMSException; boolean getTransacted() throws JMSException; int getAcknowledgeMode() throws JMSException; void commit() throws JMSException; void rollback() throws JMSException; void close() throws JMSException; void recover() throws JMSException; MessageListener getMessageListener() throws JMSException; void setMessageListener(MessageListener listener) throws JMSException; public void run(); MessageProducer createProducer(Destination destination) //创建消息生产者 throws JMSException; MessageConsumer createConsumer(Destination destination) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector) throws JMSException; MessageConsumer createConsumer( Destination destination, java.lang.String messageSelector, boolean NoLocal) throws JMSException; Queue createQueue(String queueName) throws JMSException; //创建队列 Topic createTopic(String topicName) throws JMSException; //创建主题 TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者 throws JMSException; TopicSubscriber createDurableSubscriber( Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException; QueueBrowser createBrowser(Queue queue) throws JMSException; QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException; TemporaryQueue createTemporaryQueue() throws JMSException; TemporaryTopic createTemporaryTopic() throws JMSException; void unsubscribe(String name) throws JMSException; }
JMS 消息由以下几部分组成:消息头,属性,消息体。
消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。
消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。
JMS Parent |
PTP Domain |
Pub/Sub Domain |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成
(1)、用JNDI 得到ConnectionFactory对象;
(2)、用JNDI 得到目标队列或主题对象,即Destination对象;
(3)、用ConnectionFactory创建Connection 对象;
(4)、用Connection对象创建一个或多个JMS Session;
(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;
(6)、通知Connection 开始传递消息。
import java.io.*; import javax.jms.*; import javax.naming.*; public class Sender { public static void main(String[] args) { new Sender().send(); } public void send() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer sender = session.createProducer(queue); //Send messages String messageText = null; while (true) { System.out.println("Enter message to send or 'quit':"); messageText = reader.readLine(); if ("quit".equals(messageText)) break; TextMessage message = session.createTextMessage(messageText); sender.send(message); } //Exit System.out.println("Exiting..."); reader.close(); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } }消息消费者
import java.io.*; import javax.jms.*; import javax.naming.*; public class Receiver implements MessageListener { private boolean stop = false; public static void main(String[] args) { new Receiver().receive(); } public void receive() { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); try { //Prompt for JNDI names System.out.println("Enter ConnectionFactory name:"); String factoryName = reader.readLine(); System.out.println("Enter Destination name:"); String destinationName = reader.readLine(); reader.close(); //Look up administered objects InitialContext initContext = new InitialContext(); ConnectionFactory factory = (ConnectionFactory) initContext.lookup(factoryName); Destination destination = (Destination) initContext.lookup(destinationName); initContext.close(); //Create JMS objects Connection connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer receiver = session.createConsumer(queue); receiver.setMessageListener(this); connection.start(); //Wait for stop while (!stop) { Thread.sleep(1000); } //Exit System.out.println("Exiting..."); connection.close(); System.out.println("Goodbye!"); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void onMessage(Message message) { try { String msgText = ((TextMessage) message).getText(); System.out.println(msgText); if ("stop".equals(msgText)) stop = true; } catch (JMSException e) { e.printStackTrace(); stop = true; } } }
Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:
(1)、提供点到点消息模式和发布/订阅消息模式;
(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;
(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;
(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。
下篇学习Active MQ
相关推荐
- **3-1 JMS规范**:Java消息服务(Java Message Service, JMS)是Java平台中关于面向消息中间件(MOM)的标准客户端接口,它的主要目的是让Java应用程序能够与实现JMS的应用程序服务器通信。JMS定义了两种消息模型...
本文主要探讨了消息中间件的原理,以及Java消息服务(JMS)这一规范。 中间件是一种复用型软件,它位于操作系统、网络和数据库之间,为上层应用软件提供运行和开发的环境。中间件的主要任务是简化分布式系统的构建...
首先,ActiveMQ基于Java Message Service (JMS) 规范,提供了多种协议支持,如OpenWire、STOMP、AMQP和MQTT等,这使得它能与各种语言和平台无缝集成。在服务器环境中,ActiveMQ能够帮助处理高并发场景下的消息传递,...
综上所述,消息中间件及其JMS规范为分布式系统提供了强大的通信机制。通过异步、解耦和一对多的特性,消息中间件极大地增强了系统的灵活性、扩展性和可靠性。对于开发人员来说,理解和掌握JMS接口和消息模型是构建...
JMS规范详情 AMQP协议详情 RocketMQ RabbitMQ Kafka ActiveMQ ......对比
通过这个视频系列,你不仅能够掌握ActiveMQ的基本操作,还能深入理解JMS规范,从而在实际项目中更有效地利用消息中间件进行解耦和异步通信。此外,了解ActiveMQ的性能调优和故障排查技巧也是提升系统稳定性的关键,...
关键词的选取充分反映了论文的主题内容:消息中间件、JMS规范、JMX分布式管理框架。本文不仅展示了JMS与JMX结合的创新应用,而且为消息中间件的开发提供了理论支持和实践指导。对于理解和实现高效的消息服务系统具有...
【消息中间件和JMS原理】是分布式系统中重要的组件,它们主要解决了传统RPC(Remote Procedure Call)中间件...通过选择合适的消息中间件产品并遵循JMS规范,开发者可以构建出高可用、高性能和高度可扩展的分布式系统。
JMS规范的核心概念包括消息生产者(Message Producers)、消息消费者(Message Consumers)和消息代理(Message Brokers)。消息生产者负责创建并发送消息,而消息消费者则负责接收和处理这些消息。消息代理是中间的...
Apache ActiveMQ是业界广泛使用的一款开源消息中间件,它基于Java Message Service (JMS) 规范,提供了高效、可靠的异步通信能力。在“activeMQ-jms”这个压缩包中,我们通常会找到Apache ActiveMQ 5.13.4版本的相关...
Java Message Service(JMS)是Sun Microsystems提出的一种规范,用于统一不同的消息传递中间件(Message-Oriented Middleware, MOM)系统的接口。JMS为开发者提供了一个与特定平台无关的API,使得开发人员能够更...
**ActiveMQ-JMS简单实例** ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它实现了...通过阅读文档,学习JMS规范,以及实践代码示例,你可以深入理解消息中间件的工作原理,并掌握其在企业级应用中的应用。
常见的实现JMS规范的消息中间件产品包括ActiveMQ、RocketMQ、RabbitMQ、HornetQ等。每种消息中间件各有其特点和适用场景,开发者可以根据项目的具体需求和特性来选择合适的中间件产品。 消息中间件在分布式系统中的...
**ActiveMQ**是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,为应用程序提供了一个中间件,允许应用程序之间进行异步的消息通信。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、MQTT等,适用于多种...
Java消息服务(JMS)是由Sun Microsystems提出的一种消息中间件接口规范,旨在统一不同消息中间件系统的接口,提高应用程序的可移植性和互操作性。 #### 四、JMS介绍 **4.1 JMS简介** JMS是Java平台上的消息中间件...
ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,允许应用程序之间进行异步通信。JMS是一种标准接口,用于Java平台上的消息传递,提供了可靠的消息传递机制,确保消息的顺序和持久性,同时也...
标题中的"sun-jms.rar_jms_message queue"表明我们关注的是Sun Microsystems实现的JMS规范,以及Message Queue这一特定的消息中间件。Sun Microsystems的Message Queue是早期Oracle公司提供的一个JMS兼容的消息代理...
消息中间件和JMS消息服务在IT行业中扮演着至关重要的角色,尤其是在构建大型分布式系统时。传统的RPC中间件如CORBA、DCOM和RMI虽然广泛应用,但它们在处理复杂性和同步通信方面存在局限。为了解决这些问题,面向消息...
### TongLINK/Q 消息中间件相关知识点 #### 一、产品概述 TongLINK/Q 是由北京东方通科技发展有限责任公司开发的一款消息中间件,主要用于构建高效、可靠的分布式应用程序。该中间件提供了多种消息传递模式,包括点...