1. 介绍
当broker为slave且有master的情况下,通过HAService,slave从master同步commitlog数据,并通过ReputMessageService异步构建consumequeue。通过SlaveSynchronize定时每分钟从master同步config目录下的四个文件。默认,master监听broker端口+1。
2. 启动
slave可以通过配置指定master地址,也可以从namesrv获取master地址。broker启动,在BrokerController#initialize方法:
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
如果未指定master address,则updateMasterHAServerAddrPeriodically = true:
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
HAService的内部类HAClient负责连接master,从其run方法可以看出,当master address不为空,且当前broker为slave,就尝试连接master:
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
slave将commitlog的最大offset上报master,如果为首次连接,那么offset为0, master收到为0的offset后,从其最大offset开始同步,否则从指定的offset开始同步数据给slave,每次最大同步32k数据。
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
SelectMapedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMapedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(HEADER_SIZE);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
slave收到master消息后,存储到commitlog然后立即上报ackOffset。
if (diff >= (MSG_HEADER_SIZE + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + MSG_HEADER_SIZE);
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += MSG_HEADER_SIZE + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
HAService的内部类AcceptSocketService作为master broker的监听服务,HAConnection的内部类ReadSocketService负责处理slave的offset,WriteSocketService将commitlog发送给slave。slave和master之间每5s发送心跳维护连接。
3. 同步复制和异步复制
从commitlog的putMessage方法可以看出:
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (msg.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
// TODO
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
如果为同步复制,当slave落后master 256M数据后,直接报SLAVE_NOT_AVAILABLE异常。这将导致producer内部发送失败,而broker是在将数据写入commitlog文件后才同步给slave,所以可能会在业务逻辑中遇到这种异常而重发导致重复数据产生。
所以,如果将MQ部署为MS结构,最好选择异步复制的方式。
分享到:
相关推荐
6. 容错机制:如基于主从复制的HA方案,以及基于时间窗口的重试策略,确保消息正确投递。 在"rocketmq-all-5.1.3-bin-release"这个目录下,通常会包含以下组件和文件: 1. `bin`目录:存放可执行脚本,如启动、...
通过配置文件中的集群参数,如ha.server.ip和ha.server.port,可以实现Broker之间的主从复制。 7. **消息模型**:RocketMQ支持点对点模型(Queue模型)和发布/订阅模型(Topic模型)。点对点模型下,一个Queue只有...
7. **集群与HA**:RocketMQ通过Broker集群提供高可用性,每个Broker都可以有多个副本,当主Broker故障时,可以从副本中选举新的主。此外,NameServer的无状态设计也增强了系统的容错能力。 8. **事务消息**:...
总之,`rocketmq-all-4.8.0-source-release.zip`提供了一个深入了解和定制RocketMQ的平台,无论是对于学习分布式消息队列的原理,还是在实际项目中优化RocketMQ的使用,都是极其有价值的资源。通过阅读源码和实践...
- **顺序消息原理**:通过控制消息的发送顺序和存储位置来保证消息按顺序被消费。 - **顺序消息缺陷**:可能会降低系统的整体吞吐量。 7. **事务消息**:支持半消息机制,确保消息处理的一致性和完整性。 8. **...
首先,你需要从Apache RocketMQ的官方网站或者镜像站点下载`apatch-rocketmq.tar.gz`这个压缩包。在Linux终端中,你可以使用`tar`命令来解压这个文件。例如: ``` tar -zxvf apatch-rocketmq.tar.gz ``` 这将...
本文将从以下几个方面对RocketMQ的原理进行解析。 ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向...
06.hadoop-HA机制整体解析--引入zookeeper.mp4
双机热备工具jeffreyningsoftware-ha4win 版本1.5,非常好用的免费HA 工具,可以组简单的双机热备,当然不包括数据的同步,只提供IP地址的替换。其他功能需要自定义脚本。
7.1. 邮件列表:Linux-HA和Linux-HA-dev是两个重要的邮件列表,用于讨论问题和开发事宜。 7.2. 错误跟踪系统:用于报告和跟踪Heartbeat的bug。 7.3. IRC频道:实时交流平台,用户可以即时获取帮助。 8. 提交补丁:...
《免费的双机热备软件jeffreyningsoftware-ha4win-v1.3:实现高可用性与系统稳定性》 在IT行业中,系统稳定性和高可用性是至关重要的因素,尤其是在关键业务环境中。"jeffreyningsoftware-ha4win-v1.3"是一款专为...
Berkeley DB 高可用性(BDB-HA)作为一款嵌入式数据管理系统的杰出代表,以其卓越的性能、可靠性与可扩展性,在众多领域得到了广泛应用。本篇白皮书旨在深入探讨 BDB-HA 的核心特性,以及如何最佳利用其解决特定的...
Oracle RAC 环境下 OGG-HA 配置案例 一、Oracle RAC 环境简介 Oracle RAC(Real Application Clusters)是一种高可用性和高性能的解决方案,可以将多个服务器组合成一个集群,提供高可用性和高性能的数据库服务。...
PE-Explorer能够解析这些资源,并提供直观的编辑界面。对于图标修改,用户只需几步简单的操作,就能将exe或dll文件中的图标替换为自己设计的新图标,这在软件个性化定制或者品牌标识更新时非常实用。 汉化工具是PE-...
TTP233D-HA6 TonTouch™ 是单按键触摸检测芯片,此触摸检测芯片内建稳压电路,提供稳定的电压给触摸感应电路使用,稳定的触摸检测效果可以广泛的满足不同应用的需求,此触摸检测芯片是专为取代传统按键而设计,触摸...
VMware vSphere 6.7 HA 环境搭建 VMware vSphere 6.7 是一款功能强大且广泛应用于企业环境中的虚拟化平台。HA(High Availability,高可用性)环境搭建是确保虚拟机环境的高可用性和可靠性的关键步骤。本文将详细...
n-HA/PA66对损伤胸腰椎骨生物力学属性的恢复效果研究,沈伟,林峰,目的 研究经伤椎椎弓根植入纳米羟基磷石灰/聚酰胺(nanohydroxyapatite/polyamide-66, n-HA/PA66)复合人工骨粒对损伤胸腰椎体的生物力学属性恢�
# 在生产环境中,可能还需要配置多个Broker实例以实现HA # 启动Console java -jar rocketmq-console-ng-2.0.0.jar --spring.config.location=conf/application.properties # 启动Producer和Consumer # 这些通常会...
HadoopHA集群搭建描述及指令,里面有各种注意事项。 集群部署节点角色的规划(3节点) ------------------ server01 namenode resourcemanager zkfc nodemanager datanode zookeeper journal node server02 ...