前言
在日常java开发过程中使用线程池一般都是通过Executors提供的静态方法创建线程池,但目前还没有提供使用DelayQueue(延迟队列)作为任务队列的线程池创建方法。在笔者另一篇博客中《DelayQueue--阅读源码从jdk开始》,有个场景需要使用DelayQueue实现定时的页面发布功能,在那次实现过程中使用DelayQueue的take方法获取到任务后再放入线程池,由于这里是串行take,如果在同一时刻有多个任务需要被执行,这时势必有有延迟,虽然延迟不多,但不是最佳实现方案。
通过前一篇对ThreadPoolExecutor总结(点这里),我们可以直接使用ThreadPoolExecutor的构造方法构造自定义的线程池,使用DelayQueue作为“任务队列”即可。
使用DelayQueue创建线程池
这个步骤很简单,只要理解了ThreadPoolExecutor构造方法的各个参数即可(对各个参数的详细讲解见上一篇文章):
DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue);
之后,只需调用ThreadPoolExecutor的execute提交任务即可。
创建延迟任务类
我们知道ThreadPoolExecutor的execute方法,需要一个实现了Runnable接口的对象,那么这个任务类必须是实现Runnable接口;并且最终这个对象要能放到DelayQueue中,这个任务类必须实现Delayed接口。最终这个任务类实现如下:
public class TaskInfo implements Delayed,Runnable { //任务id private int id; //业务类型 private int type; //业务数据 private String data; //执行时间 private long excuteTime; public TaskInfo(int id, int type, String data, long excuteTime) { this.id = id; this.type = type; this.data = data; this.excuteTime = TimeUnit.NANOSECONDS.convert(excuteTime, TimeUnit.MILLISECONDS)+System.nanoTime(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getType() { return type; } public void setType(int type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public long getExcuteTime() { return excuteTime; } public void setExcuteTime(long excuteTime) { this.excuteTime = excuteTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime- System.nanoTime() , TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { TaskInfo msg = (TaskInfo)o; return this.excuteTime>msg.excuteTime?1:( this.excuteTime<msg.excuteTime?-1:0); } @Override public void run() { System.out.println("run task:"+id); } }
初始化核心线程
上面已经创建好任务类了,也许大家会觉得直接new TaskInfo(),并且调用ThreadPoolExecutor的execute方法提交任务就行,如下:
//创建任务 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); //提交任务 threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2);
通过前一篇文章的分析,在线程池刚初始化时,由于核心线程数为0,此时执行execute提交任务,任务不会进入延迟队列,而是直接执行,就无法满足业务需求(任务被提前执行了)。正确做法是在线程初始化完成后,先调用prestartAllCoreThreads方法,先创建好核心线程,即:
threadPoolExecutor.prestartAllCoreThreads();
完成示例代码:
public class ThreadPoolExecutorTest { private static ExecutorService es = Executors.newFixedThreadPool(3);//3个线程的线程池 public static void main(String[] args){ DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue); threadPoolExecutor.prestartAllCoreThreads();//初始化核心线程 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); TaskInfo t3 = new TaskInfo(3,3,"任务3",9000); TaskInfo t4 = new TaskInfo(4,4,"任务4",5000); TaskInfo t5 = new TaskInfo(5,5,"任务5",5000); TaskInfo t6 = new TaskInfo(6,6,"任务6",6000); TaskInfo t7 = new TaskInfo(7,7,"任务7",7000); TaskInfo t8 = new TaskInfo(8,8,"任务8",10000); threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2); threadPoolExecutor.execute(t3); threadPoolExecutor.execute(t4); threadPoolExecutor.execute(t5); threadPoolExecutor.execute(t6); threadPoolExecutor.execute(t7); threadPoolExecutor.execute(t8); } }
执行main方法,可以发现任务是按时延迟执行的,而且如果在同一刻如果有多个任务需要执行,这时也可以利用线程池并行执行,进一步降低延迟。
另外大家也可以注释掉threadPoolExecutor.prestartAllCoreThreads();这句,验证下如果不初始化核心线程会有什么后果。
心灵鸡汤
有的程序员觉得整天实现一些简单的功能没有技术含量,如果你觉得某项工作没有技术含量,那只是你自己把它做得没有技术含量,认真的写好自己的每一行代码,不停的去完善,它就会成为有技术含量的工作。想想达芬奇画鸡蛋的故事。
摘自--《天星老师语录》
相关推荐
DelayQueue的使用以及注意事项,这里需要由BlockingQueue的基本知识,一般的Queue的使用方法poll(),take(),drainTo()和offer(),put()这些应该懂。
2. **任务调度**:使用Redis的`ZREVRANGEBYSCORE`命令,定期查询当前时间戳之后但未过期的任务。这些任务可以从`Sorted Set`中移除,并交由工作线程处理。 3. **任务处理**:在工作线程中,从Redis获取任务对象并...
学习视频,可以丰富java知识。能够获得更多的专业技能
5. 为了防止Redis中的订单过期但DelayQueue中仍有未处理的订单,可以使用一个单独的线程或服务定期扫描Redis中的已过期键,确保订单的及时取消。 通过结合DelayQueue和Redis,我们可以在保证高并发性能的同时,实现...
Spring Boot延时任务之DelayQueue的使用详解 DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。它提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。DelayQueue的元素...
"Java多线程并发开发之DelayQueue使用示例" DelayQueue是Java多线程并发开发中的一种常用的数据结构,它是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象。DelayQueue的主要作用是按照对象的延迟时间...
通过深入理解`DelayQueue`的工作原理和使用方式,我们可以有效地利用它来构建高性能、低延迟的并发应用。在实际项目中,根据需求合理地选择数据结构和并发工具,能显著提升程序的效率和可维护性。
为了使用 DelayQueue,我们需要首先声明一个 Delayed 的对象,例如,我们可以声明一个 Task 对象, Task 对象实现了 Delayed 接口,用于表示一个具有延迟执行的任务。 ``` public class Task<T extends Runnable> ...
基于DelayQueue的简单的定时任务队列.zip Quick Start class Main { public static void main(String[] args) { // 初始化任务队列 JobScheduler scheduler = new JobScheduler("default"); // 向队列中提交任务...
local delayQueue implemented by JDK & two kinds of distributed delayQueue based redis 1. 基本介绍 RedisSynDelayQueue 基于redis,并发情况下会加分布式锁,单线程场景(syn=false)性能较好, 并发场景性能较...
该项目是SpringBoot框架下的延迟消息Starter,提供对DelayQueue、Redisson和RabbitMQ三种延迟消息机制的集成支持。项目包含32个文件,涵盖24个Java源文件、4个XML配置文件、1个Git忽略文件、1个Markdown文件、1个...
重点介绍了ScheduledThreadPoolExecutor的内部工作机制,如何使用DelayQueue来存储等待执行的任务。DelayQueue内部实现了一个基于时间优先级的PriorityQueue,保证任务能按计划时间顺序执行。文档还详细描述了任务...
数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...
- **使用**:`available.acquire();` 和 `available.release();` 分别用于获取和释放许可。如果当前没有可用的许可,调用 `acquire()` 的线程会被阻塞直到有许可被释放。 #### 2. CountDownLatch(计数器倒置锁) ...
- **分析架构与设计**:评估应用是否使用分布式对象(例如EJB)、数据库连接方式、同步或异步调用等。 - **性能术语理解**:了解关键性能指标的含义,比如负载(峰值或平均值)、点击(页面访问或HTTP请求)、响应...
DelayQueue是一个无界的并发队列,它使用Delayed接口的特性来存储和管理元素。这个队列的独特之处在于,元素只有在其延迟时间过去后才会被处理。以下是DelayQueue的一些关键特点: 1. 队列中的元素按照它们的延迟...
延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列...3.每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似 延时队列能
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 ...
4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque ...
延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10...