`
ftj20003
  • 浏览: 132046 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

Mina的线程池实现分析(1)

    博客分类:
  • Java
阅读更多
    线程池是并发应用中,为了减少每个任务调用的开销增强性能而经常使用的技术。在mina中大量的使用这一技术,除了Executors的工厂方法构建线程池之外,它还继承自ThreadPoolExecutor提供自己的线程池的实现OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor。这两者主要应用于ExecutorFilter过滤器。这个过滤器是mina内部实现的众多过滤器之一,其主要作用是把I/O events提交给线程池同时处理同一个IOSession的事件,其默认的线程池的构造是前者。这两个线程池的区别就在于同时处理I/O事件时,前者能够保证同一个Session的事件的处理顺序,而后者则不能保证,所以有可能出现sessionClosed事件在messageReceived事件之前被处理。下面试着从代码解密其怎么保证事件处理顺序的。

    分析Mina的源码最大的感受就是其多线程应用的精细,每次从源码解决自己的疑问都有一种难以言喻的喜悦感。如果一般的框架的源码主要看设计结构的话,Mina的源码的精妙更在于具体的实现,虽然乍一看是复杂又混乱,呵呵。先看看部分源码吧:
public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
    ...
    private static final IoSession EXIT_SIGNAL = new DummySession();

    private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
    
    private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();

    private final Set<Worker> workers = new HashSet<Worker>();

    private final AtomicInteger idleWorkers = new AtomicInteger();

    private long completedTaskCount;
    private volatile boolean shutdown;
    ...
    private SessionTasksQueue getSessionTasksQueue(IoSession session) {
        SessionTasksQueue queue = (SessionTasksQueue) session.getAttribute(TASKS_QUEUE);

        if (queue == null) {
            queue = new SessionTasksQueue();
            SessionTasksQueue oldQueue = 
                (SessionTasksQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue);
            
            if (oldQueue != null) {
                queue = oldQueue;
            }
        }
        
        return queue;
    }

    private void addWorker() {
        synchronized (workers) {
            if (workers.size() >= super.getMaximumPoolSize()) {
                return;
            }

            // Create a new worker, and add it to the thread pool
            Worker worker = new Worker();
            Thread thread = getThreadFactory().newThread(worker);
            
            // As we have added a new thread, it's considered as idle.
            idleWorkers.incrementAndGet();
            
            // Now, we can start it.
            thread.start();
            workers.add(worker);

            if (workers.size() > largestPoolSize) {
                largestPoolSize = workers.size();
            }
        }
    }

    private class SessionTasksQueue {
        private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<Runnable>();
        
        private boolean processingCompleted = true;
    }
    ...
}

    首先看到的是OrderedThreadPoolExecutor的部分实现,其除了含有ThreadPoolExecutor的一些静态常量之外,这里列出了自身特有的几个变量。EXIT_SIGNAL是代表空的IOSession,如果此线程池得到的全部都是EXIT_SIGNAL,那么处理也就结束了。waitingSessions是存储可用I/O会话的队列,正是这个队列在后面多线程处理会话的I/O事件起到了有序的作用,这个数据机构在UnorderedThreadPoolExecutor内是没有的,后面可以看到其对待事件的粒度要比前者大。workers是一个Worker的集合,每一个Worker都实现了Runnable接口,线程池管理这些Worker线程执行并发的事件处理。很形象的命名,这些Worker说白了就是线程池内的打工仔。idleWorkers则负责及时的统计空闲的工人以便进一步的剥削,由于仅仅是数字在多线程下的增减所以使用atomic包的实现无疑是上佳的选择。最后就是SessionTasksQueue这个内部类了,这个内部的数据结构其实就是一个队列,负责每一个IOSession所对应的I/O事件的处理。

    这样通过waitingSessions区分IOSession,通过SessionTasksQueue区分每个IOSession的I/O事件这个两层结构就可以为有序处理提供了数据结构的保证。对比UnorderedThreadPoolExecutor,其仅仅提供I/O事件的存取队列LinkedBlockingQueue的实例,而不对IOSession进行区分。从上述的源码可以看到getSessionTasksQueue()方法会试图取特定IOSession相关联的事件队列,如果没有则为IOSession添加事件队列的属性。addWorker()则是启动新的Worker线程加入线程池的管理,这些操作不管是有序还是无序的线程池实现都基本一致。最能反映两者差别的就是线程池的execute()方法和其内部的Worker的run()的实现,通过使用不同的数据结构进行I/O事件的存取处理也就体现了两者的差别。先来看看有序的实现:
    public void execute(Runnable task) {
        ...
        IoEvent event = (IoEvent) task;
        
        IoSession session = event.getSession();
        
        SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
        Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
        
        boolean offerSession;

        boolean offerEvent = eventQueueHandler.accept(this, event);
        
        if (offerEvent) {
            synchronized (tasksQueue) {
                //*********************************1**********************************
                tasksQueue.offer(event);
                
                if (sessionTasksQueue.processingCompleted) {
                    sessionTasksQueue.processingCompleted = false;
                    offerSession = true;
                } else {
                    offerSession = false;
                }
                ...
                //*********************************end********************************
            }
        } else {
            offerSession = false;
        }

        if (offerSession) {
            waitingSessions.offer(session);
        }

        addWorkerIfNecessary();
        ...
    }

    这个方法可以简单的理解为提交I/O事件给线程池处理,也就是I/O事件的存储。前面说到有序的线程池的实现是采用了两层结构,所以代码很清晰,首先是找I/O事件对应的IOSession,然后找IOSession的事件队列属性(没有就创建)把事件添加到队列里面去。由于要保证代码段1的操作的原子性,所以使用了synchronized的锁机制。offerSession在这里的作用就是保证waitingSessions内的非EXIT_SIGNAL的IOSession是唯一的。addWorkerIfNecessary()则是在没有空闲工人处理事件的情况下添加新的人手(线程),最终还是调用addWorker()。再来看看无序线程池的实现:
    public void execute(Runnable task) {
        ...
        IoEvent e = (IoEvent) task;
        boolean offeredEvent = queueHandler.accept(this, e);
        if (offeredEvent) {
            getQueue().offer(e);
        }

        addWorkerIfNecessary();
        ...
    }

    这个就简单明了了,getQueue()获得是类的构造器提供的LinkedBlockingQueue的实例。offer()方法我之前分析过,属于非阻塞的入队实现,所以不管队列满不满都不会阻塞当前的线程。所以这个I/O事件的存储实际上不牵涉到IOSession的划分:同一个IOSession的多个I/O事件存于同一个队列里,不同的IOSession的I/O事件也都在这个队列里;而有序线程池的实现则是:同一个IOSession的多个I/O事件只存于同一个队列里,每一个IOSession都对应有自己的事件队列
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Mina的线程池实现分析

    Mina的线程池实现分析主要集中在两个自定义线程池类:OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor。这两个线程池都是对Java的ThreadPoolExecutor的扩展,以满足Mina框架在处理I/O事件时的特定需求。 ...

    Mina源码解析

    1. **Mina的过滤器机制实现**: Mina的核心设计理念之一是过滤器链(Filter Chain),它借鉴了Servlet的过滤器模型。每个过滤器都可以在数据传输过程中进行拦截和处理,如数据编码、解码、安全检查等。过滤器之间...

    MINA2与Netty4比较分析

    接下来将根据标题和描述的要求详细分析Mina2与Netty4的区别,重点从它们的线程模型、Buffer使用以及Netty4中集成的序列化工具ProtoBuf等方面进行比较。 首先,Mina2和Netty4都是异步事件驱动的网络应用框架。Netty4...

    udp.rar_MINA udp_android mina UDP_mina_mina u

    在Android上使用MINA实现UDP通信,你需要创建一个UDPServerBootstrap实例,配置线程池、处理器、以及UDP协议解码器和编码器。解码器将接收到的原始字节数组转换为应用级别的对象,而编码器则反之。接下来,注册一个...

    mina权威性能测试例子

    Apache Mina是一个开源的网络通信应用框架,主要应用于构建高性能、高可用性的网络服务器和客户端。...通过这样的测试,我们可以深入理解Mina如何在实际应用中实现高性能和高可扩展性,并为优化和改进提供依据。

    mina源代码学习提供下载

    1. **非阻塞I/O**:MINA基于Java NIO(Non-blocking Input/Output)库,实现了事件驱动和异步通信模型。这种模式可以高效地处理大量并发连接,降低CPU负载。 2. **协议无关性**:MINA并不关心具体的网络协议,如TCP...

    Apache Mina 入门Demo

    1. **Mina架构**:Apache Mina的核心设计基于事件驱动和非阻塞I/O模型,这种模型特别适合处理大量并发连接。它将网络通信层抽象为一组服务,如TCP/IP协议栈,让你专注于业务逻辑而不是底层细节。 2. **IoSession...

    Mina长连接短连接实例

    通过分析Minaclient和MinaHost工程,我们可以学习如何配置和使用这些组件,实现基于长连接的网络通信。同时,提供的文档将有助于我们更深入地理解Mina的工作原理和最佳实践。 总结来说,Apache Mina为开发者提供了...

    mina网络通信实例

    1. **选择合适的Transport类型**:MINA提供了多种传输层实现,如Socket Transport用于TCP协议,Datagram Protocol用于UDP协议。 2. **配置Acceptor**:Acceptor是服务器端接收连接的组件,通过设置监听端口和绑定...

    mina服务端例子

    1. **Mina框架介绍**: Mina提供了事件驱动和异步处理模型,使得开发者可以专注于业务逻辑,而无需关心底层网络通信的复杂性。它支持多种传输协议,包括TCP、UDP、SSL/TLS等,同时提供了过滤器链机制,允许在数据...

    MINA学习总结

    1. 创建服务端Bootstrap,设置线程池、选择器、处理器等。 2. 绑定端口,启动服务端监听。 3. 定义过滤器链,实现业务逻辑。 4. 创建客户端Connector,设置目标地址和连接参数。 5. 连接服务端,进行数据交互。 ...

    mina框架demo

    - **配置文件**:如`mina.xml`,用于配置MINA的服务端口、连接超时、线程池大小等参数。 - **Java源代码**:包括服务器端和客户端的实现,如`ServerHandler.java`、`ClientHandler.java`,分别处理来自客户端的连接...

    Mina 使用DEMO

    Mina 提供了一种抽象层,简化了网络编程,允许开发者专注于业务逻辑,而无需关心底层协议的实现细节。本DEMO将帮助我们深入理解Mina的工作原理及其在实际应用中的使用。 ### 1. Mina 框架的核心概念 - **事件驱动...

    MINA2 教程 socket

    在这个"MINA2 教程 socket"中,我们将探讨如何使用MINA2来实现服务器与客户端之间的通信,并且支持同时在线连接,以及如何利用`newFixedThreadPool`线程池来优化处理并发连接。 1. **MINA2基本概念**: MINA2的...

    apache mina

    - Mina4_IoFilter和IoHandler的区别和联系.pdf将对比分析两者的关系和用法。 - Mina5_配置Mina的线程模型.pdf将讲解如何定制MINA的线程池配置。 通过对这些文档的学习,你可以全面掌握Apache MINA框架,从基础到...

    Mina2.0学习笔记(重点)

    ##### 3.3 Mina内部实现分析 1. **线程模型**:深入剖析Mina的线程模型,了解其如何管理线程资源以支持高并发。 2. **数据流处理**:研究Mina的数据处理机制,包括数据缓存、队列管理等。 ##### 3.4 Mina的线程...

    Apache_Mina_Server_2.0_V1.0.rar_apache_apache中文手册_mina

    1. **Mina 框架概述**:介绍 Mina 的设计理念、架构和组件,包括IoSession、Filter Chain、Protocol Decoder 和 Encoder 等概念。 2. **事件驱动模型**:详细解释 Mina 如何基于事件模型进行工作,包括连接建立、...

Global site tag (gtag.js) - Google Analytics