`
zy116494718
  • 浏览: 472961 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

线程池系列三:结合线程池实现Socket

    博客分类:
  • Java
 
阅读更多

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了 Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

 一、简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销 毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器 模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态 方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个 java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方 法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此 ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个 线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}



二、服务器端的完整实现

  服务器端的完整实现代码如下:

 

package demo;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server
{
    private static int produceTaskSleepTime = 100;
    private static int consumeTaskSleepTime = 1200;
    private static int produceTaskMaxNumber = 100;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 100;
    private static final int KEEPALIVE_TIME = 3;
    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 19527;
    private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
    // private ThreadPoolExecutor serverThreadPool = null;
    private ExecutorService pool = null;
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    private ServerSocket serverListenSocket = null;
    private int times = 5;

    public void start()
    {
        // You can also init thread pool in this way.
        /*
         * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
         */
        pool = Executors.newFixedThreadPool(10);
        try
        {
            serverListenSocket = new ServerSocket(PORT);
            serverListenSocket.setReuseAddress(true);

            System.out.println("I'm listening");
            while (times-- > 0)
            {
                Socket socket = serverListenSocket.accept();
                String welcomeString = "hello";
                // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
                pool.execute(new ServiceThread(socket));
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cleanup();
    }

    public void cleanup()
    {
        if (null != serverListenSocket)
        {
            try
            {
                serverListenSocket.close();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // serverThreadPool.shutdown();
        pool.shutdown();
       //调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的interrupt()才起作用,正在运行的线程不起作用也不抛异常)
    }

    public static void main(String args[])
    {
        Server server = new Server();
        server.start();
    }
}

class ServiceThread implements Runnable, Serializable
{
    private static final long serialVersionUID = 0;
    private Socket connectedSocket = null;
    private String helloString = null;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();

    ServiceThread(Socket socket)
    {
        connectedSocket = socket;
    }

    public void run()
    {
        increaseCount();
        int curCount = getCount();
        helloString = "hello, id = " + curCount + "\r\n";

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new TimeConsumingTask());

        DataOutputStream dos = null;
        try
        {
            dos = new DataOutputStream(connectedSocket.getOutputStream());
            dos.write(helloString.getBytes());
            try
            {
                dos.write("let's do soemthing other.\r\n".getBytes());
                String result = future.get();
                dos.write(result.getBytes());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            if (null != connectedSocket)
            {
                try
                {
                    connectedSocket.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (null != dos)
            {
                try
                {
                    dos.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    }

    private int getCount()
    {
        int ret = 0;
        try
        {
            lock.lock();
            ret = count;
        }
        finally
        {
            lock.unlock();
        }
        return ret;
    }

    private void increaseCount()
    {
        try
        {
            lock.lock();
            ++count;
        }
        finally
        {
            lock.unlock();
        }
    }
}

class TimeConsumingTask implements Callable<String>
{
    public String call() throws Exception
    {
        System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
        return "ok, here's the result: It takes me lots of time to produce this result";
    }

}
 

ps:

这里重点介绍下Future 和 Callable。Callable接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable不会返回结果,并且无法抛出经过检查的异常。Callable可以和Future配合使用,用Future的get方法可以取得Callable中返回的值。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other.\r\n".getBytes());
String result = future.get();
dos.write(result.getBytes());

使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,get会阻塞直到它完成。

如上所示,在new TimeConsumingTask()后线程不会阻塞,而是在submit任务后继续执行dos.write().... 等操作,
然后再想取得结果的时候用future.get()方法取得。

其中TimeConsumingTask实现了Callable接口

class TimeConsumingTask implements Callable {
   public String call() throws Exception {
 System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
 return "ok, here's the result: It takes me lots of time to produce this result";
}
} 

 


这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似于Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。

Fature实现了3个功能:1.获取任务的结果2.取消任务3.获得任务进行状态(完成还是被取消)。

 

方法介绍:
boolean cancel(boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled() 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone() 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V  get() throws InterruptedException, ExecutionException  等待任务执行结束,然后获得V类型的结果。
             InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,
             还会抛出CancellationException V     
get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同
上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

分享到:
评论

相关推荐

    socket 线程池实现

    Socket线程池是一种高效管理网络连接的技术,它结合了Socket通信和线程池的设计思想,以提高系统的并发处理能力和资源利用率。在Java等编程语言中,我们常常利用线程池来处理大量的并发Socket连接,避免频繁创建和...

    C++线程池结合IOCP完成端口实现socket高并发服务端程序

    C++线程池结合IOCP完成端口实现socket高并发服务端程序 包含mysql数据库操作、json数据解析

    线程池+socket

    线程池是一种管理线程的机制,而Socket是实现进程间通信(IPC)的一种方式,尤其在网络环境中。下面将详细阐述这两个概念以及它们的结合使用。 1. **线程池**: - **定义**:线程池是一组预先创建的线程,用于执行...

    JAVA 写的SOCKET线程池

    Java中的Socket线程池是一种高效的网络编程模型,它结合了Socket通信和多线程技术,以提高服务端处理客户端请求的并发性能。对于初学者来说,理解并掌握这个概念至关重要,因为这能帮助他们构建更稳定、可扩展的网络...

    java服务器端Socket线程池

    Java服务器端的Socket线程池是一种优化服务器性能和资源管理的重要技术。在高并发场景下,服务器需要处理大量客户端的连接请求,...在实际开发中,应结合具体需求选择合适的线程池实现,以实现最优的性能和资源利用率。

    Socket网络编程学习笔记之---使用线程池提高性能

    "SocketTest3"可能是一个包含示例代码的文件,它演示了如何将Socket与线程池结合来处理网络连接。这个文件可能包含以下内容: - Server端:创建ServerSocket监听端口,接收到连接请求后,不直接创建新线程,而是将...

    C++语言实现的线程池,结合windowsIOCP完成端口,实现socket高并发服务端程序.zip

    C++语言实现的线程池,结合windowsIOCP完成端口,实现socket高并发服务端程序.zip

    java+socket 及多线程线程池应用(IBM教程)

    8. **最佳实践**:教程可能还会提供一些最佳实践建议,如如何设计健壮的并发系统,如何处理网络异常,以及如何利用线程池实现高效的并发处理。 这个IBM教程是难得的学习资源,它将理论与实践相结合,帮助开发者深入...

    易语言Hp-Socket5.4.1高性能TCP/UDP通信组件

    &gt; 加入线程池公共组件:。1、加入 IHPThreadPool 线程池公共组件。2、IHPThreadPool 主要方法:。 1) Start:启动线程池。 2) Stop:关闭线程池。 3) Submit:提交任务。 4) AdjustThreadCount:调整线程池大小。&gt; ...

    完成端口结合线程池类库,源码,实例

    本文将深入探讨完成端口结合线程池的原理和应用,以及如何通过提供的源码和实例来理解和使用这种技术。 首先,理解完成端口的基本概念至关重要。完成端口是一种通知机制,当异步I/O操作完成时,系统会将事件通知放...

    C# 简单MSSQL线程池+异步SOCKET服务端完整源码

    在本文中,我们将深入探讨基于C#的MSSQL线程池、异步SOCKET服务端编程的关键概念和实现细节。这些技术对于任何希望在.NET环境中构建高效、可扩展的服务器端应用程序的开发者来说,都是至关重要的。 首先,让我们...

    Linux下基于epoll 线程池高并发服务器实现研究.pdf

    Linux下基于epoll线程池高并发服务器实现研究 本文研究了Linux操作系统下基于epoll机制和线程池技术的高并发服务器实现。高并发服务器在当前数字化、网络化和信息化的时代中变得越来越重要,因为服务器的并发请求量...

    利用Socket界面实现计算三角形面积

    在实现多客户端并发处理时,服务器通常会创建一个线程池或使用异步IO模型,每当有新的连接请求到来时,都会创建一个新的线程或异步任务来处理,这样可以避免因单个连接处理阻塞而影响其他客户端的连接。 在项目"TCP...

    运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接.doc

    在Java编程中,`ExecutorService`是Java并发包(`java.util.concurrent`)中的核心...对于UDP通信,虽然没有提供具体实现,但原理相似,可以将`DatagramSocket`与`ExecutorService`结合使用来处理并发的UDP数据报文。

    WSAEventSelect 线程池 实现服务器示例

    在本文中,我们将深入探讨如何使用`...在`EventSelectServer`示例中,我们可以学习如何有效地利用Windows Socket API和线程池技术来实现这一目标,这对于任何从事网络编程的开发者来说都是一份宝贵的参考资料。

    socket实现多线程

    在IT行业中,网络编程是不可或缺的一部分,而Socket编程则是实现客户端与服务器通信的基础。本教程将探讨如何在Java中利用Socket实现多线程,以提高应用程序的并发性能。我们将主要关注`SubThread.java`、`Client....

    易语言-易语言Hp-Socket5.4.1高性能TCP/UDP通信组件

    v5.4.1 更新: &gt; 加入线程池公共组件: 1、加入 IHPThreadPool 线程池公共组件 2、IHPThreadPool 主要方法: 1) Start:启动线程池 ...1、HP-Socket v5.4.1 完全兼容 HP-Socket v5.3.2 版本,可以直接替换升级

    C# Socket连接池

    在压缩包文件"NetPool"中,可能包含了实现C# Socket连接池的源代码,你可以通过阅读和学习这些代码,进一步了解如何结合线程池技术来构建自己的Socket连接池。同时,别忘了使用Windows性能计数器来监控你的系统,...

Global site tag (gtag.js) - Google Analytics