`
benx
  • 浏览: 276370 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ JMS 例子

    博客分类:
  • java
阅读更多

How should I implement request response with JMS?

The simplest solution is to use Camel as a Spring Remoting provider which allows you to hide all the JMS API from your business logic and letting Camel provide the request/response handling code for you.

However if you wish to write the JMS client code yourself, please read on how it works...

Using the JMS API to implement request-response

You might think at first that to implement request-response type operations in JMS that you should create a new consumer with a selector per request; or maybe create a new temporary queue per request.

Creating temporary destinations, consumers, producers and connections are all synchronous request-response operations with the broker and so should be avoided for processing each request as it results in lots of chat with the JMS broker.

The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a correlationID on each message to correlate request messages to response messages. This avoids the overhead of creating and closing a consumer for each request (which is expensive). It also means you can share the same producer & consumer across many threads if you want (or pool them maybe).

The Lingo library is an implementation of Spring remoting using JMS. (Spring remoting is a kind of POJO based remoting where the remoting code is invisible to your business logic code).

It uses exactly this pattern; of using correlation IDs to correlate requests to responses. The server side just has to remember to put the inbound message's correlation ID on the response.

The actual class which does this is the MultiplexingRequestor . It may be just using Spring remoting with Lingo is the simplest way of implementing request response - or maybe you could just use Lingo's Requestor interface to keep the JMS semantics.

More details here

Client side

So the client side creates a consumer on a temporary queue as follows...

// client side
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
...

// send a request..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);

producer.send(message);

Server side

public void onMessage(Message request) {

  Message response = session.createMessage();
  response.setJMSCorrelationID(request.getJMSCorrelationID())

  producer.send(request.getJMSReplyTo(), response)
}

Full Examples

Server Side

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Server implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;

    private Session session;
    private boolean transacted = false;
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    static {
        messageBrokerUrl = "tcp://localhost:61616";
        messageQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Server() {
        try {
            //This message broker is embedded
            BrokerService broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(messageBrokerUrl);
            broker.start();
        } catch (Exception e) {
            //Handle the exception appropriately
        }

        //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
        //is ready to handle messages
        this.messageProtocol = new MessageProtocol();
        this.setupMessageQueueConsumer();
    }

    private void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            this.session = connection.createSession(this.transacted, ackMode);
            Destination adminQueue = this.session.createQueue(messageQueueName);

            //Setup a message producer to respond to messages from clients, we will get the destination
            //to send to from the JMSReplyTo header field from a Message
            this.replyProducer = this.session.createProducer(null);
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String[] args) {
        new Server();
    }
}

Client Side

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random;

public class Client implements MessageListener {
    private static int ackMode;
    private static String clientQueueName;

    private boolean transacted = false;
    private MessageProducer producer;

    static {
        clientQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Client() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(transacted, ackMode);
            Destination adminQueue = session.createQueue(clientQueueName);

            //Setup a message producer to send message to the queue the server is consuming from
            this.producer = session.createProducer(adminQueue);
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Create a temporary queue that this client will listen for responses on then create a consumer
            //that consumes message from this temporary queue...for a real application a client should reuse
            //the same temp queue for each message to the server...one temp queue per client
            Destination tempDest = session.createTemporaryQueue();
            MessageConsumer responseConsumer = session.createConsumer(tempDest);

            //This class will handle the messages to the temp queue as well
            responseConsumer.setMessageListener(this);

            //Now create the actual message you want to send
            TextMessage txtMessage = session.createTextMessage();
            txtMessage.setText("MyProtocolMessage");

            //Set the reply to field to the temp queue you created above, this is the queue the server
            //will respond to
            txtMessage.setJMSReplyTo(tempDest);

            //Set a correlation ID so when you get a response you know which sent message the response is for
            //If there is never more than one outstanding message to the server then the
            //same correlation ID can be used for all the messages...if there is more than one outstanding
            //message to the server you would presumably want to associate the correlation ID with this
            //message somehow...a Map works good
            String correlationId = this.createRandomString();
            txtMessage.setJMSCorrelationID(correlationId);
            this.producer.send(txtMessage);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }

    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("messageText = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String[] args) {
        new Client();
    }
}

Protocol Class

This class is needed to run the client/server example above. Delegating the handling of messages to a seperate class is solely a personal preference.

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "I recognize your protocol message";
        } else {
            responseText = "Unknown protocol message: " + messageText;
        }
        
        return responseText;
    }
}
分享到:
评论

相关推荐

    activeMQ JMS WEB 例子

    在这个"ActiveMQ JMS WEB 例子"中,我们将探讨如何在Web环境中使用ActiveMQ进行消息通信。 首先,了解ActiveMQ的基本概念是必要的。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、XMPP和MQTT,使其能够广泛应用...

    jms+sping+activeMq的例子serd和recevice

    标题"jms+spring+activeMq的例子serd和recevice"提到了三个关键术语:JMS(Java Message Service)、Spring框架和ActiveMQ。这表明我们将探讨一个结合了这三个技术的示例,其中"serd"可能是"server"的误拼,而...

    apache__activemq_jms 的例子(带jar包)

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息...ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    JMS.rar_activemq_jms_jms activemq

    在这个例子中,我们连接到运行在本地的ActiveMQ服务器(默认端口61616),创建一个非事务性的会话,然后创建一个消息队列`myQueue`作为目的地。接着,我们创建一个`MessageProducer`来发送`TextMessage`,最后关闭...

    一个activeMQ的简单例子

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它基于Java消息服务(JMS)标准,用于在分布式系统中提供可靠的消息传递。本示例将介绍如何使用ActiveMQ实现一个简单的消息队列应用。 首先,我们需要理解...

    springMVC+activeMQ例子

    整合Spring MVC和ActiveMQ的例子会涵盖以上所述的各个方面,通过这些文件,我们可以进一步分析和理解这个示例的具体实现细节。这个例子展示了如何在一个Web应用中有效地利用消息队列来处理异步任务,提升系统的性能...

    ActiveMQ 简单例子源码 包含操作说明

    ActiveMQ 是一款开源的消息中间件,它是 Java Message Service (JMS) 的实现,由 Apache 软件基金会开发。它允许应用程序之间通过消息传递进行异步通信,使得系统间的耦合度降低,提高了系统的可扩展性和可靠性。...

    ActiveMQ例子

    Apache ActiveMQ 是一款开源的消息中间件,它遵循Java Message Service (JMS) 规范,提供了高效、可靠的异步消息传递功能。ActiveMQ 的设计目标是提供灵活、高性能且易用的消息传递解决方案,适用于各种分布式环境。...

    jms-test.zip_jms activemq_jms test

    描述中提到,“jms测试程序,将tomcat和activeMq整合在一起做的一个发送接受的发布订阅的例子”,这表明项目是基于Tomcat服务器,并且通过ActiveMQ实现了一个发布/订阅模式的消息传递。Tomcat是一个流行的Java应用...

    java ActiveMQ的例子

    在这个例子中,我们有`apache-activemq-5.9.0`的压缩包,这包含了ActiveMQ的一个早期版本。这个包通常会包含服务器的可执行文件、配置文件、示例代码以及相关的文档,非常适合初学者来学习和理解ActiveMQ的工作原理...

    ActiveMQ实现例子

    它提供了一个灵活、高性能的平台,用于在分布式系统中传递消息,实现了JMS(Java消息服务)标准,支持多种协议,包括AMQP、STOMP、MQTT等。在本示例中,我们将探讨如何利用ActiveMQ来实现消息传输。 首先,理解...

    Apache ActiveMQ 入门最简单例子

    例如,在Java环境中,我们可以使用JMS(Java Message Service)API与ActiveMQ交互: 1. **消息生产者**:生产者创建一个连接到ActiveMQ服务器的ConnectionFactory,然后创建一个Session,用于发送消息。Session可以...

    spring boot activemq整合例子

    而ActiveMQ是Apache出品的一款开源消息中间件,它是Java Message Service (JMS) 的实现,用于处理应用程序之间的异步通信。将Spring Boot与ActiveMQ整合,可以充分利用Spring的自动化配置和ActiveMQ的消息传递能力,...

    Spring boot 和内置ActiveMQ集成例子.zip

    而Apache ActiveMQ是流行的开源消息代理,符合Java Message Service(JMS)标准,用于处理异步通信和消息传递。将Spring Boot与ActiveMQ集成可以提供强大的消息处理能力,使应用能够解耦组件,提高可扩展性和容错性...

    ActiveMQ学习 完整例子

    Apache ActiveMQ是开源的、基于Java消息服务(JMS)的流行消息代理,它提供了高效、可靠的异步通信解决方案。ActiveMQ在企业级应用中广泛使用,因为它支持多种协议,如OpenWire、STOMP、AMQP、MQTT和WSO2,能够连接...

    JMS之ActiveMQ工具类+使用例子.zip

    **Apache ActiveMQ与JMS简介** Apache ActiveMQ是基于Java消息服务(Java Message Service,简称JMS)的开源消息中间件,由Apache软件基金会开发。它允许应用程序通过发送和接收消息来实现异步通信,从而解耦了系统...

    jms+activeMq+spring学习简单例子

    标题“jms+activeMq+spring学习简单例子”表明这个压缩包包含了一些示例代码,用于演示如何在Spring框架中集成JMS和ActiveMQ,以便于理解和学习。通过这个例子,开发者可以了解如何在实际应用中实现基于消息的通信。...

    activeMQ完整例子

    ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的消息传递服务。这个“activeMQ完整例子”压缩包文件很可能是包含了一系列示例代码和配置,帮助...

    activeMQ 例子 真实环境下测试过

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循开放消息传递协议(Open Message Broker Protocol,即AMQP)和Java消息服务(Java Message Service,JMS)规范,用于在分布式系统中提供可靠的消息传递...

Global site tag (gtag.js) - Google Analytics