A discussion on cooperation between ActiveMQ queues

Posted: 2010-08-02
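dennis_zane wrote:
Doesn't JMS have a reply feature? You can create a temporary queue for the reply; there is no need to do it this way.

mikewang wrote:
That is asynchronous too; his scenario requires a synchronous reply.

dennis_zane wrote:
JMS provides utility classes such as QueueRequestor and TopicRequestor that turn the asynchronous exchange into a synchronous one. The request/response model is defined by JMS itself, so the original poster has no need to build this by hand.

kings36503 wrote:
Is there a concrete example? Surely this request/response model does more than just tell the sender that the receiver got the message?

http://chenyuanboygirl.iteye.com/blog/396932
This post should be the answer.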
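
To make the QueueRequestor idea from the quotes above concrete, here is a minimal in-memory sketch of the pattern it follows: create a private reply queue, attach it to the request, send, then block until the reply arrives. This is not JMS code and needs no broker. The class `BlockingRequestorSketch`, its `Request` type, the "echo: " reply text, and the timeout parameter are all assumptions made for illustration; the standard `QueueRequestor.request(Message)` takes no timeout and simply blocks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a blocking requestor; no JMS broker is involved. */
final class BlockingRequestorSketch {

    /** A request that carries its own private reply queue, in the role of JMSReplyTo. */
    static final class Request {
        final String body;
        final BlockingQueue<String> replyTo;

        Request(String body, BlockingQueue<String> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Plays the role of the shared request queue. */
    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();

    /** Starts a daemon thread that answers every request on that request's reply queue. */
    void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = requestQueue.take();
                    request.replyTo.put("echo: " + request.body);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    /** Sends a request and blocks until the reply arrives; returns null on timeout. */
    String request(String body, long timeoutMillis) {
        // One single-slot queue per request, like a temporary reply queue.
        BlockingQueue<String> tempReplyQueue = new ArrayBlockingQueue<>(1);
        try {
            requestQueue.put(new Request(body, tempReplyQueue));
            return tempReplyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingRequestorSketch sketch = new BlockingRequestorSketch();
        sketch.startServer();
        System.out.println(sketch.request("Hello Melin", 1000));
    }
}
```

The caller sees an ordinary method call that returns the reply, which answers the question in the quote: the response carries whatever payload the server chooses to send, not just a receipt.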
Posted: 2010-08-02
I happen to be preparing a JMS training session and wrote a request/response example. The client uses a MessageListener to receive the reply message.

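// Client.java
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client implements MessageListener {
	private static final Logger logger = LoggerFactory.getLogger(Client.class);

	public static void main(String[] args) {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				ActiveMQConnectionFactory.DEFAULT_BROKER_URL);

		try {
			Connection connection = connectionFactory.createConnection();
			connection.start();
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue("training.queue");
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage();

			// Create a temporary queue for the response and listen on it
			Destination tempDest = session.createTemporaryQueue();
			MessageConsumer responseConsumer = session.createConsumer(tempDest);
			responseConsumer.setMessageListener(new Client());

			logger.info("Sending message");
			message.setText("Hello Melin");
			message.setJMSReplyTo(tempDest);
			// Give the request a correlation ID that the server can copy onto the reply
			message.setJMSCorrelationID(UUID.randomUUID().toString());
			producer.send(message);
			logger.info("Message sent successfully");
			// The connection is not closed here; closing it would also delete the temporary queue
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	// Called by the JMS provider when a reply arrives on the temporary queue
	@Override
	public void onMessage(Message message) {
		try {
			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				logger.info("reply message = " + textMessage.getText());
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}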

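// Server.java
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server implements MessageListener {
	private static final Logger logger = LoggerFactory.getLogger(Server.class);

	private static MessageProducer replyProducer;
	private static Session session;

	public static void main(String[] args) {
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD,
				ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
		try {
			Connection connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue("training.queue");
			MessageConsumer consumer = session.createConsumer(destination);

			// Producer with no fixed destination; the target is passed on each send
			replyProducer = session.createProducer(null);

			logger.info("Consuming messages asynchronously");
			consumer.setMessageListener(new Server());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void onMessage(Message message) {
		if (!(message instanceof TextMessage)) {
			logger.warn("Ignoring non-text message");
			return;
		}
		TextMessage textMessage = (TextMessage) message;
		try {
			logger.info("async receive: " + textMessage.getText());

			TextMessage response = session.createTextMessage();
			response.setText("async receive: " + textMessage.getText());
			// Copy the correlation ID; fall back to the message ID if the client set none
			String correlationId = message.getJMSCorrelationID();
			if (correlationId == null) {
				correlationId = message.getJMSMessageID();
			}
			response.setJMSCorrelationID(correlationId);
			// Reply to whatever destination the client put in JMSReplyTo
			replyProducer.send(message.getJMSReplyTo(), response);

			Thread.sleep(100);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}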
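
In the Client listing above, `main` returns right after sending and the reply only shows up later in `onMessage`. If the caller needs the reply as a return value, one possible approach is to let the listener complete a future that the caller waits on. The sketch below shows only that bridging step and uses no JMS classes: `ListenerToSyncSketch`, `sendAsync`, and `sendAndWait` are hypothetical names, and `sendAsync` merely simulates a reply being delivered on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Bridges a listener-style callback to a blocking call; no JMS broker is involved. */
final class ListenerToSyncSketch {

    /** Hypothetical asynchronous send: the reply reaches the listener on another thread. */
    static void sendAsync(String body, Consumer<String> replyListener) {
        Thread worker = new Thread(() -> replyListener.accept("async receive: " + body));
        worker.setDaemon(true);
        worker.start();
    }

    /** Sends the request and blocks until the listener fires; returns null on timeout. */
    static String sendAndWait(String body, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        // The listener does nothing except hand the reply to the waiting caller.
        sendAsync(body, reply::complete);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndWait("Hello Melin", 1000));
    }
}
```

In a real client the body of `onMessage` would take the place of the listener passed to `sendAsync`. A timeout is worth keeping so the caller is not blocked indefinitely if the server never answers.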
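Posted: 2010-08-02
The original poster seems to be deliberately studying how to convert between synchronous and asynchronous calls.
To solve a real-world problem, you should simply use a web service.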
Posted: 2010-08-03
Couldn't you just correlate the messages and then filter on that?
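
A rough sketch of what "correlate, then filter" could look like: each request gets a correlation ID, all replies arrive on one shared channel, and each reply is handed only to the caller waiting on that ID. In JMS the filtering could instead be done by the broker, by creating the consumer with a message selector on `JMSCorrelationID`. The code below is an in-memory illustration only; `CorrelationSketch`, `register`, and `onReply` are assumed names, not part of any library.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Matches replies from a shared channel to waiting callers by correlation ID. */
final class CorrelationSketch {

    /** Requests still waiting for a reply, keyed by correlation ID. */
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request under the given ID and returns the future its caller waits on. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        return reply;
    }

    /**
     * Called for every reply seen on the shared channel.
     * Returns false when no caller is waiting for this ID, so the reply is filtered out.
     */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> waiter = pending.remove(correlationId);
        if (waiter == null) {
            return false;
        }
        waiter.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        CompletableFuture<String> first = sketch.register("req-1");
        sketch.onReply("req-2", "not for us");   // filtered out, nobody registered req-2
        sketch.onReply("req-1", "reply for req-1");
        System.out.println(first.join());
    }
}
```

Compared with one temporary queue per request, this keeps a single reply destination and moves the matching into the client.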
Posted: 2010-08-04
melin wrote:
I happen to be preparing a JMS training session and wrote a request/response example. The client uses a MessageListener to receive the reply message.

Thanks.
Posted: 2010-08-06   Last edited: 2010-08-06
http://beehive.apache.org/docs/1.0/system-controls/jms/jmsDoc.html

JMS together with XMLBeans can be used to implement a control bus; a classic example is Esper EE.
Posted: 2010-08-08
CXF SOAP can use JMS as its transport and still behave synchronously. It is worth studying the source code: it creates a temporary queue and uses the reply-to feature.