- 浏览: 105902 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
sheungxin:
@仑山中鸟 @zl_xzl 不好意思,代码已丢。。。自己动手试 ...
自定义类加载器与spring的集成 -
昆仑山中鸟:
同求代码,看的云里雾里。自定义监听器WebContextLis ...
自定义类加载器与spring的集成 -
xzl_xzl:
代码都是片段,能否提供下具体的代码,正好我也有类似的需求。但我 ...
自定义类加载器与spring的集成 -
sheungxin:
@add2ws 我试下,多谢!
基于oracle的增量数据采集实现总结 -
add2ws:
用得着这么麻烦么,直接用kettle插入更新,配合oracle ...
基于oracle的增量数据采集实现总结
- 消息确认机制
1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
basicAck:成功消费,消息从队列中删除
basicNack:requeue=true,消息重新进入队列,false被删除
basicReject:等同于basicNack
basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer
- 应答模式之transaction机制
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 应答模式之transaction机制 * @author sheungxin * */ public class TxDemo { private static String exchange_name=""; private static String queue_name="tx_queue"; /** * transaction机制发送消息,事务机制:手动提交和回滚 * 执行txCommit,消息才会转发给队列进入ready状态 * 执行txRollback,消息被取消 * @param mes * @throws Exception */ public static void txSend(Serializable mes) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); for(int i=0;i<10;i++){ try{ channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i)); //do something // int n=5/0;//试验消息回滚 channel.txCommit();//提交消息 System.out.println("发布消息"+mes.toString()+i); }catch(Exception e){ channel.txRollback();//异常,取消消息 System.out.println("回滚消息"+mes.toString()+i); } } } /** * transaction机制接收消息,事务机制:手动提交和回滚 * 消费者需要执行basicAck,并txCommit(自动应答模式自动处理,本例中采用手动应答模式) * @throws Exception */ public static void txRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.txSelect(); channel.queueDeclare(queue_name,false,false,true,null); //关闭自动应答模式(自动应答模式不需要ack、txCommit),需要手动basicAck,并执行txCommit channel.basicConsume(queue_name, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println("tx Received :'"+mes+"' done"); channel.basicAck(envelope.getDeliveryTag(), false); channel.txCommit(); } }); } public static void main(String[] args) throws Exception { txSend("hello world!"); txRecv(); } }
- 应答模式之confirm机制
2、confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms
3、ConfirmListener、waitForConfirms均需要配合confirm机制使用
4、暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效
5、basicNack、basicReject:参数requeue=true时,消息会重新进入队列
6、autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; /** * 应答模式之confirm机制:消息发送 * @author sheungxin * */ public class ConfirmSend { private static String exchange_name=""; private static String queue_name="tx_queue"; /** * confirm机制:确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费) * confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms * ConfirmListener、waitForConfirms均需要配合confirm机制使用 * @param mes * @throws Exception */ public static void txSend(Serializable mes) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 channel.confirmSelect(); channel.queueDeclare(queue_name,false,false,true,null); //异步实现发送消息的确认(此部分的消息确认是指发送消息到队列,并非确认消息的有效消费) channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //multiple:测试发现multiple随机true或false,原因未知 System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple); } }); for(int i=0;i<10;i++){ channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i)); } // channel.waitForConfirms();//同步实现发送消息的确认 System.out.println("-----------"); channel.close(); conn.close(); } public static void main(String[] args) throws Exception { txSend("hello world!"); } }
package com.demo.mq.rabbitmq.example11; import java.io.IOException; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 应答模式之confirm机制:消息接收 * @author sheungxin * */ public class ConfirmRecv { private static String queue_name="tx_queue"; /** * confirm机制:暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效 * basicNack、basicReject:参数requeue=true时,消息会重新进入队列 * autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉 * @throws Exception */ public static void txRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //开启transaction机制 // channel.confirmSelect(); //autoDelete,true只要被消息 channel.queueDeclare(queue_name,false,false,true,null); //关闭自动应答模式 channel.basicConsume(queue_name, false, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); //multiple批量提交,true提交小于参数中tag消息 long n=envelope.getDeliveryTag()%3; if(n==0){ channel.basicAck(envelope.getDeliveryTag(), false); }else if(n==1){ //requeue,true重新进入队列 channel.basicNack(envelope.getDeliveryTag(), false, true); }else{ //requeue,true重新进入队列,与basicNack差异缺少multiple参数 channel.basicReject(envelope.getDeliveryTag(), true); } try { Thread.sleep(2*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done"); } }); } public static void main(String[] args) throws Exception { txRecv(); } }
发表评论
-
RabbitMQ使用场景练习:STOMP plugin
2016-12-22 10:32 4164STOMP plugin Stomp是一个简单的消息 ... -
RabbitMQ使用场景练习:Validated User ID、Length Limit(十二 )
2016-12-16 15:14 1389Validated User ID 发送消息时指定userid ... -
RabbitMQ使用场景练习:优先级队列(十)
2016-12-14 16:05 3634优先级队列 只有当消费者不足,不能及时进行消费的情况 ... -
RabbitMQ使用场景练习:监听器Listener(九)
2016-12-13 21:23 8057监听器 RabbitMQ中监听器有ReturnLis ... -
RabbitMQ使用场景练习:延迟队列(八)
2016-12-13 17:42 5856延时队列 在实际 ... -
RabbitMQ使用总结:持久化
2016-12-13 10:29 2039持久化 RabbitMQ的 ... -
RabbitMQ使用场景练习:RPC(七)
2016-12-08 17:51 2652RPC,同步消息 RabbitMQ默认的consum ... -
RabbitMQ使用场景练习:Headers(六)
2016-12-08 15:28 3056Headers转发器 消息发送时可以在header中 ... -
RabbitMQ使用场景练习:主题Topic(五)
2016-12-06 17:26 2444主题转发器(Topic) Topic转发器的功效包含 ... -
RabbitMQ使用场景练习:路由选择Routing(四)
2016-12-06 17:02 829路由选择(Routing) Routing即按照某条 ... -
RabbitMQ使用场景练习:发布/订阅(三)
2016-12-05 17:30 1288发布/订阅 即实现单点发送消息,多点接收。使用fan ... -
RabbitMQ使用场景练习:工作队列(二)
2016-12-05 17:11 1186工作队列 工作队列的好处在于多个工作线程共享执行任务 ... -
RabbitMQ使用场景练习:入门实例(一)
2016-12-05 16:20 1682注意要点 同一队列多次创建://此处声明队列为了防止接收者 ... -
RabbitMQ的管理与监控
2016-11-30 17:08 1628开启management plugin功能 1、managem ... -
RabbitMQ的安装
2016-11-30 16:02 7981、RabbitMQ安装 官网下载地址:http://www ...
相关推荐
RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...
5. **消息确认机制**:RabbitMQ提供了消息确认机制,确保消息被正确处理,即使消费者处理过程中出现异常,消息也不会丢失,可以重新投递。 6. **死信队列**:对于无法正确处理的消息,可以设置将其路由到死信队列,...
RabbitMQ学习实践二:MQ的安装
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...
1. **消息确认**:在RabbitMQ中,消息确认(Message Acknowledgement)是一种确保消息被正确处理的机制。当消费者接收到消息后,它需要发送一个确认信号给RabbitMQ,表明消息已被处理。如果RabbitMQ没有收到确认,它...
- **管理控制台**:使用Web管理界面监控和管理RabbitMQ实例。 - **环境配置**:配置RabbitMQ的节点、用户权限、虚拟主机等。 **3. 生产者与消费者** - **创建生产者**:编写代码发送消息到RabbitMQ,支持多种编程...
6. **创建集群**:使用`rabbitmqctl join_cluster`命令将节点加入到集群中,例如`rabbitmqctl join_cluster rabbit@m1`,其中`m1`是集群的另一个节点。 7. **检查集群状态**:通过`rabbitmqctl cluster_status`命令...
2.5 消息通讯场景:使用 RabbitMq 可以实现点对点消息队列和聊天室效果。 RabbitMq 开发指导: 1. 配置文件:修改本地配置文件 rabbitmq.properties,设置 rabbitMQ 服务器地址、端口和用户信息。 2. 添加依赖包...
rabbitmq-server-3.9.11.exe
* 消息确认:RabbitMQ 提供了消息确认机制,确保消息的可靠传递。 * 消息持久化:RabbitMQ 提供了消息持久化机制,确保消息的安全。 demo 下面是一些使用 RabbitMQ 的 demo 例子: demo(1):简单的消息队列 * ...
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
8. **异常处理与确认**:为了保证消息的可靠性,可以启用消费者确认机制。当消息被正确处理后,消费者需要发送一个确认信号给RabbitMQ,否则消息将被重新投递。同时,应妥善处理异常,避免因为错误导致消息丢失。 9...
* 可靠性:RabbitMQ提供了多种机制来保证消息的传输安全和可靠性。 * 灵活的路由:RabbitMQ提供了多种交换机类型,可以满足不同的路由需求。 * 集群联合:RabbitMQ支持多个服务器的集群联合,提高了系统的可用性和...
RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列
5. 消息可靠传递:RabbitMQ支持消息确认机制,确保消息不会丢失。同时,通过设置死信队列和持久化策略,可以处理消费失败和防止数据丢失。 6. 高可用与集群:为了提高RabbitMQ的服务可用性,可以将其部署为集群,...
9. **错误处理与异常恢复**:书中会讲解如何设计健壮的错误处理机制,包括死信队列的使用、异常重试和消息确认机制。 10. **安全与最佳实践**:介绍RabbitMQ的安全策略,包括SSL/TLS加密、用户认证与授权,以及部署...
rabbitmq-3.10.6:management
- **监控与日志**:使用RabbitMQ提供的监控工具和日志记录,了解系统运行状态。 7. **最佳实践** - **合理设计exchange和queue**:根据业务场景选择合适的exchange类型(如Direct、Fanout、Topic、Header)和绑定...