`
Technoboy
  • 浏览: 156698 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

RocketMQ原理解析-Index

阅读更多
1. 介绍
  索引文件指,发送一条消息后,MQ通过(topic和uniqKey)或(topic和keys)构建的索引,然后通过queryMsgByKey可以查询到结果。注意,通过queryMsgById的查询,不是索引查询。索引文件存在于/store/index/文件夹下,以时间戳命名,如20151209213520685。每个索引文件,默认存储2000w条数据,文件大小默认为420000040字节。索引文件由头信息,槽位,内容组成。

2. 组成
  头信息包含6个字段,分别为:
  • beginTimestamp:long型,第一条消息的存储时间。
  • endTimestamp:long型, 最后一条消息的存储时间。broker异常关闭下判断是否删除索引文件。
  • beginPhyOffset:long型,第一条消息的offset。
  • endPhyOffset:long型, 最后一条消息的offset。
  • hashSlotCount:int型,slot数量。从零递增,有消息即+1。
  • indexCount:int型,index数量。从1递增。有消息即+1。
  •     所以,头信息占用40字节。

    内容包括四个字段,分别为:
  • keyHash:int型,key的hash值,key为topic和uniqueKey或topic和keys组合。
  • phyOffset: long型,offset值。
  • timeDiff: int型,消息的存储时间与beginTimestamp差值。
  • nextIndexOffset:int型,即key的hashcode或取模冲突后,指向的下一个index offset。
  •     所以每条索引消息占用20字节。槽位(slotNum)默认有500w,int型。

    说明:如果uniqueKey不为空,以topic和uniqueKey创建索引。然后判断keys是否为空,不为空,以”空格”分隔keys得到key组数,以topic和key值进行索引:

if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }

            if ((keys != null && keys.length() > 0)) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i <  keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                            indexFile = putKey(indexFile, msg, buildKey(topic, key));
                            if (indexFile == null) {
                                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                                return;
                            }
                        }
                 }
            }


3. 创建
  当producer发送消息到broker后,MQ通过ReputMessageService线程异步构建consumequeue和index。

4. 插入
  当有索引消息时,先计算key的hashcode值,然后hashcode%slotNum得到槽位,由于key的hashcode和取模都会导致冲突,所以槽值总是指向最新的一个索引项。为了节省空间,存储的时间是存储时间-开始时间。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, HASH_SLOT_SIZE,
                // false);
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = INVALID_INDEX;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();


                timeDiff = timeDiff / 1000;


                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
                                + this.indexHeader.getIndexCount() * INDEX_SIZE;


                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);


                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());


                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } else {
            log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
                    + this.indexNum);
        }

        return false;
    }


5. 查询
  先计算出对应的slot,由于key的hash值不同但模数相同,所以在查询时会比较一次key的hash值,然后加入返回列表,每次最多返回32条索引信息。这里需要注意,由于hash值相同但key不等下产生的相同slot,也会被返回给客户端,所以在客户端又进行了一次处理。
 
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
                                final long begin, final long end, boolean lock) {
        if (this.mapedFile.hold()) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // HASH_SLOT_SIZE, true);
                }

                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= INVALID_INDEX || slotValue > this.indexHeader.getIndexCount()
                        || this.indexHeader.getIndexCount() <= 1) {
                    // TODO NOTFOUND
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        int absIndexPos =
                                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * HASH_SLOT_SIZE
                                        + nextIndexToRead * INDEX_SIZE;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);


                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= INVALID_INDEX
                                || prevIndexRead > this.indexHeader.getIndexCount()
                                || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                this.mapedFile.release();
            }
        }
    }


6. 注意
    由于每条索引消息的构建并未同步更新checkpoint文件的indexMsgTimestamp信息,所以在broker异常关闭情形下,broker重启后会删除最后一个索引文件,这将导致消息通过索引查询失败。确实不太清楚,作者的本意。也许是遗漏同步更新checkpoint的bug吧。
分享到:
评论

相关推荐

    rocketmq-all-4.8.0-bin-release

    unzip rocketmq-all-4.9.2-bin-release.zip -d /usr/local/ # 修改一下文件夹名,改成 rocketmq 方便使用 mv /usr/local/rocketmq-4.9.2 /usr/local/rocketmq 3. 配置环境变量 ROCKETMQ_HOME 和 PATH 为了后续操作...

    rocketmq-console-ng-1.0.0.zip

    总结来说,"rocketmq-console-ng-1.0.0.zip"是一个包含RocketMQ Console管理控制台的压缩包,主要文件"rocketmq-console-ng-1.0.0.jar"是一个Java可执行文件,用于运行RocketMQ的图形化管理界面。使用这个控制台,...

    rocketmq-externals-rocketmq-console-1.0.0.zip

    rocketmq-externals-rocketmq-console-1.0.0.zip

    rocketmq-console-ng-1.0.1.jar

    java -jar rocketmq-console-ng-1.0.1.jar 启动 ---当终端断了该服务就会停止 nohup java -jar rocketmq-console-ng-1.0.1.jar &gt;&gt;/soft/RocketMQ/rocketmqlogs/log.out 2&gt;&1 & 后台启动 --当终端断了也不会停止服务

    rocketmq-console-ng-1.0.0.jar

    rocketmq-console打包文件,可直接命令: java -jar rocketmq-console-ng-1.0.0.jar -Xms256m -Xmx256m -Xmn256m

    rocketmq-externals-rocketmq-console-1.0.0.tar

    rocketmq-externals-rocketmq-console-1.0.0.tar

    rocketmq可视化程序安装 rocketmq-externals-release-rocketmq-console-1.0

    课直接使用的 1、取出jar里面的application.properties 修改rocketmq.config.namesrvAddr=localhost:9876 2、修改后放回去 3、启动java -jar rocketmq-console-ng-1.0.0.jar 4、http://localhost:8080/

    rocketmq-externals-master.zip

    "rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...

    rocketmq-console-ng-2.0.0.jar

    已经过mvn clean package -Dmaven.test.skip=true...可以直接执行启动命令:java -jar rocketmq-console-ng-2.0.0.jar --server.port=9090 默认配置:server.port=8088、rocketmq.config.namesrvAddr=127.0.0.1:9876

    rocketmq-dashboard-1.0.1-SNAPSHOT.jar

    rocketmq-dashboard-1.0.1-SNAPSHOT.jar

    rocketmq可视化控制台最新版 rocketmq-console-ng-2.x

    "rocketmq-console-ng-2.x"代表的是RocketMQ控制台的新版本,"ng"可能指的是"Next Generation",意味着这个版本带来了更先进的特性和改进。2.0.0是这个版本的编号,通常代表着相较于之前的版本,它可能包含了大量的...

    rocketmq-all-5.1.1-bin-release.zip

    在“rocketmq-all-5.1.1-bin-release.zip”这个压缩包中,包含了RocketMQ的5.1.1版本的所有源码、编译后的二进制文件和相关的配置文件,供开发者部署和使用。以下将详细讲解RocketMQ的主要组成部分、工作原理以及...

    rocketmq-externals-release-rocketmq-console-1.0.0

    在"rocketmq-externals-release-rocketmq-console-1.0.0"这个压缩包中,包含了RocketMQ Console的1.0.0版本。这个版本可能包含以下关键组件和功能: 1. **源代码**:该压缩包可能包括了RocketMQ Console的源码,...

    rocketmq-console-ng-2.0.0

    直接解压,通过命令直接启动: java -jar rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=:9876

    rocketmq-all-5.1.3-bin-release.zip

    在这个"rocketmq-all-5.1.3-bin-release.zip"压缩包中,包含了运行和使用RocketMQ所需的各种文件。 首先,让我们深入理解RocketMQ的核心概念和功能。RocketMQ是一个分布式消息传递系统,它允许应用程序之间通过发送...

Global site tag (gtag.js) - Google Analytics