`
taiwei.peng
  • 浏览: 234368 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Rabbit Mq Topic 交换机

阅读更多
1.TopicRabbitConfig
package com.soft.rabbit.server.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

public final static String man = "topic.man";
public final static String woman = "topic.woman";

@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}

@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}

// 将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
// 这样只要是消息携带的路由键是topic.man,才会分发到该队列
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}

// 将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#
// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}

}

2.rabbitconfig
package com.soft.rabbit.server.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback:     " + "确认情况:" + ack);
System.out.println("ConfirmCallback:     " + "原因:" + cause);
}
});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange,String routingKey) {
System.out.println("ReturnCallback:     " + "消息:" + message);
System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
System.out.println("ReturnCallback:     " + "交换机:" + exchange);
System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
}
});

return rabbitTemplate;
}

}


3.TopicService
package com.soft.rabbit.server.service;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicService {

@Autowired
    private AmqpTemplate rabbitTemplate;

    public String sendTopicMessage1() {
    String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: MAN";
    String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    Map<String, Object> manMap = new HashMap<String, Object>();
        manMap.put("messageId", messageId);
        manMap.put("messageData", messageData);
        manMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
        return "ok";
    }
   
    public String sendTopicMessage2() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "message: woman is all ";
        String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        Map<String, Object> womanMap = new HashMap<String, Object>();
        womanMap.put("messageId", messageId);
        womanMap.put("messageData", messageData);
        womanMap.put("createTime", createTime);
        rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
        return "ok";
    }

}

4.消费者TopicReceiver
package com.soft.rabbit.client.service;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.man")
public class TopicReceiver {

@RabbitHandler
public void process(Map testMessage) {
        System.out.println("TopicManReceiver消费者收到消息  : " + testMessage.toString());
    }

}

5.消费者TopicTotalReceiver
package com.soft.rabbit.client.service;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {

@RabbitHandler
public void process(Map testMessage) {
        System.out.println("TopicTotalReceiver消费者收到消息  : " + testMessage.toString());
    }

}


6.生产者bootstrap.yml
#http端口配置
server:
  port: 6001
  connection-timeout: 5000
  tomcat:
    max-http-post-size: -1
    max-threads: 1000
    max-connections: 1000

spring:
  application:
    name: soft-rabbit-server
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #消息确认配置项
    #确认消息已发送到交换机(Exchange)
    publisher-confirms: true
    #publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true
               
# eureka注册中心配置
eureka:
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:7001/eureka/
  instance:
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    prefer-ip-address: true  
    hostname: ${spring.cloud.client.ip-address}

分享到:
评论

相关推荐

    rabbit_mq的demo

    在发送消息之前,需要声明一个交换机,指定其类型(如direct、fanout、topic或headers),这决定了消息的路由方式。 4. **声明Queue**: 创建一个队列,可以设置队列是否持久化,以及其他属性。队列是无名的,...

    spring-cloud-starter-stream-rabbit MQ使用规范

    在 Spring Cloud Stream RabbitMQ 中,我们使用广播模式(topic)作为交换机模式,每一个服务仅允许拥有一个交换机。交换机是消息队列的核心组件,负责将消息路由到相应的队列中。 通道(Channel) Spring Cloud ...

    Rabbit MQ.ppt

    3. **Publish/Subscribe 模式**:消息通过交换机广播到多个队列,适合群发邮件或广播消息。 4. **Routing 模式**:根据路由键将消息发送到特定队列,适用于定向消息传递。 5. **Topic 模式**:基于通配符的路由,更...

    rabbitmq消息队列

    2. **消息模型**:RabbitMQ支持多种消息模型,如Direct、Fanout、Topic和Header,每种模型都有其特定的路由策略,满足不同场景的需求。 3. **虚拟主机(Vhosts)**:RabbitMQ使用虚拟主机来隔离不同的应用程序或...

    rabbitmq 消息队列

    常见的交换机类型有Direct、Fanout、Topic和Header等。 3. **消息队列(Queue)**:消息队列是存储消息的实际位置。多个消费者可以订阅同一个队列,接收并处理消息。队列遵循FIFO(先进先出)原则,即消息按顺序被...

    RabbitMQ项目

    3. **Topic Exchange**:类似于Direct,但允许使用通配符进行路由键匹配。例如,`weather.*`可以匹配`weather.uk`和`weather.us`。 4. **Header Exchange**:根据消息头中的键值对进行匹配。如果队列绑定时指定了一...

    rabbitmq-dotnet-client-3.5.0

    RabbitMQ提供了多种类型的交换机,如Direct、Fanout、Topic和Header,以满足不同场景的需求。 6. 模型与异常处理:使用`model.BasicAck(deliveryTag, false)`方法可以手动确认消息已被正确处理,否则RabbitMQ会重新...

    RabbitMQ中文文档.pdf

    RabbitMQ还提供了多种交换机类型,包括direct exchange、fanout exchange、topic exchange、headers exchange等,可以满足不同的路由需求。 RabbitMQ的主要特点包括: * 可靠性:RabbitMQ提供了多种机制来保证消息...

    RabbitMQ客户端Qt项目工程

    3. **消息发布和消费**:客户端会实现发送消息到指定的交换机,并设置监听器来接收来自特定队列的消息。 4. **错误处理**:确保在连接丢失或消息处理失败时能够适当地恢复或记录错误。 5. **事件驱动**:Qt的信号和...

    ssm-rabbit-mq-发送消息-接收消息

    文件"rabbitMQ.pdf"可能包含了更详细的RabbitMQ原理和操作指南,包括消息模型(Direct、Fanout、Topic、Header交换机)、队列持久化、消息确认机制等。建议阅读以加深理解。 总的来说,通过结合SSM框架和RabbitMQ,...

    rabbit-mq-test

    我们可以根据需求声明不同的交换机类型,如直接(direct)、主题(topic)或扇出(fanout)等。 ```go queueName := "myQueue" _, err = ch.QueueDeclare( queueName, // name true, // durable false, // auto-...

    用Jmeter测试RabbitMQ

    ### 使用JMeter测试RabbitMQ...不仅限于基本的发布/订阅模型,还可以探索更复杂的场景,例如使用Topic交换机进行模式匹配等高级特性。这将帮助开发团队更好地评估系统的扩展性和稳定性,确保生产环境下的表现符合预期。

    RabbitMQ-SpringBoot-Project:这是一个有关如何使用Rabbit MQ和Spring Boot将消息从Producer发送到Consumer应用程序的项目

    2. **交换机** - 交换机根据预设的路由规则(如Direct、Fanout、Topic或Header类型)将消息路由到一个或多个队列。 3. **队列** - 消息被存储在队列中,等待消费者来消费。如果多个消费者监听同一个队列,消息会被...

    rabbitmq开发规范

    例如,MES(Manufacturing Execution System)向NCC(Network Control Center)发送生产订单完工信息的交换机名称可能为: `EX.mes.finish.planorder` #### 1.2 Routing-Key命名规范 Routing-Key用于匹配消息到...

    RabbitMQ.zip

    4. 配置各种类型的交换机(如Direct、Fanout、Topic、Header)。 5. 设置消息持久化,确保在服务器重启后仍能恢复消息。 6. 处理消费确认机制,确保消息被正确处理。 **RabbitMQ压力测试** 通常是为了评估RabbitMQ...

    RabbitMq+springboot

    2. Topic模式:Topic模式扩展了Direct模式,允许使用通配符作为路由键。交换机会根据消息的路由键和队列的绑定键进行匹配,如果匹配成功,消息就会被投递。这在一对多或者多对多的场景中非常有用,例如日志收集或...

    rabbitMQ实战

    - **其他MQ**: 如Kafka、ActiveMQ等,每种MQ都有自己的特点和适用场景。RabbitMQ以其高可靠性和易用性著称,在企业级应用中非常受欢迎。 #### 四、RabbitMQ消息类型 - **六种消息类型**: - **Direct**: 消息直接...

    RabbitMQ调研文档

    vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.2/ebin/rabbit.app ``` 找到并修改`{loopback_users, [&lt;&lt;"guest"&gt;&gt;]};`为`{loopback_users, []};` **6. 基本操作命令** - **启动服务:** ```bash service ...

    springboot整合rabbitmq合集(xml方式和注解方式)

    在这个例子中,`convertAndSend` 方法用于发送消息,第一个参数是交换机名称,第二个参数是 RouteKey,第三个参数是实际的消息内容。 为了接收消息,可以创建一个消费者类 `Receiver`,并使用 `@RabbitListener` ...

Global site tag (gtag.js) - Google Analytics