`
chenchangqun
  • 浏览: 55423 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

rocketMq实战(2)-客户端集成

阅读更多

rocketmq的配置和demo搞定后,离上线运用 还有很多问题要解决

如: 怎样集成 到项目中,并做到规范,易用。使用中有哪些问题是需要考虑的,监控运维问题怎么解决。

本文先解决 客户端集成,下面贴出我经过反复试验后的最终代码和配置。

 本文贴出的代码和配置都是经过反复测试和验证,并在实际项目中使用的,目前只使用几个重要的参数,更精细的配置请参考官方文档。

 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  <modelVersion>4.0.0</modelVersion>
  <groupId>rockmqtest</groupId>
  <artifactId>rockmqtest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  	<properties>
		<spring.version>3.1.1.RELEASE</spring.version>
		<slf4j.version>1.7.13</slf4j.version>
	</properties>
  <dependencies>
    <!-- RocketMQ Java SDK -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.5.8</version>
</dependency>
	<dependency>
	<groupId>org.springframework</groupId>
	 <artifactId>spring-context</artifactId>
	 <version>3.1.1.RELEASE</version>
	</dependency>
	<dependency>
	<groupId>org.springframework</groupId>
	 <artifactId>spring-test</artifactId>
	 <version>3.1.1.RELEASE</version>
	</dependency>
	
<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build209</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.46</version>
</dependency>

 
  	<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency>
 
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.5.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.5.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.5.8</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
  </dependencies>
 
</project>

 

配置:

 

<!-- 同一个主题共用一个producer 不同主题不共用,防止发送速度快造成的性能问题 groupName= topic+producer-->
<bean id="mqProducer"   class="com.sfbest.rocketmq.online.MQProducer" init-method="start">
<constructor-arg>
<value>FirstGroupName</value>
</constructor-arg>
  <!-- 目标的主题 -->
  <property name="topic" value="TopicTest1"></property>
<property name="namesrvAddr" value="10.103.16.77:9876"></property>
</bean>
 

<!--  rocketMq 消费者配置 默认集群模式消费  -->
<!--  一个topic代表一个大业务 tag代表小业务如updateA,updateB。 同一个topic 共用一个consumer   consumerGroupName=topic+Consumer-->
<bean id="mqPushConsumer" class="com.sfbest.rocketmq.online.MQConsumer" init-method="init">
<!-- 消费者分组名,同一个业务分组名相同,不同则分组名不同 -->
<property name="consumerGroupName" value="firstSpringConsumer"></property>
  <!-- nameserver 地址和端口号 -->
  <property name="namesrvAddr" value="10.103.16.77:9876"></property>
  <!-- 订阅的话题 -->
  <property name="topic" value="TopicTest1"></property>
  <!-- 订阅话题的标记 *号表示所有TAG-->
  <property name="tagExpr"  value="*"></property>
 
	<property name="mqBusinessMap">
		 <map>
		    <entry key="TagA" value-ref="mqBusinessTestA"/>
		    <entry key="TagB" value-ref="mqBusinessTestB"/>
		 </map>
	</property>
</bean>

<!-- 自定义   具体业务实现类-->
<bean id="mqBusinessTestA" class="com.sfbest.rocketmq.online.MQBusinessTestAImpl"></bean>
<bean id="mqBusinessTestB" class="com.sfbest.rocketmq.online.MQBusinessTestBImpl"></bean>

 

 客户端 实现要考虑,扩展性,易用性,性能,规范。并且要根据rocketMq的特性编写,下面就讲讲我的实现。

关键参数可配置,设计命名规范和接口规范。

同一个主题共用一个producer 不同主题不共用,防止发送速度快造成的性能问题。

同一个主题共用一个cousumer 不用的tag实现分发,预留接口给业务实现。

 

 

Producer客户端公共类

 

/**
 * defaultMQProducer装饰器
 * 封装易用的发送方法
 * @author chenchangqun
 *
 */
public class BestMQProducer  extends DefaultMQProducer{
	private String topic;
	
	 private static final Logger LOG = LoggerFactory.getLogger(BestMQConsumer.class);

	public BestMQProducer(String producerGroupName){
		super(producerGroupName);
		//默认参数设置
		
	}
	
	
	public void init(){
		try {
			super.start();
			
		} catch (MQClientException e) {
			LOG.error("rockmq producer start fail",e);
		}
	}
	/**
	 * 发送方法
	 * @author chenchangqun
	 * @param key
	 * @param content
	 * @return
	 */
	public boolean sendMsg(String tag,String key,String content){

	      Message msg=null;  // body
		try {
			msg = new Message(topic, // topic
					tag, // tag
					key, // key
					content.getBytes("UTF-8"));
		} catch (Exception e) {
			LOG.error("create message fail ,cannot send msg ,invoke end 。topic={},tag={},key={},content={}",topic,tag,key,content,e);
			return false;
		}
	      try {
	    	  //没有初始化,则初始化
	    	  if(this.defaultMQProducerImpl.getServiceState()==ServiceState.CREATE_JUST){
	    		  LOG.info("mqProducer should be start");
	    		  this.init();
	    	  }
	    	 LOG.debug("mq send param tag:{},key:{},content:{}",tag,key,content);
			SendResult sendResult = super.send(msg);
			LOG.debug("sendResult:{}",sendResult.getSendStatus().toString());
//			sendResult.getSendStatus()==SendStatus.
//		     System.out.println("sendResult:{}"+sendResult);
			return sendResult.getSendStatus()==SendStatus.SEND_OK;
		} catch (MQClientException e) {
			LOG.error("send fail",e);
		} catch (RemotingException e) {
			LOG.error("send fail",e);
		} catch (MQBrokerException e) {
			LOG.error("send fail",e);
		} catch (InterruptedException e) {
			LOG.error("send fail",e);
		}	  
	      return false;
	}

	public String getTopic() {
		return topic;
	}

	public void setTopic(String topic) {
		this.topic = topic;
	}


 
	 
}

 producer的实现有两点考虑 

 

(1)覆盖 父类的 start,方便以后在启动时加业务逻辑。

(2)提供规范的发送方法,参数 包含 tag,key,content,因为同一个produder的tocpic是确定的。剩下的参数需要传递。可否提供topic参数?我认为不应该,如果以后发送方法多了,直接传递topic会导致代码可读性差,耦合性高。也可能会导致性能问题。

 

Consumer客户端公共类

 

 

/**
 * mq单例工厂类,设置了一些默认参数
 * 
 * @author chenchangqun
 *
 */
public class BaseConsumer {
	private String nameServAddr;
 
   private static DefaultMQPushConsumer consumer ;
   private BaseConsumer() {
   }
   public static DefaultMQPushConsumer getDefaultMQPushConsumer(String consumerName){ 
	   
	   if(consumer==null){
		   consumer = new DefaultMQPushConsumer(consumerName);          
		   consumer.setMessageModel(MessageModel.CLUSTERING);
		   consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
	   }

       return consumer;        
   }
   
   public String getNameServAddr() {
	return nameServAddr;
}
public void setNameServAddr(String nameServAddr) {
	this.nameServAddr = nameServAddr;
}
}

 

 

注意下面的部分

 

consumer.setMessageModel(MessageModel.CLUSTERING);

rocketMQ的重要特性就是支持集群消费,什么是集群消费,就是一个业务有多个实现,我们通常希望 其中一个实例消费成功后,其他实例就不要再消费了。而上述配置就是集群消费。

 

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

先解释个思维误区:如果一个消息被我的应用成功消费,是不是这条消息就没用了,可以干掉或丢弃。 答案:不对,在broker来讲你只是其中一个订阅者,这条消息可能被其他消费者订阅,broker只能记录你的消费进度。

offset是重要的名称,标记消息位置

我怎么知道消费了多少,还有多少消息没消费?答案:broker没统计这个,只能通过offset相减计算,即目前生产的   offset减去last cosumet offset。

 

要从上次消费的位置开始消费,配置如上。

 

Consumer客户端公共类

写道
/**
* mq参数设置和初始化
* 创建监听器,并根据不同的TAG分发
* @author chenchangqun
*
*/
public class BestMQConsumer {
private static final Logger LOG = LoggerFactory.getLogger(BestMQConsumer.class);
private String topic;
private String tagExpr;
private String namesrvAddr;
private Map<String,IMQBusiness> mqBusinessMap;
private String consumerGroupName;
//int count =0;
public void init(){
// 获取消息生产者
DefaultMQPushConsumer consumer = BaseConsumer.getDefaultMQPushConsumer(consumerGroupName);
// 订阅主体
try {
consumer.setNamesrvAddr(namesrvAddr);
consumer.subscribe(topic, tagExpr);
consumer.registerMessageListener( new MessageListenerConcurrently(){
//一次消费多条msg,如果第一条成功 第二条失败 返回 成功 失败?
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt msg:msgs){
if(msg.getTags()!=null){
IMQBusiness mqBusiness= mqBusinessMap.get(msg.getTags());
if(mqBusiness!=null){
if(!mqBusiness.invokeMessage(msg)){
LOG.error("cosume fail return RECONSUME_LATER");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}


}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}
}
);

/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("spring Consumer first Started.");
} catch (MQClientException e) {
LOG.error("cousumner init fail",e);
}
}
public void setTopic(String topic) {
this.topic = topic;
}
public void setTagExpr(String tagExpr) {
this.tagExpr = tagExpr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}

public void setMqBusinessMap(Map<String, IMQBusiness> mqBusinessMap) {
this.mqBusinessMap = mqBusinessMap;
}


}

 

接口类

 

/**
 *消费MQ的业务接口
 * @author chenchangqun
 *
 */
public interface IMQBusiness {

	public boolean invokeMessage(MessageExt msg);
	
	
}

 

 

接口实现类

 

public class MQBusinessTestAImpl implements IMQBusiness {
	@Override
	public boolean invokeMessage(MessageExt msg) {
		System.out.println(" invoke A rec:"+new String(msg.getBody()));
		return true;
	}
}

 

 

cousumer使用了PUSH的方式,即被动通知。实际上实现是用到了长连接。

上面的cousumer代码 主要有两个重点,一个缺陷

(1)经过测试如果不返回CONSUME_SUCCESS,则会一直重试。

(2)实现了基于tag的分发,方便业务自定义实现。

缺陷

    没有配置每次消费条数,使用默认每次消费一条,如果一次消费多条msg,如果第一条成功 第二条失败 返回 成功 失败?,这种情况还需要根据业务具体情况处理

 

注意看我的附件,里面包含完整的代码和测试类。而电子书中的文档是我费劲心血收集的,应该是目前比较好的rocketmq文档和学习资料。

 

 

 

 

 

分享到:
评论

相关推荐

    java -RocketMQ实战视频教程(上下全集)

    根据提供的文件信息,我们可以从以下几个方面来探讨与“java -RocketMQ实战视频教程(上下全集)”相关的知识点: ### RocketMQ简介 RocketMQ是由阿里巴巴开源的一款分布式消息中间件,它具有高性能、低延迟的特点...

    Spring Boot优雅使用RocketMQ的方法实例

    主要给大家介绍了关于Spring Boot优雅使用RocketMQ的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Spring Boot具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

    RocketMQ消息队列资料

    4. **轻量级框架**:RocketMQ的客户端库小巧,易于集成到各种应用中,同时提供了丰富的API,开发者可以快速上手并实现复杂的消息处理逻辑。 5. **灵活性**:RocketMQ支持多种部署模式,如集群模式、广播模式等,...

    kafka开发和rocketmq消息技术文档

    通过阅读《Kafka 权威指南》和《RocketMQ 实战与原理解析》这两本书,你可以深入理解这两个消息中间件的架构、配置、最佳实践以及如何在实际项目中应用它们。对于分布式消息系统的设计和实现,这两份文档将提供宝贵...

    xmljava系统源码-spring-boot-demo:springbootdemo是一个用来深度学习并实战springboot的项目,该项

    xml java系统源码 spring-boot-demo spring ...OAuth2)、websocket(服务端向客户端推送消息)。 开发计划 网上部分项目demo太过基本,只适合平时学习demo,无法实际用到生产环境中。故开出次项目,希

    时下超火的JAVA电商支付实战课程 从任务系统接口开发到优化 电商支付前后端业务融合

    ### 时下超火的JAVA电商支付实战课程:从任务系统接口开发到优化——电商支付前后端业务融合 #### 概述 随着电子商务行业的快速发展,支付系统的稳定性和效率成为了电商平台能否成功的关键因素之一。本课程将围绕...

    SpringBoot+nacos+websocket+redis+mysql+mybatis-plus微服务项目实战

    - WebSocket是一种在客户端和服务器之间建立长连接的协议,允许双向通信,极大地优化了实时数据传输效率。 - 在微服务中,WebSocket常用于实现聊天室、实时推送通知等场景,提高用户体验。 4. **Redis**: - ...

    疯狂Spring Cloud微服务架构实战视频教程

    - **Spring Cloud Alibaba**:集合了阿里云提供的多种微服务组件,包括Nacos、Sentinel、RocketMQ等。 ### 实战经验分享 - **实战案例**:本教程通过一个完整的电商项目来演示Spring Cloud微服务架构的构建过程,...

    微服务架构实战指南: 构建与治理高可用微服务系统

    ### 微服务架构实战指南:构建与治理高可用微服务系统 #### 一、系统架构演变及微服务架构概述 ##### 1.1 系统架构演变 随着互联网技术的不断发展,网站应用规模不断扩大,系统架构也随之演变。从最初的单体应用...

    《Spring Cloud Alibaba微服务原理与实战》中的的案-Spring-Cloud-Alibaba-.zip

    《Spring Cloud Alibaba 微服务原理与实战》是一本深度探讨Spring Cloud Alibaba的书籍,它旨在帮助开发者理解并掌握微服务架构的关键技术和实践方法。在这个压缩包中,包含的是该书的示例代码——Spring-Cloud-...

    HSF服务框架共28页.pdf.zip

    - HSF与其他阿里巴巴开源项目的集成,如Dubbo、RocketMQ等 - 实战案例,展示HSF在实际项目中的应用和优化技巧 通过对HSF服务框架的深入理解和实践,开发者能够构建出可扩展、高性能且易于维护的分布式服务系统,为...

    java毕业设计&课设-微服务气相实战(视频+源码).doc

    - **Spring Cloud Alibaba**:Spring Cloud和阿里巴巴中间件的集成,它为我们提供了使用Distributed Application Service Bus、Nacos、Sentinel、Seata、RocketMQ等组件的能力。 #### 知识点二:项目视频+源码+资料...

    spring-boot示例项目

    security-oauth2-credentials|[oauth2实现密码模式、客户端模式](https://github.com/smltq/spring-boot-demo/blob/master/security-oauth2-credentials/README.md) security-oauth2-auth-code|[基于spring boot...

    互联网架构Springboot优惠券实战1

    在本课程“互联网架构Springboot优惠券实战1”中,我们将深入探讨如何使用Spring Boot构建一个互联网应用,特别是关注优惠券功能的实现。Spring Boot是一个快速开发框架,它简化了基于Spring的应用程序创建过程,...

    从 0 开始带你成为消息中间件实战高手.txt

    - **多语言支持**:提供了多种编程语言的客户端库,便于不同开发环境下的集成。 #### 2.2 Kafka Kafka 是一款分布式的流处理平台,主要特点包括: - **高吞吐量**:能够处理每秒数十万条消息的发布和订阅。 - **...

    2023 SpringCloudAlibaba分布式微服务 物流系统 实战视频教程 下载 因为太大存百度云盘2.zip

    《2023 SpringCloudAlibaba分布式微服务 物流系统 实战视频教程 下载 因为太大存百度云盘2.zip》这个压缩包文件包含了关于SpringCloud Alibaba分布式微服务和物流系统的实战教学资源。虽然具体的目录信息没有完全...

    java毕业设计&课设-NettySpringboot仿某信聊天全栈实战 (视频+源码).doc

    Netty是一款高性能的异步事件驱动网络应用框架,主要用于快速开发可维护的高性能协议服务器与客户端。它支持多种传输协议,如TCP、UDP、HTTP等,并提供了丰富的功能组件,简化了网络编程的复杂性。 #### 2.2 Spring...

Global site tag (gtag.js) - Google Analytics