  • 浏览: 1627944 次
  • 性别: Icon_minigender_1
  • 来自: 上海





Java代码 复制代码 收藏代码
  1. public class GetProcessor implements RequestProcessor<GetCommand> { 
  2.     public static final Logger log = LoggerFactory.getLogger(GetProcessor.class); 
  4.     private final ThreadPoolExecutor executor; 
  6.     private final CommandProcessor processor; 
  8.     public GetProcessor(final CommandProcessor processor, final ThreadPoolExecutor executor) { 
  9.         this.processor = processor; 
  10.         this.executor = executor; 
  11.     } 
  13.     @Override 
  14.     public ThreadPoolExecutor getExecutor() { 
  15.         return this.executor; 
  16.     } 
  18.     @Override 
  19.     public void handleRequest(final GetCommand request, final Connection conn) { 
  20.         // Processor并没有处理具体的业务逻辑,而是将业务逻辑交给CommandProcessor的processGetCommand()进行处理,Processor只是将处理结果简单的返回给客户端 
  21. final ResponseCommand response = this.processor.processGetCommand(request, SessionContextHolder.getOrCreateSessionContext(conn, null)); 
  22.         if (response != null) { 
  23.             RemotingUtils.response(conn, response); 
  24.         } 
  25.     } 
public class GetProcessor implements RequestProcessor<GetCommand> {	public static final Logger log = LoggerFactory.getLogger(GetProcessor.class);	private final ThreadPoolExecutor executor;	private final CommandProcessor processor;	public GetProcessor(final CommandProcessor processor, final ThreadPoolExecutor executor) {		this.processor = processor;		this.executor = executor;	}	@Override	public ThreadPoolExecutor getExecutor() {		return this.executor;	}	@Override	public void handleRequest(final GetCommand request, final Connection conn) {		// Processor并没有处理具体的业务逻辑,而是将业务逻辑交给CommandProcessor的processGetCommand()进行处理,Processor只是将处理结果简单的返回给客户端final ResponseCommand response = this.processor.processGetCommand(request, SessionContextHolder.getOrCreateSessionContext(conn, null));		if (response != null) {			RemotingUtils.response(conn, response);		}	}}



Java代码 复制代码 收藏代码
  1. public interface CommandProcessor extends Service { 
  2.     //处理Put命令,结果通过PutCallback的回调返回 
  3.     public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) throws Exception; 
  4.      //处理Get命令 
  5.     public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx); 
  7.     /**
  8.      * Under conditions that cannot use notify-remoting directly.
  9.      */ 
  10.      //处理Get命令,并根据条件zeroCopy是否使用zeroCopy 
  11.     public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx, final boolean zeroCopy); 
  12.     //处理查询最近可用offset位置请求 
  13.     public ResponseCommand processOffsetCommand(OffsetCommand request, final SessionContext ctx); 
  14.     //处理退出请求 
  15.     public void processQuitCommand(QuitCommand request, final SessionContext ctx); 
  17.     public ResponseCommand processVesionCommand(VersionCommand request, final SessionContext ctx); 
  18.     //处理统计请求 
  19.     public ResponseCommand processStatCommand(StatsCommand request, final SessionContext ctx); 
  20.     //下面主要定义与事务相关的方法,暂时先不介绍 
  21.     public void removeTransaction(final XATransactionId xid); 
  23.     public Transaction getTransaction(final SessionContext context, final TransactionId xid) throws MetamorphosisException, XAException; 
  25.     public void forgetTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
  27.     public void rollbackTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
  29.     public void commitTransaction(final SessionContext context, final TransactionId xid, final boolean onePhase) throws Exception; 
  31.     public int prepareTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
  33.     public void beginTransaction(final SessionContext context, final TransactionId xid, final int seconds) throws Exception; 
  35.     public TransactionId[] getPreparedTransactions(final SessionContext context, String uniqueQualifier) throws Exception; 
public interface CommandProcessor extends Service {    //处理Put命令,结果通过PutCallback的回调返回	public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) throws Exception;     //处理Get命令	public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx);	/**	 * Under conditions that cannot use notify-remoting directly.	 */     //处理Get命令,并根据条件zeroCopy是否使用zeroCopy	public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx, final boolean zeroCopy);    //处理查询最近可用offset位置请求	public ResponseCommand processOffsetCommand(OffsetCommand request, final SessionContext ctx);    //处理退出请求	public void processQuitCommand(QuitCommand request, final SessionContext ctx);   	public ResponseCommand processVesionCommand(VersionCommand request, final SessionContext ctx);    //处理统计请求	public ResponseCommand processStatCommand(StatsCommand request, final SessionContext ctx);    //下面主要定义与事务相关的方法,暂时先不介绍	public void removeTransaction(final XATransactionId xid);	public Transaction getTransaction(final SessionContext context, final TransactionId xid) throws MetamorphosisException, XAException;	public void forgetTransaction(final SessionContext context, final TransactionId xid) throws Exception;	public void rollbackTransaction(final SessionContext context, final TransactionId xid) throws Exception;	public void commitTransaction(final SessionContext context, final TransactionId xid, final boolean onePhase) throws Exception;	public int prepareTransaction(final SessionContext context, final TransactionId xid) throws Exception;	public void beginTransaction(final SessionContext context, final TransactionId xid, final int seconds) throws Exception;	public TransactionId[] getPreparedTransactions(final SessionContext context, String uniqueQualifier) throws Exception;}




Java代码 复制代码 收藏代码
  1. //Put请求的业务逻辑处理 
  2. @Override 
  3. public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) { 
  4.         final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition(); 
  5. //统计计算 
  6.         this.statsManager.statsPut(request.getTopic(), partitionString, 1); 
  7.         this.statsManager.statsMessageSize(request.getTopic(), request.getData().length); 
  8.         int partition = -1; 
  9.         try { 
  10. //如果对应存储的分区已经关闭,则拒绝该消息 
  11.             if (this.metaConfig.isClosedPartition(request.getTopic(), request.getPartition())) { 
  12.                 log.warn("Can not put message to partition " + request.getPartition() + " for topic=" + request.getTopic() + ",it was closed"); 
  13.                 if (cb != null) { 
  14.                     cb.putComplete(new BooleanCommand(HttpStatus.Forbidden, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:partition[" + partitionString + "] has been closed", request.getOpaque())); 
  15.                 } 
  16.                 return; 
  17.             } 
  19.             partition = this.getPartition(request); 
  20. //获取对应Topic分区的MessageStore实例 
  21.             final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition); 
  22.             // 如果是动态添加的topic,需要注册到zk 
  23. //就到目前为止,我着实没想明白下面这句代码的用途是什么?  
  24. //如果topic没有在该Broker的配置中配置,在MessageStoreManager中的isLegalTopic()方法中检查就通不过而抛出异常,那么下面这句代码怎么样都不会被执行,而Client要向Broker发送消息,一定要先发布topic,保证topic在zk发布;  
  25.             this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false); 
  26.             // 设置唯一id 
  27.             final long messageId = this.idWorker.nextId(); 
  28.             //存储消息,之前的文章介绍过Broker的存储使用回调的方式,易于异步的实现,代码简单不分析 
  29. store.append(messageId, request, new StoreAppendCallback(partition, partitionString, request, messageId, cb)); 
  30.         } catch (final Exception e) { 
  31. //发生异常,统计计算回滚 
  32.             this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1); 
  33.             log.error("Put message failed", e); 
  34.             if (cb != null) { 
  35. //返回结果 
  36.                 cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), partition) + "Detail:" + e.getMessage(), request.getOpaque())); 
  37.             } 
  38.         } 
  39.     } 
  41. @Override 
  42. // GET请求的业务逻辑处理 
  43. public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx) { 
  44. //默认为zeroCopy 
  45.         return this.processGetCommand(request, ctx, true); 
  46.     } 
  48.     @Override 
  49.     public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) { 
  50. //获取查询信息 
  51.         final String group = request.getGroup(); 
  52.         final String topic = request.getTopic(); 
  53. //统计计数(请求数统计) 
  54.         this.statsManager.statsGet(topic, group, 1); 
  56.         // 如果分区被关闭,禁止读数据 --wuhua 
  57.         if (this.metaConfig.isClosedPartition(topic, request.getPartition())) { 
  58.             log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition() + ",it closed,"); 
  59.             return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque()); 
  60.         } 
  61. //获取topic对应分区的MessageStore实例,如果实例不存在,则返回NotFound 
  62.         final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition()); 
  63.         if (store == null) { 
  64. //统计计数 
  65.             this.statsManager.statsGetMiss(topic, group, 1); 
  66.             return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); 
  67.         } 
  68. //如果请求的起始位置<0,判定该请求无效 
  69.         if (request.getMaxSize() <= 0) { 
  70.             return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(), request.getOpaque()); 
  71.         } 
  72.         try { 
  73. //读取由request.getOffset()开始的消息集合 
  74.             final MessageSet set = store.slice(request.getOffset(), Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); 
  75. //如果当前消息集不为空 
  76.             if (set != null) { 
  77. //判断是否zeroCopy,如果是zeroCopy,则直接写;如果不是,则将消息集包装成DataCommand,这也就是前面为什么说DataCommand要实现encode()方法的缘故 
  78.                 if (zeroCopy) { 
  79.                     set.write(request, ctx); 
  80.                     return null; 
  81.                 } else { 
  82.                     // refer to the code of line 440 in MessageStore 
  83.                     // create two copies of byte array including the byteBuffer 
  84.                     // and new bytes 
  85.                     // this may not a good use case of Buffer 
  86.                     final ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); 
  87.                     set.read(byteBuffer); 
  88.                     byteBuffer.flip(); 
  89.                     final byte[] bytes = new byte[byteBuffer.remaining()]; 
  90.                     byteBuffer.get(bytes); 
  91.                     return new DataCommand(bytes, request.getOpaque()); 
  92.                 } 
  93.             } else { 
  94. //如果为空消息集,则认为请求无效 
  95. //统计计数 
  96.                 this.statsManager.statsGetMiss(topic, group, 1); 
  97.                 this.statsManager.statsGetFailed(topic, group, 1); 
  99.                 // 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量. 
  100.                 final long maxOffset = store.getMaxOffset(); 
  101.                 final long requestOffset = request.getOffset(); 
  102.                 if (requestOffset > maxOffset && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) { 
  103.                     log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset + ",topic=" + topic + ",group=" + group); 
  104.                     this.statsManager.statsOffset(topic, group, 1); 
  105.                     return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque()); 
  106.                 } else { 
  107.                     return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset, request.getOpaque()); 
  108.                 } 
  109.             } 
  110.         } catch (final ArrayIndexOutOfBoundsException e) { 
  111.             log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic=" + topic); 
  112.             // 告知最近可用的offset 
  113.             this.statsManager.statsGetMiss(topic, group, 1); 
  114.             this.statsManager.statsGetFailed(topic, group, 1); 
  115.             final long validOffset = store.getNearestOffset(request.getOffset()); 
  116.             this.statsManager.statsOffset(topic, group, 1); 
  117.             return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque()); 
  118.         } catch (final Throwable e) { 
  119.             log.error("Could not get message from position " + request.getOffset(), e); 
  120.             this.statsManager.statsGetFailed(topic, group, 1); 
  121.             return new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:" + e.getMessage(), request.getOpaque()); 
  122.         } 
  123.     } 
  125. //查询最近可用offset请求的业务逻辑处理 
  126. @Override 
  127.     public ResponseCommand processOffsetCommand(final OffsetCommand request, final SessionContext ctx) { 
  128. //统计计数 
  129.         this.statsManager.statsOffset(request.getTopic(), request.getGroup(), 1); 
  130. //获取topic对应分区的MessageStore实例 
  131.         final MessageStore store = this.storeManager.getMessageStore(request.getTopic(), request.getPartition()); 
  132. //如果为空,则返回未找到 
  133.         if (store == null) { 
  134.             return new BooleanCommand(HttpStatus.NotFound, "The topic `" + request.getTopic() + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); 
  135.         } 
  136.         //获取topic对应分区最近可用的offset 
  137. final long offset = store.getNearestOffset(request.getOffset()); 
  138.         return new BooleanCommand(HttpStatus.Success, String.valueOf(offset), request.getOpaque()); 
  139.     } 
  141. //退出请求业务逻辑处理 
  142.     @Override 
  143.     public void processQuitCommand(final QuitCommand request, final SessionContext ctx) { 
  144.         try { 
  145.             if (ctx.getConnection() != null) { 
  146.                 //关闭与客户端的连接 
  147.                 ctx.getConnection().close(false); 
  148.             } 
  149.         } catch (final NotifyRemotingException e) { 
  150.             // ignore 
  151.         } 
  152.     } 
  154. //版本查询请求业务逻辑处理 
  155. @Override 
  156.     public ResponseCommand processVesionCommand(final VersionCommand request, final SessionContext ctx) { 
  157. //返回当前Broker版本 
  158.         return new BooleanCommand(HttpStatus.Success, BuildProperties.VERSION, request.getOpaque()); 
  159.     } 
  161. //统计请求查询业务逻辑处理 
  162.     @Override 
  163.     public ResponseCommand processStatCommand(final StatsCommand request, final SessionContext ctx) { 
  164. //判断类型,如果类型以config 开头,则传输整个配置文件 
  165.         final String item = request.getItem(); 
  166.         if ("config".equals(item)) { 
  167.             return this.processStatsConfig(request, ctx); 
  168.         } else { 
  169. //如果是获取统计结果,则从统计模块获取响应结果并返回给客户端 
  170.             final String statsInfo = this.statsManager.getStatsInfo(item); 
  171.             return new BooleanCommand(HttpStatus.Success, statsInfo, request.getOpaque()); 
  172.         } 
  173.     } 
  175.     //获取配置文件内容,使用zeroCopy将文件内容发送到客户端,构造的响应用BooleanCommand 
  176. @SuppressWarnings("resource") 
  177.     private ResponseCommand processStatsConfig(final StatsCommand request, final SessionContext ctx) { 
  178.         try { 
  179.             final FileChannel fc = new FileInputStream(this.metaConfig.getConfigFilePath()).getChannel(); 
  180.             // result code length opaque\r\n 
  181.             IoBuffer buf = IoBuffer.allocate(11 + 3 + ByteUtils.stringSize(fc.size()) + ByteUtils.stringSize(request.getOpaque())); 
  182.             ByteUtils.setArguments(buf, MetaEncodeCommand.RESULT_CMD, HttpStatus.Success, fc.size(), request.getOpaque()); 
  183.             buf.flip(); 
  184.             ctx.getConnection().transferFrom(buf, null, fc, 0, fc.size(), request.getOpaque(), 
  185.                     new SingleRequestCallBackListener() { 
  186.                         @Override 
  187.                         public void onResponse(ResponseCommand responseCommand, Connection conn) { 
  188.                             this.closeChannel(); 
  189.                         } 
  191.                         @Override 
  192.                         public void onException(Exception e) { 
  193.                             this.closeChannel(); 
  194.                         } 
  196.                         private void closeChannel() { 
  197.                             try { 
  198.                                 fc.close(); 
  199.                             } catch (IOException e) { 
  200.                                 log.error("IOException while stats config", e); 
  201.                             } 
  202.                         } 
  204.                         @Override 
  205.                         public ThreadPoolExecutor getExecutor() { 
  206.                             return null; 
  207.                         } 
  208.                     }, 5000, TimeUnit.MILLISECONDS); 
  209.         } catch (FileNotFoundException e) { 
  210.             log.error("Config file not found:" + this.metaConfig.getConfigFilePath(), e); 
  211.             return new BooleanCommand(HttpStatus.InternalServerError, "Config file not found:" + this.metaConfig.getConfigFilePath(), request.getOpaque()); 
  212.         } catch (IOException e) { 
  213.             log.error("IOException while stats config", e); 
  214.             return new BooleanCommand(HttpStatus.InternalServerError, "Read config file error:" + e.getMessage(), request.getOpaque()); 
  215.         } catch (NotifyRemotingException e) { 
  216.             log.error("NotifyRemotingException while stats config", e); 
  217.         } 
  218.         return null; 
  219.     } 
