`
fujohnwang
  • 浏览: 156269 次
社区版块
存档分类
最新评论

JMS Durable Subscription Tutorial

阅读更多

Table of Contents

过了近乎2个星期的倒班生活,现在都有点儿“恍如隔世”的感觉了,再加上老李临时给了个东西做,更是严重的“颠倒黑白”, 否则,这篇文字早就应该出来了,趁着今天可以偷懒,就在工作时间了结它吧!(别告我状啊,呵呵,偷偷di进村,打枪di不要...)

1. Durable Subscription[1]释义(What's Durable Subscription?)

对于什么是Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub),我就不用废话了,大家应该比我都清楚, 我就直接说Durable Subscription了。

对于通常的消息订阅来说, JMS Provider会对这类消息订阅者“一视同仁”,你来了,我就给你消息,你走了,我就不管你了。 当消息到达指定Topic之后,JMS Provider只会为已经连接并且订阅了该指定Topic的消息订阅者发送消息, 如果消息到达之后你恰好不在,那不好意思,你将接收不到这一消息。这就好像现在的商场促销活动,礼品(消息)有限,虽然你(相当于消息订阅者)也想获得相应的礼品, 但当发送礼品的时候你不在礼品派发现场的话,你将失去这一获得礼品(消息)的机会,因为商场可不会管你是何方神圣,对于JMS Provider来说, 也是同样道理,只要我(JMS Provider)派发消息的时候你不在,你收不到消息是你自己找的,跟我没有关系。 也就是说,JMS Provider不会“耗费脑筋”去记下谁还没有来接收消息,就跟商场不会纪录到底谁的礼品还没有来领取一样, 因为对于这种情况来说,耗费资源去这些不确定的client, 完全就是non-sense的,不是嘛? JMS Provider或者说商场,根本就不会知道谁会来领取消息或者礼品。

当我们转到Durable Subscription的时候,情况就完全不同了。如果消息订阅者通过Durable Subscription的方式来订阅消息, 那么JMS Provider将会费点儿脑筋来记下这个Durable Subscription的消息订阅者是谁,即使当消息到达之后,该Durable Subscription消息订阅者不在, JMS Provider也会保证, 该Durable Subscription消息订阅者重新回来之后,之前到达而该Durable Subscription消息订阅者还没有处理的消息,将被一个不少的发送给它。

Let's take something for example. 假设你经营一家旅游胜地的旅馆,对于来住宿的顾客来说,你可以将他们划分为两类:

散客,不事先预定你的旅店

对于这类顾客来说,你也不会知道他来自哪里,姓甚名谁,只会在他们入住后临时为期分配房间并提供相应服务,一旦他们退房离开,旅馆方将不再保留与之相关的任何信息。

这是不是与通常的消息订阅者很像?

常客,或许都持有你旅馆的VIP卡

对于这类顾客,一旦他们拥有了VIP卡,则意味着他们之前已经登记过,当他们再次光临的时候,根据VIP卡这一标志,你就可以确定他们上次入住的房间等信息, 这样就可以为他们提供相同的房间,相同的服务。直到他们主动注销VIP卡或者VIP卡期限到期, 你的旅馆将一直保留这类顾客的相关信息。

而这恰好与Durable Subscription场景下,JMS Provider与Durable Subscription消息订阅者之间的关系很相似。

简单来说,区分Durable Subscription和Nondurable Subscription最明显的一个标志就是,JMS Provider是否会为消息订阅者保存相应的状态。 对于Durable Subscription来说,JMS Provider会根据消息订阅者提供的某种标志来为其保留相应状态, 就类似于那张VIP卡或者身份证,在使用JMS API进行Durable Subscription的编程的时候,消息订阅者必须通过某种方式来提供这种标志性信息,这是最需要我们关注的一点。

在了解了Durable Subscription的语义之后,我们马上来看一下如何使用JMS API进行实际的Durable Subscription编程, 并详细了解在JMS API中,我们可以通过什么途径为JMS Provider提供Durable Subscription消息订阅者的标志信息...

2. 如何进行Durable Subscription(Durable Subscription How to)

我们以一个简化功能的类似Spring的SimpleMessageListenerContainer为例,来说明进行Durable Subscription的过程中应该注意的几个问题,下面是该类的代码:

public class GenericSimpleMessageListenerContainer extends ServiceWithLifecycle {
    
    private static final transient Log logger = LogFactory.getLog(GenericSimpleMessageListenerContainer .class);
    
    private JndiTemplate      jndiTemplate;
    private String            connectionFactoryJndiName;
    private String            destinationJndiName;
    private ConnectionFactory connectionFactory;
    private Destination       destination;
    
    private MessageListener   messageListener;
    
    private Connection        sharedConnection;
    private Session           session;
    private MessageConsumer   messageConsumer;
    
    /*
     * set a non-empty durableSubscriptionName to perform durable subscription
     */
    private String            durableSubscriptionName;
    /*
     * to identify durable subscriber plus durableSubscriptionName
     */
    private String            clientId;
    
    public GenericSimpleMessageListenerContainer()
    {
	
    }
    public GenericSimpleMessageListenerContainer(JndiTemplate jt)
    {
		this.jndiTemplate = jt;
    }
    
    protected void doStart()
    {
	Validate.notNull(getMessageListener());
	
	setupConnectionFactoryIfNecessary(jndiTemplate);
	setupDestinationIfNecessary(jndiTemplate);
	
	try {
	    setupSharedConnectionIfNecessary();
	    session = getSharedConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
	    if(StringUtils.isNotEmpty(getDurableSubscriptionName()))
	    {
		if(logger.isInfoEnabled())
		{
		    logger.info("create durable subsriber with name:"+getDurableSubscriptionName());
		}
		messageConsumer = session.createDurableSubscriber((Topic)getDestination(), getDurableSubscriptionName());
	    }
	    else
	    {
		if(logger.isInfoEnabled())
		{
		    logger.info("create generic Message Consumer.");
		}
		messageConsumer = session.createConsumer(getDestination());
	    }
	    messageConsumer.setMessageListener(getMessageListener());
	    getSharedConnection().start();
	    
	    if(logger.isInfoEnabled())
	    {
		logger.info("The Connection to deliver messages is Started now!");
	    }
	    
	} catch (JMSException e) {
	    logger.error("failed to start Message listener container!!!\n");
                    JmsUtils.closeMessageConsumer(getMessageConsumer());
	    JmsUtils.closeSession(getSession());
	    JmsUtils.closeConnection(getSharedConnection());
	    throw JmsUtils.convertJmsAccessException(e);
	}
	
    }
    
    protected void doStop()
    {
	try {
	    getSharedConnection().stop();
	} catch (JMSException e) {
	    logger.warn("failed to stop connection of delivering jms message.\n");
	    logger.warn(ExceptionUtils.getFullStackTrace(e));
	}
	JmsUtils.closeMessageConsumer(getMessageConsumer());
	JmsUtils.closeSession(getSession());
	JmsUtils.closeConnection(getSharedConnection());
    }
    /**
     * After creating connection from ConnectionFactory, 
     * we will check whether we can set clientId for the created connection, 
     * If a pre-configured client Id exists, we will not try to set our clientId;
     * otherwise we will set our custom client Id if it's not empty.<br>
     * 
     * The try-catch(IllegalStateException) is also for checking whether the jms provider has pre-configured a client Id for the connections it creates.<br>
     * If a pre-configured client id does exist, we will let it be, so after catching such exception, we just log it in WARN level to notify us.<br>
     *  
     * @throws JMSException
     */
    private void setupSharedConnectionIfNecessary() throws JMSException {
	if (getSharedConnection() == null) 
	{
	    setSharedConnection(getConnectionFactory().createConnection());
	    
	    String preConfiguredClientId = getSharedConnection().getClientID();
	    if(StringUtils.isEmpty(preConfiguredClientId) && StringUtils.isNotEmpty(clientId))
	    {
		try
		{
		    getSharedConnection().setClientID(clientId);
		    if(logger.isInfoEnabled())
		    {
			logger.info("set up JMS Connection with Client Id:"+clientId);
		    }
		}
		catch(IllegalStateException e)
		{
		    logger.warn("A pre-configured client id exists, durable subscriber will use this client id and ignore external setted client id.");
		    logger.warn("pre-configured client id:"+preConfiguredClientId);
		    logger.warn("external setted client id:"+clientId);
		}
	    }
	}
    }
    private void setupDestinationIfNecessary(JndiTemplate jndiTemplate) {
	if(getDestination() == null)
	{
	    Validate.notEmpty(getDestinationJndiName());
	    
	    try {
		setDestination((Destination)jndiTemplate.lookup(getDestinationJndiName()));
	    } catch (NamingException e) {
		throw new RuntimeException("failed to lookup destination via JNDI with jndiName:"+getDestinationJndiName());
	    }
	}
    }
    private void setupConnectionFactoryIfNecessary(JndiTemplate jndiTemplate) {
	if(getConnectionFactory() == null)
	{
	    Validate.notEmpty(getConnectionFactoryJndiName());
	    
	    try {
		setConnectionFactory((ConnectionFactory)jndiTemplate.lookup(getConnectionFactoryJndiName()));
	    } catch (NamingException e) {
		throw new RuntimeException("failed to lookup ConnectionFactory via JNDI with jndiName:"+getConnectionFactoryJndiName());
	    }
	}
    }

	// getters and setters...
}
			

如果你在使用Spring 2.x之前的版本而又不能升级,那么这个类可以“凑合”用一下(因为它的功能并不完备,比如没有添加MessageSelector以及多线程等功能支持), 如果可能,还是建议你使用Spring 2.x之后引入的SimpleMessageListenerContainer或者DefaultMessageListenerContainer,当然了,这些属于题外话,

使用JMS API进行Durable Subscription编程与通常的方式没有太多差异,只要搞清楚一下两点,剩下的基本就不会有太大问题了。

2.1. Client Id

JMS规定了两种Administered Object,即ConnnectionFactory和Destination,所以,“万物伊始”,我们得先将这两个东西从JNDI上拿下来, GenericSimpleMessageListenerContainer提供了两种方式,要么你在外面获取到这两个东西, 然后直接注入给他;要么你就传一个JndiTemplate, 然后注入这两个东西对应的Jndi名称。

有了ConnectionFactory,我们可以通过它创建到相应JMS Provider的连接;有了Destination,我们才知道该去哪里接收消息,我想这个很容易理解, 这里需要着重说明的是ConnectionFactory。

我们已经说过, 要进行Durable Subscription,客户端必须提供某种类似VIP卡或者身份证之类的标志,在JMS中,Client Id的存在即是因为如此。 将Client Id称作Connection Id或许更好理解,它与JMS的Connection相“挂钩”,当一个JMS Connection被创建之后, 它有两种方式获得它的Client Id:

  • 通过ConnectionFactory自动获得.  既然ConnectionFactory属于Administered Object, 那么在各个JMS Provider中部署相应ConnectionFactory的时候, 我们就可以设定通过ConnectionFactory创建Connection的时候,是否要为创建的Connection设定Client Id, 以及该设定什么样的Client Id, 而具体设定方式可能需要参考各个JMS Provider各自的文档。

  • 客户端程序自定义设定.  在Connection被创建之后,并且没有进行任何其他操作之前,客户端程序可以为其设定自定义的Client Id,不过,如果该Connection已经被ConnectionFactory预先设定了Client Id的话, connection..setClientID(clientId)将会抛出JMS的IllegalStateException

所以,在setupSharedConnectionIfNecessary()方法中,你会发现,我们会事先检查ConnectinFactory是否已经预先设定过Client Id,如果没有并且客户端程序持有注入的非空的Client Id, 那么我们才会为Connection设定自定义的Client Id。

Caution
[Caution]

连接到JMS Provider进行Durable Subscription的多个Connection不可以拥有相同的Client Id,否则也会被IllegalStateException伺候!

2.2. Subscriber Name

单凭Client Id还不足以唯一标志某一个Durable Subscription,就跟我凭一个身份证,可以预定多个房间一样。 同一个连接里,你可以创建多个MessageConsumer去订阅不同Topic的消息,如果下回回来,你只想继续接受某一个Topic消息的话,JMS Provider如何知道是哪一个? 所以,为了区分同一个Connection中不同的Durable Subscription,我们还需要进一步的标志物,这就是Subscriber Name!

 

messageConsumer = session.createDurableSubscriber((Topic)getDestination(), getDurableSubscriptionName());

通过Session创建DurableSubscriber的时候,我们要为其提供一个Durable Subscriber Name,这是与普通订阅最基本的区别:

messageConsumer = session.createConsumer(getDestination());

有了SubscriberName之后,下回,当我们重新连接然后使用相同的SubscriberName创建消息订阅的时候,JMS Provider就会知道将哪一个Durable Subscription使用的Topic中的消息进行传送了。

Note
[Note]

创建MessageConsumer的时候可以同时设定相应的Message Selector, 另外进行异步消息接收的时候,需要为MessageConsumer设定相应的MessageListener, 最后,调用connection.start()方法告知JMS Provider开始进行消息传送,这里只是简单提及一下,我向大家比我更清楚。

2.3. 小结

Connection级别的Client Id和创建MessageConsumer时候的Subscriber Name唯一标志一个Durable Subscription,这是在JMS中成功进行Durable Subscription的前提(当然,要是JMS Provider过于“山寨”,或许也不成)。

基本上,个人觉得在Durable Subscription中要提的就这些了。 有关JMS更多信息,可以参考JMS规范以及各个JMS Provider提供的文档。

 

 

倾听了

 

<quote>袜子</quote>和Marvel在这一问题上的论点之后,才有了写下这段文字的想法, Thanks to you both!

 

BTW. 以上纯属个人观点,如果有误,还望各位看官指出。



[1] 中文通常翻译为“持久化订阅”或“持久订阅

2
0
分享到:
评论
2 楼 fujohnwang 2009-03-20  
for non-durable subscription, the unique topic name is enough, but for durable ones, that will be not.

think about such a sceneraio, you can buy newspaper or you can subscribe to press for half a year:

1- buy news paper, it just like you do non-durable subscription to news press, as long as you know which kind of news paper you want, you can "subscribe" one at any time.  One day, you forget to buy one, then you miss today's copy; even you come to buy today's copy, no one will on purpose keep one copy for you. because they don't know who u r, where u come from.

2- subscribe to press, that means, the press has gotten a record for you. they will keep everyday's copy for their customers, even they can't get the copy some day, they will keep the copy for them. But they need your names and Identity, so that when you come to get the copy, they can check their records whether you r the registered ones.

try to think from the JMS provider's side, if you think the unique topic name is enough, I think, you just saw one side of the mirror, :-)

BTW. I don't know whether my metaphor is good enough, but I hope the whole article and my sample will help u to understand the durable subscription well.
1 楼 asianboycn 2009-03-20  
Question on durable subscription.

for durable one, we need
client ID
+
subscription name

But the unique topic name is enough to identify subscription, right?

why we need a subscription name?

Can you help with my query?

Thanks

Steven
Stevenjiang.au@hotmail.com

相关推荐

    Queue与Topic的比较

    Durable Subscription 是一种持久化的订阅方式,JMS Provider 将会费点儿脑筋来记下这个 Durable Subscription 的消息订阅者是谁,即使当消息到达之后,该 Durable Subscription 消息订阅者不在,JMS Provider 也会...

    JMS 开发简明教程

    9. **持久订阅(Durable Subscription)** - 在主题上,消费者可以创建持久订阅,即使消费者断开连接,也不会丢失消息。当重新连接时,可以从上次离开的地方继续接收。 10. **消息选择器(Message Selectors)** ...

    JMS的一个非常好的demo

    4. **持久订阅者(Durable Subscription)**:在发布/订阅模型中,如果一个订阅者在发布消息时离线,那么它可能会错过这些消息。持久订阅者解决了这个问题,通过在订阅者离线时保留其状态,当订阅者重新上线时,它...

    JMS学习笔记

    - 消费者可以选择是否接收已持久化的消息,这通过设置消息的Durable Subscription实现。 5. **事务处理** - Session可以设置为事务性,这样一组消息的发送或接收可以作为一个原子操作,确保所有消息要么全部成功...

    jms * jar包

    8. **持久化订阅(Durable Subscription)**: 在发布/订阅模式下,如果订阅者希望在离线期间也能接收到消息,可以创建持久化订阅。当订阅者重新上线时,它将收到自离线以来发布的所有匹配的消息。 9. **事务...

    Quartus II8.1的Subscription Edition和Web Edition的差别

    在探讨Quartus II 8.1的Subscription Edition与Web Edition之间的差异时,我们首先要明确这两版软件在功能、操作系统支持、设备支持、IP、设计输入、设计环境、实现与优化、验证与调试以及系统设计软件等方面的区别...

    JMS详细讲解

    - 无确认:在Session的DURABLE_SUBSCRIPTION模式下,消息直到重新订阅才会被确认。 JMS通过提供这些组件和模式,使得分布式系统中的异步通信和解耦变得简单而强大,从而提高了应用的可伸缩性和可靠性。了解并掌握...

    CDGS_subscription.exe

    CDGS_subscription.exe

    JMS简明教程(Word版)

    1. **持久化订阅(Durable Subscription)**: 允许消息消费者即使在离线期间也能接收消息。当消费者重新上线时,它可以接收到自其离线以来的所有消息。 2. **选择器(Selectors)**: 允许消费者仅接收满足特定条件...

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 5/6

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 5/6 共有6个压缩文件,请全部下载后安装 是一个全球使用最多用户界面控件套包,他主要的特点是:高效率和高实用性, 拥有大量的示例和帮助文档,开发者能够...

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 4/6

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 4/6 共有6个压缩文件,请全部下载后安装 是一个全球使用最多用户界面控件套包,他主要的特点是:高效率和高实用性, 拥有大量的示例和帮助文档,开发者能够...

    websphere jms开发...

    8. **持久性订阅**(Durable Subscription): 在主题订阅中,如果订阅者想要在离线时也能接收到消息,可以创建持久性订阅。这样,即使订阅者未在线,消息也会被保留直到它重新连接。 在实际开发中,我们还需要了解...

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 1/6

    DXperience Universal Subscription 12.1.6 (DEV宇宙版) 共有6个压缩文件,请全部下载后安装 是一个全球使用最多用户界面控件套包,他主要的特点是:高效率和高实用性, 拥有大量的示例和帮助文档,开发者能够快速...

    Visual Studio Subscription.docx

    Visual Studio Subscription Visual Studio Subscription是微软公司提供的一种订阅服务,旨在为Windows平台或其他Microsoft产品的开发者提供相关资源。该订阅服务提供了广泛的开发相关资源,包括下载、程序、社区、...

    DevExpress Universal Subscription 17.1.7 part2

    DevExpress Universal Subscription(又名DevExpress宇宙版或DXperience Universal Suite)是全球使用最多的.NET用户界面控件套包,DevExpress广泛应用于ECM企业内容管理、 成本管控、进程监督、生产调度,在企业/...

    DXperience Universal Subscription 13.2.7.14057 source code

    The DXperience Subscription includes the complete range of DevExpress .NET controls and libraries for all major Microsoft platforms, including WinForms, ASP.NET, WPF, Silverlight and Windows 8....

    DXperience Universal Subscription 13.2.7.14057 full source code

    The DXperience Subscription includes the complete range of DevExpress .NET controls and libraries for all major Microsoft platforms, including WinForms, ASP.NET, WPF, Silverlight and Windows 8....

    DevExpress Universal Subscription 17.1.7 part1

    DevExpress Universal Subscription(又名DevExpress宇宙版或DXperience Universal Suite)是全球使用最多的.NET用户界面控件套包,DevExpress广泛应用于ECM企业内容管理、 成本管控、进程监督、生产调度,在企业/...

    Laravel开发-laravel-subscription

    在 Laravel 框架中,`laravel-subscription` 是一个用于实现订阅计费功能的组件,它适用于那些提供按月或按年付费服务的 web 应用。Laravel 5.2 版本引入了这个强大的工具,帮助开发者轻松构建具有订阅模式的业务...

    TMS VCL Business Subscription 2014.06.20.rar

    TMS VCL Business Subscription 2014.06.20.rar

Global site tag (gtag.js) - Google Analytics