一、特性及优势
1、实现 JMS1.1 规范,支持 J2EE1.4以上
2、可运行于任何 jvm和大部分 web 容器(ActiveMQ works great in any JVM)
3、支持多种语言客户端(java, C, C++, AJAX, ACTIONSCRIPT 等等)
4、支持多种协议(stomp,openwire,REST)
5、良好的 spring 支持(ActiveMQ has great Spring Support)
6、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than
JBossMQ.)
7、与 OpenJMS、JbossMQ等开源jms provider 相比,ActiveMQ有 Apache 的支
持,持续发展的优势明显。
二、下载部署
1、下载
http://activemq.apache.org/activemq-510-release.html ,下载 5.1.0 Windows
Distribution版本
2、安装
直接解压至任意目录(如:d:\ apache-activemq-5.1.0)
3、启动 ActiveMQ服务器
方法 1:
直接运行 bin\activemq.bat
方法 2(在 JVM 中嵌套启动):
cd example
ant embedBroker
4、ActiveMQ消息管理后台系统:
http://localhost:8161/admin
三、运行附带的示例程序
1、Queue 消息示例:(点对点)
* 启动 Queue 消息消费者
cd example ant consumer
* 启动 Queue 消息生产者
cd example
ant producer
简要说明:生产者(producer)发消息,消费者(consumer)接消息,发送/接
收 2000 个消息后自动关闭
2、Topic 消息示例:(群组订阅)
* 启动 Topic 消息消费者
cd example
ant topic-listener
* 启动 Topic 消息生产者
cd example
ant topic-publisher
简要说明:重复 10 轮,publisher每轮发送2000 个消息,并等待获取 listener
的处理结果报告,然后进入下一轮发送,最后统计全局发送时间。
四、Queue与 Topic 的比较
1、JMS Queue 执行 load balancer语义:
一条消息仅能被一个 consumer(消费者) 收到。如果在 message 发送的时候没有可用的
consumer,那么它将被保存一直到能处理该 message 的 consumer 可用。如果一
个 consumer 收到一条 message 后却不响应它,那么这条消息将被转到另一个
consumer 那儿。一个 Queue 可以有很多 consumer,并且在多个可用的 consumer
中负载均衡。
注:
点对点消息传递域的特点如下:
• 每个消息只能有一个消费者。
• 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发
送消息的时候是否处于运行状态,它都可以提取消息。
2、Topic 实现 publish和 subscribe 语义:
一条消息被 publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber
将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的
subscriber能够获得消息的一个拷贝。
注:
发布/订阅消息传递域的特点如下:
• 每个消息可以有多个消费者。
• 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费
自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程
度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激
活状态时发送的消息。
3、分别对应两种消息模式:
Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者) 其中在 Publicher/Subscriber 模式下又有Nondurable subscription(非持久订阅)
和 durable subscription (持久化订阅)2种消息处理方式(支持离线消息)。
注:
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递
域中,目的地被成为主题(topic)。
五、Point-to-Point (点对点)消息模式开发流程
1、生产者(producer)开发流程(ProducerTool.java):
1.1 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。
Java代码
1.ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
2. connection = connectionFactory.createConnection();
3. connection.start();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
1.2 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。
Java代码
1.Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
1.3 创建 Destination对象:
需指定其对应的主题(subject)名称,producer 和 consumer 将根据 subject
来发送/接收对应的消息。
Java代码
1.if (topic) {
2. destination = session.createTopic(subject);
3.} else {
4. destination = session.createQueue(subject);
5.}
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
1.4 创建 MessageProducer:
根据 Destination创建MessageProducer 对象,同时设置其持久模式。
Java代码
1.MessageProducer producer = session.createProducer(destination);
MessageProducer producer = session.createProducer(destination);
1.5 发送消息到队列(Queue):
封装 TextMessage 消息, 使用 MessageProducer 的 send 方法将消息发送出去。
Java代码
1.TextMessage message = session.createTextMessage(createMessageText(i));
2.producer.send(message);
TextMessage message = session.createTextMessage(createMessageText(i));
producer.send(message);
2、消费者(consumer)开发流程(ConsumerTool.java):
2.1 实现 MessageListener 接口:
消费者类必须实现MessageListener 接口,然后在onMessage()方法中监听消息的
到达并处理。
Java代码
1.public class ConsumerTool extends Thread implements MessageListener, ExceptionListener
public class ConsumerTool extends Thread implements MessageListener, ExceptionListener 实现 onMessage(Message message)方法,实现监听消息的到达
2.2 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection,如果是durable 模式,
还需要给 connection设置一个 clientId。
Java代码
1.ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
2.Connection connection = connectionFactory.createConnection();
3.//是否是 durable 模式.(离线消息持久化支持)
4.if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
5. connection.setClientID(clientId);
6.}
7.connection.setExceptionListener(this);
8.connection.start();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
//是否是 durable 模式.(离线消息持久化支持)
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();
2.3 创建 Session 和 Destination:
与 ProducerTool.java 中的流程类似,不再赘述。
Java代码
1.session = connection.createSession(transacted, ackMode);
session = connection.createSession(transacted, ackMode);
2.4 创建 replyProducer【可选】:
可以用来将消息处理结果发送给 producer。
2.5 创建 MessageConsumer:
根据 Destination创建MessageConsumer 对象。
Java代码
1.MessageConsumer consumer = null;
2.if (durable && topic) {
3. consumer = session.createDurableSubscriber((Topic) destination, consumerName);
4.} else {
5. consumer = session.createConsumer(destination);
6.}
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}
2.6 消费 message:
在 onMessage()方法中接收producer 发送过来的消息进行处理,并可以通过
replyProducer 反馈信息给 producer
Java代码
1.if (message.getJMSReplyTo() != null) {
2. replyProducer.send(message.getJMSReplyTo()
3. , session.createTextMessage("Reply: "
4. + message.getJMSMessageID()));
5.}
if (message.getJMSReplyTo() != null) {
replyProducer.send(message.getJMSReplyTo()
, session.createTextMessage("Reply: "
+ message.getJMSMessageID()));
}
六、Publisher/Subscriber(发布/订阅者)消息模式开发流程
1、订阅者(Subscriber)开发流程(TopicListener.java):
1.1 实现 MessageListener 接口:
在 onMessage()方法中监听发布者发出的消息队列,并做相应处理。
Java代码
1.public void onMessage(Message message) {
2. if (checkText(message, "SHUTDOWN")) {
3.
4. try {
5. connection.close();
6. } catch (Exception e) {
7. e.printStackTrace(System.out);
8. }
9.
10. } else if (checkText(message, "REPORT")) {
11. // send a report:
12. try {
13. long time = System.currentTimeMillis() - start;
14. String msg = "Received " + count + " in " + time + "ms";
15. producer.send(session.createTextMessage(msg));
16. } catch (Exception e) {
17. e.printStackTrace(System.out);
18. }
19. count = 0;
20.
21. } else {
22.
23. if (count == 0) {
24. start = System.currentTimeMillis();
25. }
26.
27. if (++count % 1000 == 0) {
28. System.out.println("Received " + count + " messages.");
29. }
30. }
31.}
public void onMessage(Message message) {
if (checkText(message, "SHUTDOWN")) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace(System.out);
}
} else if (checkText(message, "REPORT")) {
// send a report:
try {
long time = System.currentTimeMillis() - start;
String msg = "Received " + count + " in " + time + "ms";
producer.send(session.createTextMessage(msg));
} catch (Exception e) {
e.printStackTrace(System.out);
}
count = 0;
} else {
if (count == 0) {
start = System.currentTimeMillis();
}
if (++count % 1000 == 0) {
System.out.println("Received " + count + " messages.");
}
}
}
1.2 创建 Connection:
根据 url,user 和 password 创建一个 jms Connection。
Java代码
1.ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
2.connection = factory.createConnection();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
1.3 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。
Java代码
1.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
1.4 创建 Topic:
创建 2 个Topic, topictest.messages用于接收发布者发出的消息,
topictest.control用于向发布者发送消息,实现双方的交互。
Java代码
1.topic = session.createTopic("topictest.messages");
2.control = session.createTopic("topictest.control");
topic = session.createTopic("topictest.messages");
control = session.createTopic("topictest.control");
1.5 创建 consumer 和 producer 对象:
根据topictest.messages创建consumer,根据topictest.control创建producer。
Java代码
1.MessageConsumer consumer = session.createConsumer(topic);//创建消费者
2.consumer.setMessageListener(this);
3.
4.connection.start();
5.
6.producer = session.createProducer(control);//创建生产者
MessageConsumer consumer = session.createConsumer(topic);//创建消费者
consumer.setMessageListener(this);
connection.start();
producer = session.createProducer(control);//创建生产者
1.6 接收处理消息:
在 onMessage()方法中,对收到的消息进行处理,可直接简单在本地显示消
息,或者根据消息内容不同处理对应的业务逻辑(比如:数据库更新、文件操作
等等),并且可以使用 producer对象将处理结果返回给发布者。
Java代码
1.//可以先检查消息类型
2.ate static boolean checkText(Message m, String s) {
3. try {
4. return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
5. } catch (JMSException e) {
6. e.printStackTrace(System.out);
7. return false;
8. }
9.}
//可以先检查消息类型
private static boolean checkText(Message m, String s) {
try {
return m instanceof TextMessage && ((TextMessage)m).getText().equals(s);
} catch (JMSException e) {
e.printStackTrace(System.out);
return false;
}
}
Java代码
1.//然后
2. if (checkText(message, "SHUTDOWN")) {
3.
4. //关机
5.
6. } else if (checkText(message, "REPORT")) {
7. // 打印
8.
9.
10. } else {
11. //别的操作
12.
13. }
//然后
if (checkText(message, "SHUTDOWN")) {
//关机
} else if (checkText(message, "REPORT")) {
// 打印
} else {
//别的操作
}
2、发布者(Publisher)开发流程(TopicPublisher.java):
2.1 实现 MessageListener 接口:
在 onMessage()方法中接收订阅者的反馈消息。
Java代码
1.public void onMessage(Message message) {
2. synchronized (mutex) {
3. System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
4. if (remaining == 0) {
5. mutex.notify();
6. }
7. }
8.}
public void onMessage(Message message) {
synchronized (mutex) {
System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
if (remaining == 0) {
mutex.notify();
}
}
}
2.2 创建 Connection:
根据 url 创建一个 jms Connection。
Java代码
1.ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
2.connection = factory.createConnection();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
2.3 创建 Session:
在 connection的基础上创建一个 session,同时设置是否支持事务和
ACKNOWLEDGE 标识。
Java代码
1.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
2.4 创建 Topic:
创建 2 个Topic,topictest.messages用于向订阅者发布消息,topictest.control用
于接收订阅者反馈的消息。这2个topic与订阅者开发流程中的topic是一一对应
的。
Java代码
1.topic = session.createTopic("topictest.messages");
2.control = session.createTopic("topictest.control");
topic = session.createTopic("topictest.messages");
control = session.createTopic("topictest.control");
2.5 创建 consumer 和 producer 对象:
根据topictest.messages创建publisher;
根据topictest.control创建consumer,同时监听订阅者反馈的消息。
Java代码
1.publisher = session.createProducer(topic);
2.publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式
3.
4.session.createConsumer(control).setMessageListener(this);//加入监听
5.connection.start();
publisher = session.createProducer(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//非持久化模式
session.createConsumer(control).setMessageListener(this);//加入监听
connection.start();
2.6 给所有订阅者发送消息,并接收反馈消息:
示例代码中,一共重复 10 轮操作。
Java代码
1.for (int i = 0; i < batch; i++) {
2. if (i > 0) {
3. Thread.sleep(delay * 1000);
4. }
5. times[i] = batch(messages);
6. System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");
7.}
for (int i = 0; i < batch; i++) {
if (i > 0) {
Thread.sleep(delay * 1000);
}
times[i] = batch(messages);
System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");
}
每轮先向所有订阅者发送 2000 个消息;
Java代码
1.private long batch(int msgCount) throws Exception {
2. long start = System.currentTimeMillis();
3. remaining = subscribers;
4. publish();
5. waitForCompletion();
6. return System.currentTimeMillis() - start;
7.}
private long batch(int msgCount) throws Exception {
long start = System.currentTimeMillis();
remaining = subscribers;
publish();
waitForCompletion();
return System.currentTimeMillis() - start;
}
Java代码
1.private void publish() throws Exception {
2.
3. // send events
4. BytesMessage msg = session.createBytesMessage();
5. msg.writeBytes(payload);
6. for (int i = 0; i < messages; i++) {
7. publisher.send(msg);
8. if ((i + 1) % 1000 == 0) {
9. System.out.println("Sent " + (i + 1) + " messages");
10. }
11. }
12.
13. // request report
14. publisher.send(session.createTextMessage("REPORT"));
15.}
private void publish() throws Exception {
// send events
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(payload);
for (int i = 0; i < messages; i++) {
publisher.send(msg);
if ((i + 1) % 1000 == 0) {
System.out.println("Sent " + (i + 1) + " messages");
}
}
// request report
publisher.send(session.createTextMessage("REPORT"));
}
然后堵塞线程,开始等待;
Java代码
1.private void waitForCompletion() throws Exception {
2. System.out.println("Waiting for completion...");
3. synchronized (mutex) {
4. while (remaining > 0) {
5. mutex.wait();//赌赛线程
6. }
7. }
8.}
private void waitForCompletion() throws Exception {
System.out.println("Waiting for completion...");
synchronized (mutex) {
while (remaining > 0) {
mutex.wait();//赌赛线程
}
}
}
最后通过 onMessage()方法,接收到订阅者反馈的“REPORT”类信息后,才
print 反馈信息并解除线程堵塞,进入下一轮。
Java代码
1.public void onMessage(Message message) {
2. synchronized (mutex) {
3. System.out.println("Received report " + getReport(message) + " " + --remaining + " remaining");
4. if (remaining == 0) {
5. mutex.notify();//唤醒线程
6. }
7. }
8.}
分享到:
相关推荐
### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...
### Apache ActiveMQ与JMS整合Tomcat:深入解析与实践 #### 一、Apache ActiveMQ:强大而灵活的开源消息中间件 Apache ActiveMQ作为一款成熟的开源消息中间件,不仅遵循了JMS 1.1规范,还兼容J2EE 1.4以上的标准,...
- `activemq-consumer.xml`通常包含消息消费者的配置,例如定义MessageListenerContainer,它监听特定的Queue或Topic,并通过实现MessageListener接口处理接收到的消息。 - `activemq-produce.xml`则包含了消息...
在本文中,我们将深入探讨如何将SpringBoot与Apache ActiveMQ集成,特别关注Queue和Topic两种消息传递模式。首先,我们来了解一下ActiveMQ的基本概念和功能。 Apache ActiveMQ是一款开源的消息中间件,由Apache软件...
需要安装NuGet包`Apache.NMS`和`Apache.NMS.ActiveMQ`,然后按照API文档编写代码连接到ActiveMQ服务器。 7. **消息模型**:了解JMS消息模型,包括点对点(Queue)和发布/订阅(Topic)模式,以及持久化消息、事务和...
Apache ActiveMQ 是一款高度可扩展且稳定的企业级消息中间件,它遵循JMS(Java Message Service)1.1规范,提供了跨多种语言和系统的集成能力。ActiveMQ 的核心设计目标是促进不同应用间的异步消息通信,从而提高...
**ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...
同时Apache ActiveMq是速度快,支持多种跨语言客户端和协议,同时配有易于使用的企业集成模式和优秀的特性,并且支持JMS1.1和J2EE1.4。具体特性见官网:http://activemq.apache.org/ 2. 什么是JMS JMS的全称是Java ...
3. **ActiveMQ核心组件**:介绍Broker、Topic、Queue、Destination等核心组件,以及它们在消息传递中的角色和使用场景。 4. **API使用**:通过Java API演示如何创建生产者和消费者,发送和接收消息,以及设置消息...
深入了解topic队列与Queue队列比较点到点(X英尺) PublisherSubscriber(发布订阅者)消息模式开发流程 详细讲解企业项目中ActiveMQ使用经验 ActiveMQ与Tomcat整合 分布式ActiveMQ集群开发详解 ActiveMQ集群配置方法 在...
在你提到的"apache-activemq-5.15.2"版本中,包含了"webapps-demo"这个目录,这是一个非常有价值的资源,它为开发者提供了丰富的示例和教程,帮助他们更好地理解和使用ActiveMQ。 1. **ActiveMQ基本概念**: - **...
JMS提供了两种消息模型:点对点(Queue)和发布/订阅(Topic)。 点对点模型中,消息由一个生产者发送到一个队列,然后由一个消费者接收。这种模式确保每个消息仅被一个消费者消费一次。而在发布/订阅模型中,多个...
**Apache ActiveMQ 详解** Apache ActiveMQ 是一款开源的消息中间件,它遵循Java Message Service (JMS) 规范,提供了高效、可靠的异步消息传递功能。ActiveMQ 的设计目标是提供灵活、高性能且易用的消息传递解决...
**Spring与ActiveMQ整合详解** 在Java开发中,Spring框架是极为重要的应用基础,而ActiveMQ作为Apache出品的一款开源消息中间件,常被用于实现应用间的异步通信和解耦。本实例代码工程"Spring+ActiveMQ整合实例代码...
本文档详细介绍了在Apache ActiveMQ 5.15.3版本中如何进行消息过期时间的设置,以及如何配置自动清除机制,特别是针对死信队列的处理方式。 #### 1. 消息过期设置 ##### 参数详解 - **Message 过期则客户端不能...
5. **主题与队列**:ActiveMQ 支持发布/订阅模式的主题(Topic)和点对点模式的队列(Queue),满足不同的消息传递需求。 6. **事务支持**:ActiveMQ 提供事务处理能力,确保消息的原子性,保证消息的正确发送和...
**ActiveMQ实例Demo详解** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性以及丰富的特性,成为许多企业级应用首选的...
**ActiveMQ实例详解** ActiveMQ,作为Apache软件基金会的一个开源项目,是业界广泛使用的消息中间件,它遵循Java Message Service(JMS)标准,提供高效、可靠的异步通信能力。在分布式系统中,ActiveMQ扮演着至关...
ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能强大的消息中间件,用于在分布式系统中实现异步通信和解耦。ActiveMQ支持多种协议,如OpenWire、STOMP、AMQP、MQTT等,使其能广泛应用于Java、.NET、Python...
《ActiveMQ 5.7源码API详解》 Apache ActiveMQ是开源的、高性能的、功能丰富的消息中间件,它遵循JMS(Java Message Service)规范,为分布式系统提供了可靠的消息传递服务。ActiveMQ 5.7版本是其重要的一个里程碑...