`
Technoboy
  • 浏览: 157475 次
  • 性别: Icon_minigender_1
  • 来自: 大连
社区版块
存档分类
最新评论

RocketMQ原理解析-Broker处理发送消息

阅读更多
1. 简介
  Producer发送一条消息到broker后,如何存储到commitlog文件,又是如何构建的consumequeue,index文件,将从源码的角度剖析。

2. SendMessageProcessor
  该类是负责响应Producer发消息到broker的入口处理逻辑类。其processRequest响应两种请求:
  • CONSUMER_SEND_MSG_BACK:  consumer消费失败的消息发回broker。
  • default: producer发消息到broker。

  由default,进入sendMessage方法,首先,我们看到对opaque的操作,表示透传回response,consumer和prodcuer实际是根据opaque进行请求的应答匹配。
 response.setOpaque(request.getOpaque());

  然后,进行了broker存活检查,即this.brokerController.getMessageStore().now()小于getStartAcceptSendRequestTimeStamp() ,表明broker异常。
  msgCheck方法对发送的消息进行了相关检查:topic是否有写权限,topic是否存在,请求的queueId是否存在。如果topic不存在的话,broker端配置autoCreateTopicEnable=true的话,将创建队列数为4的topic。这个创建topic的逻辑比较深:
TopicConfig topicConfig =
                this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

  在msgCheck方法中,可以看到以上代码,如果if (null == topicConfig) ,就会进行topic的创建,真正的创建逻辑在TopicConfigManager#createTopicInSendMessageMethod。queueNums由producer在发送时,默认设置了4. 我们可以通过在producer端设置defaultTopicQueueNums进行调整。
  然后,判断topic是否为重试topic,当重试次数大于16次后,将消息加入DLQ队列。
if ((null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {

            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());

            SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return response;
            }
            //16
            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes();
            if (reconsumeTimes >= maxReconsumeTimes) {
            	//重试队列
                newTopic = MixAll.getDLQTopic(groupName);
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
                        DLQ_NUMS_PER_GROUP, //
                        PermName.PERM_WRITE, 0
                );
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return response;
                }
            }
        }
 
  然后,构建发送消息内部类MessageExtBrokerInner,并执行this.brokerController.getMessageStore().putMessage(msgInner);进行消息的保存。getMessageStore()的默认实现为DefaultMessageStore。跳入DefaultMessageStore#putMessage方法:
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    	//broker是否停止
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        //broker为slave,不能存储消息
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
        //broker是否运行
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        //topic长度检查
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
        //消息属性长度检查
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
        //oscachebusy=true意味着一条消息存储的时间超过了5s,这是有问题的,磁盘,网络磁盘,
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 1000) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

  putMessage内部,也是进行了消息在存入commitlog文件前的检查工作:broker停机,slave不可存消息,broker是否运行,topic是否超长,消息属性是否过大,osPageCacheBusy检查。
都没问题后,调用CommitLog#putMessage:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TransactionNotType//
                || tranType == MessageSysFlag.TransactionCommitType) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MapedFile unlockMapedFile = null;
        MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
        synchronized (this) {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mapedFile || mapedFile.isFull()) {
                mapedFile = this.mapedFileQueue.getLastMapedFile();
            }
            if (null == mapedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMapedFile = mapedFile;
                    // Create a new file, re-write the message
                    mapedFile = this.mapedFileQueue.getLastMapedFile();
                    if (null == mapedFile) {
                        // XXX: warn and notify me
                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } // end of synchronized

        if (eclipseTimeInLock > 1000) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }
        if (null != unlockMapedFile) {
            this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        GroupCommitRequest request = null;

        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (msg.isWaitStoreMsgOK()) {
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                            + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            this.flushCommitLogService.wakeup();
        }

        // Synchronous write double
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (msg.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    if (null == request) {
                        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    }
                    service.putRequest(request);

                    service.getWaitNotifyObject().wakeupAll();

                    boolean flushOK =
                            // TODO
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                                + msg.getTags() + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

        return putMessageResult;
    }

  首先,设置消息存储时间,设置body的CRC校验值,检查msg的DelayTimeLevel,(对于consumer消费失败并发回broker后,msg的DelayTimeLevel会随着消费失败进行+1操作,可查看CONSUMER_SEND_MSG_BACK的逻辑。由于消息存入commitlog实现统一,所以此处会根据DelayTimeLevel值设置topic是否为SCHEDULE_TOPIC_XXXX)。
  对于RocketMQ,所有的commitlog文件,index文件,consumequeue文件都映射为MapedFile,由MapedFileQueue统一管理。消息在commitlog文件中,是顺序写,随机读,所以此处要获取到最后一个commitlog文件:
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();

  然后调用mapedFile#appendMessage将消息写入,看一下此方法内部:
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
        assert msg != null;
        assert cb != null;

        int currentPos = this.wrotePostion.get();


        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result =
                    cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
            this.wrotePostion.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }


        log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
                + this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

  消息写入,在MapedFile中有2个变量,wrotePostion和committedPosition。
  • wrotePostion: 消息写入位置,
  • committedPosition: 消息刷盘位置,起始值为0。 当进行刷盘后,值等于wrotePostion

  所以,上面的逻辑中,需要判断wrotePostion要小于fileSize(commitlog的fileSize为1024*1024*1024)。对于commitlog,默认大小为1G,MappedByteBuffer适合做大文件的读写操作,所以MQ将一个个commitlog内容映射到MappedByteBuffer后,通过MapedFile进行了封装。通过MappedByteBuffer#slice后,在AppendMessageCallback#doAppend方法中进行存储。
改方法的签名:
doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg)

  fileFromOffset为文件名,commitlog文件名就是该文件起始消息的offset值。
  maxBlank值为this.fileSize - currentPos,即1024*1024*1024-currentPos。
  进入AppendMessageCallback#doAppend方法:
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
            MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position(); //消息的offset,   通过指定offset+msgLen即可圈出一条完整的消息

            //消息msgId,由broker所在机器host+wroteOffset组成。为什么只指定msgId且msgId未包含消息size,可查询到完整的消息呢? 原因在于通过msgId查询消息时,通过offset到commitlog只取前4字节获取到size(前4个字节int为size值),然后再一次查询即可、
            String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset); 

            // Record ConsumeQueue information
            //逻辑队列,即consumequeue值,可以看出,逻辑队列从0开始。一般wroteOffset值都很大,queueOffset很小,也可以减少网络开销。
            String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback message is not consumed, will not enter the
                // consumer queuec
                case MessageSysFlag.TransactionPreparedType:
                case MessageSysFlag.TransactionRollbackType:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TransactionNotType:
                case MessageSysFlag.TransactionCommitType:
                default:
                    break;
            }

            /**
             * Serialize message
             */
            final byte[] propertiesData =
                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
            if (propertiesData.length > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData == null ? 0 : topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            //计算消息大小
            final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                        + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetMsgStoreItemMemory(maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
                // 3 The remaining space may be any value
                //

                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetMsgStoreItemMemory(msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes());
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort(propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                    msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TransactionPreparedType:
                case MessageSysFlag.TransactionRollbackType:
                    break;
                case MessageSysFlag.TransactionNotType:
                case MessageSysFlag.TransactionCommitType:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }

            return result;
        }

  消息存储完,回到CommitLog#putMessage方法,到了刷盘和同步数据给slave。如果broker配置为SYNC_FLUSH,那么每次存储完消息,就需要检查数据(wrotePostion-committedPosition)是否超过4页,1页4k,即16k,到了即刷盘。如果broker配置为SYNC_MASTER,需要等到数据同步offset小于256m即可返回给客户端。

3. Consumequeue和Index文件
  上面,是一套完整的producer发消息到commitlog的过程。对于消息如何保存在consumequeue以及index中,MQ是通过ReputMessageService做的,主要逻辑在doReput方法内。MQ启动,首先设置reputFromOffset为commitlog最大的offset。
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());

  然后,ReputMessageService线程每隔1ms检查reputFromOffset是否小于commitLog.getMaxOffset(),小于的话,表示commitlog有新的数据,就将数据从reputFromOffset开始,大小为commitlog的wrotePostion - reputFromOffset从commitlog取出。获取数据后,构建DispatchRequest,并在doDispatch方法内构建consumequeue和index索引文件。由于index文件的构建,在前面章节有涉及,这里只简单介绍consumequeue的构建。通过代码调用,最终在ConsumeQueue#putMessagePostionInfo中创建了consumequeue,且只包含commitlog文件的:offset,size,tagsCode:
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQStoreUnitSize);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

  注释后的ReputMessageService#doReput:
private void doReput() {
        	//reputFromOffse小于commitLog.getMaxOffset(),表示有新数据
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                
                //获取新数据
                SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        	//将数据取出,之前取出的是ByteBuffer,依次取出详细信息,并设置到DispatchRequest
                            DispatchRequest dispatchRequest =
                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                	//构建consumequeue和index
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    	//如果客户端有拉数据请求,且服务器设置了LongPollingEnable=true,那么在此通知PullRequestHoldService,并下推消息给客户端。
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                                dispatchRequest.getTagsCode());
                                    }
                                    // FIXED BUG By shijia
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    //统计消息
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                                .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                }
                                //到文件尾部,去下一个文件中继续查找
                                else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {


                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                }
                                else {
                                    doNext = false;
                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                                this.reputFromOffset);

                                        this.reputFromOffset += (result.getSize() - readSize);
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }
分享到:
评论

相关推荐

    rocketmq-externals-master.zip

    通过阅读RocketMQ的源码,开发者可以了解到分布式消息中间件的设计理念,学习到如何在高并发环境下处理大量消息的存储和传输,以及如何实现消息的可靠性和一致性。这对于提升系统设计能力,特别是在微服务架构中...

    消息中间件rocketmq原理解析

    以下将基于给定内容详细解析RocketMQ的原理解析。 ### RocketMQ概述 RocketMQ是由阿里巴巴开源的一款分布式消息中间件。它提供了分布式系统之间的异步消息通信能力,广泛应用于各种业务场景中,如订单处理、任务...

    消息中间件rocketmq原理解析.pdf

    标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...

    RocketMQ原理详解

    ### RocketMQ原理详解 #### 一、RocketMQ概述 RocketMQ是一款由阿里巴巴开源的消息中间件,主要用于异步处理、解耦、削峰等场景。它提供了高性能、高可靠性的消息服务,支持点对点消息、发布/订阅模式、事务消息等...

    消息中间件 rocketmq原理解析

    本文将从以下几个方面对RocketMQ的原理进行解析。 ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向...

    RocketMQ实战与原理解析

    ### RocketMQ实战与原理解析 #### 一、RocketMQ简介 Apache RocketMQ是一个分布式消息中间件,由阿里巴巴捐赠并成为Apache顶级项目。RocketMQ具备高性能、低延迟、高可靠等特性,支持发布/订阅模式、消息过滤、...

    rocketmq相关jar包.zip

    2. `rocketmq-common-4.7.0.jar`:这个jar包包含了RocketMQ的通用模块,提供了一些基础工具类和常量,如NameServer地址解析、配置管理、时间戳处理、线程池管理等。同时,它也包含了一些核心的数据结构,如Message...

    Rocketmq核心源码剖析-图灵杨过老师1

    在深入探讨RocketMQ核心源码之前,...通过对RocketMQ的CommitLog和ConsumeQueue的理解,以及NameServer和Broker架构的解析,我们可以更深入地掌握RocketMQ的工作原理,这对于优化系统性能和解决实际问题具有重要意义。

    RocketMQ实战与原理解析.zip

    总的来说,《RocketMQ实战与原理解析》是一本全面且深入的RocketMQ教程,无论你是初涉消息中间件的新手,还是寻求提升的开发者,都能从中受益。不过,作者也提醒,尽管这本书内容详实,但要达到精通RocketMQ的程度,...

    RocketMQ高级原理:深入剖析消息系统的核心机制

    《RocketMQ高级原理:深入剖析消息系统的核心机制》是一篇深度解析RocketMQ核心机制的文章,旨在帮助读者理解和掌握这款分布式消息中间件的工作原理。RocketMQ是阿里巴巴开源的一款高性能、高可用的消息中间件,广泛...

    深入解析RocketMQ

    Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...

    阿里RocketMQ_用户指南_V3.2.4 & RocketMQ-原理简介

    1. **RocketMQ概述**:RocketMQ的设计目标是解决大规模分布式系统中的异步处理和解耦问题,其核心特性包括消息队列、发布/订阅模型、事务消息和定时/延时消息等。 2. **架构设计**:RocketMQ的架构主要由NameServer...

    rocketmq-原理简析(适合初级使用者)

    本篇文章主要针对初级使用者,解析RocketMQ的基本概念和工作原理。 1. **ProducerGroup** ProducerGroup 是一组具有相同属性的Producer的集合,这些属性包括处理的消息类型(Topic)以及处理逻辑。在事务消息机制...

    RocketMQ原理分析.rar

    本压缩包“RocketMQ原理分析.rar”包含了对RocketMQ核心机制的深入解析,旨在帮助用户理解其工作原理。 RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四个部分: 1. **Producer**: 生产者是消息...

    rocketmq-4.9.2版本

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中,提供高可用、高吞吐量的消息传输服务。4.9.2版本作为RocketMQ的一个稳定版本,包含了一系列的功能改进和性能优化。以下是对RocketMQ...

    全面解剖RocketMQ和项目实战-day4-part5.7z

    Broker作为RocketMQ的核心组件之一,负责接收Producer发送的消息,并将它们存储和转发给Consumer。组装消息是指Broker如何根据多种因素(如消息大小、主题、队列等)将接收到的消息合理地存储和分发,确保消息的...

    RocketMQ-master.rar

    在Java编程环境中,RocketMQ的源码解析可以帮助我们理解其内部工作原理,包括消息的生产、消费、存储和调度机制。以下是一些可能涉及的知识点: 1. **消息模型**:RocketMQ支持两种消息模型,发布/订阅(Publish/...

    rocketmq 源码 rocketmq 源码 rocketmq 源码

    发送过程包括选择合适的 Broker、消息序列化和批量发送等步骤。Producer 提供了同步和异步两种发送模式,异步发送可以提高消息发送的吞吐量。 4. **Consumer** Consumer 提供了两种消费模式:Push 模式和 Pull ...

    全面解剖RocketMQ和项目实战-day4-part2.7z

    当Broker出现故障或被手动停用时,NameServer会删除相应的路由信息,防止Producer和Consumer继续向失效的Broker发送消息。理解路由删除的逻辑对于故障恢复和系统稳定性有直接影响。 通过这些视频教程,学习者将能...

    RocketMQ入门实战及源码解析.7z

    RocketMQ的源码解析可以帮助我们理解其内部机制,包括NameServer的注册与发现、Broker的消息存储与检索、Producer的发送逻辑、Consumer的消费策略等。通过阅读源码,可以学习到如何设计分布式系统、如何优化消息传递...

Global site tag (gtag.js) - Google Analytics