- 浏览: 157475 次
- 性别:
- 来自: 大连
文章分类
最新评论
-
eclipseakwolf:
很棒的文章
Shiro User Manual-Authentication -
xugangqiang:
very good
Java Concurrent Programming (1) -
jlcon:
是不是AbstractShiroTest还需要继承EasyMo ...
Shiro User Manual-Testing -
jlcon:
createNiceMock这个EasyMockSupport ...
Shiro User Manual-Testing -
Technoboy:
53873039oycg 写道楼主:问下,你的那个dao接口的 ...
Shiro Example
1. 简介
Producer发送一条消息到broker后,如何存储到commitlog文件,又是如何构建的consumequeue,index文件,将从源码的角度剖析。
2. SendMessageProcessor
该类是负责响应Producer发消息到broker的入口处理逻辑类。其processRequest响应两种请求:
由default,进入sendMessage方法,首先,我们看到对opaque的操作,表示透传回response,consumer和prodcuer实际是根据opaque进行请求的应答匹配。
然后,进行了broker存活检查,即this.brokerController.getMessageStore().now()小于getStartAcceptSendRequestTimeStamp() ,表明broker异常。
msgCheck方法对发送的消息进行了相关检查:topic是否有写权限,topic是否存在,请求的queueId是否存在。如果topic不存在的话,broker端配置autoCreateTopicEnable=true的话,将创建队列数为4的topic。这个创建topic的逻辑比较深:
在msgCheck方法中,可以看到以上代码,如果if (null == topicConfig) ,就会进行topic的创建,真正的创建逻辑在TopicConfigManager#createTopicInSendMessageMethod。queueNums由producer在发送时,默认设置了4. 我们可以通过在producer端设置defaultTopicQueueNums进行调整。
然后,判断topic是否为重试topic,当重试次数大于16次后,将消息加入DLQ队列。
然后,构建发送消息内部类MessageExtBrokerInner,并执行this.brokerController.getMessageStore().putMessage(msgInner);进行消息的保存。getMessageStore()的默认实现为DefaultMessageStore。跳入DefaultMessageStore#putMessage方法:
putMessage内部,也是进行了消息在存入commitlog文件前的检查工作:broker停机,slave不可存消息,broker是否运行,topic是否超长,消息属性是否过大,osPageCacheBusy检查。
都没问题后,调用CommitLog#putMessage:
首先,设置消息存储时间,设置body的CRC校验值,检查msg的DelayTimeLevel,(对于consumer消费失败并发回broker后,msg的DelayTimeLevel会随着消费失败进行+1操作,可查看CONSUMER_SEND_MSG_BACK的逻辑。由于消息存入commitlog实现统一,所以此处会根据DelayTimeLevel值设置topic是否为SCHEDULE_TOPIC_XXXX)。
对于RocketMQ,所有的commitlog文件,index文件,consumequeue文件都映射为MapedFile,由MapedFileQueue统一管理。消息在commitlog文件中,是顺序写,随机读,所以此处要获取到最后一个commitlog文件:
然后调用mapedFile#appendMessage将消息写入,看一下此方法内部:
消息写入,在MapedFile中有2个变量,wrotePostion和committedPosition。
所以,上面的逻辑中,需要判断wrotePostion要小于fileSize(commitlog的fileSize为1024*1024*1024)。对于commitlog,默认大小为1G,MappedByteBuffer适合做大文件的读写操作,所以MQ将一个个commitlog内容映射到MappedByteBuffer后,通过MapedFile进行了封装。通过MappedByteBuffer#slice后,在AppendMessageCallback#doAppend方法中进行存储。
改方法的签名:
fileFromOffset为文件名,commitlog文件名就是该文件起始消息的offset值。
maxBlank值为this.fileSize - currentPos,即1024*1024*1024-currentPos。
进入AppendMessageCallback#doAppend方法:
消息存储完,回到CommitLog#putMessage方法,到了刷盘和同步数据给slave。如果broker配置为SYNC_FLUSH,那么每次存储完消息,就需要检查数据(wrotePostion-committedPosition)是否超过4页,1页4k,即16k,到了即刷盘。如果broker配置为SYNC_MASTER,需要等到数据同步offset小于256m即可返回给客户端。
3. Consumequeue和Index文件
上面,是一套完整的producer发消息到commitlog的过程。对于消息如何保存在consumequeue以及index中,MQ是通过ReputMessageService做的,主要逻辑在doReput方法内。MQ启动,首先设置reputFromOffset为commitlog最大的offset。
然后,ReputMessageService线程每隔1ms检查reputFromOffset是否小于commitLog.getMaxOffset(),小于的话,表示commitlog有新的数据,就将数据从reputFromOffset开始,大小为commitlog的wrotePostion - reputFromOffset从commitlog取出。获取数据后,构建DispatchRequest,并在doDispatch方法内构建consumequeue和index索引文件。由于index文件的构建,在前面章节有涉及,这里只简单介绍consumequeue的构建。通过代码调用,最终在ConsumeQueue#putMessagePostionInfo中创建了consumequeue,且只包含commitlog文件的:offset,size,tagsCode:
注释后的ReputMessageService#doReput:
Producer发送一条消息到broker后,如何存储到commitlog文件,又是如何构建的consumequeue,index文件,将从源码的角度剖析。
2. SendMessageProcessor
该类是负责响应Producer发消息到broker的入口处理逻辑类。其processRequest响应两种请求:
- CONSUMER_SEND_MSG_BACK: consumer消费失败的消息发回broker。
- default: producer发消息到broker。
由default,进入sendMessage方法,首先,我们看到对opaque的操作,表示透传回response,consumer和prodcuer实际是根据opaque进行请求的应答匹配。
response.setOpaque(request.getOpaque());
然后,进行了broker存活检查,即this.brokerController.getMessageStore().now()小于getStartAcceptSendRequestTimeStamp() ,表明broker异常。
msgCheck方法对发送的消息进行了相关检查:topic是否有写权限,topic是否存在,请求的queueId是否存在。如果topic不存在的话,broker端配置autoCreateTopicEnable=true的话,将创建队列数为4的topic。这个创建topic的逻辑比较深:
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
在msgCheck方法中,可以看到以上代码,如果if (null == topicConfig) ,就会进行topic的创建,真正的创建逻辑在TopicConfigManager#createTopicInSendMessageMethod。queueNums由producer在发送时,默认设置了4. 我们可以通过在producer端设置defaultTopicQueueNums进行调整。
然后,判断topic是否为重试topic,当重试次数大于16次后,将消息加入DLQ队列。
if ((null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) { String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } //16 int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } int reconsumeTimes = requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { //重试队列 newTopic = MixAll.getDLQTopic(groupName); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // DLQ_NUMS_PER_GROUP, // PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } } }
然后,构建发送消息内部类MessageExtBrokerInner,并执行this.brokerController.getMessageStore().putMessage(msgInner);进行消息的保存。getMessageStore()的默认实现为DefaultMessageStore。跳入DefaultMessageStore#putMessage方法:
public PutMessageResult putMessage(MessageExtBrokerInner msg) { //broker是否停止 if (this.shutdown) { log.warn("message store has shutdown, so putMessage is forbidden"); return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } //broker为slave,不能存储消息 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is slave mode, so putMessage is forbidden "); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } //broker是否运行 if (!this.runningFlags.isWriteable()) { long value = this.printTimes.getAndIncrement(); if ((value % 50000) == 0) { log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null); } else { this.printTimes.set(0); } //topic长度检查 if (msg.getTopic().length() > Byte.MAX_VALUE) { log.warn("putMessage message topic length too long " + msg.getTopic().length()); return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); } //消息属性长度检查 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) { log.warn("putMessage message properties length too long " + msg.getPropertiesString().length()); return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); } //oscachebusy=true意味着一条消息存储的时间超过了5s,这是有问题的,磁盘,网络磁盘, if (this.isOSPageCacheBusy()) { return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); } long beginTime = this.getSystemClock().now(); PutMessageResult result = this.commitLog.putMessage(msg); long eclipseTime = this.getSystemClock().now() - beginTime; if (eclipseTime > 1000) { log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } return result; }
putMessage内部,也是进行了消息在存入commitlog文件前的检查工作:broker停机,slave不可存消息,broker是否运行,topic是否超长,消息属性是否过大,osPageCacheBusy检查。
都没问题后,调用CommitLog#putMessage:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TransactionNotType// || tranType == MessageSysFlag.TransactionCommitType) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long eclipseTimeInLock = 0; MapedFile unlockMapedFile = null; MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock(); synchronized (this) { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global msg.setStoreTimestamp(beginLockTimestamp); if (null == mapedFile || mapedFile.isFull()) { mapedFile = this.mapedFileQueue.getLastMapedFile(); } if (null == mapedFile) { log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } result = mapedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMapedFile = mapedFile; // Create a new file, re-write the message mapedFile = this.mapedFileQueue.getLastMapedFile(); if (null == mapedFile) { // XXX: warn and notify me log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mapedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } // end of synchronized if (eclipseTimeInLock > 1000) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); } if (null != unlockMapedFile) { this.defaultMessageStore.unlockMapedFile(unlockMapedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); GroupCommitRequest request = null; // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (msg.isWaitStoreMsgOK()) { request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() + " client address: " + msg.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { this.flushCommitLogService.wakeup(); } // Synchronous write double if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (msg.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { if (null == request) { request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); } service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = // TODO request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() + " client address: " + msg.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } return putMessageResult; }
首先,设置消息存储时间,设置body的CRC校验值,检查msg的DelayTimeLevel,(对于consumer消费失败并发回broker后,msg的DelayTimeLevel会随着消费失败进行+1操作,可查看CONSUMER_SEND_MSG_BACK的逻辑。由于消息存入commitlog实现统一,所以此处会根据DelayTimeLevel值设置topic是否为SCHEDULE_TOPIC_XXXX)。
对于RocketMQ,所有的commitlog文件,index文件,consumequeue文件都映射为MapedFile,由MapedFileQueue统一管理。消息在commitlog文件中,是顺序写,随机读,所以此处要获取到最后一个commitlog文件:
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
然后调用mapedFile#appendMessage将消息写入,看一下此方法内部:
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) { assert msg != null; assert cb != null; int currentPos = this.wrotePostion.get(); if (currentPos < this.fileSize) { ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg); this.wrotePostion.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: " + this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }
消息写入,在MapedFile中有2个变量,wrotePostion和committedPosition。
- wrotePostion: 消息写入位置,
- committedPosition: 消息刷盘位置,起始值为0。 当进行刷盘后,值等于wrotePostion
所以,上面的逻辑中,需要判断wrotePostion要小于fileSize(commitlog的fileSize为1024*1024*1024)。对于commitlog,默认大小为1G,MappedByteBuffer适合做大文件的读写操作,所以MQ将一个个commitlog内容映射到MappedByteBuffer后,通过MapedFile进行了封装。通过MappedByteBuffer#slice后,在AppendMessageCallback#doAppend方法中进行存储。
改方法的签名:
doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg)
fileFromOffset为文件名,commitlog文件名就是该文件起始消息的offset值。
maxBlank值为this.fileSize - currentPos,即1024*1024*1024-currentPos。
进入AppendMessageCallback#doAppend方法:
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br> MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg; // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); //消息的offset, 通过指定offset+msgLen即可圈出一条完整的消息 //消息msgId,由broker所在机器host+wroteOffset组成。为什么只指定msgId且msgId未包含消息size,可查询到完整的消息呢? 原因在于通过msgId查询消息时,通过offset到commitlog只取前4字节获取到size(前4个字节int为size值),然后再一次查询即可、 String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset); // Record ConsumeQueue information //逻辑队列,即consumequeue值,可以看出,逻辑队列从0开始。一般wroteOffset值都很大,queueOffset很小,也可以减少网络开销。 String key = msgInner.getTopic() + "-" + msgInner.getQueueId(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } // Transaction messages that require special handling final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queuec case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType: queueOffset = 0L; break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default: break; } /** * Serialize message */ final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); if (propertiesData.length > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); } final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length; final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); final int topicLength = topicData == null ? 0 : topicData.length; final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; //计算消息大小 final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); // Exceeds the maximum message if (msgLen > this.maxMessageSize) { CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + ", maxMessageSize: " + this.maxMessageSize); return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); } // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { this.resetMsgStoreItemMemory(maxBlank); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode); // 3 The remaining space may be any value // // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } // Initialization of storage space this.resetMsgStoreItemMemory(msgLen); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.msgStoreItemMemory.put(msgInner.getBornHostBytes()); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.msgStoreItemMemory.put(msgInner.getStoreHostBytes()); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort(propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType: break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: // The next update ConsumeQueue information CommitLog.this.topicQueueTable.put(key, ++queueOffset); break; default: break; } return result; }
消息存储完,回到CommitLog#putMessage方法,到了刷盘和同步数据给slave。如果broker配置为SYNC_FLUSH,那么每次存储完消息,就需要检查数据(wrotePostion-committedPosition)是否超过4页,1页4k,即16k,到了即刷盘。如果broker配置为SYNC_MASTER,需要等到数据同步offset小于256m即可返回给客户端。
3. Consumequeue和Index文件
上面,是一套完整的producer发消息到commitlog的过程。对于消息如何保存在consumequeue以及index中,MQ是通过ReputMessageService做的,主要逻辑在doReput方法内。MQ启动,首先设置reputFromOffset为commitlog最大的offset。
this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
然后,ReputMessageService线程每隔1ms检查reputFromOffset是否小于commitLog.getMaxOffset(),小于的话,表示commitlog有新的数据,就将数据从reputFromOffset开始,大小为commitlog的wrotePostion - reputFromOffset从commitlog取出。获取数据后,构建DispatchRequest,并在doDispatch方法内构建consumequeue和index索引文件。由于index文件的构建,在前面章节有涉及,这里只简单介绍consumequeue的构建。通过代码调用,最终在ConsumeQueue#putMessagePostionInfo中创建了consumequeue,且只包含commitlog文件的:offset,size,tagsCode:
this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQStoreUnitSize); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode);
注释后的ReputMessageService#doReput:
private void doReput() { //reputFromOffse小于commitLog.getMaxOffset(),表示有新数据 for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } //获取新数据 SelectMapedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { //将数据取出,之前取出的是ByteBuffer,依次取出详细信息,并设置到DispatchRequest DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { //构建consumequeue和index DefaultMessageStore.this.doDispatch(dispatchRequest); if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { //如果客户端有拉数据请求,且服务器设置了LongPollingEnable=true,那么在此通知PullRequestHoldService,并下推消息给客户端。 DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode()); } // FIXED BUG By shijia this.reputFromOffset += size; readSize += size; //统计消息 if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet(); DefaultMessageStore.this.storeStatsService .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) .addAndGet(dispatchRequest.getMsgSize()); } } //到文件尾部,去下一个文件中继续查找 else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { if (size > 0) { log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset); this.reputFromOffset += size; } else { doNext = false; if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) { log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}", this.reputFromOffset); this.reputFromOffset += (result.getSize() - readSize); } } } } } finally { result.release(); } } else { doNext = false; } } }
发表评论
-
RocketMQ原理解析-Consumer
2017-04-14 13:46 63621. 介绍 Consumer默认使用DefaultMQPu ... -
RocketMQ原理解析-HA
2017-04-13 10:01 23161. 介绍 当broker为slave且有master的情 ... -
RocketMQ原理解析-Broker
2017-04-13 10:00 30791. 部署方式 Broker ... -
RocketMQ原理解析-Name Server
2017-04-13 10:00 13621. 介绍 Namesrv的功能,就相当于RPC中的注 ... -
RocketMQ原理解析-Setup
2017-04-13 09:59 8931. 下载 https://github.com/al ... -
RocketMQ原理解析-broker清理文件
2017-04-07 16:37 01. commitlog文件删除 RocketMQ的c ... -
RocketMQ原理解析-Index
2017-04-13 09:56 15821. 介绍 索引文件指,发送一条消息后,MQ通过(topi ... -
RocketMQ原理解析-Producer
2017-04-14 13:46 25811.启动 producer通过配置的namesrv列表 ...
相关推荐
通过阅读RocketMQ的源码,开发者可以了解到分布式消息中间件的设计理念,学习到如何在高并发环境下处理大量消息的存储和传输,以及如何实现消息的可靠性和一致性。这对于提升系统设计能力,特别是在微服务架构中...
以下将基于给定内容详细解析RocketMQ的原理解析。 ### RocketMQ概述 RocketMQ是由阿里巴巴开源的一款分布式消息中间件。它提供了分布式系统之间的异步消息通信能力,广泛应用于各种业务场景中,如订单处理、任务...
标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...
### RocketMQ原理详解 #### 一、RocketMQ概述 RocketMQ是一款由阿里巴巴开源的消息中间件,主要用于异步处理、解耦、削峰等场景。它提供了高性能、高可靠性的消息服务,支持点对点消息、发布/订阅模式、事务消息等...
本文将从以下几个方面对RocketMQ的原理进行解析。 ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向...
### RocketMQ实战与原理解析 #### 一、RocketMQ简介 Apache RocketMQ是一个分布式消息中间件,由阿里巴巴捐赠并成为Apache顶级项目。RocketMQ具备高性能、低延迟、高可靠等特性,支持发布/订阅模式、消息过滤、...
2. `rocketmq-common-4.7.0.jar`:这个jar包包含了RocketMQ的通用模块,提供了一些基础工具类和常量,如NameServer地址解析、配置管理、时间戳处理、线程池管理等。同时,它也包含了一些核心的数据结构,如Message...
在深入探讨RocketMQ核心源码之前,...通过对RocketMQ的CommitLog和ConsumeQueue的理解,以及NameServer和Broker架构的解析,我们可以更深入地掌握RocketMQ的工作原理,这对于优化系统性能和解决实际问题具有重要意义。
总的来说,《RocketMQ实战与原理解析》是一本全面且深入的RocketMQ教程,无论你是初涉消息中间件的新手,还是寻求提升的开发者,都能从中受益。不过,作者也提醒,尽管这本书内容详实,但要达到精通RocketMQ的程度,...
《RocketMQ高级原理:深入剖析消息系统的核心机制》是一篇深度解析RocketMQ核心机制的文章,旨在帮助读者理解和掌握这款分布式消息中间件的工作原理。RocketMQ是阿里巴巴开源的一款高性能、高可用的消息中间件,广泛...
Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...
1. **RocketMQ概述**:RocketMQ的设计目标是解决大规模分布式系统中的异步处理和解耦问题,其核心特性包括消息队列、发布/订阅模型、事务消息和定时/延时消息等。 2. **架构设计**:RocketMQ的架构主要由NameServer...
本篇文章主要针对初级使用者,解析RocketMQ的基本概念和工作原理。 1. **ProducerGroup** ProducerGroup 是一组具有相同属性的Producer的集合,这些属性包括处理的消息类型(Topic)以及处理逻辑。在事务消息机制...
本压缩包“RocketMQ原理分析.rar”包含了对RocketMQ核心机制的深入解析,旨在帮助用户理解其工作原理。 RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四个部分: 1. **Producer**: 生产者是消息...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中,提供高可用、高吞吐量的消息传输服务。4.9.2版本作为RocketMQ的一个稳定版本,包含了一系列的功能改进和性能优化。以下是对RocketMQ...
Broker作为RocketMQ的核心组件之一,负责接收Producer发送的消息,并将它们存储和转发给Consumer。组装消息是指Broker如何根据多种因素(如消息大小、主题、队列等)将接收到的消息合理地存储和分发,确保消息的...
在Java编程环境中,RocketMQ的源码解析可以帮助我们理解其内部工作原理,包括消息的生产、消费、存储和调度机制。以下是一些可能涉及的知识点: 1. **消息模型**:RocketMQ支持两种消息模型,发布/订阅(Publish/...
发送过程包括选择合适的 Broker、消息序列化和批量发送等步骤。Producer 提供了同步和异步两种发送模式,异步发送可以提高消息发送的吞吐量。 4. **Consumer** Consumer 提供了两种消费模式:Push 模式和 Pull ...
当Broker出现故障或被手动停用时,NameServer会删除相应的路由信息,防止Producer和Consumer继续向失效的Broker发送消息。理解路由删除的逻辑对于故障恢复和系统稳定性有直接影响。 通过这些视频教程,学习者将能...
RocketMQ的源码解析可以帮助我们理解其内部机制,包括NameServer的注册与发现、Broker的消息存储与检索、Producer的发送逻辑、Consumer的消费策略等。通过阅读源码,可以学习到如何设计分布式系统、如何优化消息传递...