Locked old thread. Topic: Discussion on cooperation between ActiveMQ queues
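Posted: 2010-08-02
dennis_zane wrote: Doesn't JMS have a reply feature? You can create a temp queue for the reply; there is no need to do it this way.
mikewang wrote: That is asynchronous too. His scenario requires a synchronous reply.
dennis_zane wrote: JMS provides utility classes such as QueueRequestor and TopicRequestor that turn the asynchronous exchange into a synchronous one. The request/response model is defined by JMS itself, so the original poster has no need to build it by hand.
kings36503 wrote: Is there a concrete example? Surely this request/response model does more than tell the sender that the receiver got the message?
http://chenyuanboygirl.iteye.com/blog/396932 This post should be the answer.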
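To make the QueueRequestor suggestion a bit more concrete: as far as I understand it, the class creates a temporary queue, sets it as JMSReplyTo on the request, sends the request, and then blocks until a reply arrives. In real JMS code the call looks roughly like `new QueueRequestor(queueSession, queue).request(message)`. The sketch below is only an in-memory model of that flow built on java.util.concurrent, so it runs without a broker. `BlockingRequestReply`, `Msg`, `startEchoServer` and `request` are made-up names for illustration, not JMS API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory model of blocking request/reply; hypothetical names, not JMS API. */
public class BlockingRequestReply {

    /** A message with a body and the queue that replies should go to. */
    static final class Msg {
        final String body;
        final BlockingQueue<Msg> replyTo; // plays the role of JMSReplyTo

        Msg(String body, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Starts a daemon thread that answers each request on its replyTo queue. */
    static void startEchoServer(BlockingQueue<Msg> requests) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Msg request = requests.take();
                    request.replyTo.put(new Msg("reply to: " + request.body, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    /** Sends a request and blocks until the reply arrives on a private queue. */
    static String request(BlockingQueue<Msg> requests, String body) {
        // One private queue per request, standing in for a temporary queue.
        BlockingQueue<Msg> tempQueue = new LinkedBlockingQueue<>();
        try {
            requests.put(new Msg(body, tempQueue));
            return tempQueue.take().body; // the caller blocks here: async becomes sync
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Msg> requests = new LinkedBlockingQueue<>();
        startEchoServer(requests);
        System.out.println(request(requests, "Hello Melin")); // prints "reply to: Hello Melin"
    }
}
```

One thing to keep in mind: as far as I know the standard `QueueRequestor.request()` has no timeout, so the caller waits forever if the server never answers. If that matters, a consumer on the temporary queue with `receive(timeout)` is probably the safer route.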
Posted: 2010-08-02
I happen to be preparing a JMS training session, so I wrote a request/response example. The client uses a MessageListener to receive the reply message.
```java
// Client.java: sends a request and receives the reply asynchronously on a temporary queue.
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Client implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("training.queue");
            MessageProducer producer = session.createProducer(destination);
            TextMessage message = session.createTextMessage();

            // Create a temporary queue for the response
            Destination tempDest = session.createTemporaryQueue();
            MessageConsumer responseConsumer = session.createConsumer(tempDest);
            responseConsumer.setMessageListener(new Client());

            logger.info("Sending message");
            message.setText("Hello Melin");
            message.setJMSReplyTo(tempDest);
            producer.send(message);
            logger.info("Message sent successfully");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String messageText = ((TextMessage) message).getText();
                logger.info("reply message = " + messageText);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

// Server.java: consumes requests and answers each one on its JMSReplyTo destination.
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server implements MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(Server.class);
    private static MessageProducer replyProducer;
    private static Session session;

    public static void main(String[] args) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("training.queue");
            MessageConsumer consumer = session.createConsumer(destination);
            // No fixed destination: the target is supplied per send via JMSReplyTo
            replyProducer = session.createProducer(null);
            logger.info("Consuming messages asynchronously");
            consumer.setMessageListener(new Server());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            return; // ignore anything that is not a text request
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            logger.info("asyn receive: " + textMessage.getText());
            TextMessage response = session.createTextMessage();
            response.setText("asyn receive: " + textMessage.getText());
            // Copy the correlation ID (null unless the client set one)
            response.setJMSCorrelationID(message.getJMSCorrelationID());
            replyProducer.send(message.getJMSReplyTo(), response);
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
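Posted: 2010-08-02
The OP seems set on studying how to convert between synchronous and asynchronous calls.
If the goal is to solve a real problem, a web service would be the better choice.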
Posted: 2010-08-03
Couldn't you just correlate the messages and then filter on that?
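A rough sketch of the correlate-then-filter idea, in case it helps. In JMS terms I would expect the client to set a JMSCorrelationID on the request, the server to copy it onto the reply, and the client to pick out its own reply, for example with a message selector such as `session.createConsumer(replyQueue, "JMSCorrelationID = '" + id + "'")`. The code below models only the matching step in memory, with no broker involved. `CorrelationDemo`, `expectReply` and `onReply` are hypothetical names, not part of any JMS library.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory model of matching replies to requests by correlation ID (hypothetical names). */
public class CorrelationDemo {

    /** Requests still waiting for a reply, keyed by correlation ID. */
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers interest in the reply for the given correlation ID. */
    CompletableFuture<String> expectReply(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /**
     * Called for every reply arriving on the shared reply channel.
     * Returns false when no request is waiting for this ID (the reply is filtered out).
     */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        CorrelationDemo demo = new CorrelationDemo();
        CompletableFuture<String> a = demo.expectReply("req-A");
        CompletableFuture<String> b = demo.expectReply("req-B");

        // Replies arrive out of order; each one still reaches the right caller.
        demo.onReply("req-B", "answer for B");
        demo.onReply("req-A", "answer for A");

        System.out.println(a.join()); // prints "answer for A"
        System.out.println(b.join()); // prints "answer for B"
    }
}
```

The upside compared with one temporary queue per client is that many callers can share a single reply queue. The cost is that every request needs a unique ID and something has to do the matching, whether that is a selector on the broker or a map like the one above.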
Posted: 2010-08-04
melin wrote: I happen to be preparing a JMS training session, so I wrote a request/response example. The client uses a MessageListener to receive the reply message. [quoted Client and Server code snipped; identical to melin's post above]
Thanks.
Posted: 2010-08-06
Last modified: 2010-08-06
http://beehive.apache.org/docs/1.0/system-controls/jms/jmsDoc.html
This uses JMS and XMLBeans to implement a control bus. A classic example is Esper EE.
Posted: 2010-08-08
CXF SOAP can use JMS as its transport, and it does so synchronously. It is worth reading the source code: it creates a temp queue and uses the reply feature.