`
QING____
  • 浏览: 2253414 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JMS订阅模式消息

    博客分类:
  • JMS
 
阅读更多

    发布/订阅模型,消息会发送到一个名为主题(Topic)的虚拟通道中,消息生产者成为发布者(Publisher),消息的消费者成为订阅者(Subscriber);与点对点模型的最大不同,就是发布到主题的消息,能够被多个订阅者接收,类似于广播.发布/订阅模型的消息传输机制是一个基于推送(push)的方式,消息将有JMS Provider主动的向消息消费者广播,消费者客户端无需请求或者轮询,只需要保持Connection的活跃性即可.

    不过在发布/订阅消息传送模型的内部,有多种不同类型的订阅者(比如,受托管订阅者,耐久订阅者,临时订阅者,动态订阅者);临时订阅者(TemporarySubscriber)为只有它主动侦听主题时才能收到消息,JMS Provider不会为临时订阅者持久存储"离线时"的任何消息的副本;持久订阅者将接收到发布的每条消息的一个副本,即使发布消息时,订阅者处于"离线"状态.

 

###contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
###brokerUrl,any protocol
java.naming.provider.url = tcp://localhost:61616
##username
##java.naming.security.principal=
##password
##java.naming.security.credentials=
##connectionFactory,for building sessions
connectionFactoryNames = QueueCF,TopicCF
##topic.<topicName> = <physicalName-of-topic>
##your application should use <topicName>,such as:
## context.lookup("topic1");
##It can be more than once
topic.topic1 = jms.topic1
##queue.<topicName> = <physicalName-of-queue>
queue.queue1 = jms.queue1

 

//Topic发布者
package com.test.jms.simple.topic;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

public class SimplePublisher {

	private TopicPublisher producer;
	private TopicSession session;
	private TopicConnection connection;
	private boolean isOpen = true;
	
	public SimplePublisher() throws Exception{
		Context context = new InitialContext();
		TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF");
		connection = connectionFactory.createTopicConnection();
		connection.setClientID("OK111");
		session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = (Topic)context.lookup("topic1");
		producer = session.createPublisher(topic);//non durable
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
		
	}
	
	
	public boolean send(String text) {
		if(!isOpen){
			throw new RuntimeException("session has been closed!");
		}
		try{
			Message message = session.createTextMessage(text);
			producer.send(message);
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		try{
			if(isOpen){
				isOpen = false;
			}
			session.close();
			connection.close();
		}catch (Exception e) {
			//
		}
	}
	
}

 

//Topic订阅者
package com.test.jms.simple.topic;

import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;

import com.test.jms.object.TopicMessageListener;

public class SimpleSubscriber {

	private TopicConnection connection;
	private TopicSession session;
	private TopicSubscriber consumer;
	
	private boolean isStarted;
	
	public SimpleSubscriber(String clientId) throws Exception{
		Context context = new InitialContext();
		TopicConnectionFactory connectionFactory = (TopicConnectionFactory)context.lookup("TopicCF");
		connection = connectionFactory.createTopicConnection();
		connection.setClientID(clientId);
		session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = (Topic)context.lookup("topic1");
		consumer = session.createDurableSubscriber(topic, "Test-subscriber");
		consumer.setMessageListener(new TopicMessageListener());
	}
	
	
	public synchronized boolean start(){
		if(isStarted){
			return true;
		}
		try{
			connection.start();
			isStarted = true;
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		isStarted = false;
		try{
			session.close();
			connection.close();
		}catch(Exception e){
			//
		}
	}
}

 

//测试类
package com.test.jms.simple.topic;




public class SimpleTestMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		SimpleSubscriber consumer = new SimpleSubscriber("TestClientId");
		consumer.start();
		
		SimplePublisher productor = new SimplePublisher();
		for(int i=0; i<10; i++){
			productor.send("message content:" + i);
		}
		productor.close();
		//consumer.close();
	}
	

}

 

    在session.createSubscriber(Topic topic)方法中,将会创建一个"非耐久性"主题,即只有subscriber侦听时才会收到消息,当subscriber离线时,它将错过消息.session.createDurableSubscriber(Topic topic,String name)用来创建一个耐久性订阅者,这种订阅者不会错过离线时的消息,JMS Provider将会为它保留所有的消息副本(必须符合相应的消息选择器).其中参数"name"用来表示此订阅者的名字,name的值可以任意,也不允许重复.

    其中耐久性订阅者,必须对connection设定ClientId且此ID全局不能重复,否则将会抛出:javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a Connection.

    一个session中只能创建一个耐久性订阅者,否则将抛出异常:javax.jms.JMSException: Durable consumer is in use for client;不过一个connection下可以有多个耐久性订阅者.

    如果一个connection下有多个耐久订阅者时,此时订阅者的name不能重复,否则抛出: javax.jms.JMSException: Durable consumer is in use for client: TestClientId and subscriptionName: ..

    session.unsubscribe(String name)方法为取消订阅,取消当前connection下指定name的订阅者.此后JMS Provider将不会为其保存消息副本.如果你确定一个耐久订阅者不会再次激活时,你需要"取消订阅",否则JMS Provider将会一直为它保存消息副本,而且极有可能带来存储上的风险,如果磁盘或者内存消耗完毕,将会导致JMS Provider故障.

 

    Topic中消息副本的存储模式(数据库描述):

 

++++++++++++Consumers table+++++++
++id      |       Name                      |       destinationId         |     created
1                  clientID1::name1                  testTopoc               122222222
2                  clientID1::name2                  testTopoc               122323232
//其中Name + destinationId为唯一索引.


++++++++++++Messages_handles+++++++
++id      |    messagId    |     destinationId    |    consumerId  |   delivered

1                 10010                 testTopic                      1                0
2                 10010                 testTopic                      2                0
//消息的实体将会保存在其他表中,通过messageId与其关联.
//此表中messageId + destinationId + consumerId为唯一索引.

  

    通过这个存储模式,我们能够理解出JMS Provider是如何创建消息副本的:每创建一个耐久订阅者都将会在Consumer表中新增一条记录,当"取消订阅"之后,相应的consumer记录也会被删除;当一个Topic中新的消息生成时,将会检索consumer表中此destinationId下的所有consumer,然后为每个conusmer生成消息副本--在Message_handles表中插入一条数据;如果某个consumer消费了一条消息,将会在message_handles表中删除消息副本记录.(某些JMS 实现,可能是记录每个订阅者已经消费的最后一个消息的ID,而不是消息的副本)

 

    其中createDurableSubscriber(Topic topic,String name,String selector,boolean noLocal)方法中还有一个重要的参数--noLocal,此参数主要用来控制此订阅者是否接受本地消息,所谓本地消息就是当前Connection下其他publisher发送的消息(对于JMS Provider而言,就是ClientID标识),如果noLocal = true,那么意味着将只能收到其他Client发布的消息.

分享到:
评论

相关推荐

    JMS IBM MQ 订阅模式

    总结,JMS与IBM MQ结合的订阅模式提供了一种可靠且灵活的方式,允许应用程序在分布式环境中进行大规模的消息交换。通过理解并熟练运用这些概念和技术,开发者能够构建出高效、可扩展的消息驱动系统。

    JMS实现的信息的广播订阅

    总结起来,JMS的广播订阅模式通过Topic实现了高效的消息广播,使得多个订阅者能同时接收到相同的消息。这一特性在构建大型分布式系统、事件驱动架构以及实时通知系统中非常有价值。理解和熟练运用JMS广播订阅,能够...

    JMS--消息持久订阅者测试

    ### JMS 消息持久订阅者测试知识点解析 #### 一、引言 Java消息服务(JMS)是Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS定义了应用程序如何...

    JMS 教程 - 消息队列、消息服务

    相比之下,采用JMS的消息传递模式能够显著降低系统的复杂度。在JMS中,消息由客户端发送至消息服务器,再由消息服务器转发给目标客户端。这种方式不仅降低了客户端之间的耦合度,还极大地提高了系统的可扩展性和健壮...

    weblogic中使用JMS发送和接受消息

    通过理解并熟练掌握上述内容,你将在WebLogic环境中成功地利用JMS进行消息传递,无论是简单的点对点通信还是复杂的发布/订阅模式,都能游刃有余。请务必根据具体需求进行配置,并确保测试环节充分,以确保JMS服务的...

    JMS消息队列机制及案例

    5. **主题(Topic)**:主题支持发布/订阅模式,一个消息可以被多个订阅者同时接收,实现广播式通信。 6. **案例分析**: - **订单处理**:当大量订单涌入时,使用JMS队列可以避免系统因处理能力不足而崩溃。订单...

    ActiveMQ订阅模式持久化实现

    在发布/订阅模式下,消息生产者(Publisher)发送消息到主题(Topic),而多个消息消费者(Subscriber)可以订阅该主题,从而接收消息。这种模式特别适用于广播式通信,让所有订阅者都能接收到相同的消息。 **持久...

    jms远程IBM MQ 收发消息

    本教程将深入探讨如何使用JMS客户端模式来实现与IBM MQ的远程通信,包括同步和异步的消息收发处理。 首先,理解JMS的基本概念至关重要。JMS提供两种主要的消息模型:点对点(Point-to-Point,P2P)和发布/订阅...

    JMS消息模型 JMS学习.doc

    3. **JMS Domains**:JMS有两种主要的消息模型,即点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)模式。 - **Point-to-Point (PTP)模式**:在这种模型中,消息从一个生产者发送到一个...

    Spring发送接收JMS消息

    **Spring与JMS消息传递** 在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中...在实际项目中,结合具体的业务需求,可以灵活选择合适的模式和策略来利用Spring和JMS实现高效的消息通信。

    java实现的基于jms协议的消息队列中间件,源码!

    6. **点对点与发布/订阅模型**:JMS支持两种消息模式,点对点(Queue)模型是单个消费者从队列中获取消息,而发布/订阅(Topic)模型允许多个订阅者同时接收消息。 源码分析方面,可以深入研究以下几个关键部分: 1...

    activemq和spring整合发布消息和订阅消息demo

    3. **发布/订阅模式**:在JMS中,有两种消息模型:点对点(Queue)和发布/订阅(Topic)。在这个示例中,我们关注的是发布/订阅模式。在这种模式下,一个消息发布者将消息发送到主题,而多个订阅者可以订阅该主题以...

    MQ JMS 发布订阅配置、代码

    【MQ JMS 发布订阅配置】是分布式消息传递系统中的一种模式,允许消息生产者(发布者)发送消息,而消息消费者(订阅者)可以选择性地接收这些消息。IBM 的 WebSphere MQ(简称 MQ)提供了Java消息服务(JMS)接口来...

    ActiveMQ-Topic订阅发布模式Demo

    在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这个主题,从而接收到这些消息。这种模式适用于需要广播消息或通知所有感兴趣方的情况。 在博客链接中...

    jms对获取消息

    消息的分发是广播式的,每个订阅者都可以独立处理消息,适合一对多的通信模式。 JMS的关键组件包括: - **消息生产者(Message Producer)**:创建并发送消息到消息代理。 - **消息消费者(Message Consumer)**:...

    消息中间件和JMS消息服务.pdf

    JMS支持两种基本的消息模型:点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。这两种模型分别对应于不同的应用场景和需求。 1. **点对点模型**:在此模型中,消息被发送到特定的队列,每个...

    在spring boot中使用jms集成IBM-MQ和TLQ,包含普通队列和主题订阅两种模式,并实现按需加载

    ##################################1、工程说明##################...3) 实现了普通队列消息发送与监听,实现了基于TOPIC的消息发布与订阅 4) IBM-MQ无需提前创建主题,TongLink需要提前创建主题以及对应的虚拟队列;

    JMS 简介以及Weblogic配置JMS图解

    综上所述,JMS为Java开发者提供了一套标准的接口,用于构建可靠的消息传递系统,无论是简单的点对点通信还是复杂的发布/订阅模式。通过使用JMS,开发者可以专注于业务逻辑,而不必关心底层的消息传输细节,这极大地...

    订阅发布模式

    订阅发布模式(Publish/Subscribe,简称Pub/Sub)是一种在分布式系统中进行消息传递的设计模式,它允许消息生产者(发布者)将消息发送到中间件(通常是一个消息代理),然后由多个消息消费者(订阅者)根据自己的...

    jms消息通讯

    而消费者则有两种形式,MessageConsumer用于接收点对点模式下的消息,而TopicSubscriber用于接收发布/订阅模式下的消息。 JMS还引入了两种消息持久化策略:非持久化消息和持久化消息。非持久化消息在服务器宕机后会...

Global site tag (gtag.js) - Google Analytics