最近在使用spring boot/spring cloud搭建做微服务架构,发现spring boot官方提供的starter中居然没有集成RocketMQword天,顿时激发我的创作基情啊有木有
上面这张截图来自spring boot官方文档,为啥官方提供了JMS、AMQP和Kafka却偏偏少了RocketMQ呢,我认为是因为目前RocketMQ在国外并不普及,而且才捐献给apache不久,需要一段时间,那么如此看来,写一个spring-boot-starter-rocketmq还是比较有意义的。
but,本人水平毕竟有限,写的东西自然没法和spring相比,这个版本的starter参考了JMS的starter来封装,虽然不够尽善尽美,但还是极具实用价值的
编写spring-boot-starter-rocketmq
创建一个Maven项目名字就叫spring-boot-starter-rocketmq,其pom.xml文件内容如下:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <groupId>com.bqjr</groupId>
- <version>0.0.1-SNAPSHOT</version>
- <name>spring-boot-starter-rocketmq</name>
- <description>Starter for using RocketMQ</description>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.3.RELEASE</version>
- <relativePath/>
- </parent>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- <rocketmq.version>4.0.0-incubating</rocketmq.version>
- </properties>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>spring-boot-starter-rocketmq</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <!-- RocketMq客户端相关依赖 -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-common</artifactId>
- <version>${rocketmq.version}</version>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.16.10</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
- </dependencies>
- </project>
编写配置类RocketmqProperties,这个类的属性对应application.properties文件中的配置项,目前只提供核心的一些配置支持,其他性能优化方面的配置参数可自行扩展
- /**
- * @author jiangjb
- */
- @Data
- @ConfigurationProperties(PREFIX)
- public class RocketmqProperties {
- public static final String PREFIX = "spring.extend.rocketmq";
- private String namesrvAddr;
- private String instanceName;
- private String clientIP;
- private ProducerConfig producer;
- private ConsumerConfig consumer;
- }
编写配置解析类RocketmqAutoConfiguration,这个类主要初始化了三个Bean:defaultProducer用来发送普通消息、transactionProducer用来发送事务消息以及pushConsumer用来接收订阅的所有topic下的消息,并派发给不同的tag的消费者。
- /**
- * @author jiangjb
- */
- @Configuration
- @EnableConfigurationProperties(RocketmqProperties.class)
- @ConditionalOnProperty(prefix = PREFIX, value = "namesrvAddr")
- public class RocketmqAutoConfiguration {
- @Autowired
- private RocketmqProperties properties;
- @Value("${spring.application.name}")
- private String producerGroupName;
- @Value("${spring.application.name}")
- private String consumerGroupName;
- @Autowired
- private ApplicationEventPublisher publisher;
- /**
- * 初始化向rocketmq发送普通消息的生产者
- */
- @Bean
- @ConditionalOnProperty(prefix = PREFIX, value = "producer.instanceName")
- public DefaultMQProducer defaultProducer() throws MQClientException{
- /**
- * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
- * 注意:ProducerGroupName需要由应用来保证唯一<br>
- * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
- * 因为服务器会回查这个Group下的任意一个Producer
- */
- DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);
- producer.setNamesrvAddr(properties.getNamesrvAddr());
- producer.setInstanceName(properties.getProducer().getInstanceName());
- producer.setVipChannelEnabled(false);
- /**
- * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
- * 注意:切记不可以在每次发送消息时,都调用start方法
- */
- producer.start();
- System.out.println("RocketMq defaultProducer Started.");
- return producer;
- }
- /**
- * 初始化向rocketmq发送事务消息的生产者
- */
- @Bean
- @ConditionalOnProperty(prefix = PREFIX, value = "producer.tranInstanceName")
- public TransactionMQProducer transactionProducer() throws MQClientException{
- /**
- * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
- * 注意:ProducerGroupName需要由应用来保证唯一<br>
- * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
- * 因为服务器会回查这个Group下的任意一个Producer
- */
- TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroupName");
- producer.setNamesrvAddr(properties.getNamesrvAddr());
- producer.setInstanceName(properties.getProducer().getTranInstanceName());
- // 事务回查最小并发数
- producer.setCheckThreadPoolMinSize(2);
- // 事务回查最大并发数
- producer.setCheckThreadPoolMaxSize(2);
- // 队列数
- producer.setCheckRequestHoldMax(2000);
- //TODO 由于社区版本的服务器阉割调了消息回查的功能,所以这个地方没有意义
- //TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
- //producer.setTransactionCheckListener(transactionCheckListener);
- /**
- * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
- * 注意:切记不可以在每次发送消息时,都调用start方法
- */
- producer.start();
- System.out.println("RocketMq TransactionMQProducer Started.");
- return producer;
- }
- /**
- * 初始化rocketmq消息监听方式的消费者
- */
- @Bean
- @ConditionalOnProperty(prefix = PREFIX, value = "consumer.instanceName")
- public DefaultMQPushConsumer pushConsumer() throws MQClientException{
- /**
- * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
- * 注意:ConsumerGroupName需要由应用来保证唯一
- */
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.setNamesrvAddr(properties.getNamesrvAddr());
- consumer.setInstanceName(properties.getConsumer().getInstanceName());
- consumer.setConsumeMessageBatchMaxSize(1);//设置批量消费,以提升消费吞吐量,默认是1
- /**
- * 订阅指定topic下tags
- */
- List<String> subscribeList = properties.getConsumer().getSubscribe();
- for (String sunscribe : subscribeList) {
- consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);
- }
- consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
- MessageExt msg = msgs.get(0);
- try {
- //默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
- //发布消息到达的事件,以便分发到每个tag的监听方法
- this.publisher.publishEvent(new RocketmqEvent(msg,consumer));
- System.out.println("消息到达事件已经发布成功!");
- } catch (Exception e) {
- e.printStackTrace();
- if(msg.getReconsumeTimes()<=3){//重复消费3次
- //TODO 进行日志记录
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- } else {
- //TODO 消息消费失败,进行日志记录
- }
- }
- //如果没有return success,consumer会重复消费此信息,直到success。
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);//延迟5秒再启动,主要是等待spring事件监听相关程序初始化完成,否则,回出现对RocketMQ的消息进行消费后立即发布消息到达的事件,然而此事件的监听程序还未初始化,从而造成消息的丢失
- /**
- * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
- */
- try {
- consumer.start();
- } catch (Exception e) {
- System.out.println("RocketMq pushConsumer Start failure!!!.");
- e.printStackTrace();
- }
- System.out.println("RocketMq pushConsumer Started.");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- return consumer;
- }
- }
编写基于spring事件传播机制的事件类RocketmqEvent,用来定义上面的consumer接收到消息后的发布的事件。
- /**
- *
- * @author jiangjb
- *
- */
- @Data
- @EqualsAndHashCode(callSuper=false)
- public class RocketmqEvent extends ApplicationEvent{
- private static final long serialVersionUID = -4468405250074063206L;
- private DefaultMQPushConsumer consumer;
- private MessageExt messageExt;
- private String topic;
- private String tag;
- private byte[] body;
- public RocketmqEvent(MessageExt msg,DefaultMQPushConsumer consumer) throws Exception {
- super(msg);
- this.topic = msg.getTopic();
- this.tag = msg.getTags();
- this.body = msg.getBody();
- this.consumer = consumer;
- this.messageExt = msg;
- }
- public String getMsg() {
- try {
- return new String(this.body,"utf-8");
- } catch (UnsupportedEncodingException e) {
- return null;
- }
- }
- public String getMsg(String code) {
- try {
- return new String(this.body,code);
- } catch (UnsupportedEncodingException e) {
- return null;
- }
- }
- }
然后运行maven的编译、打包
编写测试项目rocketmq-starter-test
pom.xml中加入上面的starter的依赖
- <dependency>
- <groupId>com.bqjr</groupId>
- <artifactId>spring-boot-starter-rocketmq</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- /**
- *
- * @author jiangjb
- *
- */
- @RestController
- public class producerDemo {
- @Autowired
- private DefaultMQProducer defaultProducer;
- @Autowired
- private TransactionMQProducer transactionProducer;
- @Value("${spring.extend.rocketmq.producer.topic}")
- private String producerTopic;
- @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
- public void sendMsg() {
- Message msg = new Message(producerTopic,// topic
- "TagA",// tag
- "OrderID001",// key
- ("Hello jyqlove333").getBytes());// body
- try {
- defaultProducer.send(msg,new SendCallback(){
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println(sendResult);
- //TODO 发送成功处理
- }
- @Override
- public void onException(Throwable e) {
- System.out.println(e);
- //TODO 发送失败处理
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)
- public String sendTransactionMsg() {
- SendResult sendResult = null;
- try {
- //构造消息
- Message msg = new Message(producerTopic,// topic
- "TagA",// tag
- "OrderID001",// key
- ("Hello jyqlove333").getBytes());// body
- //发送事务消息,LocalTransactionExecute的executeLocalTransactionBranch方法中执行本地逻辑
- sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1,Object arg) -> {
- int value = 1;
- //TODO 执行本地事务,改变value的值
- //===================================================
- System.out.println("执行本地事务。。。完成");
- if(arg instanceof Integer){
- value = (Integer)arg;
- }
- //===================================================
- if (value == 0) {
- throw new RuntimeException("Could not find db");
- } else if ((value % 5) == 0) {
- return LocalTransactionState.ROLLBACK_MESSAGE;
- } else if ((value % 4) == 0) {
- return LocalTransactionState.COMMIT_MESSAGE;
- }
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }, 4);
- System.out.println(sendResult);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return sendResult.toString();
- }
- }
消费消息测试类consumerDemo
- /**
- *
- * @author jiangjb
- *
- */
- @Component
- public class consumerDemo {
- @EventListener(condition = "#event.topic=='TopicTest1' && #event.tag=='TagA'")
- public void rocketmqMsgListen(RocketmqEvent event) {
- DefaultMQPushConsumer consumer = event.getConsumer();
- try {
- System.out.println("com.bqjr.consumerDemo监听到一个消息达到:" + event.getMsg("gbk"));
- //TODO 进行业务处理
- } catch (Exception e) {
- if(event.getMessageExt().getReconsumeTimes()<=3){//重复消费3次
- try {
- consumer.sendMessageBack(event.getMessageExt(), 2);
- } catch (Exception e1) {
- //TODO 消息消费失败,进行日志记录
- }
- } else {
- //TODO 消息消费失败,进行日志记录
- }
- }
- }
- }
来,测试一把
在浏览器中访问:http://10.89.0.144:12306/sendMsg,控制台输出如下:再测试一下消费者,在RocketMQ控制台(RocketMQ控制台的介绍放到下一篇吧)发送一条消息
查看控制台打印的消费日志
恭喜你,成功了。
补充说明:
本来想自定义一个叫RocketmqListener的注解来实现消息的监听的,花了大量时间去阅读和研究了spring关于EventListener注解和JmsListener注解的实现,发现目前我并不能很好的理解和掌控其设计思路,想以瓢画葫最终也没能实现,迫于五一节来临,只能使用EventListener注解代替,不过发现其实也不错。
http://blog.csdn.net/jayjjb/article/details/70906511
相关推荐
赠送jar包:dynamic-datasource-spring-boot-starter-3.4.1.jar; 赠送原API文档:dynamic-datasource-spring-boot-starter-3.4.1-javadoc.jar; 赠送源代码:dynamic-datasource-spring-boot-starter-3.4.1-sources...
赠送jar包:seata-spring-boot-starter-1.3.0.jar; 赠送原API文档:seata-spring-boot-starter-1.3.0-javadoc.jar; 赠送源代码:seata-spring-boot-starter-1.3.0-sources.jar; 赠送Maven依赖信息文件:seata-...
赠送jar包:aliyun-sms-spring-boot-starter-2.0.2.jar 赠送原API文档:aliyun-sms-spring-boot-starter-2.0.2-javadoc.jar 赠送源代码:aliyun-sms-spring-boot-starter-2.0.2-sources.jar 包含翻译后的API文档...
赠送jar包:druid-spring-boot-starter-1.1.9.jar; 赠送原API文档:druid-spring-boot-starter-1.1.9-javadoc.jar; 赠送源代码:druid-spring-boot-starter-1.1.9-sources.jar; 赠送Maven依赖信息文件:druid-...
《深入解析jasypt-spring-boot-starter 3.0.5依赖的POM与JAR》 在Java开发领域,构建和管理依赖是至关重要的环节。jasypt-spring-boot-starter是一个流行的安全库,它允许开发者在Spring Boot应用中轻松地实现加密...
赠送jar包:dynamic-datasource-spring-boot-starter-3.4.1.jar; 赠送原API文档:dynamic-datasource-spring-boot-starter-3.4.1-javadoc.jar; 赠送源代码:dynamic-datasource-spring-boot-starter-3.4.1-sources...
赠送jar包:druid-spring-boot-starter-1.2.8.jar; 赠送原API文档:druid-spring-boot-starter-1.2.8-javadoc.jar; 赠送源代码:druid-spring-boot-starter-1.2.8-sources.jar; 赠送Maven依赖信息文件:druid-...
rocketmq-spring-boot-starter 阿里云RocketMQSpring图书版支持功能: 发送普通消息的三种模式:同步,异步和单向 订阅消息群集,广播 发送和接收顺序消息 交易讯息 延迟讯息 接收和接收定时消息定时消息和延迟消息...
赠送jar包:oss-spring-boot-starter-1.0.3.jar 赠送原API文档:oss-spring-boot-starter-1.0.3-javadoc.jar 赠送源代码:oss-spring-boot-starter-1.0.3-sources.jar 包含翻译后的API文档:oss-spring-boot-...
赠送jar包:druid-spring-boot-starter-1.1.10.jar; 赠送原API文档:druid-spring-boot-starter-1.1.10-javadoc.jar; 赠送源代码:druid-spring-boot-starter-1.1.10-sources.jar; 赠送Maven依赖信息文件:druid-...
赠送jar包:oss-spring-boot-starter-1.0.3.jar; 赠送原API文档:oss-spring-boot-starter-1.0.3-javadoc.jar; 赠送源代码:oss-spring-boot-starter-1.0.3-sources.jar; 赠送Maven依赖信息文件:oss-spring-boot...
赠送jar包:aliyun-sms-spring-boot-starter-2.0.2.jar; 赠送原API文档:aliyun-sms-spring-boot-starter-2.0.2-javadoc.jar; 赠送源代码:aliyun-sms-spring-boot-starter-2.0.2-sources.jar; 赠送Maven依赖信息...
赠送jar包:druid-spring-boot-starter-1.1.9.jar; 赠送原API文档:druid-spring-boot-starter-1.1.9-javadoc.jar; 赠送源代码:druid-spring-boot-starter-1.1.9-sources.jar; 赠送Maven依赖信息文件:druid-...
自己写的一个spring-boot整合rocketmq的starter,以及一个用来测试的项目rocketmq-starter-test。 涉及以下知识的最佳实践: 1、自定义spring boot starter; 2、使用spring的事件传播机制实现bean与bean之间基于...
标题 "spring-boot-starter-parent-1.5.13.RELEASE.zip" 提供的信息表明,这是一个与Spring Boot相关的压缩包,具体来说是Spring Boot的起步依赖(Starter Parent)的一个版本,版本号为1.5.13.RELEASE。Spring Boot...
mybatis-spring-boot-starter-2.1.3.jarmybatis-spring-boot-starter-2.1.3.jarmybatis-spring-boot-starter-2.1.3.jar
druid-spring-boot-starter-1.2.8.jar
mybatis-spring-boot-starter-2.1.4.jarmybatis-spring-boot-starter-2.1.4.jar
`dynamic-datasource-spring-boot-starter`项目正致力于解决这个问题,它是一个基于Spring Boot的启动器,用于实现动态数据源切换,提供灵活的数据源管理功能。 **动态数据源**指的是在一个应用中可以动态地选择...
activiti-spring-boot-starter-basic-6.0.0适配springboot2.1.2