`
jsx112
  • 浏览: 315906 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

JDK1.5新特性 - 线程池详解三

阅读更多
简单的线程池实现
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。以下代码实现了具有线程池的工作队列。
public class WorkQueue
{
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedList queue;
    public WorkQueue(int nThreads)
    {
        this.nThreads = nThreads;
        queue = new LinkedList();
        threads = new PoolWorker[nThreads];
        for (int i=0; i<nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }
    public void execute(Runnable r) {
        synchronized(queue) {
            queue.addLast(r);
            queue.notify();
        }
    }
    private class PoolWorker extends Thread {
        public void run() {
            Runnable r;
            while (true) {
                synchronized(queue) {
                    while (queue.isEmpty()) {
                        try
                        {
                            queue.wait();
                        }
                        catch (InterruptedException ignored)
                        {
                        }
                    }
                    r = (Runnable) queue.removeFirst();
                }
                // If we don’t catch RuntimeException,
                // the pool could leak threads
                try {
                    r.run();
                }
                catch (RuntimeException e) {
                    // You might want to log something here
                }
            }
        }
    }
}
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
用线程池执行任务
如果你开发项目的时候用到很多的short-lived任务,这里推荐使用“线程池”这项技术。你可以创建一个线程池来执行池中的的任务,来取代每次执行任务是都要为新的任务来new和discard。如果一个线程在池中是可用状态,那么任务将立即执行。执行完成之后线程返回池中,否则,任务将一直等待直到有线程处在可用状态。

J2SE 5.0为大家提供了一个新的java.util.concurrent package,并且在这个报中提供了一个pre-built 的线程池架构。在java.util.concurrent中提供了一个Executor 接口,里面有一个execute的方法,参数是Runnable 类型
   public interface Executor {
     public void execute(Runnable command);
   }
使用线程池架构,你就必须创建一个Executor实例,然后你给他分配一些runnable任务,例如:
Java代码 
Executor executor = ...;  
executor.execute(aRunnable1);  
executor.execute(aRunnable2); 
   Executor executor = ...;
   executor.execute(aRunnable1);
   executor.execute(aRunnable2);
然后你创建或者找到Executor的实现类,实现类可以立即(或者连续)执行分配的任务,例如:
Java代码 
class MyExecutor implements Executor {  
    public void execute(Runnable r) {  
        new Thread(r).start();  
    }  

   class MyExecutor implements Executor {
       public void execute(Runnable r) {
           new Thread(r).start();
       }
   }
concurrency utilities也包括了一个ThreadPoolExecutor类,它提供了很多对线程的一般性操作,提供了四个构造函数,每个都可以指定如:线程池大小,持续时间,一个线程factory,和拒绝线程的handler。
Java代码 
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          RejectedExecutionHandler handler)  
public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler) 
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             RejectedExecutionHandler handler)
   public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler)
但是你不必声明构造函数,Executors类会为你创建一个线程池。在一种最简单的情况下,你在Executors类中声明了newFixedThreadPool方法,并且在池中分配了许多线程。你可以使用ExecutorService(继承Executor的一个接口),去execute和submit 那些Runnable任务,使用ExecutorService中的submit方法可以得到一个返回结果,当然submit也可以返回一个Future对象用来检查任务是否执行。
让我们来先做一个Runnable类,名字为NamePrinter,它通知你运行、暂停、和耗费的时间。
Java代码 
public class NamePrinter implements Runnable {  
  private final String name;  
  private final int delay;  
  public NamePrinter(String name, int delay) {  
    this.name = name;  
    this.delay = delay;  
  }  
  public void run() {  
    System.out.println("Starting: " + name);  
    try {  
      Thread.sleep(delay);  
    } catch (InterruptedException ignored) {  
    }  
    System.out.println("Done with: " + name);  
  }  

   public class NamePrinter implements Runnable {
     private final String name;
     private final int delay;
     public NamePrinter(String name, int delay) {
       this.name = name;
       this.delay = delay;
     }
     public void run() {
       System.out.println("Starting: " + name);
       try {
         Thread.sleep(delay);
       } catch (InterruptedException ignored) {
       }
       System.out.println("Done with: " + name);
     }
   }
然后下面是我们测试的项目UsePool,它创建一个有三个线程的线程池,分配了10个任务给它(运行10次NamePrinter),UsePool在被shutdown 和 awaitTermination之前将等待并执行分配的任务。一个ExecutorService必须要在terminated之前执行shutdown,shutdownNow方法是立即尝试shutdown操作。shutdownNow 方法将返回没有被执行的任务。
Java代码 
import java.util.concurrent.*;  
import java.util.Random;  
public class UsePool {  
  public static void main(String args[]) {  
    Random random = new Random();  
    ExecutorService executor =   
            Executors.newFixedThreadPool(3);  
    // Sum up wait times to know when to shutdown  
    int waitTime = 500;  
    for (int i=0; i<10; i++) {  
      String name = "NamePrinter " + i;  
      int time = random.nextInt(1000);  
      waitTime += time;  
      Runnable runner = new NamePrinter(name, time);  
      System.out.println("Adding: " + name + " / " + time);  
      executor.execute(runner);  
    }  
    try {  
      Thread.sleep(waitTime);  
      executor.shutdown();  
      executor.awaitTermination  
              (waitTime, TimeUnit.MILLISECONDS);  
    } catch (InterruptedException ignored) {  
    }  
    System.exit(0);  
  }  

   import java.util.concurrent.*;
   import java.util.Random;
   public class UsePool {
     public static void main(String args[]) {
       Random random = new Random();
       ExecutorService executor =
               Executors.newFixedThreadPool(3);
       // Sum up wait times to know when to shutdown
       int waitTime = 500;
       for (int i=0; i<10; i++) {
         String name = "NamePrinter " + i;
         int time = random.nextInt(1000);
         waitTime += time;
         Runnable runner = new NamePrinter(name, time);
         System.out.println("Adding: " + name + " / " + time);
         executor.execute(runner);
       }
       try {
         Thread.sleep(waitTime);
         executor.shutdown();
         executor.awaitTermination
                 (waitTime, TimeUnit.MILLISECONDS);
       } catch (InterruptedException ignored) {
       }
       System.exit(0);
     }
    }
输出的结果是:
Adding: NamePrinter 0 / 30
Adding: NamePrinter 1 / 727
Adding: NamePrinter 2 / 980
Starting: NamePrinter 0
Starting: NamePrinter 1
Starting: NamePrinter 2
Adding: NamePrinter 3 / 409
Adding: NamePrinter 4 / 49
Adding: NamePrinter 5 / 802
Adding: NamePrinter 6 / 211
Adding: NamePrinter 7 / 459
Adding: NamePrinter 8 / 994
Adding: NamePrinter 9 / 459
Done with: NamePrinter 0
Starting: NamePrinter 3
Done with: NamePrinter 3
Starting: NamePrinter 4
Done with: NamePrinter 4
Starting: NamePrinter 5
Done with: NamePrinter 1
Starting: NamePrinter 6
Done with: NamePrinter 6
Starting: NamePrinter 7
Done with: NamePrinter 2
Starting: NamePrinter 8
Done with: NamePrinter 5
Starting: NamePrinter 9
Done with: NamePrinter 7
Done with: NamePrinter 9
Done with: NamePrinter 8
注意前三个NamePrinter对象启动的非查的快,之后的NamePrinter对象每次启动都要等待前面的执行完成。
在J2SE 5.0有非常多的pooling framework可以用,例如,你可以创建一个scheduled线程池……
更多信息还是看官方的concurrency utilities,地址:http://java.sun.com/j2se/1.5.0/docs/guide/concurrency/
public class PoolAsynService extends BaseService implements Runnable {
private Thread thread = new Thread(this);
private List waitToList = (List) Collections.synchronizedList(new LinkedList());
// ////////////线程池参数/////////////////
private int corePoolSize = 5;// : 线程池维护线程的最少数量
private int maximumPoolSize = 10;// :线程池维护线程的最大数量
private long keepAliveTime = 60;// : 线程池维护线程所允许的空闲时间
private TimeUnit unit = TimeUnit.SECONDS;// : 线程池维护线程所允许的空闲时间的单位
private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);// :
// 线程池所使用的缓冲队列
private RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// :
// 线程池对拒绝任务的处理策略
// //////////线程池参数/////////////////
private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue, handler);
public void run() {
while (!thread.isInterrupted()) {
if (!waitToList.isEmpty()) {
try {
threadPool.execute(new Executor());
} catch (Exception e) {
logger.error("pool  execute error!!!", e);
}
}

Tools.block(25);
}
}
public void doAsync(Object executor, Object... objects) {
Throwable t = new Throwable();
StackTraceElement[] elements = t.getStackTrace();
StackTraceElement element = elements[1];
String method = element.getMethodName();
AsyncContext ctx = new AsyncContext();
ctx.args = objects;
ctx.executor = executor;
ctx.method = method;
if (method.endsWith("PA")) {
waitToList.add(ctx);
} else {
logger.warn("async method name is not good!");
}
}
private class AsyncContext {
String method;
Object executor;
Object[] args;
}
private class Executor implements Runnable {
public void run() {
if (!waitToList.isEmpty()) {
try {
Object task = waitToList.remove(0);
AsyncContext ctx = (AsyncContext) task;
doTaskByCtx(ctx);
} catch (Exception e) {
logger.error("async error!!!", e);
}
}
}
private void doTaskByCtx(AsyncContext ctx) {
String targetMethodName = ctx.method.substring(0, ctx.method
.length() - 2);
Method targetMethod = null;
Class clazz = null;
try {
clazz = ctx.executor.getClass();
Method[] methods = clazz.getDeclaredMethods();
if (methods != null) {
for (int i = 0; i < methods.length; i++) {
String name = methods[i].getName();
if (name.equals(targetMethodName)) {
targetMethod = methods[i];
break;
}
}
if (targetMethod != null) {
targetMethod.invoke(ctx.executor, ctx.args);
}
}
} catch (Exception e) {
logger.error(
"do async fail! " + clazz + ":" + targetMethodName, e);
}
}
}
@Override
public void destroy() {
thread.interrupt();
threadPool.shutdown();
logger.info("thread pool asynService shut down");
}
@Override
public void init() {
thread.start();
logger.info("thread pool asynService start");
}
}
分享到:
评论

相关推荐

    JDK1.5免安装

    **JDK1.5免安装详解** Java Development Kit(JDK)是Oracle公司提供的用于开发和运行Java应用程序的工具集合。JDK1.5,也称为Java 5.0,是一个重要的版本,它引入了许多创新特性,提升了Java编程的效率和性能。...

    jdk1.5.0.22 64位

    **Java Development Kit (JDK) 1.5.0.22 64位详解** JDK(Java Development Kit)是Oracle公司发布的用于开发、编译、调试Java应用程序的工具集合。版本1.5.0.22是JDK的一个历史版本,也被称为Java 5.0 Update 22,...

    JDK1.5 JAVA

    综上所述,JDK1.5是Java发展史上的一次重大飞跃,它的诸多新特性不仅提升了开发者的生产力,也为Java语言的后续版本奠定了坚实的基础。而提供的文件"jdk-1_5_0_08-windows-i586-p.exe"正是JDK1.5的安装程序,适用于...

    JDK1.6.0_13免安装版

    **JDK 1.6.0_13详解** Java Development Kit(JDK)是Java编程语言和平台的核心组件,它包含编译器、调试器、JRE(Java Runtime Environment)以及一系列工具,用于开发、运行和管理Java应用程序。JDK 1.6.0_13是...

    java socket线程池

    从JDK 1.5开始,Java并发API得到了增强,提供了更为强大的并发工具和库,其中就包括线程池的实现。线程池的主要优势在于可以重用线程,减少线程创建和销毁带来的开销,同时还可以有效控制并发数和管理资源。 在给出...

    jdk_api_1_6中文版

    《Java开发工具包(JDK)1.6中文版详解》 Java开发工具包(Java Development Kit,简称JDK)是Sun Microsystems公司为Java开发者提供的核心开发环境,它包含了编译、运行Java程序所需的各种工具和API文档。JDK 1.6...

    JDK1.6 API英文版all(CHM)

    **二、JDK 1.6新特性** 1. **增强的枚举(enum)**:在JDK 1.5引入枚举类型后,1.6版本进一步增强了枚举的使用,包括枚举常量之间的比较和遍历。 2. **泛型(Generics)**:泛型增加了类型安全,允许在编译时检查...

    jdk1.5.0_10

    泛型是JDK 1.5引入的最重要的特性之一,它允许在定义类、接口和方法时指定类型参数,从而实现了类型安全。泛型可以避免类型转换异常,提高代码的可读性和复用性。例如,泛型集合如`List&lt;T&gt;`,这里的`T`代表一个待定...

    JDK DOC5.0 中文帮助文档

    **JDK DOC 5.0 中文帮助文档详解** JDK (Java Development Kit) 是Java编程语言的标准开发工具集,而JDK DOC 5.0是针对Java 1.5版本的一份完整的中文帮助文档。这份文档对于Java开发者来说极其重要,因为它提供了...

    java的concurrent用法详解

    虽然也有诸如Apache Commons Pool等第三方库提供了部分支持,但这些解决方案相比JDK内置的支持仍然显得不够完善和便捷。 随着Java 1.5的发布,Sun公司(现在是Oracle公司的一部分)引入了`java.util.concurrent`包...

    Java并发框架:Executor API详解

    Java并发框架中的Executor API是Java多线程编程的重要组成部分,它是Java从JDK 1.5版本开始引入的,旨在提供一种更高效、更可控的线程管理方式。Executor API的核心在于`java.util.concurrent.Executor`接口,它定义...

    tomcat6.0下载

    Tomcat 6.0需要Java Development Kit (JDK)的支持,确保系统已经安装了与Tomcat版本兼容的JDK,通常是JDK 1.5或更高版本。 5. **配置与管理**: - `conf`目录包含了Tomcat的主要配置文件,如server.xml、web.xml...

    30重点面试题-Fu1

    【知识点详解】 1. String、StringBuffer 和 StringBuilder 的区别: - String:不可变对象,一旦创建就不能修改,适合在不需要修改字符串的情况下使用,因为其内部优化,多次拼接会创建多个中间对象,可能导致...

    Tomcat源码研究.pdf

    - **高级特性**:Tomcat还支持更高级的JMX特性,如通知和事件监听,使得在特定条件下自动触发某些操作成为可能。 - **最佳实践**:为了充分利用JMX,建议遵循一些最佳实践,比如合理设计MBeans的结构、正确设置安全...

    谈谈java的concurrent用法

    这个包的引入极大地简化了并发编程,特别是从JDK1.5版本开始,由Doug Lea设计的一系列并发工具类使得编写高效、安全的并发程序变得更为容易。本文将深入探讨`concurrent`包中的关键组件及其用法。 首先,`Executor`...

    Java面试题和答案.pdf

    ### Java高频面试知识点详解 #### 一、Java基础 **1. JDK和JRE有什么区别?** - **JRE(Java Runtime Environment)**: 包含Java虚拟机(JVM)、Java核心类库和支持文件,是运行Java程序所需的基础环境。 - **JDK...

    Spring-Reference_zh_CN(Spring中文参考手册)

    2. Spring 2.0 的新特性 2.1. 简介 2.2. 控制反转(IoC)容器 2.2.1. 更简单的XML配置 2.2.2. 新的bean作用域 2.2.3. 可扩展的XML编写 2.3. 面向切面编程(AOP) 2.3.1. 更加简单的AOP XML配置 2.3.2. 对@AspectJ 切面的...

    JAVA核心知识点整理(有效)

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM ........................

Global site tag (gtag.js) - Google Analytics