1. 介绍
索引文件指,发送一条消息后,MQ通过(topic和uniqKey)或(topic和keys)构建的索引,然后通过queryMsgByKey可以查询到结果。注意,通过queryMsgById的查询,不是索引查询。索引文件存在于/store/index/文件夹下,以时间戳命名,如20151209213520685。每个索引文件,默认存储2000w条数据,文件大小默认为420000040字节。索引文件由头信息,槽位,内容组成。
2. 组成
头信息包含6个字段,分别为:
- beginTimestamp:long型,第一条消息的存储时间。
- endTimestamp:long型, 最后一条消息的存储时间。broker异常关闭下判断是否删除索引文件。
- beginPhyOffset:long型,第一条消息的offset。
- endPhyOffset:long型, 最后一条消息的offset。
- hashSlotCount:int型,slot数量。从零递增,有消息即+1。
- indexCount:int型,index数量。从1递增。有消息即+1。
所以,头信息占用40字节。
内容包括四个字段,分别为:
- keyHash:int型,key的hash值,key为topic和uniqueKey或topic和keys组合。
- phyOffset: long型,offset值。
- timeDiff: int型,消息的存储时间与beginTimestamp差值。
- nextIndexOffset:int型,即key的hashcode或取模冲突后,指向的下一个index offset。
所以每条索引消息占用20字节。槽位(slotNum)默认有500w,int型。
说明:如果uniqueKey不为空,以topic和uniqueKey创建索引。然后判断keys是否为空,不为空,以”空格”分隔keys得到key组数,以topic和key值进行索引:
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
if ((keys != null && keys.length() > 0)) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
3. 创建
当producer发送消息到broker后,MQ通过ReputMessageService线程异步构建consumequeue和index。
4. 插入
当有索引消息时,先计算key的hashcode值,然后hashcode%slotNum得到槽位,由于key的hashcode和取模都会导致冲突,所以槽值总是指向最新的一个索引项。为了节省空间,存储的时间是存储时间-开始时间。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, HASH_SLOT_SIZE,
// false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()) {
slotValue = INVALID_INDEX;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
+ this.indexHeader.getIndexCount() * INDEX_SIZE;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} else {
log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
+ this.indexNum);
}
return false;
}
5. 查询
先计算出对应的slot,由于key的hash值不同但模数相同,所以在查询时会比较一次key的hash值,然后加入返回列表,每次最多返回32条索引信息。这里需要注意,由于hash值相同但key不等下产生的相同slot,也会被返回给客户端,所以在客户端又进行了一次处理。
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mapedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// HASH_SLOT_SIZE, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
// TODO NOTFOUND
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
+ nextIndexToRead * INDEX_SIZE;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= INVALID_INDEX
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
e.printStackTrace();
}
}
this.mapedFile.release();
}
}
}
6. 注意
由于每条索引消息的构建并未同步更新checkpoint文件的indexMsgTimestamp信息,所以在broker异常关闭情形下,broker重启后会删除最后一个索引文件,这将导致消息通过索引查询失败。确实不太清楚,作者的本意。也许是遗漏同步更新checkpoint的bug吧。
分享到:
相关推荐
在《RocketMQ-原理简介.pdf》中,可能会深入解析RocketMQ的工作原理: 1. **消息存储机制**:RocketMQ采用日志文件存储消息,通过CommitLog和IndexFile配合,实现高效的消息查询和检索。 2. **消息传输机制**:...
以下是对RocketMQ核心知识点的详细解析: 1. **消息模型** - **点对点(P2P)**:消费者可以从队列中拉取消息,每个消息只会被一个消费者消费,适合一对多的发布/订阅场景。 - **发布/订阅(Pub/Sub)**:主题下...
在这个“全面解剖RocketMQ和项目实战-day4-part4.7z”资料包中,包含了一系列视频教程,详细讲解了RocketMQ的关键特性和工作流程,下面将对这些知识点进行深入解析。 1. **实时更新消息消费队列与索引文件流程说明*...
RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。大致如下: 1、讲解commitlog、consumequeue、index、transaction文件等数据结构、数据读写、HA高可用等...
综上所述,RocketMQ源码分析从存储、消息刷盘、文件操作等多个角度,详细解读了其设计原理和实现机制,为深入理解RocketMQ提供了代码级别的解析。通过这种方式,开发者可以更加清晰地看到消息中间件在消息存储、管理...
面试中,掌握消息队列的基本概念、工作原理和常见问题,对于求职者来说至关重要。以下是一些面试中可能遇到的Java消息队列相关知识点: 1. **消息队列的作用** - **异步处理**:消息队列允许系统将非实时任务放入...