`

RocketMQ最佳实践(三)开发spring-boot-starter-rocketmq实现与spring boot项目的整合

阅读更多
最近在使用spring boot/spring cloud搭建做微服务架构,发现spring boot官方提供的starter中居然没有集成RocketMQ惊讶word天,顿时激发我的创作基情啊有木有大笑
上面这张截图来自spring boot官方文档,为啥官方提供了JMS、AMQP和Kafka却偏偏少了RocketMQ呢,我认为是因为目前RocketMQ在国外并不普及,而且才捐献给apache不久,需要一段时间,那么如此看来,写一个spring-boot-starter-rocketmq还是比较有意义的。
 
but,本人水平毕竟有限,写的东西自然没法和spring相比,这个版本的starter参考了JMS的starter来封装,虽然不够尽善尽美,但还是极具实用价值的微笑

编写spring-boot-starter-rocketmq

 

 

创建一个Maven项目名字就叫spring-boot-starter-rocketmq,其pom.xml文件内容如下:

 

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  4.   
  5.     <groupId>com.bqjr</groupId>  
  6.     <version>0.0.1-SNAPSHOT</version>  
  7.     <name>spring-boot-starter-rocketmq</name>  
  8.     <description>Starter for using RocketMQ</description>  
  9.   
  10.     <parent>  
  11.         <groupId>org.springframework.boot</groupId>  
  12.         <artifactId>spring-boot-starter-parent</artifactId>  
  13.         <version>1.5.3.RELEASE</version>  
  14.         <relativePath/>  
  15.     </parent>  
  16.     <properties>  
  17.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  18.         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  
  19.         <java.version>1.8</java.version>  
  20.         <rocketmq.version>4.0.0-incubating</rocketmq.version>  
  21.     </properties>  
  22.       
  23.     <modelVersion>4.0.0</modelVersion>  
  24.     <artifactId>spring-boot-starter-rocketmq</artifactId>  
  25.   
  26.   
  27.     <dependencies>  
  28.         <dependency>  
  29.             <groupId>org.springframework.boot</groupId>  
  30.             <artifactId>spring-boot-starter</artifactId>  
  31.         </dependency>  
  32.         <!-- RocketMq客户端相关依赖 -->  
  33.         <dependency>  
  34.             <groupId>org.apache.rocketmq</groupId>  
  35.             <artifactId>rocketmq-client</artifactId>  
  36.             <version>${rocketmq.version}</version>  
  37.         </dependency>  
  38.         <dependency>  
  39.             <groupId>org.apache.rocketmq</groupId>  
  40.             <artifactId>rocketmq-common</artifactId>  
  41.             <version>${rocketmq.version}</version>  
  42.         </dependency>  
  43.           
  44.         <dependency>  
  45.             <groupId>org.projectlombok</groupId>  
  46.             <artifactId>lombok</artifactId>  
  47.             <version>1.16.10</version><!--$NO-MVN-MAN-VER$-->  
  48.         </dependency>  
  49.     </dependencies>  
  50.   
  51.   
  52. </project>  

编写配置类RocketmqProperties,这个类的属性对应application.properties文件中的配置项,目前只提供核心的一些配置支持,其他性能优化方面的配置参数可自行扩展

 

 

[java] view plain copy
 
  1. /** 
  2.  * @author jiangjb 
  3.  */  
  4. @Data  
  5. @ConfigurationProperties(PREFIX)  
  6. public class RocketmqProperties {  
  7.   
  8.     public static final String PREFIX = "spring.extend.rocketmq";  
  9.   
  10.     private String namesrvAddr;  
  11.     private String instanceName;  
  12.     private String clientIP;  
  13.     private ProducerConfig producer;  
  14.     private ConsumerConfig consumer;  
  15.       
  16. }  

编写配置解析类RocketmqAutoConfiguration,这个类主要初始化了三个Bean:defaultProducer用来发送普通消息、transactionProducer用来发送事务消息以及pushConsumer用来接收订阅的所有topic下的消息,并派发给不同的tag的消费者。

[java] view plain copy
 
  1. /** 
  2.  * @author jiangjb 
  3.  */  
  4. @Configuration  
  5. @EnableConfigurationProperties(RocketmqProperties.class)  
  6. @ConditionalOnProperty(prefix = PREFIX, value = "namesrvAddr")  
  7. public class RocketmqAutoConfiguration {  
  8.       
  9.     @Autowired  
  10.     private RocketmqProperties properties;  
  11.       
  12.     @Value("${spring.application.name}")  
  13.     private String producerGroupName;  
  14.       
  15.     @Value("${spring.application.name}")  
  16.     private String consumerGroupName;  
  17.       
  18.     @Autowired  
  19.     private ApplicationEventPublisher publisher;  
  20.     /** 
  21.      * 初始化向rocketmq发送普通消息的生产者 
  22.      */  
  23.     @Bean  
  24.     @ConditionalOnProperty(prefix = PREFIX, value = "producer.instanceName")  
  25.     public DefaultMQProducer defaultProducer() throws MQClientException{  
  26.         /** 
  27.          * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  28.          * 注意:ProducerGroupName需要由应用来保证唯一<br> 
  29.          * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
  30.          * 因为服务器会回查这个Group下的任意一个Producer 
  31.          */  
  32.         DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);  
  33.         producer.setNamesrvAddr(properties.getNamesrvAddr());  
  34.         producer.setInstanceName(properties.getProducer().getInstanceName());  
  35.         producer.setVipChannelEnabled(false);  
  36.   
  37.         /** 
  38.          * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  39.          * 注意:切记不可以在每次发送消息时,都调用start方法 
  40.          */  
  41.         producer.start();  
  42.         System.out.println("RocketMq defaultProducer Started.");  
  43.         return producer;  
  44.     }  
  45.       
  46.     /** 
  47.      * 初始化向rocketmq发送事务消息的生产者 
  48.      */  
  49.     @Bean  
  50.     @ConditionalOnProperty(prefix = PREFIX, value = "producer.tranInstanceName")  
  51.     public TransactionMQProducer transactionProducer() throws MQClientException{  
  52.         /** 
  53.          * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  54.          * 注意:ProducerGroupName需要由应用来保证唯一<br> 
  55.          * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
  56.          * 因为服务器会回查这个Group下的任意一个Producer 
  57.          */  
  58.         TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroupName");  
  59.         producer.setNamesrvAddr(properties.getNamesrvAddr());  
  60.         producer.setInstanceName(properties.getProducer().getTranInstanceName());  
  61.           
  62.         // 事务回查最小并发数  
  63.         producer.setCheckThreadPoolMinSize(2);  
  64.         // 事务回查最大并发数  
  65.         producer.setCheckThreadPoolMaxSize(2);  
  66.         // 队列数  
  67.         producer.setCheckRequestHoldMax(2000);  
  68.           
  69.         //TODO 由于社区版本的服务器阉割调了消息回查的功能,所以这个地方没有意义  
  70.         //TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
  71.         //producer.setTransactionCheckListener(transactionCheckListener);  
  72.           
  73.         /** 
  74.          * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  75.          * 注意:切记不可以在每次发送消息时,都调用start方法 
  76.          */  
  77.         producer.start();  
  78.           
  79.         System.out.println("RocketMq TransactionMQProducer Started.");  
  80.         return producer;  
  81.     }  
  82.     /** 
  83.      * 初始化rocketmq消息监听方式的消费者 
  84.      */  
  85.     @Bean  
  86.     @ConditionalOnProperty(prefix = PREFIX, value = "consumer.instanceName")  
  87.     public DefaultMQPushConsumer pushConsumer() throws MQClientException{  
  88.         /** 
  89.          * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  90.          * 注意:ConsumerGroupName需要由应用来保证唯一 
  91.          */  
  92.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);  
  93.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  94.         consumer.setNamesrvAddr(properties.getNamesrvAddr());  
  95.         consumer.setInstanceName(properties.getConsumer().getInstanceName());  
  96.         consumer.setConsumeMessageBatchMaxSize(1);//设置批量消费,以提升消费吞吐量,默认是1  
  97.           
  98.           
  99.         /** 
  100.          * 订阅指定topic下tags 
  101.          */  
  102.         List<String> subscribeList = properties.getConsumer().getSubscribe();  
  103.         for (String sunscribe : subscribeList) {  
  104.             consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);  
  105.         }  
  106.           
  107.          consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {  
  108.   
  109.             MessageExt msg = msgs.get(0);  
  110.               
  111.             try {  
  112.                 //默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息  
  113.                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());  
  114.                 //发布消息到达的事件,以便分发到每个tag的监听方法  
  115.                 this.publisher.publishEvent(new RocketmqEvent(msg,consumer));   
  116.                 System.out.println("消息到达事件已经发布成功!");  
  117.             } catch (Exception e) {  
  118.                 e.printStackTrace();  
  119.                 if(msg.getReconsumeTimes()<=3){//重复消费3次  
  120.                     //TODO 进行日志记录  
  121.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
  122.                 } else {  
  123.                     //TODO 消息消费失败,进行日志记录  
  124.                 }  
  125.             }  
  126.               
  127.             //如果没有return success,consumer会重复消费此信息,直到success。  
  128.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  129.         });  
  130.   
  131.      new Thread(new Runnable() {  
  132.         @Override  
  133.         public void run() {  
  134.             try {  
  135.                 Thread.sleep(5000);//延迟5秒再启动,主要是等待spring事件监听相关程序初始化完成,否则,回出现对RocketMQ的消息进行消费后立即发布消息到达的事件,然而此事件的监听程序还未初始化,从而造成消息的丢失  
  136.                 /** 
  137.                  * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  138.                  */  
  139.                 try {  
  140.                     consumer.start();  
  141.                 } catch (Exception e) {  
  142.                     System.out.println("RocketMq pushConsumer Start failure!!!.");  
  143.                     e.printStackTrace();  
  144.                 }  
  145.                   
  146.                 System.out.println("RocketMq pushConsumer Started.");  
  147.   
  148.             } catch (InterruptedException e) {  
  149.                 e.printStackTrace();  
  150.             }  
  151.         }  
  152.   
  153.         }).start();  
  154.           
  155.         return consumer;  
  156.     }  
  157.   
  158. }  

编写基于spring事件传播机制的事件类RocketmqEvent,用来定义上面的consumer接收到消息后的发布的事件。

 

 

[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @Data  
  7. @EqualsAndHashCode(callSuper=false)  
  8. public class RocketmqEvent extends ApplicationEvent{  
  9.     private static final long serialVersionUID = -4468405250074063206L;  
  10.       
  11.     private DefaultMQPushConsumer consumer;  
  12.     private MessageExt messageExt;  
  13.     private String topic;  
  14.     private String tag;  
  15.     private byte[] body;  
  16.       
  17.     public RocketmqEvent(MessageExt msg,DefaultMQPushConsumer consumer) throws Exception {  
  18.         super(msg);  
  19.         this.topic = msg.getTopic();  
  20.         this.tag = msg.getTags();  
  21.         this.body = msg.getBody();  
  22.         this.consumer = consumer;  
  23.         this.messageExt = msg;  
  24.     }  
  25.   
  26.     public String getMsg() {  
  27.         try {  
  28.             return new String(this.body,"utf-8");  
  29.         } catch (UnsupportedEncodingException e) {  
  30.             return null;  
  31.         }  
  32.     }  
  33.       
  34.     public String getMsg(String code) {  
  35.         try {  
  36.             return new String(this.body,code);  
  37.         } catch (UnsupportedEncodingException e) {  
  38.             return null;  
  39.         }  
  40.     }  
  41.       
  42. }  

然后运行maven的编译、打包

 

编写测试项目rocketmq-starter-test

pom.xml中加入上面的starter的依赖
[html] view plain copy
 
  1. <dependency>  
  2.     <groupId>com.bqjr</groupId>  
  3.     <artifactId>spring-boot-starter-rocketmq</artifactId>  
  4.     <version>0.0.1-SNAPSHOT</version>  
  5. </dependency>  
发送消息测试类producerDemo
[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @RestController  
  7. public class producerDemo {  
  8.       
  9.     @Autowired  
  10.     private DefaultMQProducer defaultProducer;  
  11.       
  12.     @Autowired  
  13.     private TransactionMQProducer transactionProducer;  
  14.       
  15.     @Value("${spring.extend.rocketmq.producer.topic}")  
  16.     private String producerTopic;  
  17.       
  18.     @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)  
  19.     public void sendMsg() {  
  20.          Message msg = new Message(producerTopic,// topic  
  21.                  "TagA",// tag  
  22.                  "OrderID001",// key  
  23.                  ("Hello jyqlove333").getBytes());// body  
  24.          try {  
  25.             defaultProducer.send(msg,new SendCallback(){  
  26.                   
  27.                 @Override  
  28.                 public void onSuccess(SendResult sendResult) {  
  29.                      System.out.println(sendResult);  
  30.                      //TODO 发送成功处理  
  31.                 }  
  32.                   
  33.                 @Override  
  34.                 public void onException(Throwable e) {  
  35.                      System.out.println(e);  
  36.                     //TODO 发送失败处理  
  37.                 }  
  38.             });  
  39.         } catch (Exception e) {  
  40.             e.printStackTrace();  
  41.         }  
  42.     }  
  43.       
  44.     @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)  
  45.     public String sendTransactionMsg() {  
  46.         SendResult sendResult = null;  
  47.         try {  
  48.             //构造消息  
  49.             Message msg = new Message(producerTopic,// topic  
  50.                     "TagA",// tag  
  51.                     "OrderID001",// key  
  52.                     ("Hello jyqlove333").getBytes());// body  
  53.               
  54.             //发送事务消息,LocalTransactionExecute的executeLocalTransactionBranch方法中执行本地逻辑  
  55.             sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1,Object arg) -> {  
  56.                 int value = 1;  
  57.                   
  58.                 //TODO 执行本地事务,改变value的值  
  59.                 //===================================================  
  60.                 System.out.println("执行本地事务。。。完成");  
  61.                 if(arg instanceof Integer){  
  62.                     value = (Integer)arg;  
  63.                 }  
  64.                 //===================================================  
  65.                   
  66.                 if (value == 0) {  
  67.                     throw new RuntimeException("Could not find db");  
  68.                 } else if ((value % 5) == 0) {  
  69.                     return LocalTransactionState.ROLLBACK_MESSAGE;  
  70.                 } else if ((value % 4) == 0) {  
  71.                     return LocalTransactionState.COMMIT_MESSAGE;  
  72.                 }  
  73.                 return LocalTransactionState.ROLLBACK_MESSAGE;  
  74.             }, 4);  
  75.             System.out.println(sendResult);  
  76.         } catch (Exception e) {  
  77.             e.printStackTrace();  
  78.         }  
  79.         return sendResult.toString();  
  80.     }  
  81. }  

消费消息测试类consumerDemo
[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @Component  
  7. public class consumerDemo {  
  8.       
  9.     @EventListener(condition = "#event.topic=='TopicTest1' && #event.tag=='TagA'")  
  10.     public void rocketmqMsgListen(RocketmqEvent event) {  
  11.         DefaultMQPushConsumer consumer = event.getConsumer();  
  12.         try {  
  13.             System.out.println("com.bqjr.consumerDemo监听到一个消息达到:" + event.getMsg("gbk"));  
  14.             //TODO 进行业务处理  
  15.         } catch (Exception e) {  
  16.             if(event.getMessageExt().getReconsumeTimes()<=3){//重复消费3次  
  17.                 try {  
  18.                     consumer.sendMessageBack(event.getMessageExt(), 2);  
  19.                 } catch (Exception e1) {  
  20.                     //TODO 消息消费失败,进行日志记录  
  21.                 }  
  22.             } else {  
  23.                 //TODO 消息消费失败,进行日志记录  
  24.                   
  25.             }  
  26.         }  
  27.     }  
  28. }  

来,测试一把

在浏览器中访问:http://10.89.0.144:12306/sendMsg,控制台输出如下:

再测试一下消费者,在RocketMQ控制台(RocketMQ控制台的介绍放到下一篇吧微笑)发送一条消息
查看控制台打印的消费日志
恭喜你,成功了。大笑


补充说明:

 

        本来想自定义一个叫RocketmqListener的注解来实现消息的监听的,花了大量时间去阅读和研究了spring关于EventListener注解和JmsListener注解的实现,发现目前我并不能很好的理解和掌控其设计思路,想以瓢画葫最终也没能实现,迫于五一节来临,只能使用EventListener注解代替,不过发现其实也不错。

 

http://blog.csdn.net/jayjjb/article/details/70906511

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics