关键字:
producertool/customertool ActiveMQ的一个简单示例
关键字:
activemq
最近由于公司项目需要,开始学习JMS,用的是ActiveMQ。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,写了一个简单的小例子,现在贴出来与大家分享。
ProducerTool.java用于发送消息:
package homework;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
ConsumerTool.java用于接受消息,我用的是基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。
package homework;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL.DEFAULT";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();
System.out.println("Consumer:->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
// Message message = consumer.receive();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如果想主动的去接受消息,而不用消息监听的话,把consumer.setMessageListener(this)改为Message message = consumer.receive(),手动去调用MessageConsumer的receive方法即可。
下面是测试类Test.java:
package homework;
import javax.jms.JMSException;
public class Test {
/**
* @param args
*/
public static void main(String[] args) throws JMSException, Exception {
// TODO Auto-generated method stub
ConsumerTool consumer = new ConsumerTool();
ProducerTool producer = new ProducerTool();
// 开始监听
consumer.consumeMessage();
// 延时500毫秒之后发送消息
Thread.sleep(500);
producer.produceMessage("Hello, world!");
producer.close();
// 延时500毫秒之后停止接受消息
Thread.sleep(500);
consumer.close();
}
}
以上就是我学习ActiveMQ之后写的一个简单的例子,希望对你有帮助,如果有什么错误还请指正。
分享到:
相关推荐
创建一个连接: ```cpp std::auto_ptr<ActiveMQ::CMSConnectionFactory> factory( new ActiveMQ::CMSConnectionFactory("tcp://localhost:61616")); ``` 创建会话和消费者: ```cpp std::auto_ptr...
activemq-web-console的默认使用方式是通过在activemq.xml中导入jetty.xml配置一个jetty server来实现的。其实activemq-web-console完全可以和activemq-broker分开来部署。 activemq-web-console包含3个apps, 1.一...
activemqBroker插件:activemqBroker-2.14-SNAPSHOT.war
在提供的压缩包中,除了核心的`activemq-cpp-library-3.9.5`之外,还包含了`apr_lib`,这是一个Apache Portable Runtime (APR) 库的子集,它为跨平台的系统编程提供了一套底层的服务,如文件I/O、内存管理、线程管理...
赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...
activemq-parent-5.10.0-source-release.zip activemq-parent-5.10.2-source-release.zip activemq-parent-5.12.3-source-release.zip apache-activemq-5.10.2-bin.tar.gz apache-activemq-5.11.4-bin.zip apache-...
Apache ActiveMQ是世界上最流行的开源消息代理和队列...总之,Apache ActiveMQ是一个强大的消息中间件,适用于构建分布式系统和微服务架构,提供可靠的异步通信机制,确保数据在复杂的网络环境中安全、高效地传递。
Apache ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它基于Java Message Service(JMS)规范,用于在分布式系统中高效地传输数据。这个压缩包"apache-activemq-5.8.0-bin.zip"包含了ActiveMQ 5.8.0版本的...
这个压缩包“apache-activemq-5.15.3-bin.tar.gz”包含了Apache ActiveMQ 5.15.3版本的源代码和可执行文件,适合在Linux环境下部署和使用。 **1. Apache ActiveMQ简介** Apache ActiveMQ是Apache软件基金会的一个...
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
在安装过程中,解压"apache-activemq-5.15.8-bin.zip"后,你会得到一个包含bin目录的结构,其中包含了启动和停止ActiveMQ服务所需的脚本。在Windows上,你可以使用"bin\win32\activemq.bat",而在Linux或Mac OS上,...
在提供的压缩包"activemq-web-4.0-M3.jar.zip"中,有两个主要文件:"activemq-web-4.0-M3.jar"和"license.txt"。"activemq-web-4.0-M3.jar"是核心的Java档案文件,包含了运行ActiveMQ Web UI所需的所有类和资源。这...
activemq-all-5.12.0-sources.jar
activemq-all-5.2.0-sources.jar
Apache ActiveMQ是Apache软件基金会的一个开源项目,是一个基于消息的通信中间件。ActiveMQ是JMS的一个具体实现,支持JMS的两种消息模型。ActiveMQ使用AMQP协议集成多平台应用,使用STOMP协议通过websockets在Web...
activemq-jms-pool-5.14.4.jar
标签:activemq-kahadb-store-5.9.1.jar,activemq,kahadb,store,5.9.1,jar包下载,依赖包
tar -zxvf apache-activemq-5.15.12-bin.tar.gz 2.进入bin目录 cd /apache-activemq-5.15.12/bin 3.运行,没有配置环境变量只能在bin目录下使用命令 ./activemq 4.配置环境变量,配置完环境变量之后...