精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2012-06-22
最后修改:2012-07-11
1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现 2、深入解析Apache Mina源码(2)——Mina的事件模型3、深入解析Apache Mina源码(3)——Mina的线程池模型 4、深入解析Apache Mina源码(4)——Mina编解码以及对粘包和断包的处理
一、生产者消费者问题
做为苦逼的程序员的我们基本没有不知道生产者消费者问题的,这个经典的问题充分体现了进程同步的问题,还是简单的说下它的概念,生产者和消费者是两个线程,生产者线程生产物品放到空的缓冲区内(可能是一个list),消费者线程从缓冲区内取出物品进行消费并释放缓冲区,缓冲区有个固定大小,当生产者线程将缓冲区填充满时,生产者线程处于等待状态,等待消费者线程消费;当缓冲区消费空了后,消费者线程处于等待状态,等待生产者线程进行生产。当然生产者和消费者也可以有多个线程充当,但是操作的进程地址空间却只能是同一个。 这个经典的问题体现了多线程编程的一些要注意的地方,比如对同一资源进行访问所产生的互斥和同步问题。 下面看下对生产者消费者问题的实现。 物品类: package com.lifanghu.procon; /** * 食物 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 上午08:13:34 * @name com.lifanghu.procon.Food.java * @version 1.0 */ public class Food { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } }
缓冲区: package com.lifanghu.procon; import java.util.ArrayList; import java.util.List; /** * 容器,缓冲区 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 上午08:33:56 * @name com.lifanghu.procon.Container.java * @version 1.0 */ public class Container { //缓冲区大小 private int size; private List<Food> foods; public Container(int size) { this.size = size; foods = new ArrayList<Food>(size); } public synchronized void poll(Food food) { while (foods.size() >= size) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } foods.add(food); notifyAll(); } public synchronized Food offer() { Food food = null; while (foods.size() == 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } food = foods.remove(foods.size() - 1); notifyAll(); return food; } }
生产者: package com.lifanghu.procon; /** * 生产者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 上午08:13:26 * @name com.lifanghu.procon.Producer.java * @version 1.0 */ public class Producer implements Runnable { private Container container; public Producer(Container container) { super(); this.container = container; } public void run() { for (int i = 0; i < 10; i++) { Food food = new Food(); food.setName("馒头" + i); System.out.println("生产者生产出" + food.getName()); container.poll(food); try { Thread.sleep((long) (Math.random() * 3000)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者: package com.lifanghu.procon; /** * 消费者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 上午08:13:52 * @name com.lifanghu.procon.Consumer.java * @version 1.0 */ public class Consumer implements Runnable { private Container container; public Consumer(Container container) { super(); this.container = container; } public void run() { for (;;) { Food food = container.offer(); try { Thread.sleep((long) (Math.random() * 3000)); } catch (InterruptedException e) { e.printStackTrace(); } if (food != null) { System.out.println(food.getName() + "被消费!"); } } } }
测试类: package com.lifanghu.procon; /** * 客户端测试类 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 上午08:13:59 * @name com.lifanghu.procon.Client.java * @version 1.0 */ public class Client { public static void main(String[] args) { Container container = new Container(5); Thread producer1 = new Thread(new Producer(container)); // Thread producer2 = new Thread(new Producer(container)); // producer2.start(); Thread consumer1 = new Thread(new Consumer(container)); producer1.start(); consumer1.start(); } }
输出结果: 生产者生产出馒头0
馒头0被消费! 生产者生产出馒头1 馒头1被消费! 生产者生产出馒头2 生产者生产出馒头3 生产者生产出馒头4 馒头2被消费! 生产者生产出馒头5 馒头4被消费! 馒头5被消费! 生产者生产出馒头6 生产者生产出馒头7 馒头3被消费! 馒头7被消费! 馒头6被消费! 生产者生产出馒头8 馒头8被消费! 生产者生产出馒头9 馒头9被消费!
二、 线程池及实现
上面我们讲到了生产者消费者的问题,那么这和线程池有什么关系呢?其实线程池的实现就是生产者消费者问题的实现,理解了生产者消费者问题就不会对线程池的实现感到神秘了,线程池在很多地方会用到,比如tomcat等各种中间容器的实现,Spring对线程池的支持等,当然mina中也使用到了线程池的概念。至于为什么要用到线程池,网上文章很多,基本是操作系统支持的线程数有限,线程的创建关闭有很大的系统开销,线程的切换也会影响系统性能等等。 下面这个图就是线程池的基本原理图,看看是不是和生产者消费者问题一样。
看下简单对线程池的实现代码,主要包括三个类,一个是线程池,一个是工作任务,一个是客户端进行任务添加。 任务类,比较简单,实现Runnable接口: package com.lifanghu.threadpool; //任务类,具体要执行的操作 public class Worker implements Runnable { private int id; public Worker(int id) { this.id = id; } public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程:" + Thread.currentThread().getName() + " 执行任务" + id); } }
线程池,相对复杂一些,但是原理是很简单的: package com.lifanghu.threadpool; import java.util.LinkedList; /** * 线程池实现 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 下午03:31:47 * @name com.lifanghu.threadpool.ThreadPool.java * @version 1.0 */ public class ThreadPool { // 线程池大小 private final int nThreads; // 线程池工作者(具体线程) private final PoolWorker[] threads; // 任务队列 private final LinkedList<Runnable> queue; public ThreadPool(int nThreads) { // 初始线程池,并启动线程池里面的线程 this.nThreads = nThreads; queue = new LinkedList<Runnable>(); threads = new PoolWorker[nThreads]; for (int i = 0; i < nThreads; i++) { threads[i] = new PoolWorker(); threads[i].start(); } } // 提交工作任务,实际将任务放入队列,并通知线程进行消费 public void execute(Runnable r) { synchronized (queue) { queue.addLast(r); queue.notify(); } } private class PoolWorker extends Thread { public void run() { Runnable r; // 循环取出任务队列里的任务进行消费,如果没有任务,就等待任务到来。 while (true) { synchronized (queue) { while (queue.isEmpty()) { try { queue.wait(); } catch (InterruptedException ignored) { } } r = queue.removeFirst(); } try { r.run(); } catch (RuntimeException e) { } } } } } 看下客户端的调用代码: package com.lifanghu.threadpool; /** * 客户端测试类 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-22 下午03:25:36 * @name .Client.java * @version 1.0 */ public class Client { public static void main(String[] args) { ThreadPool queue = new ThreadPool(10); // 提交工作任务。 queue.execute(new Worker(1)); queue.execute(new Worker(2)); queue.execute(new Worker(3)); } } 观察输出结果: 线程:Thread-1 执行任务1
线程:Thread-5 执行任务3 线程:Thread-3 执行任务2 怎么样,感觉是不是很easy呢?咱们的线程池实现其实比较简单的,但是实际应用中我们用线程池比较常见的方式还是使用JDK中对线程池的实现,它提供了ExecutorService,Executor等类实现了对线程池的支持,不过线程池的实现原理其实是和我们的一样的,只不过它更多的考虑了实现细节,功能更强一些,关于它的使用网上有很多文章讲的已经很清楚了,可以参考:http://mshijie.iteye.com/blog/366591。
三、Mina中的线程池模型
前面讲了生产者消费者问题以及由此引出的线程池的实现问题,那么现在我们来看下实际开源项目mina中是怎么使用线程池模型的。 Mina中的线程池使用主要有四个地方: 1、IoAcceptor线程池。 2、IoConnector线程池。 3、IoProcessor线程池。 4、过滤器类ExecutorFilter线程池。
一、先说下IoAcceptor和IoConnector线程池,它俩的实现类都继承了AbstractIoService类,而Executor也是定义在这个类里面的,所以使用线程池的方式是一样的。 先看下AbstractIoService类关于线程池的初始化,它的初始化是在构造方法里面进行的:
if (executor == null) { //默认的线程池:可缓存的线程池 this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } //重新设定的线程名称 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
下面是提交作业任务的方法:
protected final void executeWorker(Runnable worker, String suffix) { String actualThreadName = threadName; if (suffix != null) { actualThreadName = actualThreadName + '-' + suffix; } // 向线程池中提交任务。 executor.execute(new NamePreservingRunnable(worker, actualThreadName)); }
对于IoAcceptor的任务提交调用是在bind和unbind方法实现中的,看下bind最终调用,在类AbstractPollingIoAcceptor的 startupAcceptor方法中:
// start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { //放入工作线程池中,供异步执行。 executeWorker(acceptor); } }
再来看内部Acceptor,它作为接收者任务类,执行端口的绑定,通道的注册操作等。
//实际的注册端口方法 nHandles += registerHandles(); registerHandles方法中关于注册端口的方法:
try { // Process all the addresses for (SocketAddress a : localAddresses) { //注册端口,最终调用低层的注册方法,参考类NioSocketAcceptor H handle = open(a); newHandles.put(localAddress(handle), handle); } unbind方法和bind方法的调用很类似,这里就不说了。 再看下IoConnector,它最终是在方法connect时会提交任务,看下AbstractPollingIoConnector类的startupWorker方法:
if (connector == null) { connector = new Connector(); if (connectorRef.compareAndSet(null, connector)) { //提交执行任务 executeWorker(connector); } }
对于IoAcceptor和IoConnector线程池的线程池大小,一般来说一个对象里面只有一个线程池,一个线程池里面一般有一个线程,当然如果你的连接或者监听比较多时可能会自动增加线程,这个就看线程池自己分配了。
二、关于IoProcessor线程池。IoProcessor里面使用线程池的方式和上面两个使用方式很相似,代码都非常类似,看下AbstractPollingIoProcessor类的startupProcessor方法:
private void startupProcessor() { Processor processor = processorRef.get(); if (processor == null) { processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { //添加执行任务。 executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup(); }
它的大小是在SimpleIoProcessorPool中定义的,默认是CPU核数加1,代码如下:
//默认的大小为CPU核数加1 private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
最终调用其实还是调用的AbstractPollingIoProcessor里面的执行线程,可以看下SimpleIoProcessorPool的构造方法:
try { processorConstructor = processorType.getConstructor(ExecutorService.class); //最终还是调用AbstractPollingIoProcessor进行数据处理的。 pool[0] = processorConstructor.newInstance(this.executor); …… // Constructor found now use it for all subsequent instantiations for (int i = 1; i < pool.length; i++) { try { if (usesExecutorArg) { pool[i] = processorConstructor.newInstance(this.executor); } else { pool[i] = processorConstructor.newInstance(); } } catch (Exception e) { // Won't happen because it has been done previously } }
我们可以看到在有个这样的变量:
/** The pool table */ private final IoProcessor<S>[] pool;
从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的,至于mina为什么要这样处理,这里我也不得而知,如果哪位知道的话可以一起讨论……
三、ExecutorFilter类中的线程池。这是一个可选的线程池,是加在过滤器当中的。我们一般选择加在过滤器的最后面,这样Handler里面的业务处理就可以在线程池里面进行处理了。它的默认大小是16。
/** The default pool size */ private static final int DEFAULT_MAX_POOL_SIZE = 16;
看下Executor的创建方式:
private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { // Create a new Executor Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler); return executor; }
类OrderedThreadPoolExecutor是一个继承了ThreadPoolExecutor的类,覆盖了一些方法的实现。看下任务提交的代码:
protected void fireEvent(IoFilterEvent event) { //将事件提交给线程池执行 executor.execute(event); }
里面的实现细节相对比较复杂,感兴趣的童鞋可以再自行深入研究。
四、推荐文章
1. java并发编程-Executor框架 http://mshijie.iteye.com/blog/366591
2. java.util.concurrent介绍 http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html
3. 原子变量(AtomicLong, AtomicInteger, AtomicReference) http://meng-lin.iteye.com/blog/485281
五、总结
上面的文章基本上讲的比较简单,粒度比较粗,线程池的应用是mina的核心之一,里面有很多细节的地方其实是很值得学习的,当然本人到现在也不能完全吃透。还需要以后在交流和学习中与大家一起成长。 每天进步一点点,不做无为的码农。。。。。 2012年6月22日星期五 码农虎虎 wslfh2005@163.com
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-06-25
写的很不错,不过有个问题想请教一下。关于IoProcessor的线程池,默认的是CPU核数加1,我在网上找了些资料。说2.0以上的版本是不需要设置线程池大小的。。我不知道是不是理解有误。为什么不需要设置?我如果不使用默认设置,有方法设置吗?期待您的解答。
|
|
返回顶楼 | |
发表时间:2012-06-26
friendlytkyj 写道 写的很不错,不过有个问题想请教一下。关于IoProcessor的线程池,默认的是CPU核数加1,我在网上找了些资料。说2.0以上的版本是不需要设置线程池大小的。。我不知道是不是理解有误。为什么不需要设置?我如果不使用默认设置,有方法设置吗?期待您的解答。 我在写这篇文章的时候是基于mina2.0.4版本,默认的是CPU核数加1,但是这里是可以设置它的大小的,比如NioSocketAcceptor类有无参的构造函数,它调用的父类构造函数中有new SimpleIoProcessorPool<S>(processorClass)参数,可以看它的构造是这样写的:this(processorType, null, DEFAULT_SIZE);其实DEFAULT_SIZE值为:Runtime.getRuntime().availableProcessors() + 1; 如果你想要自己设置的话可以调用NioSocketAcceptor类的有参构造函数 public NioSocketAcceptor(int processorCount) |
|
返回顶楼 | |
发表时间:2012-06-26
wslfh2005 写道 friendlytkyj 写道 写的很不错,不过有个问题想请教一下。关于IoProcessor的线程池,默认的是CPU核数加1,我在网上找了些资料。说2.0以上的版本是不需要设置线程池大小的。。我不知道是不是理解有误。为什么不需要设置?我如果不使用默认设置,有方法设置吗?期待您的解答。 我在写这篇文章的时候是基于mina2.0.4版本,默认的是CPU核数加1,但是这里是可以设置它的大小的,比如NioSocketAcceptor类有无参的构造函数,它调用的父类构造函数中有new SimpleIoProcessorPool<S>(processorClass)参数,可以看它的构造是这样写的:this(processorType, null, DEFAULT_SIZE);其实DEFAULT_SIZE值为:Runtime.getRuntime().availableProcessors() + 1; 如果你想要自己设置的话可以调用NioSocketAcceptor类的有参构造函数 public NioSocketAcceptor(int processorCount) 楼主回答的很对,不过我在实际使用中发现,其实这个线程池的数量没必要设置的太大,因为IoProcessor的线程主要用来接收来自客户端的请求,处理速度很快,线程也很快就释放掉了,关键还是ExecutorFilter类中的线程池的设置,因为这个是用来处理业务逻辑的线程,比较耗资源和时间. |
|
返回顶楼 | |
发表时间:2012-06-26
gouerli 写道 wslfh2005 写道 friendlytkyj 写道 写的很不错,不过有个问题想请教一下。关于IoProcessor的线程池,默认的是CPU核数加1,我在网上找了些资料。说2.0以上的版本是不需要设置线程池大小的。。我不知道是不是理解有误。为什么不需要设置?我如果不使用默认设置,有方法设置吗?期待您的解答。
我在写这篇文章的时候是基于mina2.0.4版本,默认的是CPU核数加1,但是这里是可以设置它的大小的,比如NioSocketAcceptor类有无参的构造函数,它调用的父类构造函数中有new SimpleIoProcessorPool<S>(processorClass)参数,可以看它的构造是这样写的:this(processorType, null, DEFAULT_SIZE);其实DEFAULT_SIZE值为:Runtime.getRuntime().availableProcessors() + 1; 如果你想要自己设置的话可以调用NioSocketAcceptor类的有参构造函数 public NioSocketAcceptor(int processorCount) 楼主回答的很对,不过我在实际使用中发现,其实这个线程池的数量没必要设置的太大,因为IoProcessor的线程主要用来接收来自客户端的请求,处理速度很快,线程也很快就释放掉了,关键还是ExecutorFilter类中的线程池的设置,因为这个是用来处理业务逻辑的线程,比较耗资源和时间. 经楼主和三楼这位兄弟一解释。。顿时豁然开朗。谢谢。。 |
|
返回顶楼 | |
发表时间:2012-06-26
gouerli 写道 wslfh2005 写道 friendlytkyj 写道 写的很不错,不过有个问题想请教一下。关于IoProcessor的线程池,默认的是CPU核数加1,我在网上找了些资料。说2.0以上的版本是不需要设置线程池大小的。。我不知道是不是理解有误。为什么不需要设置?我如果不使用默认设置,有方法设置吗?期待您的解答。
我在写这篇文章的时候是基于mina2.0.4版本,默认的是CPU核数加1,但是这里是可以设置它的大小的,比如NioSocketAcceptor类有无参的构造函数,它调用的父类构造函数中有new SimpleIoProcessorPool<S>(processorClass)参数,可以看它的构造是这样写的:this(processorType, null, DEFAULT_SIZE);其实DEFAULT_SIZE值为:Runtime.getRuntime().availableProcessors() + 1; 如果你想要自己设置的话可以调用NioSocketAcceptor类的有参构造函数 public NioSocketAcceptor(int processorCount) 楼主回答的很对,不过我在实际使用中发现,其实这个线程池的数量没必要设置的太大,因为IoProcessor的线程主要用来接收来自客户端的请求,处理速度很快,线程也很快就释放掉了,关键还是ExecutorFilter类中的线程池的设置,因为这个是用来处理业务逻辑的线程,比较耗资源和时间. 是的,这块是很耗费时间的,因为可能涉及到数据库操作,所以怎么样优化这块是重点。。。有相关优化经验的可以分享下…… |
|
返回顶楼 | |
发表时间:2012-06-27
声明一下,
从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的 上面这句话是有问题的,其实线程池还是同一个,只不过为什么一个线程池里面会有CPU+1个线程,这块还有不清楚,希望哪位达人能解释一下…… |
|
返回顶楼 | |
发表时间:2012-06-27
我用的是SPRING整合了MINA,ExecutorFilter是能指定线程池大小的。。不过我现在貌似碰到了新的问题。。就是中文乱码的问题。我客户端接收到的中文没有乱码,但是发送到服务端的中文却是乱码,觉得很奇怪。还是觉得自己对MINA的框架不够熟悉。需要更进一步的了解和熟悉。
|
|
返回顶楼 | |
发表时间:2012-06-27
wslfh2005 写道 声明一下,
从这个变量我们可以发现mina的线程池模型是以多个newCachedThreadPool存在的 上面这句话是有问题的,其实线程池还是同一个,只不过为什么一个线程池里面会有CPU+1个线程,这块还有不清楚,希望哪位达人能解释一下…… 呵呵。。是啊。。很多不明白的地方,网上关于MINA有用的资料很少。。希望能有达人给我们上上课。。 |
|
返回顶楼 | |
发表时间:2012-06-27
friendlytkyj 写道 我用的是SPRING整合了MINA,ExecutorFilter是能指定线程池大小的。。不过我现在貌似碰到了新的问题。。就是中文乱码的问题。我客户端接收到的中文没有乱码,但是发送到服务端的中文却是乱码,觉得很奇怪。还是觉得自己对MINA的框架不够熟悉。需要更进一步的了解和熟悉。 乱码问题是比较常见的问题哈,你可以统一下编码,其实和mina框架关系不会太大。。。 |
|
返回顶楼 | |