引言
在我的项目中有这样一个场景:页面链接是同一个,但是可以有多个子页面,不同的时间要展示不同子页面,类似一个页面排期功能。也许你们觉得要实现这个功能比较简单,实现过程为:获取所有子页面的生效时间,对每个生效时间点创建一个定时器,每个定时器执行内容为使用新的子页面进行渲染。对于单个或者少量页面这样做完全没有问题,但是在我的项目中每天都有上万个这样的页面需要进行排期。如果采用这种方式,势必会创建上万个定时器放到jvm内存,这显然是不科学的。
我们的做法是把 每个子页面排期看成是一个任务放到任务表,任务执行时间即为子页面的开始时间,再通过一个分布式任务调度器,每次获取将来5分钟内即将执行的任务列表。把这些任务放到一个DelayQueue中,每个子页面开始时间到达时,从DelayQueue中取出,执行页面渲染,这时用户浏览到的页面就是最新的内容。
这里通过分布式任务调度器,可以把任务均分到各个服务器上,并且每次获取任务是指取将来5分钟内即将执行的任务列表,这个列表一般不会太多,可以直接放到队列中。当然如果很多也没关系,可以指定获取最大任务条数。通过上述处理可以控制放入DelayQueue的任务数,减少不必要的内存消耗。如下图:假如5分钟内即将执行的任务列表有9个,通过分布式调度分配到每台机器上的任务数即为3个:
关于分布式任务调度,可以使用淘宝的tbschedule等类似的框架支持,也可以自己实现。这里不详细讲解分布式任务调度怎么实现,今天的主角是DelayQueue—延迟队列。
现在我们已经获取到每台机器上的即将执行的任务列表,接下来就是把这些任务放到DelayQueue,通过其take方法获取到期的任务定时执行。
DelayQueue基本特征
DelayQueue延迟队列同时具备:无界队列、阻塞队列、优先队列的特征。分别来看下:
无界队列:通过调用DelayQueue的offer方法(或add方法),把待执行的任务对象放入队列,该方法是非阻塞的。这个队列是无界队列,内存足够的情况下,理论上存放的任务对象数是无限的。
阻塞队列:DelayQueue实现了BlockingQueue接口,是一个阻塞队列。但该队列只是在取对象时阻塞,对应两个方法:1、take()方法,获取并移除队列头的对象,如果时间还未到,就阻塞等待。2、poll(long timeout, TimeUnit unit) 方法,阻塞时间长度为timeout,然后获取并移除队列头的对象,如果对象延迟时间还未到,就返回null。
优先队列:DelayQueue的一个重要的成员是一个优先队列PriorityQueue,PriorityQueue内部是一个二叉小顶堆实现,其特点就是头部元素对应的权值是队列中最小的,也就是通过poll()方法获取到的对象是最优先的。
Delayed接口
DelayQueue延迟队列中存放的对象,必须是实现Delayed接口的类对象。Delayed接口,是Comparable的子类:
public interface Delayed extends Comparable<Delayed>
所有要实现Delayed接口必须重写其getDelay、compareTo方法。看一个实现例子:
public class TaskInfo implements Delayed { //任务id private int id; //业务类型 private int type; //业务数据 private String data; //执行时间 private long excuteTime; public TaskInfo(int id, int type, String data, long excuteTime) { this.id = id; this.type = type; this.data = data; this.excuteTime = TimeUnit.NANOSECONDS.convert(excuteTime, TimeUnit.MILLISECONDS)+System.nanoTime(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getType() { return type; } public void setType(int type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public long getExcuteTime() { return excuteTime; } public void setExcuteTime(long excuteTime) { this.excuteTime = excuteTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime- System.nanoTime() , TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { TaskInfo msg = (TaskInfo)o; return this.excuteTime>msg.excuteTime?1:( this.excuteTime<msg.excuteTime?-1:0); } }
通过DelayQueue的offer方法加入对象是,会根据对象compareTo方法把对象放到优先队列PriorityQueue中的指定位置;通过DelayQueue的take方法获取对象时,会调用对象的getDelay方法,确定延迟获取时间,需要注意的是这里的时间单位为纳秒,示例代码中通过unit.convert(this.excuteTime- System.nanoTime() , TimeUnit.NANOSECONDS)进行转换。
DelayQueue使用示例
public class DelayTest { private static DelayQueue<TaskInfo> queue = new DelayQueue<>();//延迟队列 private static ExecutorService es = Executors.newFixedThreadPool(3);//3个线程的线程池 public static void main(String[] args){ while (true) { try { if (queue.size() <=0){ //获取任务放入队列 getTask(); if(queue.size() <= 0){ System.out.println("没有任务睡眠10秒"); //没有任务睡眠10秒 Thread.sleep(10*1000); } }else{ TaskInfo task = queue.take(); es.submit(()->{ System.out.println("执行任务:" + task.getId() + ":" + task.getData()); }); } }catch (Exception e){ e.printStackTrace(); } } } //模拟从数据库获取 将来10秒中内即将执行的任务 public static void getTask(){ Random r = new Random(); int t = r.nextInt(2); if(t==0){ return; } TaskInfo t1 = new TaskInfo(1,1,"任务1",1000); TaskInfo t2 = new TaskInfo(2,2,"任务2",2000); TaskInfo t3 = new TaskInfo(3,3,"任务3",3000); TaskInfo t4 = new TaskInfo(4,4,"任务4",4000); TaskInfo t5 = new TaskInfo(5,5,"任务5",5000); TaskInfo t6 = new TaskInfo(6,6,"任务6",6000); TaskInfo t7 = new TaskInfo(7,7,"任务7",7000); TaskInfo t8 = new TaskInfo(8,8,"任务8",8000); queue.offer(t1); queue.offer(t2); queue.offer(t3); queue.offer(t4); queue.offer(t5); queue.offer(t6); queue.offer(t7); queue.offer(t8); } }
示例代码讲解:
1、首先创建了一个DelayQueue的延迟队列;并通过Executors.newFixedThreadPool(3)创建了一个3个线程数的线程池。
2、main方法循环体中判断如果队列中没有对象,就模拟从数据库中获取10秒内即将执行的任务,并放入DelayQueue。如果数据库中没有10秒内即将执行的任务,程序睡眠10秒。
3、如果队列中有对象,调用DelayQueue的take()方法,获取到期的任务信息,并把任务信息交给线程池进行处理。
实例中,模拟创建了8个任务,每个任务的延迟执行时间分别为1到8秒。
执行main方法,每隔1秒打印一条信息,打印完整信息如下:
执行任务:1:任务1 执行任务:2:任务2 执行任务:3:任务3 执行任务:4:任务4 执行任务:5:任务5 执行任务:6:任务6 执行任务:7:任务7 执行任务:8:任务8
测试结果符合我们的预期,这个测试示例其实就是文章开头业务场景的简化版实现。
DelayQueue源码解析
首先看下DelayQueue的成员变量:
//为了保证线程安全:对队列中每次存取操作,都需要进行加锁,采用的重入锁 private final transient ReentrantLock lock = new ReentrantLock(); //优先队列,延迟对象最终放到改队列中,保证每次从头部取出的对象,是应该最先被执行的 private final PriorityQueue<E> q = new PriorityQueue<E>(); //leader线程,其等待延迟时间为优先队列中,最优先对象的延迟时间。其他线程无限期等待 private Thread leader = null; //配合重入锁使用,对线程进行等待,唤醒等操作 private final Condition available = lock.newCondition();
三个加入队列方法
add、offer、put三个加入队列方法,其中add和put都是直接调用offer方法,所以调用三个方法中的任意一个都是等效的。首先看下offer方法:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock();//加锁 try { q.offer(e);//调用PriorityQueue的offer方法,放入队列 if (q.peek() == e) {//判断刚加入的对象,是不是头节点 leader = null; available.signal();//唤醒take()或poll(..)方法中的等待 } return true; } finally { lock.unlock();//释放锁 } }
这个方法最值得关注的地方是,放入队列后,判断刚放入队列的对象是不是PriorityQueue队列的头节点,如果是需要唤醒take()或poll(..)方法中的等待阻塞,重新获取头节点对象的延迟等待时间。
add、put方法都是直接调用offer方法,源码为:
public boolean add(E e) { return offer(e); } public void put(E e) { offer(e); }
四个获取对象方法
poll()、poll(..)、take()、peek()这四个方法都可以实现从队列头获取一个对象,但每个方法实现都不相同。
peek()方法:非阻塞方法:
public E peek() { final ReentrantLock lock = this.lock; lock.lock();//加锁 try { return q.peek(); } finally { lock.unlock();//释放锁 } }
DelayQueue的peek方法,本质上调用的是PriorityQueue的peek方法,只是多了一个加锁操作。该方法会返回头部节点对象,但不会从队列中移除。peek的含义为:瞟一眼。
poll()方法:从队列头部获取并移除一个对象,非阻塞方法,源码实现为:
public E poll() { final ReentrantLock lock = this.lock; lock.lock();//加锁 try { E first = q.peek();//获取队列中的头节点对象(不会移除) if (first == null || first.getDelay(NANOSECONDS) > 0) return null;//调用对象的getDelay方法,如果延迟时间还未到,直接返回空 else return q.poll();//如果延迟时间已经到达,直接调用PriorityQueue队列的取出并移除的poll方法 } finally { lock.unlock(); } }
poll()方法 首先调用peek方法获取到头节点对象,通过调用对象的getDelay方法判断延迟时间是否到达,如果没有到达返回null,否则调用PriorityQueue的poll方法 取出并移除头节点对象 并返回。
take()方法:DelayQueue的核心方法,常用于任务延迟执行,是阻塞方法。源码为:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加中断锁 try { for (;;) { E first = q.peek();//获取头节点 if (first == null) available.await();//头结点为空,释放锁无限期等待,等待offer方法放入对象,再次获得锁 else { long delay = first.getDelay(NANOSECONDS);//获取头节点对象延迟时间 if (delay <= 0) return q.poll();//延迟时间已过,直接从队列中移除并取出返回 first = null; // don't retain ref while waiting if (leader != null) available.await();//如果不是leader线程,无限期等待 else { Thread thisThread = Thread.currentThread(); leader = thisThread;//设置当前线程为leader线程 try { available.awaitNanos(delay);//释放锁,等待头结点延迟时间到来,再获得锁。 } finally { if (leader == thisThread) leader = null;//释放leader线程引用 } } } } } finally { if (leader == null && q.peek() != null) available.signal();//唤醒某一个线程,获得锁,设置leader线程 lock.unlock();//释放锁 } }
take方法主要实现逻辑为(for循环体):
1、获取头节点对象,如果为空,线程释放锁,并进入无限期等待。等待offer方式,放入对象后,通过signal()方法唤醒。
2、如果头节点对象不为空,获取该对象的延迟时间,如果小于0,直接从队列中取出并移除该对象,返回。
3、如果头节点对象延迟时间大于0,判断是否“leader线程”是否已经存在,如果存在说明当前线程为“追随者线程”,进入无限期等待(等待leader线程take方法完成后,唤醒)。
4、如果“leader线程”不存在,把当前线程设置为“leader线程”,释放锁并等待头节点对象的延迟时间后,重新获得锁,下次循环获取头节点对象返回。
5、finally代码块,每次leader线程执行完成take方法后,需要唤醒其他线程获得锁成为新的leader线程。
take方法实现了一个“领导者-追随者模式”的线程处理方式,只有leader线程会等待指定时间后获得锁,其他线程都会进入无限期等待。这也是为什么在DelayQueue中都是使用signal唤醒,而不使用signalAll的原因(只需要一个线程成为leader线程)。
这个图,展示有3个线程调用DelayQueue的take方法,只会有一个线程成为”leader线程”,这里假设为线程1。其他两个线程为“追随者”,无限期等待,在”leader线程”执行完成之后调用signal方法随机唤醒一个线程成为新的”leader线程”。
poll(..)方法:带延迟参数的poll方法,是阻塞方法,源码为:
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout);//把 指定延迟时间 转换成纳秒 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//加中断锁 try { for (;;) { E first = q.peek(); if (first == null) {//如果头节点为空 if (nanos <= 0) return null;//指定延迟时间 小于0直接返回null else nanos = available.awaitNanos(nanos);//等待 指定延迟时间后,再重新获得锁 } else { long delay = first.getDelay(NANOSECONDS);//获取头节点对象的延迟时间 if (delay <= 0) return q.poll();//如果对象延迟时间已过期,直接取出并移除该对象,返回 if (nanos <= 0) return null;//如果对象延迟时间还未到,但指定延迟时间已到,返回null first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos);//如果“指定延迟时间”小于“对象延迟时间”或者不是leader线程,等待指定时间后 再次被唤醒。 else {//如果“指定延迟时间”大于等于“对象延迟时间”并且 leader线程为空 Thread thisThread = Thread.currentThread(); leader = thisThread;//指定当前线程为leader线程 try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft;//重新计算最新的 “指定延迟时间" } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal();//leader线程执行结束后,唤醒某个“追随者”线程 lock.unlock(); } }
poll(..)方法: 如果指定的延迟时间,小于头结点对象的延迟时间,返回为空,非阻塞。
如果指定的延迟时间,大于头结点对象的延迟时间,会阻塞,阻塞长度为头结点对象的延迟时间。这样说会比较抽象,看一个例子:
public class DelayTest2 { private static DelayQueue<TaskInfo> queue = new DelayQueue<>();//延迟队列 public static void main(String[] args) throws Exception{ getTask(); for(int i=0;i<3;i++){//启动线程数 new Thread(new Runnable() { @Override public void run() { try { TaskInfo task = queue.poll(10000, TimeUnit.MILLISECONDS);//延迟时间 if(task == null){ System.out.println("任务为空"); }else { System.out.println("执行任务:" + task.getId() + ":" + task.getData()); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } //模拟从数据库获取 将来10秒中内即将执行的任务 public static void getTask(){ TaskInfo t1 = new TaskInfo(1,1,"任务1",1000); TaskInfo t2 = new TaskInfo(2,2,"任务2",2000); TaskInfo t3 = new TaskInfo(3,3,"任务3",3000); TaskInfo t4 = new TaskInfo(4,4,"任务4",4000); TaskInfo t5 = new TaskInfo(5,5,"任务5",5000); TaskInfo t6 = new TaskInfo(6,6,"任务6",6000); TaskInfo t7 = new TaskInfo(7,7,"任务7",7000); TaskInfo t8 = new TaskInfo(8,8,"任务8",8000); queue.offer(t1); queue.offer(t2); queue.offer(t3); queue.offer(t4); queue.offer(t5); queue.offer(t6); queue.offer(t7); queue.offer(t8); } }
该实例会启动3个线程同时调用queue.poll(10000, TimeUnit.MILLISECONDS)方法,其中一个线程会被设置为“leader线程”,等待时间为头结点的延迟时间,其他线程的等待时间都为10000ms。当“leader线程”执行完成后,会选择另外某个现在做为“leader线程”等待时间改为当前头结点的延迟时间。
执行这段代码的main方法,会每隔1秒打印一条信息,完整打印信息如下:
执行任务:1:任务1 执行任务:2:任务2 执行任务:3:任务3
如果把指定延迟时间改为500,即:queue.poll(500, TimeUnit.MILLISECONDS),重新执行main()方法,该方法返回为空,这时不会阻塞,并立即打印三条消息:
任务为空 任务为空 任务为空
poll(..)方法的使用场景为:按指定时间段,分批次执行延迟队列中的任务。从源码上看,在指定延迟时间大于头节点对象延迟时间时的实现 跟take()方法很像,只是“追随者线程”的等待时间有区别:poll(..)方法是等待指定延迟时间,take()方法是无限期等待。
DelayQueue的其他方法都比较简单(remove,clear等),这里不再一一列举。
相关推荐
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
│ 高并发编程第一阶段06讲、用Runnable接口将线程的逻辑执行单元从控制中抽取出来.mp4 │ 高并发编程第一阶段07讲、策略模式在Thread和Runnable中的应用分析.mp4 │ 高并发编程第一阶段08讲、构造Thread对象你...
源码刻意学习小组[目录]一,学习周期(2个月)时间内容主要类第一周(2019/12 / 09-2019 / 12.15)简单集合ArrayList,HashMap,LinkedList第二周(2019/12 / 16-2019 / 12.22)原子类不安全,AtomicInteger,...
│ 高并发编程第一阶段06讲、用Runnable接口将线程的逻辑执行单元从控制中抽取出来.mp4 │ 高并发编程第一阶段07讲、策略模式在Thread和Runnable中的应用分析.mp4 │ 高并发编程第一阶段08讲、构造Thread对象你...
铅笔头识别数据集,1692张原始训练图,640*640分辨率,91.1%的正确识别率,标注支持coco json格式
高校网络教学的体系规划与创建.docx
SpringBoot的学生心理咨询评估系统,你看这篇就够了(附源码)
内容概要:本文详细介绍了如何使用遗传算法优化BP神经网络,以提高交通流量预测的准确性。文中首先解释了BP神经网络的基本结构及其局限性,即容易陷入局部最优解的问题。随后,作者展示了遗传算法的工作原理,并将其应用于优化BP神经网络的权重和偏置。通过定义适应度函数、选择、交叉和变异等步骤,实现了对BP神经网络的有效改进。实验结果显示,优化后的BP神经网络在交通流量预测中的精度显著高于传统的BP神经网络,特别是在处理复杂的非线性问题时表现出色。 适用人群:对机器学习、深度学习以及交通流量预测感兴趣的科研人员和技术开发者。 使用场景及目标:适用于需要进行精确交通流量预测的应用场景,如智能交通系统、城市规划等领域。主要目标是通过遗传算法优化BP神经网络,解决其易陷入局部最优的问题,从而提高预测精度和稳定性。 其他说明:文中提供了详细的Python代码实现,帮助读者更好地理解和实践这一优化方法。同时,强调了遗传算法在全局搜索方面的优势,以及其与BP神经网络结合所带来的性能提升。此外,还讨论了一些具体的实施技巧,如适应度函数的设计、交叉和变异操作的选择等。 标签1,标签2,标签3,标签4,标签5
内容概要:本文详细介绍了H5U框架在PLC与触摸屏集成方面的应用,特别是在总线伺服控制和跨平台移植方面。文章首先解析了伺服控制的核心代码,如使能模块和绝对定位指令,强调了标准化控制流程的优势。接着讨论了触摸屏交互,通过直接映射PLC的DB块地址简化了数据处理。然后介绍了总线配置,尤其是EtherCAT总线初始化及其容错设计。此外,文章还探讨了框架的移植性和报警处理设计,展示了其在不同PLC品牌间的易用性和高效的故障恢复能力。 适合人群:从事工业自动化领域的工程师和技术人员,特别是有PLC编程经验和需要进行伺服控制系统开发的人群。 使用场景及目标:①快速搭建和调试基于PLC和触摸屏的自动化控制系统;②提高多轴设备的调试效率;③实现跨平台的无缝移植;④优化报警管理和故障恢复机制。 其他说明:该框架不仅提供了详细的代码示例和注释,还包含了丰富的实战经验和最佳实践,使得新手能够快速上手,而资深工程师可以在此基础上进一步创新。
内容概要:本文档《UE5开发.txt》全面介绍了Unreal Engine 5(UE5)的基本概念、安装配置、项目创建、文件结构及常用功能。UE5是一款强大的游戏引擎,支持实时渲染、蓝图创作、C++编程等功能。文档详细描述了UE5的安装步骤,包括硬件要求和环境配置;项目创建过程,涵盖项目模板选择、质量预设、光线追踪等设置;文件结构解析,重点介绍了Config、Content和.uproject文件的重要性。此外,文档深入讲解了蓝图编辑器的使用,包括变量、数组、集合、字典等数据类型的操作,以及事件、函数、宏和事件分发器的应用。蓝图作为一种可视化脚本工具,使开发者无需编写C++代码即可快速创建逻辑,适用于快速开发和迭代。 适合人群:具备一定编程基础的游戏开发者、设计师和对游戏开发感兴趣的初学者,尤其是希望深入了解UE5引擎及其蓝图系
餐馆点菜系统概要设计说明书.doc
5+1档轿车手动变速箱设计说明书.doc
1万吨自来水厂详细设计说明书.doc
wordpress外贸电商企业产品主题 页面展示图https://i-blink.csdnimg.cn/direct/e45b2e2e8e27423eb79bda5f4c1216d7.png
低效林改造作业设计说明书.doc
西门子200smart编程软件V2.8.2.1
135调速器操纵手柄 设计说明书.doc
内容概要:本文档为蓝桥杯全国软件和信息技术专业人才竞赛提供了全面的指导,涵盖竞赛概述、流程与规则、核心考点与备赛策略、实战技巧与避坑指南以及备赛资源推荐。蓝桥杯竞赛由工信部人才交流中心主办,涉及算法设计、软件开发、嵌入式系统、电子设计等领域。文档详细介绍了参赛流程(报名、省赛、国赛、国际赛),并针对软件类和电子类竞赛分别阐述了高频考点和备赛建议。对于软件类,强调了算法与数据结构的重要性,如排序、动态规划、图论等;对于电子类,则侧重于硬件基础和开发工具的使用。此外,还提供了详细的答题策略、常见陷阱规避方法及工具调试技巧。; 适合人群:高校本专科生、研究生,尤其是对算法设计、软件开发、嵌入式系统等领域感兴趣的计算机科学及相关专业的学生。; 使用场景及目标:①帮助参赛选手熟悉竞赛流程和规则,明确各阶段任务;②提供系统的备赛策略,包括高频考点的学习和专项突破;③指导选手掌握实战技巧,避免常见错误,提高答题效率和准确性。; 阅读建议:此文档不仅提供了理论知识,还包含了大量实战经验和备赛资源推荐,建议读者结合自身情况制定个性化的备赛计划,充分利用提供的资源进行练习和准备。
基于行块抽取正文内容的java版本的改进算法.zip
内容概要:本文详细介绍了基于西门子S7-200 PLC和MCGS组态软件的快递分拣系统的设计与实现方法。首先阐述了硬件配置的关键要点,包括IO分配表的具体设置以及传感器和执行机构的连接方式。接着深入解析了PLC程序中的梯形图逻辑,涵盖主传送带的连锁保护、机械臂动作的自保持逻辑和安全复位机制等核心部分。同时探讨了MCGS组态画面的应用,展示了如何通过脚本实现动态效果和数据统计功能。此外,文中还分享了一些调试经验和常见问题的解决方案,如防止传感器抖动、优化数据传输效率等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是对PLC编程和组态软件有一定了解的人群。 使用场景及目标:适用于需要构建高效可靠的快递分拣系统的物流企业或相关项目开发者。目标是帮助读者掌握从硬件选型到软件编程的一整套实施流程,确保系统能够稳定运行并达到预期性能指标。 其他说明:文章不仅提供了理论指导,还结合实际案例进行了详细的步骤讲解,有助于读者更好地理解和应用于实践中。