`
taiwei.peng
  • 浏览: 236308 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Rabbit MQ Direct 交换机

阅读更多
1.完整的pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.soft.rabbit.server</groupId>
<artifactId>soft-rabbit-server</artifactId>
<version>0.0.1-SNAPSHOT</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>

<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud.version>Finchley.M9</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<!--使用 ${spring.cloud.client.ip-address} 需引用下面的包 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>

<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<finalName>soft-rabbit-server</finalName>
<sourceDirectory>${basedir}/src/main/java</sourceDirectory>
<outputDirectory>${basedir}/bin/classes</outputDirectory>
<resources>
<resource>
<directory>${basedir}/src/main/java</directory>
<filtering>true</filtering>
<excludes>
<exclude>**/*.java</exclude>
</excludes>
</resource>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>**/*.*</include>
</includes>
</resource>
<resource>
<directory>${basedir}/src/main/resources</directory>
<filtering>true</filtering>
<includes>
<include>*.yml</include>
<include>*.sh</include>
</includes>
<targetPath>${basedir}/target</targetPath>
</resource>
</resources>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.soft.rabbit.server.ServerApplication</mainClass>
</manifest>
</archive>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/resources</outputDirectory>
<resources>
<resource>
<directory>${basedir}/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

2.配置类
package com.soft.rabbit.server.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

//队列 起名:TestDirectQueue
    @Bean
    public Queue TestDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue",true);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange TestDirectExchange() {
      //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("TestDirectExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }

    @Bean
    DirectExchange lonelyDirectExchange() {
        return new DirectExchange("lonelyDirectExchange");
    }

}

3.生产者类
package com.soft.rabbit.server.service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectService {

@Autowired
    private AmqpTemplate rabbitTemplate;

    public String send() {
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<String,Object>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
        return "ok";
    }

}

4.启动类
package com.soft.rabbit.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ServerApplication {

private static Logger logger = LoggerFactory.getLogger(ServerApplication.class);

public static void main(String[] args) {
SpringApplication.run(ServerApplication.class, args);
logger.info("server start success!");
}

}

5.bootstrap.yml
#http端口配置
server:
  port: 6001
  connection-timeout: 5000
  tomcat:
    max-http-post-size: -1
    max-threads: 1000
    max-connections: 1000

spring:
  application:
    name: soft-rabbit-server
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
               
# eureka注册中心配置
eureka:
  client:
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:9001/eureka/
  instance:
    instance-id: ${spring.cloud.client.ip-address}:${server.port}
    prefer-ip-address: true  
    hostname: ${spring.cloud.client.ip-address}

6.消费者类
package com.soft.rabbit.client.service;

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "TestDirectQueue") // 监听的队列名称 TestDirectQueue
public class DirectReceiver {

@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
}

}

分享到:
评论

相关推荐

    rabbit-mq-ack-direct-consumer.zip

    本资料包"rabbit-mq-ack-direct-consumer.zip"包含的是关于RabbitMQ中消费者实现的代码示例,特别关注了消息确认(Message Acknowledgement)和Direct交换机模式。 首先,我们来理解RabbitMQ中的消息确认机制。在...

    rabbit_mq的demo

    在发送消息之前,需要声明一个交换机,指定其类型(如direct、fanout、topic或headers),这决定了消息的路由方式。 4. **声明Queue**: 创建一个队列,可以设置队列是否持久化,以及其他属性。队列是无名的,...

    rabbit mq入门例子

    在这个入门例子中,我们创建一个名为“hello”的队列,使用Direct交换机类型,它是最基础的交换机,将消息直接路由到指定的队列。 ```java channel.queueDeclare(QUEUE_NAME, false, false, false, null); ``` ...

    rabbit mq的demo更新

    在这个“rabbit mq的demo更新”中,我们将探讨如何通过编程实现与RabbitMQ的连接,发送和接收消息,以及如何配置消息的持久化和客户端订阅。 首先,连接RabbitMQ通常需要一个客户端库,如Java的`rabbitmq-amqp-...

    rabbit-mq-provider.zip

    在"rabbit-mq-ack-direct-provider"这个文件中,我们可以预见到包含的是一个实现直接交换模式的RabbitMQ生产者的示例。 直接交换模式是最基础的模式,它按照路由键(routing key)将消息路由到绑定到相同路由键的...

    rabbitmq消息队列

    2. **消息模型**:RabbitMQ支持多种消息模型,如Direct、Fanout、Topic和Header,每种模型都有其特定的路由策略,满足不同场景的需求。 3. **虚拟主机(Vhosts)**:RabbitMQ使用虚拟主机来隔离不同的应用程序或...

    RabbitMQ项目

    这个项目包含了RabbitMQ的详细代码,旨在帮助开发者深入理解其工作原理和使用方式,特别是各种交换机类型的配置。 首先,RabbitMQ的核心概念是消息和队列。消息是数据的载体,它们被生产者发送到队列,然后由消费者...

    RabbitMq与Spring整合实例

    &lt;rabbit:direct-exchange name="myExchange"&gt; &lt;rabbit:bindings&gt; &lt;rabbit:binding queue="myQueue" key="routingKey" /&gt; &lt;/rabbit:bindings&gt; &lt;/rabbit:exchange&gt; ``` 3. **编写生产者代码**:在Spring Bean中...

    rabbitmq 消息队列

    常见的交换机类型有Direct、Fanout、Topic和Header等。 3. **消息队列(Queue)**:消息队列是存储消息的实际位置。多个消费者可以订阅同一个队列,接收并处理消息。队列遵循FIFO(先进先出)原则,即消息按顺序被...

    RabbitMq整合使用

    public Binding binding(Queue myQueue, DirectExchange directExchange) { return BindingBuilder.bind(myQueue).to(directExchange).with("routingKey"); } } ``` **四、消息确认** 为了确保消息被正确处理,...

    Rabbit-MQ-Pika:与python pika库的简单Rabbit MQ连接

    channel.exchange_declare(exchange=exchange_name, exchange_type='direct') channel.queue_declare(queue=queue_name) ``` **4. 发布消息** 发布消息到RabbitMQ,需要指定交换机、路由键和消息体。路由键用于匹配...

    RabbitMQ中文文档.pdf

    RabbitMQ还提供了多种交换机类型,包括direct exchange、fanout exchange、topic exchange、headers exchange等,可以满足不同的路由需求。 RabbitMQ的主要特点包括: * 可靠性:RabbitMQ提供了多种机制来保证消息...

    rabbitmq-producer:Rabbit MQ-ConsumerProducer消息队列

    在上述示例中,我们使用了Direct交换机,这意味着消息会被直接路由到与路由键匹配的队列。如果多个队列有相同的路由键,消息会被复制到所有队列。 在C#中使用RabbitMQ时,还需要注意错误处理和资源管理。例如,确保...

    rabbitmq-dotnet-client-3.5.0

    RabbitMQ提供了多种类型的交换机,如Direct、Fanout、Topic和Header,以满足不同场景的需求。 6. 模型与异常处理:使用`model.BasicAck(deliveryTag, false)`方法可以手动确认消息已被正确处理,否则RabbitMQ会重新...

    ssm-rabbit-mq-发送消息-接收消息

    文件"rabbitMQ.pdf"可能包含了更详细的RabbitMQ原理和操作指南,包括消息模型(Direct、Fanout、Topic、Header交换机)、队列持久化、消息确认机制等。建议阅读以加深理解。 总的来说,通过结合SSM框架和RabbitMQ,...

    RebbitMQ Hello World(maven项目+RebbitMQ jar包+RabbitMQ安装文件)

    channel.exchangeDeclare("hello", "direct", false); channel.queueDeclare("hello", false, false, false, null); channel.queueBind("hello", "hello", ""); ``` 接着,生产者发送消息: ```java String message ...

    rabbit-mq-test

    我们可以根据需求声明不同的交换机类型,如直接(direct)、主题(topic)或扇出(fanout)等。 ```go queueName := "myQueue" _, err = ch.QueueDeclare( queueName, // name true, // durable false, // auto-...

    RabbitMQ-SpringBoot-Project:这是一个有关如何使用Rabbit MQ和Spring Boot将消息从Producer发送到Consumer应用程序的项目

    2. **交换机** - 交换机根据预设的路由规则(如Direct、Fanout、Topic或Header类型)将消息路由到一个或多个队列。 3. **队列** - 消息被存储在队列中,等待消费者来消费。如果多个消费者监听同一个队列,消息会被...

    用Jmeter测试RabbitMQ

    - **Direct**: 单播方式,要求队列绑定至特定的路由键,只有完全匹配的消息才能被转发。 - **Fanout**: 广播方式,所有绑定至该交换机的队列都会接收到消息,通常用于实现发布/订阅模式。 - **Topic**: 组播方式...

    基于springboot+maven的rabbitmq项目demo

    在项目中,`rabbitmqdirect`可能代表一个示例,展示了如何使用Direct交换机类型。Direct交换机是最基础的类型,它按照完全匹配的路由键将消息发送到相应的队列。例如,生产者可能设置一个路由键为"news.update",而...

Global site tag (gtag.js) - Google Analytics