- public class Message implements Serializable {
- private long id; //消息的ID
- private String topic; //消息属于哪个主题
- private byte[] data; //消息的内容
- private String attribute; //消息的属性
- private int flag; //属性标志位,如果属性不为空,则该标志位true
- private Partition partition; //该主题下的哪个分区,简单的理解为发送到该主题下的哪个队列
- private transient boolean readOnly; //消息是否只读
- private transient boolean rollbackOnly = false; //该消息是否需要回滚,主要用于事务实现的需要
- }
在进行存储模块分析之前,我们需要先了解Broker中的一个重要的类:MetaConfig。MetaConfig是Broker的配置加载器,通过它可以获取各模块相关的配置,因此MetaConfig是贯穿所有模块的类。MetaConfig实现了MetaConfigMBean接口,该接口定义如下:
/**
 * Management interface of the broker configuration, exposed so the configuration can be
 * reloaded and partitions closed/opened at runtime (e.g. over JMX).
 */
public interface MetaConfigMBean {

    /** Reloads the topics configuration. */
    void reload();

    /**
     * Closes partitions of a topic.
     *
     * @param topic topic whose partitions are closed
     * @param start lower bound of the partition range (inclusivity not visible here — confirm)
     * @param end   upper bound of the partition range (inclusivity not visible here — confirm)
     */
    void closePartitions(String topic, int start, int end);

