`
wslfh2005
  • 浏览: 13255 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

【端午节礼物大放送】深入解析Apache Mina源码(3)——Mina的线程池模型

阅读更多


1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现

2、深入解析Apache Mina源码(2)——Mina的事件模型

3、深入解析Apache Mina源码(3)——Mina的线程池模型

 

一、生产者消费者问题

 

做为苦逼的程序员的我们基本没有不知道生产者消费者问题的,这个经典的问题充分体现了进程同步的问题,还是简单的说下它的概念,生产者和消费者是两个线程,生产者线程生产物品放到空的缓冲区内(可能是一个list,消费者线程从缓冲区内取出物品进行消费并释放缓冲区,缓冲区有个固定大小,当生产者线程将缓冲区填充满时,生产者线程处于等待状态,等待消费者线程消费;当缓冲区消费空了后,消费者线程处于等待状态,等待生产者线程进行生产。当然生产者和消费者也可以有多个线程充当,但是操作的进程地址空间却只能是同一个。

这个经典的问题体现了多线程编程的一些要注意的地方,比如对同一资源进行访问所产生的互斥和同步问题。

下面看下对生产者消费者问题的实现。

物品类:

package com.lifanghu.procon;

/**
 * 食物
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 上午08:13:34
 * @name com.lifanghu.procon.Food.java
 * @version 1.0
 */

public class Food {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

 

 缓冲区:

package com.lifanghu.procon;

import java.util.ArrayList;
import java.util.List;

/**
 * 容器,缓冲区
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 上午08:33:56
 * @name com.lifanghu.procon.Container.java
 * @version 1.0
 */

public class Container {

    //缓冲区大小 
    private int size;
    private List<Food> foods;

    public Container(int size) {
        this.size = size;
        foods = new ArrayList<Food>(size);
    }

    public synchronized void poll(Food food) {
        while (foods.size() >= size) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        foods.add(food);
        notifyAll();
    }
    public synchronized Food offer() {
        Food food = null;
        while (foods.size() == 0) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        food = foods.remove(foods.size() - 1);
        notifyAll();
        return food;
    }
}

 

 生产者:

package com.lifanghu.procon;

/**
 * 生产者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 上午08:13:26
 * @name com.lifanghu.procon.Producer.java
 * @version 1.0
 */

public class Producer implements Runnable {

    private Container container;

    public Producer(Container container) {
        super();
        this.container = container;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            Food food = new Food();
            food.setName("馒头" + i);
            System.out.println("生产者生产出" + food.getName());
            container.poll(food);
            try {
                Thread.sleep((long) (Math.random() * 3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

 消费者:

package com.lifanghu.procon;

/**
 * 消费者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 上午08:13:52
 * @name com.lifanghu.procon.Consumer.java
 * @version 1.0
 */

public class Consumer implements Runnable {

    private Container container;

    public Consumer(Container container) {
        super();
        this.container = container;
    }

    public void run() {
        for (;;) {
            Food food = container.offer();
            try {
                Thread.sleep((long) (Math.random() * 3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (food != null) {
                System.out.println(food.getName() + "被消费!");
            }
        }
    }
}

 

 测试类:

package com.lifanghu.procon;

/**
 * 客户端测试类
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 上午08:13:59
 * @name com.lifanghu.procon.Client.java
 * @version 1.0
 */

public class Client {

    public static void main(String[] args) {
        Container container = new Container(5);
        Thread producer1 = new Thread(new Producer(container));
        // Thread producer2 = new Thread(new Producer(container));
        // producer2.start();
        Thread consumer1 = new Thread(new Consumer(container));
        producer1.start();
        consumer1.start();
    }
}

 

 输出结果: 

生产者生产出馒头0
馒头0被消费!
生产者生产出馒头1
馒头1被消费!
生产者生产出馒头2
生产者生产出馒头3
生产者生产出馒头4
馒头2被消费!
生产者生产出馒头5
馒头4被消费!
馒头5被消费!
生产者生产出馒头6
生产者生产出馒头7
馒头3被消费!
馒头7被消费!
馒头6被消费!
生产者生产出馒头8
馒头8被消费!
生产者生产出馒头9
馒头9被消费!

 

二、 线程池及实现

 

上面我们讲到了生产者消费者的问题,那么这和线程池有什么关系呢?其实线程池的实现就是生产者消费者问题的实现,理解了生产者消费者问题就不会对线程池的实现感到神秘了,线程池在很多地方会用到,比如tomcat等各种中间容器的实现,Spring对线程池的支持等,当然mina中也使用到了线程池的概念。至于为什么要用到线程池,网上文章很多,基本是操作系统支持的线程数有限,线程的创建关闭有很大的系统开销,线程的切换也会影响系统性能等等。

下面这个图就是线程池的基本原理图,看看是不是和生产者消费者问题一样。


 

看下简单对线程池的实现代码,主要包括三个类,一个是线程池,一个是工作任务,一个是客户端进行任务添加。

任务类,比较简单,实现Runnable接口:

package com.lifanghu.threadpool;
//任务类,具体要执行的操作
public class Worker implements Runnable {
    private int id;
    public Worker(int id) {
        this.id = id;
    }
    public void run() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务" + id);
    }
}

 

线程池,相对复杂一些,但是原理是很简单的:

package com.lifanghu.threadpool;
import java.util.LinkedList;
/**
 * 线程池实现
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 下午03:31:47
 * @name com.lifanghu.threadpool.ThreadPool.java
 * @version 1.0
 */
public class ThreadPool {
    // 线程池大小
    private final int nThreads;
    // 线程池工作者(具体线程)
    private final PoolWorker[] threads;
    // 任务队列
    private final LinkedList<Runnable> queue;
    public ThreadPool(int nThreads) {
        // 初始线程池,并启动线程池里面的线程
        this.nThreads = nThreads;
        queue = new LinkedList<Runnable>();
        threads = new PoolWorker[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }
    // 提交工作任务,实际将任务放入队列,并通知线程进行消费
    public void execute(Runnable r) {
        synchronized (queue) {
            queue.addLast(r);
            queue.notify();
        }
    }

    private class PoolWorker extends Thread {
        public void run() {
            Runnable r;
            // 循环取出任务队列里的任务进行消费,如果没有任务,就等待任务到来。
            while (true) {
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException ignored) {
                        }
                    }
                    r = queue.removeFirst();
                }
                try {
                    r.run();
                } catch (RuntimeException e) {
                }
            }
        }
    }
}
  

看下客户端的调用代码:

package com.lifanghu.threadpool;
/**
 * 客户端测试类
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-22 下午03:25:36
 * @name .Client.java
 * @version 1.0
 */
public class Client {
    public static void main(String[] args) {
        ThreadPool queue = new ThreadPool(10);
        // 提交工作任务。
        queue.execute(new Worker(1));
        queue.execute(new Worker(2));
        queue.execute(new Worker(3));
    }
}
  

观察输出结果:

线程:Thread-1 执行任务1
线程:Thread-5 执行任务3
线程:Thread-3 执行任务2

  怎么样,感觉是不是很easy呢?咱们的线程池实现其实比较简单的,但是实际应用中我们用线程池比较常见的方式还是使用JDK中对线程池的实现,它提供了ExecutorServiceExecutor等类实现了对线程池的支持,不过线程池的实现原理其实是和我们的一样的,只不过它更多的考虑了实现细节,功能更强一些,关于它的使用网上有很多文章讲的已经很清楚了,可以参考:http://mshijie.iteye.com/blog/366591

 

三、Mina中的线程池模型

 

前面讲了生产者消费者问题以及由此引出的线程池的实现问题,那么现在我们来看下实际开源项目mina中是怎么使用线程池模型的。

Mina中的线程池使用主要有四个地方:

1、IoAcceptor线程池。

2、IoConnector线程池。

3、IoProcessor线程池。

4、过滤器类ExecutorFilter线程池。

 

一、先说下IoAcceptorIoConnector线程池,它俩的实现类都继承了AbstractIoService类,而Executor也是定义在这个类里面的,所以使用线程池的方式是一样的。

先看下AbstractIoService类关于线程池的初始化,它的初始化是在构造方法里面进行的:

 

if (executor == null) {
    //默认的线程池:可缓存的线程池
    this.executor = Executors.newCachedThreadPool();
    createdExecutor = true;
} else {
    this.executor = executor;
    createdExecutor = false;
}

//重新设定的线程名称
threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();

 

下面是提交作业任务的方法:

 

protected final void executeWorker(Runnable worker, String suffix) {
    String actualThreadName = threadName;
    if (suffix != null) {
        actualThreadName = actualThreadName + '-' + suffix;
    }
    // 向线程池中提交任务。
    executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}
 

 

对于IoAcceptor的任务提交调用是在bindunbind方法实现中的,看下bind最终调用,在类AbstractPollingIoAcceptor的 startupAcceptor方法中:

 

// start the acceptor if not already started
Acceptor acceptor = acceptorRef.get();

if (acceptor == null) {
    acceptor = new Acceptor();

    if (acceptorRef.compareAndSet(null, acceptor)) {
        //放入工作线程池中,供异步执行。
        executeWorker(acceptor);
    }
}

 

 再来看内部Acceptor,它作为接收者任务类,执行端口的绑定,通道的注册操作等。

 

//实际的注册端口方法
nHandles += registerHandles();

 registerHandles方法中关于注册端口的方法:

 

try {
    // Process all the addresses
    for (SocketAddress a : localAddresses) {
        //注册端口,最终调用低层的注册方法,参考类NioSocketAcceptor
        H handle = open(a);
        newHandles.put(localAddress(handle), handle);
    }
  

unbind方法和bind方法的调用很类似,这里就不说了。

再看下IoConnector,它最终是在方法connect时会提交任务,看下AbstractPollingIoConnector类的startupWorker方法:

 

if (connector == null) {
    connector = new Connector();
    
    if (connectorRef.compareAndSet(null, connector)) {
        //提交执行任务
        executeWorker(connector);
    }
}

 

 对于IoAcceptorIoConnector线程池的线程池大小,一般来说一个对象里面只有一个线程池,一个线程池里面一般有一个线程,当然如果你的连接或者监听比较多时可能会自动增加线程,这个就看线程池自己分配了。

 

二、关于IoProcessor线程池。IoProcessor里面使用线程池的方式和上面两个使用方式很相似,代码都非常类似,看下AbstractPollingIoProcessor类的startupProcessor方法:

 

private void startupProcessor() {
    Processor processor = processorRef.get();

    if (processor == null) {
        processor = new Processor();

        if (processorRef.compareAndSet(null, processor)) {
            //添加执行任务。
            executor.execute(new NamePreservingRunnable(processor, threadName));
        }
    }
    // Just stop the select() and start it again, so that the processor
    // can be activated immediately.
    wakeup();
}

 

 它的大小是在SimpleIoProcessorPool中定义的,默认是CPU核数加1,代码如下:

 

//默认的大小为CPU核数加1
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

 

 最终调用其实还是调用的AbstractPollingIoProcessor里面的执行线程,可以看下SimpleIoProcessorPool的构造方法:

 

try {
    processorConstructor = processorType.getConstructor(ExecutorService.class);
    //最终还是调用AbstractPollingIoProcessor进行数据处理的。
pool[0] = processorConstructor.newInstance(this.executor);
……
// Constructor found now use it for all subsequent instantiations
for (int i = 1; i < pool.length; i++) {
    try {
        if (usesExecutorArg) {
            pool[i] = processorConstructor.newInstance(this.executor);
        } else {
            pool[i] = processorConstructor.newInstance();
        }
    } catch (Exception e) {
        // Won't happen because it has been done previously
    }
}

 

 我们可以看到在有个这样的变量:

 

    /** The pool table */
    private final IoProcessor<S>[] pool;

 

 从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的,至于mina为什么要这样处理,这里我也不得而知,如果哪位知道的话可以一起讨论……

 

三、ExecutorFilter类中的线程池。这是一个可选的线程池,是加在过滤器当中的。我们一般选择加在过滤器的最后面,这样Handler里面的业务处理就可以在线程池里面进行处理了。它的默认大小是16

 

    /** The default pool size */
    private static final int DEFAULT_MAX_POOL_SIZE = 16;

 

 看下Executor的创建方式:

 

    private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
        // Create a new Executor
        Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
            keepAliveTime, unit, threadFactory, queueHandler);
        
        return executor;
    }

 

 类OrderedThreadPoolExecutor是一个继承了ThreadPoolExecutor的类,覆盖了一些方法的实现。看下任务提交的代码:

 

    protected void fireEvent(IoFilterEvent event) {
        //将事件提交给线程池执行
        executor.execute(event);
    }

 

 里面的实现细节相对比较复杂,感兴趣的童鞋可以再自行深入研究。

 

     四、推荐文章

 

 

1. java并发编程-Executor框架

http://mshijie.iteye.com/blog/366591

 

2. java.util.concurrent介绍

http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html

 

3. 原子变量(AtomicLong, AtomicInteger, AtomicReference)

http://meng-lin.iteye.com/blog/485281

 

 

    五、总结

 

上面的文章基本上讲的比较简单,粒度比较粗,线程池的应用是mina的核心之一,里面有很多细节的地方其实是很值得学习的,当然本人到现在也不能完全吃透。还需要以后在交流和学习中与大家一起成长。

每天进步一点点,不做无为的码农。。。。。

2012622日星期五

码农虎虎

http://weibo.com/hurtigf

http://www.lifanghu.com/

wslfh2005@163.com

 

 

 

 

  • 大小: 21 KB
分享到:
评论
1 楼 flysnail 2012-11-27  
分析的不错

相关推荐

    scratch少儿编程逻辑思维游戏源码-城堡战争.zip

    scratch少儿编程逻辑思维游戏源码-城堡战争.zip

    【Go语言编程】大厂Go工程师面试题集锦:涵盖并发、网络、数据库及算法设计要点

    内容概要:本文档汇集了来自字节跳动、腾讯、金山WPS、跟谁学和百度等大厂的Go工程师面试题,涵盖广泛的技术领域。主要包括Go语言特性(如goroutine调度、channel机制)、操作系统(进程间通信、线程调度)、计算机网络(TCP/IP协议栈、HTTP协议)、数据结构与算法(排序算法、LRU缓存)、数据库(MySQL索引优化、Redis内部机制)、分布式系统(负载均衡、服务发现)等方面的知识点。通过这些问题,不仅考察应聘者的理论基础,还测试其实际项目经验和技术深度。 适合人群:有一定Go语言编程经验和计算机基础知识的开发者,特别是准备应聘互联网大厂的中级及以上水平的后端工程师或全栈工程师。 使用场景及目标:①帮助求职者全面复习Go语言及其相关领域的核心概念;②为面试官提供有价值的参考题目,确保候选人具备解决复杂问题的能力;③指导工程师深入理解并掌握企业级应用开发所需的关键技能。 阅读建议:由于题目覆盖面广且难度较高,建议读者结合自身情况选择重点复习方向,同时配合实际编码练习加深理解。对于每个知识点,不仅要记住答案,更要理解背后的原理,这样才能在面试中灵活应对各种变体问题。

    scratch少儿编程逻辑思维游戏源码-堡垒之夜(吃鸡游戏).zip

    scratch少儿编程逻辑思维游戏源码-堡垒之夜(吃鸡游戏).zip

    少儿编程scratch项目源代码文件案例素材-派.zip

    少儿编程scratch项目源代码文件案例素材-派.zip

    scratch少儿编程逻辑思维游戏源码-Scratch 冒险.zip

    scratch少儿编程逻辑思维游戏源码-Scratch 冒险.zip

    2025 飞特舵机, Arduino版本

    2025 飞特舵机, Arduino版本

    scratch少儿编程逻辑思维游戏源码-躲避.zip

    scratch少儿编程逻辑思维游戏源码-躲避.zip

    PFC5.0纤维混凝土三点弯曲模拟:参数化建模与实验分析

    内容概要:本文详细介绍了利用PFC5.0进行纤维混凝土三点弯曲模拟的方法。首先,作者展示了如何通过定义纤维的体积含量、长度、半径和刚度等关键参数来构建纤维网络。接着,描述了三点弯曲加载的具体实现方式,包括加载速率控制和终止条件设定。最后,提供了后处理方法,如绘制并导出力-位移曲线图,以便于分析材料破坏机制。文中还给出了若干实用建议,如纤维半径的选择范围、加载速率的初始值以及不同类型纤维的接触模型选择。 适合人群:从事材料科学尤其是混凝土材料研究的专业人士,以及对离散元法和数值模拟感兴趣的科研工作者。 使用场景及目标:适用于希望深入了解纤维混凝土力学性能的研究人员,旨在帮助他们掌握PFC5.0软件的操作技巧,优化模拟参数设置,提高实验效率。 其他说明:文中提供的代码片段可以直接应用于实际项目中,同时附带了一些实践经验分享,有助于初学者快速入门并避免常见错误。

    少儿编程scratch项目源代码文件案例素材-生存V1(有BAG).zip

    少儿编程scratch项目源代码文件案例素材-生存V1(有BAG).zip

    少儿编程scratch项目源代码文件案例素材-披萨机器人.zip

    少儿编程scratch项目源代码文件案例素材-披萨机器人.zip

    少儿编程scratch项目源代码文件案例素材-气球滑雪板.zip

    少儿编程scratch项目源代码文件案例素材-气球滑雪板.zip

    少儿编程scratch项目源代码文件案例素材-使命召唤(苏联插旗).zip

    少儿编程scratch项目源代码文件案例素材-使命召唤(苏联插旗).zip

    可跨平台移植的模拟IIC实战项目STM32F407-TestIIC

    1. GPIO模拟I2C 实战项目,根据正点原子 STM32F407ZGT6 进行更改; 2. 可适配STM32、GD32、HC32等MCU;

    scratch少儿编程逻辑思维游戏源码-百米冲刺.zip

    scratch少儿编程逻辑思维游戏源码-百米冲刺.zip

    【蓝桥杯竞赛】历年试题精选与备考资源汇总:编程算法及硬件单片机试题解析与练习指导

    内容概要:本文档汇总了蓝桥杯历年试题及练习资源,涵盖编程类试题精选、硬件与单片机试题、练习资源与题库以及备考建议。编程类试题精选包括基础算法题(如数组求和、质因数分解)、经典算法案例(如最大子序列和、兰顿蚂蚁模拟)和数据结构应用(如字符全排列)。硬件与单片机试题主要涉及客观题考点,如BUCK电路和电源设计。练习资源与题库部分介绍了真题平台(如Dotcpp、CSDN专题)和专项训练包(如Python题库、Java百题集、C++真题解析)。备考建议分为分阶段练习(新手阶段、进阶提升)和模拟实战(如使用Dotcpp估分系统进行限时训练),强调按年份和组别分类练习,强化代码实现与调试能力。; 适合人群:准备参加蓝桥杯竞赛的学生及编程爱好者。; 使用场景及目标:①针对不同编程语言和难度级别的题目进行专项训练;②通过历年真题和模拟实战提高解题速度和准确性;③掌握算法设计、数据结构应用及硬件基础知识。; 阅读建议:此文档提供了丰富的试题和练习资源,建议根据自身水平选择合适的题目进行练习,并结合真题平台的估分系统和社区开源代码进行对比优化,逐步提升编程能力和竞赛水平。

    30kW储能PCS原理图设计:量产设计的关键要素与优化策略

    内容概要:本文详细介绍了30kW储能PCS(电力转换系统)原理图的设计要点及其量产化过程中需要注意的技术细节。首先阐述了储能PCS的基本概念和重要性,接着深入探讨了主拓扑结构的选择,特别是双级式结构的优势以及关键组件如IGBT的驱动时序配置。随后讨论了控制算法的智能化改进,包括加入前馈补偿以提高系统的稳定性。此外,还强调了EMC设计、PCB布局、元件选择等方面的注意事项,并分享了一些实际生产中遇到的问题及解决方案。最后提到了自动化测试方法和散热管理策略,确保产品在各种环境下的可靠运行。 适合人群:从事储能系统设计、电力电子产品研发的工程师和技术人员。 使用场景及目标:帮助读者掌握30kW储能PCS从原理图设计到量产实施的全流程关键技术,提升产品的性能和可靠性,避免常见错误。 其他说明:文中提供了具体的代码片段和实践经验,有助于理解和应用相关理论。

    少儿编程scratch项目源代码文件案例素材-喷气包多德.zip

    少儿编程scratch项目源代码文件案例素材-喷气包多德.zip

    机械工程中基于Python的齿轮啮合性能与动态响应分析

    内容概要:本文深入探讨了齿轮啮合性能及其动态特性,特别是直齿轮的基础参数计算、渐开线绘制以及接触力仿真的具体实现。首先介绍了齿轮的基本参数如模数、齿数、压力角等,并给出了具体的计算实例。接着详细讲解了如何利用Python进行渐开线的数学建模并绘图展示,强调了这种曲线对于确保齿轮平稳传动的重要性。然后讨论了齿轮在啮合过程中接触力的变化规律,提供了简化的Python代码来模拟这一现象。最后指出,在实际工程项目中应当借助专业的软件包如PyDy或ADAMS来进行更加精确的动力学分析,同时肯定了自行编写代码的价值在于能够更好地理解和排查问题。 适合人群:机械工程领域的研究人员、工程师以及相关专业的学生。 使用场景及目标:①帮助读者掌握齿轮基本理论知识;②指导读者运用Python编程技能完成简单的齿轮性能分析任务;③为后续深入研究提供思路和技术支持。 阅读建议:由于文中涉及较多的专业术语和数学公式,建议读者提前复习相关基础知识,并尝试运行提供的代码片段加深理解。此外,对于想要进一步探索该领域的读者来说,可以参考文末提到的专业工具包进行更复杂的研究。

    少儿编程scratch项目源代码文件案例素材-任务.zip

    少儿编程scratch项目源代码文件案例素材-任务.zip

    少儿编程scratch项目源代码文件案例素材-时光大盗.zip

    少儿编程scratch项目源代码文件案例素材-时光大盗.zip

Global site tag (gtag.js) - Google Analytics