`
- 浏览:
234368 次
- 性别:
- 来自:
深圳
-
1.TopicRabbitConfig
package com.soft.rabbit.server.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
2.rabbitconfig
package com.soft.rabbit.server.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback: " + "确认情况:" + ack);
System.out.println("ConfirmCallback: " + "原因:" + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,String routingKey) {
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
}
});
return rabbitTemplate;
}
}
3.TopicService
package com.soft.rabbit.server.service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicService {
@Autowired
private AmqpTemplate rabbitTemplate;
public String sendTopicMessage1() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: MAN";
String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
Map<String, Object> manMap = new HashMap<String, Object>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
public String sendTopicMessage2() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: woman is all ";
String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
Map<String, Object> womanMap = new HashMap<String, Object>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
}
4.消费者TopicReceiver
package com.soft.rabbit.client.service;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.man")
public class TopicReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString());
}
}
5.消费者TopicTotalReceiver
package com.soft.rabbit.client.service;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver消费者收到消息 : " + testMessage.toString());
}
}
6.生产者bootstrap.yml
#http端口配置
server:
port: 6001
connection-timeout: 5000
tomcat:
max-http-post-size: -1
max-threads: 1000
max-connections: 1000
spring:
application:
name: soft-rabbit-server
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirms: true
#publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
# eureka注册中心配置
eureka:
client:
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:7001/eureka/
instance:
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address: true
hostname: ${spring.cloud.client.ip-address}
分享到:
Global site tag (gtag.js) - Google Analytics
相关推荐
在发送消息之前,需要声明一个交换机,指定其类型(如direct、fanout、topic或headers),这决定了消息的路由方式。 4. **声明Queue**: 创建一个队列,可以设置队列是否持久化,以及其他属性。队列是无名的,...
在 Spring Cloud Stream RabbitMQ 中,我们使用广播模式(topic)作为交换机模式,每一个服务仅允许拥有一个交换机。交换机是消息队列的核心组件,负责将消息路由到相应的队列中。 通道(Channel) Spring Cloud ...
3. **Publish/Subscribe 模式**:消息通过交换机广播到多个队列,适合群发邮件或广播消息。 4. **Routing 模式**:根据路由键将消息发送到特定队列,适用于定向消息传递。 5. **Topic 模式**:基于通配符的路由,更...
2. **消息模型**:RabbitMQ支持多种消息模型,如Direct、Fanout、Topic和Header,每种模型都有其特定的路由策略,满足不同场景的需求。 3. **虚拟主机(Vhosts)**:RabbitMQ使用虚拟主机来隔离不同的应用程序或...
常见的交换机类型有Direct、Fanout、Topic和Header等。 3. **消息队列(Queue)**:消息队列是存储消息的实际位置。多个消费者可以订阅同一个队列,接收并处理消息。队列遵循FIFO(先进先出)原则,即消息按顺序被...
3. **Topic Exchange**:类似于Direct,但允许使用通配符进行路由键匹配。例如,`weather.*`可以匹配`weather.uk`和`weather.us`。 4. **Header Exchange**:根据消息头中的键值对进行匹配。如果队列绑定时指定了一...
RabbitMQ提供了多种类型的交换机,如Direct、Fanout、Topic和Header,以满足不同场景的需求。 6. 模型与异常处理:使用`model.BasicAck(deliveryTag, false)`方法可以手动确认消息已被正确处理,否则RabbitMQ会重新...
RabbitMQ还提供了多种交换机类型,包括direct exchange、fanout exchange、topic exchange、headers exchange等,可以满足不同的路由需求。 RabbitMQ的主要特点包括: * 可靠性:RabbitMQ提供了多种机制来保证消息...
3. **消息发布和消费**:客户端会实现发送消息到指定的交换机,并设置监听器来接收来自特定队列的消息。 4. **错误处理**:确保在连接丢失或消息处理失败时能够适当地恢复或记录错误。 5. **事件驱动**:Qt的信号和...
文件"rabbitMQ.pdf"可能包含了更详细的RabbitMQ原理和操作指南,包括消息模型(Direct、Fanout、Topic、Header交换机)、队列持久化、消息确认机制等。建议阅读以加深理解。 总的来说,通过结合SSM框架和RabbitMQ,...
我们可以根据需求声明不同的交换机类型,如直接(direct)、主题(topic)或扇出(fanout)等。 ```go queueName := "myQueue" _, err = ch.QueueDeclare( queueName, // name true, // durable false, // auto-...
### 使用JMeter测试RabbitMQ...不仅限于基本的发布/订阅模型,还可以探索更复杂的场景,例如使用Topic交换机进行模式匹配等高级特性。这将帮助开发团队更好地评估系统的扩展性和稳定性,确保生产环境下的表现符合预期。
2. **交换机** - 交换机根据预设的路由规则(如Direct、Fanout、Topic或Header类型)将消息路由到一个或多个队列。 3. **队列** - 消息被存储在队列中,等待消费者来消费。如果多个消费者监听同一个队列,消息会被...
例如,MES(Manufacturing Execution System)向NCC(Network Control Center)发送生产订单完工信息的交换机名称可能为: `EX.mes.finish.planorder` #### 1.2 Routing-Key命名规范 Routing-Key用于匹配消息到...
4. 配置各种类型的交换机(如Direct、Fanout、Topic、Header)。 5. 设置消息持久化,确保在服务器重启后仍能恢复消息。 6. 处理消费确认机制,确保消息被正确处理。 **RabbitMQ压力测试** 通常是为了评估RabbitMQ...
2. Topic模式:Topic模式扩展了Direct模式,允许使用通配符作为路由键。交换机会根据消息的路由键和队列的绑定键进行匹配,如果匹配成功,消息就会被投递。这在一对多或者多对多的场景中非常有用,例如日志收集或...
- **其他MQ**: 如Kafka、ActiveMQ等,每种MQ都有自己的特点和适用场景。RabbitMQ以其高可靠性和易用性著称,在企业级应用中非常受欢迎。 #### 四、RabbitMQ消息类型 - **六种消息类型**: - **Direct**: 消息直接...
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.2/ebin/rabbit.app ``` 找到并修改`{loopback_users, [<<"guest">>]};`为`{loopback_users, []};` **6. 基本操作命令** - **启动服务:** ```bash service ...
在这个例子中,`convertAndSend` 方法用于发送消息,第一个参数是交换机名称,第二个参数是 RouteKey,第三个参数是实际的消息内容。 为了接收消息,可以创建一个消费者类 `Receiver`,并使用 `@RabbitListener` ...