The Broker is the core component of MetaQ. It is responsible for the physical storage of messages, partition assignment, and related duties. An example configuration file:
[system]
# Broker id, unique within the cluster
brokerId=0
# Number of partitions served by this broker
numPartitions=2
# NIO port
serverPort=8123
# Flush policy; 0 means synchronous flush, a positive value enables async flushing
unflushThreshold=0
# As above, the flush interval in milliseconds
unflushInterval=10000
# Maximum size of a single message file (segment)
maxSegmentSize=1073741824
# Maximum transfer size of a single request
maxTransferSize=1048576
# Data cleanup policy
deletePolicy=delete,168
# Quartz cron expression for the delete task
deleteWhen=0 0 6,18 * * ?
flushTxLogAtCommit=1
stat=true
;; Update consumers offsets to current max offsets when consumers offsets are out of range of current broker's messages.
;; It must be false in production, but it is recommended to be true in development or test.
updateConsumerOffsets=true

[zookeeper]
zk.zkConnect=localhost:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000

;; Topics section
# Topic name
[topic=test]
[topic=meta-test]
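For context, deletePolicy=delete,168 means stale segment files are removed once they are older than 168 hours, on the schedule given by the deleteWhen cron expression. Below is a minimal sketch of how such a policy string could be parsed and applied; the class name and the age check are illustrative assumptions, not MetaQ's actual DeletePolicy implementation.

// Illustrative only: parses a "delete,<hours>" policy string and decides whether a
// segment file is old enough to be removed.
import java.io.File;
import java.util.concurrent.TimeUnit;

public class DeleteOldSegmentsSketch {
    private final long maxReservedMillis;

    public DeleteOldSegmentsSketch(final String policy) {
        // e.g. "delete,168" -> keep segments for 168 hours
        final String[] parts = policy.split(",");
        if (!"delete".equals(parts[0])) {
            throw new IllegalArgumentException("Unsupported policy: " + policy);
        }
        this.maxReservedMillis = TimeUnit.HOURS.toMillis(Long.parseLong(parts[1]));
    }

    /** A segment is deletable when its last modification is older than the reserved window. */
    public boolean canDelete(final File segmentFile, final long now) {
        return now - segmentFile.lastModified() > this.maxReservedMillis;
    }
}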
The broker's core class diagram is shown below.
The broker's processing works as follows.
1. Broker startup
A MetaMorphosisBroker is created, which initializes the ZooKeeper connection:
public MetaMorphosisBroker(final MetaConfig metaConfig) {
    super();
    this.metaConfig = metaConfig;
    // NIO server
    this.remotingServer = newRemotingServer(metaConfig);
    this.executorsManager = new ExecutorsManager(metaConfig);
    // Global id generator
    this.idWorker = new IdWorker(metaConfig.getBrokerId());
    // Message file management
    this.storeManager = new MessageStoreManager(metaConfig, this.newDeletePolicy(metaConfig));
    // Statistics / monitoring
    this.statsManager = new StatsManager(this.metaConfig, this.storeManager, this.remotingServer);
    // ZooKeeper client
    this.brokerZooKeeper = new BrokerZooKeeper(metaConfig);
    final BrokerCommandProcessor next =
            new BrokerCommandProcessor(this.storeManager, this.executorsManager, this.statsManager,
                this.remotingServer, metaConfig, this.idWorker, this.brokerZooKeeper);
    JournalTransactionStore transactionStore = null;
    try {
        transactionStore = new JournalTransactionStore(metaConfig.getDataLogPath(), this.storeManager, metaConfig);
    }
    catch (final Exception e) {
        throw new MetamorphosisServerStartupException("Initializing transaction store failed", e);
    }
    // Transactional command processor wrapping the plain one
    this.brokerProcessor =
            new TransactionalCommandProcessor(metaConfig, this.storeManager, this.idWorker, next, transactionStore,
                this.statsManager);
    this.shutdownHook = new ShutdownHook();
    Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    MetaMBeanServer.registMBean(this, null);
}
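The idWorker above produces globally unique, roughly time-ordered message ids seeded with the broker id. A minimal snowflake-style sketch of the idea follows; the bit layout is an assumption for illustration, not MetaQ's exact id format.

// Illustrative snowflake-style id generator: 41 bits timestamp, 10 bits worker id, 12 bits sequence.
public class IdWorkerSketch {
    private final long workerId;      // e.g. the broker id
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public IdWorkerSketch(final long workerId) {
        this.workerId = workerId & 0x3FF; // 10 bits
    }

    public synchronized long nextId() {
        long ts = System.currentTimeMillis();
        if (ts == this.lastTimestamp) {
            // Same millisecond: bump the sequence, spin to the next millisecond on overflow
            this.sequence = (this.sequence + 1) & 0xFFF; // 12 bits
            if (this.sequence == 0) {
                while ((ts = System.currentTimeMillis()) <= this.lastTimestamp) {
                    // busy wait for the next millisecond
                }
            }
        }
        else {
            this.sequence = 0;
        }
        this.lastTimestamp = ts;
        return (ts << 22) | (this.workerId << 12) | this.sequence;
    }
}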
Starting the broker:
public synchronized void start() {
    if (!this.shutdown) {
        return;
    }
    this.shutdown = false;
    // Load existing data and validate old data
    this.storeManager.init();
    this.executorsManager.init();
    this.statsManager.init();
    // Register a dedicated processor for each command type
    this.registerProcessors();
    try {
        // Start the NIO server
        this.remotingServer.start();
    }
    catch (final NotifyRemotingException e) {
        throw new MetamorphosisServerStartupException("start remoting server failed", e);
    }
    try {
        // Create an ephemeral node under /brokers/ids named after the broker id;
        // if this broker is the master, the /brokers/ids/0/master node is created
        this.brokerZooKeeper.registerBrokerInZk();
        // If this broker is the master, create the /brokers/ids/master_config_checksum node
        this.brokerZooKeeper.registerMasterConfigFileChecksumInZk();
        this.addTopicsChangeListener();
        // Create ephemeral nodes for each topic under /brokers/topics-sub and /brokers/topics-pub
        this.registerTopicsInZk();
        this.registerZkSuccess = true;
    }
    catch (final Exception e) {
        this.registerZkSuccess = false;
        throw new MetamorphosisServerStartupException("Register broker to zk failed", e);
    }
    log.info("Starting metamorphosis server...");
    this.brokerProcessor.init();
    log.info("Start metamorphosis server successfully");
}
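registerBrokerInZk boils down to creating ephemeral znodes, so the registration disappears automatically when the broker's ZooKeeper session ends. A bare-bones sketch using the raw ZooKeeper client follows; MetaQ itself goes through its BrokerZooKeeper/ZkClient layer, and the path and data format below are simplified from the comments above, not the exact on-wire layout.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public final class RegisterBrokerSketch {
    /**
     * Registers a master broker under /brokers/ids as an ephemeral node, so it vanishes with the session.
     * Assumes the persistent parent path /brokers/ids/<brokerId> already exists.
     */
    public static void register(final ZooKeeper zk, final int brokerId, final String brokerUri) throws Exception {
        final String path = "/brokers/ids/" + brokerId + "/master";
        zk.create(path, brokerUri.getBytes("UTF-8"),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
}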
The registered processors:
private void registerProcessors() {
    this.remotingServer.registerProcessor(GetCommand.class,
        new GetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));
    this.remotingServer.registerProcessor(PutCommand.class,
        new PutProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));
    this.remotingServer.registerProcessor(OffsetCommand.class,
        new OffsetProcessor(this.brokerProcessor, this.executorsManager.getGetExecutor()));
    this.remotingServer.registerProcessor(HeartBeatRequestCommand.class,
        new VersionProcessor(this.brokerProcessor));
    this.remotingServer.registerProcessor(QuitCommand.class, new QuitProcessor(this.brokerProcessor));
    this.remotingServer.registerProcessor(StatsCommand.class, new StatsProcessor(this.brokerProcessor));
    this.remotingServer.registerProcessor(TransactionCommand.class,
        new TransactionProcessor(this.brokerProcessor, this.executorsManager.getUnOrderedPutExecutor()));
}
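Each class passed to registerProcessor pairs the handling logic for one command type with the executor it should run on: Get and Offset share the get executor, while Put and Transaction share the unordered put executor. Roughly, the contract looks like the sketch below; the exact gecko RequestProcessor interface may differ, so treat these signatures as assumptions.

// Approximate shape of the per-command handler contract used by registerProcessor:
// one processor per command type, each executed on its own thread pool.
public interface RequestProcessorSketch<T, C> {
    /** Invoked by the NIO server when a command of type T arrives on connection C. */
    void handleRequest(T request, C conn);

    /** The thread pool the server should run handleRequest on (null meaning the IO thread). */
    java.util.concurrent.ThreadPoolExecutor getExecutor();
}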
2. Non-transactional put request
Based on the registered processors, a put request is handled by PutProcessor:
public void handleRequest(final PutCommand request, final Connection conn) {
    final TransactionId xid = request.getTransactionId();
    final SessionContext context = SessionContextHolder.getOrCreateSessionContext(conn, xid);
    try {
        this.processor.processPutCommand(request, context, new PutCallback() {
            @Override
            public void putComplete(final ResponseCommand resp) {
                RemotingUtils.response(context.getConnection(), resp);
            }
        });
        // RemotingUtils.response(context.getConnection(),
        // PutProcessor.this.processor.processPutCommand(request, context));
    }
    catch (final Exception e) {
        RemotingUtils.response(context.getConnection(),
            new BooleanCommand(HttpStatus.InternalServerError, e.getMessage(), request.getOpaque()));
    }
}
Non-transactional requests are handled by BrokerCommandProcessor:
public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) {
    final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition();
    .....
    try {
        // Partition info
        final int partition = this.getPartition(request);
        // Message store for this partition
        final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition);
        // If the topic was added dynamically, it has to be registered in zk
        this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false);
        // Assign a unique message id
        final long messageId = this.idWorker.nextId();
        // Write the data
        store.append(messageId, request,
            new StoreAppendCallback(partition, partitionString, request, messageId, cb));
    }
    catch (final Exception e) {
        this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1);
        log.error("Put message failed", e);
        if (cb != null) {
            cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, e.getMessage(), request.getOpaque()));
        }
    }
}
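this.getPartition(request) resolves the partition the message lands in: a producer may either name a concrete partition or ask the broker to choose one among numPartitions. A minimal sketch of that decision is below; treating a negative value as the "let the broker pick" convention is an assumption for illustration.

// Illustrative partition selection: a negative requested partition means "pick one for me",
// otherwise the requested partition is used as-is.
import java.util.concurrent.ThreadLocalRandom;

final class PartitionSelectionSketch {
    static int getPartition(final int requestedPartition, final int numPartitions) {
        return requestedPartition < 0
            ? ThreadLocalRandom.current().nextInt(numPartitions)
            : requestedPartition;
    }
}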
The actual write path:
private void appendBuffer(final ByteBuffer buffer, final AppendCallback cb) {
    if (this.closed) {
        throw new IllegalStateException("Closed MessageStore.");
    }
    if (this.useGroupCommit() && buffer.remaining() < this.maxTransferSize) {
        this.bufferQueue.offer(new WriteRequest(buffer, cb));
    }
    else {
        Location location = null;
        final int remainning = buffer.remaining();
        // Appends are single-threaded (guarded by the write lock)
        this.writeLock.lock();
        try {
            // The latest (current) message file
            final Segment cur = this.segments.last();
            // Append, computing the absolute offset of the written data
            final long offset = cur.start + cur.fileMessageSet.append(buffer);
            // Flush if the flush policy requires it
            this.mayBeFlush(1);
            // Roll to a new message file once the segment grows past the size limit
            this.mayBeRoll();
            location = new Location(offset, remainning);
        }
        catch (final IOException e) {
            log.error("Append file failed", e);
            location = Location.InvalidLocaltion;
        }
        finally {
            this.writeLock.unlock();
            if (cb != null) {
                cb.appendComplete(location);
            }
        }
    }
}
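mayBeRoll is what keeps each segment bounded by maxSegmentSize: once the active file grows past the limit, it is sealed and a new segment starting at the next absolute offset is opened. A rough, self-contained sketch of the idea is below; the Segment fields and the deque are simplified placeholders, not the exact MetaQ classes.

// Illustrative roll logic: seal the active segment once it exceeds maxSegmentSize and
// start a new one whose 'start' is the current end of the log.
import java.util.ArrayDeque;
import java.util.Deque;

final class SegmentRollSketch {
    static final class Segment {
        final long start;      // absolute offset of the first byte in this file
        long sizeInBytes;      // bytes written so far
        Segment(final long start) { this.start = start; }
    }

    private final long maxSegmentSize;
    private final Deque<Segment> segments = new ArrayDeque<Segment>();

    SegmentRollSketch(final long maxSegmentSize) {
        this.maxSegmentSize = maxSegmentSize;
        this.segments.add(new Segment(0));
    }

    void mayBeRoll() {
        final Segment cur = this.segments.getLast();
        if (cur.sizeInBytes >= this.maxSegmentSize) {
            // The new segment starts right after the sealed one ends
            this.segments.add(new Segment(cur.start + cur.sizeInBytes));
        }
    }
}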
FileMessageSet.append writes the buffer at the end of the file channel:
public long append(final ByteBuffer buf) throws IOException {
    if (!this.mutable) {
        throw new UnsupportedOperationException("Immutable message set");
    }
    final long offset = this.sizeInBytes.get();
    int sizeInBytes = 0;
    // Keep writing the buffer to the channel until it is fully drained
    while (buf.hasRemaining()) {
        sizeInBytes += this.channel.write(buf);
    }
    this.sizeInBytes.addAndGet(sizeInBytes);
    this.messageCount.incrementAndGet();
    return offset;
}
flush forces the data to disk:
public void flush() throws IOException {
    // Force the data to disk
    this.channel.force(true);
    this.highWaterMark.set(this.sizeInBytes.get());
}
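How often flush() runs is governed by the unflushThreshold and unflushInterval settings from the [system] section: with unflushThreshold=0 every append is forced to disk (synchronous flush), otherwise the force happens after a number of unflushed messages or after the interval elapses. The sketch below illustrates that decision with a simple counter plus timestamp; it is not MetaQ's exact bookkeeping (the interval-based flush is actually driven by a scheduled task).

// Illustrative flush policy: flush when the unflushed-message count reaches the threshold
// or when the configured interval has elapsed since the last flush.
final class FlushPolicySketch {
    private final int unflushThreshold;     // 0 means flush on every append (sync flush)
    private final long unflushIntervalMs;
    private int unflushed;
    private long lastFlushTime = System.currentTimeMillis();

    FlushPolicySketch(final int unflushThreshold, final long unflushIntervalMs) {
        this.unflushThreshold = unflushThreshold;
        this.unflushIntervalMs = unflushIntervalMs;
    }

    /** Called with the number of messages just appended; returns true when a force to disk is due. */
    synchronized boolean mayBeFlush(final int appended) {
        this.unflushed += appended;
        final long now = System.currentTimeMillis();
        if (this.unflushThreshold <= 0
                || this.unflushed >= this.unflushThreshold
                || now - this.lastFlushTime >= this.unflushIntervalMs) {
            this.unflushed = 0;
            this.lastFlushTime = now;
            return true;
        }
        return false;
    }
}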
3. Get request
Based on the registered processors, a get request is handled by GetProcessor. Get requests carry no transaction, so the actual work is done by BrokerCommandProcessor:
public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) {
    final String group = request.getGroup();
    final String topic = request.getTopic();
    this.statsManager.statsGet(topic, group, 1);

    // Reads are forbidden if the partition has been closed --wuhua
    if (this.metaConfig.isClosedPartition(topic, request.getPartition())) {
        log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition()
            + ",it closed,");
        return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-"
            + request.getPartition() + "] has been closed", request.getOpaque());
    }

    // Look up the store for the partition
    final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition());
    if (store == null) {
        this.statsManager.statsGetMiss(topic, group, 1);
        return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `"
            + request.getPartition() + "` is not exists", request.getOpaque());
    }
    if (request.getMaxSize() <= 0) {
        return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(),
            request.getOpaque());
    }
    try {
        // Get a MessageSet view based on the offset and transfer size
        final MessageSet set =
                store.slice(request.getOffset(),
                    Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
        if (set != null) {
            // With zero copy the data is written from the kernel page cache straight into the
            // socket buffer, without passing through user space
            if (zeroCopy) {
                set.write(request, ctx);
                return null;
            }
            else {
                // refer to the code of line 440 in MessageStore
                // create two copies of byte array including the byteBuffer
                // and new bytes
                // this may not a good use case of Buffer
                final ByteBuffer byteBuffer =
                        ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
                set.read(byteBuffer);
                byteBuffer.flip();
                final byte[] bytes = new byte[byteBuffer.remaining()];
                byteBuffer.get(bytes);
                return new DataCommand(bytes, request.getOpaque());
            }
        }
        // No data found
        else {
            this.statsManager.statsGetMiss(topic, group, 1);
            this.statsManager.statsGetFailed(topic, group, 1);
            // If the requested offset exceeds the actual maximum, return the real maximum offset to the client
            final long maxOffset = store.getMaxOffset();
            final long requestOffset = request.getOffset();
            if (requestOffset > maxOffset
                    && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) {
                log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset
                    + ",topic=" + topic + ",group=" + group);
                this.statsManager.statsOffset(topic, group, 1);
                return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque());
            }
            else {
                return new BooleanCommand(HttpStatus.NotFound,
                    "Could not find message at position " + requestOffset, request.getOpaque());
            }
        }
    }
    catch (final ArrayIndexOutOfBoundsException e) {
        log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic="
            + topic);
        // Tell the client the nearest valid offset
        this.statsManager.statsGetMiss(topic, group, 1);
        this.statsManager.statsGetFailed(topic, group, 1);
        final long validOffset = store.getNearestOffset(request.getOffset());
        this.statsManager.statsOffset(topic, group, 1);
        return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque());
    }
    catch (final Throwable e) {
        log.error("Could not get message from position " + request.getOffset(), e);
        this.statsManager.statsGetFailed(topic, group, 1);
        return new BooleanCommand(HttpStatus.InternalServerError, e.getMessage(), request.getOpaque());
    }
}
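The zeroCopy branch above relies on sendfile-style transfer: the bytes move from the page cache straight to the socket, never being copied into a user-space byte[] as the else branch does. Below is a minimal sketch of the underlying JDK mechanism, FileChannel.transferTo, independent of MetaQ's MessageSet types.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class ZeroCopySketch {
    /** Transfers [position, position+count) of the file to the target channel without a user-space copy. */
    static long transfer(final String path, final long position, final long count,
            final WritableByteChannel target) throws IOException {
        final RandomAccessFile raf = new RandomAccessFile(path, "r");
        try {
            final FileChannel channel = raf.getChannel();
            long written = 0;
            while (written < count) {
                // transferTo may write fewer bytes than requested, so loop until done
                final long n = channel.transferTo(position + written, count - written, target);
                if (n <= 0) {
                    break;
                }
                written += n;
            }
            return written;
        }
        finally {
            raf.close();
        }
    }
}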
slice returns a view over the messages:
/**
 * Return the MessageSet located by offset and maxSize. Returns null when the offset exceeds
 * the maximum offset, and throws ArrayIndexOutOfBoundsException when the offset is smaller
 * than the minimum offset.
 *
 * @param offset
 * @param maxSize
 * @return
 * @throws IOException
 */
public MessageSet slice(final long offset, final int maxSize) throws IOException {
    // Binary search for the segment containing the offset
    final Segment segment = this.findSegment(this.segments.view(), offset);
    if (segment == null) {
        return null;
    }
    else {
        // Return the message view
        return segment.fileMessageSet.slice(offset - segment.start, offset - segment.start + maxSize);
    }
}
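findSegment locates, by binary search over the segment start offsets, the segment whose range covers the requested offset: null when the offset lies beyond the end of the log, an ArrayIndexOutOfBoundsException when it lies before the oldest retained segment, matching the javadoc above. A simplified, array-based sketch of that search follows; the real code works on the segment list view and its field names differ.

// Illustrative binary search: segments are sorted by 'start'; find the one containing 'offset'.
final class FindSegmentSketch {
    static final class Segment {
        final long start;
        final long size;
        Segment(final long start, final long size) { this.start = start; this.size = size; }
    }

    static Segment findSegment(final Segment[] segments, final long offset) {
        if (segments.length == 0) {
            return null;
        }
        if (offset < segments[0].start) {
            // Older than anything still kept on disk
            throw new ArrayIndexOutOfBoundsException("offset " + offset + " is before the first segment");
        }
        final Segment last = segments[segments.length - 1];
        if (offset >= last.start + last.size) {
            // Beyond the end of the log: nothing to read yet
            return null;
        }
        int low = 0, high = segments.length - 1;
        while (low <= high) {
            final int mid = (low + high) >>> 1;
            final Segment s = segments[mid];
            if (offset < s.start) {
                high = mid - 1;
            }
            else if (offset >= s.start + s.size) {
                low = mid + 1;
            }
            else {
                return s;
            }
        }
        return null;
    }
}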