与Queue不同的是,Topic实现的是发布/订阅模型,在下面的例子中,启动2个消费者共同监听一个Topic,然后循环给这个Topic中发送多个消息。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class TopicTest {
/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
// TODO Auto-generated method stub
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER
, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
//JMS 客户端到JMS Provider 的连接
Connection connection = factory.createConnection();
connection.start();
// Session: 一个发送或接收消息的线程
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// Destination :消息的目的地;消息发送给谁.
// 获取session注意参数值my-queue是Query的名字
Topic topic = new ActiveMQTopic("topicTest");
MessageConsumer comsumer1 = session.createConsumer(topic);
comsumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 1 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageConsumer comsumer2 = session.createConsumer(topic);
comsumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("Consumer 2 get " + ((TextMessage)message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
});
MessageProducer producer = session.createProducer(topic);
for (int i=0; i<10; i++) {
producer.send(session.createTextMessage("Message:" + i));
}
}
}
运行后得到下面的输出结果:
Consumer 1 get Message:0
Consumer 2 get Message:0
Consumer 1 get Message:1
Consumer 2 get Message:1
Consumer 1 get Message:2
Consumer 2 get Message:2
Consumer 1 get Message:3
Consumer 2 get Message:3
Consumer 1 get Message:4
Consumer 2 get Message:4
Consumer 1 get Message:5
Consumer 2 get Message:5
Consumer 1 get Message:6
Consumer 2 get Message:6
Consumer 1 get Message:7
Consumer 2 get Message:7
Consumer 1 get Message:8
Consumer 2 get Message:8
Consumer 1 get Message:9
Consumer 2 get Message:9
说明每一个消息都会被所有的消费者消费。
分享到:
相关推荐
在本项目中,我们将深入探讨如何使用Spring Boot与Kafka进行集成,实现一个实战项目,包括Kafka的生产者、消费者以及如何创建Topic,并且特别关注指定消费分区这一高级特性。Kafka是一款高吞吐量的分布式消息系统,...
测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: 消息主题。 Queue: 队列。抽象的概念,消息可以发送到...
首先,Kafka的基本知识点包括它的核心组件:生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责将数据发布到特定的主题,消费者则订阅这些主题并处理数据。主题是逻辑上的分类,可以分为多个分区...
RocketMQ 介绍与实战 RocketMQ 是阿里巴巴中间件团队自研的一款高性能、高吞吐量、低延迟、高可用、高可靠(具备金融级稳定性)的分布式消息中间件。RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁...
会话创建成功后,根据消息类型(队列Queue或主题Topic),创建相应的目的地(Destination)。然后创建消息生产者(MessageProducer),并设置其为非持久性模式(DeliveryMode.NON_PERSISTENT),意味着消息将不会被...
1. **Kafka基本概念**:理解Kafka的核心概念,包括生产者、消费者、主题(Topic)、分区(Partition)和副本(Replica)。了解这些元素如何协同工作,确保数据的可靠传输和存储。 2. **Kafka架构**:Kafka集群由多...
11. 自定义Topic同步调用RRPC实战:展示了如何在自定义的Topic上使用远程过程调用(RRPC)进行消息通信。 12. 设备上报二进制数据云端解析:最后,文档详细解读了设备如何将二进制数据上报到云端,并在云端进行解析...
### 图解Kafka之实战指南知识点详述 #### 一、Kafka简介 **Kafka** 起初由LinkedIn采用Scala语言开发,后捐赠给Apache基金会,现已成为一款广泛应用于分布式流处理平台的成熟软件。它凭借高吞吐量、可持久化存储、...
常见的交换机类型有Direct、Fanout、Topic和Header。 3. **队列**: 队列是消息的临时存储区域,消息被消费者消费后会从队列中删除。 4. **消费者**: 消费者是接收并处理来自RabbitMQ消息的应用。它们通过订阅队列...
7. **主题建模(Topic Modeling)**:如Latent Dirichlet Allocation (LDA),用于发现文本隐藏的主题。 8. **依存句法分析(Dependency Parsing)**:理解词语间的句法关系,如主谓、动宾等。 9. **机器翻译...
而在发布/订阅消息传递域中,目的地被称为主题(topic),消息可被多个消费者消费。 消息生产者是会话创建的对象,用于将消息发送到指定的目的地。消息消费者是会话创建的对象,用于接收发送到目的地的消息。消息...
常见的交换器类型有Direct、Fanout、Topic和Header。 - **队列(Queue)**:存储消息的地方,消息只能从交换器路由到队列,不能直接由生产者发送到队列。 - **绑定(Binding)**:连接交换器和队列的关系,定义了...
### ActiveMQ 实战 #### JMS 基本构件概览 **ActiveMQ** 是一个高性能、功能丰富的开源消息中间件,它实现了 **Java Message Service (JMS)** 规范。JMS 规范定义了一组接口,这些接口提供了一个标准的方式来进行...
### Kafka核心原理与实战 #### 一、Kafka概述与特点 Kafka是一款开源的分布式消息系统,由LinkedIn开发并在2011年开源,现在是Apache顶级项目。其主要设计目的是为了提供一种高吞吐量、低延迟的发布订阅模型,适用...
8. **调试和测试**:书中可能会讲解如何使用ros2 topic、ros2 node和ros2 log等工具进行调试,以及如何编写单元测试确保代码质量。 9. **实战应用**:代码实例可能涵盖从简单的传感器数据处理到复杂的导航和路径...
- 创建消息队列 (`Queue`) 或主题 (`Topic`),用于指定消息目的地。 - 发送消息: - 使用 `MessageProducer` 发送消息。 - 设置消息属性,如消息类型、消息体等。 - 接收消息: - 使用 `MessageConsumer` 接收...
### Kafka核心原理与实战 #### 一、Kafka概述与特点 Kafka是一款高性能的分布式消息队列系统,主要用于处理实时数据流。它通过将消息持久化至硬盘,并利用顺序写入的方式,实现了高吞吐量和低延迟的特性。在大数据...
### Kafka核心原理与实战 #### 一、Kafka概述及特点 Kafka是一款开源的分布式消息系统,由LinkedIn开发并在2011年开源,现已成为Apache顶级项目。其主要特点是高吞吐量、低延迟、持久化、容错性和可扩展性。Kafka...
本知识点将深入探讨RocketMQ的基础概念、工作原理、实战应用以及源码解析。 一、RocketMQ基础 1. 概念:RocketMQ是一个基于发布/订阅模式的消息队列,支持事务消息、定时/延时消息、顺序消息等多种特性,广泛应用...
2. **JMS兼容性**:ActiveMQ完全支持JMS 1.1规范,提供了队列(Queue)和主题(Topic)两种消息模型。队列遵循“一对一”通信,而主题支持“一对多”广播。 3. **多种协议支持**:除了JMS,ActiveMQ还支持STOMP、...