`
hellohank
  • 浏览: 146642 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

记录一下延时队列常见几种方案实现

阅读更多

在实际业务场景中,有许多要用到延时消息或消费的功能,最常见的是:下订单后,半小时或指定时间段内如果没有付款,就取消订单。如果使用定时任务轮询的话,不太合适,一来定时任务有一个时间间隔,同时也会导致单线程消息的速度跟不上。

对于这类,常用的解决方案如下:

  • 定时任务轮询

优点:简单方便,实现快速,如果使用得当,可支持分支式集群环境

缺点:轮询的时间间隔及排除处理的方式,会导致触发不及时,而且在量非常大的场景,表现越差

实现代码:略

  • 阻塞队列:DelayQueue

优点:JDK自带,稳定可靠

缺点:考虑复杂场景时,实现较为复杂,另外,如果消息量较多时,内存成为瓶颈,分布式环境实现也会比较复杂。

核心代码:略

  • 时间轮:HashedWheelTimer

优点:Netty自带功能,稳定可靠

缺点:同上DelayQueue。

核心代码:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask {
        boolean flag;

        public MyTimerTask(boolean flag) {
            this.flag = flag;
        }

        public void run(Timeout timeout) throws Exception {
            System.out.println("要去数据库删除订单了。。。。");
            this.flag = false;
        }
    }

    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
        timer.newTimeout(timerTask, 12, TimeUnit.SECONDS);
        int i = 1;
        while (timerTask.flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(i + "秒过去了");
            i++;
        }
        timer.stop();
    }
}

 

  • MQ延时消费:RocketMQ/RabbitMQ等

优点:使用MQ自带的延时功能,方便可靠。

缺点:要搭建并维护MQ体系。

核心代码:不同MQ的具体代码不一样,略。

  • Redis队列消费

PS:以下redis的相关代码基于redisson,如果使用jedis或lettuce,逻辑和原理相同,只是调用的组件接口不一样。

自实现redis延时队列

    优点:基于redis,稳定性的性能可以达到一个比较好的平稳

    缺点:要考虑特殊情况(如redis消息丢失等),并自己实现相应的消息收发逻辑。

    核心代码:

    

import org.redisson.Redisson;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 使用redis实现的延时队列
 */
public class RedisDelayQueue {
    public static final String EXPIRED_QUEUE_KEY = "queue:delay:list";

    public static void main(String[] args) {
        DelayTaskProducer producer = new DelayTaskProducer();
        long now = System.currentTimeMillis();
        producer.produce("消息1:dataId=123", now + TimeUnit.SECONDS.toMillis(5));
        producer.produce("消息2:dataId=45", now + TimeUnit.SECONDS.toMillis(15));
        producer.produce("消息3:dataId=234", now + TimeUnit.SECONDS.toMillis(75));
        producer.produce("消息4:dataId=534", now + TimeUnit.SECONDS.toMillis(55));

        //创建多个消息实例
//        for(int i=0;i<10;i++){
        new DelayTaskConsumer().start();
//        }
    }

    public static class DelayTaskProducer {
        /***
         * 将要延时处理的消息放到redis中
         * @param msg 具体消息内容
         * @param expiredAt 指定在哪个时间点到期消费
         */
        public void produce(String msg, long expiredAt) {
            RedissonClient redissonClient = createClient(1);
            RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY);
            set.add(expiredAt, msg);
        }
    }

    public static class DelayTaskConsumer {
        private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        public void start() {
            scheduledExecutorService.scheduleWithFixedDelay(new DelayTaskHandler(), 1, 200, TimeUnit.MILLISECONDS);
        }
    }

    public static class DelayTaskHandler implements Runnable {
        @Override
        public void run() {
            RedissonClient redissonClient = createClient(1);
            RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(EXPIRED_QUEUE_KEY);
            Double firstScore = set.firstScore();
            if (firstScore == null || firstScore > System.currentTimeMillis()) {
                return;//如果队列为空,则时间还没到,则不执行
            }
            String firstMsg = set.takeFirst();
            System.out.println("开始消费消息:" + firstMsg);
        }
    }

    protected static RedissonClient createClient(int db) {
        Config config = new Config();
        config.setCodec(new StringCodec());
        config.useSingleServer()
                .setAddress("redis://172.18.12.34:6379")
                .setPassword("beta1234")
                .setConnectionPoolSize(500)
                .setIdleConnectionTimeout(10000)
                .setTimeout(3000)
                .setConnectTimeout(30000)
                .setRetryAttempts(3)
                .setRetryInterval(1000)
                .setDnsMonitoringInterval(-1)
                .setPingConnectionInterval(10000)
                .setDatabase(db);
        return Redisson.create(config);
    }
}

 
基于redis的key过期广播

    优点:触发及时可靠,逻辑简单

    缺点:要防止数据丢失后重新加载的情况;开启服务端相应广播事件队列;对redis存在性能消耗等。

    核心代码:使用redis的key过期事件监听,要开启redis服务,具体开启方式见:https://www.iteye.com/blog/hellohank-2524409,消息收发的示例代码如下:

    

import com.alibaba.fastjson.JSON;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.StringCodec;

import java.util.concurrent.TimeUnit;

/**
 * 基于redis的key过期事件广播队列监听。所以,需要开启redis服务的这个功能。具体看:https://www.iteye.com/blog/hellohank-2524409
 */
public class RedisDelayQueueWithExpiredListener extends RedisDelayQueue{
    public static void main(String[] args) {
        int db = 2;
        RedissonClient redissonClient = createClient(db);
        RTopic topic = redissonClient.getTopic("__keyevent@" + db + "__:expired", new StringCodec());
        topic.addListener(String.class, new MessageListener() {
            @Override
            public void onMessage(CharSequence channel, Object msg) {
                System.out.println("onMessage:" + channel + "; Thread: " + Thread.currentThread().toString());
                System.out.println(msg);
            }
        });
        String key = "test_expire_listen";
        Object value = "val";
        RBucket bucket = redissonClient.getBucket(key);
        bucket.set(JSON.toJSONString(value), 5, TimeUnit.SECONDS);
        bucket.expire(6,TimeUnit.SECONDS);
    }
}

 

    

分享到:
评论

相关推荐

    在内购使用中存在的几种丢单的情况

    ### 在内购使用中存在的几种丢单的情况 #### 知识点一:客户端获取到交易回调后-purchasedTransaction **情况描述:** 在iOS应用内购过程中,存在一种常见丢单情况,即当客户端成功获取到交易回调并在执行`-(void)...

    Q Replication

    在高可用性解决方案中,故障转移(Failover)和切换回(Switchback)是两种常见的机制,用于确保主数据库出现故障时能够快速地将服务转移到备用数据库上,并在主数据库恢复后能够平稳地将服务再转移回来。...

    STM32按键扫描代码

    常见的滤波方法有Debouncing(去抖动)算法,例如延时判断法和两次读取比较法。延时判断法是在检测到按键变化后,等待一小段时间(如20ms),再次检查按键状态,如果仍然改变,则认为是真实的按键动作。两次读取比较...

    谈谈对分布式事务的一点理解和解决方案.docx

    本文将深入探讨分布式事务的基本概念、存在的挑战以及几种常见的解决方案。 #### 分布式事务的概念 分布式事务是指跨越多个网络计算节点的事务处理过程。在分布式系统中,为了保证业务的一致性,往往需要在不同的...

    cakephp_queue:CakePHP的简约JobQueue插件

    总结来说,"cakephp_queue"插件是CakePHP框架下实现任务队列的一种有效手段,它的简洁设计和易用性使得开发者能快速地集成并管理异步任务。通过合理利用这个插件,你可以提升应用的效率,改善用户体验,同时降低系统...

    爬取高清美图_爬取高清美图的源码_

    在IT行业中,网络爬虫是一种常见的技术,用于自动化地从互联网上抓取大量数据,而“爬取高清美图”正是这样一个应用场景。本项目通过编写源码实现批量下载高清图片的功能,尤其对于需要会员权限才能访问的超清图片,...

    基于ASP的开良图片爬虫ASP.zip

    4. **URL管理**:为了防止重复爬取和控制爬取范围,可能需要实现URL的去重和队列管理。 5. **错误处理与重试机制**:面对网络问题或服务器限制,图片爬虫需要有良好的错误处理和重试策略。 6. **代理IP与延时设置*...

    μC/OS-II:源码公开的实时嵌入式操作系统

    - **2.22 任务间的通讯**:介绍了几种常用的任务间通信方法。 - **2.23 消息邮箱**:一种简单的通信机制,用于发送单个消息。 - **2.24 消息队列**:可以容纳多个消息的通信机制。 - **2.25 中断**:中断是实时...

    多线程爬取1000个网页_python爬虫_thread_

    5. **下载进度和状态管理**:使用队列或数据库记录已爬取的URL和进度,以便在程序中断后能恢复。 6. **并发控制**:根据网络条件和目标网站的容忍度,合理设置并发数量,避免过于频繁的请求。 7. **网页解析**:...

    linux architecture

    - **页回收算法**:介绍了内核中用于决定何时释放内存页面的几种算法(如LRU、clock等)。 - **交换机制**:探讨了如何将暂时不用的页面移动到辅助存储设备上的交换空间,以缓解内存压力。 #### 十九、审计 - **...

    分布式数据库一致性与容错.pptx

    为了实现这一目标,需要解决几个关键问题: 1. **ACID原则在分布式系统中的挑战**:传统的事务处理遵循ACID(原子性、一致性、隔离性、持久性)原则,但在分布式环境中维护这些属性变得更加困难。主要挑战包括网络...

    UCOS-II 资料 源码解析 移植过程

    ### UCOS-II 操作系统概览与源码解析 #### 第一章:范例与基本配置 ##### 1.1 安装µC/OS-II µC/OS-II是一款实时操作系统(RTOS),适用于嵌入式...µC/OS-II提供了几种延时方法,包括基于时间片和绝对时间的延时。

    JavaSpider:Java蜘蛛机器人

    JavaSpider,正如其名,是一种基于Java编程语言实现的网络爬虫工具,也被称为Java蜘蛛机器人。这个项目可能是一个开源的、用于数据抓取和信息提取的框架,它允许开发者编写自定义爬虫程序,以自动化的方式从互联网上...

Global site tag (gtag.js) - Google Analytics