//Put请求的业务逻辑处理@Overridepublic void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) {		final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition();//统计计算		this.statsManager.statsPut(request.getTopic(), partitionString, 1);		this.statsManager.statsMessageSize(request.getTopic(), request.getData().length);		int partition = -1;		try {//如果对应存储的分区已经关闭,则拒绝该消息			if (this.metaConfig.isClosedPartition(request.getTopic(), request.getPartition())) {				log.warn("Can not put message to partition " + request.getPartition() + " for topic=" + request.getTopic() + ",it was closed");				if (cb != null) {					cb.putComplete(new BooleanCommand(HttpStatus.Forbidden, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:partition[" + partitionString + "] has been closed", request.getOpaque()));				}				return;			}			partition = this.getPartition(request);//获取对应Topic分区的MessageStore实例			final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition);			// 如果是动态添加的topic,需要注册到zk//就到目前为止,我着实没想明白下面这句代码的用途是什么? //如果topic没有在该Broker的配置中配置,在MessageStoreManager中的isLegalTopic()方法中检查就通不过而抛出异常,那么下面这句代码怎么样都不会被执行,而Client要向Broker发送消息,一定要先发布topic,保证topic在zk发布; 			this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false);			// 设置唯一id			final long messageId = this.idWorker.nextId();			//存储消息,之前的文章介绍过Broker的存储使用回调的方式,易于异步的实现,代码简单不分析store.append(messageId, request, new StoreAppendCallback(partition, partitionString, request, messageId, cb));		} catch (final Exception e) {//发生异常,统计计算回滚			this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1);			log.error("Put message failed", e);			if (cb != null) {//返回结果				cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), partition) + "Detail:" + e.getMessage(), request.getOpaque()));			}		}	}@Override// GET请求的业务逻辑处理public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx) {//默认为zeroCopy		return this.processGetCommand(request, ctx, true);	}	@Override	public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) {//获取查询信息		final String group = request.getGroup();		final String topic = request.getTopic();//统计计数(请求数统计)		this.statsManager.statsGet(topic, group, 1);		// 如果分区被关闭,禁止读数据 --wuhua		if (this.metaConfig.isClosedPartition(topic, request.getPartition())) {			log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition() + ",it closed,");			return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque());		}//获取topic对应分区的MessageStore实例,如果实例不存在,则返回NotFound		final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition());		if (store == null) {//统计计数			this.statsManager.statsGetMiss(topic, group, 1);			return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque());		}//如果请求的起始位置<0,判定该请求无效		if (request.getMaxSize() <= 0) {			return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(), request.getOpaque());		}		try {//读取由request.getOffset()开始的消息集合			final MessageSet set = store.slice(request.getOffset(), Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));//如果当前消息集不为空			if (set != null) {//判断是否zeroCopy,如果是zeroCopy,则直接写;如果不是,则将消息集包装成DataCommand,这也就是前面为什么说DataCommand要实现encode()方法的缘故				if (zeroCopy) {					set.write(request, ctx);					return null;				} else {					// refer to the code of line 440 in MessageStore					// create two copies of byte array including the byteBuffer					// and new bytes					// this may not a good use case of Buffer					final ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));					set.read(byteBuffer);					byteBuffer.flip();					final byte[] bytes = new byte[byteBuffer.remaining()];					byteBuffer.get(bytes);					return new DataCommand(bytes, request.getOpaque());				}			} else {//如果为空消息集,则认为请求无效//统计计数				this.statsManager.statsGetMiss(topic, group, 1);				this.statsManager.statsGetFailed(topic, group, 1);				// 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量.				final long maxOffset = store.getMaxOffset();				final long requestOffset = request.getOffset();				if (requestOffset > maxOffset && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) {					log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset + ",topic=" + topic + ",group=" + group);					this.statsManager.statsOffset(topic, group, 1);					return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque());				} else {					return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset, request.getOpaque());				}			}		} catch (final ArrayIndexOutOfBoundsException e) {			log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic=" + topic);			// 告知最近可用的offset			this.statsManager.statsGetMiss(topic, group, 1);			this.statsManager.statsGetFailed(topic, group, 1);			final long validOffset = store.getNearestOffset(request.getOffset());			this.statsManager.statsOffset(topic, group, 1);			return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque());		} catch (final Throwable e) {			log.error("Could not get message from position " + request.getOffset(), e);			this.statsManager.statsGetFailed(topic, group, 1);			return new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:" + e.getMessage(), request.getOpaque());		}	}//查询最近可用offset请求的业务逻辑处理@Override	public ResponseCommand processOffsetCommand(final OffsetCommand request, final SessionContext ctx) {//统计计数		this.statsManager.statsOffset(request.getTopic(), request.getGroup(), 1);//获取topic对应分区的MessageStore实例		final MessageStore store = this.storeManager.getMessageStore(request.getTopic(), request.getPartition());//如果为空,则返回未找到		if (store == null) {			return new BooleanCommand(HttpStatus.NotFound, "The topic `" + request.getTopic() + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque());		}		//获取topic对应分区最近可用的offsetfinal long offset = store.getNearestOffset(request.getOffset());		return new BooleanCommand(HttpStatus.Success, String.valueOf(offset), request.getOpaque());	}//退出请求业务逻辑处理	@Override	public void processQuitCommand(final QuitCommand request, final SessionContext ctx) {		try {			if (ctx.getConnection() != null) {				//关闭与客户端的连接				ctx.getConnection().close(false);			}		} catch (final NotifyRemotingException e) {			// ignore		}	}//版本查询请求业务逻辑处理@Override	public ResponseCommand processVesionCommand(final VersionCommand request, final SessionContext ctx) {//返回当前Broker版本		return new BooleanCommand(HttpStatus.Success, BuildProperties.VERSION, request.getOpaque());	}//统计请求查询业务逻辑处理	@Override	public ResponseCommand processStatCommand(final StatsCommand request, final SessionContext ctx) {//判断类型,如果类型以config 开头,则传输整个配置文件		final String item = request.getItem();		if ("config".equals(item)) {			return this.processStatsConfig(request, ctx);		} else {//如果是获取统计结果,则从统计模块获取响应结果并返回给客户端			final String statsInfo = this.statsManager.getStatsInfo(item);			return new BooleanCommand(HttpStatus.Success, statsInfo, request.getOpaque());		}	}	//获取配置文件内容,使用zeroCopy将文件内容发送到客户端,构造的响应用BooleanCommand@SuppressWarnings("resource")	private ResponseCommand processStatsConfig(final StatsCommand request, final SessionContext ctx) {		try {			final FileChannel fc = new FileInputStream(this.metaConfig.getConfigFilePath()).getChannel();			// result code length opaque\r\n			IoBuffer buf = IoBuffer.allocate(11 + 3 + ByteUtils.stringSize(fc.size()) + ByteUtils.stringSize(request.getOpaque()));			ByteUtils.setArguments(buf, MetaEncodeCommand.RESULT_CMD, HttpStatus.Success, fc.size(), request.getOpaque());			buf.flip();			ctx.getConnection().transferFrom(buf, null, fc, 0, fc.size(), request.getOpaque(),					new SingleRequestCallBackListener() {						@Override						public void onResponse(ResponseCommand responseCommand, Connection conn) {							this.closeChannel();						}						@Override						public void onException(Exception e) {							this.closeChannel();						}						private void closeChannel() {							try {								fc.close();							} catch (IOException e) {								log.error("IOException while stats config", e);							}						}						@Override						public ThreadPoolExecutor getExecutor() {							return null;						}					}, 5000, TimeUnit.MILLISECONDS);		} catch (FileNotFoundException e) {			log.error("Config file not found:" + this.metaConfig.getConfigFilePath(), e);			return new BooleanCommand(HttpStatus.InternalServerError, "Config file not found:" + this.metaConfig.getConfigFilePath(), request.getOpaque());		} catch (IOException e) {			log.error("IOException while stats config", e);			return new BooleanCommand(HttpStatus.InternalServerError, "Read config file error:" + e.getMessage(), request.getOpaque());		} catch (NotifyRemotingException e) {			log.error("NotifyRemotingException while stats config", e);		}		return null;	}





    《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...


    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...




    1. 日志收集:MetaQ可用于收集分布在各服务器上的日志,统一管理和分析,提高运维效率。 2. 数据同步:在分布式数据库或缓存系统中,MetaQ可以作为数据变更的传播通道,保证数据的一致性。 3. 异步处理:对于耗时...


    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    本文将深入解析这一异常的具体情况,分析其原因,并提出相应的解决方案。 异常现象主要表现为:在尝试清理内存映射文件时,由于Java反射机制调用了`java.nio.DirectByteBuffer`类中的`viewedBuffer()`方法,导致`...

    metaq-server- 和原版一样就是换了个名字

    最后,MetaQ还提供了丰富的监控和管理工具,包括日志分析、性能指标监控和Web管理界面,方便运维人员进行故障排查和系统调优。 综上所述,MetaQ服务器1.4.6.2版本在保持原有功能的基础上,可能针对性能、稳定性和...


    Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息持久化**:Metaq保证...


    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...


    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...



    MetaQ 分布式消息服务中间件.pdf




    万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf

    ### 万亿级数据洪峰下的消息引擎——Apache RocketMQ #### 阿里消息中间件的演变历史 自2007年起,阿里巴巴集团在消息中间件领域不断探索与实践,经历了从Notify到MetaQ再到Apache RocketMQ的发展历程。以下是这一...


    - **事务支持**:MetaQ支持两种类型的事务——本地事务和XA分布式事务。这两种事务类型能够满足支付宝钱包系统在处理复杂金融交易时对数据一致性的需求。 - **高可用复制**:MetaQ提供了异步复制和同步复制两种模式...






    在开源的推动下,许多技术都得到了长足的发展,如LVS(Linux Virtual Server)、Tengine、MetaQ、dubbo、cobar、Fastjson等。这些技术的成功案例表明,开源不仅是技术的共享,也是知识和创新的共享。 蒋涛的讲话...

Global site tag (gtag.js) - Google Analytics