发布/订阅模型,消息会发送到一个名为主题(Topic)的虚拟通道中,消息生产者成为发布者(Publisher),消息的消费者成为订阅者(Subscriber);与点对点模型的最大不同,就是发布到主题的消息,能够被多个订阅者接收,类似于广播.发布/订阅模型的消息传输机制是一个基于推送(push)的方式,消息将有JMS Provider主动的向消息消费者广播,消费者客户端无需请求或者轮询,只需要保持Connection的活跃性即可.
不过在发布/订阅消息传送模型的内部,有多种不同类型的订阅者(比如,受托管订阅者,耐久订阅者,临时订阅者,动态订阅者);临时订阅者(TemporarySubscriber)为只有它主动侦听主题时才能收到消息,JMS Provider不会为临时订阅者持久存储"离线时"的任何消息的副本;持久订阅者将接收到发布的每条消息的一个副本,即使发布消息时,订阅者处于"离线"状态.
###contextFactory java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory ###brokerUrl,any protocol java.naming.provider.url = tcp://localhost:61616 ##username ##java.naming.security.principal= ##password ##java.naming.security.credentials= ##connectionFactory,for building sessions connectionFactoryNames = QueueCF,TopicCF ##topic.<topicName> = <physicalName-of-topic> ##your application should use <topicName>,such as: ## context.lookup("topic1"); ##It can be more than once topic.topic1 = jms.topic1 ##queue.<topicName> = <physicalName-of-queue> queue.queue1 = jms.queue1
//Topic发布者 package com.test.jms.simple.topic; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; public class SimplePublisher { private TopicPublisher producer; private TopicSession session; private TopicConnection connection; private boolean isOpen = true; public SimplePublisher() throws Exception{ Context context = new InitialContext(); TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF"); connection = connectionFactory.createTopicConnection(); connection.setClientID("OK111"); session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic)context.lookup("topic1"); producer = session.createPublisher(topic);//non durable producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); } public boolean send(String text) { if(!isOpen){ throw new RuntimeException("session has been closed!"); } try{ Message message = session.createTextMessage(text); producer.send(message); return true; }catch(Exception e){ return false; } } public synchronized void close(){ try{ if(isOpen){ isOpen = false; } session.close(); connection.close(); }catch (Exception e) { // } } }
//Topic订阅者 package com.test.jms.simple.topic; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import com.test.jms.object.TopicMessageListener; public class SimpleSubscriber { private TopicConnection connection; private TopicSession session; private TopicSubscriber consumer; private boolean isStarted; public SimpleSubscriber(String clientId) throws Exception{ Context context = new InitialContext(); TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF"); connection = connectionFactory.createTopicConnection(); connection.setClientID(clientId); session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = (Topic)context.lookup("topic1"); consumer = session.createDurableSubscriber(topic, "Test-subscriber"); consumer.setMessageListener(new TopicMessageListener()); } public synchronized boolean start(){ if(isStarted){ return true; } try{ connection.start(); isStarted = true; return true; }catch(Exception e){ return false; } } public synchronized void close(){ isStarted = false; try{ session.close(); connection.close(); }catch(Exception e){ // } } }
//测试类 package com.test.jms.simple.topic; public class SimpleTestMain { /** * @param args */ public static void main(String[] args) throws Exception{ SimpleSubscriber consumer = new SimpleSubscriber("TestClientId"); consumer.start(); SimplePublisher productor = new SimplePublisher(); for(int i=0; i<10; i++){ productor.send("message content:" + i); } productor.close(); //consumer.close(); } }
在session.createSubscriber(Topic topic)方法中,将会创建一个"非耐久性"主题,即只有subscriber侦听时才会收到消息,当subscriber离线时,它将错过消息.session.createDurableSubscriber(Topic topic,String name)用来创建一个耐久性订阅者,这种订阅者不会错过离线时的消息,JMS Provider将会为它保留所有的消息副本(必须符合相应的消息选择器).其中参数"name"用来表示此订阅者的名字,name的值可以任意,也不允许重复.
其中耐久性订阅者,必须对connection设定ClientId且此ID全局不能重复,否则将会抛出:javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a Connection.
一个session中只能创建一个耐久性订阅者,否则将抛出异常:javax.jms.JMSException: Durable consumer is in use for client;不过一个connection下可以有多个耐久性订阅者.
如果一个connection下有多个耐久订阅者时,此时订阅者的name不能重复,否则抛出: javax.jms.JMSException: Durable consumer is in use for client: TestClientId and subscriptionName: ..
session.unsubscribe(String name)方法为取消订阅,取消当前connection下指定name的订阅者.此后JMS Provider将不会为其保存消息副本.如果你确定一个耐久订阅者不会再次激活时,你需要"取消订阅",否则JMS Provider将会一直为它保存消息副本,而且极有可能带来存储上的风险,如果磁盘或者内存消耗完毕,将会导致JMS Provider故障.
Topic中消息副本的存储模式(数据库描述):
++++++++++++Consumers table+++++++ ++id | Name | destinationId | created 1 clientID1::name1 testTopoc 122222222 2 clientID1::name2 testTopoc 122323232 //其中Name + destinationId为唯一索引. ++++++++++++Messages_handles+++++++ ++id | messagId | destinationId | consumerId | delivered 1 10010 testTopic 1 0 2 10010 testTopic 2 0 //消息的实体将会保存在其他表中,通过messageId与其关联. //此表中messageId + destinationId + consumerId为唯一索引.
通过这个存储模式,我们能够理解出JMS Provider是如何创建消息副本的:每创建一个耐久订阅者都将会在Consumer表中新增一条记录,当"取消订阅"之后,相应的consumer记录也会被删除;当一个Topic中新的消息生成时,将会检索consumer表中此destinationId下的所有consumer,然后为每个conusmer生成消息副本--在Message_handles表中插入一条数据;如果某个consumer消费了一条消息,将会在message_handles表中删除消息副本记录.(某些JMS 实现,可能是记录每个订阅者已经消费的最后一个消息的ID,而不是消息的副本)
其中createDurableSubscriber(Topic topic,String name,String selector,boolean noLocal)方法中还有一个重要的参数--noLocal,此参数主要用来控制此订阅者是否接受本地消息,所谓本地消息就是当前Connection下其他publisher发送的消息(对于JMS Provider而言,就是ClientID标识),如果noLocal = true,那么意味着将只能收到其他Client发布的消息.
相关推荐
总结,JMS与IBM MQ结合的订阅模式提供了一种可靠且灵活的方式,允许应用程序在分布式环境中进行大规模的消息交换。通过理解并熟练运用这些概念和技术,开发者能够构建出高效、可扩展的消息驱动系统。
总结起来,JMS的广播订阅模式通过Topic实现了高效的消息广播,使得多个订阅者能同时接收到相同的消息。这一特性在构建大型分布式系统、事件驱动架构以及实时通知系统中非常有价值。理解和熟练运用JMS广播订阅,能够...
### JMS 消息持久订阅者测试知识点解析 #### 一、引言 Java消息服务(JMS)是Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS定义了应用程序如何...
相比之下,采用JMS的消息传递模式能够显著降低系统的复杂度。在JMS中,消息由客户端发送至消息服务器,再由消息服务器转发给目标客户端。这种方式不仅降低了客户端之间的耦合度,还极大地提高了系统的可扩展性和健壮...
通过理解并熟练掌握上述内容,你将在WebLogic环境中成功地利用JMS进行消息传递,无论是简单的点对点通信还是复杂的发布/订阅模式,都能游刃有余。请务必根据具体需求进行配置,并确保测试环节充分,以确保JMS服务的...
5. **主题(Topic)**:主题支持发布/订阅模式,一个消息可以被多个订阅者同时接收,实现广播式通信。 6. **案例分析**: - **订单处理**:当大量订单涌入时,使用JMS队列可以避免系统因处理能力不足而崩溃。订单...
在发布/订阅模式下,消息生产者(Publisher)发送消息到主题(Topic),而多个消息消费者(Subscriber)可以订阅该主题,从而接收消息。这种模式特别适用于广播式通信,让所有订阅者都能接收到相同的消息。 **持久...
本教程将深入探讨如何使用JMS客户端模式来实现与IBM MQ的远程通信,包括同步和异步的消息收发处理。 首先,理解JMS的基本概念至关重要。JMS提供两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅...
3. **JMS Domains**:JMS有两种主要的消息模型,即点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)模式。 - **Point-to-Point (PTP)模式**:在这种模型中,消息从一个生产者发送到一个...
**Spring与JMS消息传递** 在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中...在实际项目中,结合具体的业务需求,可以灵活选择合适的模式和策略来利用Spring和JMS实现高效的消息通信。
6. **点对点与发布/订阅模型**:JMS支持两种消息模式,点对点(Queue)模型是单个消费者从队列中获取消息,而发布/订阅(Topic)模型允许多个订阅者同时接收消息。 源码分析方面,可以深入研究以下几个关键部分: 1...
3. **发布/订阅模式**:在JMS中,有两种消息模型:点对点(Queue)和发布/订阅(Topic)。在这个示例中,我们关注的是发布/订阅模式。在这种模式下,一个消息发布者将消息发送到主题,而多个订阅者可以订阅该主题以...
【MQ JMS 发布订阅配置】是分布式消息传递系统中的一种模式,允许消息生产者(发布者)发送消息,而消息消费者(订阅者)可以选择性地接收这些消息。IBM 的 WebSphere MQ(简称 MQ)提供了Java消息服务(JMS)接口来...
在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这个主题,从而接收到这些消息。这种模式适用于需要广播消息或通知所有感兴趣方的情况。 在博客链接中...
消息的分发是广播式的,每个订阅者都可以独立处理消息,适合一对多的通信模式。 JMS的关键组件包括: - **消息生产者(Message Producer)**:创建并发送消息到消息代理。 - **消息消费者(Message Consumer)**:...
JMS支持两种基本的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别对应于不同的应用场景和需求。 1. **点对点模型**:在此模型中,消息被发送到特定的队列,每个...
##################################1、工程说明##################...3) 实现了普通队列消息发送与监听,实现了基于TOPIC的消息发布与订阅 4) IBM-MQ无需提前创建主题,TongLink需要提前创建主题以及对应的虚拟队列;
综上所述,JMS为Java开发者提供了一套标准的接口,用于构建可靠的消息传递系统,无论是简单的点对点通信还是复杂的发布/订阅模式。通过使用JMS,开发者可以专注于业务逻辑,而不必关心底层的消息传输细节,这极大地...
订阅发布模式(Publish/Subscribe,简称Pub/Sub)是一种在分布式系统中进行消息传递的设计模式,它允许消息生产者(发布者)将消息发送到中间件(通常是一个消息代理),然后由多个消息消费者(订阅者)根据自己的...
而消费者则有两种形式,MessageConsumer用于接收点对点模式下的消息,而TopicSubscriber用于接收发布/订阅模式下的消息。 JMS还引入了两种消息持久化策略:非持久化消息和持久化消息。非持久化消息在服务器宕机后会...