关注我
文章首发于公众号《程序员果果》
地址 : https://mp.weixin.qq.com/s/dYqGd9zi2mNelsNNLIribg
消息发送示例
导入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.yml:
rocketmq:
name-server: 172.16.250.129:9876
producer:
group: myGroup
普通消息
同步发送
原理:
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
应用场景:
这种可靠性同步地发送方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
示例代码:
public void sendMsg() throws Exception {
Message message = new Message(
// 普通消息所属的Topic
"Topic-Normal",
// Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列 RocketMQ 的服务器过滤。
"TagA",
// Message Body可以是任何二进制形式的数据。
"Hello MQ".getBytes()
);
rocketMQTemplate.getProducer().send( message );
// 等同于上面的方式(常用)
//rocketMQTemplate.convertAndSend("Topic-Normal:TagA","Hello MQ".getBytes());
}
异步发送
原理:
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。RocketMQ异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
应用场景:
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
示例代码:
public void sendAsyncMsg() {
Map<String , Object> map = new HashMap<>();
map.put( "name" , "zs" );
map.put( "age" , 20);
rocketMQTemplate.asyncSend( "Topic-Normal", map , new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 消息发送成功。
log.info( "async send success" );
}
@Override
public void onException(Throwable throwable) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
log.info( "async send fail" );
}
} );
}
顺序消息
全局顺序消息
- 概念:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照FIFO原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为Topic,在价格相同的情况下,先出价者优先处理,则可以按照FIFO的方式发布和消费全局顺序消息。
分区顺序消息
- 概念:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
- 适用场景:适用于性能要求高,以Sharding Key作为分区字段,在同一个区块中严格地按照FIFO原则进行消息发布和消费的场景。
- 示例:
- 用户注册需要发送发验证码,以用户ID作为Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单ID作为Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
无序消息、全局顺序消息、分区顺序消息的对比
示例代码
public void sendOrderlyMsg() {
//根据指定的hashKey按顺序发送
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
// 分区顺序消息中区分不同分区的关键字段,Sharding Key与普通消息的key是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = rocketMQTemplate.syncSendOrderly( "Topic-Order", "send order msg".getBytes(), shardingKey );
// 发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success . msgId is:" + sendResult.getMsgId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed");
e.printStackTrace();
}
}
}
延时消息
概念:
Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
适用场景:
消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
示例代码:
public void sendDelayMsg() {
rocketMQTemplate.syncSend( "Topic-Delay",
MessageBuilder.withPayload( "Hello MQ".getBytes() ).build(),
3000,
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
//messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3 );
}
事务消息
概念:
- 事务消息:消息队列RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列RocketMQ事务消息能达到分布式事务的最终一致。
- 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
- 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。
分布式事务消息的优势:
消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
典型场景:
在电商购物车下单时,涉及到购物车系统和交易系统,这两个系统之间的数据最终一致性可以通过分布式事务消息的异步处理实现。在这种场景下,交易系统是最为核心的系统,需要最大限度地保证下单成功。而购物车系统只需要订阅消息队列RocketMQ的交易订单消息,做相应的业务处理,即可保证最终的数据一致性。
事务消息交互流程如下图所示:
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列RocketMQ服务端。
- 消息队列RocketMQ服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
示例代码:
发送事务消息包含以下两个步骤:
- 1. 发送半事务消息(Half Message,示例代码如下
/**
* 事务消息
*/
public void sendTransactionMsg() {
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(
"Topic-Tx:TagA",
MessageBuilder.withPayload( "Hello MQ transaction===".getBytes() ).build(),
null );
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println( new Date() + " Send mq message status "+ sendStatus +" , localTransactionState "+ localTransactionState );
}
- 2. 发送方开始执行本地事务逻辑
@Component
@RocketMQTransactionListener
public class TxProducerListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
System.out.println("TX message listener execute local transaction");
RocketMQLocalTransactionState result;
try {
// 业务代码( 例如下订单 )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
System.out.println("execute local transaction error");
result = RocketMQLocalTransactionState.UNKNOWN;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务( 例如检查下订单是否成功 )
System.out.println("TX message listener check local transaction");
RocketMQLocalTransactionState result;
try {
//业务代码( 根据检查结果,决定是COMMIT或ROLLBACK )
result = RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
// 异常就回滚
System.out.println("check local transaction error");
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
}
- 3. 发送方在本地事务执行后,若向服务端提交二次确认是Commit,RocketMQ服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;订阅方代码如下
@Component
@Slf4j
@RocketMQMessageListener(topic = "Topic-Tx",consumerGroup = "consumer-tx-group")
public class TxConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Receive message:{}" , message);
}
}
源码
https://github.com/gf-huanchupk/SpringBootLearning
系列文章
相关推荐
然后使用 @RocketMQMessageListener 注解配置 consumer group、topic 和消息类型。 4. **事务一致性**: 如果需要保证消息的发送与数据库操作一致,可以使用 RocketMQ 的事务消息特性,配合 Spring 的事务管理,...
它支持发布/订阅模式和点对点模式,同时提供事务消息、顺序消息、延迟消息等多种消息类型,以满足不同业务需求。 2. **RocketMQ的客户端API**:这个集合中的jar文件包含了RocketMQ的Java客户端API,开发者可以通过...
RocketMQ 的消息类型有事务消息、确认消息、延时消息等。为了避免消息丢失,需要在生产阶段、存储阶段和消费阶段采取合理的办法。 RocketMQ 的消息堆积 RocketMQ 的消息堆积是指消息队列中积累了大量的消息,导致...
MQTT协议是IoT领域常用的一种轻量级协议,它支持消息的匹配和接收者的选择。消息队列可以通过Push(推送)+ ACK(确认)或者Pull(拉取)+ Offset(偏移量)来确保消息的可靠传递。在集中到离散的转换中,客户端可以...
例如,当用户领取优惠券时,该操作可以通过发送消息到RocketMQ队列,然后由后台服务异步处理,提高系统的响应速度和整体性能。 **MySQL** MySQL是一款流行的开源关系型数据库管理系统,广泛应用于Web应用。在这个...
本文详细介绍了RabbitMQ消息队列的基本概念及其优缺点,并对比分析了几种常用的消息队列产品。此外,还深入探讨了RabbitMQ内部的工作机制,包括各个组件的功能和交互方式。对于想要深入了解RabbitMQ或准备相关面试的...
2. **消息队列的类型** - **点对点(P2P)模型**:每个消息只有一个消费者,常用于实时性要求高的场景。 - **发布/订阅(Pub/Sub)模型**:一个消息可以有多个消费者,适用于广播式的消息分发。 3. **常见消息...
以下是几种常用的消息队列中间件及其特点: ##### 1. ActiveMQ - **成熟稳定:** Apache ActiveMQ 是一个开源的消息中间件,具有较长的历史和成熟的社区支持。 - **更新频率较低:** 近期的版本更新较少,可能意味...
对于Apache RocketMQ,这是一个分布式消息中间件,它支持高并发和低延迟的消息传递。配置RocketMQ的nameserver和broker服务开机自启涉及以下步骤: 1. 在`/lib/systemd/system`目录下创建两个名为`...
在企业应用集成(EAI)中,消息队列是一种常用的集成方法,可以与其他方法如文件传输、共享数据库、远程过程调用等一同使用。消息队列通过缓冲层来控制数据流速度,优化系统性能,并降低系统组件之间的强依赖关系。...
1. **消息队列的类型** - **AMQP**:高级消息队列协议,是一种开放标准应用层协议,用于消息中间件。 - **JMS**:Java消息服务,是Java平台中关于面向消息中间件(MOM)的API,用于方法间进行异步消息交换。 2. **...
在IT行业中,消息队列(Message Queue,简称MQ)是一种常用的技术,用于应用程序之间的异步通信和解耦。"mq_get.zip"这个压缩包文件很可能包含了一组与获取MQ队列消息相关的工具、代码示例或者教程。让我们深入探讨...
、JTA组件简介 1、JTA基本概念 ...资源管理器是任意类型的持久化数据存储容器,例如在开发中常用的关系型数据库:MySQL,Oracle等,消息中间件RocketMQ、RabbitMQ等。 事务管理器提供事务声明,事务
消息通知方面,SpringBoot可以结合RabbitMQ或RocketMQ等消息中间件,实现实时的消息推送。Vue组件则用于展示通知列表,确保用户及时获取相关信息。 总的来说,这个项目不仅提供了学习SpringBoot和Vue实际开发的平台...
**知识点七:RocketMQ顺序消息处理** - **顺序消息定义**:消息发送者发送的消息需按照指定顺序被消费。 - **保证方法**:使用分区顺序消息或全局顺序消息,依赖于消息队列本身的机制。 **知识点八:Elasticsearch...
8. **分布式技术**:Dubbo是阿里巴巴的分布式服务框架,Zookeeper用于分布式协调,Redis是高性能的内存数据结构存储,RocketMQ是分布式消息中间件。 9. **数据结构与算法**:面试中可能涉及基础的数据结构(如数组...
sanri-tools-maven集成了一系列常用技术,如Kafka、Redis、Zookeeper、Dubbo、Swagger、SOAP、Mybatis、Elasticsearch、MongoDB、Git、Maven、RocketMQ和Minio,为广大开发者提供便捷的数据查询和监控解决方案,欢迎...
它们各有优劣:ActiveMQ适合小型项目,RabbitMQ适合复杂消息模型,RocketMQ在大规模并发下表现优秀,Kafka则适合实时数据流处理。消息队列用于异步处理、解耦系统、实现定时任务等。集成Jenkins可以自动化部署和构建...
"本地事务消息表"是实际工作中常用的技术,它通过记录事务消息来保证数据一致性。Apache RocketMQ的半消息机制(Half Message Mechanism)则为同步场景提供了分布式事务的支持,可以在AOP(面向切面编程)中实现对...
消息中间件专题讨论了如何在项目中有效地引入消息队列(MQ)以解决解耦、异步处理、流量削峰等问题,列举了RocketMQ、RabbitMQ和Kafka等流行的消息中间件,并对它们的基本原理、应用场景、架构设计以及高级特性进行...