项目中一直在使用redis的subscribe功能,偶然会发生订阅断开的问题,一直无法定位,在此之下只能阅读源码定位问题
首先从spring.xml入手观察配置
<bean id="twaListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="twocAlertListener"/> </bean> <bean id="twocAlertListener" class="com.fnic.wifi.server.redis.TwocAlertListener" /> <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"> <property name="connectionFactory" ref="jedisConnFactory"/> <property name="messageListeners"> <map> <entry key-ref="twaListener"> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="c_sta_login" /> </bean> </entry> </map> </property> </bean>
RedisMessageListenerContainer类中有个Map专门放MessageListenerAdapter类型的监听,同时这个类也是整个监听的核心类,总共有1000行
RedisMessageListenerContainer
其实现了InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle几个接口
InitializingBean:主要实现afterPropertiesSet方法,来定义spring设置完properties后进行的处理,在spring init这个bean时候会被调用
DisposableBean:实现destroy方法,在spring销毁bean时会调用
BeanNameAware:实现setBeanName方法来为bean进行取名,在RedisMessageListenerContainer中该name被用于内部线程的线程名
SmartLifecycle:spring的bean生命周期类,spring会调用start,stop等操作来完成RedisMessageListenerContainer类的启动
启动顺序
1.spring先完成对于bean属性的set,其中包含listener map的set操作
2.调用afterPropertiesSet方法
public void afterPropertiesSet() { if (taskExecutor == null) { manageExecutor = true; taskExecutor = createDefaultTaskExecutor(); } if (subscriptionExecutor == null) { subscriptionExecutor = taskExecutor; } initialized = true; }
此方法构造了一个线程池来跑监听线程。
3.spring调用start方法来开启这个bean
public void start() { if (!running) { running = true; // wait for the subscription to start before returning // technically speaking we can only be notified right before the subscription starts synchronized (monitor) { lazyListen(); if (listening) { try { // wait up to 5 seconds for Subscription thread monitor.wait(initWait); } catch (InterruptedException e) { // stop waiting } } } if (logger.isDebugEnabled()) { logger.debug("Started RedisMessageListenerContainer"); } } }
4.最重要的一步 lazyListen();方法的调用,下面是其源码
private void lazyListen() { boolean debug = logger.isDebugEnabled(); boolean started = false; if (isRunning()) { if (!listening) { synchronized (monitor) { if (!listening) { if (channelMapping.size() > 0 || patternMapping.size() > 0) { subscriptionExecutor.execute(subscriptionTask); listening = true; started = true; } } } if (debug) { if (started) { logger.debug("Started listening for Redis messages"); } else { logger.debug("Postpone listening for Redis messages until actual listeners are added"); } } } } }
构造一个SubscriptionTask,并且提交给第二步afterPropertiesSet方法中创建的线程池来执行,SubscriptionTask是一个RedisMessageListenerContainer中的重要的内部类
SubscriptionTask
其run方法如下,在spring容器启动时刻,最终会在eventuallyPerformSubscription方法处阻塞,执行底层jedis的监听,调用的也是jedis的subscribe方法
public void run() { synchronized (localMonitor) { subscriptionTaskRunning = true; } try { connection = connectionFactory.getConnection(); if (connection.isSubscribed()) { throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening"); } boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory); // NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription. if (!asyncConnection) { synchronized (monitor) { monitor.notify(); } } SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription(); if (asyncConnection) { SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime()); synchronized (monitor) { monitor.notify(); } } } catch (Throwable t) { handleSubscriptionException(t); } finally { // this block is executed once the subscription thread has ended, this may or may not mean // the connection has been unsubscribed, depending on driver synchronized (localMonitor) { subscriptionTaskRunning = false; localMonitor.notify(); } } }
当有链接异常发生时,会进入catch代码块的handleSubscriptionException(t);方法,其实现如下
protected void handleSubscriptionException(Throwable ex) { listening = false; subscriptionTask.closeConnection(); if (ex instanceof RedisConnectionFailureException) { if (isRunning()) { logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms"); sleepBeforeRecoveryAttempt(); lazyListen(); } } else { logger.error("SubscriptionTask aborted with exception:", ex); } }
RedisMessageListenerContainer在jedis的subscribe抛异常时重新调用了lazyListen();试图重新加载监听线程
理论上来说当链接断开时只要底层jedis的subscribe处报异常时,spring-data-redis的RedisMessageListenerContainer的内部线程都应该处在异常->重连->异常->重连的循环之中。
相关推荐
赠送jar包:spring-session-data-redis-2.0.4.RELEASE.jar; 赠送原API文档:spring-session-data-redis-2.0.4.RELEASE-javadoc.jar; 赠送源代码:spring-session-data-redis-2.0.4.RELEASE-sources.jar; 赠送...
spring-data-redis-1.8.1.RELEASE-sources.jar(spring-data-redis-1.8.1.RELEASE-sources.jar()
10. **源码分析**:`spring-data-redis-1.7.6.RELEASE-sources.jar`包含了源码,对于开发者来说,这是一个宝贵的资源,可以深入理解其内部实现,提高解决问题的能力。 总的来说,Spring Data Redis 1.7.6版本提供了...
spring-data-redis至redis练习源码
3. **创建集群**:使用`redis-trib.rb`工具(在Redis源码包的`utils`目录下)将这些节点连接起来,创建并初始化集群。 4. **分配槽**:Redis集群通过槽(Slot)来管理数据分布,共有16384个槽,每个键根据哈希值被...
spring和redis缓存工具类包
源码阅读可以帮助我们更深入地了解Spring Data Redis如何处理各种数据类型,以及它是如何与Redis客户端协作完成复杂操作的。 总的来说,Spring Data Redis为Java开发者提供了一个强大的工具,使得我们可以方便地...
在Spring Data Redis 1.0.2版本中,包含了完整的源码,这对于开发者深入理解其内部机制和扩展自定义功能非常有价值。源码分析可以帮助我们更好地理解如何实现自定义序列化器、连接池配置、以及如何利用Redis ...
通过Spring Data Redis,开发者可以方便地进行键值对操作、哈希存储、列表、集合和有序集合的操作,以及发布/订阅等高级功能。 二、集成Spring Data Redis 在Java项目中集成Spring Data Redis,首先需要在pom.xml...
3. **高级功能**:Spring Data Redis还支持Redis的高级特性,如发布/订阅(Publish/Subscribe)、事务(Transaction)、lua脚本(Lua Scripting)以及地理空间操作(Geo-Spatial Operations)。 4. **Reactive编程**:随着...
解析SpringBoot整合SpringDataRedis的过程 SpringBoot整合SpringDataRedis的过程主要是指将SpringBoot框架与SpringDataRedis项目进行集成,以便更好地使用Redis数据库。下面将详细介绍这个过程的步骤和实现细节。 ...
Spring Data Redis是一个强大的Java库,它为开发人员提供了一种简单的方式来利用Redis内存数据存储进行数据操作。在本文中,我们将深入探讨Spring Data Redis的基本概念、功能和如何通过一个小例子来实现其应用。 ...
Spring-data-redis是一个强大的库,它为Spring框架提供了与Redis数据存储交互的抽象层。在分布式系统中,Redis Cluster提供了一种高可用性和水平扩展的解决方案。本文将深入探讨如何使用Spring-data-redis来操作...
Spring Data 是一个由 Spring 社区开发的框架,旨在简化数据访问层的开发工作,它提供了与各种持久化技术(如 JPA、MongoDB、Neo4j、Redis 等)集成的统一接口。这个"spring-data-应用源码"包含了一些示例项目,可以...
spring和redis集成有很多方式,看到网上很多都是使用redistemplate自己去做redis 的一些操作,但是对于我们开发来说,肯定是使用越方便越好,于是乎就有了spring的对redis或者memcahe这些换成框架的封装,只需要引入...
1. **配置Spring-Session**:在Spring Boot项目中,添加spring-session和spring-session-data-redis依赖,并在配置文件中设置Redis的相关连接信息。 2. **启用Spring-Session**:在Spring Boot的主配置类上添加@...
1. **添加依赖**:在`pom.xml`中,你需要引入Spring Session和Redis的相关依赖,如`spring-boot-starter-data-redis`和`spring-session-data-redis`。 2. **配置Redis**:在Spring Boot的配置文件`application....
springboot整合redis集群(三种方式)源码