- 浏览: 270989 次
- 性别:
- 来自: 天津
文章分类
- 全部博客 (183)
- oracle (4)
- informix (1)
- web开发 (6)
- java (49)
- hibernate (1)
- hadoop (1)
- spring (23)
- 非技术 (8)
- ibatis2 (5)
- Linux (6)
- tomcat (14)
- nginx (7)
- dubbo (3)
- myibatis (7)
- webservice 开发 (2)
- mysql (2)
- svn (2)
- redis (7)
- 分布式技术 (17)
- zookeeper (2)
- kafka (2)
- velocity (1)
- maven (7)
- js (1)
- freemarker (1)
- Thymeleaf (3)
- 代码审计 (1)
- ibatis3 (1)
- rabbitmq (1)
最新评论
1.需要在rabbitmq 管理界面上,定义用户和 Virtual host
登录地址:http://localhost:15672/
用户名为:gjpztb;密码:gjpztb; Virtual host
实现代码在附件中
2.建立maven 项目:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd" >
<!-- 公共部分 -->
<!-- 创建连接类 连接安装好的 rabbitmq -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<!-- username,访问RabbitMQ服务器的账户,默认是guest -->
<property name="username" value="gjpztb" />
<!-- username,访问RabbitMQ服务器的密码,默认是guest -->
<property name="password" value="gjpztb" />
<!-- host,RabbitMQ服务器地址,默认值"localhost" -->
<property name="host" value="localhost" />
<!-- port,RabbitMQ服务端口,默认值为5672 -->
<property name="port" value="5672" />
<!-- channel-cache-size,channel的缓存数量,默认值为25 -->
<property name="channelCacheSize" value="50" />
<!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
<property name="cacheMode" value="CHANNEL" />
<property name="virtualHost" value="gjpztb" />
<!--如果使用发布 confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener" publisherConfirms参数必须为true -->
<property name="publisherConfirms" value="true" />
</bean>
<!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<!--定义消息队列-->
<rabbit:queue name="ztb.topic.queue.1" id="ztb_queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="ztb.topic.queue.2" id="ztb_queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="ztb.topic.queue.3" id="ztb_queue_3" durable="true" auto-delete="false" exclusive="false" />
<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,
即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,
就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,
如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:
如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,
这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,
则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
<!--绑定队列-->
<!--符号#:匹配一个或者多个词cn.# 可以匹配cn.新闻或者cn.新闻.国内新闻-->
<!--符号*:只能匹配一个词 cn.* 可以匹配cn.新闻或者us.新闻-->
<!--主要如果key值与多个队列匹配时,多个队列都会受到信息-->
<rabbit:topic-exchange id="ztb.topic" name="ztb.topic" durable="true" auto-delete="false" >
<rabbit:bindings>
<rabbit:binding queue="ztb.topic.queue.1" pattern="ztb.topic.*" ></rabbit:binding>
<rabbit:binding queue="ztb.topic.queue.2" pattern="ztb.topic*" ></rabbit:binding>
<rabbit:binding queue="ztb.topic.queue.3" pattern="ztb.phone.msg" ></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 生产者部分 -->
<!-- 发送消息的producer类,也就是生产者 -->
<bean id="producterMq" class="com.ztb.pro.topic.ProducterMq">
<property name="rabbitTemplate" ref="rabbitTemplate" />
</bean>
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<!--<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>-->
<!-- 或者配置jackson -->
<!-- -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<bean id="confirmCallBackListener" class="com.ztb.pro.topic.ConfirmCallBackListener" />
<bean id="returnCallBackListener" class="com.ztb.pro.topic.ReturnCallBackListener" />
<!--创建消息队列模板-->
<!-- mandatory必须设置true,return callback才生效 -->
<rabbit:template id="rabbitTemplate" exchange="ztb.topic"
connection-factory="connectionFactory"
message-converter="jsonMessageConverter"
mandatory="true"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
/>
<!-- 消费者部分 -->
<!-- 自定义接口类 -->
<!-- 消息确认机制:acknowledge :manual收到确认-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="200" >
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.1" />
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.2" />
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.3" />
</rabbit:listener-container>
<!--收到确认-->
<bean id="topicListener" class="com.ztb.pro.topic.ChannelAwareMessageListenerImp"/>
</beans>
3.建立类:
package com.ztb.pro.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
* Author by gjp, Date on 2019/8/26.
*/
public class ChannelAwareMessageListenerImp implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String body = new String(message.getBody(), "UTF-8");
System.out.println("consumer--:"+message.getMessageProperties()+": body:"+body);
//MessageProperties [headers={spring_listener_return_correlation=bef4e64a-76fa-4d7a-a3ba-95d9c2367959, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.fanout, receivedRoutingKey=ztb.faout.queue.11, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-ltHMYrVBPU4Ss89yNaUy2w, consumerQueue=ztb.faout.queue.1]: body:{"msg":"msg","source":"alert.queue.3","body":"body01566872705708"}
try {
//deliveryTag是消息传送的次数
if (message.getMessageProperties().getDeliveryTag() >= 2) {
}
}catch (Exception ex){
if (message.getMessageProperties().getRedelivered())
{
System.out.println("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
}
else
{
System.out.println("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
}
}
}catch (Exception ex){
ex.printStackTrace();
}
// Thread.sleep(1);
System.out.println("已经确认信息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* Author by gjp, Date on 2019/8/26.
*发布信息确认
* 只确认生产者消息发送成功
*/
public class ConfirmCallBackListener implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* Author by gjp, Date on 2019/8/26.
* 生产者
*/
public class ProducterMq {
private RabbitTemplate rabbitTemplate;
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String exchange, String routingKey, Object message) {
// rabbitTemplate.convertAndSend(exchange, routingKey, message);
rabbitTemplate.convertAndSend(routingKey,message);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* Author by gjp, Date on 2019/8/26.
* 生产者消息发送失败返回监听器
*/
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
}
}
package com.ztb.pro.topic;
import com.ztb.pro.message.CommonMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Calendar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Author by gjp, Date on 2019/8/23.
* 测试类
*/
public class TestMainTopic {
public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext("classpath:conf/spring-rabbitmq-topic.xml");
final ProducterMq producterMq = (ProducterMq) app.getBean("producterMq");
int threadNum =1;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i <threadNum ; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1; i++) {
CommonMessage message = new CommonMessage();
message.setSource("topic.queue.3");
String sendMsg = "body" + i + Calendar.getInstance().getTimeInMillis();
System.out.println("发送信息::" + sendMsg);
message.setBody(sendMsg);
message.setMsg("msg");
producterMq.send("ztb.topic", "ztb.topic.test", message);
}
}
});
}
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果输出:
发送信息::body01566892764139
consumer--:MessageProperties [headers={spring_listener_return_correlation=43347283-a2f8-4a46-90f4-072cb7500100, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.topic, receivedRoutingKey=ztb.topic.test, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-zLSo4gdFpzrY-EhzGI5kmg, consumerQueue=ztb.topic.queue.1]: body:{"msg":"msg","source":"topic.queue.3","body":"body01566892764139"}
已经确认信息
confirm--:correlationData:null,ack:true,cause:null
登录地址:http://localhost:15672/
用户名为:gjpztb;密码:gjpztb; Virtual host
实现代码在附件中
2.建立maven 项目:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd" >
<!-- 公共部分 -->
<!-- 创建连接类 连接安装好的 rabbitmq -->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="localhost" />
<!-- username,访问RabbitMQ服务器的账户,默认是guest -->
<property name="username" value="gjpztb" />
<!-- username,访问RabbitMQ服务器的密码,默认是guest -->
<property name="password" value="gjpztb" />
<!-- host,RabbitMQ服务器地址,默认值"localhost" -->
<property name="host" value="localhost" />
<!-- port,RabbitMQ服务端口,默认值为5672 -->
<property name="port" value="5672" />
<!-- channel-cache-size,channel的缓存数量,默认值为25 -->
<property name="channelCacheSize" value="50" />
<!-- cache-mode,缓存连接模式,默认值为CHANNEL(单个connection连接,连接之后关闭,自动销毁) -->
<property name="cacheMode" value="CHANNEL" />
<property name="virtualHost" value="gjpztb" />
<!--如果使用发布 confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener" publisherConfirms参数必须为true -->
<property name="publisherConfirms" value="true" />
</bean>
<!--或者这样配置,connection-factory元素实际就是注册一个org.springframework.amqp.rabbit.connection.CachingConnectionFactory实例
<rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}"
username="${rmq.manager.user}" password="${rmq.manager.password}" />-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义消息队列,durable:是否持久化,如果想在RabbitMQ退出或崩溃的时候,不会失去所有的queue和消息,
需要同时标志队列(queue)和交换机(exchange)是持久化的,即rabbit:queue标签和rabbit:direct-exchange中的durable=true,
而消息(message)默认是持久化的可以看类org.springframework.amqp.core.MessageProperties中的属性
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
exclusive: 仅创建者可以使用的私有队列,断开后自动删除;
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
<!--定义消息队列-->
<rabbit:queue name="ztb.topic.queue.1" id="ztb_queue_1" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="ztb.topic.queue.2" id="ztb_queue_2" durable="true" auto-delete="false" exclusive="false" />
<rabbit:queue name="ztb.topic.queue.3" id="ztb_queue_3" durable="true" auto-delete="false" exclusive="false" />
<!--绑定队列,rabbitmq的exchangeType常用的三种模式:direct,fanout,topic三种,我们用direct模式,
即rabbit:direct-exchange标签,Direct交换器很简单,如果是Direct类型,
就会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,
如果相等,则发送到该Binding对应的Queue中。有一个需要注意的地方:
如果找不到指定的exchange,就会报错。但routing key找不到的话,不会报错,
这条消息会直接丢失,所以此处要小心,auto-delete:自动删除,如果为Yes,
则该交换机所有队列queue删除后,自动删除交换机,默认为false -->
<!--绑定队列-->
<!--符号#:匹配一个或者多个词cn.# 可以匹配cn.新闻或者cn.新闻.国内新闻-->
<!--符号*:只能匹配一个词 cn.* 可以匹配cn.新闻或者us.新闻-->
<!--主要如果key值与多个队列匹配时,多个队列都会受到信息-->
<rabbit:topic-exchange id="ztb.topic" name="ztb.topic" durable="true" auto-delete="false" >
<rabbit:bindings>
<rabbit:binding queue="ztb.topic.queue.1" pattern="ztb.topic.*" ></rabbit:binding>
<rabbit:binding queue="ztb.topic.queue.2" pattern="ztb.topic*" ></rabbit:binding>
<rabbit:binding queue="ztb.topic.queue.3" pattern="ztb.phone.msg" ></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 生产者部分 -->
<!-- 发送消息的producer类,也就是生产者 -->
<bean id="producterMq" class="com.ztb.pro.topic.ProducterMq">
<property name="rabbitTemplate" ref="rabbitTemplate" />
</bean>
<!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<!--<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>-->
<!-- 或者配置jackson -->
<!-- -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<bean id="confirmCallBackListener" class="com.ztb.pro.topic.ConfirmCallBackListener" />
<bean id="returnCallBackListener" class="com.ztb.pro.topic.ReturnCallBackListener" />
<!--创建消息队列模板-->
<!-- mandatory必须设置true,return callback才生效 -->
<rabbit:template id="rabbitTemplate" exchange="ztb.topic"
connection-factory="connectionFactory"
message-converter="jsonMessageConverter"
mandatory="true"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
/>
<!-- 消费者部分 -->
<!-- 自定义接口类 -->
<!-- 消息确认机制:acknowledge :manual收到确认-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="200" >
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.1" />
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.2" />
<rabbit:listener ref="topicListener" method="onMessage"
queues="ztb.topic.queue.3" />
</rabbit:listener-container>
<!--收到确认-->
<bean id="topicListener" class="com.ztb.pro.topic.ChannelAwareMessageListenerImp"/>
</beans>
3.建立类:
package com.ztb.pro.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
* Author by gjp, Date on 2019/8/26.
*/
public class ChannelAwareMessageListenerImp implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String body = new String(message.getBody(), "UTF-8");
System.out.println("consumer--:"+message.getMessageProperties()+": body:"+body);
//MessageProperties [headers={spring_listener_return_correlation=bef4e64a-76fa-4d7a-a3ba-95d9c2367959, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.fanout, receivedRoutingKey=ztb.faout.queue.11, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-ltHMYrVBPU4Ss89yNaUy2w, consumerQueue=ztb.faout.queue.1]: body:{"msg":"msg","source":"alert.queue.3","body":"body01566872705708"}
try {
//deliveryTag是消息传送的次数
if (message.getMessageProperties().getDeliveryTag() >= 2) {
}
}catch (Exception ex){
if (message.getMessageProperties().getRedelivered())
{
System.out.println("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
}
else
{
System.out.println("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
}
}
}catch (Exception ex){
ex.printStackTrace();
}
// Thread.sleep(1);
System.out.println("已经确认信息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* Author by gjp, Date on 2019/8/26.
*发布信息确认
* 只确认生产者消息发送成功
*/
public class ConfirmCallBackListener implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* Author by gjp, Date on 2019/8/26.
* 生产者
*/
public class ProducterMq {
private RabbitTemplate rabbitTemplate;
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String exchange, String routingKey, Object message) {
// rabbitTemplate.convertAndSend(exchange, routingKey, message);
rabbitTemplate.convertAndSend(routingKey,message);
}
}
package com.ztb.pro.topic;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* Author by gjp, Date on 2019/8/26.
* 生产者消息发送失败返回监听器
*/
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
}
}
package com.ztb.pro.topic;
import com.ztb.pro.message.CommonMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Calendar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Author by gjp, Date on 2019/8/23.
* 测试类
*/
public class TestMainTopic {
public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext("classpath:conf/spring-rabbitmq-topic.xml");
final ProducterMq producterMq = (ProducterMq) app.getBean("producterMq");
int threadNum =1;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i <threadNum ; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1; i++) {
CommonMessage message = new CommonMessage();
message.setSource("topic.queue.3");
String sendMsg = "body" + i + Calendar.getInstance().getTimeInMillis();
System.out.println("发送信息::" + sendMsg);
message.setBody(sendMsg);
message.setMsg("msg");
producterMq.send("ztb.topic", "ztb.topic.test", message);
}
}
});
}
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果输出:
发送信息::body01566892764139
consumer--:MessageProperties [headers={spring_listener_return_correlation=43347283-a2f8-4a46-90f4-072cb7500100, __TypeId__=com.ztb.pro.message.CommonMessage}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=ztb.topic, receivedRoutingKey=ztb.topic.test, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-zLSo4gdFpzrY-EhzGI5kmg, consumerQueue=ztb.topic.queue.1]: body:{"msg":"msg","source":"topic.queue.3","body":"body01566892764139"}
已经确认信息
confirm--:correlationData:null,ack:true,cause:null
- ztbmq.zip (76.7 KB)
- 下载次数: 1
相关推荐
本篇文章将深入探讨如何在Spring Boot 2.1.4版本中集成RabbitMQ,并实现三种主要的交换机类型:Fanout、Direct和Topic模式。 首先,我们需要在Spring Boot项目中添加RabbitMQ的相关依赖。在`pom.xml`文件中,引入`...
在本文中,我们将深入探讨如何将Spring框架与RabbitMQ集成,主要关注三种交换器类型:direct、topic和fanout。这些模式是RabbitMQ消息路由的基础,它们为不同的消息分发需求提供了灵活性。 首先,让我们理解...
在本文中,我们将深入探讨如何在SpringBoot应用中使用RabbitMQ实现Direct、Topic和Fanout这三种消息队列模式。RabbitMQ是一款强大的开源消息代理和队列服务器,广泛应用于分布式系统中的异步处理和解耦。SpringBoot...
在RabbitMQ中,有三种主要的交换机(exchange)类型,即fanout、direct和topic模式,每种模式对应不同的路由策略,适用于不同场景。接下来,我们将深入探讨这三种模式及其配置。 1. **Fanout模式**:这是最简单的...
在这个基于Spring集成RabbitMQ的源码示例中,我们主要会关注三种不同的交换机类型:direct、topic和fanout。 1. **Direct Exchange(直连交换机)**: 直连交换机是最简单的模型,它将消息路由到绑定键与发布时...
本文将深入探讨RabbitMQ的三种基本交换机模式:Direct、Topic和Fanout,这些都是在实际开发中常见的应用场景。 首先,我们从"Hello, World!"开始。在RabbitMQ中,发送和接收消息的基本流程包括创建连接、通道、...
Go语言版本rabbitmq消息队列库:simple、worker、Fanout 模型、Direct 模型、Topic模型。 RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的...
在Java中使用RabbitMQ时,通常需要理解并掌握三种主要的Exchange类型:Direct、Fanout和Topic。这三种类型的Exchange各自有不同的路由策略,适应不同的应用场景。 1. **Direct Exchange(直接交换机)** Direct ...
标签“Rabbit”表明了这个主题与RabbitMQ相关,可能涉及到其他交换器类型,如Fanout、Topic或Header,以及更复杂的消息路由策略。不过在这个特定的demo中,我们主要关注Direct交换器的使用。 在提供的压缩包文件`...
Topic交换模式是RabbitMQ中一种灵活的消息路由策略,它结合了Direct Exchange的精确匹配和Fanout Exchange的广播特性。在Direct Exchange中,消息只能被精确匹配的绑定规则接收,而在Fanout Exchange中,消息会被...
本文将详细介绍如何在本地环境中安装RabbitMQ,并演示如何使用直接交换(direct exchange)这一基本的路由模式。 ### 1. 安装RabbitMQ 首先,确保你的系统是基于Unix/Linux或Windows。在Unix/Linux环境下,你可以...
- **Direct模式**:最简单的模式,根据路由键直接将消息发送到绑定的队列,适合一对一通信。 - **Fanout模式**:广播模式,消息会被发送到所有绑定到交换机的队列,适合一对多通信。 - **Topic模式**:主题模式,...
在标题和描述中提到的C# Demo项目,主要展示了RabbitMQ的基础用法,特别是如何封装RabbitMQ.Client库以简化操作,并演示了三种不同的Exchange(交换机)的使用。交换机是RabbitMQ中的核心组件,它决定了消息如何路由...
接着,我们来看RabbitMQ的四种交换机模式:Direct、Fanout、Topic和Headers。每种模式都有其特定的路由策略。 1. **Direct交换机**:最简单的模式,基于完全匹配的路由键将消息路由到队列。如果多个队列具有相同的...
系统集成六大模块,对于RabbitMQ有四种交换机,Direct,topic,headers,Fanout,分别对Direct/topic/Fanout三种交换机进行模拟操作,分别有sender和receiver模块
本资源包含RabbitMQ的五种核心模式的源码,全部为手工编写,有助于深入理解和实践这些模式。 1. **简单模式(Simple)**: 在这个模式中,生产者发送消息到RabbitMQ,消费者从队列中接收并处理这些消息。这是一种...
RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。 Test1、Test2是测试类 使用maven管理,在pom.xml文件中引入如下代码: <!-- Rabbitmq工具包...
该项目包括两个子工程,生产者(rabbitmqProvider),消费者(rabbitmqConsumer),采用了direct,fanout,topic三种方式发布及消费。以及持久化分批消费等。工程中rabbitMQ.xml,rabbitMQ1.xml,rabbitMQChanel.xml...
RabbitMQ 是 Spring Cloud ...本代码简单示例了RabbitMQ 的5中工作模式中的几种 (1)simple 简单模式 (2)Work(竞争者模式) (3)fanout 模式(发布订阅模式) (4)direct(路由键模式) (5)topic(通配符模式)
1. **Direct模式**:这是最基础的模式,消息根据路由键(routing key)精确地投递给指定的队列。在生产者发送消息时,会指定一个路由键,消费者则根据相同的路由键来接收消息。 2. **Fanout模式**:广播模式,不论...