`
15286802013
  • 浏览: 1149 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Spring-data-redis: 分布式队列

阅读更多
Redis中list数据结构,具有“双端队列”的特性,同时redis具有持久数据的能力,因此redis实现分布式队列是非常安全可靠的。它类似于JMS中的“Queue”,只不过功能和可靠性(事务性)并没有JMS严格。Redis本身的高性能和"便捷的"分布式设计(replicas,sharding),可以为实现"分布式队列"提供了良好的基础.
    Redis中的队列阻塞时,整个connection都无法继续进行其他操作,因此在基于连接池设计是需要注意。
    我们通过spring-data-redis,来实现“同步队列”,设计风格类似与JMS。不过本实例中,并没有提供关于队列消费之后的消息确认机制,如果你感兴趣可以自己尝试实现它。
    1) Redis中的"队列"为双端队列,基于list数据结构实现,并提供了"队列阻塞"功能.
    2) 如果你期望使用redis做"分布式队列"server,且数据存取较为密集时,务必配置(redis.conf)中关于list数据结构的限制:
Java代码  收藏代码
//当list中数据个数达到阀值是,将会被重构为linkedlist 
//如果队列的存/取速度较为接近,此值可以稍大 
list-max-ziplist-entries 5120 
list-max-ziplist-value 1024 
    3) Redis已经提供了"队列"的持久化能力,无需额外的技术支持
    4) Redis并没有提供JMS语义中"queue"消息的消费确认的功能,即当队列中的消息被redis-client接收之后,并不会执行"确认消息已到达"的操作;如果你的分布式队列,需要严格的消息确认,需要额外的技术支持.
    5) Redis并不能像JMS那样提供高度中心化的"队列"服务集群,它更适合"快速/小巧/及时消费"的情景.
    6) 本例中,对于消息的接收,是在一个后台线程中进行(参见下文RedisQueue),其实我们可以使用线程池的方式来做,以提高性能. 不过此方案,需要基于2个前提:
        A) 如果单个queue中的消息较多,且每条消息的处理时间较长(即消费速度比接收的速度慢)
        B) 如果此线程池可以被多个queue公用线程资源 ,如果一个queue就创建一个线程池,实在是有些浪费且存在不安全问题.
        C) 需要确认,多线程环境中对queue的操作,有可能在客户端层面打乱了队列的顺序,而造成异常.比如线程1从queue中获得data1,线程2从queue中获得data2,有可能因为线程调度的问题,导致data2被优先执行.

一.配置文件:
Java代码  收藏代码
<beans xmlns="http://www.springframework.org/schema/beans"  
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName"> 
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> 
        <property name="maxActive" value="32"></property> 
        <property name="maxIdle" value="6"></property> 
        <property name="maxWait" value="15000"></property> 
        <property name="minEvictableIdleTimeMillis" value="300000"></property> 
        <property name="numTestsPerEvictionRun" value="3"></property> 
        <property name="timeBetweenEvictionRunsMillis" value="60000"></property> 
        <property name="whenExhaustedAction" value="1"></property> 
    </bean> 
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy"> 
        <property name="poolConfig" ref="jedisPoolConfig"></property> 
        <property name="hostName" value="127.0.0.1"></property> 
        <property name="port" value="6379"></property> 
        <property name="password" value="0123456"></property> 
        <property name="timeout" value="15000"></property> 
        <property name="usePool" value="true"></property> 
    </bean> 
    <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 
        <property name="connectionFactory" ref="jedisConnectionFactory"></property> 
        <property name="defaultSerializer"> 
            <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> 
        </property> 
    </bean> 
    <bean id="jedisQueueListener" class="com.sample.redis.sdr.QueueListener"/> 
    <bean id="jedisQueue" class="com.sample.redis.sdr.RedisQueue" destroy-method="destroy"> 
        <property name="redisTemplate" ref="jedisTemplate"></property> 
        <property name="key" value="user:queue"></property> 
        <property name="listener" ref="jedisQueueListener"></property> 
    </bean> 
</beans> 
二.程序实例:
1) QueueListener:当队列中有数据时,可以执行类似于JMS的回调操作。
Java代码  收藏代码
public interface RedisQueueListener<T> { 
 
    public void onMessage(T value); 

Java代码  收藏代码
public class QueueListener<String> implements RedisQueueListener<String> { 
 
    @Override 
    public void onMessage(String value) { 
        System.out.println(value); 
         
    } 
 

2) RedisQueue:队列操作,内部封装redisTemplate实例;如果配置了“listener”,那么queue将采用“消息回调”的方式执行,listenerThread是一个后台线程,用来自动处理“队列信息”。如果不配置“listener”,那么你可以将redisQueue注入到其他spring bean中,手动去“take”数据即可。
Java代码  收藏代码
public class RedisQueue<T> implements InitializingBean,DisposableBean{ 
    private RedisTemplate redisTemplate; 
    private String key; 
    private int cap = Short.MAX_VALUE;//最大阻塞的容量,超过容量将会导致清空旧数据 
    private byte[] rawKey; 
    private RedisConnectionFactory factory; 
    private RedisConnection connection;//for blocking 
    private BoundListOperations<String, T> listOperations;//noblocking 
     
    private Lock lock = new ReentrantLock();//基于底层IO阻塞考虑 
     
    private RedisQueueListener listener;//异步回调 
    private Thread listenerThread; 
     
    private boolean isClosed; 
     
    public void setRedisTemplate(RedisTemplate redisTemplate) { 
        this.redisTemplate = redisTemplate; 
    } 
 
    public void setListener(RedisQueueListener listener) { 
        this.listener = listener; 
    } 
 
    public void setKey(String key) { 
        this.key = key; 
    } 
     
 
    @Override 
    public void afterPropertiesSet() throws Exception { 
        factory = redisTemplate.getConnectionFactory(); 
        connection = RedisConnectionUtils.getConnection(factory); 
        rawKey = redisTemplate.getKeySerializer().serialize(key); 
        listOperations = redisTemplate.boundListOps(key); 
        if(listener != null){ 
            listenerThread = new ListenerThread(); 
            listenerThread.setDaemon(true); 
            listenerThread.start(); 
        } 
    } 
     
     
    /**
     * blocking
     * remove and get last item from queue:BRPOP
     * @return
     */ 
    public T takeFromTail(int timeout) throws InterruptedException{  
        lock.lockInterruptibly(); 
        try{ 
            List<byte[]> results = connection.bRPop(timeout, rawKey); 
            if(CollectionUtils.isEmpty(results)){ 
                return null; 
            } 
            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); 
        }finally{ 
            lock.unlock(); 
        } 
    } 
     
    public T takeFromTail() throws InterruptedException{ 
        return takeFromTail(0); 
    } 
     
    /**
     * 从队列的头,插入
     */ 
    public void pushFromHead(T value){ 
        listOperations.leftPush(value); 
    } 
     
    public void pushFromTail(T value){ 
        listOperations.rightPush(value); 
    } 
     
    /**
     * noblocking
     * @return null if no item in queue
     */ 
    public T removeFromHead(){ 
        return listOperations.leftPop(); 
    } 
     
    public T removeFromTail(){ 
        return listOperations.rightPop(); 
    } 
     
    /**
     * blocking
     * remove and get first item from queue:BLPOP
     * @return
     */ 
    public T takeFromHead(int timeout) throws InterruptedException{ 
        lock.lockInterruptibly(); 
        try{ 
            List<byte[]> results = connection.bLPop(timeout, rawKey); 
            if(CollectionUtils.isEmpty(results)){ 
                return null; 
            } 
            return (T)redisTemplate.getValueSerializer().deserialize(results.get(1)); 
        }finally{ 
            lock.unlock(); 
        } 
    } 
     
    public T takeFromHead() throws InterruptedException{ 
        return takeFromHead(0); 
    } 
 
    @Override 
    public void destroy() throws Exception { 
        if(isClosed){ 
            return; 
        } 
        shutdown(); 
        RedisConnectionUtils.releaseConnection(connection, factory); 
    } 
     
    private void shutdown(){ 
        try{ 
            listenerThread.interrupt(); 
        }catch(Exception e){ 
            // 
        } 
    } 
     
    class ListenerThread extends Thread { 
         
        @Override 
        public void run(){ 
            try{ 
                while(true){ 
                    T value = takeFromHead();//cast exception? you should check. 
                    //逐个执行 
                    if(value != null){ 
                        try{ 
                            listener.onMessage(value); 
                        }catch(Exception e){ 
                            // 
                        } 
                    } 
                } 
            }catch(InterruptedException e){ 
                // 
            } 
        } 
    } 
     

    3) 使用与测试:
Java代码  收藏代码
public static void main(String[] args) throws Exception{ 
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-redis-beans.xml"); 
    RedisQueue<String> redisQueue = (RedisQueue)context.getBean("jedisQueue"); 
    redisQueue.pushFromHead("test:app"); 
    Thread.sleep(15000); 
    redisQueue.pushFromHead("test:app"); 
    Thread.sleep(15000); 
    redisQueue.destroy(); 

    在程序运行期间,你可以通过redis-cli(客户端窗口)执行“lpush”,你会发现程序的控制台仍然能够正常打印队列信息。
分享到:
评论

相关推荐

    spring-data-redis-1.0.2

    《Spring Data Redis 1.0.2:解锁高效Redis数据存取的秘密》 Spring Data Redis是Spring框架的一个重要模块,它为开发人员提供了一种优雅的方式来利用Redis这一高性能的键值存储系统。在Spring Data Redis 1.0.2...

    SpringData与Redis集成

    而Redis是一款开源、高性能的键值对数据存储系统,常用于实现高速缓存、分布式锁、消息队列等功能。将SpringData与Redis整合,可以利用Spring的便利性和Redis的高效性,构建出高效能的应用。 首先,让我们详细了解...

    spring-boot2集成redis

    Redis 是一个开源的、基于键值对的数据存储系统,通常用于数据缓存、消息队列和分布式锁等功能。它以内存为数据存储介质,支持丰富的数据结构如字符串、哈希、列表、集合和有序集合。 2. **Spring Boot 2 对 Redis...

    redis-spring-boot-starter.rar

    而Redis作为一种高性能的键值数据存储系统,常被用作缓存、消息队列等多种场景,与Spring Boot结合使用能极大提升应用性能。本文将深入探讨`redis-spring-boot-starter`这一组件,帮助开发者更好地理解和运用它。 ...

    spring-data-redis-1.0.4.RELEASE-dist.zip

    - **Transaction支持**:尽管Redis本身不支持分布式事务,Spring Data Redis提供了一种模拟事务的机制,以满足特定场景下的需求。 4. **1.0.4.RELEASE版的变化** 这个版本可能包含了bug修复、性能优化和对Redis新...

    spring-boot-redis.zip

    - Spring Boot也支持使用Redis Sentinel进行高可用性配置,以及使用Redis Cluster进行分布式存储。 9. **实际编码示例**: - 压缩包中的`src`目录可能包含了一些示例代码,如配置类、Service类、Controller类等,...

    spring-redis-boot-starter-1.0.0_java_

    《Spring Redis Boot Starter 1.0.0:Java中的Redis集成与消息队列应用》 在Java开发领域,Spring框架的广泛使用使得开发者能够轻松构建高效、可扩展的应用程序。而Spring Boot则进一步简化了Spring的配置过程,...

    spring-data-redis-example

    3. 分布式锁:Spring Data Redis可以实现基于Redis的分布式锁,通过`RedisTemplate.opsForValue().setIfAbsent()`等方法来实现。 4. 事务支持:虽然Redis本身不支持复杂的事务,但Spring Data Redis提供了一种模拟...

    springboot 集成Redission 简单完美解决分布式锁

    Redisson是一个基于Redis的Java客户端,它提供了丰富的数据结构(如锁、队列、信号量等)以及分布式服务(如原子计数器、ID生成器等)。通过使用Redis作为中间件,Redisson可以在分布式环境中提供高性能的同步和异步...

    redisson-spring-boot-starter

    3. **Spring Data Redis 集成**:除了 Redisson 自带的 API,`redisson-spring-boot-starter` 还支持 Spring Data Redis 框架,可以使用 Spring Data 提供的 Repository 抽象来操作 Redis 数据,使得代码更简洁,...

    spring整合redis

    在IT行业中,Spring框架是Java领域最常用的轻量级应用框架之一,而Redis则是一款高性能的内存数据存储系统,常用于缓存、消息队列等场景。本文将深入探讨如何将Spring与Redis进行整合,为新手提供一个简单易懂的实践...

    pring-data-redisjar和源文件

    《Spring Data Redis 深入解析与实战指南》 在当今大数据时代,Redis作为一个高性能的键值数据库,因其高效的数据处理能力以及丰富的数据结构而备受青睐。Spring Data Redis是Spring框架的一部分,它为Redis提供了...

    tomcat-redis依赖jar包

    - Spring Data Redis:如果使用Spring框架,可以借助Spring Data Redis模块简化Redis的集成,提供更高级别的抽象和操作。 总结来说,"tomcat-redis依赖jar包"涉及到的是将Tomcat应用服务器与Redis缓存系统整合的...

    springboot整合redis.zip

    - 消息队列:Redis的发布订阅功能可以作为简单的消息队列使用,Spring Data Redis提供了`JedisConnectionFactory`和`RedisMessageListenerContainer`等类支持。 7. **注解驱动的数据访问** - 使用Spring Data ...

    redis技术入门及实战-SpringbootRedis.zip

    - Spring Data Redis是Spring Framework的一部分,提供了更高级的Redis操作API。 - 可以通过Repository接口定义Redis操作,例如`@Repository`注解的`RedisRepository`,自动实现CRUD操作。 - Spring Data Redis还...

    spring-redis-mysql整合

    在IT行业中,Spring框架是Java领域最常用的轻量级开源框架之一,而Redis则是一种高性能的内存数据存储系统,常用于缓存、消息队列等场景。MySQL则是世界上最流行的开源关系型数据库。将Spring、Redis和MySQL整合在...

    springcloud部署redis集群

    在SpringCloud框架中,部署Redis集群是实现高可用、数据持久化和分布式缓存的关键步骤。Redis是一款高性能的键值数据库,广泛应用于缓存、消息队列等多种场景。SpringCloud通过集成Spring Data Redis模块,使得在...

    SpringDataRedis.rar

    通过SpringDataRedis,开发者可以快速地在Spring应用中实现对Redis的高效管理,无论是简单的键值操作还是复杂的分布式数据结构操作,都能够得到优雅的解决方案。这个库大大降低了与Redis交互的复杂性,提高了开发...

Global site tag (gtag.js) - Google Analytics