`
15286802013
  • 浏览: 1146 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Spring-data-redis: 分布式队列

阅读更多
Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础.
    Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。
    我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。不过本实例中,并没有提供关于队列消费之后的消息确认机制,如果你感兴趣可以自己尝试实现它。
    1) Redis中的"队列"为双端队列,基于list数据结构实现,并提供了"队列阻塞"功能.
    2) 如果你期望使用redis做"分布式队列"server,且数据存取较为密集时,务必配置(redis.conf)中关于list数据结构的限制:
Java代码  收藏代码
//当list中数据个数达到阀值是,将会被重构为linkedlist 
//如果队列的存/取速度较为接近,此值可以稍大 
list-max-ziplist-entries 5120 
list-max-ziplist-value 1024 
    3) Redis已经提供了"队列"的持久化能力,无需额外的技术支持
    4) Redis并没有提供JMS语义中"queue"消息的消费确认的功能,即当队列中的消息被redis-client接收之后,并不会执行"确认消息已到达"的操作;如果你的分布式队列,需要严格的消息确认,需要额外的技术支持.
    5) Redis并不能像JMS那样提供高度中心化的"队列"服务集群,它更适合"快速/小巧/及时消费"的情景.
    6) 本例中,对于消息的接收,是在一个后台线程中进行(参见下文RedisQueue),其实我们可以使用线程池的方式来做,以提高性能. 不过此方案,需要基于2个前提:
        A) 如果单个queue中的消息较多,且每条消息的处理时间较长(即消费速度比接收的速度慢)
        B) 如果此线程池可以被多个queue公用线程资源 ,如果一个queue就创建一个线程池,实在是有些浪费且存在不安全问题.
        C) 需要确认,多线程环境中对queue的操作,有可能在客户端层面打乱了队列的顺序,而造成异常.比如线程1从queue中获得data1,线程2从queue中获得data2,有可能因为线程调度的问题,导致data2被优先执行.

一.配置文件:
Java代码  收藏代码
<beans xmlns="http://www.springframework.org/schema/beans"  
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName"> 
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> 
        <property name="maxActive" value="32"></property> 
        <property name="maxIdle" value="6"></property> 
        <property name="maxWait" value="15000"></property> 
        <property name="minEvictableIdleTimeMillis" value="300000"></property> 
        <property name="numTestsPerEvictionRun" value="3"></property> 
        <property name="timeBetweenEvictionRunsMillis" value="60000"></property> 
        <property name="whenExhaustedAction" value="1"></property> 
    </bean> 
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy"> 
        <property name="poolConfig" ref="jedisPoolConfig"></property> 
        <property name="hostName" value="127.0.0.1"></property> 
        <property name="port" value="6379"></property> 
        <property name="password" value="0123456"></property> 
        <property name="timeout" value="15000"></property> 
        <property name="usePool" value="true"></property> 
    </bean> 
    <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 
        <property name="connectionFactory" ref="jedisConnectionFactory"></property> 
        <property name="defaultSerializer"> 
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> 
        </property> 
    </bean> 
    <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/> 
    <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy"> 
        <property name="redisTemplate" ref="jedisTemplate"></property> 
        <property name="key" value="user:queue"></property> 
        <property name="listener" ref="jedisQueueListener"></property> 
    </bean> 
</beans> 
二.程序实例:
1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。
Java代码  收藏代码
public interface RedisQueueListener<T> { 
 
    public void onMessage(T value); 

Java代码  收藏代码
public class QueueListener<String> implements RedisQueueListener<String> { 
 
    @Override 
    public void onMessage(String value) { 
        System.out.println(value); 
         
    } 
 

2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。
Java代码  收藏代码
public class RedisQueue<T> implements InitializingBean,DisposableBean{ 
    private RedisTemplate redisTemplate; 
    private String key; 
    private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据 
    private byte[] rawKey; 
    private RedisConnectionFactory factory; 
    private RedisConnection connection;//for blocking 
    private BoundListOperations<String, T> listOperations;//noblocking 
     
    private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑 
     
    private RedisQueueListener listener;//异步回调 
    private Thread listenerThread; 
     
    private boolean isClosed; 
     
    public void setRedisTemplate(RedisTemplate redisTemplate) { 
        this.redisTemplate = redisTemplate; 
    } 
 
    public void setListener(RedisQueueListener listener) { 
        this.listener = listener; 
    } 
 
    public void setKey(String key) { 
        this.key = key; 
    } 
     
 
    @Override 
    public void afterPropertiesSet() throws Exception { 
        factory = redisTemplate.getConnectionFactory(); 
        connection = RedisConnectionUtils.getConnection(factory); 
        rawKey = redisTemplate.getKeySerializer().serialize(key); 
        listOperations = redisTemplate.boundListOps(key); 
        if(listener != null){ 
            listenerThread = new ListenerThread(); 
            listenerThread.setDaemon(true); 
            listenerThread.start(); 
        } 
    } 
     
     
    /**
     * blocking
     * remove and get last item from queue:BRPOP
     * @return
     */ 
    public T takeFromTail(int timeout) throws InterruptedException{  
        lock.lockInterruptibly(); 
        try{ 
            List<byte[]> results = connection.bRPop(timeout, rawKey); 
            if(CollectionUtils.isEmpty(results)){ 
                return null; 
            } 
            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); 
        }finally{ 
            lock.unlock(); 
        } 
    } 
     
    public T takeFromTail() throws InterruptedException{ 
        return takeFromTail(0); 
    } 
     
    /**
     * 从队列的头,插入
     */ 
    public void pushFromHead(T value){ 
        listOperations.leftPush(value); 
    } 
     
    public void pushFromTail(T value){ 
        listOperations.rightPush(value); 
    } 
     
    /**
     * noblocking
     * @return null if no item in queue
     */ 
    public T removeFromHead(){ 
        return listOperations.leftPop(); 
    } 
     
    public T removeFromTail(){ 
        return listOperations.rightPop(); 
    } 
     
    /**
     * blocking
     * remove and get first item from queue:BLPOP
     * @return
     */ 
    public T takeFromHead(int timeout) throws InterruptedException{ 
        lock.lockInterruptibly(); 
        try{ 
            List<byte[]> results = connection.bLPop(timeout, rawKey); 
            if(CollectionUtils.isEmpty(results)){ 
                return null; 
            } 
            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); 
        }finally{ 
            lock.unlock(); 
        } 
    } 
     
    public T takeFromHead() throws InterruptedException{ 
        return takeFromHead(0); 
    } 
 
    @Override 
    public void destroy() throws Exception { 
        if(isClosed){ 
            return; 
        } 
        shutdown(); 
        RedisConnectionUtils.releaseConnection(connection, factory); 
    } 
     
    private void shutdown(){ 
        try{ 
            listenerThread.interrupt(); 
        }catch(Exception e){ 
            // 
        } 
    } 
     
    class ListenerThread extends Thread { 
         
        @Override 
        public void run(){ 
            try{ 
                while(true){ 
                    T value = takeFromHead();//cast exception? you should check. 
                    //逐个执行 
                    if(value != null){ 
                        try{ 
                            listener.onMessage(value); 
                        }catch(Exception e){ 
                            // 
                        } 
                    } 
                } 
            }catch(InterruptedException e){ 
                // 
            } 
        } 
    } 
     

    3) 使用与测试:
Java代码  收藏代码
public static void main(String[] args) throws Exception{ 
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml"); 
    RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue"); 
    redisQueue.pushFromHead("test:app"); 
    Thread.sleep(15000); 
    redisQueue.pushFromHead("test:app"); 
    Thread.sleep(15000); 
    redisQueue.destroy(); 

    在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。
分享到:
评论

相关推荐

    2024年机器人大作业代码

    2024年机器人大作业代码

    学生信息管理系统,idea-mysql小项目,记录一下

    这是mysql文件直接导入就行了,可以查一下相关指令例如:mysql -u root -p mydb_copy < mydb.sql就好了,这里就不多赘述了

    搜索关键字飞入飞出效果.zip

    Android 毕业设计,Android 毕业设计,小Android 程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    基于ssm的团员管理系统源代码(完整前后端+mysql+说明文档+LW).zip

    管理员 管理员信息管理 学院管理 辅导员管理 学生信息管理 公告信息 辅导员 个人资料修改 团员信息管理 优秀团员管理 团费缴纳管理 团员活动管理(主题,内容,参与人数,日期) 团员活的报名 学生 个人资料修改 入团申请管理(提交申请,申请结果查看) 团员活动查看(只能查看,不能修改,活动报名) 团员活动报名 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)

    基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目),个人大四的毕业设计、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的正在做毕设的学生和需要项目实战练习的学习者,也可作为课程设计、期末大作业。 基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)基于springboot图书管理系统源码+数据库+详细使用说明(高分毕设项目)个人大四的毕业设计、经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的正在做毕设的学生和需要。

    Python项目-自动办公-51 Excel_案例_把文件夹整理到Excel中.zip

    Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    汽车检测33-YOLO(v5至v9)数据集合集.rar

    汽车检测33-YOLO(v5至v9)数据集合集.rar多对象-V4 2023-03-12 9:33 PM ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包含4278张图像。 多对象以Yolo V5 Pytorch格式注释。 将以下预处理应用于每个图像: *调整大小为640x640(拉伸) 应用以下扩展来创建每个源图像的3个版本: 将以下转换应用于每个图像的边界框: *以下90度旋转之一的同等概率:无,顺时针,逆时针方向

    Python项目-自动办公-44 excel处理实例(二维表转一维表).zip

    Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    三亚市2005-2024年近20年历史气象数据下载

    三亚市2005-2024年近20年的历史气象数据,每3小时更新一次数据,参数包含气温、气压、降水量、云层、能见度、风向、湿度等,几万条数据

    公开整理-全国高校各专业及分方向研究生录取人数大数据(更新至2022年).zip

    详细介绍及样例数据:https://blog.csdn.net/T0620514/article/details/144542157

    javaweb音乐网系统-lw.zip

    项目包含前后台完整源码。 项目都经过严格调试,确保可以运行! 具体项目介绍可查看博主文章或私聊获取 助力学习实践,提升编程技能,快来获取这份宝贵的资源吧!

    Python项目-自动办公-08 用Python设置Word文档里表格的格式.zip

    Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    STM32F103通过WIFI接收配置信息修改BC260-NBIOT模块的目标IP和端口程序代码

    1、嵌入式物联网单片机项目开发实战。例程经过精心编写,简单好用。 2、代码使用KEIL 标准库开发,当前在STM32F103运行,如果是STM32F103其他型号芯片,依然适用,请自行更改KEIL芯片型号以及FLASH容量即可。 3、软件下载时,请注意keil选择项是jlink还是stlink。 4、有偿指导v:wulianjishu666; 5、如果接入其他传感器,请查看发布的其他资料。 6、单片机与模块的接线,在代码当中均有定义,请自行对照。 7、若硬件差异,请根据自身情况调整代码,程序仅供参考学习。 8、代码有注释说明,请耐心阅读。

    瓶罐瓶子罐子检测75-YOLO(v5至v9)、COCO、CreateML、Darknet数据集合集.rar

    瓶罐瓶子罐子检测75-YOLO(v5至v9)、COCO、CreateML、Darknet数据集合集.rar街7级-V2 2023-04-28 11:45 PM ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包括8934张图像。 街道以可可格式注释。 将以下预处理应用于每个图像: *像素数据的自动取向(带有Exif-Arientation剥离) *调整大小为640x640(拉伸) 没有应用图像增强技术。

    基于ssm的高速公路收费系统源代码(完整前后端+mysql+说明文档+LW).zip

    管理员 管理员信息管理 负责人管理 员工信息管理 公告信息管理 小型车收费标准设置(元/每公里) 大卡车收费标准设置(元/吨公里) 收费信息统计,统计小车和卡车收费,按月统计 负责人 个人资料修改 公告查看 小车收费统计(某员工某月统计) 大卡车收费统计(某员工某月统计) 员工 个人资料修改 公告查看 小型车收费登记(车牌号,车辆照片,行使公里数,收费金额,收费日期,收费员,按公里数可以自动计算费用 收费金额=收费标准*公里数) 大卡车金额设置(每吨/元)(车牌号,车辆照片,行使公里数,吨,收费金额,收费日期,收费员, 收费金额=收费标准*吨*公里数 ) 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7

    【图像加密解密】基于matlab Logistic映射和线性反馈移位寄存器组合的图像加密解密【含Matlab源码 9866期】复现.zip

    Matlab领域上传的视频均有对应的完整代码,皆可运行,亲测可用,适合小白; 1、代码压缩包内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作

    Python项目-实例-08 抖音表白.zip

    Python课程设计,含有代码注释,新手也可看懂。毕业设计、期末大作业、课程设计、高分必看,下载下来,简单部署,就可以使用。 包含:项目源码、数据库脚本、软件工具等,该项目可以作为毕设、课程设计使用,前后端代码都在里面。 该系统功能完善、界面美观、操作简单、功能齐全、管理便捷,具有很高的实际应用价值。

    桌球检测10-YOLO(v5至v9)、Darknet、Paligemma、TFRecord、VOC数据集合集.rar

    桌球检测10-YOLO(v5至v9)、Darknet、Paligemma、TFRecord、VOC数据集合集.rar大理石-V3版本 ============================= *与您的团队在计算机视觉项目上合作 *收集和组织图像 *了解和搜索非结构化图像数据 *注释,创建数据集 *导出,训练和部署计算机视觉模型 *使用主动学习随着时间的推移改善数据集 对于最先进的计算机视觉培训笔记本,您可以与此数据集一起使用 该数据集包括105张图像。 大理石以Yolo V3 Darknet格式注释。 将以下预处理应用于每个图像: 没有应用图像增强技术。

    基于java的华奥汽车销售集团网源码.zip

    项目包含前后台完整源码。 项目都经过严格调试,确保可以运行! 具体项目介绍可查看博主文章或私聊获取 助力学习实践,提升编程技能,快来获取这份宝贵的资源吧!

    喜来登五星酒店酒店数字客房管理系统.docx

    喜来登五星酒店酒店数字客房管理系统.docx

Global site tag (gtag.js) - Google Analytics