目标:使用开源JMS应用框架Apache ActiveMQ,实现P2P方式下Request-Response Socket通信
模式:P2P
特征:发送端可以实时发送请求并取得响应,接收端可以实时获取请求并返回响应
库:activemq-all-5.11.1.jar
前提:启动activemq消息服务程序
--------------------------------------------------------------------------------------------
发送端:
package merrick.activemq;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender_p2p {
private static String user = ActiveMQConnection.DEFAULT_USER;
private static String password =ActiveMQConnection.DEFAULT_PASSWORD;
private static String url = "tcp://localhost:61616";
public static final String LONGMSG = "123456789012345678901234567890123456789012345678901234567890";
public static void main(String[] args) {
try {
new Sender_p2p().send(UUID.randomUUID().toString()+LONGMSG);
} catch (Throwable e) {
e.printStackTrace();
}
}
/***
* P2P模式
* Request-Response
* 类似:发送请求同步获取响应
* 此处可应用为TCP SOCKET Client端
*
* */
public void send(String msg) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("Request&Response_X");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(msg);
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
message.setJMSReplyTo(tempDest);
String correlationId = UUID.randomUUID().toString();
message.setJMSCorrelationID(correlationId); //
responseConsumer.setMessageListener(new MessageListener() {//监听回应
@Override
public void onMessage(Message arg0) {
try {
TextMessage textMessage=(TextMessage)arg0;
System.out.println(textMessage.getText()); //显示接收者的Response
} catch (JMSException e) {
e.printStackTrace();
} finally {//得到回应后关闭会话和连接
try {
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}});
producer.send(message); //发送消息
System.out.println("<"+Thread.currentThread().getId() + "> send ok: "+msg);
session.commit();
}
}
--------------------------------------------------------------------------------------------
接收端:
package merrick.activemq;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver_p2p {
private static String user = ActiveMQConnection.DEFAULT_USER;
private static String password =ActiveMQConnection.DEFAULT_PASSWORD;
private static String url = "tcp://localhost:61616";
public static void main(String[] args) {
try {
Receiver_p2p obj = new Receiver_p2p();
obj.receive();
} catch (Exception e) {
e.printStackTrace();
}
}
/***
* P2P模式
* Request-Response
* 类似:获取即时内容,同步返回响应
* (但Session不能关闭)
* 此处可应用为TCP SOCKET Server端
* */
public void receive() throws Exception{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("Request&Response_X");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) { //监听消息
try {
TextMessage textMessage=(TextMessage)message;
System.out.println("[Received]"+textMessage.getText()); //取得消息
Message response = session.createTextMessage(UUID.randomUUID().toString() + " [from receiver] "+ textMessage.getText().length()+", " + textMessage.getText());//及时给予Response
response.setJMSCorrelationID(message.getJMSCorrelationID());
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message.getJMSReplyTo(),response); //发送回应
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}
}
相关推荐
这种模式非常适合实现请求-响应类型的通信。例如,一个客户端发送请求到一个服务器,服务器处理请求后通过消息响应客户端。在这个实例中,我们将创建一个生产者,它将消息放入一个特定的队列,然后创建一个或多个...
对于一些耗时较长的任务处理(如视频转码、数据处理等),可以通过ActiveMQ异步接收这些任务请求,并由后台工作线程池进行处理,从而提高系统的响应速度和整体性能。 ### 总结 通过以上介绍,我们可以看到ActiveMQ...
在现代网络通讯中,HTTP 请求通常采用同步方式,基于请求-响应模式。这意味着客户端调用服务端接口后,必须等待服务端返回结果才能继续执行,这种方式称为同步调用。然而,同步调用的缺点在于,如果服务器端出现网络...
- ActiveMQ:Apache的一个非常成熟的开源消息中间件项目。 2. **学习方式**:采用“总-分-总”的方法进行学习: - 总体上理解消息队列的概念和应用场景。 - 分别学习不同的消息队列产品及其特性。 - 最后总结...
- P2P与发布/订阅模式的区别。 - ActiveMQ的监控与管理。 - **面试考察要点:** - ActiveMQ在项目中的具体应用场景。 - 如何处理数据提交失败的情况。 **3. MQ的三种Exchange** - **知识点概述:** - Direct ...
要建立JMS,首先你需要选择一个JMS提供商,如ActiveMQ、RabbitMQ或Apache Qpid等。每个提供商都有自己的实现,但都必须遵循JMS API规范。接下来,你需要配置JMS连接工厂,这通常涉及到设置URL、用户名、密码等连接...
在"消息队列实践项目"中,你将会学习如何使用Java API实现JMS客户端,创建消息生产者和消费者,以及如何配置和使用常见的消息中间件,如Apache ActiveMQ、RabbitMQ或Kafka。此外,还会涉及到如何设计和实施消息序列...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
在Java中,我们可以使用多种库和框架来实现这样的系统,例如Java Message Service (JMS) 和 Apache ActiveMQ。 Java Message Service (JMS) 是Java平台上的一个标准API,它定义了与消息传递系统交互的接口,允许...
这种模式可以减少服务端的响应时间,提高系统吞吐量。 4. **消息模型**: 在这个项目中,可能会使用两种JMS消息模型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。点对点模型适用于...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
2. **Java编程**:作为主要的编程语言,Java提供了许多库和框架来集成消息队列,如Apache ActiveMQ的Artemis客户端、RabbitMQ的Java客户端等。MessageDBdemo可能展示了如何在Java程序中与消息队列交互。 3. **消息...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...
Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...