`
zy116494718
  • 浏览: 478659 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

线程池系列三:结合线程池实现Socket

    博客分类:
  • Java
 
阅读更多

Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了 Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

 一、简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销 毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器 模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态 方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个 java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方 法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此 ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个 线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}



二、服务器端的完整实现

  服务器端的完整实现代码如下:

 

package demo;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server
{
    private static int produceTaskSleepTime = 100;
    private static int consumeTaskSleepTime = 1200;
    private static int produceTaskMaxNumber = 100;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 100;
    private static final int KEEPALIVE_TIME = 3;
    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 19527;
    private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
    // private ThreadPoolExecutor serverThreadPool = null;
    private ExecutorService pool = null;
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    private ServerSocket serverListenSocket = null;
    private int times = 5;

    public void start()
    {
        // You can also init thread pool in this way.
        /*
         * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
         */
        pool = Executors.newFixedThreadPool(10);
        try
        {
            serverListenSocket = new ServerSocket(PORT);
            serverListenSocket.setReuseAddress(true);

            System.out.println("I'm listening");
            while (times-- > 0)
            {
                Socket socket = serverListenSocket.accept();
                String welcomeString = "hello";
                // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
                pool.execute(new ServiceThread(socket));
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cleanup();
    }

    public void cleanup()
    {
        if (null != serverListenSocket)
        {
            try
            {
                serverListenSocket.close();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // serverThreadPool.shutdown();
        pool.shutdown();
       //调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的interrupt()才起作用,正在运行的线程不起作用也不抛异常)
    }

    public static void main(String args[])
    {
        Server server = new Server();
        server.start();
    }
}

class ServiceThread implements Runnable, Serializable
{
    private static final long serialVersionUID = 0;
    private Socket connectedSocket = null;
    private String helloString = null;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();

    ServiceThread(Socket socket)
    {
        connectedSocket = socket;
    }

    public void run()
    {
        increaseCount();
        int curCount = getCount();
        helloString = "hello, id = " + curCount + "\r\n";

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new TimeConsumingTask());

        DataOutputStream dos = null;
        try
        {
            dos = new DataOutputStream(connectedSocket.getOutputStream());
            dos.write(helloString.getBytes());
            try
            {
                dos.write("let's do soemthing other.\r\n".getBytes());
                String result = future.get();
                dos.write(result.getBytes());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            if (null != connectedSocket)
            {
                try
                {
                    connectedSocket.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (null != dos)
            {
                try
                {
                    dos.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    }

    private int getCount()
    {
        int ret = 0;
        try
        {
            lock.lock();
            ret = count;
        }
        finally
        {
            lock.unlock();
        }
        return ret;
    }

    private void increaseCount()
    {
        try
        {
            lock.lock();
            ++count;
        }
        finally
        {
            lock.unlock();
        }
    }
}

class TimeConsumingTask implements Callable<String>
{
    public String call() throws Exception
    {
        System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
        return "ok, here's the result: It takes me lots of time to produce this result";
    }

}
 

ps:

这里重点介绍下Future 和 Callable。Callable接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable不会返回结果,并且无法抛出经过检查的异常。Callable可以和Future配合使用,用Future的get方法可以取得Callable中返回的值。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other.\r\n".getBytes());
String result = future.get();
dos.write(result.getBytes());

使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,get会阻塞直到它完成。

如上所示,在new TimeConsumingTask()后线程不会阻塞,而是在submit任务后继续执行dos.write().... 等操作,
然后再想取得结果的时候用future.get()方法取得。

其中TimeConsumingTask实现了Callable接口

class TimeConsumingTask implements Callable {
   public String call() throws Exception {
 System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
 return "ok, here's the result: It takes me lots of time to produce this result";
}
} 

 


这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似于Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。

Fature实现了3个功能:1.获取任务的结果2.取消任务3.获得任务进行状态(完成还是被取消)。

 

方法介绍:
boolean cancel(boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
boolean isCancelled() 任务是否已经取消,任务正常完成前将其取消,则返回 true
boolean isDone() 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
V  get() throws InterruptedException, ExecutionException  等待任务执行结束,然后获得V类型的结果。
             InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,
             还会抛出CancellationException V     
get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同
上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

分享到:
评论

相关推荐

    socket 线程池实现

    Socket线程池是一种高效管理网络连接的技术,它结合了Socket通信和线程池的设计思想,以提高系统的并发处理能力和资源利用率。在Java等编程语言中,我们常常利用线程池来处理大量的并发Socket连接,避免频繁创建和...

    C++线程池结合IOCP完成端口实现socket高并发服务端程序

    "C++线程池结合IOCP完成端口实现socket高并发服务端程序"是一个利用现代C++特性,结合IO Completion Ports(IOCP)技术以及线程池策略来优化socket服务器性能的项目。IOCP是Windows操作系统中一种高效的I/O模型,...

    线程池+socket

    线程池是一种管理线程的机制,而Socket是实现进程间通信(IPC)的一种方式,尤其在网络环境中。下面将详细阐述这两个概念以及它们的结合使用。 1. **线程池**: - **定义**:线程池是一组预先创建的线程,用于执行...

    JAVA 写的SOCKET线程池

    Java中的Socket线程池是一种高效的网络编程模型,它结合了Socket通信和多线程技术,以提高服务端处理客户端请求的并发性能。对于初学者来说,理解并掌握这个概念至关重要,因为这能帮助他们构建更稳定、可扩展的网络...

    xianchengchi.zip_Socket 线程池_socket池_线程池_线程池socket

    总的来说,Socket线程池是优化网络服务性能的有效手段,它结合了线程池和Socket通信的优点。而在处理多个线程写入同一个套接字的问题时,需要根据具体需求选择合适的同步策略或使用更高级的I/O模型。通过这样的方式...

    C++语言实现的线程池,结合windowsIOCP完成端口,实现socket高并发服务端程序.zip

    综上所述,这个项目展示了如何利用C++的socket编程技术,结合线程池和Windows IOCP机制,设计并实现一个能够处理大量并发连接的TCP服务器。这是一项对系统设计和性能优化有着深度要求的任务,对于理解和提升网络编程...

    易语言Hp-Socket5.4.1高性能TCP/UDP通信组件

    &gt; 加入线程池公共组件:。1、加入 IHPThreadPool 线程池公共组件。2、IHPThreadPool 主要方法:。 1) Start:启动线程池。 2) Stop:关闭线程池。 3) Submit:提交任务。 4) AdjustThreadCount:调整线程池大小。&gt; ...

    java服务器端Socket线程池

    Java服务器端的Socket线程池是一种优化服务器性能和资源管理的重要技术。在高并发场景下,服务器需要处理大量客户端的连接请求,...在实际开发中,应结合具体需求选择合适的线程池实现,以实现最优的性能和资源利用率。

    Socket网络编程学习笔记之---使用线程池提高性能

    "SocketTest3"可能是一个包含示例代码的文件,它演示了如何将Socket与线程池结合来处理网络连接。这个文件可能包含以下内容: - Server端:创建ServerSocket监听端口,接收到连接请求后,不直接创建新线程,而是将...

    socket编程 使用select与线程池

    总的来说,`select`和线程池结合使用,能够构建出高并发、低延迟的网络服务。线程池处理客户端请求,降低了系统开销,`select`则帮助服务器有效地管理多个连接,实现高效的I/O操作。在实际开发中,这种模式广泛应用...

    java+socket 及多线程线程池应用(IBM教程)

    8. **最佳实践**:教程可能还会提供一些最佳实践建议,如如何设计健壮的并发系统,如何处理网络异常,以及如何利用线程池实现高效的并发处理。 这个IBM教程是难得的学习资源,它将理论与实践相结合,帮助开发者深入...

    realmsg-4.rar_Socket 线程池_msn transfer_socket_socket thread linux

    在Linux环境下,线程的创建和管理可以利用POSIX线程库(pthread),结合Socket编程,可以创建专门处理Socket连接的线程,每个线程负责一个或多个连接,提高服务的并发能力。 5. **文件传输**: 文件传输是即时通信...

    JAVA服务器端Socket线程池

    本文将详细解析标题为“JAVA服务器端Socket线程池”的知识点,涵盖其基本概念、实现原理、核心类与方法的介绍,并结合示例代码深入探讨其实现细节。 #### 二、Socket编程基础 Socket编程是网络通信的基础,通过...

    Socket通讯解决并发采用线程池

    标题中提到的"Socket通讯解决并发采用线程池ThreadPool.java",可能是一个Java程序,实现了Socket通信并利用`ThreadPoolExecutor`处理并发请求。`ThreadPool.java`可能包含了线程池的配置和管理逻辑,如: 1. 初始...

    完成端口结合线程池类库,源码,实例

    本文将深入探讨完成端口结合线程池的原理和应用,以及如何通过提供的源码和实例来理解和使用这种技术。 首先,理解完成端口的基本概念至关重要。完成端口是一种通知机制,当异步I/O操作完成时,系统会将事件通知放...

    QTCPSocket线程池方法

    总结来说,`QTCPSocket` 和 `QTcpServer` 结合线程池的使用,使得在QT环境中实现高效、可扩展的TCP服务器成为可能。这种设计允许程序同时处理多个客户端连接,而不受单线程模型的限制,提高了系统的并发性能和响应...

    简单实用,线程池+socket收发数据+解析字节格式报文

    本文将深入探讨如何结合线程池和Socket实现数据的收发,并解析字节格式的报文。 首先,让我们理解什么是Socket。Socket是网络编程中的一个概念,它在应用程序与网络协议之间提供了一种接口,允许程序通过TCP/IP协议...

    C# 简单MSSQL线程池+异步SOCKET服务端完整源码

    在本文中,我们将深入探讨基于C#的MSSQL线程池、异步SOCKET服务端编程的关键概念和实现细节。这些技术对于任何希望在.NET环境中构建高效、可扩展的服务器端应用程序的开发者来说,都是至关重要的。 首先,让我们...

Global site tag (gtag.js) - Google Analytics