`
ftj20003
  • 浏览: 132064 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

Mina的线程池实现分析(2)

    博客分类:
  • Java
阅读更多
    分析了I/O事件的存储,下面看看多个Worker同时工作时I/O事件的取得过程。首先看看有序的Worker的实现:
   private class Worker implements Runnable {

        private volatile long completedTaskCount;
        private Thread thread;
        
        public void run() {
            thread = Thread.currentThread();

            try {
                for (;;) {
                    IoSession session = fetchSession();

                    idleWorkers.decrementAndGet();

                    if (session == null) {
                        synchronized (workers) {
                            if (workers.size() > getCorePoolSize()) {
                                // Remove now to prevent duplicate exit.
                                workers.remove(this);
                                break;
                            }
                        }
                    }

                    if (session == EXIT_SIGNAL) {
                        break;
                    }

                    try {
                        if (session != null) {
                            runTasks(getSessionTasksQueue(session));
                        }
                    } finally {
                        idleWorkers.incrementAndGet();
                    }
                }
            } finally {
                synchronized (workers) {
                    workers.remove(this);
                    OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
                    workers.notifyAll();
                }
            }
        }

        private IoSession fetchSession() {
            IoSession session = null;
            long currentTime = System.currentTimeMillis();
            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
            for (;;) {
                try {
                    long waitTime = deadline - currentTime;
                    if (waitTime <= 0) {
                        break;
                    }

                    try {
                        session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
                        break;
                    } finally {
                        if (session == null) {
                            currentTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    // Ignore.
                    continue;
                }
            }
            return session;
        }

        private void runTasks(SessionTasksQueue sessionTasksQueue) {
            for (;;) {
                Runnable task;
                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
                
                synchronized (tasksQueue) {
                    task = tasksQueue.poll();
                    
                    if (task == null) {
                        sessionTasksQueue.processingCompleted = true;
                        break;
                    }
                }

                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);

                runTask(task);
            }
        }

        private void runTask(Runnable task) {
            beforeExecute(thread, task);
            boolean ran = false;
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                completedTaskCount ++;
            } catch (RuntimeException e) {
                if (!ran) {
                    afterExecute(task, e);
                }
                throw e;
            }
        }
    }

    Worker的run()上来就是个无限循环,如果工人多了,则当前的Worker被就地裁员;如果没有可以处理的IOSession的事件了,则这个工人可以跳出循环然后不等休息就被裁员,这段代码的实现基本上体现了资本主义世界下公司的作风。开始干活的第一件事就是fetchSession()获取可用的IOSession,然后就是runTasks(getSessionTasksQueue(session))了--获取IOSession对应的I/O事件然后一个个的runTask()处理任务。有序的奥妙在这里完全的暴露出来:每个Worker都是先得到IOSession,session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS)采用了阻塞一定时间的方式获取可用的session,每一个Worker的session都是唯一的,当然除了EXIT_SIGNAL。这样每一个Worker按照对应的session的事件队列tasksQueue的事件顺序执行每一个事件,保证了有序性。再看看无序的实现:
    private class Worker implements Runnable {

        private volatile long completedTaskCount;
        private Thread thread;

        public void run() {
            thread = Thread.currentThread();

            try {
                for (;;) {
                    Runnable task = fetchTask();

                    idleWorkers.decrementAndGet();

                    if (task == null) {
                        synchronized (workers) {
                            if (workers.size() > corePoolSize) {
                                // Remove now to prevent duplicate exit.
                                workers.remove(this);
                                break;
                            }
                        }
                    }

                    if (task == EXIT_SIGNAL) {
                        break;
                    }

                    try {
                        if (task != null) {
                            queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
                            runTask(task);
                        }
                    } finally {
                        idleWorkers.incrementAndGet();
                    }
                }
            } finally {
                synchronized (workers) {
                    workers.remove(this);
                    UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
                    workers.notifyAll();
                }
            }
        }

        private Runnable fetchTask() {
            Runnable task = null;
            long currentTime = System.currentTimeMillis();
            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
            for (;;) {
                try {
                    long waitTime = deadline - currentTime;
                    if (waitTime <= 0) {
                        break;
                    }

                    try {
                        task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
                        break;
                    } finally {
                        if (task == null) {
                            currentTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    // Ignore.
                    continue;
                }
            }
            return task;
        }

        private void runTask(Runnable task) {
            beforeExecute(thread, task);
            boolean ran = false;
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                completedTaskCount ++;
            } catch (RuntimeException e) {
                if (!ran) {
                    afterExecute(task, e);
                }
                throw e;
            }
        }
    }

    从源码一眼就能看出差别,这个线程池的Worker是吃大锅饭的。开始干活的第一件事是fetchTask()取得I/O事件,然后就是runTask()处理事件。后者与有序的实现基本一致,而fetchTask()则暴露了所有的Worker都是从同一个队列取事件,而不像有序实现那样每一个Worker都有自己的一个专有的锅。fetchTask()的实现使得不同的Worker可能取得同一个IOSession的I/O事件,而这些事件的处理则完全听天由命的取决于Woker的快准狠!从而可能造成sessionClosed事件在messageReceived事件之前被处理。
   
    另外就是有序的实现中虽然每个IOSession对应的队列是ConcurrentLinkedQueue的实例,支持无锁得并发访问。但是在入队和出队的操作时都是使用了synchronized的机制进行访问,主要原因我想一方面是要保证一系列操作的原子性,另一方面其本身就是无锁的实现,所以保证前者的情况下并不会使性能下降多少。
   
    至此简单的分析了一下mina内部线程池有序和无序的实现,不得不说这个设计还是很精妙的。当然设计是简单的具体的实现要充分的考虑多线程的访问,还是有一定的复杂性的。
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    Mina的线程池实现分析

    Mina的线程池实现分析主要集中在两个自定义线程池类:OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor。这两个线程池都是对Java的ThreadPoolExecutor的扩展,以满足Mina框架在处理I/O事件时的特定需求。 ...

    MINA2与Netty4比较分析

    接下来将根据标题和描述的要求详细分析Mina2与Netty4的区别,重点从它们的线程模型、Buffer使用以及Netty4中集成的序列化工具ProtoBuf等方面进行比较。 首先,Mina2和Netty4都是异步事件驱动的网络应用框架。Netty4...

    Mina源码解析

    2. **Mina的事件模型**: Mina采用事件驱动的编程模型,通过I/O事件(如连接建立、数据读写、连接关闭等)触发相应的处理器。这些事件由IoAdapter抽象类进行封装,并由IoHandler接口处理。开发者可以通过继承...

    udp.rar_MINA udp_android mina UDP_mina_mina u

    在Android上使用MINA实现UDP通信,你需要创建一个UDPServerBootstrap实例,配置线程池、处理器、以及UDP协议解码器和编码器。解码器将接收到的原始字节数组转换为应用级别的对象,而编码器则反之。接下来,注册一个...

    MINA2 教程 socket

    在这个"MINA2 教程 socket"中,我们将探讨如何使用MINA2来实现服务器与客户端之间的通信,并且支持同时在线连接,以及如何利用`newFixedThreadPool`线程池来优化处理并发连接。 1. **MINA2基本概念**: MINA2的...

    mina权威性能测试例子

    2. **协议处理**:Mina支持多种网络协议,如TCP、UDP和SSL/TLS等,开发者可以根据需求选择合适的协议进行通信。在测试中,需要确保所选协议在高并发下仍能保持稳定性和效率。 3. **线程模型**:线程池配置对于性能...

    mina源代码学习提供下载

    通过阅读和分析MINA的源代码,你可以更深入地了解其内部工作原理,学习如何利用MINA构建高性能的网络服务。这包括但不限于理解过滤器链的执行流程、如何自定义协议解析器以及如何优化MINA的配置以适应不同场景的需求...

    mina网络通信实例

    《MINA网络通信实例解析》 MINA(Multipurpose Infrastructure for Network Applications)是Apache软件基金会的一个开源项目,它...在实际应用中,结合源码分析和工具辅助,可以更深入地理解和利用MINA的强大功能。

    Apache Mina 入门Demo

    通过分析这个Demo,我们可以了解到如何实际搭建一个简单的Mina应用。 8. **实战经验**:博主royal2xiaose的博客文章可能分享了如何使用Mina创建一个基础的网络服务,包括项目的初始化、编写过滤器、处理数据等步骤...

    Mina长连接短连接实例

    通过分析Minaclient和MinaHost工程,我们可以学习如何配置和使用这些组件,实现基于长连接的网络通信。同时,提供的文档将有助于我们更深入地理解Mina的工作原理和最佳实践。 总结来说,Apache Mina为开发者提供了...

    mina服务端例子

    在这个“Mina服务端例子”中,我们主要探讨的是如何使用Mina框架来实现一个基于Socket的非阻塞I/O(NIO)服务端。 1. **Mina框架介绍**: Mina提供了事件驱动和异步处理模型,使得开发者可以专注于业务逻辑,而...

    MINA学习总结

    在实际项目中,结合IDE的调试功能,可以更深入地分析MINA的运行过程,优化性能。 总之,MINA是一个强大且灵活的网络应用框架,它不仅提供了高效的异步I/O处理能力,还简化了网络服务的开发流程。通过深入学习和实践...

    mina框架demo

    - **配置文件**:如`mina.xml`,用于配置MINA的服务端口、连接超时、线程池大小等参数。 - **Java源代码**:包括服务器端和客户端的实现,如`ServerHandler.java`、`ClientHandler.java`,分别处理来自客户端的连接...

    Mina 使用DEMO

    Mina 提供了一种抽象层,简化了网络编程,允许开发者专注于业务逻辑,而无需关心底层协议的实现细节。本DEMO将帮助我们深入理解Mina的工作原理及其在实际应用中的使用。 ### 1. Mina 框架的核心概念 - **事件驱动...

    apache mina

    - Mina3_与IoHandler相关的几个类.pdf和Mina2_与IoFilter相关的几个类.pdf将深入探讨IoHandler和IoFilter的具体实现类及其功能。 - Mina4_IoFilter和IoHandler的区别和联系.pdf将对比分析两者的关系和用法。 - ...

    Mina2.0学习笔记(重点)

    ##### 3.3 Mina内部实现分析 1. **线程模型**:深入剖析Mina的线程模型,了解其如何管理线程资源以支持高并发。 2. **数据流处理**:研究Mina的数据处理机制,包括数据缓存、队列管理等。 ##### 3.4 Mina的线程...

    Apache_Mina_Server_2.0_V1.0.rar_apache_apache中文手册_mina

    2. **事件驱动模型**:详细解释 Mina 如何基于事件模型进行工作,包括连接建立、数据传输、连接关闭等事件的处理机制。 3. **非阻塞 I/O**:阐述非阻塞I/O的工作原理,以及如何通过 NIO(Non-blocking Input/Output...

Global site tag (gtag.js) - Google Analytics