`
jaesonchen
  • 浏览: 309819 次
  • 来自: ...
社区版块
存档分类
最新评论

Java延时队列DelayQueue的使用

 
阅读更多
摘要: DelayQueue的使用场景以及介绍

问题背景

最近的某个业务中,遇到一个问题,一个用户动作,会产生A和B两个行为,分别通过对应的esb消息总线发出。

我们的业务线对AB两条esb消息队列进行监听(两个进程),做数据的同步更新操作。

 

在正常过程中,A行为比B行为先产生,并且A行为优先级高于B行为,数据最终会根据A行为做更新。

但是在实际应用中,出现了并发问题,数据最终根据B行为做了更新,覆盖了A行为。

 

最开始通过redis缓存进行上锁,在收到A消息时,在redis中添加一个key,处理完毕后删除key 。处理过程中收到B消息,直接返回。

但测试的时候发现并不可用,可能先收到B消息,后收到A消息, 但是先更新A数据,再更新B数据,还是进行了覆盖。

还有一种方法是修改底层代码,通过自定义sql的方法,先比较再update 。

 

问题分析

除此之外,还在考虑是否还有别的办法,问题的产生原因就是A和B的消息队列基本都在同一时间点拿到数据,对程序来说造成了并发操作。

如果我们可以把B的消息队列的都延迟一个时间点,保证两个消息队列不在同一时间点获得数据,基本上就可以解决这个问题。

 

于是就上网开始搜索,查到了延迟队列DelayQueue。

虽然我们不能让公司的消息队列延迟发送,但是我们可以延迟处理。当收到消息时先不处理,放入延迟消息队列中,另外一个线程再从延迟队列中获得数据进行处理。

 

类介绍

public classDelayQueue<EextendsDelayed> extendsAbstractQueue<E>
    implements BlockingQueue<E>

DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。

 

放入DelayQueue的对象需要实现Delayed接口。

 

public interfaceDelayedextendsComparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    longgetDelay(TimeUnit unit);
}

 

测试demo


import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/6/21
 */
public classDelayQueueTest{

    publicstaticvoidmain(String[] args){
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();

        //生产者
        producer(delayQueue);

        //消费者
        consumer(delayQueue);

        while (true){
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 每100毫秒创建一个对象,放入延迟队列,延迟时间1毫秒
     * @param delayQueue
     */
    privatestaticvoidproducer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            publicvoidrun(){
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    DelayedElement element = new DelayedElement(1000,"test");
                    delayQueue.offer(element);
                }
            }
        }).start();

        /**
         * 每秒打印延迟队列中的对象个数
         */
        new Thread(new Runnable() {
            @Override
            publicvoidrun(){
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("delayQueue size:"+delayQueue.size());
                }
            }
        }).start();
    }

    /**
     * 消费者,从延迟队列中获得数据,进行处理
     * @param delayQueue
     */
    privatestaticvoidconsumer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            publicvoidrun(){
                while (true){
                    DelayedElement element = null;
                    try {
                        element =  delayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(System.currentTimeMillis()+"---"+element);
                }
            }
        }).start();
    }
}


classDelayedElementimplementsDelayed{

    private final long delay; //延迟时间
    private final long expire;  //到期时间
    private final String msg;   //数据
    private final long now; //创建时间

    publicDelayedElement(long delay, String msg){
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }

    /**
     * 需要实现的接口,获得延迟时间   用过期时间-当前时间
     * @param unit
     * @return
     */
    @Override
    publiclonggetDelay(TimeUnit unit){
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间
     * @param o
     * @return
     */
    @Override
    publicintcompareTo(Delayed o){
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString(){
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delay);
        sb.append(", expire=").append(expire);
        sb.append(", msg='").append(msg).append('\'');
        sb.append(", now=").append(now);
        sb.append('}');
        return sb.toString();
    }
}

 

补充说明

1.参考网上一些的例子,有些   compareTo  方法就是错的, 要么造成队列中数据积压,要么不能起到延迟的效果。所以一定要经过自己的用例测试确保没有问题。

2.楼主的使用场景,需要考虑,如果进程关闭时,要先等本地延迟队列中的数据被处理完后,再结束进程。

分享到:
评论

相关推荐

    一口气说出Java 6种延时队列的实现方法(面试官也得服)

    Java 延时队列的实现方法 延时队列是一种特殊的队列,它可以根据指定的延迟时间来消费队列中的消息。在 Java 中,实现延时队列有多种方式,以下是六种常见的实现方法: 1. DelayQueue 延时队列 DelayQueue 是 ...

    java利用delayedQueue实现本地的延迟队列

    DelayedQueue 的实现是基于 Java 中的阻塞队列的接口 BlockingQueue, DelayQueue 是其的一种实现。 DelayQueue 提供了一个无界的阻塞队列,用于存放实现了 Delayed 接口的对象。 DelayQueue 能够保证队列中的对象...

    DelayQueue延迟队列和Redis缓存实现订单自动取消功能

    在Java编程中,DelayQueue是一种特殊的并发队列,它遵循先进先出(FIFO)原则,但具有一个独特的特性:元素只有在其指定的延迟时间过去之后才能被获取和处理。这个特性使得DelayQueue成为实现定时任务和延迟操作的...

    基于Redis实现的延迟消息队列

    整个延迟队列由4个部分组成: 1. JobPool用来存放所有Job的元信息。 2. DelayBucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 3. Timer负责实时扫描各个Bucket,并将delay...

    延时队列我在项目里是怎么实现的?.doc

    标题中的“延时队列”是指一种特殊类型的队列,它的主要特点是消息不会立即被处理,而是会在预定的时间延迟后才被消费。这种技术在很多场景下非常有用,例如在需要定时任务、订单超时处理或者消息缓存等方面。描述中...

    Java延迟队列原理与用法实例详解

    在Java中,延迟队列是通过 DelayQueue 实现的,Delayed 接口定义了延迟队列的元素必须实现的两个方法:compareTo 和 getDelay。 Delayed 接口是一个标记接口,表示元素可以延迟执行。compareTo 方法用于比较元素的...

    互联网延迟队列解决方案设计.pdf

    1. 基于编程语言的解决方案:这种方案通常使用编程语言中提供的定时器、延时队列等组件。例如,Java中的Timer和DelayQueue,可以实现简单的延迟任务。但这种方法一般适用于单机场景,因为它难以应对分布式系统中的...

    高级Java人才培训专家-05-延迟队列精准发布文章

    - **原理:** `DelayQueue` 是一个支持延时获取元素的阻塞队列,内部利用优先队列实现。 - **局限性:** 若程序异常中断,`DelayQueue` 中的任务将丢失。 - **2. 使用RabbitMQ实现延迟任务:** - **方案:** 结合消息...

    基于Netty+SpringBoot+LevelDB实现的高性能、高可靠性的消息队列+源代码+文档说明

    1.延迟消息BUG:延时消息基于jdk自带的delayQueue实现,系统宕机重启后服务端读取leveldb中的消息后将消息重新放回延时队列,会重新设置到期时间。例如:设置一条消息5分钟后推送,中途系统宕机,系统重启后会从当前...

    Java并发编程之阻塞队列详解

    4. **DelayQueue**:这是一个延时阻塞队列,其中的元素只有在其指定的延迟时间到达后才能被获取。DelayQueue也是无界的,只在尝试获取延迟未到的元素时才会阻塞消费者。 阻塞队列提供了一些关键的方法,如: - `put...

    定时任务与线程池:并发编程的关键

    深入分析了Java中的并发编程,特别是关于定时任务和定时线程池的使用。首先,文档详细介绍了ScheduledThreadPoolExecutor类的结构和工作原理。这个类用于处理延时任务或定时任务,通过三种不同的任务提交方式:...

Global site tag (gtag.js) - Google Analytics