上一篇以及上上篇基本介绍了MetaQ如何使用Gecko框架在网络上传输数据,今天将继续进一步介绍在Broker,各种命令的处理逻辑(暂时将不涉及到事务处理)。
依旧是在MetaMorphosisBroker的registerProcessors()方法中,我们可以注意到一点,每个Processor的实例在构造的时候都注入了一个brokerProcessor的变量,该变量的类型为CommandProcessor。其实,各个Processor的业务逻辑又委托给了CommandProcessor进行处理,比如我们看看其中的GetProcessor的源码:
public class GetProcessor implements RequestProcessor<GetCommand> { public static final Logger log = LoggerFactory.getLogger(GetProcessor.class); private final ThreadPoolExecutor executor; private final CommandProcessor processor; public GetProcessor(final CommandProcessor processor, final ThreadPoolExecutor executor) { this.processor = processor; this.executor = executor; } @Override public ThreadPoolExecutor getExecutor() { return this.executor; } @Override public void handleRequest(final GetCommand request, final Connection conn) { // Processor并没有处理具体的业务逻辑,而是将业务逻辑交给CommandProcessor的processGetCommand()进行处理,Processor只是将处理结果简单的返回给客户端 final ResponseCommand response = this.processor.processGetCommand(request, SessionContextHolder.getOrCreateSessionContext(conn, null)); if (response != null) { RemotingUtils.response(conn, response); } } }
CommandProcessor业务逻辑的处理模块采用责任链的处理方式,目前来说只有两个类型的业务逻辑处理单元:带有事务处理(TransactionalCommandProcessor)的和不带有事务处理(BrokerCommandProcessor)的。老习惯,先上类图:
CommandProcessor接口定义如下:
public interface CommandProcessor extends Service { //处理Put命令,结果通过PutCallback的回调返回 public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) throws Exception; //处理Get命令 public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx); /** * Under conditions that cannot use notify-remoting directly. */ //处理Get命令,并根据条件zeroCopy是否使用zeroCopy public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx, final boolean zeroCopy); //处理查询最近可用offset位置请求 public ResponseCommand processOffsetCommand(OffsetCommand request, final SessionContext ctx); //处理退出请求 public void processQuitCommand(QuitCommand request, final SessionContext ctx); public ResponseCommand processVesionCommand(VersionCommand request, final SessionContext ctx); //处理统计请求 public ResponseCommand processStatCommand(StatsCommand request, final SessionContext ctx); //下面主要定义与事务相关的方法,暂时先不介绍 public void removeTransaction(final XATransactionId xid); public Transaction getTransaction(final SessionContext context, final TransactionId xid) throws MetamorphosisException, XAException; public void forgetTransaction(final SessionContext context, final TransactionId xid) throws Exception; public void rollbackTransaction(final SessionContext context, final TransactionId xid) throws Exception; public void commitTransaction(final SessionContext context, final TransactionId xid, final boolean onePhase) throws Exception; public int prepareTransaction(final SessionContext context, final TransactionId xid) throws Exception; public void beginTransaction(final SessionContext context, final TransactionId xid, final int seconds) throws Exception; public TransactionId[] getPreparedTransactions(final SessionContext context, String uniqueQualifier) throws Exception; }
细心的读者会发现,每个定义的方法的参数都有一个参数SessionContext,SessionContext携带了连接的信息,由Broker创建,具体代码见SessionContextHolder的getOrCreateSessionContext()方法,getOrCreateSessionContext()方法在Processor委托给CommandProcessor处理业务逻辑时被调用。
BrokerCommandProcessor和TransactionalCommandProcessor其实就是各模块的粘合剂,将各模块的功能统一协调形成整体对外提供功能。BrokerCommandProcessor的实现并不难理解,下面让我们来具体分析一下BrokerCommandProcessor这个类:
//Put请求的业务逻辑处理 @Override public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) { final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition(); //统计计算 this.statsManager.statsPut(request.getTopic(), partitionString, 1); this.statsManager.statsMessageSize(request.getTopic(), request.getData().length); int partition = -1; try { //如果对应存储的分区已经关闭,则拒绝该消息 if (this.metaConfig.isClosedPartition(request.getTopic(), request.getPartition())) { log.warn("Can not put message to partition " + request.getPartition() + " for topic=" + request.getTopic() + ",it was closed"); if (cb != null) { cb.putComplete(new BooleanCommand(HttpStatus.Forbidden, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:partition[" + partitionString + "] has been closed", request.getOpaque())); } return; } partition = this.getPartition(request); //获取对应Topic分区的MessageStore实例 final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition); // 如果是动态添加的topic,需要注册到zk //就到目前为止,我着实没想明白下面这句代码的用途是什么? //如果topic没有在该Broker的配置中配置,在MessageStoreManager中的isLegalTopic()方法中检查就通不过而抛出异常,那么下面这句代码怎么样都不会被执行,而Client要向Broker发送消息,一定要先发布topic,保证topic在zk发布; this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false); // 设置唯一id final long messageId = this.idWorker.nextId(); //存储消息,之前的文章介绍过Broker的存储使用回调的方式,易于异步的实现,代码简单不分析 store.append(messageId, request, new StoreAppendCallback(partition, partitionString, request, messageId, cb)); } catch (final Exception e) { //发生异常,统计计算回滚 this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1); log.error("Put message failed", e); if (cb != null) { //返回结果 cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), partition) + "Detail:" + e.getMessage(), request.getOpaque())); } } } @Override // GET请求的业务逻辑处理 public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx) { //默认为zeroCopy return this.processGetCommand(request, ctx, true); } @Override public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) { //获取查询信息 final String group = request.getGroup(); final String topic = request.getTopic(); //统计计数(请求数统计) this.statsManager.statsGet(topic, group, 1); // 如果分区被关闭,禁止读数据 --wuhua if (this.metaConfig.isClosedPartition(topic, request.getPartition())) { log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition() + ",it closed,"); return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque()); } //获取topic对应分区的MessageStore实例,如果实例不存在,则返回NotFound final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition()); if (store == null) { //统计计数 this.statsManager.statsGetMiss(topic, group, 1); return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); } //如果请求的起始位置<0,判定该请求无效 if (request.getMaxSize() <= 0) { return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(), request.getOpaque()); } try { //读取由request.getOffset()开始的消息集合 final MessageSet set = store.slice(request.getOffset(), Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); //如果当前消息集不为空 if (set != null) { //判断是否zeroCopy,如果是zeroCopy,则直接写;如果不是,则将消息集包装成DataCommand,这也就是前面为什么说DataCommand要实现encode()方法的缘故 if (zeroCopy) { set.write(request, ctx); return null; } else { // refer to the code of line 440 in MessageStore // create two copies of byte array including the byteBuffer // and new bytes // this may not a good use case of Buffer final ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); set.read(byteBuffer); byteBuffer.flip(); final byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); return new DataCommand(bytes, request.getOpaque()); } } else { //如果为空消息集,则认为请求无效 //统计计数 this.statsManager.statsGetMiss(topic, group, 1); this.statsManager.statsGetFailed(topic, group, 1); // 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量. final long maxOffset = store.getMaxOffset(); final long requestOffset = request.getOffset(); if (requestOffset > maxOffset && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) { log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset + ",topic=" + topic + ",group=" + group); this.statsManager.statsOffset(topic, group, 1); return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque()); } else { return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset, request.getOpaque()); } } } catch (final ArrayIndexOutOfBoundsException e) { log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic=" + topic); // 告知最近可用的offset this.statsManager.statsGetMiss(topic, group, 1); this.statsManager.statsGetFailed(topic, group, 1); final long validOffset = store.getNearestOffset(request.getOffset()); this.statsManager.statsOffset(topic, group, 1); return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque()); } catch (final Throwable e) { log.error("Could not get message from position " + request.getOffset(), e); this.statsManager.statsGetFailed(topic, group, 1); return new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:" + e.getMessage(), request.getOpaque()); } } //查询最近可用offset请求的业务逻辑处理 @Override public ResponseCommand processOffsetCommand(final OffsetCommand request, final SessionContext ctx) { //统计计数 this.statsManager.statsOffset(request.getTopic(), request.getGroup(), 1); //获取topic对应分区的MessageStore实例 final MessageStore store = this.storeManager.getMessageStore(request.getTopic(), request.getPartition()); //如果为空,则返回未找到 if (store == null) { return new BooleanCommand(HttpStatus.NotFound, "The topic `" + request.getTopic() + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); } //获取topic对应分区最近可用的offset final long offset = store.getNearestOffset(request.getOffset()); return new BooleanCommand(HttpStatus.Success, String.valueOf(offset), request.getOpaque()); } //退出请求业务逻辑处理 @Override public void processQuitCommand(final QuitCommand request, final SessionContext ctx) { try { if (ctx.getConnection() != null) { //关闭与客户端的连接 ctx.getConnection().close(false); } } catch (final NotifyRemotingException e) { // ignore } } //版本查询请求业务逻辑处理 @Override public ResponseCommand processVesionCommand(final VersionCommand request, final SessionContext ctx) { //返回当前Broker版本 return new BooleanCommand(HttpStatus.Success, BuildProperties.VERSION, request.getOpaque()); } //统计请求查询业务逻辑处理 @Override public ResponseCommand processStatCommand(final StatsCommand request, final SessionContext ctx) { //判断类型,如果类型以config 开头,则传输整个配置文件 final String item = request.getItem(); if ("config".equals(item)) { return this.processStatsConfig(request, ctx); } else { //如果是获取统计结果,则从统计模块获取响应结果并返回给客户端 final String statsInfo = this.statsManager.getStatsInfo(item); return new BooleanCommand(HttpStatus.Success, statsInfo, request.getOpaque()); } } //获取配置文件内容,使用zeroCopy将文件内容发送到客户端,构造的响应用BooleanCommand @SuppressWarnings("resource") private ResponseCommand processStatsConfig(final StatsCommand request, final SessionContext ctx) { try { final FileChannel fc = new FileInputStream(this.metaConfig.getConfigFilePath()).getChannel(); // result code length opaque\r\n IoBuffer buf = IoBuffer.allocate(11 + 3 + ByteUtils.stringSize(fc.size()) + ByteUtils.stringSize(request.getOpaque())); ByteUtils.setArguments(buf, MetaEncodeCommand.RESULT_CMD, HttpStatus.Success, fc.size(), request.getOpaque()); buf.flip(); ctx.getConnection().transferFrom(buf, null, fc, 0, fc.size(), request.getOpaque(), new SingleRequestCallBackListener() { @Override public void onResponse(ResponseCommand responseCommand, Connection conn) { this.closeChannel(); } @Override public void onException(Exception e) { this.closeChannel(); } private void closeChannel() { try { fc.close(); } catch (IOException e) { log.error("IOException while stats config", e); } } @Override public ThreadPoolExecutor getExecutor() { return null; } }, 5000, TimeUnit.MILLISECONDS); } catch (FileNotFoundException e) { log.error("Config file not found:" + this.metaConfig.getConfigFilePath(), e); return new BooleanCommand(HttpStatus.InternalServerError, "Config file not found:" + this.metaConfig.getConfigFilePath(), request.getOpaque()); } catch (IOException e) { log.error("IOException while stats config", e); return new BooleanCommand(HttpStatus.InternalServerError, "Read config file error:" + e.getMessage(), request.getOpaque()); } catch (NotifyRemotingException e) { log.error("NotifyRemotingException while stats config", e); } return null; }
如果不使用内容的事务,Broker已经完成了从网络接收数据—>处理请求(存储消息/查询结果等)—>返回结果的流程,Broker最基础的流程已经基本分析完毕。
相关推荐
《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...
1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...
MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...
本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...
最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...
Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...
RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...
MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
阿里消息中间件MetaQ学习Demo
### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...
- **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...
综上所述,实时数仓2.0是一种先进的数据处理框架,它通过优化数据模型、提升处理速度、确保数据质量,以及利用高级状态管理技术,来满足企业对实时数据分析的高要求。这一解决方案为企业提供了更敏捷的业务洞察,...
阿里巴巴企业诚信体系是基于大数据和先进技术构建的一套全面的安全架构,旨在从多个维度评估和管理企业信用风险。这个体系不仅涵盖了安全威胁情报、安全建设、应急响应和法律法规等多个关键领域,还利用自动化手段...
在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...