`
m635674608
  • 浏览: 5027279 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ的顺序消费和事务消费

    博客分类:
  • MQ
 
阅读更多

一、三种消费 :1.普通消费 2. 顺序消费 3.事务消费

1.1  顺序消费:在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费,他的实现是生产者(一个生产者可以对多个主题去发送消息)将这个三个消息放在topic(一个topic默认有4个队列)的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后顺序消费。

单个节点(Producer端1个、Consumer端1个)

Producer端

 

[java] view plain copy
 
  1. package order;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.exception.MQBrokerException;  
  6. import com.alibaba.rocketmq.client.exception.MQClientException;  
  7. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
  8. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
  9. import com.alibaba.rocketmq.client.producer.SendResult;  
  10. import com.alibaba.rocketmq.common.message.Message;  
  11. import com.alibaba.rocketmq.common.message.MessageQueue;  
  12. import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  13.   
  14. /** 
  15.  * Producer,发送顺序消息 
  16.  */  
  17. public class Producer {  
  18.     public static void main(String[] args) {  
  19.         try {  
  20.             DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
  21.             producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  22.   
  23.             producer.start();  
  24.   
  25.             // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
  26.             // "TagE" };  
  27.   
  28.             for (int i = 1; i <= 5; i++) {  
  29.   
  30.                 Message msg = new Message("TopicOrderTest""order_1""KEY" + i, ("order_1 " + i).getBytes());  
  31.   
  32.                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
  33.                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
  34.                         Integer id = (Integer) arg;  
  35.                         int index = id % mqs.size();  
  36.                         return mqs.get(index);  
  37.                     }  
  38.                 }, 0);  
  39.   
  40.                 System.out.println(sendResult);  
  41.             }  
  42.   
  43.             producer.shutdown();  
  44.         } catch (MQClientException e) {  
  45.             e.printStackTrace();  
  46.         } catch (RemotingException e) {  
  47.             e.printStackTrace();  
  48.         } catch (MQBrokerException e) {  
  49.             e.printStackTrace();  
  50.         } catch (InterruptedException e) {  
  51.             e.printStackTrace();  
  52.         }  
  53.     }  
  54. }  

 

Consumer端代码

 

[java] view plain copy
 
  1. package order;  
  2.   
  3. import java.util.List;  
  4. import java.util.concurrent.TimeUnit;  
  5. import java.util.concurrent.atomic.AtomicLong;  
  6.   
  7. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  8. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;  
  9. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;  
  10. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;  
  11. import com.alibaba.rocketmq.client.exception.MQClientException;  
  12. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  13. import com.alibaba.rocketmq.common.message.MessageExt;  
  14.   
  15. /** 
  16.  * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) 
  17.  */  
  18. public class Consumer1 {  
  19.   
  20.     public static void main(String[] args) throws MQClientException {  
  21.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
  22.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  23.   
  24.         /** 
  25.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  26.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  27.          */  
  28.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  29.   
  30.         consumer.subscribe("TopicOrderTest""*");  
  31.   
  32.         consumer.registerMessageListener(new MessageListenerOrderly() {  
  33.             AtomicLong consumeTimes = new AtomicLong(0);  
  34.   
  35.             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
  36.                 // 设置自动提交  
  37.                 context.setAutoCommit(true);  
  38.                 for (MessageExt msg : msgs) {  
  39.                     System.out.println(msg + ",内容:" + new String(msg.getBody()));  
  40.                 }  
  41.   
  42.                 try {  
  43.                     TimeUnit.SECONDS.sleep(5L);  
  44.                 } catch (InterruptedException e) {  
  45.   
  46.                     e.printStackTrace();  
  47.                 }  
  48.                 ;  
  49.   
  50.                 return ConsumeOrderlyStatus.SUCCESS;  
  51.             }  
  52.         });  
  53.   
  54.         consumer.start();  
  55.   
  56.         System.out.println("Consumer1 Started.");  
  57.     }  
  58.   
  59. }  

 

 

结果如下图所示:

这个五条数据被顺序消费了

 

多个节点(Producer端1个、Consumer端2个)

Producer端代码:

 

[java] view plain copy
 
  1. package order;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.exception.MQBrokerException;  
  6. import com.alibaba.rocketmq.client.exception.MQClientException;  
  7. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;  
  8. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;  
  9. import com.alibaba.rocketmq.client.producer.SendResult;  
  10. import com.alibaba.rocketmq.common.message.Message;  
  11. import com.alibaba.rocketmq.common.message.MessageQueue;  
  12. import com.alibaba.rocketmq.remoting.exception.RemotingException;  
  13.   
  14. /** 
  15.  * Producer,发送顺序消息 
  16.  */  
  17. public class Producer {  
  18.     public static void main(String[] args) {  
  19.         try {  
  20.             DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
  21.             producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  22.   
  23.             producer.start();  
  24.   
  25.             // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",  
  26.             // "TagE" };  
  27.   
  28.             for (int i = 1; i <= 5; i++) {  
  29.   
  30.                 Message msg = new Message("TopicOrderTest""order_1""KEY" + i, ("order_1 " + i).getBytes());  
  31.   
  32.                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
  33.                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
  34.                         Integer id = (Integer) arg;  
  35.                         int index = id % mqs.size();  
  36.                         return mqs.get(index);  
  37.                     }  
  38.                 }, 0);  
  39.   
  40.                 System.out.println(sendResult);  
  41.             }  
  42.             for (int i = 1; i <= 5; i++) {  
  43.   
  44.                 Message msg = new Message("TopicOrderTest""order_2""KEY" + i, ("order_2 " + i).getBytes());  
  45.   
  46.                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
  47.                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
  48.                         Integer id = (Integer) arg;  
  49.                         int index = id % mqs.size();  
  50.                         return mqs.get(index);  
  51.                     }  
  52.                 }, 1);  
  53.   
  54.                 System.out.println(sendResult);  
  55.             }  
  56.             for (int i = 1; i <= 5; i++) {  
  57.   
  58.                 Message msg = new Message("TopicOrderTest""order_3""KEY" + i, ("order_3 " + i).getBytes());  
  59.   
  60.                 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
  61.                     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
  62.                         Integer id = (Integer) arg;  
  63.                         int index = id % mqs.size();  
  64.                         return mqs.get(index);  
  65.                     }  
  66.                 }, 2);  
  67.   
  68.                 System.out.println(sendResult);  
  69.             }  
  70.   
  71.             producer.shutdown();  
  72.         } catch (MQClientException e) {  
  73.             e.printStackTrace();  
  74.         } catch (RemotingException e) {  
  75.             e.printStackTrace();  
  76.         } catch (MQBrokerException e) {  
  77.             e.printStackTrace();  
  78.         } catch (InterruptedException e) {  
  79.             e.printStackTrace();  
  80.         }  
  81.     }  
  82. }  

 

Consumer1

 

[java] view plain copy
 
  1. /** 
  2.  * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) 
  3.  */  
  4. public class Consumer1 {  
  5.   
  6.     public static void main(String[] args) throws MQClientException {  
  7.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
  8.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  9.   
  10.         /** 
  11.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  12.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  13.          */  
  14.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  15.   
  16.         consumer.subscribe("TopicOrderTest""*");  
  17.           
  18.         /** 
  19.          * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到  
  20.          *,第二个线程无法访问这个队列 
  21.          */  
  22.         consumer.registerMessageListener(new MessageListenerOrderly() {  
  23.             AtomicLong consumeTimes = new AtomicLong(0);  
  24.   
  25.             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
  26.                 // 设置自动提交  
  27.                 context.setAutoCommit(true);  
  28.                 for (MessageExt msg : msgs) {  
  29.                     System.out.println(msg + ",内容:" + new String(msg.getBody()));  
  30.                 }  
  31.   
  32.                 try {  
  33.                     TimeUnit.SECONDS.sleep(5L);  
  34.                 } catch (InterruptedException e) {  
  35.   
  36.                     e.printStackTrace();  
  37.                 }  
  38.                 ;  
  39.   
  40.                 return ConsumeOrderlyStatus.SUCCESS;  
  41.             }  
  42.         });  
  43.   
  44.         consumer.start();  
  45.   
  46.         System.out.println("Consumer1 Started.");  
  47.     }  
  48.   
  49. }  



 

Consumer2

 

[java] view plain copy
 
  1. /** 
  2.  * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) 
  3.  */  
  4. public class Consumer2 {  
  5.   
  6.     public static void main(String[] args) throws MQClientException {  
  7.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
  8.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  9.   
  10.         /** 
  11.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  12.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  13.          */  
  14.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  15.   
  16.         consumer.subscribe("TopicOrderTest""*");  
  17.           
  18.         /** 
  19.          * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到  
  20.          *,第二个线程无法访问这个队列 
  21.          */  
  22.         consumer.registerMessageListener(new MessageListenerOrderly() {  
  23.             AtomicLong consumeTimes = new AtomicLong(0);  
  24.   
  25.             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
  26.                 // 设置自动提交  
  27.                 context.setAutoCommit(true);  
  28.                 for (MessageExt msg : msgs) {  
  29.                     System.out.println(msg + ",内容:" + new String(msg.getBody()));  
  30.                 }  
  31.   
  32.                 try {  
  33.                     TimeUnit.SECONDS.sleep(5L);  
  34.                 } catch (InterruptedException e) {  
  35.   
  36.                     e.printStackTrace();  
  37.                 }  
  38.                 ;  
  39.   
  40.                 return ConsumeOrderlyStatus.SUCCESS;  
  41.             }  
  42.         });  
  43.   
  44.         consumer.start();  
  45.   
  46.         System.out.println("Consumer2 Started.");  
  47.     }  
  48.   
  49. }  



 

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息
Consumer1消费情况如图,都按照顺序执行了




Consumer2消费情况如图,都按照顺序执行了

 



二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事物。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

原文地址:http://www.jianshu.com/p/453c6e7ff81c

 

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

例子:

Consumer 端

 

[java] view plain copy
 
  1. package transaction;  
  2.   
  3. import java.util.List;  
  4.   
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
  9. import com.alibaba.rocketmq.client.exception.MQClientException;  
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
  11. import com.alibaba.rocketmq.common.message.MessageExt;  
  12.   
  13. /** 
  14.  * Consumer,订阅消息 
  15.  */  
  16. public class Consumer {  
  17.   
  18.     public static void main(String[] args) throws InterruptedException, MQClientException {  
  19.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
  20.         consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  21.         consumer.setConsumeMessageBatchMaxSize(10);  
  22.         /** 
  23.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> 
  24.          * 如果非第一次启动,那么按照上次消费的位置继续消费 
  25.          */  
  26.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  27.   
  28.         consumer.subscribe("TopicTransactionTest""*");  
  29.   
  30.         consumer.registerMessageListener(new MessageListenerConcurrently() {  
  31.   
  32.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
  33.   
  34.                 try {  
  35.   
  36.                     for (MessageExt msg : msgs) {  
  37.                         System.out.println(msg + ",内容:" + new String(msg.getBody()));  
  38.                     }  
  39.   
  40.                 } catch (Exception e) {  
  41.                     e.printStackTrace();  
  42.   
  43.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
  44.   
  45.                 }  
  46.   
  47.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
  48.             }  
  49.         });  
  50.   
  51.         consumer.start();  
  52.   
  53.         System.out.println("transaction_Consumer Started.");  
  54.     }  
  55. }  

 

 

 

Producer端

 

[java] view plain copy
 
  1. package transaction;  
  2.   
  3. import com.alibaba.rocketmq.client.exception.MQClientException;  
  4. import com.alibaba.rocketmq.client.producer.SendResult;  
  5. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
  6. import com.alibaba.rocketmq.client.producer.TransactionMQProducer;  
  7. import com.alibaba.rocketmq.common.message.Message;  
  8.   
  9. /** 
  10.  * 发送事务消息例子 
  11.  *  
  12.  */  
  13. public class Producer {  
  14.     public static void main(String[] args) throws MQClientException, InterruptedException {  
  15.   
  16.         TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
  17.         TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
  18.         producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
  19.         // 事务回查最小并发数  
  20.         producer.setCheckThreadPoolMinSize(2);  
  21.         // 事务回查最大并发数  
  22.         producer.setCheckThreadPoolMaxSize(2);  
  23.         // 队列数  
  24.         producer.setCheckRequestHoldMax(2000);  
  25.         producer.setTransactionCheckListener(transactionCheckListener);  
  26.         producer.start();  
  27.   
  28.         // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"  
  29.         // };  
  30.         TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
  31.         for (int i = 1; i <= 2; i++) {  
  32.             try {  
  33.                 Message msg = new Message("TopicTransactionTest""transaction" + i, "KEY" + i,  
  34.                         ("Hello RocketMQ " + i).getBytes());  
  35.                 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
  36.                 System.out.println(sendResult);  
  37.   
  38.                 Thread.sleep(10);  
  39.             } catch (MQClientException e) {  
  40.                 e.printStackTrace();  
  41.             }  
  42.         }  
  43.   
  44.         for (int i = 0; i < 100000; i++) {  
  45.             Thread.sleep(1000);  
  46.         }  
  47.   
  48.         producer.shutdown();  
  49.   
  50.     }  
  51. }  


TransactionExecuterImpl  --执行本地事务

[java] view plain copy
 
  1. package transaction;  
  2.   
  3. import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;  
  4. import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
  5. import com.alibaba.rocketmq.common.message.Message;  
  6.   
  7. /** 
  8.  * 执行本地事务 
  9.  */  
  10. public class TransactionExecuterImpl implements LocalTransactionExecuter {  
  11.     // private AtomicInteger transactionIndex = new AtomicInteger(1);  
  12.   
  13.     public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
  14.   
  15.         System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
  16.   
  17.         System.out.println("执行本地事务arg = " + arg);  
  18.   
  19.         String tags = msg.getTags();  
  20.   
  21.         if (tags.equals("transaction2")) {  
  22.             System.out.println("======我的操作============,失败了  -进行ROLLBACK");  
  23.             return LocalTransactionState.ROLLBACK_MESSAGE;  
  24.         }  
  25.         return LocalTransactionState.COMMIT_MESSAGE;  
  26.         // return LocalTransactionState.UNKNOW;  
  27.     }  
  28. }  


TransactionCheckListenerImpl--未决事务,服务器回查客户端(目前已经被阉割啦)

 

[java] view plain copy
 
  1. package transaction;  
  2.   
  3. import com.alibaba.rocketmq.client.producer.LocalTransactionState;  
  4. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;  
  5. import com.alibaba.rocketmq.common.message.MessageExt;  
  6.   
  7. /** 
  8.  * 未决事务,服务器回查客户端 
  9.  */  
  10. public class TransactionCheckListenerImpl implements TransactionCheckListener {  
  11.     // private AtomicInteger transactionIndex = new AtomicInteger(0);  
  12.   
  13.     //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。  
  14.     public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
  15.         System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));  
  16.         // return LocalTransactionState.ROLLBACK_MESSAGE;  
  17.   
  18.         return LocalTransactionState.COMMIT_MESSAGE;  
  19.   
  20.         // return LocalTransactionState.UNKNOW;  
  21.     }  
  22. }  



 

 

 

producer端:发送数据到MQ,并且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了,不会被消费。




 

Consumer只会接收到一个,第二个数据不会被接收到

 

http://blog.csdn.net/u010634288/article/details/57158374

分享到:
评论

相关推荐

    rocketmq使用.zip

    除了基本使用和事务消息,RocketMQ还有其他高级功能,如消息回溯、延迟消息、顺序消息、消息过滤等。消息回溯可以在不影响正常消息消费的情况下,追溯历史消息。延迟消息可以设定消息在未来的某个时间点才被消费。...

    重复消费、顺序消费、分布式事务.pdf

    3. **消息队列特性**:利用消息队列系统本身的特性支持顺序消费,如RabbitMQ、RocketMQ等提供的顺序消费功能。 ### 三、分布式事务 #### 定义 分布式事务是指跨越多个数据库或者服务的事务操作,它涉及到多个资源...

    SpringBoot整合RocketMq,rocketMq

    整合SpringBoot与RocketMQ的主要目的是在SpringBoot应用中利用RocketMQ的功能,如发布/订阅消息、顺序消息、事务消息等。这使得应用可以轻松地处理高并发和大量数据的传输问题,提高系统的稳定性和可扩展性。 **二...

    消息队列-RocketMQ1

    RocketMQ支持顺序消费,每个topic下的四个队列可用于实现顺序消息处理。此外,它还解决了分布式事务的问题,例如在银行转账场景中保证数据一致性。通过预处理消息、提交本地事务,然后发送确认消息来确保事务一致性...

    RocketMQ相关资料文档以及demo

    RocketMQ还提供了多种特性,如消息顺序性、消息重试、延迟消息、死信队列、事务消息等。消息顺序性适用于对消息顺序有严格要求的场景,如金融交易。消息重试机制确保消息不丢失,延迟消息可以在指定时间后才被消费。...

    RocketMQ_design.zip

    其中,AT级别确保消息至少被消费一次,MT则保证消息在单个消费者内按照发送顺序消费。 RocketMQ的设计中还包括了消息回溯、延迟消息和事务消息等高级特性。消息回溯允许消费者从历史消息中读取,而延迟消息则可以...

    rocketmq-4.7.0.zip

    8. **顺序消息**:RocketMQ支持全局顺序和局部顺序消息,保证消息按照特定顺序消费,这对于某些业务场景(如日志记录、库存更新)至关重要。 9. **高可用与容灾**:RocketMQ通过主备切换、镜像复制等方式保证服务的...

    rocketmq-dashboard.zip

    5. **分布式事务**:RocketMQ支持分布式事务,通过半消息和全局顺序消息实现事务一致性。 6. **流量控制**:为了防止消费者或生产者因处理速度过慢而导致消息积压,RocketMQ提供了流量控制策略。 7. **消息过滤**...

    kafka开发和rocketmq消息技术文档

    4. **分布式事务协调**:RocketMQ 使用 GroupOffset 和 BrokeOffset 实现消费者组的事务协调,确保消息的正确消费。 5. **云原生设计**:RocketMQ 针对云环境进行了优化,支持大规模集群管理和监控。 6. **多语言...

    rocketmq教程两套

    011-011_RocketMQ_Producer_顺序消费机制详解 012-012_RocketMQ_Producer_事务消息机制详解 013-013_RocketMQ_Consumer_Push和Pull模式及使用详解 014-014_RocketMQ_Consumer_配置参数详解 015-015_RocketMQ_...

    rocketmq-all-4.9.3-bin-release

    5. **消息顺序**:RocketMQ提供严格的消息顺序保证,通过指定消息的队列和发送策略,可以确保消息按照特定的顺序被消费。 6. **消息分发策略**:RocketMQ支持广播(Broadcasting)和集群(Clustering)两种消费模式...

    RocketMQ技术讲解V2.0

    RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。...5、讲解Consumer端的启动、PUSH模式的消息消费、PULL模式的消息消费、顺序消费/并发消费等功能;

    RocketMQ_design_.pdf.zip

    全局顺序消息是在一个Topic下所有消息都按照发送顺序消费,而分区顺序消息则是每个队列内的消息顺序。 6. **事务消息**:RocketMQ提供了分布式事务支持,实现了分布式环境下的二阶段提交协议,保证了消息的最终一致...

    rocketmq-externals-master.zip

    2. **顺序消息**:RocketMQ支持顺序消息发送和消费,这意味着消息按照特定的顺序(如生产者发送的顺序)被消费者接收,这对于需要保证消息处理顺序的业务场景非常关键。 3. **批量消息**:批量发送和接收消息是提高...

    RocketMQ实践:确保消息不丢失与顺序性的高效策略

    3. **事务消息**:RocketMQ支持事务消息,可以确保消息在数据库事务提交后才发送,如果事务失败,消息不会发出,从而避免了数据不一致导致的消息丢失。 4. **持久化存储**:RocketMQ将消息存储在磁盘上,即使在系统...

    rocketmq安装包.rar

    - 消费策略:支持顺序消费、广播消费、集群消费等多种消费策略。 5. **RocketMQ部署与配置**:安装RocketMQ涉及NameServer、Broker、Producer和Consumer的配置,包括启动脚本、配置文件修改、环境变量设置等步骤。...

    rocketmq源码.zip

    5. 高效的刷盘策略:RocketMQ采用异步刷盘和顺序写磁盘技术,保证了高吞吐量和低延迟。 三、RocketMQ的实现机制 1. 消息确认:Pull Consumer在拉取消息后需要向Broker发送确认,而Push Consumer则由Broker在消息...

    RocketMQ 3.2.6

    11. **Java API**:RocketMQ 3.2.6 版本提供了 Java 开发接口,开发者可以方便地在 Java 应用中集成 RocketMQ,实现消息的发送和消费。 学习RocketMQ 3.2.6,你需要理解以上核心概念,并通过实践操作加深理解。同时...

    rocketmq安装包,rocketmq-all-5.1.3-bin-release

    13. **消息顺序性**:RocketMQ通过设置消息队列的分配策略,可以保证在同一队列内的消息顺序消费,适用于需要消息顺序性的场景。 通过"rocketmq-all-5.1.3-bin-release"这个压缩包,你可以得到完整的RocketMQ运行...

    rocketmq-master

    6. 消息顺序:对于需要顺序处理的场景,RocketMQ提供有序消息服务,确保消息的顺序消费。 7. 灵活的调度策略:支持多种调度策略,如定时消息、延时消息和死信队列,满足不同业务需求。 三、RocketMQ的应用场景 1....

Global site tag (gtag.js) - Google Analytics