启动:\apache-activemq-5.6.0\bin\win32\activemq.bat
工程中引入 \apache-activemq-5.6.0\lib下的jar
Listener 消息消费者:
public class TopicListener implements MessageListener {
..............
main()方法{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topictest.messages"); //创建一个topic
MessageConsumer consumer = session.createConsumer(topic); //创建topic的消费者
consumer.setMessageListener(this);
connection.start();
producer = session.createProducer(control);
System.out.println("Waiting for messages...");
}
//消息处理方法
@Override
public void onMessage(Message message) {
if (checkText(message, "SHUTDOWN")) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace(System.out);
}
} else if (checkText(message, "REPORT")) {
// send a report:
try {
long time = System.currentTimeMillis() - start;
String msg = "Received " + count + " in " + time + "ms";
producer.send(session.createTextMessage(msg));
} catch (Exception e) {
e.printStackTrace(System.out);
}
count = 0;
} else {
if (count == 0) {
start = System.currentTimeMillis();
}
if (++count % 1000 == 0) {
System.out.println("Received " + count + " messages.");
}
}
}
...................
}
消息生存者:
public class TopicPublisher{
main(){
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic("topictest.messages");
publisher = session.createProducer(topic);
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
payload = new byte[size];
for (int i = 0; i < size; i++) {
payload[i] = (byte)DATA[i % DATA.length];
}
session.createConsumer(control).setMessageListener(this);
connection.start();
long[] times = new long[batch];
for (int i = 0; i < batch; i++) {
if (i > 0) {
Thread.sleep(delay * 1000);
}
times[i] = batch(messages);
System.out.println("Batch " + (i + 1) + " of " + batch + " completed in " + times[i] + " ms.");
}
long min = min(times);
long max = max(times);
System.out.println("min: " + min + ", max: " + max + " avg: " + avg(times, min, max));
// request shutdown
publisher.send(session.createTextMessage("SHUTDOWN"));
connection.stop();
connection.close();
}
private long batch(int msgCount) throws Exception {
long start = System.currentTimeMillis();
remaining = subscribers;
publish();
waitForCompletion();
return System.currentTimeMillis() - start;
}
private void publish() throws Exception {
// send events
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(payload);
for (int i = 0; i < messages; i++) {
publisher.send(msg);
if ((i + 1) % 1000 == 0) {
System.out.println("Sent " + (i + 1) + " messages");
}
}
// request report
publisher.send(session.createTextMessage("REPORT"));
}
}
管理页面地址: http://127.0.0.1:8161/admin/topics.jsp
测试结果监控:
===================以下为spring中的用法:
配置:
beans中引入activemq的schema:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">
创建queue/topic:(服务器启动时)
<amq:queue name="ORDER_TOPIC" physicalName="ActiveMQ.ORDER.${jms_node}" />
配置服务器连接工厂,参数是activemq服务器地址:
jms_server=tcp://192.168.0.238:61616?wireFormat.maxInactivityDuration=0
<!-- ActiveMQ connectionFactory -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="${jms_server}" />
创建 发送消息的bean:
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
</property>
</bean>
<bean id="orderMessageProducer" class="com.xxx.comm.jms.TopicMessageProducer">
<property name="template" ref="myJmsTemplate" />
<property name="destination" value="ActiveMQ.ORDER" />
</bean>
再看看TopicMessageProducer的实现(pojo而已):
public class TopicMessageProducer {
private static final Log log = LogFactory.getLog(TopicMessageProducer.class);
private JmsTemplate template;
private String destination;
public void sendMsg(Message msg){
log.info(msg);
String[] queues = MessageProtector.getInstance().getQueues(destination);
for (String string : queues) {
template.convertAndSend(string, msg);
}
}
public void setTemplate(JmsTemplate template) {
this.template = template;
}
public void setDestination(String destination) {
this.destination = destination;
}
}
其中Message 是自定义的一个pojo,但须实现 Serializable:
public class Message implements Serializable{
private int a;
......
}
发送消息:
getBean("orderMessageProducer").sendMsg(new Message());
相关推荐
### ActiveMQ实践入门指南 #### 重要性与背景 **ActiveMQ**,作为Apache基金会旗下的明星项目之一,凭借其强大的功能和对JMS1.1及J2EE1.4规范的全面支持,在IT业界占据了举足轻重的地位。自2003年起,由Apache ...
**ActiveMQ实践入门指南** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性和易于管理的特点,在企业级应用中备受青睐。...
本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...
【ActiveMQ使用入门】 ActiveMQ是一款基于Java的消息中间件,它是Apache基金会的开源项目,也是最早的JMS(Java消息服务)实现之一。JMS是一种标准,定义了在Java环境中访问消息中间件的接口,但并未具体实现。...
Apache ActiveMQ是业界广泛...通过这个入门教程,你应该能够了解如何安装、配置和使用ActiveMQ,以及其与JMS的关联。继续深入学习,你将掌握更多关于消息中间件的高级特性和最佳实践,为你的项目带来更高效的解决方案。
根据提供的文件信息:“activeMQ入门到精通”,我们可以深入探讨ActiveMQ的相关知识点,包括其基本概念、安装配置步骤、核心功能特性以及应用场景等。 ### ActiveMQ简介 ActiveMQ是一款开源的消息中间件,它支持...
### ActiveMQ入门详解 #### 一、ActiveMQ简介与重要性 **ActiveMQ**作为Apache组织下的一个开源项目,是一款非常成熟且功能强大的消息中间件。消息中间件是指在分布式系统中用来完成消息发送和接收的基础软件。它...
ActiveMQ是中国最流行的开源消息中间件之一,它基于Java Message Service (JMS) 规范,为分布式系统提供高效、可靠的消息传递服务。本教程将引导你从基础到深入理解如何使用ActiveMQ,并通过实际的例子进行操作。 ...
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...
**ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java Message Service (JMS) 的实现,主要用于处理应用间的异步通信。在分布式系统中,ActiveMQ作为一个消息代理,允许应用程序通过...
本视频包含JMS相关知识,activeMQ入门,集群等,觉得视频可以的小伙伴给个赞,谢谢了
**ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...
在本文中,我们将深入探讨ActiveMQ的配置及其入门知识。 一、ActiveMQ简介 ActiveMQ是Apache软件基金会的顶级项目,它的核心功能是作为消息代理,负责接收、存储和转发消息。它支持多种协议,如OpenWire、AMQP、...
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。?特点:?1、支持多种语言...
在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...
SpringActiveMQ入门示例是关于如何在Java环境中利用Spring框架与Apache ActiveMQ集成的一个实践教程。这个示例主要适用于开发者想要了解如何在Spring应用中使用消息队列进行异步通信和解耦。在这个项目中,开发环境...
在“activemq入门实例”中,你将学习到以下几个关键知识点: 1. **ActiveMQ的基本概念**:ActiveMQ作为JMS提供商,它提供一个服务器端(broker)来存储和转发消息,客户端则通过连接到这个服务器来发送和接收消息。...
Apache ActiveMQ是一个开源的消息代理,遵循Java消息服务(JMS)规范,用于在不同的应用和平台之间进行可靠的消息传递。ActiveMQ能够支持多种传输协议,并能够处理大量消息的存储和转发。 从文件【部分内容】中,...