`
gtgt1988
  • 浏览: 114265 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

JAVA----activeMQ -点对点

 
阅读更多
 最近在学习activeMQ,想模拟个类似于QQ单聊和群聊的功能..大致思路如下 消费者开发流程:
1.创建Connection:
2. 创建Session和queue :
3.根据queue 创建QueueReceiver 对象
queueConnection = connectionFactory.createQueueConnection();
				queueConnection.start();

	QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

			Queue queue = queueSession.createQueue(String.valueOf(fid)+"-"+String.valueOf(userId));

			QueueReceiver queueReceiver = queueSession.createReceiver(queue);


queueReceiver.setMessageListener监听发布者发布的消息
 
单聊用的是point-to-point  点对点的形式发送方式

建立一个单聊消息类

public class SingleMessage implements Serializable {

	private static final long serialVersionUID = -3264456779519472584L;
	double id = 0.0;
	int fromId = 0;
	String fromName = "";
	int toId = 0;
	String toName = "";
	String body = "";
	Date sendDate = null;
	
	public SingleMessage(){
		id = Math.random();
	}

	public int getFromId() {
		return fromId;
	}

	public void setFromId(int fromId) {
		this.fromId = fromId;
	}

	public String getFromName() {
		return fromName;
	}

	public void setFromName(String fromName) {
		this.fromName = fromName;
	}

	public int getToId() {
		return toId;
	}

	public void setToId(int toId) {
		this.toId = toId;
	}

	public String getToName() {
		return toName;
	}

	public void setToName(String toName) {
		this.toName = toName;
	}

	public String getBody() {
		return body;
	}

	public void setBody(String body) {
		this.body = body;
	}

	public Date getSendDate() {
		return sendDate;
	}

	public void setSendDate(Date sendDate) {
		this.sendDate = sendDate;
	}

	public double getId() {
		return id;
	}
}
	ActiveMQConnectionFactory connectionFactory = null;
	
	Map<Integer, SingleChat> messageMap = new HashMap<Integer, SingleChat>();
发送消息的大致代码代码如下:
SingleMessage message = new SingleMessage();
		message.setFromId(user.getId());
		message.setFromName(user.getShowName());
		message.setToId(fid);
		message.setBody(text);
		message.setSendDate(new Date());
		try {
			QueueConnection queueConnection = connectionFactory.createQueueConnection();
			QueueSession session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			Queue queue = session.createQueue(String.valueOf(user.getId())+"-"+String.valueOf(fid));
			QueueSender sender = session.createSender(queue);
			sender.setDeliveryMode(DeliveryMode.PERSISTENT);
			sender.setTimeToLive(3*24*60*60*1000);
			ObjectMessage objectMessage = session.createObjectMessage();
			objectMessage.setObject(message);
			sender.send(objectMessage);
			queueConnection.close();





接收消息代码如下
if(queueConnection == null){
			try {
				queueConnection = connectionFactory.createQueueConnection();
				queueConnection.start();
				
				
			} catch (JMSException e) {
				logger.error(e.getMessage(), e);
			}
		}
		try {
			QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
			Queue queue = queueSession.createQueue(String.valueOf(fid)+"-"+String.valueOf(userId));
			QueueReceiver queueReceiver = queueSession.createReceiver(queue);
			queueReceiver.setMessageListener(new MessageListener() {
				public void onMessage(javax.jms.Message message) {
					logger.info("接收到消息类型:"+message.getClass());
					if(message instanceof ObjectMessage){
						if(messageMap.containsKey(fid)){
							try {
								ObjectMessage objectMessage = (ObjectMessage)message;
								SingleMessage singleMessage = (SingleMessage) objectMessage.getObject();
								logger.info(singleMessage.getBody());
								messageMap.get(fid).getMessages().add(singleMessage);
								objectMessage.acknowledge();
							} catch (JMSException e) {
								logger.error(e.getMessage(), e);
							}
						}
					}
				}
			});
			SingleChat singleChat = new SingleChat();
			singleChat.setSession(queueSession);
			messageMap.put(fid, singleChat);




	/**
	 * 包装连接和消息列表的内部类
	 
	 */
	private class SingleChat{
		private QueueSession session = null;
		private List<SingleMessage> messages = new ArrayList<SingleMessage>();
		public QueueSession getSession() {
			return session;
		}
		public void setSession(QueueSession session) {
			this.session = session;
		}
		public List<SingleMessage> getMessages() {
			return messages;
		}
	}
 

 

分享到:
评论

相关推荐

    apache-activemq-5.9.0-bin

    1. **消息队列**:ActiveMQ支持多种消息模式,如点对点(Queue)和发布/订阅(Topic)。消息队列确保消息的可靠传输,即使在发送方和接收方之间发生故障时也能保持数据的完整性。 2. **JMS兼容性**:ActiveMQ完全...

    apache-activemq-5.8.0-bin.zip

    - **队列(Queues)**:提供点对点的消息传递,消息仅被一个消费者消费。 - **主题(Topics)**:支持发布/订阅模式,消息可以被多个消费者接收。 - **持久化**:ActiveMQ支持将消息持久化到磁盘,即使在服务器...

    apache-activemq-5.15.6

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 2. **消息模型**:ActiveMQ支持多种消息模型,包括持久化消息、非持久化消息、事务消息等。持久化消息即使在服务器重启后也能保持,而非...

    activemq-cpp-library-3.9.5-src.zip

    总之,ActiveMQ-CPP库3.9.5版本为C++开发者提供了强大的消息传递功能,无论是简单的点对点通信还是复杂的发布/订阅模式,都能灵活应对。通过熟悉和掌握其API及特性,开发者可以构建高效、可靠的分布式系统,实现数据...

    activemq-cpp-library-3.9.5 编译的windows库文件,支持vs2015、vs2017

    APR是许多Apache项目的基础,对ActiveMQ-CPP的稳定运行至关重要。 同时,压缩包内还包含`Win32`目录,这表明该库已针对Windows 32位系统进行了优化,可以确保在32位环境下正常运行。此外,`apr-1.7.0-win32-src`和`...

    apache-activemq-5.13.2-bin.tar.gz

    ActiveMQ的核心功能是作为消息代理,它允许应用程序通过发布/订阅和点对点模式进行异步通信。这种通信方式提高了系统的可扩展性和可靠性,因为消息可以在生产者和消费者之间独立传输,即使它们在不同的时间运行或...

    apache-activemq-5.16.0-bin.tar.gz 下载(5积分)

    2. **JMS支持**:遵循JMS 1.1规范,提供点对点和发布/订阅两种消息模型,支持多种消息类型如文本、对象、文件和流消息。 3. **多协议支持**:除了JMS,还支持STOMP、AMQP、MQTT、OpenWire等多种消息协议,以适应不同...

    apache-activemq-5.15.11-bin.tar.gz

    队列采用点对点模型,每个消息只被一个消费者接收;主题采用发布/订阅模型,可以有多个订阅者接收同一消息。 2. **ActiveMQ 5.15.11的特性**: - **高可用性**:此版本支持集群和故障转移,确保即使在单个服务器...

    apache-activemq-5.15.7-bin

    2. **主题(Topic)与队列(Queue)**:ActiveMQ支持两种消息模型——发布/订阅(Publish/Subscribe)和点对点(Point-to-Point)。主题适用于广播式通信,多个订阅者可以接收到相同的消息;队列则遵循FIFO(先进先...

    apache-activemq-5.15.15二进制包,安装包

    5. **主题与队列**:支持发布/订阅模型的主题(Topics)和点对点模型的队列(Queues),满足不同类型的通信需求。 6. **管理工具**:包含了一个Web控制台,可以方便地管理和监控ActiveMQ实例,包括查看和管理消息、...

    apache-activemq-5.15.7-bin.tar.gz

    - **主题与队列**:支持发布/订阅模式的主题和点对点模式的队列,满足不同应用场景的需求。 - **安全性**:通过JAAS(Java Authentication and Authorization Service)实现用户身份验证和权限控制。 - **消息优先级...

    apache-activemq-5.3.1-bin.tar.gz

    ActiveMQ作为JMS实现,支持点对点(Queue)和发布/订阅(Topic)两种模式。 **3. 安装过程** 下载“apache-activemq-5.3.1-bin.tar.gz”后,你需要解压到一个合适的目录。这可以通过命令行工具如`tar`完成: ```...

    apache-activemq-5.11.1

    它支持点对点(P2P)和发布/订阅(Pub/Sub)两种消息模型。 2. **协议支持**:ActiveMQ不仅支持JMS,还支持AMQP、STOMP、MQTT等多种消息协议,使得它能与多种语言和平台无缝集成。 3. **高可用性**:通过集群和...

    apache-activemq-5.9.1-bin.tar.gz

    此外,ActiveMQ支持多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。点对点模式下,消息只被一个消费者接收;而在发布/订阅模式下,消息可以被多个订阅者消费。这些模型适用于不同的应用场景。 消息传输的...

    apache-activemq-5.3.0-bin.zip

    - **队列与主题**:ActiveMQ支持两种消息模式——点对点(Queue)和发布/订阅(Topic)。队列保证消息的顺序传递,而主题则允许广播式的消息分发。 2. **ActiveMQ功能**: - **高可用性**:通过集群和复制策略,...

    apache-activemq-5.15.11-bin.rar

    ActiveMQ作为JMS提供商,提供了对JMS接口的支持,使得开发者可以使用Java语言方便地实现消息传递。 4. **ActiveMQ特性**: - **持久化**:ActiveMQ支持消息的持久化存储,即使在服务器重启后,也能保证消息不会...

    apache-activemq-5.15.10-bin.tar.gz

    1. **JMS兼容性** - ActiveMQ符合JMS 1.1规范,支持点对点(Queue)和发布/订阅(Topic)两种消息模型,以及事务处理和持久化消息。 2. **多协议支持** - 支持多种协议,包括OpenWire、AMQP、STOMP、MQTT、WS-...

    apache-activemq-5.15.1-bin.tar.gz

    ActiveMQ实现了JMS 1.1规范,提供了多种消息模型,包括点对点(Queue)和发布/订阅(Topic)。 3. **多种协议支持**:ActiveMQ不仅支持JMS,还支持STOMP、AMQP、XMPP、OpenWire等多种消息传输协议,使其能与其他非...

    apache-activemq-5.13.4-bin

    8. **主题和队列**:支持发布/订阅模型的主题和点对点模型的队列,满足不同的应用场景。 9. **过滤和路由**:提供灵活的消息筛选和路由规则,可以根据特定条件对消息进行处理或转发。 10. **事务支持**:支持本地JMS...

    apache-activemq-5.15.3-bin

    它的核心功能包括发布/订阅和点对点的消息模式,以及事务处理和持久化机制。 **2. 版本5.15.3特性** - **稳定性与性能优化**:此版本在前一版本的基础上进行了大量优化,提高了系统的稳定性和处理消息的性能。 - **...

Global site tag (gtag.js) - Google Analytics