2006 年 1 月 18 日
Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。
简介
本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:
1. 建立监听端口。
2. 发现有新连接,接受连接,启动线程,执行服务线程。
3. 服务完毕,关闭线程。
这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:
1. 建立监听端口,创建线程池。
2. 发现有新连接,使用线程池来执行服务任务。
3. 服务完毕,释放线程到线程池。
下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。
初始化
初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。
ExecutorService pool = Executors.newFixedThreadPool(10);
表示新建了一个线程池,线程池里面有10个线程为任务队列服务。
使用ServerSocket对象来初始化监听端口。
private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);
|
服务新连接
当有新连接建立时,accept返回时,将服务任务提交给线程池执行。
while(true){
Socket socket = serverListenSocket.accept();
pool.execute(new ServiceThread(socket));
}
|
这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。
服务任务
服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:
private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
int ret = 0;
try{
lock.lock();
ret = count;
}finally{
lock.unlock();
}
return ret;
}
private void increaseCount(){
try{
lock.lock();
++count;
}finally{
lock.unlock();
}
}
|
服务线程在开始给客户端打印一个欢迎信息,
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
|
然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。
ExecutorService executor = Executors.newSingleThreadExecutor();
Future <String> future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());
其中TimeConsumingTask实现了Callable接口
class TimeConsumingTask implements Callable <String>{
public String call() throws Exception {
System.out.println
("It's a time-consuming task,
you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}
|
这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。
服务器端的完整实现
服务器端的完整实现代码如下:
package com.andrew;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Server {
private static int produceTaskSleepTime = 100;
private static int consumeTaskSleepTime = 1200;
private static int produceTaskMaxNumber = 100;
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 100;
private static final int KEEPALIVE_TIME = 3;
private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final String HOST = "127.0.0.1";
private static final int PORT = 19527;
private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
QUEUE_CAPACITY);
//private ThreadPoolExecutor serverThreadPool = null;
private ExecutorService pool = null;
private RejectedExecutionHandler rejectedExecutionHandler = new
ThreadPoolExecutor.DiscardOldestPolicy();
private ServerSocket serverListenSocket = null;
private int times = 5;
public void start() {
// You can also init thread pool in this way.
/*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
rejectedExecutionHandler);*/
pool = Executors.newFixedThreadPool(10);
try {
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
System.out.println("I'm listening");
while (times-- > 0) {
Socket socket = serverListenSocket.accept();
String welcomeString = "hello";
//serverThreadPool.execute(new ServiceThread(socket, welcomeString));
pool.execute(new ServiceThread(socket));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
cleanup();
}
public void cleanup() {
if (null != serverListenSocket) {
try {
serverListenSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//serverThreadPool.shutdown();
pool.shutdown();
}
public static void main(String args[]) {
Server server = new Server();
server.start();
}
}
class ServiceThread implements Runnable, Serializable {
private static final long serialVersionUID = 0;
private Socket connectedSocket = null;
private String helloString = null;
private static int count = 0;
private static ReentrantLock lock = new ReentrantLock();
ServiceThread(Socket socket) {
connectedSocket = socket;
}
public void run() {
increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount + "\r\n";
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new TimeConsumingTask());
DataOutputStream dos = null;
try {
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());
try {
dos.write("let's do soemthing other.\r\n".getBytes());
String result = future.get();
dos.write(result.getBytes());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (null != connectedSocket) {
try {
connectedSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != dos) {
try {
dos.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
executor.shutdown();
}
}
private int getCount() {
int ret = 0;
try {
lock.lock();
ret = count;
} finally {
lock.unlock();
}
return ret;
}
private void increaseCount() {
try {
lock.lock();
++count;
} finally {
lock.unlock();
}
}
}
class TimeConsumingTask implements Callable<String> {
public String call() throws Exception {
System.out
.println("It's a time-consuming task,
you'd better retrieve your result in the furture");
return "ok, here's the result: It takes me lots of time to produce this result";
}
}
|
|
分享到:
相关推荐
首先,Java 5.0中引入了`java.util.concurrent.Executors`工厂类,它提供了一系列静态工厂方法来创建不同类型的线程池。其中`newFixedThreadPool`方法可以根据需要创建一个固定大小的线程池,这个线程池中的线程数量...
3. **Lock接口**:Java提供`java.util.concurrent.locks.Lock`接口,提供了比`synchronized`更细粒度的锁控制,如可重入锁、公平锁等。例如,`ReentrantLock`。 ```java import java.util.concurrent.locks.Lock; ...
- `java.util.concurrent`包提供了高级并发工具,如`ExecutorService`、`Future`、`Semaphore`等。 4. 输入输出流: - `java.io`包提供了文件操作、网络通信等输入输出流。 - `java.nio`包提供了非阻塞I/O,性能...
"13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下是这些资源所涵盖的关键知识点: 1. **多线程基础** - "03 多线程开发如此简单—...
本篇文章将深入探讨Java并发编程的相关知识点,主要基于提供的两个文件——"Java并发编程实战(中文版).pdf"和"Java Concurrency in Practice.pdf"。 1. **线程与并发** - **线程基础**:Java中的线程是并发执行...
这个压缩包文件"java_high_concurrent_wwj.rar"显然包含了关于Java并发编程的详细内容,很可能是吴伟杰(WWJ)编著或者整理的一系列教程或文章。在这个主题下,我们可以探讨以下几个关键知识点: 1. **线程基础**:...
《和朱晔一起复习Java并发》是一系列专门探讨Java并发的文章,可能涵盖了以下关键知识点: 1. **线程基础**:Java中的`Thread`类,如何创建和启动线程,以及线程的状态转换(新建、就绪、运行、阻塞、死亡)。 2. ...
使用`synchronized`关键字、`volatile`变量、`java.util.concurrent`包中的工具类等可以有效地管理并发。 四、内存泄漏 Java中的内存泄漏并不像C++那样直接导致资源耗尽,但过度持有对象引用会导致垃圾收集器无法...
Java网络爬虫(蜘蛛)是用于自动抓取互联网信息的程序,它可以帮助开发者从网站上收集数据,如新闻、文章、产品信息等。本项目提供了一个简单的Java爬虫源码,便于初学者理解和实践网络爬虫的基本原理和编程技巧。 ...
在本篇文章中,我们将深入探讨如何使用Java语言来创建一个简单的Web服务器。 首先,我们需要理解Web服务器的基本工作原理。Web服务器是负责接收HTTP请求并返回HTTP响应的软件。当用户在浏览器中输入URL并按下回车键...
### Java 5.0 多线程编程实践 在Java 5.0版本中,多线程编程得到了显著增强,特别是在引入了`java.util.concurrent`包之后,为开发者提供了更为丰富的API来处理并发问题。本篇文章将深入探讨如何在Java 5.0中有效地...
这个项目可能使用了Java的网络编程API(如`java.net`包)来发起HTTP请求,使用正则表达式或DOM解析器(如Jsoup)解析HTML,以及使用线程池(如`java.util.concurrent`包)来管理多线程。此外,为了存储抓取到的数据...
2. **条件变量(Condition)**:Java 5引入了`java.util.concurrent.locks.Condition`接口,提供了一种更灵活的等待/通知机制。我们可以创建多个条件变量,每个对应不同的等待状态。 3. **信号量(Semaphore)**:`...
Java并发工具包是Java平台中用于处理多线程并行计算的重要组件,它包含在`java.util.concurrent`包中。这个工具包提供了多种高级并发工具,使得开发者能够编写出高效、安全、易于理解和维护的多线程代码。下面将详细...
在实践中,学习线程不仅包括创建和启动线程,还包括线程同步、线程通信(如`wait()`, `notify()`, `notifyAll()`)、线程安全(使用`synchronized`关键字或`java.util.concurrent`包中的工具类)、中断和守护线程等...
import java.util.concurrent.*; public class ScheduledEmailSender { private final ExecutorService executor; public ScheduledEmailSender() { this.executor = Executors....
这个"java高手的文章合集"很可能包含了一系列深入探讨Java技术、最佳实践以及问题解决策略的文章。以下是一些可能涵盖的知识点: 1. **Java基础知识**:文章可能会介绍Java的基本语法,包括数据类型、变量、控制流...
【源码】标签可能意味着文章会深入到Java并发库的源代码分析,例如研究`java.lang.Thread`和`java.util.concurrent`包下的实现细节。源码分析有助于理解底层的工作机制,帮助开发者更好地控制和优化并发程序。 ...
Java SE 5和Java SE 6在并发方面做出了重要的改进,提供了改进的虚拟机性能、高可伸缩性的并发类和一系列新的并发构建块。《JAVA并发编程实践》的作者们,他们大多来自Java标准化组织JSR166专家组,为Java并发工具的...