`
coach
  • 浏览: 386695 次
  • 性别: Icon_minigender_2
  • 来自: 印度
社区版块
存档分类
最新评论

Java5中的线程池实例讲解

阅读更多
Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

  简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}

  服务线程在开始给客户端打印一个欢迎信息,

increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());

  然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());

  其中TimeConsumingTask实现了Callable接口

class TimeConsumingTask implements Callable {
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }
}

  这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。

服务器端的完整实现

  服务器端的完整实现代码如下:
package demo;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server
{
    private static int produceTaskSleepTime = 100;
    private static int consumeTaskSleepTime = 1200;
    private static int produceTaskMaxNumber = 100;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 100;
    private static final int KEEPALIVE_TIME = 3;
    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 19527;
    private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
    // private ThreadPoolExecutor serverThreadPool = null;
    private ExecutorService pool = null;
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    private ServerSocket serverListenSocket = null;
    private int times = 5;

    public void start()
    {
        // You can also init thread pool in this way.
        /*
         * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
         */
        pool = Executors.newFixedThreadPool(10);
        try
        {
            serverListenSocket = new ServerSocket(PORT);
            serverListenSocket.setReuseAddress(true);

            System.out.println("I'm listening");
            while (times-- > 0)
            {
                Socket socket = serverListenSocket.accept();
                String welcomeString = "hello";
                // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
                pool.execute(new ServiceThread(socket));
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cleanup();
    }

    public void cleanup()
    {
        if (null != serverListenSocket)
        {
            try
            {
                serverListenSocket.close();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // serverThreadPool.shutdown();
        pool.shutdown();
       //调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的interrupt()才起作用,正在运行的线程不起作用也不抛异常)
    }

    public static void main(String args[])
    {
        Server server = new Server();
        server.start();
    }
}

class ServiceThread implements Runnable, Serializable
{
    private static final long serialVersionUID = 0;
    private Socket connectedSocket = null;
    private String helloString = null;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();

    ServiceThread(Socket socket)
    {
        connectedSocket = socket;
    }

    public void run()
    {
        increaseCount();
        int curCount = getCount();
        helloString = "hello, id = " + curCount + "\r\n";

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new TimeConsumingTask());

        DataOutputStream dos = null;
        try
        {
            dos = new DataOutputStream(connectedSocket.getOutputStream());
            dos.write(helloString.getBytes());
            try
            {
                dos.write("let's do soemthing other.\r\n".getBytes());
                String result = future.get();
                dos.write(result.getBytes());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            if (null != connectedSocket)
            {
                try
                {
                    connectedSocket.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (null != dos)
            {
                try
                {
                    dos.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    }

    private int getCount()
    {
        int ret = 0;
        try
        {
            lock.lock();
            ret = count;
        }
        finally
        {
            lock.unlock();
        }
        return ret;
    }

    private void increaseCount()
    {
        try
        {
            lock.lock();
            ++count;
        }
        finally
        {
            lock.unlock();
        }
    }
}

class TimeConsumingTask implements Callable<String>
{
    public String call() throws Exception
    {
        System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
        return "ok, here's the result: It takes me lots of time to produce this result";
    }

}
分享到:
评论
2 楼 memoryisking 2014-11-18  
可以看看这篇文章,这个是struts教程网上一个简单的例子,构建一个简单的线程池:http://www.strutshome.com/index.php/archives/710
1 楼 jjruanlili 2014-08-26  
要搞个executor和nio的结合,差不多

相关推荐

    java线程池实例详细讲解

    Java线程池是一种高效管理线程资源的工具,它能够帮助开发者有效地控制并调度线程,从而提升系统性能,减少系统资源的浪费。在Java中,`ExecutorService`接口是线程池的主要入口,它是`java.util.concurrent`包的一...

    在spring boot中使用java线程池ExecutorService的讲解

    在上面的代码中,我们创建了一个线程池配置类,使用 @Bean 注解创建了一个线程池实例,该实例具有 5 个核心线程,10 个最大线程,闲置线程存活 60 秒,使用 ArrayBlockingQueue 作为任务队列。 总结 在 Spring ...

    java多线程的讲解和实战

    本资料详细讲解了Java多线程的原理,并提供了丰富的实战代码,非常适合Java初学者以及希望深入理解多线程的开发者。 1. **线程的基本概念**:线程是程序执行的最小单位,一个进程中可以有多个线程同时运行。Java...

    ActiveMQ与Spring线程池整合实例

    对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar backport-util-concurrent-2.1.jar commons-lang-2.0.jar commons...

    java多线程编程实例_Source

    在本实例源码中,包含17个章节和上百个实例,旨在深入讲解Java多线程的核心概念和实际应用。 一、线程基础知识 在Java中,线程是程序的执行流,每个线程都有自己的程序计数器、虚拟机栈、本地方法栈和一部分堆内存...

    Java多线程编程实例

    本书“Java多线程编程实例”深入浅出地讲解了如何在Java环境中实现多线程操作,尽管出版时间较早,但其内容的经典性和实用性使其在现代开发中仍具有极高的参考价值。 首先,我们要理解Java中的线程是如何创建的。...

    java中级实例应用教程

    多线程是Java的一大特色,书中的实例可能涵盖并发编程的基础,如创建线程、同步机制(synchronized关键字、wait()、notify()等)、线程池以及并发工具类的使用,帮助读者构建高效的多任务应用程序。 网络编程部分...

    java+socket 及多线程线程池应用(IBM教程)

    而多线程和线程池在Java编程中则是提高系统效率、优化资源管理的关键技术。IBM作为全球知名的科技公司,其提供的教程往往具有权威性和深度。在这个“java+socket 及多线程线程池应用”的教程中,我们可以期待学习到...

    java高清关于线程的讲解实例.rar

    Java线程是并发编程中的核心概念,它允许程序同时执行多个任务,极大地提高了程序的效率和响应速度...通过学习和实践提供的"java关于线程的讲解实例6666.pdf",可以深入理解这些概念,并提升在多线程环境下的编程能力。

    Java 线程池ExecutorService详解及实例代码

    Java线程池ExecutorService是Java并发编程中非常重要的一个组件,它通过管理和复用线程资源,有效地控制并发任务的执行,从而提高系统的性能和稳定性。本文将详细讲解ExecutorService的原理、使用场景以及如何通过...

    徐明浩java编程基础应用与实例代码

    此外,压缩包中的"Source"文件夹很可能包含了上述所有实例的源代码,读者可以直接下载、编译和运行,以更直观地理解Java编程。通过阅读和调试这些代码,可以加深对Java编程的理解,同时提升解决问题的能力。 总的来...

    java项目开发实例自学手册(源码)

    书中的实例可能会涉及并发编程的概念,如同步、互斥和线程池。 7. **网络编程**:Java的Socket编程允许开发网络应用程序,读者可以学习如何创建客户端和服务器程序进行通信。 8. **JDBC数据库访问**:Java ...

    JAVA 实例

    多线程是Java的一个重要特性,这部分讲解了如何创建和管理线程,包括同步机制(synchronized关键字、wait()和notify()方法)以及线程池的使用。 7. **chap08 - 类设计和设计模式** 这一章讨论了面向对象设计原则...

    java多线程Runnable实例

    本实例将深入讲解如何使用`Runnable`接口来实现多线程,并通过具体的`TestRunnable.java`源代码进行演示。 一、`Runnable`接口简介 在Java中,`Runnable`接口位于`java.lang.Runnable`包下,它定义了一个单一的方法...

    JAVA 多线程讲解及实例

    Java多线程是Java编程中的核心概念,它允许程序同时执行多个任务,极大地提升了软件的效率和性能。在现代计算环境中,多线程已经成为必备的技能,尤其在服务器端编程、高并发应用以及实时系统中,多线程的运用更是至...

    java JDK 实例开发宝典

    Java JDK实例开发宝典是一本专注于Java开发实践的书籍,主要涵盖了JDK中的核心特性和常见工具的使用方法。在本书中,读者可以找到一系列精心设计的实例,旨在帮助开发者深入理解和熟练应用Java语言。以下是对书中...

    socket 线程池实现(已经在项目中应用)

    在`SocketServer`类中,你需要创建一个`ExecutorService`实例,这是Java内置的线程池接口。你可以使用`Executors`工具类的静态方法来创建线程池,例如`newFixedThreadPool(int nThreads)`,它会创建一个固定大小的...

    java 经典实例 很赞的一部书

    5. **输入/输出流**:Java的I/O流系统是处理文件、网络通信的关键,书中的实例涵盖了文件操作、对象序列化、网络套接字通信等方面,帮助读者掌握数据的读取和写入。 6. **多线程**:Java提供了强大的多线程支持,书...

    多线程教程+实例讲解

    六、多线程实例讲解 1. 文件读写:多线程可以提高文件读写的并发性,比如在处理大量文件时,每个线程处理一个文件。 2. 数据库操作:数据库连接通常昂贵且有限,多线程可以有效利用连接池,提高查询和更新速度。 3. ...

    精通Java--JDK、数据库系统开发、Web开发基础与实例

    在并发编程方面,书中会介绍Java中的线程机制,包括线程的创建、同步、通信以及线程池的使用。理解并熟练掌握线程操作对于编写高效的多任务程序至关重要。 数据集合部分会涵盖Java集合框架,包括List、Set、Queue、...

Global site tag (gtag.js) - Google Analytics