`
sheungxin
  • 浏览: 105458 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ使用场景练习:消息确认机制(十一)

    博客分类:
  • MQ
阅读更多
  • 消息确认机制
RabbitMQ提供了transaction、confirm两种消息确认机制。transaction即事务机制,手动提交和回滚;confirm机制提供了Confirmlistener和waitForConfirms两种方式。confirm机制效率明显会高于transaction机制,但后者的优势在于强一致性。如果没有特别的要求,建议使用conrim机制。

1、从实验来看,消息的确认机制只是确认publisher发送消息到broker,由broker进行应答,不能确认消息是否有效消费。
2、而为了确认消息是否被发送给queue,应该在发送消息中启用参数mandatory=true,使用ReturnListener接收未被发送成功的消息。
3、接下来就需要确认消息是否被有效消费。publisher端目前并没有提供监听事件,但提供了应答机制来保证消息被成功消费,应答方式:
   basicAck:成功消费,消息从队列中删除
   basicNack:requeue=true,消息重新进入队列,false被删除
   basicReject:等同于basicNack
   basicRecover:消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer

  • 应答模式之transaction机制
需要手动提交和回滚,执行txCommit,消息才会转发给队列进入ready状态;执行txRollback,消息被取消
package com.demo.mq.rabbitmq.example11;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 应答模式之transaction机制
 * @author sheungxin
 *
 */
public class TxDemo {
	private static String exchange_name="";
	private static String queue_name="tx_queue";
	
	/**
	 * transaction机制发送消息,事务机制:手动提交和回滚
	 * 执行txCommit,消息才会转发给队列进入ready状态
	 * 执行txRollback,消息被取消
	 * @param mes
	 * @throws Exception
	 */
	public static void txSend(Serializable mes) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//开启transaction机制
		channel.txSelect();
		channel.queueDeclare(queue_name,false,false,true,null);
		for(int i=0;i<10;i++){
			try{
				channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));
				//do something
//				int n=5/0;//试验消息回滚
				channel.txCommit();//提交消息
				System.out.println("发布消息"+mes.toString()+i);
			}catch(Exception e){
				channel.txRollback();//异常,取消消息
				System.out.println("回滚消息"+mes.toString()+i);
			}
		}
	}

	/**
	 * transaction机制接收消息,事务机制:手动提交和回滚
	 * 消费者需要执行basicAck,并txCommit(自动应答模式自动处理,本例中采用手动应答模式)
	 * @throws Exception
	 */
	public static void txRecv() throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//开启transaction机制
		channel.txSelect();
		channel.queueDeclare(queue_name,false,false,true,null);
		//关闭自动应答模式(自动应答模式不需要ack、txCommit),需要手动basicAck,并执行txCommit
		channel.basicConsume(queue_name, false, new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
				String mes=SerializationUtils.deserialize(body);
				System.out.println("tx Received :'"+mes+"' done");
				channel.basicAck(envelope.getDeliveryTag(), false);
				channel.txCommit();
				
			}
		});
	}
	
	public static void main(String[] args) throws Exception {
		txSend("hello world!");
		txRecv();
	}
}

  • 应答模式之confirm机制
1、确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费)
2、confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms
3、ConfirmListener、waitForConfirms均需要配合confirm机制使用
4、暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效
5、basicNack、basicReject:参数requeue=true时,消息会重新进入队列
6、autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉
package com.demo.mq.rabbitmq.example11;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

/**
 * 应答模式之confirm机制:消息发送
 * @author sheungxin
 *
 */
public class ConfirmSend {
	private static String exchange_name="";
	private static String queue_name="tx_queue";
	
	/**
	 * confirm机制:确认publisher发送消息到broker,由broker进行应答(不能确认是否被有效消费)
	 * confirmSelect,进入confirm消息确认模式,确认方式:1、异步ConfirmListener;2、同步waitForConfirms
	 * ConfirmListener、waitForConfirms均需要配合confirm机制使用
	 * @param mes
	 * @throws Exception
	 */
	public static void txSend(Serializable mes) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//开启transaction机制
		channel.confirmSelect();
		channel.queueDeclare(queue_name,false,false,true,null);
		//异步实现发送消息的确认(此部分的消息确认是指发送消息到队列,并非确认消息的有效消费)
		channel.addConfirmListener(new ConfirmListener() {
			
			@Override
			public void handleNack(long deliveryTag, boolean multiple)
					throws IOException {
				//multiple:测试发现multiple随机true或false,原因未知
				System.out.println("Nack deliveryTag:"+deliveryTag+",multiple:"+multiple);
			}
			
			@Override
			public void handleAck(long deliveryTag, boolean multiple)
					throws IOException {
				System.out.println("Ack deliveryTag:"+deliveryTag+",multiple:"+multiple);
			}
		});
		for(int i=0;i<10;i++){
			channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString()+i));
		}
//		channel.waitForConfirms();//同步实现发送消息的确认
		System.out.println("-----------");
		channel.close();
		conn.close();
	}
	
	public static void main(String[] args) throws Exception {
		txSend("hello world!");
	}
}

package com.demo.mq.rabbitmq.example11;
import java.io.IOException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 应答模式之confirm机制:消息接收
 * @author sheungxin
 *
 */
public class ConfirmRecv {
	private static String queue_name="tx_queue";

	/**
	 * confirm机制:暂时未弄明白confirm机制在consumer的应用,ConfirmListener在consumer中无效
	 * basicNack、basicReject:参数requeue=true时,消息会重新进入队列
	 * autoDelete队列在消费者关闭后不管是否还有未处理的消息都会关闭掉
	 * @throws Exception
	 */
	public static void txRecv() throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//开启transaction机制
//		channel.confirmSelect();
		//autoDelete,true只要被消息
		channel.queueDeclare(queue_name,false,false,true,null);
		//关闭自动应答模式
		channel.basicConsume(queue_name, false, new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
				String mes=SerializationUtils.deserialize(body);
				//multiple批量提交,true提交小于参数中tag消息
				long n=envelope.getDeliveryTag()%3;
				if(n==0){
					channel.basicAck(envelope.getDeliveryTag(), false);
				}else if(n==1){
					//requeue,true重新进入队列
					channel.basicNack(envelope.getDeliveryTag(), false, true);
				}else{
					//requeue,true重新进入队列,与basicNack差异缺少multiple参数
					channel.basicReject(envelope.getDeliveryTag(), true);
				}
				try {
					Thread.sleep(2*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println((n==0?"Ack":n==1?"Nack":"Reject")+" mes :'"+mes+"' done");
			}
		});
	}
	
	public static void main(String[] args) throws Exception {
		txRecv();
	}
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics