`
iwinit
  • 浏览: 454793 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[metaq]Broker

阅读更多

Broker是metaq的核心组件,负责消息的物理存储,分区指定等。例子配置文件

[system]
#broker编号,集群唯一
brokerId=0
#这个broker指定的分区数
numPartitions=2
#nio port
serverPort=8123
#异步刷盘策略,为0表示同步刷盘
unflushThreshold=0
#同上,刷盘间隔
unflushInterval=10000
#单个消息文件的最大size
maxSegmentSize=1073741824
#单个请求最大传输size
maxTransferSize=1048576
#数据清理
deletePolicy=delete,168
#quartz的delete任务表达式
deleteWhen=0 0 6,18 * * ?
flushTxLogAtCommit=1
stat=true

;; Update consumers offsets to current max offsets when consumers offsets are out of range of current broker's messages.
;; It must be false in production.But recommend to be true in development or test.
updateConsumerOffsets=true

[zookeeper]
zk.zkConnect=localhost:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000

;; Topics section
#topic名称
[topic=test]

[topic=meta-test]

 broker核心类图如下

 

Broker处理如下

1.broker启动

MetaMorphosisBroker创建,初始化zk连接

public MetaMorphosisBroker(final MetaConfig metaConfig) {
        super();
        this.metaConfig = metaConfig;
	//NIO server
        this.remotingServer = newRemotingServer(metaConfig);
        this.executorsManager = new ExecutorsManager(metaConfig);
	//全局ID生成器
        this.idWorker = new IdWorker(metaConfig.getBrokerId());
	//文件管理
        this.storeManager = new MessageStoreManager(metaConfig, this.newDeletePolicy(metaConfig));
	//监控
        this.statsManager = new StatsManager(this.metaConfig, this.storeManager, this.remotingServer);
	//zookeeper客户端
        this.brokerZooKeeper = new BrokerZooKeeper(metaConfig);
        final BrokerCommandProcessor next =
                new BrokerCommandProcessor(this.storeManager, this.executorsManager, this.statsManager,
                    this.remotingServer, metaConfig, this.idWorker, this.brokerZooKeeper);
        JournalTransactionStore transactionStore = null;
        try {
            transactionStore = new JournalTransactionStore(metaConfig.getDataLogPath(), this.storeManager, metaConfig);
        }
        catch (final Exception e) {
            throw new MetamorphosisServerStartupException("Initializing transaction store failed", e);
        }
	//带事务的处理器
        this.brokerProcessor =
                new TransactionalCommandProcessor(metaConfig, this.storeManager, this.idWorker, next, transactionStore,
                    this.statsManager);
        this.shutdownHook = new ShutdownHook();
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        MetaMBeanServer.registMBean(this, null);
    }

 broker启动

public synchronized void start() {
        if (!this.shutdown) {
            return;
        }
        this.shutdown = false;
	//加载已有数据并校验老数据
        this.storeManager.init();
        this.executorsManager.init();
        this.statsManager.init();
	//不同command对应不同处理器
        this.registerProcessors();
        try {
		//NIO server启动
            this.remotingServer.start();
        }
        catch (final NotifyRemotingException e) {
            throw new MetamorphosisServerStartupException("start remoting server failed", e);
        }
        try {
		//在/brokers/ids下创建临时节点,名称为节点Id
		//如果为master节点,则创建/brokers/ids/0/master节点
            this.brokerZooKeeper.registerBrokerInZk();
		//如果为master节点,则创建/brokers/ids/master_config_checksum节点
            this.brokerZooKeeper.registerMasterConfigFileChecksumInZk();
            this.addTopicsChangeListener();
		//在/brokers/topics-sub和/brokers/pub创建对应topic临时节点-topics
            this.registerTopicsInZk();
            this.registerZkSuccess = true;
        }
        catch (final Exception e) {
            this.registerZkSuccess = false;
            throw new MetamorphosisServerStartupException("Register broker to zk failed", e);
        }
        log.info("Starting metamorphosis server...");
        this.brokerProcessor.init();
        log.info("Start metamorphosis server successfully");
    }

 注册的处理器

private void registerProcessors() {
        this.remotingServer.registerProcessor(GetCommand.class, new GetProcessor(this.brokerProcessor,
            this.executorsManager.getGetExecutor()));
        this.remotingServer.registerProcessor(PutCommand.class, new PutProcessor(this.brokerProcessor,
            this.executorsManager.getUnOrderedPutExecutor()));
        this.remotingServer.registerProcessor(OffsetCommand.class, new OffsetProcessor(this.brokerProcessor,
            this.executorsManager.getGetExecutor()));
        this.remotingServer
        .registerProcessor(HeartBeatRequestCommand.class, new VersionProcessor(this.brokerProcessor));
        this.remotingServer.registerProcessor(QuitCommand.class, new QuitProcessor(this.brokerProcessor));
        this.remotingServer.registerProcessor(StatsCommand.class, new StatsProcessor(this.brokerProcessor));
        this.remotingServer.registerProcessor(TransactionCommand.class, new TransactionProcessor(this.brokerProcessor,
            this.executorsManager.getUnOrderedPutExecutor()));
    }

 2.无事务put请求

根据注册的处理器,put请求由PutProcessor处理

 public void handleRequest(final PutCommand request, final Connection conn) {
        final TransactionId xid = request.getTransactionId();
        final SessionContext context = SessionContextHolder.getOrCreateSessionContext(conn, xid);
        try {
            this.processor.processPutCommand(request, context, new PutCallback() {
                @Override
                public void putComplete(final ResponseCommand resp) {
                    RemotingUtils.response(context.getConnection(), resp);
                }
            });
            // RemotingUtils.response(context.getConnection(),
            // PutProcessor.this.processor.processPutCommand(request, context));
        }
        catch (final Exception e) {
            RemotingUtils.response(context.getConnection(), new BooleanCommand(HttpStatus.InternalServerError,
                e.getMessage(), request.getOpaque()));
        }
    }

 无事务请求由BrokerCommandProcessor处理

 public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) {
        final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition();
      	.....
		//partition信息
            final int partition = this.getPartition(request);
		//partition对应的store
            final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition);
            // 如果是动态添加的topic,需要注册到zk
            this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false);
            // 设置唯一id
            final long messageId = this.idWorker.nextId();
		//写数据
            store.append(messageId, request,
                new StoreAppendCallback(partition, partitionString, request, messageId, cb));
        }
        catch (final Exception e) {
            this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1);
            log.error("Put message failed", e);
            if (cb != null) {
                cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, e.getMessage(), request.getOpaque()));
            }
        }
    }

 具体写数据过程

private void appendBuffer(final ByteBuffer buffer, final AppendCallback cb) {
        if (this.closed) {
            throw new IllegalStateException("Closed MessageStore.");
        }
        if (this.useGroupCommit() && buffer.remaining() < this.maxTransferSize) {
            this.bufferQueue.offer(new WriteRequest(buffer, cb));
        }
        else {
            Location location = null;
            final int remainning = buffer.remaining();
		//单线程append
            this.writeLock.lock();
            try {
		//当前最新的消息文件
                final Segment cur = this.segments.last();
		//append,返回写入数据量
                final long offset = cur.start + cur.fileMessageSet.append(buffer);
		//根据刷盘策略,判断需要刷盘
                this.mayBeFlush(1);
		//超过一定大小,生成新的消息文件
                this.mayBeRoll();
                location = new Location(offset, remainning);
            }
            catch (final IOException e) {
                log.error("Append file failed", e);
                location = Location.InvalidLocaltion;
            }
            finally {
                this.writeLock.unlock();
                if (cb != null) {
                    cb.appendComplete(location);
                }
            }
        }
    }

 append

public long append(final ByteBuffer buf) throws IOException {
        if (!this.mutable) {
            throw new UnsupportedOperationException("Immutable message set");
        }
        final long offset = this.sizeInBytes.get();
        int sizeInBytes = 0;
	//循环写入buffer,直到完成
        while (buf.hasRemaining()) {
            sizeInBytes += this.channel.write(buf);
        }
        this.sizeInBytes.addAndGet(sizeInBytes);
        this.messageCount.incrementAndGet();
        return offset;
    }

 flush

  public void flush() throws IOException {
	//写入磁盘
        this.channel.force(true);
        this.highWaterMark.set(this.sizeInBytes.get());
    }

 3.get请求

根据注册的处理器,get请求由GetProcessor处理,get请求无事务,BrokerCommandProcessor处理

public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) {
        final String group = request.getGroup();
        final String topic = request.getTopic();
        this.statsManager.statsGet(topic, group, 1);

        // 如果分区被关闭,禁止读数据 --wuhua
        if (this.metaConfig.isClosedPartition(topic, request.getPartition())) {
            log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition()
                + ",it closed,");
            return new BooleanCommand(HttpStatus.Forbidden, "Partition["
                    + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque());
        }
	//根据partition拿store
        final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition());
        if (store == null) {
            this.statsManager.statsGetMiss(topic, group, 1);
            return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic
                + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque());
        }
        if (request.getMaxSize() <= 0) {
            return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:"
                    + request.getMaxSize(), request.getOpaque());
        }
        try {
		//根据offset和transferSize拿messageSet视图
            final MessageSet set =
                    store.slice(request.getOffset(),
                        Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
            if (set != null) {
		//zeroCopy直接从OS内核写入socket缓存,不经过用户态
                if (zeroCopy) {
                    set.write(request, ctx);
                    return null;
                }
                else {
                    // refer to the code of line 440 in MessageStore
                    // create two copies of byte array including the byteBuffer
                    // and new bytes
                    // this may not a good use case of Buffer
                    final ByteBuffer byteBuffer =
                            ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
                    set.read(byteBuffer);
                    byteBuffer.flip();
                    final byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return new DataCommand(bytes, request.getOpaque());
                }
            }
		//没取到数据。。。
            else {
                this.statsManager.statsGetMiss(topic, group, 1);
                this.statsManager.statsGetFailed(topic, group, 1);

                // 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量.
                final long maxOffset = store.getMaxOffset();
                final long requestOffset = request.getOffset();
                if (requestOffset > maxOffset
                        && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) {
                    log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset
                        + ",topic=" + topic + ",group=" + group);
                    this.statsManager.statsOffset(topic, group, 1);
                    return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque());
                }
                else {
                    return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset,
                        request.getOpaque());
                }
            }
        }
        catch (final ArrayIndexOutOfBoundsException e) {
            log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic="
                    + topic);
            // 告知最近可用的offset
            this.statsManager.statsGetMiss(topic, group, 1);
            this.statsManager.statsGetFailed(topic, group, 1);
            final long validOffset = store.getNearestOffset(request.getOffset());
            this.statsManager.statsOffset(topic, group, 1);
            return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque());

        }
        catch (final Throwable e) {
            log.error("Could not get message from position " + request.getOffset(), e);
            this.statsManager.statsGetFailed(topic, group, 1);
            return new BooleanCommand(HttpStatus.InternalServerError, e.getMessage(), request.getOpaque());
        }

    }

 slice获取message视图

    /**
     * 根据offset和maxSize返回所在MessageSet, 当offset超过最大offset的时候返回null,
     * 当offset小于最小offset的时候抛出ArrayIndexOutOfBounds异常
     * 
     * @param offset
     * 
     * @param maxSize
     * @return
     * @throws IOException
     */
    public MessageSet slice(final long offset, final int maxSize) throws IOException {
	//二分查找命中segment
        final Segment segment = this.findSegment(this.segments.view(), offset);
        if (segment == null) {
            return null;
        }
        else {
		//返回message视图
            return segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + maxSize);
        }
    }

 

  • 大小: 176.5 KB
分享到:
评论

相关推荐

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaq-server-1.4.6.2.tar.gz

    MetaQ Server的架构主要包括Producer、Consumer、Broker和Controller四个主要部分: 1. Producer:生产者负责将消息发送到MetaQ Server,通过API接口与Server进行通信。 2. Consumer:消费者从MetaQ Server订阅并...

    MetaQ 分布式消息服务中间件.pdf

    MetaQ在消息写入Master Broker后会直接返回success以提高性能,但可能会出现Slave Broker丢失数据的情况。所以,MetaQ确保了数据的持久化,但性能相比消息发送即通知的模式略低。 MetaQ的消息持久化策略确保了消息...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    扩展性方面,MetaQ支持动态添加或删除Broker,用户可以根据业务增长灵活地调整集群规模。此外,它的主题和分区(Partition)设计允许水平扩展,通过增加更多的Partition来提高处理能力。 安全性是另一个重要的考虑...

    metaq-server-1.4.6.2客户端+服务端

    - **Broker**: 实际存储和转发消息的节点。 - **Zookeeper**: 作为元数据存储,协调集群状态。 **MetaQ客户端** 客户端是应用程序与MetaQ服务端交互的接口,主要任务是发布消息、订阅主题以及消费消息。客户端库...

    metaQ的安装包

    - 启动 MetaQ 的 Broker 和 Controller,通常通过执行 `bin/start-broker.sh` 和 `bin/start-controller.sh` 脚本来完成。 - 验证安装是否成功,可以通过访问 Web 管理界面或者使用命令行工具检查服务状态。 5. *...

    RocketMQ群问题整理

    7. **MetaQ与RocketMQ关系**:MetaQ 在 3.0 版本之后更名为 RocketMQ。 8. **JMS支持**:RocketMQ 支持 JMS 客户端 API,用户可以通过官方提供的 rocketmq-jmsclient 进行开发。 9. **启动异常**:如果启动 Broker...

    阿里rocketMQ

    4. **Broker**:Broker是RocketMQ的核心组件,负责存储消息、接受Producer发送的消息并转发给Consumer。每个Broker可以包含多个Topic,每个Topic又可以分为多个Queue,这样可以实现消息的分区和负载均衡。 RocketMQ...

    rocketmq-note.pdf

    Producer是消息的生产者,它们在发送消息前,会从NameServer获取Broker列表,选择合适的Broker进行消息发送。Consumer则是消息的消费者,负责接收和处理消息。 NameServer和Broker之间保持长连接,定期检查Broker的...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    RocketMQ的使用、原理

    1. **Push(推送)**:类似于Broker将消息推送给Consumer的方式,但实际上仍是Consumer主动从Broker拉取消息。 - **优点**:采用长轮询方式,能有效减少Broker和Consumer之间的交互频率,提高系统的整体性能。 2. *...

    【系统架构】最全最强解析:支付宝钱包系统架构内部剖析(架构图).docx

    3. 交易柔性事务支付宝的开源分布式消息中间件–Metamorphosis(MetaQ):负责处理交易消息的传输和处理,包括消息存储、顺序写、吞吐量大和支持本地和XA事务等功能。 Metamorphosis(MetaQ) 介绍: Metamorphosis...

    消息中间件 rocketmq原理解析

    NameServer是轻量级的目录服务器,主要提供了Broker管理、路由信息管理、服务发现等功能。 #### 2. Namesrv启动流程 NameServer的启动需要进行初始化和注册过程。 #### 3. RouteInfoManager RouteInfoManager负责...

    消息中间件rocketmq原理解析

    - **发送消息到broker**:Producer将消息发送至Master Broker,然后消息通过Broker的主从复制机制同步到Slave Broker上。 ### Producer消息发送 - **轮询队列实现负载均衡**:Producer通过轮询的方式遍历Topic下的...

    分布式消息引擎Apache RocketMQ最佳实践

    2. **Broker**:消息的实际存储单元,可以组成Broker Group来支持集群部署。 3. **Producer**:消息生产者,负责向Broker发送消息。 4. **Consumer**:消息消费者,负责从Broker拉取消息或接收消息推送。 RocketMQ...

    阿里RocketMQ用户指南V3.2.4.pdf

    文档中提到了RocketMQ从早期的Metaq版本到3.x版本的发展历程,以及与早期版本的兼容性问题。用户指南V3.2.4还提及了与其它系统的整合,比如与JMS、CORBA Notification的整合,这说明了RocketMQ的开放性和对不同消息...

    Window搭建部署RocketMQ步骤详解

    RocketMQ是一个由阿里巴巴开源的消息中间件,脱胎自阿里巴巴的MetaQ,在设计上借鉴了Kafka。下面将详细介绍Window搭建部署RocketMQ的步骤。 一、安装RocketMQ 首先需要下载RocketMQ的发行版本,解压缩后得到bin、...

    RocketMQ原理简介

    最后,文档中还提供了产品发展历史的概述,从Metaq的迭代到RocketMQ的正式上线,以及其后续版本的发展。介绍了如何在不同的业务系统中深度定制RocketMQ来满足特定的需求,如淘宝、支付宝和B2B等场景下的应用。并且,...

    RocketMQ技术原理

    物理部署中可能包括消息服务器(Broker)、NameServer、生产者和消费者等。 3. RocketMQ逻辑部署结构 逻辑部署结构是指消息系统的软件架构和组件间的交互。包括生产者、消费者、Broker服务器、NameServer和消息...

Global site tag (gtag.js) - Google Analytics