`
1028826685
  • 浏览: 938942 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

redisson-2.10.4源代码分析

 
阅读更多

    

redis 学习问题总结

http://aperise.iteye.com/blog/2310639

ehcache memcached redis 缓存技术总结

http://aperise.iteye.com/blog/2296219

redis-stat 离线安装

http://aperise.iteye.com/blog/2310254

redis  cluster 非ruby方式启动

http://aperise.iteye.com/blog/2310254

redis-sentinel安装部署

http://aperise.iteye.com/blog/2342693

spring-data-redis使用

 http://aperise.iteye.com/blog/2342615

redis客户端redisson实战

http://aperise.iteye.com/blog/2396196

redisson-2.10.4源代码分析

http://aperise.iteye.com/blog/2400528

tcmalloc jemalloc libc选择

http://blog.csdn.net/u010994304/article/details/49906819

 

1.RedissonClient一主两从部署时连接池组成

       对如下图的主从部署(1主2从): 
       redisson纯java操作代码如下: 

Java代码  收藏代码
  1. Config config = new Config();// 创建配置  
  2.   config.useMasterSlaveServers() // 指定使用主从部署方式  
  3.   //.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行  
  4.   //.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行  
  5.   //.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接  
  6.   //.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接  
  7.   //.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接  
  8.   //.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接  
  9.   //.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接  
  10.   //.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接  
  11.   .setMasterAddress("redis://192.168.29.24:6379")  // 设置redis主节点  
  12.   .addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点  
  13.   .addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点  
  14. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)  

       上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:

Java代码  收藏代码
  1. #6379上建立了10个连接  
  2. netstat -ant |grep 6379|grep  ESTABLISHED  
  3. #7000上建立了11个连接  
  4. netstat -ant |grep 7000|grep  ESTABLISHED  
  5. #7001上建立了11个连接  
  6. netstat -ant |grep 7001|grep  ESTABLISHED  

       你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:

  • MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;
  • MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池所以上图MasterPubSubConnectionPool里没有创建任何连接;
  • SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;
  • PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。

       从上图可以看出,连接池是针对每个IP端口都有一个独立的池,连接池也按照主从进行划分,那么这些连接池是在哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。

 

2.Redisson初始化连接池源码分析

    2.1 Redisson.java 

       RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:

       对Redisson.java的代码注释如下:

Java代码  收藏代码
  1. /**   
  2. * 根据配置Config创建redisson操作类RedissonClient   
  3. * @param config for Redisson   
  4. * @return Redisson instance   
  5. */    
  6. public static RedissonClient create(Config config) {    
  7.     //调用构造方法    
  8.     Redisson redisson = new Redisson(config);    
  9.     if (config.isRedissonReferenceEnabled()) {    
  10.         redisson.enableRedissonReferenceSupport();    
  11.     }    
  12.     return redisson;    
  13. }    
  14.   
  15. /**   
  16. * Redisson构造方法   
  17. * @param config for Redisson   
  18. * @return Redisson instance   
  19. */    
  20. protected Redisson(Config config) {    
  21.     //赋值变量config    
  22.     this.config = config;    
  23.     //产生一份对于传入config的备份    
  24.     Config configCopy = new Config(config);    
  25.   
  26.     //根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化    
  27.     connectionManager = ConfigSupport.createConnectionManager(configCopy);    
  28.     //连接池对象回收调度器    
  29.     evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());    
  30.     //Redisson的对象编码类    
  31.     codecProvider = configCopy.getCodecProvider();    
  32.     //Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider    
  33.     resolverProvider = configCopy.getResolverProvider();    
  34. }  

       其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。

 

    2.2 ConfigSupport.java

       这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下: 

Java代码  收藏代码
  1. /**   
  2. * 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化   
  3. * @param configCopy for Redisson   
  4. * @return ConnectionManager instance   
  5. */    
  6. public static ConnectionManager createConnectionManager(Config configCopy) {    
  7.     if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式    
  8.         validate(configCopy.getMasterSlaveServersConfig());    
  9.         return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy);    
  10.     } else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式    
  11.         validate(configCopy.getSingleServerConfig());    
  12.         return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy);    
  13.     } else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式    
  14.         validate(configCopy.getSentinelServersConfig());    
  15.         return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy);    
  16.     } else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式    
  17.         validate(configCopy.getClusterServersConfig());    
  18.         return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy);    
  19.     } else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式    
  20.         validate(configCopy.getElasticacheServersConfig());    
  21.         return new ElasticacheConnectionManager(configCopy.getElasticacheServersConfig(), configCopy);    
  22.     } else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式    
  23.         validate(configCopy.getReplicatedServersConfig());    
  24.         return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy);    
  25.     } else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager    
  26.         return configCopy.getConnectionManager();    
  27.     }else {    
  28.         throw new IllegalArgumentException("server(s) address(es) not defined!");    
  29.     }    
  30. }  

       上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。

 

    2.3 MasterSlaveConnectionManager.java

       这里开始介绍ConnectionManager,ConnectionManager.java是一个接口类,它有6个实现类,分别对应着不同的部署模式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:
       这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:


       上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:  

Java代码  收藏代码
  1. /**    
  2. * MasterSlaveConnectionManager的构造方法  
  3. * @param cfg for MasterSlaveServersConfig  
  4. * @param config for Config    
  5. */      
  6. public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {  
  7.     //调用构造方法  
  8.     this(config);  
  9.     //  
  10.     initTimer(cfg);  
  11.     this.config = cfg;  
  12.     //初始化MasterSlaveEntry  
  13.     initSingleEntry();  
  14. }  
  15. /**    
  16. * MasterSlaveConnectionManager的构造方法  
  17. * @param cfg for Config  
  18. */      
  19. public MasterSlaveConnectionManager(Config cfg) {  
  20.     //读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息  
  21.     Version.logVersion();  
  22.     //EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式  
  23.     if (cfg.isUseLinuxNativeEpoll()) {  
  24.         if (cfg.getEventLoopGroup() == null) {  
  25.             //使用linux IO非阻塞模型EPOLL  
  26.             this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));  
  27.         } else {  
  28.             this.group = cfg.getEventLoopGroup();  
  29.         }  
  30.         this.socketChannelClass = EpollSocketChannel.class;  
  31.     } else {  
  32.         if (cfg.getEventLoopGroup() == null) {  
  33.             //使用linux IO非阻塞模型NIO  
  34.             this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));  
  35.         } else {  
  36.             this.group = cfg.getEventLoopGroup();  
  37.         }  
  38.         this.socketChannelClass = NioSocketChannel.class;  
  39.     }  
  40.     if (cfg.getExecutor() == null) {  
  41.         //线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16  
  42.         //对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32  
  43.         //这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64  
  44.         int threads = Runtime.getRuntime().availableProcessors() * 2;  
  45.         if (cfg.getThreads() != 0) {  
  46.             threads = cfg.getThreads();  
  47.         }  
  48.         executor = Executors.newFixedThreadPool(threads, new DefaultThreadFactory("redisson"));  
  49.     } else {  
  50.         executor = cfg.getExecutor();  
  51.     }  
  52.   
  53.     this.cfg = cfg;  
  54.     this.codec = cfg.getCodec();  
  55.     //一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理  
  56.     this.shutdownPromise = newPromise();  
  57.     //一个持有MasterSlaveConnectionManager的异步执行服务  
  58.     this.commandExecutor = new CommandSyncService(this);  
  59. }  
  60. /**    
  61. * 初始化定时调度器 
  62. * @param config for MasterSlaveServersConfig  
  63. */     
  64. protected void initTimer(MasterSlaveServersConfig config) {  
  65.     //读取超时时间配置信息  
  66.     int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};  
  67.     Arrays.sort(timeouts);  
  68.     int minTimeout = timeouts[0];  
  69.     //设置默认超时时间  
  70.     if (minTimeout % 100 != 0) {  
  71.         minTimeout = (minTimeout % 100) / 2;  
  72.     } else if (minTimeout == 100) {  
  73.         minTimeout = 50;  
  74.     } else {  
  75.         minTimeout = 100;  
  76.     }  
  77.     //创建定时调度器  
  78.     timer = new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);  
  79.       
  80.     // to avoid assertion error during timer.stop invocation  
  81.     try {  
  82.         Field leakField = HashedWheelTimer.class.getDeclaredField("leak");  
  83.         leakField.setAccessible(true);  
  84.         leakField.set(timer, null);  
  85.     } catch (Exception e) {  
  86.         throw new IllegalStateException(e);  
  87.     }  
  88.     //检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象  
  89.     connectionWatcher = new IdleConnectionWatcher(this, config);  
  90. }  
  91.   
  92. /**    
  93. * 创建MasterSlaveConnectionManager的MasterSlaveEntry   
  94. */      
  95. protected void initSingleEntry() {  
  96.     try {  
  97.         //主从模式下0~16383加入到集合slots    
  98.         HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();  
  99.         slots.add(singleSlotRange);  
  100.   
  101.         MasterSlaveEntry entry;  
  102.         if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行  
  103.             entry = new SingleEntry(slots, this, config);  
  104.             RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());  
  105.             f.syncUninterruptibly();  
  106.         } else {//默认主从部署ReadMode=SLAVE,SubscriptionMode=SLAVE,这里会执行  
  107.             entry = createMasterSlaveEntry(config, slots);  
  108.         }  
  109.         //将每个分片0~16383都指向创建的MasterSlaveEntry  
  110.         for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {  
  111.             addEntry(slot, entry);  
  112.         }  
  113.         //DNS相关  
  114.         if (config.getDnsMonitoringInterval() != -1) {  
  115.             dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()),   
  116.                     config.getSlaveAddresses(), config.getDnsMonitoringInterval());  
  117.             dnsMonitor.start();  
  118.         }  
  119.     } catch (RuntimeException e) {  
  120.         stopThreads();  
  121.         throw e;  
  122.     }  
  123. }  
  124. /**    
  125. * MasterSlaveEntry的构造方法  
  126. * @param config for MasterSlaveServersConfig   
  127. * @param slots for HashSet<ClusterSlotRange>  
  128. * @return MasterSlaveEntry 
  129. */      
  130. protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet<ClusterSlotRange> slots) {  
  131.     //创建MasterSlaveEntry  
  132.     MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);  
  133.     //从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化  
  134.     List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());  
  135.     for (RFuture<Void> future : fs) {  
  136.         future.syncUninterruptibly();  
  137.     }  
  138.     主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化  
  139.     RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());  
  140.     f.syncUninterruptibly();  
  141.     return entry;  
  142. }  

       上面个人觉得有两处代码值得我们特别关注,特别说明如下:

  • entry.initSlaveBalancer:从节点连接池SlaveConnectionPoolPubSubConnectionPool的默认的最小连接数初始化。
  • entry.setupMasterEntry:主节点连接池MasterConnectionPoolMasterPubSubConnectionPool的默认的最小连接数初始化。

 

    2.4 MasterSlaveEntry.java

       用一张图来解释MasterSlaveEntry的组件如下:

       MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:

Java代码  收藏代码
  1. /**    
  2. * MasterSlaveEntry的构造方法  
  3. * @param slotRanges for Set<ClusterSlotRange>    
  4. * @param connectionManager for ConnectionManager    
  5. * @param config for MasterSlaveServersConfig  
  6. */      
  7. public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {    
  8.     //主从模式下0~16383加入到集合slots    
  9.     for (ClusterSlotRange clusterSlotRange : slotRanges) {    
  10.         for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {    
  11.             slots.add(i);    
  12.         }    
  13.     }    
  14.     //赋值MasterSlaveConnectionManager给connectionManager    
  15.     this.connectionManager = connectionManager;    
  16.     //赋值config    
  17.     this.config = config;    
  18.     
  19.     //创建LoadBalancerManager    
  20.     //其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool    
  21.     //并且此时连接池里还没有初始化默认的最小连接数    
  22.     slaveBalancer = new LoadBalancerManager(config, connectionManager, this);    
  23.     //创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数    
  24.     writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);    
  25.     //创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数    
  26.     pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);    
  27. }    
  28.     
  29. /**    
  30. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化  
  31. * @param disconnectedNodes for Collection<URI>  
  32. * @return List<RFuture<Void>>    
  33. */     
  34. public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {    
  35.     //这里freezeMasterAsSlave=true    
  36.     boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && !config.checkSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size();    
  37.     
  38.     List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();    
  39.     //把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池    
  40.     RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);    
  41.     result.add(f);    
  42.     //读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool    
  43.     //SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】    
  44.     //PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】    
  45.     for (URI address : config.getSlaveAddresses()) {    
  46.         f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);    
  47.         result.add(f);    
  48.     }    
  49.     return result;    
  50. }    
  51.     
  52. /**    
  53. * 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化  
  54. * @param address for URI  
  55. * @param freezed for boolean  
  56. * @param nodeType for NodeType  
  57. * @return RFuture<Void>  
  58. */     
  59. private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {    
  60.     //创建到从节点的连接RedisClient    
  61.     RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);    
  62.     ClientConnectionsEntry entry = new ClientConnectionsEntry(client,    
  63.             this.config.getSlaveConnectionMinimumIdleSize(),    
  64.             this.config.getSlaveConnectionPoolSize(),    
  65.             this.config.getSubscriptionConnectionMinimumIdleSize(),    
  66.             this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);    
  67.     //默认只有主节点当作从节点是会设置freezed=true    
  68.     if (freezed) {    
  69.         synchronized (entry) {    
  70.             entry.setFreezed(freezed);    
  71.             entry.setFreezeReason(FreezeReason.SYSTEM);    
  72.         }    
  73.     }    
  74.     //调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化    
  75.     return slaveBalancer.add(entry);    
  76. }    
  77.     
  78. /**    
  79. * 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化  
  80. * @param address for URI  
  81. * @return RFuture<Void>  
  82. */     
  83. public RFuture<Void> setupMasterEntry(URI address) {    
  84.     //创建到主节点的连接RedisClient    
  85.     RedisClient client = connectionManager.createClient(NodeType.MASTER, address);    
  86.     masterEntry = new ClientConnectionsEntry(    
  87.             client,     
  88.             config.getMasterConnectionMinimumIdleSize(),     
  89.             config.getMasterConnectionPoolSize(),    
  90.             config.getSubscriptionConnectionMinimumIdleSize(),    
  91.             config.getSubscriptionConnectionPoolSize(),     
  92.             connectionManager,     
  93.             NodeType.MASTER);    
  94.     //如果配置的SubscriptionMode=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool    
  95.     //默认SubscriptionMode=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数    
  96.     if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {    
  97.         //MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】    
  98.         RFuture<Void> f = writeConnectionHolder.add(masterEntry);    
  99.         RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);    
  100.         return CountListener.create(s, f);    
  101.     }    
  102.     //调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个    
  103.     //MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】    
  104.     return writeConnectionHolder.add(masterEntry);    
  105. }  
       上面代码个人觉得有四个地方值得我们特别关注,它们是一个连接池创建对象的入口,列表如下:
  • writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象 
  • pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象
  • slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象
  • pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象

 

 

    2.5 LoadBalancerManager.java

       图解LoadBalancerManager.java的内部组成如下:

       LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPoolPubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:

Java代码  收藏代码
  1. /**    
  2. * LoadBalancerManager的构造方法  
  3. * @param config for MasterSlaveServersConfig   
  4. * @param connectionManager for ConnectionManager    
  5. * @param entry for MasterSlaveEntry  
  6. */      
  7. public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {    
  8.     //赋值connectionManager    
  9.     this.connectionManager = connectionManager;    
  10.     //创建连接池SlaveConnectionPool    
  11.     slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);    
  12.     //创建连接池PubSubConnectionPool    
  13.     pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);    
  14. }    
  15. /**    
  16. * LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加  
  17. * @param entry for ClientConnectionsEntry  
  18. * @return RFuture<Void>  
  19. */      
  20. public RFuture<Void> add(final ClientConnectionsEntry entry) {    
  21.     final RPromise<Void> result = connectionManager.newPromise();    
  22.     //创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试    
  23.     FutureListener<Void> listener = new FutureListener<Void>() {    
  24.         AtomicInteger counter = new AtomicInteger(2);    
  25.         @Override    
  26.         public void operationComplete(Future<Void> future) throws Exception {    
  27.             if (!future.isSuccess()) {    
  28.                 result.tryFailure(future.cause());    
  29.                 return;    
  30.             }    
  31.             if (counter.decrementAndGet() == 0) {    
  32.                 String addr = entry.getClient().getIpAddr();    
  33.                 ip2Entry.put(addr, entry);    
  34.                 result.trySuccess(null);    
  35.             }    
  36.         }    
  37.     };    
  38.     //调用slaveConnectionPool添加RedisConnection对象到池中    
  39.     RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);    
  40.     slaveFuture.addListener(listener);    
  41.     //调用pubSubConnectionPool添加RedisPubSubConnection对象到池中    
  42.     RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);    
  43.     pubSubFuture.addListener(listener);    
  44.     return result;    
  45. }  

       至此,我们已经了解了开篇提到的四个连接池是在哪里创建的。

 

3. Redisson的4类连接池

       这里我们来详细介绍下Redisson的连接池实现类,Redisson里有4种连接池,它们是MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下:  

       通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:

       好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:

Java代码  收藏代码
  1. abstract class ConnectionPool<T extends RedisConnection> {    
  2.     private final Logger log = LoggerFactory.getLogger(getClass());    
  3.     //维持着连接池对应的redis节点信息    
  4.     //比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)    
  5.     //比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为SubscriptionMode=SubscriptionMode.SLAVE    
  6.     //比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)    
  7.     //比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为SubscriptionMode=SubscriptionMode.SLAVE,主节点不会加入    
  8.     protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();    
  9.     //持有者RedissonClient的组件ConnectionManager    
  10.     final ConnectionManager connectionManager;    
  11.     //持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig    
  12.     final MasterSlaveServersConfig config;    
  13.     //持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry    
  14.     final MasterSlaveEntry masterSlaveEntry;    
  15.     
  16.     //构造函数    
  17.     public ConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {    
  18.         this.config = config;    
  19.         this.masterSlaveEntry = masterSlaveEntry;    
  20.         this.connectionManager = connectionManager;    
  21.     }    
  22.     
  23.     //连接池中需要增加对象时候调用此方法    
  24.     public RFuture<Void> add(final ClientConnectionsEntry entry) {    
  25.         final RPromise<Void> promise = connectionManager.newPromise();    
  26.         promise.addListener(new FutureListener<Void>() {    
  27.             @Override    
  28.             public void operationComplete(Future<Void> future) throws Exception {    
  29.                 entries.add(entry);    
  30.             }    
  31.         });    
  32.         initConnections(entry, promise, true);    
  33.         return promise;    
  34.     }    
  35.     
  36.     //初始化连接池中最小连接数    
  37.     private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {    
  38.         final int minimumIdleSize = getMinimumIdleSize(entry);    
  39.     
  40.         if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {    
  41.             initPromise.trySuccess(null);    
  42.             return;    
  43.         }    
  44.     
  45.         final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);    
  46.         int startAmount = Math.min(50, minimumIdleSize);    
  47.         final AtomicInteger requests = new AtomicInteger(startAmount);    
  48.         for (int i = 0; i < startAmount; i++) {    
  49.             createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);    
  50.         }    
  51.     }    
  52.     
  53.     //创建连接对象到连接池中    
  54.     private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,    
  55.             final int minimumIdleSize, final AtomicInteger initializedConnections) {    
  56.     
  57.         if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {    
  58.             int totalInitializedConnections = minimumIdleSize - initializedConnections.get();    
  59.             Throwable cause = new RedisConnectionException(    
  60.                     "Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "    
  61.                                         + entry.getClient().getAddr());    
  62.             initPromise.tryFailure(cause);    
  63.             return;    
  64.         }    
  65.             
  66.         acquireConnection(entry, new Runnable() {    
  67.                 
  68.             @Override    
  69.             public void run() {    
  70.                 RPromise<T> promise = connectionManager.newPromise();    
  71.                 createConnection(entry, promise);    
  72.                 promise.addListener(new FutureListener<T>() {    
  73.                     @Override    
  74.                     public void operationComplete(Future<T> future) throws Exception {    
  75.                         if (future.isSuccess()) {    
  76.                             T conn = future.getNow();    
  77.     
  78.                             releaseConnection(entry, conn);    
  79.                         }    
  80.     
  81.                         releaseConnection(entry);    
  82.     
  83.                         if (!future.isSuccess()) {    
  84.                             int totalInitializedConnections = minimumIdleSize - initializedConnections.get();    
  85.                             String errorMsg;    
  86.                             if (totalInitializedConnections == 0) {    
  87.                                 errorMsg = "Unable to connect to Redis server: " + entry.getClient().getAddr();    
  88.                             } else {    
  89.                                 errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections     
  90.                                         + " from " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr();    
  91.                             }    
  92.                             Throwable cause = new RedisConnectionException(errorMsg, future.cause());    
  93.                             initPromise.tryFailure(cause);    
  94.                             return;    
  95.                         }    
  96.     
  97.                         int value = initializedConnections.decrementAndGet();    
  98.                         if (value == 0) {    
  99.                             log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());    
  100.                             if (!initPromise.trySuccess(null)) {    
  101.                                 throw new IllegalStateException();    
  102.                             }    
  103.                         } else if (value > 0 && !initPromise.isDone()) {    
  104.                             if (requests.incrementAndGet() <= minimumIdleSize) {    
  105.                                 createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);    
  106.                             }    
  107.                         }    
  108.                     }    
  109.                 });    
  110.             }    
  111.         });    
  112.     
  113.     }    
  114.     
  115.     //连接池中租借出连接对象    
  116.     public RFuture<T> get(RedisCommand<?> command) {    
  117.         for (int j = entries.size() - 1; j >= 0; j--) {    
  118.             final ClientConnectionsEntry entry = getEntry();    
  119.             if (!entry.isFreezed()     
  120.                     && tryAcquireConnection(entry)) {    
  121.                 return acquireConnection(command, entry);    
  122.             }    
  123.         }    
  124.             
  125.         List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();    
  126.         List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();    
  127.         for (ClientConnectionsEntry entry : entries) {    
  128.             if (entry.isFreezed()) {    
  129.                 freezed.add(entry.getClient().getAddr());    
  130.             } else {    
  131.                 failedAttempts.add(entry.getClient().getAddr());    
  132.             }    
  133.         }    
  134.     
  135.         StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");    
  136.         if (!freezed.isEmpty()) {    
  137.             errorMsg.append(" Disconnected hosts: " + freezed);    
  138.         }    
  139.         if (!failedAttempts.isEmpty()) {    
  140.             errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);    
  141.         }    
  142.     
  143.         RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());    
  144.         return connectionManager.newFailedFuture(exception);    
  145.     }    
  146.     
  147.     //连接池中租借出连接对象执行操作RedisCommand    
  148.     public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {    
  149.         if ((!entry.isFreezed() || entry.getFreezeReason() == FreezeReason.SYSTEM) &&     
  150.                 tryAcquireConnection(entry)) {    
  151.             return acquireConnection(command, entry);    
  152.         }    
  153.     
  154.         RedisConnectionException exception = new RedisConnectionException(    
  155.                 "Can't aquire connection to " + entry);    
  156.         return connectionManager.newFailedFuture(exception);    
  157.     }    
  158.       
  159.     //通过向redis服务端发送PING看是否返回PONG来检测连接  
  160.     private void ping(RedisConnection c, final FutureListener<String> pingListener) {    
  161.         RFuture<String> f = c.async(RedisCommands.PING);    
  162.         f.addListener(pingListener);    
  163.     }    
  164.     
  165.     //归还连接对象到连接池    
  166.     public void returnConnection(ClientConnectionsEntry entry, T connection) {    
  167.         if (entry.isFreezed()) {    
  168.             connection.closeAsync();    
  169.         } else {    
  170.             releaseConnection(entry, connection);    
  171.         }    
  172.         releaseConnection(entry);    
  173.     }    
  174.     
  175.     //释放连接池中连接对象    
  176.     protected void releaseConnection(ClientConnectionsEntry entry) {    
  177.         entry.releaseConnection();    
  178.     }    
  179.     
  180.     //释放连接池中连接对象    
  181.     protected void releaseConnection(ClientConnectionsEntry entry, T conn) {    
  182.         entry.releaseConnection(conn);    
  183.     }    
  184. }  

       用一张图来解释ConnectionPool干了些啥,如下图:

       都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:


4.Redisson的读写操作句柄类RedissonObject

       对于Redisson的任何操作,都需要获取到操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:

       例如想设置redis服务端的key=key的值value=123,你需要查询Redis命令和Redisson对象匹配列表,找到如下对应关系:

       然后我们就知道调用代码这么写:

Java代码  收藏代码
  1. Config config = new Config();// 创建配置    
  2.   config.useMasterSlaveServers() // 指定使用主从部署方式    
  3.   .setMasterAddress("redis://192.168.29.24:6379")  // 设置redis主节点    
  4.   .addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点    
  5.   .addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点    
  6. RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)   
  7.   
  8. //任何Redisson操作首先需要获取对应的操作句柄  
  9. //RBucket是操作句柄之一,实现类是RedissonBucket  
  10. RBucket<String> rBucket = redissonClient.getBucket("key");  
  11.   
  12. //通过操作句柄rBucket进行读操作  
  13. rBucket.get();  
  14.   
  15. //通过操作句柄rBucket进行写操作  
  16. rBucket.set("123");  

       至于其它的redis命令对应的redisson操作对象,都可以官网的Redis命令和Redisson对象匹配列表 查到。

 

6.Redisson的读写操作源码分析

       从一个读操作的代码作为入口分析代码,如下:

Java代码  收藏代码
  1. //任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket    
  2. RBucket<String> rBucket = redissonClient.getBucket("key");    
  3.     
  4. //通过操作句柄rBucket进行读操作    
  5. rBucket.get();    
       继续追踪上面RBucket的get方法,如下:

       上面我们看到不管是读操作还是写操作都转交CommandAsyncExecutor进行处理,那么这里我们需要看一下CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:
Java代码  收藏代码
  1. private NodeSource getNodeSource(String key) {  
  2.     //通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT  
  3.     //计算出一个字符串key对应的分片在0~16383中哪个分片  
  4.     int slot = connectionManager.calcSlot(key);  
  5.     //之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来  
  6.     MasterSlaveEntry entry = connectionManager.getEntry(slot);  
  7.     //这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】  
  8.     return new NodeSource(entry);  
  9. }  
  10. @Override  
  11. public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {  
  12.     RPromise<R> mainPromise = connectionManager.newPromise();  
  13.     //获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】  
  14.     NodeSource source = getNodeSource(key);  
  15.     调用异步执行方法async  
  16.     async(true, source, codec, command, params, mainPromise, 0);  
  17.     return mainPromise;  
  18. }  
  19. protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,  
  20.         final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {  
  21.     //操作被取消,那么直接返回  
  22.     if (mainPromise.isCancelled()) {  
  23.         free(params);  
  24.         return;  
  25.     }  
  26.     //连接管理器无法连接,释放参数所占资源,然后返回  
  27.     if (!connectionManager.getShutdownLatch().acquire()) {  
  28.         free(params);  
  29.         mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));  
  30.         return;  
  31.     }  
  32.       
  33.     final AsyncDetails<V, R> details = AsyncDetails.acquire();  
  34.     if (isRedissonReferenceSupportEnabled()) {  
  35.         try {  
  36.             for (int i = 0; i < params.length; i++) {  
  37.                 RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);  
  38.                 if (reference != null) {  
  39.                     params[i] = reference;  
  40.                 }  
  41.             }  
  42.         } catch (Exception e) {  
  43.             connectionManager.getShutdownLatch().release();  
  44.             free(params);  
  45.             mainPromise.tryFailure(e);  
  46.             return;  
  47.         }  
  48.     }  
  49.       
  50.     //开始从connectionManager获取池中的连接  
  51.     //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作  
  52.     final RFuture<RedisConnection> connectionFuture;  
  53.     if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行  
  54.         connectionFuture = connectionManager.connectionReadOp(source, command);  
  55.     } else {//对于写操作默认readOnlyMode=false,这里会执行  
  56.         connectionFuture = connectionManager.connectionWriteOp(source, command);  
  57.     }  
  58.       
  59.     //创建RPromise,用于操作失败时候重试  
  60.     final RPromise<R> attemptPromise = connectionManager.newPromise();  
  61.     details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);  
  62.     //创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止  
  63.     FutureListener<R> mainPromiseListener = new FutureListener<R>() {  
  64.         @Override  
  65.         public void operationComplete(Future<R> future) throws Exception {  
  66.             if (future.isCancelled() && connectionFuture.cancel(false)) {  
  67.                 log.debug("Connection obtaining canceled for {}", command);  
  68.                 details.getTimeout().cancel();  
  69.                 if (details.getAttemptPromise().cancel(false)) {  
  70.                     free(params);  
  71.                 }  
  72.             }  
  73.         }  
  74.     };  
  75.   
  76.     //创建TimerTask,用于操作失败后通过定时器进行操作重试  
  77.     final TimerTask retryTimerTask = new TimerTask() {  
  78.         @Override  
  79.         public void run(Timeout t) throws Exception {  
  80.             if (details.getAttemptPromise().isDone()) {  
  81.                 return;  
  82.             }  
  83.             if (details.getConnectionFuture().cancel(false)) {  
  84.                 connectionManager.getShutdownLatch().release();  
  85.             } else {  
  86.                 if (details.getConnectionFuture().isSuccess()) {  
  87.                     if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {  
  88.                         if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {  
  89.                             if (details.getWriteFuture().cancel(false)) {  
  90.                                 if (details.getException() == null) {  
  91.                                     details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));  
  92.                                 }  
  93.                                 details.getAttemptPromise().tryFailure(details.getException());  
  94.                             }  
  95.                             return;  
  96.                         }  
  97.                         details.incAttempt();  
  98.                         Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);  
  99.                         details.setTimeout(timeout);  
  100.                         return;  
  101.                     }  
  102.   
  103.                     if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {  
  104.                         return;  
  105.                     }  
  106.                 }  
  107.             }  
  108.             if (details.getMainPromise().isCancelled()) {  
  109.                 if (details.getAttemptPromise().cancel(false)) {  
  110.                     free(details);  
  111.                     AsyncDetails.release(details);  
  112.                 }  
  113.                 return;  
  114.             }  
  115.             if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {  
  116.                 if (details.getException() == null) {  
  117.                     details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));  
  118.                 }  
  119.                 details.getAttemptPromise().tryFailure(details.getException());  
  120.                 return;  
  121.             }  
  122.             if (!details.getAttemptPromise().cancel(false)) {  
  123.                 return;  
  124.             }  
  125.             int count = details.getAttempt() + 1;  
  126.             if (log.isDebugEnabled()) {  
  127.                 log.debug("attempt {} for command {} and params {}",  
  128.                         count, details.getCommand(), Arrays.toString(details.getParams()));  
  129.             }  
  130.             details.removeMainPromiseListener();  
  131.             async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);  
  132.             AsyncDetails.release(details);  
  133.         }  
  134.     };  
  135.   
  136.     //配置对于读写操作的超时时间  
  137.     Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);  
  138.     details.setTimeout(timeout);  
  139.     details.setupMainPromiseListener(mainPromiseListener);  
  140.   
  141.     //给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作  
  142.     connectionFuture.addListener(new FutureListener<RedisConnection>() {  
  143.         @Override  
  144.         public void operationComplete(Future<RedisConnection> connFuture) throws Exception {  
  145.             if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回  
  146.                 return;  
  147.             }  
  148.   
  149.             if (!connFuture.isSuccess()) {//从池中获取连接失败  
  150.                 connectionManager.getShutdownLatch().release();  
  151.                 details.setException(convertException(connectionFuture));  
  152.                 return;  
  153.             }  
  154.   
  155.             if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0  
  156.                 releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);  
  157.                 return;  
  158.             }  
  159.   
  160.             //从池中获取连接成功,这里取出连接对象RedisConnection  
  161.             final RedisConnection connection = connFuture.getNow();  
  162.             //如果需要重定向,这里进行重定向  
  163.             //重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向  
  164.             if (details.getSource().getRedirect() == Redirect.ASK) {  
  165.                 List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);  
  166.                 RPromise<Void> promise = connectionManager.newPromise();  
  167.                 list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));  
  168.                 list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));  
  169.                 RPromise<Void> main = connectionManager.newPromise();  
  170.                 ChannelFuture future = connection.send(new CommandsData(main, list));  
  171.                 details.setWriteFuture(future);  
  172.             } else {  
  173.                 if (log.isDebugEnabled()) {  
  174.                     log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",  
  175.                             details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);  
  176.                 }  
  177.                 //发送读写操作到RedisConnection,进行执行  
  178.                 ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));  
  179.                 details.setWriteFuture(future);  
  180.             }  
  181.             //对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印  
  182.             details.getWriteFuture().addListener(new ChannelFutureListener() {  
  183.                 @Override  
  184.                 public void operationComplete(ChannelFuture future) throws Exception {  
  185.                     checkWriteFuture(details, connection);  
  186.                 }  
  187.             });  
  188.             //返回RedisConnection连接到连接池  
  189.             releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);  
  190.         }  
  191.     });  
  192.   
  193.     attemptPromise.addListener(new FutureListener<R>() {  
  194.         @Override  
  195.         public void operationComplete(Future<R> future) throws Exception {  
  196.             checkAttemptFuture(source, details, future);  
  197.         }  
  198.     });  
  199. }  
       上面的代码我用一张读写操作处理流程图总结如下:

 

 

       至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。

    6.1 分片SLOT的计算公式

       SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT

 

    6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件

       一个节点被判断为冻结,必须同时满足以下条件:

  • 该节点有slave节点,并且从节点个数大于0;
  • 设置的配置ReadMode不为并且SubscriptionMode不为MASTER;
  • 该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数

    6.3 读写负载图

 

 


 

7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析

       读写操作首先都需要获取到一个连接对象,在上面的分析中我们知道读写操作都是通过CommandAsyncExecutor.java里的如下代码获取连接对象:

Java代码  收藏代码
  1. //开始从connectionManager获取池中的连接    
  2. //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作    
  3. final RFuture<RedisConnection> connectionFuture;    
  4. if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行    
  5.     connectionFuture = connectionManager.connectionReadOp(source, command);    
  6. else {//对于写操作默认readOnlyMode=false,这里会执行    
  7.     connectionFuture = connectionManager.connectionWriteOp(source, command);    
  8. }    
       上面读操作调用了connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:
Java代码  收藏代码
  1. /**    
  2. * 读操作通过ConnectionManager从连接池获取连接对象 
  3. * @param source for NodeSource  
  4. * @param command for RedisCommand<?>  
  5. * @return RFuture<RedisConnection> 
  6. */   
  7. public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {  
  8.     //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】  
  9.     MasterSlaveEntry entry = source.getEntry();  
  10.     if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null  
  11.         entry = getEntry(source.getSlot());  
  12.     }  
  13.     if (source.getAddr() != null) {//这里不会执行source里addr=null  
  14.         entry = getEntry(source.getAddr());  
  15.         if (entry == null) {  
  16.             for (MasterSlaveEntry e : getEntrySet()) {  
  17.                 if (e.hasSlave(source.getAddr())) {  
  18.                     entry = e;  
  19.                     break;  
  20.                 }  
  21.             }  
  22.         }  
  23.         if (entry == null) {  
  24.             RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
  25.             return RedissonPromise.newFailedFuture(ex);  
  26.         }  
  27.           
  28.         return entry.connectionReadOp(command, source.getAddr());  
  29.     }  
  30.       
  31.     if (entry == null) {//这里不会执行source里entry不等于null  
  32.         RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
  33.         return RedissonPromise.newFailedFuture(ex);  
  34.     }  
  35.     //MasterSlaveEntry里从连接池获取连接对象  
  36.     return entry.connectionReadOp(command);  
  37. }  
  38. /**    
  39. * 写操作通过ConnectionManager从连接池获取连接对象 
  40. * @param source for NodeSource  
  41. * @param command for RedisCommand<?>  
  42. * @return RFuture<RedisConnection> 
  43. */   
  44. public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {  
  45.     //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】  
  46.     MasterSlaveEntry entry = source.getEntry();  
  47.     if (entry == null) {  
  48.         entry = getEntry(source);  
  49.     }  
  50.     if (entry == null) {//这里不会执行source里entry不等于null  
  51.         RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");  
  52.         return RedissonPromise.newFailedFuture(ex);  
  53.     }  
  54.     //MasterSlaveEntry里从连接池获取连接对象  
  55.     return entry.connectionWriteOp(command);  
  56. }  
       我们看到上面调用ConnectionManager从连接池获取连接对象,但是ConnectionManager却将获取连接操作转交MasterSlaveEntry处理,我们再一次回顾一下MasterSlaveEntry的组成: 

       MasterSlaveEntry里持有中我们开篇所提到的四个连接池,那么这里我们继续关注MasterSlaveEntry.java的源代码:
Java代码  收藏代码
  1. /**    
  2. * 写操作从MasterConnectionPool连接池里获取连接对象 
  3. * @param command for RedisCommand<?>  
  4. * @return RFuture<RedisConnection> 
  5. */   
  6. public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {  
  7.     //我们知道writeConnectionHolder的类型为MasterConnectionPool  
  8.     //这里就是从MasterConnectionPool里获取连接对象  
  9.     return writeConnectionHolder.get(command);  
  10. }  
  11.   
  12. /**    
  13. * 写操作从LoadBalancerManager里获取连接对象 
  14. * @param command for RedisCommand<?>  
  15. * @return RFuture<RedisConnection> 
  16. */   
  17. public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {  
  18.     if (config.getReadMode() == ReadMode.MASTER) {  
  19.         //我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行  
  20.         return connectionWriteOp(command);  
  21.     }  
  22.     //我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool  
  23.     //这里就是从SlaveConnectionPool里获取连接对象  
  24.     return slaveBalancer.nextConnection(command);  
  25. }  
       似乎又绕回来了,最终的获取连接对象都转交到了从连接池ConnectionPool里获取连接对象,注释ConnectionPool里的获取连接对象代码如下: 
Java代码  收藏代码
  1. /**    
  2. * 读写操作从ConnectionPool.java连接池里获取连接对象 
  3. * @param command for RedisCommand<?>  
  4. * @return RFuture<T> 
  5. */   
  6. public RFuture<T> get(RedisCommand<?> command) {  
  7.     for (int j = entries.size() - 1; j >= 0; j--) {  
  8.         final ClientConnectionsEntry entry = getEntry();  
  9.         if (!entry.isFreezed() && tryAcquireConnection(entry)) {  
  10.             //遍历ConnectionPool里维持的ClientConnectionsEntry列表  
  11.             //遍历的算法默认为RoundRobinLoadBalancer  
  12.             //ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false  
  13.             return acquireConnection(command, entry);  
  14.         }  
  15.     }  
  16.       
  17.     //记录失败重试信息  
  18.     List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();  
  19.     List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();  
  20.     for (ClientConnectionsEntry entry : entries) {  
  21.         if (entry.isFreezed()) {  
  22.             freezed.add(entry.getClient().getAddr());  
  23.         } else {  
  24.             failedAttempts.add(entry.getClient().getAddr());  
  25.         }  
  26.     }  
  27.   
  28.     StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");  
  29.     if (!freezed.isEmpty()) {  
  30.         errorMsg.append(" Disconnected hosts: " + freezed);  
  31.     }  
  32.     if (!failedAttempts.isEmpty()) {  
  33.         errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);  
  34.     }  
  35.     //获取连接失败抛出异常  
  36.     RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());  
  37.     return connectionManager.newFailedFuture(exception);  
  38. }  
  39.   
  40. /**    
  41. * 读写操作从ConnectionPool.java连接池里获取连接对象 
  42. * @param command for RedisCommand<?>  
  43. * @param entry for ClientConnectionsEntry 
  44. * @return RFuture<T> 
  45. */   
  46. private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {  
  47.     //创建一个异步结果获取RPromise  
  48.     final RPromise<T> result = connectionManager.newPromise();  
  49.     //获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1  
  50.     //该操作成功后将调用这里的回调函数AcquireCallback<T>  
  51.     AcquireCallback<T> callback = new AcquireCallback<T>() {  
  52.         @Override  
  53.         public void run() {  
  54.             result.removeListener(this);  
  55.             //freeConnectionsCounter值减1成功,说明获取可以获取到连接  
  56.             //这里才是真正获取连接的操作  
  57.             connectTo(entry, result);  
  58.         }  
  59.           
  60.         @Override  
  61.         public void operationComplete(Future<T> future) throws Exception {  
  62.             entry.removeConnection(this);  
  63.         }  
  64.     };  
  65.     //异步结果获取RPromise绑定到上面的回调函数callback  
  66.     result.addListener(callback);  
  67.     //尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接  
  68.     acquireConnection(entry, callback);  
  69.     //返回异步结果获取RPromise  
  70.     return result;  
  71. }  
  72.   
  73. /**    
  74. * 真正从连接池中获取连接 
  75. * @param entry for ClientConnectionsEntry 
  76. * @param promise for RPromise<T> 
  77. */   
  78. private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {  
  79.     if (promise.isDone()) {  
  80.         releaseConnection(entry);  
  81.         return;  
  82.     }  
  83.     //从连接池中取出一个连接  
  84.     T conn = poll(entry);  
  85.     if (conn != null) {  
  86.         if (!conn.isActive()) {  
  87.             promiseFailure(entry, promise, conn);  
  88.             return;  
  89.         }  
  90.   
  91.         connectedSuccessful(entry, promise, conn);  
  92.         return;  
  93.     }  
  94.     //如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中  
  95.     createConnection(entry, promise);  
  96. }  
  97.   
  98. /**    
  99. * 从连接池中获取连接 
  100. * @param entry for ClientConnectionsEntry 
  101. * @return T 
  102. */   
  103. protected T poll(ClientConnectionsEntry entry) {  
  104.     return (T) entry.pollConnection();  
  105. }  
  106.   
  107. /**    
  108. * 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接 
  109. * @param entry for ClientConnectionsEntry 
  110. * @param promise for RPromise<T> 
  111. */   
  112. private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {  
  113.     //调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接  
  114.     RFuture<T> connFuture = connect(entry);  
  115.     connFuture.addListener(new FutureListener<T>() {  
  116.         @Override  
  117.         public void operationComplete(Future<T> future) throws Exception {  
  118.             if (!future.isSuccess()) {  
  119.                 promiseFailure(entry, promise, future.cause());  
  120.                 return;  
  121.             }  
  122.   
  123.             T conn = future.getNow();  
  124.             if (!conn.isActive()) {  
  125.                 promiseFailure(entry, promise, conn);  
  126.                 return;  
  127.             }  
  128.   
  129.             connectedSuccessful(entry, promise, conn);  
  130.         }  
  131.     });  
  132. }  
       ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下ClientConnectionsEntry的组成图
       我们继续跟进ClientConnectionsEntry.java的源代码,注释如下:
Java代码  收藏代码
  1. /**    
  2. *  ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用 
  3. */   
  4. public RedisConnection pollConnection() {  
  5.     return freeConnections.poll();  
  6. }  
  7.   
  8. /**    
  9. *  ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用 
  10. */   
  11. public RFuture<RedisConnection> connect() {  
  12.     //调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回  
  13.     RFuture<RedisConnection> future = client.connectAsync();  
  14.     future.addListener(new FutureListener<RedisConnection>() {  
  15.         @Override  
  16.         public void operationComplete(Future<RedisConnection> future) throws Exception {  
  17.             if (!future.isSuccess()) {  
  18.                 return;  
  19.             }  
  20.               
  21.             RedisConnection conn = future.getNow();  
  22.             onConnect(conn);  
  23.             log.debug("new connection created: {}", conn);  
  24.         }  
  25.     });  
  26.     return future;  
  27. }  
       上面的代码说明如果ClientConnectionsEntry里的freeConnections有空闲连接,那么直接返回该连接,如果没有那么调用RedisClient.connectAsync创建一个新的连接,这里我继续注释一下RedisClient.java的源代码如下:
Java代码  收藏代码
  1. package org.redisson.client;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.net.URI;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7. import java.util.concurrent.TimeUnit;  
  8. import org.redisson.api.RFuture;  
  9. import org.redisson.client.handler.RedisChannelInitializer;  
  10. import org.redisson.client.handler.RedisChannelInitializer.Type;  
  11. import org.redisson.misc.RPromise;  
  12. import org.redisson.misc.RedissonPromise;  
  13. import io.netty.bootstrap.Bootstrap;  
  14. import io.netty.channel.Channel;  
  15. import io.netty.channel.ChannelFuture;  
  16. import io.netty.channel.ChannelFutureListener;  
  17. import io.netty.channel.ChannelOption;  
  18. import io.netty.channel.EventLoopGroup;  
  19. import io.netty.channel.group.ChannelGroup;  
  20. import io.netty.channel.group.ChannelGroupFuture;  
  21. import io.netty.channel.group.DefaultChannelGroup;  
  22. import io.netty.channel.nio.NioEventLoopGroup;  
  23. import io.netty.channel.socket.SocketChannel;  
  24. import io.netty.channel.socket.nio.NioSocketChannel;  
  25. import io.netty.util.HashedWheelTimer;  
  26. import io.netty.util.Timer;  
  27. import io.netty.util.concurrent.Future;  
  28. import io.netty.util.concurrent.FutureListener;  
  29. import io.netty.util.concurrent.GlobalEventExecutor;  
  30. import org.redisson.misc.URIBuilder;  
  31.   
  32. /** 
  33.  * 使用java里的网络编程框架Netty连接redis服务端 
  34.  * 作者: Nikita Koksharov 
  35.  */  
  36. public class RedisClient {  
  37.     private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用  
  38.     private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用  
  39.     private final InetSocketAddress addr;//socket连接的地址  
  40.     //channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel  
  41.     //channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表  
  42.     private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);  
  43.   
  44.     private ExecutorService executor;//REACOTR模型的java异步执行线程池  
  45.     private final long commandTimeout;//超时时间  
  46.     private Timer timer;//定时器  
  47.     private boolean hasOwnGroup;  
  48.     private RedisClientConfig config;//redis连接配置信息  
  49.   
  50.     //构造方法  
  51.     public static RedisClient create(RedisClientConfig config) {  
  52.         if (config.getTimer() == null) {  
  53.             config.setTimer(new HashedWheelTimer());  
  54.         }  
  55.         return new RedisClient(config);  
  56.     }  
  57.     //构造方法  
  58.     private RedisClient(RedisClientConfig config) {  
  59.         this.config = config;  
  60.         this.executor = config.getExecutor();  
  61.         this.timer = config.getTimer();  
  62.           
  63.         addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());  
  64.           
  65.         bootstrap = createBootstrap(config, Type.PLAIN);  
  66.         pubSubBootstrap = createBootstrap(config, Type.PUBSUB);  
  67.           
  68.         this.commandTimeout = config.getCommandTimeout();  
  69.     }  
  70.   
  71.     //java的网路编程框架Netty工具类Bootstrap初始化  
  72.     private Bootstrap createBootstrap(RedisClientConfig config, Type type) {  
  73.         Bootstrap bootstrap = new Bootstrap()  
  74.                         .channel(config.getSocketChannelClass())  
  75.                         .group(config.getGroup())  
  76.                         .remoteAddress(addr);  
  77.         //注册netty相关socket数据处理RedisChannelInitializer  
  78.         bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));  
  79.         //设置超时时间  
  80.         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());  
  81.         return bootstrap;  
  82.     }  
  83.       
  84.     //构造方法  
  85.     @Deprecated  
  86.     public RedisClient(String address) {  
  87.         this(URIBuilder.create(address));  
  88.     }  
  89.       
  90.     //构造方法  
  91.     @Deprecated  
  92.     public RedisClient(URI address) {  
  93.         this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);  
  94.         hasOwnGroup = true;  
  95.     }  
  96.   
  97.     //构造方法  
  98.     @Deprecated  
  99.     public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {  
  100.         this(timer, executor, group, address.getHost(), address.getPort());  
  101.     }  
  102.       
  103.     //构造方法  
  104.     @Deprecated  
  105.     public RedisClient(String host, int port) {  
  106.         this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 1000010000);  
  107.         hasOwnGroup = true;  
  108.     }  
  109.   
  110.     //构造方法  
  111.     @Deprecated  
  112.     public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) {  
  113.         this(timer, executor, group, NioSocketChannel.class, host, port, 1000010000);  
  114.     }  
  115.       
  116.     //构造方法  
  117.     @Deprecated  
  118.     public RedisClient(String host, int port, int connectTimeout, int commandTimeout) {  
  119.         this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);  
  120.     }  
  121.   
  122.     //构造方法  
  123.     @Deprecated  
  124.     public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port,   
  125.                         int connectTimeout, int commandTimeout) {  
  126.         RedisClientConfig config = new RedisClientConfig();  
  127.         config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)  
  128.         .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);  
  129.           
  130.         this.config = config;  
  131.         this.executor = config.getExecutor();  
  132.         this.timer = config.getTimer();  
  133.           
  134.         addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());  
  135.           
  136.         //java的网路编程框架Netty工具类Bootstrap初始化  
  137.         bootstrap = createBootstrap(config, Type.PLAIN);  
  138.         pubSubBootstrap = createBootstrap(config, Type.PUBSUB);  
  139.           
  140.         this.commandTimeout = config.getCommandTimeout();  
  141.     }  
  142.   
  143.     //获取连接的IP地址  
  144.     public String getIpAddr() {  
  145.         return addr.getAddress().getHostAddress() + ":" + addr.getPort();  
  146.     }  
  147.     //获取socket连接的地址  
  148.     public InetSocketAddress getAddr() {  
  149.         return addr;  
  150.     }  
  151.     //获取超时时间  
  152.     public long getCommandTimeout() {  
  153.         return commandTimeout;  
  154.     }  
  155.     //获取netty的线程池  
  156.     public EventLoopGroup getEventLoopGroup() {  
  157.         return bootstrap.config().group();  
  158.     }  
  159.     //获取redis连接配置  
  160.     public RedisClientConfig getConfig() {  
  161.         return config;  
  162.     }  
  163.     //获取连接RedisConnection  
  164.     public RedisConnection connect() {  
  165.         try {  
  166.             return connectAsync().syncUninterruptibly().getNow();  
  167.         } catch (Exception e) {  
  168.             throw new RedisConnectionException("Unable to connect to: " + addr, e);  
  169.         }  
  170.     }  
  171.     //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection  
  172.     public RFuture<RedisConnection> connectAsync() {  
  173.         final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();  
  174.         //netty连接redis服务端  
  175.         ChannelFuture channelFuture = bootstrap.connect();  
  176.         channelFuture.addListener(new ChannelFutureListener() {  
  177.             @Override  
  178.             public void operationComplete(final ChannelFuture future) throws Exception {  
  179.                 if (future.isSuccess()) {  
  180.                     //将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection  
  181.                     final RedisConnection c = RedisConnection.getFrom(future.channel());  
  182.                     c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {  
  183.                         @Override  
  184.                         public void operationComplete(final Future<RedisConnection> future) throws Exception {  
  185.                             bootstrap.config().group().execute(new Runnable() {  
  186.                                 @Override  
  187.                                 public void run() {  
  188.                                     if (future.isSuccess()) {  
  189.                                         if (!f.trySuccess(c)) {  
  190.                                             c.closeAsync();  
  191.                                         }  
  192.                                     } else {  
  193.                                         f.tryFailure(future.cause());  
  194.                                         c.closeAsync();  
  195.                                     }  
  196.                                 }  
  197.                             });  
  198.                         }  
  199.                     });  
  200.                 } else {  
  201.                     bootstrap.config().group().execute(new Runnable() {  
  202.                         public void run() {  
  203.                             f.tryFailure(future.cause());  
  204.                         }  
  205.                     });  
  206.                 }  
  207.             }  
  208.         });  
  209.         return f;  
  210.     }  
  211.     //获取订阅相关连接RedisPubSubConnection  
  212.     public RedisPubSubConnection connectPubSub() {  
  213.         try {  
  214.             return connectPubSubAsync().syncUninterruptibly().getNow();  
  215.         } catch (Exception e) {  
  216.             throw new RedisConnectionException("Unable to connect to: " + addr, e);  
  217.         }  
  218.     }  
  219.   
  220.     //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection  
  221.     public RFuture<RedisPubSubConnection> connectPubSubAsync() {  
  222.         final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();  
  223.         //netty连接redis服务端  
  224.         ChannelFuture channelFuture = pubSubBootstrap.connect();  
  225.         channelFuture.addListener(new ChannelFutureListener() {  
  226.             @Override  
  227.             public void operationComplete(final ChannelFuture future) throws Exception {  
  228.                 if (future.isSuccess()) {  
  229.                     //将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection  
  230.                     final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel());  
  231.                     c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() {  
  232.                         @Override  
  233.                         public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {  
  234.                             bootstrap.config().group().execute(new Runnable() {  
  235.                                 @Override  
  236.                                 public void run() {  
  237.                                     if (future.isSuccess()) {  
  238.                                         if (!f.trySuccess(c)) {  
  239.                                             c.closeAsync();  
  240.                                         }  
  241.                                     } else {  
  242.                                         f.tryFailure(future.cause());  
  243.                                         c.closeAsync();  
  244.                                     }  
  245.                                 }  
  246.                             });  
  247.                         }  
  248.                     });  
  249.                 } else {  
  250.                     bootstrap.config().group().execute(new Runnable() {  
  251.                         public void run() {  
  252.                             f.tryFailure(future.cause());  
  253.                         }  
  254.                     });  
  255.                 }  
  256.             }  
  257.         });  
  258.         return f;  
  259.     }  
  260.   
  261.     //关闭netty网络连接  
  262.     public void shutdown() {  
  263.         shutdownAsync().syncUninterruptibly();  
  264.         if (hasOwnGroup) {  
  265.             timer.stop();  
  266.             executor.shutdown();  
  267.             try {  
  268.                 executor.awaitTermination(15, TimeUnit.SECONDS);  
  269.             } catch (InterruptedException e) {  
  270.                 Thread.currentThread().interrupt();  
  271.             }  
  272.             bootstrap.config().group().shutdownGracefully();  
  273.               
  274.         }  
  275.     }  
  276.   
  277.     //异步关闭netty网络连接  
  278.     public ChannelGroupFuture shutdownAsync() {  
  279.         for (Channel channel : channels) {  
  280.             RedisConnection connection = RedisConnection.getFrom(channel);  
  281.             if (connection != null) {  
  282.                 connection.setClosed(true);  
  283.             }  
  284.         }  
  285.         return channels.close();  
  286.     }  
  287.   
  288.     @Override  
  289.     public String toString() {  
  290.         return "[addr=" + addr + "]";  
  291.     }  
  292. }  
       上面就是Redisson利用java网络编程框架netty连接redis的全过程,如果你对netty比较熟悉,阅读上面的代码应该不是问题。

转载地址:http://aperise.iteye.com/blog/2400528

分享到:
评论

相关推荐

    jackson-annotations-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-annotations-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-annotations-2.10.4.pom; 包含翻译后的API文档:jackson-annotations-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven...

    jackson-databind-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-databind-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-databind-2.10.4.pom; 包含翻译后的API文档:jackson-databind-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven...

    jackson-core-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-core-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-core-2.10.4.pom; 包含翻译后的API文档:jackson-core-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:...

    joda-time-2.10.4-API文档-中文版.zip

    赠送源代码:joda-time-2.10.4-sources.jar; 赠送Maven依赖信息文件:joda-time-2.10.4.pom; 包含翻译后的API文档:joda-time-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:joda-time:joda-time:2.10.4...

    jackson-databind-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-databind-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-databind-2.10.4.pom; 包含翻译后的API文档:jackson-databind-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:...

    ehcache-2.10.4.jar

    ehcache-2.10.4.jar

    joda-time-2.10.4-API文档-中英对照版.zip

    赠送源代码:joda-time-2.10.4-sources.jar; 赠送Maven依赖信息文件:joda-time-2.10.4.pom; 包含翻译后的API文档:joda-time-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:joda-time:joda-...

    jackson-core-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-core-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-core-2.10.4.pom; 包含翻译后的API文档:jackson-core-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:...

    freetype-2.10.4.tar.gz

    "freetype-2.10.4.tar.gz"是一个包含源代码的压缩包,解压后可以找到项目文件,按照官方文档的编译指南进行配置和构建。此外,FreeType提供了一套API供开发者调用,用于字体的加载、字形提取、渲染等操作。 五、...

    jackson-annotations-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-annotations-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-annotations-2.10.4.pom; 包含翻译后的API文档:jackson-annotations-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip...

    jackson-datatype-jdk8-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-datatype-jdk8-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jdk8-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jdk8-2.10.4-javadoc-API文档-中文(简体)版.zip; ...

    jackson-datatype-jsr310-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-datatype-jsr310-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jsr310-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jsr310-2.10.4-javadoc-API文档-中文(简体)版....

    zipkin-server-2.10.4-exec

    zipkin-server-2.10.4-exec zipkin实现链路追踪,使用方法请看本人博客

    jackson-datatype-jdk8-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-datatype-jdk8-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jdk8-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jdk8-2.10.4-javadoc-API文档-中文(简体)-英语-对照...

    jackson-datatype-jsr310-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-datatype-jsr310-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jsr310-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jsr310-2.10.4-javadoc-API文档-中文(简体)-英语...

    PyPI 官网下载 | ansible-2.10.4.tar.gz

    在Python的世界里,Ansible是一个不可或缺的库,它使得基础设施即代码(IAC)的理念得以轻松实现。本文将详细探讨Ansible 2.10.4版本中的关键知识点,并通过实际应用示例来加深理解。 首先,我们来看Ansible的核心...

    scala-2.10.4.msi

    scala-2.10.4.msi,windows下的安装包,适用于scala的学习,非常实用。

    jSerialComm-2.10.4.jar

    Java串口通信库JSerialComm,JSerialComm(jSerialComm-2.10.4.jar)是一个较新的Java串口通信库。

    scala-2.10.4

    Scala-2.10.4是该语言的一个稳定版本,为开发者提供了高效、可扩展的编程环境。这个版本在2014年发布,是Scala 2.x系列的一个重要里程碑。 首先,让我们深入了解Scala的核心特性: 1. **类型系统**:Scala具有静态...

Global site tag (gtag.js) - Google Analytics