JMS不仅需要高效的存储消息,还需要确保消息能够无误的被传输.这就需要JMS提供一种"保证传送"机制和事务.如果抛开JMS的规范,那么它的技术实现本身就是网络IO + 文件存储;其中网络IO的困境就在"数据传输安全保证""网络失效"等方面,"文件存储"即要求数据需要被高效的存取.其中"文件存储"层面已经有较多的第三方存储工具和解决方案,比如文件类型数据库/内存数据库等.如果消息系统需要分布式且整体架构良好,那么上述问题,就更加棘手.
一.保证传送
保证传送,用来描述JMS Client和JMS Provider之间的消息交互是"可靠的",这种"可靠"并非绝对意义上的百分百数据不丢失或者数据不重复,所谓"保证传送",就是在良好的API设计和相对良好的运行环境中,消息最终会被接收和消费;但是如果API设计不良或者运行环境极其糟糕,比如事务使用不正确,或者确认机制操作不当,或者网络环境糟糕以至于无法连续的通讯等,都将会导致消息的发送和接收,出现"意料之外"的事情:队列消息被多次发送,消息被重复接收等.
一条消息的"来龙去脉"可以简单的通过上图表示
1: 消息生产者中send会阻塞操作(网络IO阻塞),如果网络异常,将导致消息发送失败,此时生产者需要在编码设计上做好异常检测或者重发机制;其中网络异常主要体现在"网络中断"或者JMS Provider失效.
2: JMS Provider接收到消息之后,将会保存在内存中(或者DB中),此后并立即向此Producer发送"ACK"信号,如果JMS Provider在存储上遇到无法解决的异常(比如磁盘故障,嵌入式DB异常),那么"ACK"响应将会携带异常信息(message);如果"ACK"发送中网络IO异常导致无法发送,将会导致"此消息"被移除,此时send方法也将从阻塞中返回并抛出异常.因为网络IO异常,通常TCP链接的双端都能及时的感知到.
ACK发送正常,指的是TCP连接上"ACK"数据已经通过网络发送给了Client,这个过程中没有发生异常,此后也意味着当前JMS消息被持久存储,producer的send方法也正确返回.此后如果Client端在处理ACK信息过程中出现问题(几乎不可能),那么JMS Provider也将认为此消息已经被正确发送.需要提醒的是,在Client端:消息的send和ACK在IO流上不是一个连续的过程,在不同的确认时机中,ACK信息是可以是同步的(send方法阻塞,知道收到ACK,比如持久化类型的消息),有些时异步获取的(比如事务中的消息,或者非持久化消息),不过JMS中的ACK MODE都producer并没有任何意义..
3: 消息消费者receive消息,这个操作在Queue中是consumer主动"拉取"(监听),在Topic中是Provider推送;当消费者接收到消息之后,可以立即确认(AUTO),然后再去执行与消息有关的业务;也可以"延迟"确认等.
4: 当消息被消费者接收到之后,JMS Provider将消息添加到此消费者的"待确认消息"列表,如果JMS Provider在此消费者的IO通道中阻塞返回,但却没有收到ACK,那么将导致此消息重发.如果ACK正常接收到,那么JMS Provider将会把此消息从持久存储设备删除.
如下为JMS中ACK MODE:
1. AUTO_ACKNOWLEDGE:自动确认,当Consumer客户端(通常是指Consumer所属的Session)在收到消息之后,即准备确认消息,通常在receive()方法返回之前,或者在messageListener.onMessage()正常执行之后,向Provider发送ACK指令..所谓AUTO_ACK,就是确认的时机有Session"择机"实施;开发者无法干扰.
2. DUPS_OK_ACKNOWLEDGE: 允许延迟批量确认,重点在"批量",AUTO_ACK通常是确认一条消息(事实上在不同的JMS实现中,它仍然可以像DUPS一样确认多条消息)..当消费者接收到消息之后,并不是立即确认,而是"滞留"一段时间之后才会确认(批量确认);这种方式直观上是提升了client端消费数据的速度(优先接触消息),但这种模式下,需要消费者能够接受"消息重发"的情况,比如当Consumer客户端失效重启,那些尚未确认但已经被"touch"(消费)过的消息有可能会重复接受到;对于JMS Provider而言,如果它"等待"一定时间后,仍未收到"确认",将会重发消息.,这种模式,对性能的提升是不明确的,因为较晚的确认消息,意味着JMS Provider需要更长时间的保留"待确认"消息列表..究竟多少条消息作为"批量"的边界,有具体的JMS实现者决定.
3. CLIENT_ACKNOWLEDGE: 客户端确认,需要要求消息消费者明确的调用message.acknowledge()方法来确认此消息的接收成功,无论何种原因(未捕获异常),如果此方法未被调用,那么此消息将会被重发.这种模式下,允许消费者自己选择时机确认消息,通常使用在消息编组(group)的情况下:将一系列消息作为一个group接收,当group中最后一个消息确认成功后,那么JMS Provider就认为此组消息全部接收成功(只需确认组的最后一条消息即可,那么JMS Provider会认为此前的其他消息也接收正常).
二.事务
事务用来描述"一系列消息要么全部确认成功,要么全不确认"的特征,它和数据库事务最终需要达成的效果是一样的.JMS Provider会缓存每个生产者当前事务下的所有消息,直到commit或者rollback.commit操作将会导致事务中所有的消息被持久存储;rollback意味着JMS Provider将会清除此事务下所有的消息记录...在事务未提交之前,消息是不会被持久存储的,也不会被消费者消费.
每次事务提交之后,在client端会生成一个事务ID(一个session中不会出现重复的ID,clientID:sessionID:txID);事务的提交或者回滚都会携带ID.对于producer而言,在事务类型的session中,发送消息(一个或者多个)之后,需要执行session.commit(),否则消息将不会被存储.对于consumer而言,消息接收到之后,需要手动的使用commit,否则JMS Provider会认为消息没有被接收,导致重发,因此你可以认为commit就是一个消息确认操作.
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); try{ Message message = session.createTextMessage(text); String cid = "ID:" + System.currentTimeMillis(); message.setJMSCorrelationID(cid); producer.send(message); session.commit(); }catch(Exception e){ // }
public class QueueMessageListener implements MessageListener{ private Session session; public QueueMessageListener(Session session){ this.session = session; } public void onMessage(Message message) { if(message == null){ return; } try{ //message handler // session.commit(); }catch(Exception e){ e.printStackTrace(); } } }
三.消息编组
一序列消息,如果不编组的话,可能会被分发给不同的消费者;但是很多时候,我们期望这"一序列"的消息能够有序的交付给一个消费者,无论是消息的发送还是消费,都希望它们是不可分割的"组合".那么此时我们需要"消息编组",我们需要使用到"JMSXGroupID"/"JMSXGroupSeq"两个属性.注意JMSProvider并不会根据JMSXGroupSeq进行排序,顺序还是需要自己来维护.
1.消息生产者
package com.test.jms.simple; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; public class XGroupProducer { private MessageProducer producer; private Session session; private Connection connection; private boolean isOpen = true; public XGroupProducer() throws Exception{ Context context = new InitialContext(); ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF"); connection = connectionFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination queue = (Queue)context.lookup("queue1"); producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); } public boolean send(String[] texts) { if(!isOpen){ throw new RuntimeException("session has been closed!"); } try{ synchronized (this) { String groupId = connection.getClientID() + ":Group"; Message bm = session.createTextMessage(); bm.setStringProperty("GroupMarker", "begin"); producer.send(bm); for(int i= 0 ; i < texts.length; i++) { Message message = session.createTextMessage(texts[i]); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", i +1); producer.send(message); } //取消group粘性 Message em = session.createTextMessage(); em.setStringProperty("JMSXGroupID", groupId); em.setIntProperty("JMSXGroupSeq", 0); em.setStringProperty("GroupMarker", "end"); producer.send(em); session.commit(); } return true; }catch(Exception e){ return false; } } public synchronized void close(){ try{ if(isOpen){ isOpen = false; } session.close(); connection.close(); }catch (Exception e) { // } } }
2.消息消费者
package com.test.jms.simple; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueConnectionFactory; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import com.test.jms.object.XGroupQueueMessageListener; public class XGroupConsumer { private Connection connection; private Session session; private MessageConsumer consumer; private boolean isStarted; public XGroupConsumer() throws Exception{ Context context = new InitialContext(); ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF"); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = (Queue)context.lookup("queue1"); consumer = session.createConsumer(queue); consumer.setMessageListener(new XGroupQueueMessageListener()); } public synchronized boolean start(){ if(isStarted){ return true; } try{ connection.start(); isStarted = true; return true; }catch(Exception e){ return false; } } public synchronized void close(){ isStarted = false; try{ session.close(); connection.close(); }catch(Exception e){ // } } }
3.测试类
public class XGroupTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ XGroupConsumer consumer = new XGroupConsumer(); consumer.start(); XGroupProducer producer = new XGroupProducer(); producer.send(new String[]{"1","2","3","4"}); } }
相关推荐
本书深入浅出地讲解了jms1.1规范的底层技术、java类和接口、编程模型及其不同实现等java消息服务(jms)和消息传送机制关键技术。通过对支持点对点和发布/订阅“消息传送”的标准api的完全解读及具体实例,介绍了...
1. **消息类型选择**:客户端必须使用`javax.jms.TextMessage`或`javax.jms.BytesMessage`来向服务器传送SOAP请求消息。如果请求消息包含附件,则必须使用`BytesMessage`;若无附件,客户端可选择使用`TextMessage`...
JMS 提供了可靠的消息传递机制,是企业级应用集成的重要组件。在 WebLogic 中部署 JMS,主要包括以下几个步骤: 1. **登录 WebLogic 控制台**: 首先,你需要通过浏览器访问 WebLogic Server 的管理控制台,通常...
所有的消息传送操作都是通过由JMSProvider提供的JMS API对象执行的。在编写JMS程序时,ConnectionFactory对象用于创建连接,而连接用于创建Session。Session对象创建MessageProducer和MessageConsumer,用于发送和...
- 定义:企业消息传送系统(Enterprise Messaging System, EMS)是一种分布式计算环境中进行消息传递的基础设施。 - **需求**:支持异步通信、消息的持久性和可靠性。 - **集中式消息管理系统 (MOM)**:一种基于...
7. JMS主题:支持发送消息给多个订阅者的机制。 8. ConnectionFactory:用户用来创建到JMS提供者的连接的被管对象。 9. Connection:代表应用程序和消息服务器之间的通信链路。 10. Destination:消息发布和接收的...
在用户状态实时监控系统中,XML处理器负责解析从JMS服务器接收到的XML消息,并提取出用户状态信息,然后将这些信息传送到监控端,实现对用户状态的实时监控。 实现用户状态实时监控系统还涉及对监控端的处理,监控...
JMS作为一种成熟的消息传递机制,在现代软件架构中扮演着重要的角色。无论是异构系统集成、高并发环境下的消息传递还是复杂的事件通知系统,JMS都能够提供高效、可靠的解决方案。通过深入理解JMS的基本概念、工作...
Java事件传送技术是Java应用程序中实现组件间通信的重要机制,特别是在J2EE(Java 2 Platform, Enterprise Edition)环境中。这种技术允许不同的组件之间通过触发和响应事件来交互,而无需知道对方的具体实现细节,...
- JMS支持两种主要的消息传送模型: - **PTP Domain**: 使用Queue作为消息目标。 - **Pub/Sub Domain**: 使用Topic作为消息目标。 - 不同的领域适用于不同的应用场景,用户可以根据实际需求选择合适的模型。 ##...
可靠消息传送** - **确认机制**:确保消息成功处理后才移除。 - **事务支持**:保证消息处理的一致性。 **2. 持久性存储** - **持久化**:将消息保存在磁盘上,防止因系统故障导致消息丢失。 **3. 性能考量** - *...
JMS是一种面向消息中间件的Java API,为分布式系统中不同应用之间提供了一种标准的消息通信机制。通过JMS消息模式实现消息的持久化,可以保证数据在传输过程中的安全性和可靠性,同时支持异步消息传递模式,提高系统...
Spring Integration是Spring框架的一部分,提供了一种轻量级的消息集成机制,它将Spring的依赖注入、面向切面编程等核心特性应用在消息传递的场景中,以简化企业应用中复杂的消息处理。 从内容简要来看,《Spring ...
HornetQ提供了多种机制来确保消息的可靠传送,包括确认机制和重试策略等。 ##### 4.4 交易(TRANSACTIONS) 交易支持允许在一个或多个消息操作完成后才提交,这有助于保持数据一致性。 ##### 4.5 持久性...
- 登录WebLogic控制台,进入“消息 -> 消息传送 -> JMS”模块。 - 创建JMS模型(如WFSystemModel),并在模型内创建连接工厂(如WFConnectionFactory),设置JNDI名称。 - 创建JMS服务器,并选择文件存储方式。 ...