    /**
     * Opens all partitions of a topic.
     *
     * @param topic topic whose partitions are opened
     */
    void openPartitions(String topic);
}
MetaConfig注册到了MBeanServer上,所以可以通过JMX协议重新加载配置以及关闭和打开分区。为了使加载的配置立即生效,MetaConfig内置了一个通知机制:可以通过向MetaConfig注册监听器的方式关注相关配置的变化,监听器需实现PropertyChangeListener接口。
- public void addPropertyChangeListener(final String propertyName, final PropertyChangeListener listener) {
- this.propertyChangeSupport.addPropertyChangeListener(propertyName, listener);
- }
- public void removePropertyChangeListener(final PropertyChangeListener listener) {
- this.propertyChangeSupport.removePropertyChangeListener(listener);
- }
- // Notification of "configFileChecksum" (fired from createIni)
- /**
- * Builds an {@code Ini} from {@code file}. Most of the body is elided in this excerpt; the
- * visible tail fires a "configFileChecksum" property change (old and new values both null)
- * before returning the result.
- */
- private Ini createIni(final File file) throws IOException, InvalidFileFormatException {
- ……
- this.propertyChangeSupport.firePropertyChange("configFileChecksum", null, null);
- return conf;
- }
- public void setConfigFileChecksum(long configFileChecksum) {
- this.configFileChecksum = configFileChecksum;
- this.propertyChangeSupport.firePropertyChange("configFileChecksum", null, null);
- }
- // Notifications of "topics", "topicConfigMap" and "unflushInterval" (fired from populateTopicsConfig)
- private void populateTopicsConfig(final Ini conf) {
- ……
- // Replace and announce the topic settings only when the newly built map differs from the current one.
- if (!newTopicConfigMap.equals(this.topicConfigMap)) {
- this.topics = newTopics;
- this.topicConfigMap = newTopicConfigMap;
- this.propertyChangeSupport.firePropertyChange("topics", null, null);
- this.propertyChangeSupport.firePropertyChange("topicConfigMap", null, null);
- }
- // Fired on every call, whether or not the topic configuration changed.
- this.propertyChangeSupport.firePropertyChange("unflushInterval", null, null);
- ……
- /**
- * 消息集合
- */
- public interface MessageSet {
- public MessageSet slice(long offset, long limit) throws IOException; //获取一个消息集合
- public void write(GetCommand getCommand, SessionContext ctx);
- public long append(ByteBuffer buff) throws IOException; //存储一个消息,这时候还没有存储到磁盘,需要调用flush方法才能保证存储到磁盘
- public void flush() throws IOException; //提交到磁盘
- public void read(final ByteBuffer bf, long offset) throws IOException; //读取消息
- public void read(final ByteBuffer bf) throws IOException; //读取消息
- public long getMessageCount();//该集合的消息数量
- }
- public void close() throws IOException {
- if (!this.channel.isOpen()) {
- return;
- }
- //保证在文件关闭前,将内容提交到磁盘,而不是在缓存中
- if (this.mutable) {
- this.flush();
- }
- //关闭文件通道
- this.channel.close();
- }
- /**
- * {@link MessageSet} stored in a file and accessed through a {@link FileChannel}.
- * A mutable instance can be appended to; an immutable instance is a read-only view.
- */
- public class FileMessageSet implements MessageSet, Closeable {
- ……
- private final FileChannel channel; // file channel backing this set
- private final AtomicLong messageCount; // number of messages in this set
- private final AtomicLong sizeInBytes; // bytes in this set, including appended bytes not yet forced to disk
- private final AtomicLong highWaterMark; // size already guaranteed to be written to disk
- private final long offset; // start position of this set within the channel ("mirror" offset)
- private boolean mutable; // whether this set may be modified (appended to)
/**
 * Creates a message set backed by {@code channel}.
 *
 * <p>When {@code mutable} is true, {@code recover()} is called. It verifies the integrity of the
 * file content and may truncate a damaged tail. After that, messages can be appended.
 *
 * <p>When {@code mutable} is false, the file must not be changed. Both the size and the disk
 * high-water mark are set to {@code min(channel.size(), limit) - offset}, clamped at 0. So
 * {@code limit} acts as an absolute end position.
 *
 * @param channel file channel holding the messages
 * @param offset  start position of this set within the channel
 * @param limit   end position of this set; only used when {@code mutable} is false
 * @param mutable whether the set may be appended to
 * @throws IOException if recovery fails
 */
public FileMessageSet(final FileChannel channel, final long offset, final long limit, final boolean mutable) throws IOException {
    this.channel = channel;
    this.offset = offset;
    this.messageCount = new AtomicLong(0);
    this.sizeInBytes = new AtomicLong(0);
    this.highWaterMark = new AtomicLong(0);
    this.mutable = mutable;
    if (mutable) {
        final long startMs = System.currentTimeMillis();
        final long truncated = this.recover();
        // Also report a recovery that only truncated data (no intact message survived).
        if (this.messageCount.get() > 0 || truncated > 0) {
            // Floating-point division: integer division reported "0 seconds" for any sub-second recovery.
            log.info("Recovery succeeded in " + (System.currentTimeMillis() - startMs) / 1000.0 + " seconds. " + truncated + " bytes truncated.");
        }
    } else {
        try {
            // Clamp at 0: an offset past the end would otherwise yield a negative size,
            // which is later used as a transfer length.
            this.sizeInBytes.set(Math.max(0L, Math.min(channel.size(), limit) - offset));
            this.highWaterMark.set(this.sizeInBytes.get());
        } catch (final Exception e) {
            // Best effort, as before: the set stays empty (size 0) if the size cannot be determined.
            log.error("Set sizeInBytes error", e);
        }
    }
}
- ……
- public long append(final ByteBuffer buf) throws IOException {
- //如果mutable属性为false的时候,不允许追加消息在文件尾
- if (!this.mutable) {
- throw new UnsupportedOperationException("Immutable message set");
- }
- final long offset = this.sizeInBytes.get();
- int sizeInBytes = 0;
- while (buf.hasRemaining()) {
- sizeInBytes += this.channel.write(buf);
- }
- //在这个时候并没有将内容写入磁盘,还在通道的缓存中,需要在适当的时候调用flush方法
- //这个时候磁盘的水位是不更新的,只有在保证写入磁盘才会更新磁盘的水位信息
- this.sizeInBytes.addAndGet(sizeInBytes); 、
- this.messageCount.incrementAndGet();
- return offset;
- }
- //提交到磁盘
- public void flush() throws IOException {
- this.channel.force(true);
- this.highWaterMark.set(this.sizeInBytes.get());
- }
- ……
- @Override
- public MessageSet slice(final long offset, final long limit) throws IOException {
- //返回消息集
- return new FileMessageSet(this.channel, offset, limit, false);
- }
- static final Logger transferLog = LoggerFactory.getLogger("TransferLog");
- @Override
- public void read(final ByteBuffer bf, final long offset) throws IOException {
- //读取内容
- int size = 0;
- while (bf.hasRemaining()) {
- final int l = this.channel.read(bf, offset + size);
- if (l < 0) {
- break;
- }
- size += l;
- }
- }
- @Override
- public void read(final ByteBuffer bf) throws IOException {
- this.read(bf, this.offset);
- }
- //下面的write方法主要是提供zero拷贝
- @Override
- public void write(final GetCommand getCommand, final SessionContext ctx) {
- final IoBuffer buf = this.makeHead(getCommand.getOpaque(), this.sizeInBytes.get());
- // transfer to socket
- this.tryToLogTransferInfo(getCommand, ctx.getConnection());
- ctx.getConnection().transferFrom(buf, null, this.channel, this.offset, this.sizeInBytes.get());
- }
- public long write(final WritableByteChannel socketChanel) throws IOException {
- try {
- return this.getFileChannel().transferTo(this.offset, this.getSizeInBytes(), socketChanel);
- } catch (final IOException e) {
- // Check to see if the IOException is being thrown due to
- // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
- final String message = e.getMessage();
- if (message != null && message.contains("temporarily unavailable")) {
- return 0;
- }
- throw e;
- }
- }
- ……
- private static boolean fastBoot = Boolean.valueOf(System.getProperty("meta.fast_boot", "false"));
- private long recover() throws IOException {
- //如果在System属性里设置了 meta.fast_boot为true,表示快速启动,快速启动不检测文件是否损坏,不对内容进行校验
- if (fastBoot) {
- final long size = this.channel.size();
- this.sizeInBytes.set(size);
- this.highWaterMark.set(size);
- this.messageCount.set(0);
- this.channel.position(size);
- return 0;
- }
- if (!this.mutable) {
- throw new UnsupportedOperationException("Immutable message set");
- }
- final long len = this.channel.size();
- final ByteBuffer buf = ByteBuffer.allocate(MessageUtils.HEADER_LEN);
- long validUpTo = 0L;
- long next = 0L;
- long msgCount = 0;
- do {
- next = this.validateMessage(buf, validUpTo, len);
- if (next >= 0) {
- msgCount++;
- validUpTo = next;
- }
- } while (next >= 0);
- this.channel.truncate(validUpTo);
- this.sizeInBytes.set(validUpTo);
- this.highWaterMark.set(validUpTo);
- this.messageCount.set(msgCount);
- this.channel.position(validUpTo);
- return len - validUpTo;
- }
- 消息在磁盘上的存储结构如下:
- /*
- * 20个字节的头部 *
- * <ul>
- * <li>message length(4 bytes),including attribute and payload</li>
- * <li>checksum(4 bytes)</li>
- * <li>message id(8 bytes)</li>
- * <li>message flag(4 bytes)</li>
- * </ul>
- * length长度的内容
- */
- //在存储之前将checksum的信息存入到磁盘,读取的时候再进行校验,比较前后的
- //checksum是否一致,防止消息被篡改
- private long validateMessage(final ByteBuffer buf, final long start, final long len) throws IOException {
- buf.rewind();
- long read = this.channel.read(buf);
- if (read < MessageUtils.HEADER_LEN) {
- return -1;
- }
- buf.flip();
- final int messageLen = buf.getInt();
- final long next = start + MessageUtils.HEADER_LEN + messageLen;
- if (next > len) {
- return -1;
- }
- final int checksum = buf.getInt();
- if (messageLen < 0) {
- // 数据损坏
- return -1;
- }
- final ByteBuffer messageBuffer = ByteBuffer.allocate(messageLen);
- // long curr = start + MessageUtils.HEADER_LEN;
- while (messageBuffer.hasRemaining()) {
- read = this.channel.read(messageBuffer);
- if (read < 0) {
- throw new IOException("文件在recover过程中被修改");
- }
- // curr += read;
- }
- if (CheckSum.crc32(messageBuffer.array()) != checksum) {
- //采用crc32对内容进行校验是否一致
- return -1;
- } else {
- return next;
- }
- }
FileMessageSet是MetaQ服务器端存储的基本单元,代表对一个文件的读写操作,能够对文件内容进行完整性和一致性校验,并能提供统计数据。接下来要介绍的是MessageStore如何将一个个FileMessageSet组合起来;单就存储设计而言,MetaQ非常值得借鉴。
