前提:
先创建一个queue, ,名称为start(这个名称与压缩包中的队列名称相同×)
Sender:
package com.service;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import com.config.ConfigManager;
public class Sender {
private ConnectionFactory connectionFactory;
private Connection connection;
public Sender() {
connectionFactory = new ActiveMQConnectionFactory(
ConfigManager.getUserName(), ConfigManager.getUserPassword(),
ConfigManager.getConnectionFactory());
}
@Test
public void send() {
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(ConfigManager.getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("java start " + i);
producer.send(textMessage);
}
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
}
}
}
}
Receiver:
package com.service;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import com.config.ConfigManager;
public class Receiver {
private ConnectionFactory connectionFactory;
private Connection connection;
public Receiver() {
connectionFactory = new ActiveMQConnectionFactory(
ConfigManager.getUserName(), ConfigManager.getUserPassword(),
ConfigManager.getConnectionFactory());
}
@Test
public void read() {
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(ConfigManager
.getQueueName());
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println(message.getText());
} else {
break;
}
}
session.commit();
connection.stop();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
}
}
}
}
}
分享到:
相关推荐
教程一:初识ActiveMQ 在这一部分,我们将了解ActiveMQ的基本概念,包括消息队列、生产者、消费者以及它们在系统中的角色。ActiveMQ支持多种协议,如OpenWire、STOMP、AMQP和XMPP,这使得它能与多种语言和平台无缝...
ZMQ专题学习之一:初识ZeroMQ ZeroMQ号称是“史上最快的消息队列”,基于c语言开发的。引用官方说明定义:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程...
初识分布式架构与意义 如何把应用从单机扩展到分布式 大型分布式架构演进过程 分布式架构设计 主流架构模型-SOA架构和微服务架构 领域驱动设计及业务驱动规划 分布式架构的基本理论CAP、BASE以及其应用 什么...
- ** RocketMq之初识消息中间件**:这部分内容可能涵盖了消息中间件的基本概念、工作原理,以及RocketMQ在消息中间件领域中的独特优势。 - **31-Rocketmq特性详解**:深入理解RocketMQ的各项特性,包括高可用、高...