最近在学习activeMQ,想模拟个类似于QQ单聊和群聊的功能..大致思路如下 消费者开发流程:
1.创建Connection:
2. 创建Session和queue :
3.根据queue 创建QueueReceiver 对象
queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(String.valueOf(fid)+"-"+String.valueOf(userId));
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener监听发布者发布的消息
单聊用的是point-to-point 点对点的形式发送方式
建立一个单聊消息类
public class SingleMessage implements Serializable {
private static final long serialVersionUID = -3264456779519472584L;
double id = 0.0;
int fromId = 0;
String fromName = "";
int toId = 0;
String toName = "";
String body = "";
Date sendDate = null;
public SingleMessage(){
id = Math.random();
}
public int getFromId() {
return fromId;
}
public void setFromId(int fromId) {
this.fromId = fromId;
}
public String getFromName() {
return fromName;
}
public void setFromName(String fromName) {
this.fromName = fromName;
}
public int getToId() {
return toId;
}
public void setToId(int toId) {
this.toId = toId;
}
public String getToName() {
return toName;
}
public void setToName(String toName) {
this.toName = toName;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public Date getSendDate() {
return sendDate;
}
public void setSendDate(Date sendDate) {
this.sendDate = sendDate;
}
public double getId() {
return id;
}
}
ActiveMQConnectionFactory connectionFactory = null;
Map<Integer, SingleChat> messageMap = new HashMap<Integer, SingleChat>();
发送消息的大致代码代码如下:
SingleMessage message = new SingleMessage();
message.setFromId(user.getId());
message.setFromName(user.getShowName());
message.setToId(fid);
message.setBody(text);
message.setSendDate(new Date());
try {
QueueConnection queueConnection = connectionFactory.createQueueConnection();
QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(String.valueOf(user.getId())+"-"+String.valueOf(fid));
QueueSender sender = session.createSender(queue);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(3*24*60*60*1000);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(message);
sender.send(objectMessage);
queueConnection.close();
接收消息代码如下
if(queueConnection == null){
try {
queueConnection = connectionFactory.createQueueConnection();
queueConnection.start();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
try {
QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(String.valueOf(fid)+"-"+String.valueOf(userId));
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueReceiver.setMessageListener(new MessageListener() {
public void onMessage(javax.jms.Message message) {
logger.info("接收到消息类型:"+message.getClass());
if(message instanceof ObjectMessage){
if(messageMap.containsKey(fid)){
try {
ObjectMessage objectMessage = (ObjectMessage)message;
SingleMessage singleMessage = (SingleMessage) objectMessage.getObject();
logger.info(singleMessage.getBody());
messageMap.get(fid).getMessages().add(singleMessage);
objectMessage.acknowledge();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}
}
});
SingleChat singleChat = new SingleChat();
singleChat.setSession(queueSession);
messageMap.put(fid, singleChat);
/**
* 包装连接和消息列表的内部类
*/
private class SingleChat{
private QueueSession session = null;
private List<SingleMessage> messages = new ArrayList<SingleMessage>();
public QueueSession getSession() {
return session;
}
public void setSession(QueueSession session) {
this.session = session;
}
public List<SingleMessage> getMessages() {
return messages;
}
}
分享到:
相关推荐
1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...
- **队列(Queues)**:提供点对点的消息传递,消息仅被一个消费者消费。 - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器...
ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 2. **消息模型**:ActiveMQ支持多种消息模型,包括持久化消息、非持久化消息、事务消息等。持久化消息即使在服务器重启后也能保持,而非...
总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...
APR是许多Apache项目的基础,对ActiveMQ-CPP的稳定运行至关重要。 同时,压缩包内还包含`Win32`目录,这表明该库已针对Windows 32位系统进行了优化,可以确保在32位环境下正常运行。此外,`apr-1.7.0-win32-src`和`...
ActiveMQ的核心功能是作为消息代理,它允许应用程序通过发布/订阅和点对点模式进行异步通信。这种通信方式提高了系统的可扩展性和可靠性,因为消息可以在生产者和消费者之间独立传输,即使它们在不同的时间运行或...
2. **JMS支持**:遵循JMS 1.1规范,提供点对点和发布/订阅两种消息模型,支持多种消息类型如文本、对象、文件和流消息。 3. **多协议支持**:除了JMS,还支持STOMP、AMQP、MQTT、OpenWire等多种消息协议,以适应不同...
队列采用点对点模型,每个消息只被一个消费者接收;主题采用发布/订阅模型,可以有多个订阅者接收同一消息。 2. **ActiveMQ 5.15.11的特性**: - **高可用性**:此版本支持集群和故障转移,确保即使在单个服务器...
2. **主题(Topic)与队列(Queue)**:ActiveMQ支持两种消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。主题适用于广播式通信,多个订阅者可以接收到相同的消息;队列则遵循FIFO(先进先...
5. **主题与队列**:支持发布/订阅模型的主题(Topics)和点对点模型的队列(Queues),满足不同类型的通信需求。 6. **管理工具**:包含了一个Web控制台,可以方便地管理和监控ActiveMQ实例,包括查看和管理消息、...
- **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...
ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...
它支持点对点(P2P)和发布/订阅(Pub/Sub)两种消息模型。 2. **协议支持**:ActiveMQ不仅支持JMS,还支持AMQP、STOMP、MQTT等多种消息协议,使得它能与多种语言和平台无缝集成。 3. **高可用性**:通过集群和...
此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...
- **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...
ActiveMQ作为JMS提供商,提供了对JMS接口的支持,使得开发者可以使用Java语言方便地实现消息传递。 4. **ActiveMQ特性**: - **持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,也能保证消息不会...
1. **JMS兼容性** - ActiveMQ符合JMS 1.1规范,支持点对点(Queue)和发布/订阅(Topic)两种消息模型,以及事务处理和持久化消息。 2. **多协议支持** - 支持多种协议,包括OpenWire、AMQP、STOMP、MQTT、WS-...
ActiveMQ实现了JMS 1.1规范,提供了多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。 3. **多种协议支持**:ActiveMQ不仅支持JMS,还支持STOMP、AMQP、XMPP、OpenWire等多种消息传输协议,使其能与其他非...
8. **主题和队列**:支持发布/订阅模型的主题和点对点模型的队列,满足不同的应用场景。 9. **过滤和路由**:提供灵活的消息筛选和路由规则,可以根据特定条件对消息进行处理或转发。 10. **事务支持**:支持本地JMS...
它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...