前面忘了先介绍一下Broker消息存储的组织方式,我们前面知道了一条消息属于某个Topic下的某个分区,消息存储的组织方式是按照此方式进行组织的,结构图如下:
所以对于每个Topic而言,分区是最小的元素,对外API主要由MessageStore提供,一个MessageStore实例代表一个分区的实例,分区存储具体的内容。在MetaQ中,分区的存储采用的多文件的方式进行组合,即MessageStore由多个FileMessageSet组成,而FileMessageSet在MessageStore被包装成Segment,代码如下(MessageStore是值得好好分析的一个类):
- public class MessageStore extends Thread implements Closeable {
- …….
- }
- public class MessageStore extends Thread implements Closeable {
- …….
- }
MessageStore继承了Thread类,继承该类主要是为了实现异步写入方式
- public MessageStore(final String topic, final int partition, final MetaConfig metaConfig,
- final DeletePolicy deletePolicy, final long offsetIfCreate) throws IOException {
- this.metaConfig = metaConfig; //全局配置信息
- this.topic = topic; //当前主题
- final TopicConfig topicConfig = this.metaConfig.getTopicConfig(this.topic);
- String dataPath = metaConfig.getDataPath(); //当前分区的存储路径
- if (topicConfig != null) {
- dataPath = topicConfig.getDataPath();
- }
- final File parentDir = new File(dataPath);
- this.checkDir(parentDir); //检测父目录是否存在
- this.partitionDir = new File(dataPath + File.separator + topic + "-" + partition);
- this.checkDir(this.partitionDir);
- // this.topic = topic;
- this.partition = partition; //当前分区
- this.unflushed = new AtomicInteger(0); //未提交的消息数
- this.lastFlushTime = new AtomicLong(SystemTimer.currentTimeMillis()); //最后一次提交时间
- this.unflushThreshold = topicConfig.getUnflushThreshold(); //最大允许的未flush消息数,超过此值将强制force到磁盘,默认1000
- this.deletePolicy = deletePolicy; //由于是多文件的存储方式,消费过的消息或过期消息需要删除从而腾出空间给新消息的,默认提供归档和过期删除的方式
- // Make a copy to avoid getting it again and again.
- this.maxTransferSize = metaConfig.getMaxTransferSize();
- //启动异步写入的时候,消息提交到磁盘的size配置,同时也是配置组写入时,消息最大长度的控制参数,如果消息长度大于该参数,则会同步写入
- this.maxTransferSize = this.maxTransferSize > ONE_M_BYTES ? ONE_M_BYTES :this.maxTransferSize;
- // Check directory and load exists segments.
- this.checkDir(this.partitionDir);
- this.loadSegments(offsetIfCreate);
- if (this.useGroupCommit()) {
- this.start();
- }
- }
- public MessageStore(final String topic, final int partition, final MetaConfig metaConfig,
- final DeletePolicy deletePolicy, final long offsetIfCreate) throws IOException {
- this.metaConfig = metaConfig; //全局配置信息
- this.topic = topic; //当前主题
- final TopicConfig topicConfig = this.metaConfig.getTopicConfig(this.topic);
- String dataPath = metaConfig.getDataPath(); //当前分区的存储路径
- if (topicConfig != null) {
- dataPath = topicConfig.getDataPath();
- }
- final File parentDir = new File(dataPath);
- this.checkDir(parentDir); //检测父目录是否存在
- this.partitionDir = new File(dataPath + File.separator + topic + "-" + partition);
- this.checkDir(this.partitionDir);
- // this.topic = topic;
- this.partition = partition; //当前分区
- this.unflushed = new AtomicInteger(0); //未提交的消息数
- this.lastFlushTime = new AtomicLong(SystemTimer.currentTimeMillis()); //最后一次提交时间
- this.unflushThreshold = topicConfig.getUnflushThreshold(); //最大允许的未flush消息数,超过此值将强制force到磁盘,默认1000
- this.deletePolicy = deletePolicy; //由于是多文件的存储方式,消费过的消息或过期消息需要删除从而腾出空间给新消息的,默认提供归档和过期删除的方式
- // Make a copy to avoid getting it again and again.
- this.maxTransferSize = metaConfig.getMaxTransferSize();
- //启动异步写入的时候,消息提交到磁盘的size配置,同时也是配置组写入时,消息最大长度的控制参数,如果消息长度大于该参数,则会同步写入
- this.maxTransferSize = this.maxTransferSize > ONE_M_BYTES ? ONE_M_BYTES : this.maxTransferSize;
- // Check directory and load exists segments.
- this.checkDir(this.partitionDir);
- this.loadSegments(offsetIfCreate);
- if (this.useGroupCommit()) {
- this.start();
- }
- }
首先是获取配置信息,其次由于MessageStore采用的多文件存储方,所以要检查父目录的存在,最后则是加载校验已有数据,如果配置了异步写入,则启动异步写入线程(如果unflushThreshold<= 0,则认为启动异步写入的方式)
我们发现在构造方法的倒数第3行调用了loadSegments()方法去加载校验文件,看看该方法到底做了些什么事情
- private void loadSegments(final long offsetIfCreate) throws IOException {
- final List<Segment> accum = new ArrayList<Segment>();
- final File[] ls = this.partitionDir.listFiles();
- if (ls != null) {
- //遍历目录下的所有.meta后缀的数据文件,将所有文件都变为不可变的文件
- for (final File file : ls) {
- if (file.isFile() && file.toString().endsWith(FILE_SUFFIX)) {
- if (!file.canRead()) {
- throw new IOException("Could not read file " + file);
- }
- final String filename = file.getName();
- final long start = Long.parseLong(filename.substring(0, filename.length() - FILE_SUFFIX.length()));
- // 先作为不可变的加载进来
- accum.add(new Segment(start, file, false));
- }
- }
- }
- if (accum.size() == 0) {
- // 没有可用的文件,创建一个,索引从offsetIfCreate开始
- final File newFile = new File(this.partitionDir,this.nameFromOffset(offsetIfCreate));
- accum.add(new Segment(offsetIfCreate, newFile));
- } else {
- // 至少有一个文件,校验并按照start升序排序
- Collections.sort(accum, new Comparator<Segment>() {
- @Override
- public int compare(final Segment o1, final Segment o2) {
- if (o1.start == o2.start) {
- return 0;
- } else if (o1.start > o2.start) {
- return 1;
- } else {
- return -1;
- }
- }
- });
- // 校验文件,是否前后衔接,如果不是,则认为数据文件被破坏或者篡改,抛出异常
- this.validateSegments(accum);
- // 最后一个文件修改为可变
- final Segment last = accum.remove(accum.size() - 1);
- last.fileMessageSet.close();
- log.info("Loading the last segment in mutable mode and running recover on " + last.file.getAbsolutePath());
- final Segment mutable = new Segment(last.start, last.file);
- accum.add(mutable);
- log.info("Loaded " + accum.size() + " segments...");
- }
- this.segments = new SegmentList(accum.toArray(new Segment[accum.size()]));
- //多个segmentg通过SegmentList组织起来,SegmentList能保证在并发访问下的删除、添加保持一致性,SegmentList没有采用java的关键字 synchronized进行同步,而是使用类似cvs原语的方式进行同步访问(因为绝大部分情况下并没有并发问题,可以极大的提高效率),该类比较简单就不再分析
- }
- private void loadSegments(final long offsetIfCreate) throws IOException {
- final List<Segment> accum = new ArrayList<Segment>();
- final File[] ls = this.partitionDir.listFiles();
- if (ls != null) {
- //遍历目录下的所有.meta后缀的数据文件,将所有文件都变为不可变的文件
- for (final File file : ls) {
- if (file.isFile() && file.toString().endsWith(FILE_SUFFIX)) {
- if (!file.canRead()) {
- throw new IOException("Could not read file " + file);
- }
- final String filename = file.getName();
- final long start = Long.parseLong(filename.substring(0, filename.length() - FILE_SUFFIX.length()));
- // 先作为不可变的加载进来
- accum.add(new Segment(start, file, false));
- }
- }
- }
- if (accum.size() == 0) {
- // 没有可用的文件,创建一个,索引从offsetIfCreate开始
- final File newFile = new File(this.partitionDir, this.nameFromOffset(offsetIfCreate));
- accum.add(new Segment(offsetIfCreate, newFile));
- } else {
- // 至少有一个文件,校验并按照start升序排序
- Collections.sort(accum, new Comparator<Segment>() {
- @Override
- public int compare(final Segment o1, final Segment o2) {
- if (o1.start == o2.start) {
- return 0;
- } else if (o1.start > o2.start) {
- return 1;
- } else {
- return -1;
- }
- }
- });
- // 校验文件,是否前后衔接,如果不是,则认为数据文件被破坏或者篡改,抛出异常
- this.validateSegments(accum);
- // 最后一个文件修改为可变
- final Segment last = accum.remove(accum.size() - 1);
- last.fileMessageSet.close();
- log.info("Loading the last segment in mutable mode and running recover on " + last.file.getAbsolutePath());
- final Segment mutable = new Segment(last.start, last.file);
- accum.add(mutable);
- log.info("Loaded " + accum.size() + " segments...");
- }
- this.segments = new SegmentList(accum.toArray(new Segment[accum.size()]));
- //多个segmentg通过SegmentList组织起来,SegmentList能保证在并发访问下的删除、添加保持一致性,SegmentList没有采用java的关键字 synchronized进行同步,而是使用类似cvs原语的方式进行同步访问(因为绝大部分情况下并没有并发问题,可以极大的提高效率),该类比较简单就不再分析
- }
MessageStore采用Segment方式组织存储,Segment包装了FileMessageSet,由FileMessageSet进行读写,MessageStore并将多个Segment进行前后衔接,衔接方式为:第一个Segment对应的消息文件命名为0.meta,第二个则命名为第一个文件的开始位置+第一个Segment的大小,图示如下(假设现在每个文件大小都为1024byte):
为什么要这样进行设计呢,主要是为了提高查询效率。MessageStore将最后一个Segment变为可变Segment,因为最后一个Segment相当于文件尾,消息是有先后顺序的,必须将消息添加到最后一个Segment上。
关注validateSegments()方法做了些什么事情
- private void validateSegments(final List<Segment> segments) {
- //验证按升序排序的Segment是否前后衔接,确保文件没有被篡改和破坏(这里的验证是比较简单的验证,消息内容的验证在FileMessageSet中,通过比较checksum进行验证,在前面的篇幅中介绍过,这两种方式结合可以在范围上从大到小进行验证,保证内容基本不被破坏和篡改)
- this.writeLock.lock();
- try {
- for (int i = 0; i < segments.size() - 1; i++) {
- final Segment curr = segments.get(i);
- final Segment next = segments.get(i + 1);
- if (curr.start + curr.size() != next.start) {
- throw new IllegalStateException("The following segments don't validate: "
- + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath());
- }
- }
- } finally {
- this.writeLock.unlock();
- }
- }
- private void validateSegments(final List<Segment> segments) {
- //验证按升序排序的Segment是否前后衔接,确保文件没有被篡改和破坏(这里的验证是比较简单的验证,消息内容的验证在FileMessageSet中,通过比较checksum进行验证,在前面的篇幅中介绍过,这两种方式结合可以在范围上从大到小进行验证,保证内容基本不被破坏和篡改)
- this.writeLock.lock();
- try {
- for (int i = 0; i < segments.size() - 1; i++) {
- final Segment curr = segments.get(i);
- final Segment next = segments.get(i + 1);
- if (curr.start + curr.size() != next.start) {
- throw new IllegalStateException("The following segments don't validate: "
- + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath());
- }
- }
- } finally {
- this.writeLock.unlock();
- }
- }
ITEye整理格式好麻烦,下面的代码分析直接在代码中分析
- //添加消息的方式有两种,同步和异步
- public void append(final long msgId, final PutCommand req, final AppendCallback cb) {
- //首先将内容包装成前面介绍过的消息存储格式
- this.appendBuffer(MessageUtils.makeMessageBuffer(msgId, req), cb);
- }
- //异步写入的包装类
- private static class WriteRequest {
- public final ByteBuffer buf;
- public final AppendCallback cb;
- public Location result;
- public WriteRequest(final ByteBuffer buf, final AppendCallback cb) {
- super();
- this.buf = buf;
- this.cb = cb;
- }
- }
- //这里比较好的设计是采用回调的方式来,对于异步写入实现就变得非常容易
- //AppendCallback返回的是消息成功写入的位置Location(起始位置和消息长度),该Location并不是相对于当前Segment的开始位置0,而是相对于当前Segment给定的值(对应文件命名值即为给定的值),以后查询消息的时候直接使用该位置就可以快速定位到消息写入到哪个文件
- //这也就是为什么文件名的命名采用前后衔接的方式,这也通过2分查找可以快速定位消息的位置
- private void appendBuffer(final ByteBuffer buffer, final AppendCallback cb) {
- if (this.closed) {
- throw new IllegalStateException("Closed MessageStore.");
- }
- //如果启动异步写入并且消息长度小于一次提交的最大值maxTransferSize,则将该消息放入异步写入队列
- if (this.useGroupCommit() && buffer.remaining() < this.maxTransferSize) {
- this.bufferQueue.offer(new WriteRequest(buffer, cb));
- } else {
- Location location = null;
- final int remainning = buffer.remaining();
- this.writeLock.lock();
- try {
- final Segment cur = this.segments.last();
- final long offset = cur.start + cur.fileMessageSet.append(buffer);
- this.mayBeFlush(1);
- this.mayBeRoll();
- location = Location.create(offset, remainning);
- } catch (final IOException e) {
- log.error("Append file failed", e);
- location = Location.InvalidLocaltion;
- } finally {
- this.writeLock.unlock();
- if (cb != null) {
- cb.appendComplete(location);
- }
- //调用回调方法,数据写入文件缓存
- }
- }
- }
- ……
- //判断是否启用异步写入,如果设置为unflushThreshold <=0的数字,则认为启动异步写入;如果设置为unflushThreshold =1,则是同步写入,即每写入一个消息都会提交到磁盘;如果unflushThreshold>0,则是依赖组提交或者是超时提交
- private boolean useGroupCommit() {
- return this.unflushThreshold <= 0;
- }
- @Override
- public void run() {
- // 等待force的队列
- final LinkedList<WriteRequest> toFlush = new LinkedList<WriteRequest>();
- WriteRequest req = null;
- long lastFlushPos = 0;
- Segment last = null;
- //存储没有关闭并且线程没有被中断
- while (!this.closed && !Thread.currentThread().isInterrupted()) {
- try {
- if (last == null) {
- //获取最后的一个segment,将消息写入最后segment对应的文件
- last = this.segments.last();
- lastFlushPos = last.fileMessageSet.highWaterMark();
- }
- if (req == null) {
- //如果等待提交到磁盘的队列toFlush为空,则两种可能:一、刚刚提交完,列表为空;二、等待写入消息的队列为空,如果判断toFlush,则调用bufferQueue.take()方法,可以阻塞住队列,而如果toFlush不为空,则调用bufferQueue.poll,这是提高性能的一种做法。
- if (toFlush.isEmpty()) {
- req = this.bufferQueue.take();
- } else {
- req = this.bufferQueue.poll();
- //如果当前请求为空,表明等待写入的消息已经没有了,这时候文件缓存中的消息需要提交到磁盘,防止消息丢失;或者如果已经写入文件的大小大于maxTransferSize,则提交到磁盘
- //这里需要注意的是,会出现这样一种情况,刚好最后一个segment的文件快满了,这时候是不会roll出一个新的segment写入消息的,而是直接追加到原来的segment尾部,可能导致segment对应的文件大小大于配置的单个segment大小
- if (req == null || last.fileMessageSet.getSizeInBytes() > lastFlushPos + this.maxTransferSize) {
- // 强制force,确保内容保存到磁盘
- last.fileMessageSet.flush();
- lastFlushPos = last.fileMessageSet.highWaterMark();
- // 通知回调
- //异步写入比组写入可靠,因为异步写入一定是提交到磁盘的时候才进行回调的,而组写入如果依赖组提交的方式,则可能会丢失数据,因为组写入在消息写入到文件缓存的时候就进行回调了(除非设置unflushThreshold=1)
- for (final WriteRequest request : toFlush) {
- request.cb.appendComplete(request.result);
- }
- toFlush.clear();
- // 是否需要roll
- this.mayBeRoll();
- // 如果切换文件,重新获取last
- if (this.segments.last() != last) {
- last = null;
- }
- continue;
- }
- }
- }
- if (req == null) {
- continue;
- }
- //写入文件,并计算写入位置
- final int remainning = req.buf.remaining();
- //写入位置为:当前segment给定的值 + 加上文件已有的长度
- final long offset = last.start + last.fileMessageSet.append(req.buf);
- req.result = Location.create(offset, remainning);
- if (req.cb != null) {
- toFlush.add(req);
- }
- req = null;
- } catch (final IOException e) {
- log.error("Append message failed,*critical error*,the group commit thread would be terminated.", e);
- // TODO io异常没办法处理了,简单跳出?
- break;
- } catch (final InterruptedException e) {
- // ignore
- }
- }
- // terminated
- //关闭store 前,将等待写入队列中的剩余消息写入最后一个文件,这时候如果最后一个Segment满了也不会roll出新的Segment,持续的将消息写入到最后一个Segment,所以这时候也会发生Segment的size大于配置的size的情况
- try {
- for (WriteRequest request : this.bufferQueue) {
- final int remainning = request.buf.remaining();
- final long offset = last.start + last.fileMessageSet.append(request.buf);
- if (request.cb != null) {
- request.cb.appendComplete(Location.create(offset, remainning));
- }
- }
- this.bufferQueue.clear();
- } catch (IOException e) {
- log.error("Append message failed", e);
- }
- }
- ……
- //Append多个消息,返回写入的位置
- public void append(final List<Long> msgIds, final List<PutCommand> putCmds, finalAppendCallback cb) {
- this.appendBuffer(MessageUtils.makeMessageBuffer(msgIds, putCmds), cb);
- }
- /**
- * 重放事务操作,如果消息没有存储成功,则重新存储,并返回新的位置
- */
- public void replayAppend(final long offset, final int length, final int checksum,final List<Long> msgIds,
- final List<PutCommand> reqs, final AppendCallback cb) throws IOException {
- //如果消息没有存储,则重新存储,写到最后一个Segment尾部
- final Segment segment = this.findSegment(this.segments.view(), offset);
- if (segment == null) {
- this.append(msgIds, reqs, cb);
- } else {
- final MessageSet messageSet = segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + length);
- final ByteBuffer buf = ByteBuffer.allocate(length);
- messageSet.read(buf, offset - segment.start);
- buf.flip();
- final byte[] bytes = new byte[buf.remaining()];
- buf.get(bytes);
- // 这个校验和是整个消息的校验和,这跟message的校验和不一样,注意区分
- final int checkSumInDisk = CheckSum.crc32(bytes);
- // 没有存入,则重新存储
- if (checksum != checkSumInDisk) {
- this.append(msgIds, reqs, cb);
- } else {
- // 正常存储了消息,无需处理
- if (cb != null) {
- this.notifyCallback(cb, null);
- }
- }
- }
- }
- //判断是否需要roll,如果当前 messagestore最后一个segment的size>=配置的segment size,则产生新的segment,并将新的segment作为最后一个segment,原来最后的segment提交一次,并将mutable设置为false
- private void mayBeRoll() throws IOException {
- if (this.segments.last().fileMessageSet.getSizeInBytes() >=this.metaConfig.getMaxSegmentSize()) {
- this.roll();
- }
- }
- String nameFromOffset(final long offset) {
- final NumberFormat nf = NumberFormat.getInstance();
- nf.setMinimumIntegerDigits(20);
- nf.setMaximumFractionDigits(0);
- nf.setGroupingUsed(false);
- return nf.format(offset) + FILE_SUFFIX;
- }
- private long nextAppendOffset() throws IOException {
- final Segment last = this.segments.last();
- last.fileMessageSet.flush();
- return last.start + last.size();
- }
- private void roll() throws IOException {
- final long newOffset = this.nextAppendOffset();
- //新的segment对应的存储文件的命名为原来最后一个segment的起始位置 + segment的size
- final File newFile = new File(this.partitionDir, this.nameFromOffset(newOffset));
- this.segments.last().fileMessageSet.flush();
- this.segments.last().fileMessageSet.setMutable(false);
- this.segments.append(new Segment(newOffset, newFile));
- }
- //判断是否需要消息提交到磁盘,判断的条件有两个,如果达到组提交的条件或者达到间隔的提交时间
- private void mayBeFlush(final int numOfMessages) throws IOException {
- if (this.unflushed.addAndGet(numOfMessages) >this.metaConfig.getTopicConfig(this.topic).getUnflushThreshold()
- || SystemTimer.currentTimeMillis() - this.lastFlushTime.get() >this.metaConfig.getTopicConfig(this.topic).getUnflushInterval()) {
- this.flush0();
- }
- }
- //提交到磁盘
- public void flush() throws IOException {
- this.writeLock.lock();
- try {
- this.flush0();
- } finally {
- this.writeLock.unlock();
- }
- }
- private void flush0() throws IOException {
- if (this.useGroupCommit()) {
- return;
- }
- //由于只有最后一个segment是可变,即可写入消息的,所以只需要提交最后一个segment的消息
- this.segments.last().fileMessageSet.flush();
- this.unflushed.set(0);
- this.lastFlushTime.set(SystemTimer.currentTimeMillis());
- }
- @Override
- public void close() throws IOException {
- this.closed = true;
- this.interrupt();
- //等待子线程完成写完异步队列中剩余未写的消息
- try {
- this.join(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- //关闭segment,保证内容都已经提交到磁盘
- for (final Segment segment : this.segments.view()) {
- segment.fileMessageSet.close();
- }
- }
- //返回segment的信息,主要包括segment的开始位置以及 segment 的size
- public List<SegmentInfo> getSegmentInfos() {
- final List<SegmentInfo> rt = new ArrayList<SegmentInfo>();
- for (final Segment seg : this.segments.view()) {
- rt.add(new SegmentInfo(seg.start, seg.size()));
- }
- return rt;
- }
- /**
- * 返回当前最大可读的offset
- */
- //需要注意的是,在文件缓存中的消息是不可读的,可以通过getSizeInBytes()方法来判断还有多少内容还在文件缓存中,getSizeInBytes()方法返回的值是包括所有在磁盘和缓存中的size
- public long getMaxOffset() {
- final Segment last = this.segments.last();
- if (last != null) {
- return last.start + last.size();
- } else {
- return 0;
- }
- }
- /**
- * 返回当前最小可读的offset
- */
- public long getMinOffset() {
- Segment first = this.segments.first();
- if (first != null) {
- return first.start;
- } else {
- return 0;
- }
- }
- /**
- * 根据offset和maxSize返回所在MessageSet, 当offset超过最大offset的时候返回null,
- * 当offset小于最小offset的时候抛出ArrayIndexOutOfBounds异常
- */
- //代码的注释以及清楚的解析了作用
- public MessageSet slice(final long offset, final int maxSize) throws IOException {
- final Segment segment = this.findSegment(this.segments.view(), offset);
- if (segment == null) {
- return null;
- } else {
- return segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + maxSize);
- }
- }
- /**
- * 根据offset查找文件,如果超过尾部,则返回null,如果在头部之前,则抛出ArrayIndexOutOfBoundsException
- */
- //指定位置找到对应的segment,由于前面的文件组织方式,所以这里可以采用2分查找的方式,
- //效率很高
- Segment findSegment(final Segment[] segments, final long offset) {
- if (segments == null || segments.length < 1) {
- return null;
- }
- // 老的数据不存在,返回最近最老的数据
- final Segment last = segments[segments.length - 1];
- // 在头部以前,抛出异常
- if (offset < segments[0].start) {
- throw new ArrayIndexOutOfBoundsException();
- }
- // 刚好在尾部或者超出范围,返回null
- if (offset >= last.start + last.size()) {
- return null;
- }
- // 根据offset二分查找
- int low = 0;
- int high = segments.length - 1;
- while (low <= high) {
- final int mid = high + low >>> 1;
- final Segment found = segments[mid];
- if (found.contains(offset)) {
- return found;
- } else if (offset < found.start) {
- high = mid - 1;
- } else {
- low = mid + 1;
- }
- }
- return null;
- }
- //添加消息的方式有两种,同步和异步
- public void append(final long msgId, final PutCommand req, final AppendCallback cb) {
- //首先将内容包装成前面介绍过的消息存储格式
- this.appendBuffer(MessageUtils.makeMessageBuffer(msgId, req), cb);
- }
- //异步写入的包装类
- private static class WriteRequest {
- public final ByteBuffer buf;
- public final AppendCallback cb;
- public Location result;
- public WriteRequest(final ByteBuffer buf, final AppendCallback cb) {
- super();
- this.buf = buf;
- this.cb = cb;
- }
- }
- //这里比较好的设计是采用回调的方式来,对于异步写入实现就变得非常容易
- //AppendCallback返回的是消息成功写入的位置Location(起始位置和消息长度),该Location并不是相对于当前Segment的开始位置0,而是相对于当前Segment给定的值(对应文件命名值即为给定的值),以后查询消息的时候直接使用该位置就可以快速定位到消息写入到哪个文件
- //这也就是为什么文件名的命名采用前后衔接的方式,这也通过2分查找可以快速定位消息的位置
- private void appendBuffer(final ByteBuffer buffer, final AppendCallback cb) {
- if (this.closed) {
- throw new IllegalStateException("Closed MessageStore.");
- }
- //如果启动异步写入并且消息长度小于一次提交的最大值maxTransferSize,则将该消息放入异步写入队列
- if (this.useGroupCommit() && buffer.remaining() < this.maxTransferSize) {
- this.bufferQueue.offer(new WriteRequest(buffer, cb));
- } else {
- Location location = null;
- final int remainning = buffer.remaining();
- this.writeLock.lock();
- try {
- final Segment cur = this.segments.last();
- final long offset = cur.start + cur.fileMessageSet.append(buffer);
- this.mayBeFlush(1);
- this.mayBeRoll();
- location = Location.create(offset, remainning);
- } catch (final IOException e) {
- log.error("Append file failed", e);
- location = Location.InvalidLocaltion;
- } finally {
- this.writeLock.unlock();
- if (cb != null) {
- cb.appendComplete(location);
- }
- //调用回调方法,数据写入文件缓存
- }
- }
- }
- ……
- //判断是否启用异步写入,如果设置为unflushThreshold <=0的数字,则认为启动异步写入;如果设置为unflushThreshold =1,则是同步写入,即每写入一个消息都会提交到磁盘;如果unflushThreshold>0,则是依赖组提交或者是超时提交
- private boolean useGroupCommit() {
- return this.unflushThreshold <= 0;
- }
- @Override
- public void run() {
- // 等待force的队列
- final LinkedList<WriteRequest> toFlush = new LinkedList<WriteRequest>();
- WriteRequest req = null;
- long lastFlushPos = 0;
- Segment last = null;
- //存储没有关闭并且线程没有被中断
- while (!this.closed && !Thread.currentThread().isInterrupted()) {
- try {
- if (last == null) {
- //获取最后的一个segment,将消息写入最后segment对应的文件
- last = this.segments.last();
- lastFlushPos = last.fileMessageSet.highWaterMark();
- }
- if (req == null) {
- //如果等待提交到磁盘的队列toFlush为空,则两种可能:一、刚刚提交完,列表为空;二、等待写入消息的队列为空,如果判断toFlush,则调用bufferQueue.take()方法,可以阻塞住队列,而如果toFlush不为空,则调用bufferQueue.poll,这是提高性能的一种做法。
- if (toFlush.isEmpty()) {
- req = this.bufferQueue.take();
- } else {
- req = this.bufferQueue.poll();
- //如果当前请求为空,表明等待写入的消息已经没有了,这时候文件缓存中的消息需要提交到磁盘,防止消息丢失;或者如果已经写入文件的大小大于maxTransferSize,则提交到磁盘
- //这里需要注意的是,会出现这样一种情况,刚好最后一个segment的文件快满了,这时候是不会roll出一个新的segment写入消息的,而是直接追加到原来的segment尾部,可能导致segment对应的文件大小大于配置的单个segment大小
- if (req == null || last.fileMessageSet.getSizeInBytes() > lastFlushPos + this.maxTransferSize) {
- // 强制force,确保内容保存到磁盘
- last.fileMessageSet.flush();
- lastFlushPos = last.fileMessageSet.highWaterMark();
- // 通知回调
- //异步写入比组写入可靠,因为异步写入一定是提交到磁盘的时候才进行回调的,而组写入如果依赖组提交的方式,则可能会丢失数据,因为组写入在消息写入到文件缓存的时候就进行回调了(除非设置unflushThreshold=1)
- for (final WriteRequest request : toFlush) {
- request.cb.appendComplete(request.result);
- }
- toFlush.clear();
- // 是否需要roll
- this.mayBeRoll();
- // 如果切换文件,重新获取last
- if (this.segments.last() != last) {
- last = null;
- }
- continue;
- }
- }
- }
- if (req == null) {
- continue;
- }
- //写入文件,并计算写入位置
- final int remainning = req.buf.remaining();
- //写入位置为:当前segment给定的值 + 加上文件已有的长度
- final long offset = last.start + last.fileMessageSet.append(req.buf);
- req.result = Location.create(offset, remainning);
- if (req.cb != null) {
- toFlush.add(req);
- }
- req = null;
- } catch (final IOException e) {
- log.error("Append message failed,*critical error*,the group commit thread would be terminated.", e);
- // TODO io异常没办法处理了,简单跳出?
- break;
- } catch (final InterruptedException e) {
- // ignore
- }
- }
- // terminated
- //关闭store 前,将等待写入队列中的剩余消息写入最后一个文件,这时候如果最后一个Segment满了也不会roll出新的Segment,持续的将消息写入到最后一个Segment,所以这时候也会发生Segment的size大于配置的size的情况
- try {
- for (WriteRequest request : this.bufferQueue) {
- final int remainning = request.buf.remaining();
- final long offset = last.start + last.fileMessageSet.append(request.buf);
- if (request.cb != null) {
- request.cb.appendComplete(Location.create(offset, remainning));
- }
- }
- this.bufferQueue.clear();
- } catch (IOException e) {
- log.error("Append message failed", e);
- }
- }
- ……
- //Append多个消息,返回写入的位置
- public void append(final List<Long> msgIds, final List<PutCommand> putCmds, final AppendCallback cb) {
- this.appendBuffer(MessageUtils.makeMessageBuffer(msgIds, putCmds), cb);
- }
- /**
- * 重放事务操作,如果消息没有存储成功,则重新存储,并返回新的位置
- */
- public void replayAppend(final long offset, final int length, final int checksum, final List<Long> msgIds,
- final List<PutCommand> reqs, final AppendCallback cb) throws IOException {
- //如果消息没有存储,则重新存储,写到最后一个Segment尾部
- final Segment segment = this.findSegment(this.segments.view(), offset);
- if (segment == null) {
- this.append(msgIds, reqs, cb);
- } else {
- final MessageSet messageSet = segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + length);
- final ByteBuffer buf = ByteBuffer.allocate(length);
- messageSet.read(buf, offset - segment.start);
- buf.flip();
- final byte[] bytes = new byte[buf.remaining()];
- buf.get(bytes);
- // 这个校验和是整个消息的校验和,这跟message的校验和不一样,注意区分
- final int checkSumInDisk = CheckSum.crc32(bytes);
- // 没有存入,则重新存储
- if (checksum != checkSumInDisk) {
- this.append(msgIds, reqs, cb);
- } else {
- // 正常存储了消息,无需处理
- if (cb != null) {
- this.notifyCallback(cb, null);
- }
- }
- }
- }
- //判断是否需要roll,如果当前 messagestore最后一个segment的size>=配置的segment size,则产生新的segment,并将新的segment作为最后一个segment,原来最后的segment提交一次,并将mutable设置为false
- private void mayBeRoll() throws IOException {
- if (this.segments.last().fileMessageSet.getSizeInBytes() >= this.metaConfig.getMaxSegmentSize()) {
- this.roll();
- }
- }
- String nameFromOffset(final long offset) {
- final NumberFormat nf = NumberFormat.getInstance();
- nf.setMinimumIntegerDigits(20);
- nf.setMaximumFractionDigits(0);
- nf.setGroupingUsed(false);
- return nf.format(offset) + FILE_SUFFIX;
- }
- private long nextAppendOffset() throws IOException {
- final Segment last = this.segments.last();
- last.fileMessageSet.flush();
- return last.start + last.size();
- }
- private void roll() throws IOException {
- final long newOffset = this.nextAppendOffset();
- //新的segment对应的存储文件的命名为原来最后一个segment的起始位置 + segment的size
- final File newFile = new File(this.partitionDir, this.nameFromOffset(newOffset));
- this.segments.last().fileMessageSet.flush();
- this.segments.last().fileMessageSet.setMutable(false);
- this.segments.append(new Segment(newOffset, newFile));
- }
- //判断是否需要消息提交到磁盘,判断的条件有两个,如果达到组提交的条件或者达到间隔的提交时间
- private void mayBeFlush(final int numOfMessages) throws IOException {
- if (this.unflushed.addAndGet(numOfMessages) > this.metaConfig.getTopicConfig(this.topic).getUnflushThreshold()
- || SystemTimer.currentTimeMillis() - this.lastFlushTime.get() > this.metaConfig.getTopicConfig(this.topic).getUnflushInterval()) {
- this.flush0();
- }
- }
- //提交到磁盘
- public void flush() throws IOException {
- this.writeLock.lock();
- try {
- this.flush0();
- } finally {
- this.writeLock.unlock();
- }
- }
- private void flush0() throws IOException {
- if (this.useGroupCommit()) {
- return;
- }
- //由于只有最后一个segment是可变,即可写入消息的,所以只需要提交最后一个segment的消息
- this.segments.last().fileMessageSet.flush();
- this.unflushed.set(0);
- this.lastFlushTime.set(SystemTimer.currentTimeMillis());
- }
- @Override
- public void close() throws IOException {
- this.closed = true;
- this.interrupt();
- //等待子线程完成写完异步队列中剩余未写的消息
- try {
- this.join(500);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- //关闭segment,保证内容都已经提交到磁盘
- for (final Segment segment : this.segments.view()) {
- segment.fileMessageSet.close();
- }
- }
- //返回segment的信息,主要包括segment的开始位置以及 segment 的size
- public List<SegmentInfo> getSegmentInfos() {
- final List<SegmentInfo> rt = new ArrayList<SegmentInfo>();
- for (final Segment seg : this.segments.view()) {
- rt.add(new SegmentInfo(seg.start, seg.size()));
- }
- return rt;
- }
- /**
- * 返回当前最大可读的offset
- */
- //需要注意的是,在文件缓存中的消息是不可读的,可以通过getSizeInBytes()方法来判断还有多少内容还在文件缓存中,getSizeInBytes()方法返回的值是包括所有在磁盘和缓存中的size
- public long getMaxOffset() {
- final Segment last = this.segments.last();
- if (last != null) {
- return last.start + last.size();
- } else {
- return 0;
- }
- }
- /**
- * 返回当前最小可读的offset
- */
- public long getMinOffset() {
- Segment first = this.segments.first();
- if (first != null) {
- return first.start;
- } else {
- return 0;
- }
- }
- /**
- * 根据offset和maxSize返回所在MessageSet, 当offset超过最大offset的时候返回null,
- * 当offset小于最小offset的时候抛出ArrayIndexOutOfBounds异常
- */
- //代码的注释以及清楚的解析了作用
- public MessageSet slice(final long offset, final int maxSize) throws IOException {
- final Segment segment = this.findSegment(this.segments.view(), offset);
- if (segment == null) {
- return null;
- } else {
- return segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + maxSize);
- }
- }
- /**
- * 根据offset查找文件,如果超过尾部,则返回null,如果在头部之前,则抛出ArrayIndexOutOfBoundsException
- */
- //指定位置找到对应的segment,由于前面的文件组织方式,所以这里可以采用2分查找的方式,
- //效率很高
- Segment findSegment(final Segment[] segments, final long offset) {
- if (segments == null || segments.length < 1) {
- return null;
- }
- // 老的数据不存在,返回最近最老的数据
- final Segment last = segments[segments.length - 1];
- // 在头部以前,抛出异常
- if (offset < segments[0].start) {
- throw new ArrayIndexOutOfBoundsException();
- }
- // 刚好在尾部或者超出范围,返回null
- if (offset >= last.start + last.size()) {
- return null;
- }
- // 根据offset二分查找
- int low = 0;
- int high = segments.length - 1;
- while (low <= high) {
- final int mid = high + low >>> 1;
- final Segment found = segments[mid];
- if (found.contains(offset)) {
- return found;
- } else if (offset < found.start) {
- high = mid - 1;
- } else {
- low = mid + 1;
- }
- }
- return null;
- }
相关推荐
《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
因此,我们需要自己编写定制的接收器或者利用现有的第三方库来实现MetaQ与Spark Streaming的对接。 一种常见的方法是通过Apache Kafka作为桥梁,因为MetaQ和Spark都支持与Kafka的集成。你可以将MetaQ中的消息发布到...
1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...
本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...
MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...
Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...
最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...
RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...
2. **MetaQ架构**:MetaQ 由三部分组成:Broker(经纪人)、Controller(控制器)和 ZooKeeper(协调服务)。Broker 负责消息的存储和分发,Controller 管理集群状态,ZooKeeper 提供分布式协调服务,确保集群的高...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
阿里消息中间件MetaQ学习Demo
### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...
阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...
- **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...
综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...
在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...