1.首先启动JMS(ACTIVEMQ)服务器
2.编写代码如下:
package com.active;
import java.util.Arrays;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
public class ProducerTool {
private Destination destination;
private int messageCount = 10;
private long sleepTime;
private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "topic1";
private boolean topic =true;
private boolean transacted;
private boolean persistent =true;//表示是否要持久化消息。
/*
ProducerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[messagecount] 发送消息数量,默认是10
[messagesize] 消息长度,默认是255
[clientID] durable为true的时候,需要配置clientID
[timeToLive] 消息存活时间
[sleepTime] 发送消息中间的休眠时间
[transacte] 是否采用事务
ConsumerTool [url] broker的地址,默认的是tcp://localhost:61616
[true|flase] 是否使用topic,默认是false
[subject] subject的名字,默认是TOOL.DEFAULT
[durabl] 是否持久化消息,默认是false
[maxiumMessages] 接受最大消息数量,0表示不限制
[clientID] durable为true的时候,需要配置clientID
[transacte] 是否采用事务
[sleepTime] 接受消息中间的休眠时间,默认是0,onMeesage方法不休眠
[receiveTimeOut] 接受超时
*/
public static void main(String[] args) {
ProducerTool producerTool = new ProducerTool();
String[] unknown = CommandLineSupport.setOptions(producerTool, args);
if (unknown.length > 0) {
System.out.println("Unknown options: " + Arrays.toString(unknown));
System.exit(-1);
}
producerTool.run();
}
public void run() {
Connection connection = null;
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
System.out.println("Sleeping between publish " + sleepTime + " ms");
if (timeToLive != 0) {
System.out.println("Messages time to live " + timeToLive + " ms");
}
// Create the connection.首先创建一个连接
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
connection.start();
// Create the session.创建一个SESSION
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
//如果是TOPIC的方式则创建TOPIC连接.否则创建QUEUE连接
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//我们如果设置Mode的方式为DeliveryMode.PERSISTENT.(我采用的是MYSQL的持久化方式)那么服务器所发送的消息将都保存到MYSQL数据库中的activemq_msgs表中
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//如果不是采用持久化保存的方式,那么当服务器重新启动的时候,由于客户端不在线时服务器所发的消息将不再保留.因为那些消息是保存在了服务器内存中的
}
if (timeToLive != 0) {
producer.setTimeToLive(timeToLive);
}
// Start sending messages
sendLoop(session, producer);
System.out.println("Done.");
// Use the ActiveMQConnection interface to dump the connection
// stats.
// ActiveMQConnection c = (ActiveMQConnection)connection;
// c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
注意发送消息完后要释放资源。
} catch (Throwable ignore) {
}
}
}
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
TextMessage message = session.createTextMessage();
message.setText("i love wuweiling");
producer.send(message);
}
public void setPersistent(boolean durable) {
this.persistent = durable;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}
分享到:
相关推荐
3. **创建消息生产者和消费者**: 使用JMS API或者支持的其他协议创建应用程序,发送和接收消息。 4. **访问Web Console**: 浏览器中输入`http://localhost:8161/admin`访问管理界面,监控和管理队列、主题和连接。 ...
ActiveMQ在企业级应用中扮演着重要的角色,因为它允许应用程序通过异步通信来解耦生产者和消费者,从而提高系统的可扩展性和可靠性。 在"apache-activemq-5.15.6"这个版本中,我们可以探讨以下几个关键知识点: 1....
在实际应用中,`javax.jms-1.1.jar` 提供了JMS API的实现,开发人员可以使用这些API创建消息、连接到消息代理、创建生产者和消费者,以及进行消息的发送和接收。例如,使用`ConnectionFactory`创建连接工厂,然后...
2. **核心组件**:ActiveMQ的核心组件包括Broker(消息代理)、Producer(生产者)、Consumer(消费者)、Topic(主题)和Queue(队列)。Broker负责路由和存储消息,生产者发送消息,消费者接收消息。Topic适用于...
- **生产者与消费者**:使用Java或其他语言编写应用程序,通过JMS API创建消息生产和消费逻辑。 - **监控**:通过Web控制台,可以查看消息队列状态,管理消费者,以及监控系统性能。 - **消息策略**:设置消息...
4. **客户端连接**:开发者可以通过JMS API或其他支持的协议创建生产者和消费者,与ActiveMQ进行交互。 5. **监控管理**:ActiveMQ内置了一个Web控制台,可以在浏览器中访问`http://localhost:8161/admin`进行管理和...
**发送端** 的实现会创建一个JMS连接,然后创建一个生产者对象,用来向队列或主题发送消息。消息可以是文本、对象或者二进制数据。生产者将消息封装成`Message`对象,然后通过`send()`方法发送出去。 **接收端** 则...
5. **创建和配置目的地** - 在Web控制台中,可以创建Queue或Topic,并配置相关的消费者和生产者。 6. **停止服务** - 当完成测试或配置后,使用`bin/activemq stop`命令关闭服务。 **在应用开发中使用ActiveMQ:**...
1. **消息中间件**:ActiveMQ作为一个消息中间件,其主要任务是处理应用程序之间的通信,通过消息传递来解耦生产者和消费者,使得系统更加灵活和可扩展。 2. **JMS支持**:JMS是一个标准接口,定义了如何在分布式...
- **Java API**:使用JMS API直接与ActiveMQ交互,创建生产者和消费者实例。 - **其他语言支持**:ActiveMQ支持多种编程语言,如Python、Ruby、C#等,通过相应的客户端库进行集成。 - **Spring框架集成**:Spring...
JMS提供了一种解耦通信的方式,使得生产者和消费者可以在不同时刻运行,甚至不知道彼此的存在。ActiveMQ作为JMS的实现,支持多种消息模式,包括点对点(Queue)和发布/订阅(Topic)。 点对点模式下,每个消息只有...
这个压缩包文件"apache-activemq-4.1.2"包含了该版本的完整源码和二进制文件,允许用户在本地环境中安装和运行ActiveMQ服务器。 1. **Apache ActiveMQ基本概念**: - **JMS(Java Message Service)**:JMS是Java...
此外,理解JMS规范中的概念,如队列(Queue)、主题(Topic)、生产者(Producer)、消费者(Consumer)以及消息模型(点对点、发布/订阅),对于有效利用ActiveMQ至关重要。 总之,Apache ActiveMQ是企业级消息...
而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被广播给所有订阅者。 通过深入研究ActiveMQ的javax.jms源码,我们可以更深入地理解JMS的工作原理,了解如何利用ActiveMQ实现高效...
这个“apache-activemq-5.9.1-bin.tar.gz”压缩包包含了在Linux环境下安装和运行ActiveMQ所需的所有文件。 首先,让我们了解一下ActiveMQ的核心概念和功能。ActiveMQ是一个中间件,它作为应用程序之间的通信桥梁,...
6. **JMS接口**:ActiveMQ支持JMS接口,这意味着开发者可以使用Java、Python、C++、.NET等多种语言创建生产者和消费者来发送和接收消息。JMS接口提供了一种标准的方式来处理异步消息,确保数据传输的可靠性和持久性...
通过JMS API,开发者可以创建消息生产者、消费者,并发送和接收消息。 2. **持久化机制**:ActiveMQ支持多种持久化策略,包括本地文件系统存储、LevelDB、JDBC以及KahaDB等。这确保了即使在服务宕机后,消息也不会...
在发布/订阅模式中,生产者(Publisher)发送消息到一个主题(Topic),而多个消费者(Subscriber)可以订阅这个主题,从而接收到这些消息。这种模式适用于需要广播消息或通知所有感兴趣方的情况。 在博客链接中...
开发人员可以利用这些库创建生产者和消费者,实现消息的发布与订阅。 7. **消息模式**:ActiveMQ支持点对点(Queue)、发布/订阅(Topic)等多种消息模式,满足不同场景的需求。 8. **监控与调试**:通过Web控制台...
- **JMS接口**:JMS是Java平台上的标准API,用于应用程序之间的异步通信,提供了消息生产者、消费者和消息队列的概念。 - **Queues与Topics**:在ActiveMQ中,Queues支持一对一的消息传递,而Topics支持一对多的...