Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():
Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。
以下是符合Jms1.1规范的使用Queue传输消息的代码,使用Amq作为Jms的实现,想要更clean的代码,可以考虑将Amq的实现DI:
view plaincopy to clipboardprint?
public void testMySend() throws JMSException {
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = null;
Queue queue = null;
MessageProducer producer = null;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue=session.createQueue("myTest");
producer= session.createProducer(queue);
Message msg=session.createTextMessage("hello");
producer.send(msg);
producer.close();
session.close();
connection.close();
}
public void testMyReceive() throws JMSException{
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myTest");
MessageConsumer consumer= session.createConsumer(queue);
consumer.setMessageListener(new MyListener());
connection.start();
consumer.close();
session.close();
connection.close();
}
消费端需要设定一个MessageListener:
private class MyListener implements MessageListener{
public void onMessage(Message message) {
System.out.println("msg start!----------------");
try {
System.out.println(""+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("msg end!----------------");
}
}
public void testMySend() throws JMSException {
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = null;
Queue queue = null;
MessageProducer producer = null;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue=session.createQueue("myTest");
producer= session.createProducer(queue);
Message msg=session.createTextMessage("hello");
producer.send(msg);
producer.close();
session.close();
connection.close();
}
public void testMyReceive() throws JMSException{
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myTest");
MessageConsumer consumer= session.createConsumer(queue);
consumer.setMessageListener(new MyListener());
connection.start();
consumer.close();
session.close();
connection.close();
}
消费端需要设定一个MessageListener:
private class MyListener implements MessageListener{
public void onMessage(Message message) {
System.out.println("msg start!----------------");
try {
System.out.println(""+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("msg end!----------------");
}
}
view plaincopy to clipboardprint?
以下是符合Jms1.1规范的使用Topic 发送消息的代码:
public void testMyTopicPublisher() throws JMSException {
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = null;
Topic topic = null;
MessageProducer producer = null;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic=session.createTopic("myTopicTest");
producer= session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message msg=session.createTextMessage("hello and whatever u say");
producer.send(msg);
producer.close();
session.close();
connection.close();
}
以下是符合Jms1.1规范的使用Topic 接收消息的代码:
public static void testMyTopicConsumer() throws JMSException, InterruptedException{
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopicTest");
MessageConsumer consumer= session.createConsumer(topic);
consumer.setMessageListener(new MyListener());
connection.start();
}
以下是符合Jms1.1规范的使用Topic 发送消息的代码:
public void testMyTopicPublisher() throws JMSException {
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = null;
Topic topic = null;
MessageProducer producer = null;
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic=session.createTopic("myTopicTest");
producer= session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message msg=session.createTextMessage("hello and whatever u say");
producer.send(msg);
producer.close();
session.close();
connection.close();
}
以下是符合Jms1.1规范的使用Topic 接收消息的代码:
public static void testMyTopicConsumer() throws JMSException, InterruptedException{
Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopicTest");
MessageConsumer consumer= session.createConsumer(topic);
consumer.setMessageListener(new MyListener());
connection.start();
}
jms在创建Session时可以有两个参数,第一个参数是是否使用事务,第二个参数是消费者向发送者确认消息已经接收的方式:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
确认消息的方式有如下三种:
AUTO_ACKNOWLEDGE(自动通知)
CLIENT_ACKNOWLEDGE(客户端自行决定通知时机)
DUPS_OK_ACKNOWLEDGE(延时//批量通知)
如果使用的是 客户端自行决定通知时机 方式,那么需要在MessageListener里显式调用message.acknowledge()来通知服务器。服务器接收到通知后采取相应的操作。
分享到:
相关推荐
JMS主要提供了两种消息模式:主题(Topic)和队列(Queue),这两种模式在实现方式和功能上有所不同。 1. 主题(Topic): - **发布/订阅模型**:主题基于发布/订阅模型,其中多个生产者可以发布消息到一个特定的...
Queue 和 Topic 是 JMS(Java Message Service)中两种基本的消息模式,分别对应 Point-to-Point 和 Publish/Subscribe 模式。 Queue 模式 在 Queue 模式中,一条消息仅能被一个消费者(Consumer)接收。如果在...
综上所述,Queue与Topic的主要区别在于消息的传递方式和处理方式。选择哪种消息传递模型取决于具体的业务需求和应用场景。例如,如果需要实现消息的一对多广播功能,则更适合采用基于Topic的发布/订阅模型;如果需要...
4. **创建目的地**:包括Queue和Topic。为每个目的地指定名称、类型和其他属性。 5. **配置目的地工厂**:如QueueDestinationFactory和TopicDestinationFactory,它们用于创建实际的队列和主题对象。 ### 三、编写...
JMS支持两种类型的消息模型:点对点(Point-to-Point, Queue)和发布/订阅(Publish/Subscribe, Topic)。 在本案例中,我们将深入探讨如何在JBoss ESB中利用JMS Topic实现消息通信。 #### 二、JMS Topic应用场景 ...
#### 四、Queue 与 Topic 的比较 1. **Queue 执行 Load Balancer 语义**: - 每条消息仅能被一个消费者接收。若消息发送时没有可用的消费者,则消息会被保存直到有可用的消费者。如果一个消费者接收到消息但不响应...
javax.jms.Topic.class javax.jms.MapMessage.class javax.jms.ObjectMessage.class javax.jms.StreamMessage.class javax.jms.TextMessage.class javax.jms.MessageListener.class javax.jms.MessageProducer.class...
标题"spring boot jsm ibmmq topic queue"涉及到的是如何在Spring Boot项目中使用JMS与IBM MQ进行交互,包括发布/订阅模型(Topic)和点对点模型(Queue)两种方式。下面将详细解释这两个概念以及如何在Spring Boot...
一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...
2. **JmsSetting.java**: 此文件可能包含JMS相关的配置信息,如连接工厂、目的地(topic或queue)的定义,以及身份验证和安全设置。它通常用于初始化JMS连接,并为不同的组件提供配置参数。 3. **IAlarmListener....
在ActiveMQ中,有两种主要的消息传递模式:Queue和Topic。这两种模式都是基于JMS(Java Message Service)标准,用于在分布式环境中实现异步通信。理解它们的区别和应用场景至关重要。 **Queue(队列)模式** ...
相比于JMS Queue,Topic更适合广播类型的信息传播,因为所有订阅者都会收到每条消息的副本。 下面,我们详细解析EJB到JMS Topic的实现步骤: 1. **创建消息生产者**: 首先,我们需要导入相关的JMS和Java标准库。...
通过理解Queue和Topic的特性,结合Spring的JMS支持,我们可以构建出高效、可靠的分布式系统。在实际项目中,可以根据需求选择合适的通信模式,并利用持久订阅来保证消息的可靠性。在学习过程中,查阅官方文档和相关...
以上就是Spring集成ActiveMQ的基本流程,涉及Queue和Topic的创建、持久化到MySQL以及消息的发送与接收。这个测试Demo涵盖了消息中间件的核心功能,可以帮助你理解并实践消息传递系统在实际项目中的应用。
Java消息服务(Java Message Service,简称JMS)是一个标准,定义了应用程序如何创建、发送、接收和读取消息。JMS允许不同的应用之间进行异步通信,增强了系统的可扩展性和可靠性。 **2. JMS核心概念** - **消息...
`javax.jms.jar` 文件中包含了如`javax.jms.Queue`, `javax.jms.Topic`, `javax.jms.MessageProducer`, `javax.jms.MessageConsumer`, `javax.jms.ConnectionFactory`等关键接口,以及其他辅助类和异常类,开发者...
首先,我们要理解JMS中的两种主要概念:Queue(队列)和Topic(主题)。Queue遵循点对点模型,每个消息只能被一个消费者接收,而Topic遵循发布/订阅模型,允许多个订阅者同时接收到同一消息。在广播订阅场景中,...
- 主要分为Queue和Topic两种类型,这里关注Topic配置。 - 在`Services -> Messaging -> JMS Modules`下,选择`SystemModule`,新建一个Topic。 - 命名为`JMS File Topic`,设置JNDI名称为`jms/fileTopic`。 - ...
创建QUEUE和TOPIC的基本步骤如下: 1. 引入依赖:首先,确保在项目中引入了ActiveMQ的客户端库。 2. 创建ConnectionFactory:这是连接到ActiveMQ服务器的工厂类,通过它来创建Connection。 3. 建立Connection:使用...
在消息传递模型上,ActiveMQ对于Queue和Topic的实现也十分精巧。Queue的实现基于FIFO(先进先出)原则,保证了消息的顺序传递;而Topic的实现则采用了发布/订阅模型,多个消费者可以同时订阅同一个Topic,消息会被...