1. 介绍
索引文件指,发送一条消息后,MQ通过(topic和uniqKey)或(topic和keys)构建的索引,然后通过queryMsgByKey可以查询到结果。注意,通过queryMsgById的查询,不是索引查询。索引文件存在于/store/index/文件夹下,以时间戳命名,如20151209213520685。每个索引文件,默认存储2000w条数据,文件大小默认为420000040字节。索引文件由头信息,槽位,内容组成。
2. 组成
头信息包含6个字段,分别为:
- beginTimestamp:long型,第一条消息的存储时间。
- endTimestamp:long型, 最后一条消息的存储时间。broker异常关闭下判断是否删除索引文件。
- beginPhyOffset:long型,第一条消息的offset。
- endPhyOffset:long型, 最后一条消息的offset。
- hashSlotCount:int型,slot数量。从零递增,有消息即+1。
- indexCount:int型,index数量。从1递增。有消息即+1。
所以,头信息占用40字节。
内容包括四个字段,分别为:
- keyHash:int型,key的hash值,key为topic和uniqueKey或topic和keys组合。
- phyOffset: long型,offset值。
- timeDiff: int型,消息的存储时间与beginTimestamp差值。
- nextIndexOffset:int型,即key的hashcode或取模冲突后,指向的下一个index offset。
所以每条索引消息占用20字节。槽位(slotNum)默认有500w,int型。
说明:如果uniqueKey不为空,以topic和uniqueKey创建索引。然后判断keys是否为空,不为空,以”空格”分隔keys得到key组数,以topic和key值进行索引:
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
if ((keys != null && keys.length() > 0)) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
3. 创建
当producer发送消息到broker后,MQ通过ReputMessageService线程异步构建consumequeue和index。
4. 插入
当有索引消息时,先计算key的hashcode值,然后hashcode%slotNum得到槽位,由于key的hashcode和取模都会导致冲突,所以槽值总是指向最新的一个索引项。为了节省空间,存储的时间是存储时间-开始时间。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, HASH_SLOT_SIZE,
// false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()) {
slotValue = INVALID_INDEX;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
+ this.indexHeader.getIndexCount() * INDEX_SIZE;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} else {
log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
+ this.indexNum);
}
return false;
}
5. 查询
先计算出对应的slot,由于key的hash值不同但模数相同,所以在查询时会比较一次key的hash值,然后加入返回列表,每次最多返回32条索引信息。这里需要注意,由于hash值相同但key不等下产生的相同slot,也会被返回给客户端,所以在客户端又进行了一次处理。
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mapedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// HASH_SLOT_SIZE, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
// TODO NOTFOUND
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
+ nextIndexToRead * INDEX_SIZE;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= INVALID_INDEX
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
e.printStackTrace();
}
}
this.mapedFile.release();
}
}
}
6. 注意
由于每条索引消息的构建并未同步更新checkpoint文件的indexMsgTimestamp信息,所以在broker异常关闭情形下,broker重启后会删除最后一个索引文件,这将导致消息通过索引查询失败。确实不太清楚,作者的本意。也许是遗漏同步更新checkpoint的bug吧。
分享到:
相关推荐
unzip rocketmq-all-4.9.2-bin-release.zip -d /usr/local/ # 修改一下文件夹名,改成 rocketmq 方便使用 mv /usr/local/rocketmq-4.9.2 /usr/local/rocketmq 3. 配置环境变量 ROCKETMQ_HOME 和 PATH 为了后续操作...
总结来说,"rocketmq-console-ng-1.0.0.zip"是一个包含RocketMQ Console管理控制台的压缩包,主要文件"rocketmq-console-ng-1.0.0.jar"是一个Java可执行文件,用于运行RocketMQ的图形化管理界面。使用这个控制台,...
rocketmq-externals-rocketmq-console-1.0.0.zip
java -jar rocketmq-console-ng-1.0.1.jar 启动 ---当终端断了该服务就会停止 nohup java -jar rocketmq-console-ng-1.0.1.jar >>/soft/RocketMQ/rocketmqlogs/log.out 2>&1 & 后台启动 --当终端断了也不会停止服务
rocketmq-console打包文件,可直接命令: java -jar rocketmq-console-ng-1.0.0.jar -Xms256m -Xmx256m -Xmn256m
rocketmq-externals-rocketmq-console-1.0.0.tar
课直接使用的 1、取出jar里面的application.properties 修改rocketmq.config.namesrvAddr=localhost:9876 2、修改后放回去 3、启动java -jar rocketmq-console-ng-1.0.0.jar 4、http://localhost:8080/
"rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...
已经过mvn clean package -Dmaven.test.skip=true...可以直接执行启动命令:java -jar rocketmq-console-ng-2.0.0.jar --server.port=9090 默认配置:server.port=8088、rocketmq.config.namesrvAddr=127.0.0.1:9876
rocketmq-dashboard-1.0.1-SNAPSHOT.jar
"rocketmq-console-ng-2.x"代表的是RocketMQ控制台的新版本,"ng"可能指的是"Next Generation",意味着这个版本带来了更先进的特性和改进。2.0.0是这个版本的编号,通常代表着相较于之前的版本,它可能包含了大量的...
在“rocketmq-all-5.1.1-bin-release.zip”这个压缩包中,包含了RocketMQ的5.1.1版本的所有源码、编译后的二进制文件和相关的配置文件,供开发者部署和使用。以下将详细讲解RocketMQ的主要组成部分、工作原理以及...
在"rocketmq-externals-release-rocketmq-console-1.0.0"这个压缩包中,包含了RocketMQ Console的1.0.0版本。这个版本可能包含以下关键组件和功能: 1. **源代码**:该压缩包可能包括了RocketMQ Console的源码,...
直接解压,通过命令直接启动: java -jar rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=:9876
在这个"rocketmq-all-5.1.3-bin-release.zip"压缩包中,包含了运行和使用RocketMQ所需的各种文件。 首先,让我们深入理解RocketMQ的核心概念和功能。RocketMQ是一个分布式消息传递系统,它允许应用程序之间通过发送...