企业中各项目中相互协作的时候可能用得到消息通知机制。比如有东西更新了,可以通知做索引。
在 Java 里有 JMS 的多个实现。其中 apache 下的 ActiveMQ 就是不错的选择。ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。这里示例下使用 ActiveMQ
用 ActiveMQ 最好还是了解下 JMS
JMS 公共 点对点域 发布/订阅域
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver TopicSubscriber
JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory 是连接工厂,负责创建Connection。
Connection 负责创建 Session。
Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
Destination 是消息的目的地。
详细的可以网上找些 JMS 规范(有中文版)。
下载 apache-activemq-5.3.0。http://activemq.apache.org/download.html ,解压,然后双击 bin/activemq.bat。运行后,可以在 http://localhost:8161/admin 观察。也有 demo, http://localhost:8161/demo 。把 activemq-all-5.3.0.jar 加入 classpath。
Jms 发送 代码:
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for(int i=0; i<3; i++) {
MapMessage message = session.createMapMessage();
message.setLong("count", new Date().getTime());
Thread.sleep(1000);
//通过消息生产者发出消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
}
Jms 接收代码:
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i=0;
while(i<3) {
i++;
MapMessage message = (MapMessage) consumer.receive();
session.commit();
//TODO something....
System.out.println("收到消息:" + new Date(message.getLong("count")));
}
session.close();
connection.close();
}
JMS五种消息的发送/接收的例子
转自:http://chenjumin.javaeye.com/blog/687124
1、消息发送
//连接工厂
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//连接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事务性会话,自动确认消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的目的地
Destination destination = session.createQueue("queue.hello");
//消息生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //不持久化
//文本消息
TextMessage textMessage = session.createTextMessage("文本消息");
producer.send(textMessage);
//键值对消息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("age", new Long(32));
mapMessage.setDouble("sarray", new Double(5867.15));
mapMessage.setString("username", "键值对消息");
producer.send(mapMessage);
//流消息
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("streamMessage流消息");
streamMessage.writeLong(55);
producer.send(streamMessage);
//字节消息
String s = "BytesMessage字节消息";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(s.getBytes());
producer.send(bytesMessage);
//对象消息
User user = new User("cjm", "对象消息"); //User对象必须实现Serializable接口
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(user);
producer.send(objectMessage);
session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
producer.close();
session.close();
conn.close();
2、消息接收:通过消息监听器的方式接收消息
public class Receiver implements MessageListener{
private boolean stop = false;
public void execute() throws Exception {
//连接工厂
ConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
//连接到JMS提供者
Connection conn = connFactory.createConnection();
conn.start();
//事务性会话,自动确认消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的来源地
Destination destination = session.createQueue("queue.hello");
//消息消费者
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
//等待接收消息
while(!stop){
Thread.sleep(5000);
}
session.commit();
consumer.close();
session.close();
conn.close();
}
public void onMessage(Message m) {
try{
if(m instanceof TextMessage){ //接收文本消息
TextMessage message = (TextMessage)m;
System.out.println(message.getText());
}else if(m instanceof MapMessage){ //接收键值对消息
MapMessage message = (MapMessage)m;
System.out.println(message.getLong("age"));
System.out.println(message.getDouble("sarray"));
System.out.println(message.getString("username"));
}else if(m instanceof StreamMessage){ //接收流消息
StreamMessage message = (StreamMessage)m;
System.out.println(message.readString());
System.out.println(message.readLong());
}else if(m instanceof BytesMessage){ //接收字节消息
byte[] b = new byte[1024];
int len = -1;
BytesMessage message = (BytesMessage)m;
while((len=message.readBytes(b))!=-1){
System.out.println(new String(b, 0, len));
}
}else if(m instanceof ObjectMessage){ //接收对象消息
ObjectMessage message = (ObjectMessage)m;
User user = (User)message.getObject();
System.out.println(user.getUsername() + " _ " + user.getPassword());
}else{
System.out.println(m);
}
stop = true;
}catch(JMSException e){
stop = true;
e.printStackTrace();
}
}
}
分享到:
相关推荐
SpringActiveMQ入门示例是关于如何在Java环境中利用Spring框架与Apache ActiveMQ集成的一个实践教程。这个示例主要适用于开发者想要了解如何在Spring应用中使用消息队列进行异步通信和解耦。在这个项目中,开发环境...
**ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...
根据提供的文件信息:“activeMQ入门到精通”,我们可以深入探讨ActiveMQ的相关知识点,包括其基本概念、安装配置步骤、核心功能特性以及应用场景等。 ### ActiveMQ简介 ActiveMQ是一款开源的消息中间件,它支持...
本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...
在压缩包"ActiveMQ-5.1"中,可能包含了示例代码和配置文件,你可以根据这些资料动手实践,通过运行例子来加深对ActiveMQ的理解。这些例子涵盖了基本的发送和接收消息,以及一些高级特性,如消息选择器、事务管理等。...
在这个“activeMQ消息中间件入门示例”中,我们将探讨如何设置基本的生产者和消费者来实现消息传递。 首先,让我们了解什么是消息中间件。消息中间件是一种软件系统,它允许不同的应用之间通过消息进行通信,而不是...
在“HETF-ActiveMQ入门手册.doc”中,读者可能会找到如何安装和配置ActiveMQ,创建和管理队列与主题,设置安全策略,以及如何在实际项目中集成和使用ActiveMQ的详细步骤和示例。此外,文档可能还会涵盖性能调优技巧...
《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...
在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...
描述中的"memcached 和 activeMQ 的入门级示例代码,JAVA eclipse工程"告诉我们这个项目是为初学者设计的,它包含了在Eclipse开发环境中运行的Java代码。Eclipse是一款广泛使用的Java集成开发环境(IDE),使得...
以下是一个简单的示例: ```java package com.abc.demo.activemq.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org....
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
001-ActiveMQ基础;002-安全机制+签收模式+发送模式+MessageProducer;003-顺序消费+消息过滤SELECTOR+MessageConsumer+MySql持久化;004-p2p模式+pulish-subscribe发布订阅模式+与spring集成;...示例;
**ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java Message Service (JMS) 的实现,主要用于处理应用间的异步通信。在分布式系统中,ActiveMQ作为一个消息代理,允许应用程序通过...