`

metaq源码解读之FetchManager

 
阅读更多

 FetchManager:请求管理器接口。

        既然是管理器,就需要知道管理的对象是什么?FetchRequest——管理的是一次次的请求。

        既然是管理器,就需要给被管理者提供容所?FetchRequestQueue——请求delay queue。

        既然是管理器,就需要有管理实施者?FetchRequestRunner——从请求队列中提取请求,然后处理。

        既然是管理器,就需要知道管理了哪些事情?

               ① 启动请求处理器

               ② 停止请求处理器

               ③ 重置亦即初始化管理器

               ④ 添加请求

               ⑤ 获取请求总数

               ⑥ 标记处理器状态

        管理器如何管理的?

               ①处理器 FetchRequestRunner

while (!this.stopped) {//只要处理器没有被终止,则不停的从请求队列中提取请求并处理
    try {
          final FetchRequest request = SimpleFetchManager.this.requestQueue.take();
          this.processRequest(request);
    } catch (final InterruptedException e) {
          // take响应中断,忽略
     }
}

 

void processRequest(final FetchRequest request) {
     try {
           final MessageIterator iterator = SimpleFetchManager.this.consumer.fetch(request, -1, null);
           final MessageListener listener =
SimpleFetchManager.this.consumer.getMessageListener(request.getTopic());
           final ConsumerMessageFilter filter =
SimpleFetchManager.this.consumer.getMessageFilter(request.getTopic());
           this.notifyListener(request, iterator, listener, filter, SimpleFetchManager.this.consumer.getConsumerConfig().getGroup());
     }
}

                 ②请求队列 FetchRequestQueue

//队列中定义的线程用来等待队列的头元素。这种leader-follower设计模式的变种有助于使等待的时间最小化。当一个线程变为leader,它只需等待下一个delay的时间,而其他线程将无限期等待。
//leader线程在从take()或者poll()等等方法返回之前必须signal其他线程,除非这期间其他线程变成了leader线程。
//每当队列的头结点被更早到期时间的节点替代,leader失效被重新设置为null,其他的线程——不一定是当前的leader,将被signal。
//因此等待线程要时刻准备着获取leader或者丧失leader
private Thread leader = null;

public void offer(FetchRequest e) {
    final Lock lock = this.lock;
    lock.lock();
    try {
       //如果已经关联了队列且不是关联本队列的请求,不予添加
       if (e.getRefQueue() != null && e.getRefQueue() != this) {
           return;
       }
       // 请求关联本队列
       e.setRefQueue(this);
       //入队、排序
       this.queue.offer(e);
       Collections.sort(this.queue);
       // Leader is changed.
       if (this.queue.peek() == e) {
           this.leader = null;
           this.available.signal();
       }
   }finally {
       lock.unlock();
   }
}
public FetchRequest take() throws InterruptedException {
        final Lock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                FetchRequest first = this.queue.peek();
                if (first == null) {//队列没有请求,等待
                    this.available.await();
                }
                else {
                    //当队列中存在延迟已到元素,则当前线程进行处理。只有当所有元素都延迟未到,才需要考虑leader问题
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {//请求延迟时间已到,出队接受处理
                        return this.queue.poll();
                    }
                    else if (this.leader != null) {//当前线程为fllower,等待成为leader,其他线程已变为leader
                        this.available.await();
                    }
                    else {
                        //当前线程设置为leader
                        Thread thisThread = Thread.currentThread();
                        this.leader = thisThread;
                        try {
                            //leader线程等待剩余延迟时间
                            this.available.awaitNanos(delay);
                        }
                        finally {
                            //延迟过后如果当前线程还是leader,则leader失效。重新争夺leader
                            if (this.leader == thisThread) {
                                this.leader = null;
                            }
                        }
                    }
                }
            }
        }
        finally {
            //其他线程没有成为leader,并且队列不空,则唤醒一个等待者
            if (this.leader == null && this.queue.peek() != null) {
                this.available.signal();
            }
            lock.unlock();
        }
    }

 

分享到:
评论

相关推荐

    metamorphosis(metaq)

    《Metamorphosis (MetaQ) 服务端1.4.3版本详解及客户端使用》 Metamorphosis,简称MetaQ,是一款高效、稳定、可扩展的消息队列系统,由阿里巴巴开发并开源,主要用于解决分布式环境下的异步处理、解耦以及数据传输...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    《MetaQ服务器1.4.6.2版的深度解析》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于提供高可靠、高可用的消息传输服务。在本文中,我们将深入探讨MetaQ Server 1.4.6.2版本的核心特性、架构设计以及使用...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    Metaq在JDk 7下的异常及解决方案

    《Metaq在JDK 7下的异常及其解决策略》 Metaq是一款高性能的消息中间件,广泛应用于分布式系统中,提供高效、稳定的消息传递服务。然而,在JDK 7环境下,Metaq可能会遇到一些运行异常,其中最常见的就是与物理文件...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    《MetaQ服务器1.4.6.2版本详解》 MetaQ是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。在1.4.6.2这个版本中,它继续保持着与原版一致的核心特性,提供高效、稳定、可扩展的...

    Metaq详细手册.docx

    《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...

    MetaQ 分布式消息服务中间件.pdf

    MetaQ是一款分布式消息服务中间件,其核心功能基于发布-订阅模型。在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    metaQ的安装包

    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...

    阿里消息中间件MetaQ学习Demo.zip

    阿里消息中间件MetaQ学习Demo

    支付宝之所以牛逼的原因:来看内部架构剖析

    Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...

    支付宝钱包系统架构内部剖析(架构图)

    - **纯Java实现**:无论是通信层还是存储层,MetaQ均使用Java语言实现,这对于支付宝这样的大型企业而言非常重要,因为Java是业界广泛使用的编程语言之一,这意味着更容易找到熟练掌握该语言的开发人员。 - **事务...

    阿里rocketMQ

    阿里RocketMQ是一款开源的消息中间件,它在阿里巴巴集团内部广泛使用,并且被社区接纳成为Apache顶级项目。RocketMQ的设计目标是提供低延迟、高可靠、高可扩展的消息传递服务,适用于大规模分布式系统中的消息通讯。...

    Storm项目:流数据监控(下)

    该文档为storm模拟项目系列文档之一,是MetaQ与storm接口的说明文档,主要介绍了如何集成MetaQ到项目代码中。软件(阿里),其对应的许多技术文档还是比较容易看的,并且Github提供了许多的应用实例,所以使用MetaQ...

    Metamorphosis, 一种高可用高性能的分布式.zip

    Metamorphosis, 一种高可用高性能的分布式 #新闻MetaQ 1.4.6.2 发布。更新日志MetaQ 1.4.6.1 发布。更新日志MetaQ 1.4.5.1 发布。更新日志MetaQ 1.4.5发布。更新日志meta: 一个用于的ruby 客户端。 源代码

    RocketMQ群问题整理

    7. **MetaQ与RocketMQ关系**:MetaQ 在 3.0 版本之后更名为 RocketMQ。 8. **JMS支持**:RocketMQ 支持 JMS 客户端 API,用户可以通过官方提供的 rocketmq-jmsclient 进行开发。 9. **启动异常**:如果启动 Broker...

Global site tag (gtag.js) - Google Analytics