`
greemranqq
  • 浏览: 977010 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

activemq 应答模式

    博客分类:
  • JMS
阅读更多

一、序言

       JMS 用于系统解耦有一定帮助,像我们 iteye 的一些系统消息,可能非重要要消息,就没那么严格的限制,统一异步发送就行了,反正上线你就能看到。有些情况下我们需要消息两端进行确认,比如一些比较重要的金额之类的信息。

 

二、实例场景

       我们ERP系统中的财务模块是分开的,当成一个单独的财务系统,那么从ERP那么那送的财务信息,或者财务系统接收了,需要给ERP 那边一个确认信息,不然消息没处理成功或者其他异常,导致金额数据出问题,这个麻烦比较大的。

 

三、JMS 场景对应

       场景一:

       1.Producer  -----> 发送消息到broker

       2.Customer------> 从broker 收到消息

       3.Customer------> 向broker 确认消息收到

      

       对于场景一,可能没有完全满足我么你的实例场景,但是可以通过broker 获得,也算是一种异步通知,下面来看看对应的伪代码:

       场景1.1 :开启事务的情况,是根据session 的commit 和 rollback 进行处理确认消息

       

// 这是我们消息生产者伪代码,初始化过程这里暂时不贴了,参考前面的
// true : 表示开启事务,开启事务,必须的commit
Session session =  InitJms.connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        // 创建一个文本消息
       TextMessage message =  session.createTextMessage("测试消息");
        // 创建发送消息目的地
       Destination send_destination = session.createQueue("order_queue");
        // 生产者
       MessageProducer producer = session.createProducer(send_destination);
       // 发送
       producer.send(message);
       // 这里必须提交,因为开启了事务,不然broker 里面是看不到消息的
       session.commit();

   

// 这是消费者代码,同样用true
        Session session =  InitJms.connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);
        // 指定接收消息的地方
        Destination destination = session.createQueue("order_queue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        try {
            // 接收消息
            TextMessage message = (TextMessage)consumer.receive(1000);
            System.out.println(message.getText());  
            // 收到消息之后进行确认    
            session.commit();
        }finally {
            session.close();
            InitJms.connection.close();
        }

 

 

     注意,上面由于没用messageListener 而且关闭了连接,因此控制台看不到消费者存在了:

     

      场景1.2 我们自动响应服务器 和 非事务,客户端响应服务器的情况

       

 // 消费端 在这里默认 采用AUTO 就会自动响应了
Session session =  InitJms.connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

// 同理,如果设置 
InitJms.connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
// 就需要手动响应
message.acknowledge();

 

    注意:如果没有按要求响应broker,消费端还是能拿到消息的,而且能重复拿到,5.11的版本可以拿到7次,可以通过下面的检查一些信息:

    

// 是否收到过消息,第一次fasle,第二次开始就是true
message.getJMSRedelivered();
// 消息发送时间:毫秒
message.getJMSTimestamp();
// 从broker 重复获取消息的次数,5.11最多6次。
// 参考 RedeliveryPolicy  DEFAULT_MAXIMUM_REDELIVERIES 可以更改
message.getStringProperty("JMSXDeliveryCount")
// 消息存活时间:0 一直存在
message.getJMSExpiration();

当然还有很多,可以参考官方文档:
http://activemq.apache.org/activemq-message-properties.html
以及
http://activemq.apache.org/features.html

 

四、双向应答的场景:

     双向应答可以这样描述

     1.producer-->发送消息到broker, 然后等待确认消息

     2.customer-->从broker 获得消息,然后发送确认消息--->broker

     3.producer --> 从broker 获得确认消息

 

     举个栗子:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址,看代码

      

 // 承接刚才的代码,发送消息的时候,需要填写一个回执的地址
       Destination recall_destination = session.createQueue("recall_queue");
       // 将回执地址写在消息里面,方便李四知道
       message.setJMSReplyTo(recall_destination);
       producer.send(message); 
        
       // 发送之后,某个地方这里变成消息消费者,等待那边给我发送确认消息
       MessageConsumer replyConsumer =session.createConsumer(recall_destination);
        // 这里我们用个消息监听
        replyConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
 

 

    消费者:

    

// 获得回执地址
        Destination recall_destination = message.getJMSReplyTo();
        // 创建回执消息
        TextMessage textMessage = session.createTextMessage("张三,我已经收到消息了");
        // 以上收到消息之后,从新创建生产者,然后在回执过去
        MessageProducer producer = session.createProducer(recall_destination);
        producer.send(textMessage);
 

 

    OK,这样就能相互通信了,你可以理解为通过两个通道进行的。

    注意:这种方式毕竟会慢一些了,除非有及时性的,需要两端处理另外的事情,才这么多,因为涉及2步,都会有类似于“拜占庭将军问题”,因此解决还需要 持久化、多点部署,反而麻烦,按前面的场景还好。

 

小结:

        1.这里大概介绍了activemq 的一些应答的及时,关于有些属性可以参考文档,具体问题具体分析~。~脱离场景谈性能都是耍流氓,这里也不会介绍性能问题,有测试的朋友可以告知一声。

        2.如果有问题的,请大家指出,非常感谢。

 

      

 

 

  • 大小: 6.9 KB
分享到:
评论
2 楼 greemranqq 2015-08-26  
cheng.xinwei 写道
事务在做 rollback 之后,什么时候会下一次回调呢?

由你业务控制,其实这个事务  只是简单的应答模式,你业务认为消费成功了,然后应答broker 消息才会被确认消费,如果中途挂掉,你broker 里面消息还是存在的
1 楼 cheng.xinwei 2015-08-25  
事务在做 rollback 之后,什么时候会下一次回调呢?

相关推荐

    ActiveMQ的队列queue模式(事务、应答、转发模式、阻塞消息)

    自动应答模式下,消费者一旦接收到消息,就会自动发送确认,而显式应答则需要消费者在处理完消息后手动发送确认。未确认的消息可能会在故障恢复时重新投递。 ### 4. 转发模式(Forwarding) ActiveMQ支持消息的跨...

    工具使用篇——python操作ActiveMq

    在本文中,我们将深入探讨如何使用Python操作ActiveMQ,一个流行的开源消息代理和消息中间件。ActiveMQ允许应用程序之间高效地交换数据,而Python通过STOMP(Simple Text Oriented Message Protocol)协议与ActiveMQ...

    ActiveMQ_request-response

    **ActiveMQ请求应答模式详解** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循开放消息中间件协议(Open Message Middleware, OMQ)和Java消息服务(Java Message Service, JMS)标准,用于在分布式...

    activemq_basic.rar

    4. 路由与过滤:ActiveMQ允许消息路由到特定的消费者,支持主题(Topic)和队列(Queue)两种模式,以及基于消息属性的过滤。 5. 安全性:支持用户认证和授权,可以保护消息的安全传输。 在"activemq_basic.rar"中...

    ActiveMQ 实战

    确认可以自动发生,如在事务性会话中,或根据会话的应答模式手动进行。JMS支持消息持久性提交模式,包括PERSISTENT和NON_PERSISTENT,以保障消息不会因服务失败而丢失。消息优先级允许指示JMS提供者优先提交紧急消息...

    activeMQ介绍

    确认可以在事务性会话中自动完成,或在非事务性会话中通过设置应答模式(如 AUTO_ACKNOWLEDGE、CLIENT_ACKNOWLEDGE 和 DUPS_ACKNOWLEDGE)来控制。 2. **持久性(Persistence)**:消息可以通过设置为PERSISTENT来...

    activemq学习(2) spring+activemq

    此外,还可以选择不同的消费者应答模式,如自动应答、手动应答或不设置应答,以控制消息的确认时机。 7. **监控与管理**: ActiveMQ提供了Web控制台,可以实时监控队列和主题的状态,查看消息传递情况,甚至可以...

    activeMq 实战

    - **非事务性会话**:消息何时被确认取决于创建会话时的应答模式 (`Acknowledgement Mode`)。该参数有以下三种可选值: - `Session.AUTO_ACKNOWLEDGE`:当客户端成功从 `receive()` 方法返回时,会话自动确认客户端...

    ActiveMQ_in_Action_中文

    非事务性会话中,消息何时被确认取决于创建会话时的应答模式,有Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE和Session.DUPS_ACKNOWLEDGE三种可选值。 JMS支持两种消息提交模式:PERSISTENT和NON_...

    activemq的几种基本通信方式总结

    另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求应答的模式。下面,我们针对这几种方式一一讨论一下。在讨论具体方式的时候,我们先看看使用activemq需要启动服务的主要过程。...

    activemq 集群配置文档

    而在非事务性会话中,确认的发生取决于创建会话时的应答模式。 - **1.2.2 持久性 (Persistence)** - **定义**: 指示JMS提供者是否需要持久化存储消息。 - **功能**: `PERSISTENT`表示消息应被持久化存储,以防止...

    ActiveMQ(中文)参考手册

    确认消息指的是消息在被成功消费后进行确认,包括三种应答模式:自动确认(AUTO_ACKNOWLEDGE)、客户端确认(CLIENT_ACKNOWLEDGE)和有重复的确认(DUPS_ACKNOWLEDGE)。持久性指的是消息提交模式,分为PERSISTENT和...

    ActiveMQ-in-Action-中文.docx

    在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。 持久性是 JMS 的一个关键特性,JMS 支持以下两种消息提交模式:PERSISTENT 和 NON_PERSISTENT。PERSISTENT 模式指示 JMS provider 持久保存消息,以...

    ActiveMQ(中文)参考手册.pdf

    应答模式有三种:Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE和Session.DUPS_ACKNOWLEDGE。 消息持久性分为持久提交(PERSISTENT)和非持久提交(NON_PERSISTENT)。持久提交可以确保消息在JMS提供者...

    MQTest jsm activeMQ

    2. **创建会话**:通过ConnectionFactory创建一个Session对象,它是消息发送和接收的核心,可以设置事务和应答模式。 3. **定义目的地**:创建Destination对象,代表消息的目的地,可以是Queue(点对点模型)或Topic...

    ActiveMQ使用教程

    确认发生在会话提交时(事务性会话)或根据应答模式(非事务性会话)自动或手动进行。确认模式包括: - AUTO_ACKNOWLEDGE:消息处理完成后自动确认。 - CLIENT_ACKNOWLEDGE:客户端通过消息的acknowledge方法手动...

    ActiveMQ详细教程

    在非事务性会话中,消息的确认取决于会话的应答模式,其中包括Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE和Session.DUPS_ACKNOWLEDGE三种模式。 ActiveMQinAction是ActiveMQ的学习资源,其中包含对...

    activeMQ_JMS学习资料

    在非事务性会话中,确认策略依赖于创建会话时设定的应答模式。 - `AUTO_ACKNOWLEDGE`:消息接收即确认。 - `CLIENT_ACKNOWLEDGE`:用户显式调用`acknowledge()`方法确认消息。 - `DUPS_ACKNOWLEDGE`:会话延迟确认...

Global site tag (gtag.js) - Google Analytics