FetchManager:请求管理器接口。
既然是管理器,就需要知道管理的对象是什么?FetchRequest——管理的是一次次的请求。
既然是管理器,就需要给被管理者提供容所?FetchRequestQueue——请求delay queue。
既然是管理器,就需要有管理实施者?FetchRequestRunner——从请求队列中提取请求,然后处理。
既然是管理器,就需要知道管理了哪些事情?
① 启动请求处理器
② 停止请求处理器
③ 重置亦即初始化管理器
④ 添加请求
⑤ 获取请求总数
⑥ 标记处理器状态
管理器如何管理的?
①处理器 FetchRequestRunner
while (!this.stopped) {//只要处理器没有被终止,则不停的从请求队列中提取请求并处理 try { final FetchRequest request = SimpleFetchManager.this.requestQueue.take(); this.processRequest(request); } catch (final InterruptedException e) { // take响应中断,忽略 } }
void processRequest(final FetchRequest request) { try { final MessageIterator iterator = SimpleFetchManager.this.consumer.fetch(request, -1, null); final MessageListener listener = SimpleFetchManager.this.consumer.getMessageListener(request.getTopic()); final ConsumerMessageFilter filter = SimpleFetchManager.this.consumer.getMessageFilter(request.getTopic()); this.notifyListener(request, iterator, listener, filter, SimpleFetchManager.this.consumer.getConsumerConfig().getGroup()); } }
②请求队列 FetchRequestQueue
//队列中定义的线程用来等待队列的头元素。这种leader-follower设计模式的变种有助于使等待的时间最小化。当一个线程变为leader,它只需等待下一个delay的时间,而其他线程将无限期等待。 //leader线程在从take()或者poll()等等方法返回之前必须signal其他线程,除非这期间其他线程变成了leader线程。 //每当队列的头结点被更早到期时间的节点替代,leader失效被重新设置为null,其他的线程——不一定是当前的leader,将被signal。 //因此等待线程要时刻准备着获取leader或者丧失leader private Thread leader = null; public void offer(FetchRequest e) { final Lock lock = this.lock; lock.lock(); try { //如果已经关联了队列且不是关联本队列的请求,不予添加 if (e.getRefQueue() != null && e.getRefQueue() != this) { return; } // 请求关联本队列 e.setRefQueue(this); //入队、排序 this.queue.offer(e); Collections.sort(this.queue); // Leader is changed. if (this.queue.peek() == e) { this.leader = null; this.available.signal(); } }finally { lock.unlock(); } } public FetchRequest take() throws InterruptedException { final Lock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { FetchRequest first = this.queue.peek(); if (first == null) {//队列没有请求,等待 this.available.await(); } else { //当队列中存在延迟已到元素,则当前线程进行处理。只有当所有元素都延迟未到,才需要考虑leader问题 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) {//请求延迟时间已到,出队接受处理 return this.queue.poll(); } else if (this.leader != null) {//当前线程为fllower,等待成为leader,其他线程已变为leader this.available.await(); } else { //当前线程设置为leader Thread thisThread = Thread.currentThread(); this.leader = thisThread; try { //leader线程等待剩余延迟时间 this.available.awaitNanos(delay); } finally { //延迟过后如果当前线程还是leader,则leader失效。重新争夺leader if (this.leader == thisThread) { this.leader = null; } } } } } } finally { //其他线程没有成为leader,并且队列不空,则唤醒一个等待者 if (this.leader == null && this.queue.peek() != null) { this.available.signal(); } lock.unlock(); } }
相关推荐
《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...
Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...
在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...
《MetaQ服务器1.4.6.2版的深度解析》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于提供高可靠、高可用的消息传输服务。在本文中,我们将深入探讨MetaQ Server 1.4.6.2版本的核心特性、架构设计以及使用...
MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...
《Metaq在JDK 7下的异常及其解决策略》 Metaq是一款高性能的消息中间件,广泛应用于分布式系统中,提供高效、稳定的消息传递服务。然而,在JDK 7环境下,Metaq可能会遇到一些运行异常,其中最常见的就是与物理文件...
《MetaQ服务器1.4.6.2版本详解》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。在1.4.6.2这个版本中,它继续保持着与原版一致的核心特性,提供高效、稳定、可扩展的...
《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...
MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...
Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。
MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...
阿里消息中间件MetaQ学习Demo
Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...
- **纯Java实现**:无论是通信层还是存储层,MetaQ均使用Java语言实现,这对于支付宝这样的大型企业而言非常重要,因为Java是业界广泛使用的编程语言之一,这意味着更容易找到熟练掌握该语言的开发人员。 - **事务...
阿里RocketMQ是一款开源的消息中间件,它在阿里巴巴集团内部广泛使用,并且被社区接纳成为Apache顶级项目。RocketMQ的设计目标是提供低延迟、高可靠、高可扩展的消息传递服务,适用于大规模分布式系统中的消息通讯。...
该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...
Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码
7. **MetaQ与RocketMQ关系**:MetaQ 在 3.0 版本之后更名为 RocketMQ。 8. **JMS支持**:RocketMQ 支持 JMS 客户端 API,用户可以通过官方提供的 rocketmq-jmsclient 进行开发。 9. **启动异常**:如果启动 Broker...