- 浏览: 264317 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (119)
- spring (1)
- hibernate (1)
- struts (0)
- ibatis (0)
- memcache (4)
- mysql (1)
- ant (0)
- junit (0)
- protobuf (1)
- android (1)
- java (15)
- language (1)
- google (1)
- scala (0)
- ruby (0)
- python (0)
- 设计模式 (1)
- think in java (6)
- 服务器 (4)
- javascript (24)
- css (2)
- mongodb (1)
- eclipse (1)
- 并发 (1)
- test (1)
- jquery (3)
- vim (2)
- javaio (1)
- log4j (0)
- jdk (0)
- api (0)
- hadoop (1)
- HashMap (1)
- 数据库 (1)
- webservice (1)
- jvm (0)
- linux (4)
最新评论
-
weilingfeng98:
定制SSLContext
java安全SSL -
weixuanfeng:
楼主有没有用eclipse,Java调用Ant脚本的代码啊。 ...
ant调用步骤
(转)使用java.util.concurrent实现的线程池、消息队列功能
昨天开始研究java.util.concurrent,是出于线程安全的知识懂得不多,对自己写的线程池没有信心,所以就用了包里专家写好的线程池。这个包的功能很强大。有兴趣的朋友可以搜索了解更多的内容。
今天刚写好了一段200行左右的代码,拿出来跟大家分享我的学习经验。初次实践,不足之处,望能得到高手的指点。
功能说明:一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务。
TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据。
01
public class TestDriver
02
{
03
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
04
05
public void sendMsg( String msg )
06
{
07
tpm.addLogMsg( msg + "记录一条日志 " );
08
}
09
10
public static void main( String[] args )
11
{
12
for( int i = 0; i < 100; i++ )
13
{
14
new TestDriver().sendMsg( Integer.toString( i ) );
15
}
16
}
17
}
ThreadPoolManager类:是负责管理线程池的类。同时维护一个Queue和调度进程。
01
public class ThreadPoolManager
02
{
03
private static ThreadPoolManager tpm = new ThreadPoolManager();
04
05
// 线程池维护线程的最少数量
06
private final static int CORE_POOL_SIZE = 4;
07
08
// 线程池维护线程的最大数量
09
private final static int MAX_POOL_SIZE = 10;
10
11
// 线程池维护线程所允许的空闲时间
12
private final static int KEEP_ALIVE_TIME = 0;
13
14
// 线程池所使用的缓冲队列大小
15
private final static int WORK_QUEUE_SIZE = 10;
16
17
// 消息缓冲队列
18
Queue<String> msgQueue = new LinkedList<String>();
19
20
// 访问消息缓存的调度线程
21
final Runnable accessBufferThread = new Runnable()
22
{
23
public void run()
24
{
25
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
26
if( hasMoreAcquire() )
27
{
28
String msg = ( String ) msgQueue.poll();
29
Runnable task = new AccessDBThread( msg );
30
threadPool.execute( task );
31
}
32
}
33
};
34
35
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
36
{
37
public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
38
{
39
System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
40
msgQueue.offer((( AccessDBThread ) r ).getMsg() );
41
}
42
};
43
44
// 管理数据库访问的线程池
45
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
46
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
47
new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );
48
49
// 调度线程池
50
final ScheduledExecutorService scheduler = Executors
51
.newScheduledThreadPool( 1 );
52
53
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
54
accessBufferThread, 0, 1, TimeUnit.SECONDS );
55
56
public static ThreadPoolManager newInstance()
57
{
58
return tpm;
59
}
60
61
private ThreadPoolManager(){}
62
63
private boolean hasMoreAcquire()
64
{
65
return !msgQueue.isEmpty();
66
}
67
68
public void addLogMsg( String msg )
69
{
70
Runnable task = new AccessDBThread( msg );
71
threadPool.execute( task );
72
}
73
}
AccessDBThread类:线程池中工作的线程。
01
public class AccessDBThread implements Runnable
02
{
03
private String msg;
04
05
public String getMsg()
06
{
07
return msg;
08
}
09
10
public void setMsg( String msg )
11
{
12
this.msg = msg;
13
}
14
15
public AccessDBThread(){
16
super();
17
}
18
19
public AccessDBThread(String msg){
20
this.msg = msg;
21
}
22
23
public void run()
24
{
25
// 向数据库中添加Msg变量值
26
System.out.println("Added the message: "+msg+" into the Database");
27
}
28
29
}
昨天开始研究java.util.concurrent,是出于线程安全的知识懂得不多,对自己写的线程池没有信心,所以就用了包里专家写好的线程池。这个包的功能很强大。有兴趣的朋友可以搜索了解更多的内容。
今天刚写好了一段200行左右的代码,拿出来跟大家分享我的学习经验。初次实践,不足之处,望能得到高手的指点。
功能说明:一个发送消息模块将消息发送到消息队列中,无需等待返回结果,发送模块继续执行其他任务。消息队列中的指令由线程池中的线程来处理。使用一个Queue来存放线程池溢出时的任务。
TestDriver.java是一个驱动测试,sendMsg方法不间断的向ThreadPoolManager发送数据。
01
public class TestDriver
02
{
03
ThreadPoolManager tpm = ThreadPoolManager.newInstance();
04
05
public void sendMsg( String msg )
06
{
07
tpm.addLogMsg( msg + "记录一条日志 " );
08
}
09
10
public static void main( String[] args )
11
{
12
for( int i = 0; i < 100; i++ )
13
{
14
new TestDriver().sendMsg( Integer.toString( i ) );
15
}
16
}
17
}
ThreadPoolManager类:是负责管理线程池的类。同时维护一个Queue和调度进程。
01
public class ThreadPoolManager
02
{
03
private static ThreadPoolManager tpm = new ThreadPoolManager();
04
05
// 线程池维护线程的最少数量
06
private final static int CORE_POOL_SIZE = 4;
07
08
// 线程池维护线程的最大数量
09
private final static int MAX_POOL_SIZE = 10;
10
11
// 线程池维护线程所允许的空闲时间
12
private final static int KEEP_ALIVE_TIME = 0;
13
14
// 线程池所使用的缓冲队列大小
15
private final static int WORK_QUEUE_SIZE = 10;
16
17
// 消息缓冲队列
18
Queue<String> msgQueue = new LinkedList<String>();
19
20
// 访问消息缓存的调度线程
21
final Runnable accessBufferThread = new Runnable()
22
{
23
public void run()
24
{
25
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
26
if( hasMoreAcquire() )
27
{
28
String msg = ( String ) msgQueue.poll();
29
Runnable task = new AccessDBThread( msg );
30
threadPool.execute( task );
31
}
32
}
33
};
34
35
final RejectedExecutionHandler handler = new RejectedExecutionHandler()
36
{
37
public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
38
{
39
System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
40
msgQueue.offer((( AccessDBThread ) r ).getMsg() );
41
}
42
};
43
44
// 管理数据库访问的线程池
45
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
46
CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
47
new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );
48
49
// 调度线程池
50
final ScheduledExecutorService scheduler = Executors
51
.newScheduledThreadPool( 1 );
52
53
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(
54
accessBufferThread, 0, 1, TimeUnit.SECONDS );
55
56
public static ThreadPoolManager newInstance()
57
{
58
return tpm;
59
}
60
61
private ThreadPoolManager(){}
62
63
private boolean hasMoreAcquire()
64
{
65
return !msgQueue.isEmpty();
66
}
67
68
public void addLogMsg( String msg )
69
{
70
Runnable task = new AccessDBThread( msg );
71
threadPool.execute( task );
72
}
73
}
AccessDBThread类:线程池中工作的线程。
01
public class AccessDBThread implements Runnable
02
{
03
private String msg;
04
05
public String getMsg()
06
{
07
return msg;
08
}
09
10
public void setMsg( String msg )
11
{
12
this.msg = msg;
13
}
14
15
public AccessDBThread(){
16
super();
17
}
18
19
public AccessDBThread(String msg){
20
this.msg = msg;
21
}
22
23
public void run()
24
{
25
// 向数据库中添加Msg变量值
26
System.out.println("Added the message: "+msg+" into the Database");
27
}
28
29
}
发表评论
-
hadoop安装步骤
2012-04-11 18:07 103291.安装jdk(自带了jre) ... -
javaIo总结
2011-12-11 01:19 865节点流: 1. -
java并发整理
2011-09-14 00:34 1126java并发库中几个同步工具类 CopyOnWriteArra ... -
eclipse快捷键
2011-05-27 12:38 724来一个大众版的eclipse快捷键使用技巧,请看链接http: ... -
URLRewrite配置(转)
2011-05-09 17:18 1382web.xml <!-- URL 伪静态过滤 --&g ... -
简单工厂设计模式
2011-01-05 23:52 903http://www.cnblogs.com/zzj-4600 ... -
代理模式和装饰器模式的区别与联系
2011-01-04 20:04 2468最近上javaeye,看到不少人讨论java设计模式,本人 ... -
【转】线程池(java.util.concurrent.ThreadPoolExecutor)使用简介
2010-12-16 16:03 982在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许 ... -
Format的子类中SimpleDateFormat,NumberFormat,MessageFormat那些是线程安全的
2010-12-15 16:12 2857SimpleDateFormat,NumberFormat,M ... -
servlet线程安全
2010-12-14 20:01 2168JSP/Servlet的多线程原理: ... -
java中String、StringBuilder和StringBuffer,StringHelper 的区别
2010-12-09 18:00 1493String 字符串常量 StringBuffer 字符串变量 ... -
开始学习scala和ruby
2010-10-27 16:15 982开始学习scala和ruby -
缓存策略
2010-09-21 13:26 906http://hi.baidu.com/pepsichan/b ... -
StringTokenizer是一个很好的一个类
2010-09-20 14:58 1096StringTokenizer 是一个很好的类,一直没有怎么用 ...
相关推荐
本篇文章将深入探讨如何使用`java.util.concurrent` 实现线程池队列,以及其中的关键概念和技术。 线程池是一种线程使用模式,通过预先创建并维护一定数量的工作线程来避免频繁创建和销毁线程的开销。在Java中,`...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
"java.util.concurrent包中的线程池和消息队列" java.util.concurrent包中的线程池和消息队列是一种高效的多线程编程技术,主要用于解决处理器单元内多个线程执行的问题。线程池技术可以显著减少处理器单元的闲置...
Java.util.concurrent的引入是为了解决传统并发原语如wait()、notify()、synchronized和volatile的使用难度大、容易出错以及性能问题。 在并发编程中,我们经常会遇到需要管理多个线程执行任务的情况。传统的做法是...
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...
标题中的“Java实现的线程池、消息队列功能”是指在Java编程中,如何利用编程技术实现线程池和消息队列这两种重要的并发处理机制。线程池和消息队列是解决多线程环境下资源管理和任务调度的有效手段,它们在高并发、...
本资源包含两个 pdf 文档,一本根据 Jakob Jenkov 最新博客 (http://tutorials.jenkov.com/java-util-concurrent/index.html) 整理的 java_util_concurrent_user_guide_en.pdf,一个中文翻译的 java_util_concurrent...
Java并发工具包(java.util.concurrent)是Java平台上用于高效、安全地处理多线程编程的重要组件。这个包包含了丰富的并发工具类,旨在帮助开发者构建高度并发的程序,提高程序的性能和可伸缩性。本资源是该工具包的...
Java.util.concurrent(JUC)是Java平台中的一个核心包,专门用于处理多线程并发问题。这个包包含了大量的工具类和接口,极大地简化了并发编程的复杂性,提高了程序的性能和可伸缩性。本测试源文件主要是针对JUC并发...
此外,`java.util.concurrent` 包的实现充分利用了 JVM 的并发优化,如 CAS(Compare And Swap)操作,以实现高效无锁的数据结构。 总之,`java.util.concurrent` 提供的工具使得并发编程变得更加容易和高效,是 ...
- **线程池管理**:Atlassian Util Concurrent库提供了一种定制化的线程池实现,允许开发者根据具体需求调整线程数量、任务队列策略等,从而有效地管理和控制并发执行的任务。 - **锁和同步机制**:库中包含了各种...
在Java编程领域,`java.util.concurrent`包是并发编程的核心工具包,提供了高效、线程安全的类和接口,使得开发者能够更容易地处理多线程环境。本篇将深入探讨这个包中一些鲜为人知的知识点,以帮助你提升并发编程的...
在Java中,可以使用`java.util.Deque`接口的实现,例如`java.util.concurrent.LinkedBlockingDeque`,它支持双端插入和删除,可以作为线程池的工作队列。 - 在创建`ThreadPoolExecutor`时,可以通过传递`...
Java的并发库(java.util.concurrent)提供了丰富的线程池实现,包括`ThreadPoolExecutor`,它是基于工作窃取算法的高效线程池。本文主要分析的是基于`concurrent`包的一个特定线程池实现,探讨其实现原理和源码。 ...
1. **线程池**: `java.util.concurrent.ExecutorService` 是线程池的基础接口,它允许我们管理一组可重用的工作线程,以提高效率和资源利用率。`ThreadPoolExecutor` 是最常用的实现,可以自定义核心线程数、最大...
在Android中,我们通常使用`java.util.concurrent`包下的`ExecutorService`接口和其相关的类来创建线程池。 线程池的核心概念包括: 1. 工作线程(Worker Threads):线程池中的线程,负责执行任务。 2. 任务队列...