`

redisson-2.10.4源代码分析

阅读更多

    

redis 学习问题总结

http://aperise.iteye.com/blog/2310639

ehcache memcached redis 缓存技术总结

http://aperise.iteye.com/blog/2296219

redis-stat 离线安装

http://aperise.iteye.com/blog/2310254

redis  cluster 非ruby方式启动

http://aperise.iteye.com/blog/2310254

redis-sentinel安装部署

http://aperise.iteye.com/blog/2342693

spring-data-redis使用

 http://aperise.iteye.com/blog/2342615

redis客户端redisson实战

http://aperise.iteye.com/blog/2396196

redisson-2.10.4源代码分析

http://aperise.iteye.com/blog/2400528

tcmalloc jemalloc libc选择

http://blog.csdn.net/u010994304/article/details/49906819

 

1.RedissonClient一主两从部署时连接池组成

       对如下图的主从部署(1主2从): 
       redisson纯java操作代码如下: 

Config config = new Config();// 创建配置
  config.useMasterSlaveServers() // 指定使用主从部署方式
  //.setReadMode(ReadMode.SLAVE) 默认值SLAVE,读操作只在从节点进行
  //.setSubscriptionMode(SubscriptionMode.SLAVE) 默认值SLAVE,订阅操作只在从节点进行
  //.setMasterConnectionMinimumIdleSize(10) 默认值10,针对每个master节点初始化10个连接
  //.setMasterConnectionPoolSize(64) 默认值64,针对每个master节点初始化10个连接,最大可以扩展至64个连接
  //.setSlaveConnectionMinimumIdleSize(10) 默认值10,针对每个slave节点初始化10个连接
  //.setSlaveConnectionPoolSize(64) 默认值,针对每个slave节点初始化10个连接,最大可以扩展至64个连接
  //.setSubscriptionConnectionMinimumIdleSize(1) 默认值1,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接
  //.setSubscriptionConnectionPoolSize(50) 默认值50,在SubscriptionMode=SLAVE时候,针对每个slave节点初始化1个连接,最大可以扩展至50个连接
  .setMasterAddress("redis://192.168.29.24:6379")  // 设置redis主节点
  .addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点
  .addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右)

       上面代码执行完毕后,如果在redis服务端所在服务器执行以下linux命令:

#6379上建立了10个连接
netstat -ant |grep 6379|grep  ESTABLISHED
#7000上建立了11个连接
netstat -ant |grep 7000|grep  ESTABLISHED
#7001上建立了11个连接
netstat -ant |grep 7001|grep  ESTABLISHED

       你会发现redisson连接到redis服务端总计建立了32个连接,其中masterpool占据10个连接,slavepool占据20个连接,另外pubSubConnectionPool占据2个连接,连接池中池化对象分布如下图:

  • MasterConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为只有一个master,所以上图创建了10个连接;
  • MasterPubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为默认SubscriptionMode=SubscriptionMode.SLAVE,所以master上不会创建连接池所以上图MasterPubSubConnectionPool里没有创建任何连接;
  • SlaveConnectionPool:默认针对每个不同的IP+port组合,初始化10个对象,最大可扩展至64个,因为有两个slave,每个slave上图创建了10个连接,总计创建了20个连接;
  • PubSubConnectionPool:默认针对每个不同的IP+port组合,初始化1个对象,最大可扩展至50个,因为有两个slave,每个slave上图创建了1个连接,总计创建了2个连接。

       从上图可以看出,连接池是针对每个IP端口都有一个独立的池,连接池也按照主从进行划分,那么这些连接池是在哪里初始化的?如何初始化的?读操作和写操作如何进行的?这就是今天要解答的问题,要解答这些问题最好还是查看redisson的源码。

 

2.Redisson初始化连接池源码分析

    2.1 Redisson.java 

       RedissonClient.java是一个接口类,它的实现类是Redisson.java,对于Redisson.java的介绍先以一张Redisson的4大组件关系图开始,如下图:

       对Redisson.java的代码注释如下:

/**  
* 根据配置Config创建redisson操作类RedissonClient  
* @param config for Redisson  
* @return Redisson instance  
*/  
public static RedissonClient create(Config config) {  
    //调用构造方法  
    Redisson redisson = new Redisson(config);  
    if (config.isRedissonReferenceEnabled()) {  
        redisson.enableRedissonReferenceSupport();  
    }  
    return redisson;  
}  

/**  
* Redisson构造方法  
* @param config for Redisson  
* @return Redisson instance  
*/  
protected Redisson(Config config) {  
    //赋值变量config  
    this.config = config;  
    //产生一份对于传入config的备份  
    Config configCopy = new Config(config);  

    //根据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化  
    connectionManager = ConfigSupport.createConnectionManager(configCopy);  
    //连接池对象回收调度器  
    evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());  
    //Redisson的对象编码类  
    codecProvider = configCopy.getCodecProvider();  
    //Redisson的ResolverProvider,默认为org.redisson.liveobject.provider.DefaultResolverProvider  
    resolverProvider = configCopy.getResolverProvider();  
}

       其中与连接池相关的就是ConnectionManager,ConnectionManager的初始化转交工具类ConfigSupport.java进行,ConfigSupport.java会根据部署方式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)的不同而分别进行。

 

    2.2 ConfigSupport.java

       这里现将ConfigSupport.java创建ConnectionManager的核心代码注释如下: 

/**  
* 据配置config的类型(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式)而进行不同的初始化  
* @param configCopy for Redisson  
* @return ConnectionManager instance  
*/  
public static ConnectionManager createConnectionManager(Config configCopy) {  
    if (configCopy.getMasterSlaveServersConfig() != null) {//配置configCopy类型为主从模式  
        validate(configCopy.getMasterSlaveServersConfig());  
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy);  
    } else if (configCopy.getSingleServerConfig() != null) {//配置configCopy类型为单机模式  
        validate(configCopy.getSingleServerConfig());  
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy);  
    } else if (configCopy.getSentinelServersConfig() != null) {//配置configCopy类型为哨兵模式  
        validate(configCopy.getSentinelServersConfig());  
        return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy);  
    } else if (configCopy.getClusterServersConfig() != null) {//配置configCopy类型为集群模式  
        validate(configCopy.getClusterServersConfig());  
        return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy);  
    } else if (configCopy.getElasticacheServersConfig() != null) {//配置configCopy类型为亚马逊云模式  
        validate(configCopy.getElasticacheServersConfig());  
        return new ElasticacheConnectionManager(configCopy.getElasticacheServersConfig(), configCopy);  
    } else if (configCopy.getReplicatedServersConfig() != null) {//配置configCopy类型为微软云模式  
        validate(configCopy.getReplicatedServersConfig());  
        return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy);  
    } else if (configCopy.getConnectionManager() != null) {//直接返回configCopy自带的默认ConnectionManager  
        return configCopy.getConnectionManager();  
    }else {  
        throw new IllegalArgumentException("server(s) address(es) not defined!");  
    }  
}

       上面可以看到根据传入的配置Config.java的不同,会分别创建不同的ConnectionManager的实现类。

 

    2.3 MasterSlaveConnectionManager.java

       这里开始介绍ConnectionManager,ConnectionManager.java是一个接口类,它有6个实现类,分别对应着不同的部署模式(主从模式、单机模式、哨兵模式、集群模式、亚马逊云模式、微软云模式),如下如所示:
       这里以主从部署方式进行讲解,先通过一张图了解MasterSlaveConnectionManager的组成:


       上图中最终要的组件要数MasterSlaveEntry,在后面即将进行介绍,这里注释MasterSlaveConnectionManager.java的核心代码如下:  

/**   
* MasterSlaveConnectionManager的构造方法 
* @param cfg for MasterSlaveServersConfig 
* @param config for Config   
*/    
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
    //调用构造方法
    this(config);
    //
    initTimer(cfg);
    this.config = cfg;
    //初始化MasterSlaveEntry
    initSingleEntry();
}
/**   
* MasterSlaveConnectionManager的构造方法 
* @param cfg for Config 
*/    
public MasterSlaveConnectionManager(Config cfg) {
    //读取redisson的jar中的文件META-INF/MANIFEST.MF,打印出Bundle-Version对应的Redisson版本信息
    Version.logVersion();
    //EPOLL是linux的多路复用IO模型的增强版本,这里如果启用EPOLL,就让redisson底层netty使用EPOLL的方式,否则配置netty里的NIO非阻塞方式
    if (cfg.isUseLinuxNativeEpoll()) {
        if (cfg.getEventLoopGroup() == null) {
            //使用linux IO非阻塞模型EPOLL
            this.group = new EpollEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
        } else {
            this.group = cfg.getEventLoopGroup();
        }
        this.socketChannelClass = EpollSocketChannel.class;
    } else {
        if (cfg.getEventLoopGroup() == null) {
            //使用linux IO非阻塞模型NIO
            this.group = new NioEventLoopGroup(cfg.getNettyThreads(), new DefaultThreadFactory("redisson-netty"));
        } else {
            this.group = cfg.getEventLoopGroup();
        }
        this.socketChannelClass = NioSocketChannel.class;
    }
    if (cfg.getExecutor() == null) {
        //线程池大小,对于2U 2CPU 8cores/cpu,意思是有2块板子,每个板子上8个物理CPU,那么总计物理CPU个数为16
        //对于linux有个超线程概念,意思是每个物理CPU可以虚拟出2个逻辑CPU,那么总计逻辑CPU个数为32
        //这里Runtime.getRuntime().availableProcessors()取的是逻辑CPU的个数,所以这里线程池大小会是64
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        if (cfg.getThreads() != 0) {
            threads = cfg.getThreads();
        }
        executor = Executors.newFixedThreadPool(threads, new DefaultThreadFactory("redisson"));
    } else {
        executor = cfg.getExecutor();
    }

    this.cfg = cfg;
    this.codec = cfg.getCodec();
    //一个可以获取异步执行任务返回值的回调对象,本质是对于java的Future的实现,监控MasterSlaveConnectionManager的shutdown进行一些必要的处理
    this.shutdownPromise = newPromise();
    //一个持有MasterSlaveConnectionManager的异步执行服务
    this.commandExecutor = new CommandSyncService(this);
}
/**   
* 初始化定时调度器
* @param config for MasterSlaveServersConfig 
*/   
protected void initTimer(MasterSlaveServersConfig config) {
    //读取超时时间配置信息
    int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout(), config.getReconnectionTimeout()};
    Arrays.sort(timeouts);
    int minTimeout = timeouts[0];
    //设置默认超时时间
    if (minTimeout % 100 != 0) {
        minTimeout = (minTimeout % 100) / 2;
    } else if (minTimeout == 100) {
        minTimeout = 50;
    } else {
        minTimeout = 100;
    }
    //创建定时调度器
    timer = new HashedWheelTimer(Executors.defaultThreadFactory(), minTimeout, TimeUnit.MILLISECONDS, 1024);
    
    // to avoid assertion error during timer.stop invocation
    try {
        Field leakField = HashedWheelTimer.class.getDeclaredField("leak");
        leakField.setAccessible(true);
        leakField.set(timer, null);
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    //检测MasterSlaveConnectionManager的空闲连接的监视器IdleConnectionWatcher,会清理不用的空闲的池中连接对象
    connectionWatcher = new IdleConnectionWatcher(this, config);
}

/**   
* 创建MasterSlaveConnectionManager的MasterSlaveEntry  
*/    
protected void initSingleEntry() {
    try {
        //主从模式下0~16383加入到集合slots  
        HashSet<ClusterSlotRange> slots = new HashSet<ClusterSlotRange>();
        slots.add(singleSlotRange);

        MasterSlaveEntry entry;
        if (config.checkSkipSlavesInit()) {//ReadMode不为MASTER并且SubscriptionMode不为MASTER才执行
            entry = new SingleEntry(slots, this, config);
            RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
            f.syncUninterruptibly();
        } else {//默认主从部署ReadMode=SLAVE,SubscriptionMode=SLAVE,这里会执行
            entry = createMasterSlaveEntry(config, slots);
        }
        //将每个分片0~16383都指向创建的MasterSlaveEntry
        for (int slot = singleSlotRange.getStartSlot(); slot < singleSlotRange.getEndSlot() + 1; slot++) {
            addEntry(slot, entry);
        }
        //DNS相关
        if (config.getDnsMonitoringInterval() != -1) {
            dnsMonitor = new DNSMonitor(this, Collections.singleton(config.getMasterAddress()), 
                    config.getSlaveAddresses(), config.getDnsMonitoringInterval());
            dnsMonitor.start();
        }
    } catch (RuntimeException e) {
        stopThreads();
        throw e;
    }
}
/**   
* MasterSlaveEntry的构造方法 
* @param config for MasterSlaveServersConfig  
* @param slots for HashSet<ClusterSlotRange> 
* @return MasterSlaveEntry
*/    
protected MasterSlaveEntry createMasterSlaveEntry(MasterSlaveServersConfig config, HashSet<ClusterSlotRange> slots) {
    //创建MasterSlaveEntry
    MasterSlaveEntry entry = new MasterSlaveEntry(slots, this, config);
    //从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化
    List<RFuture<Void>> fs = entry.initSlaveBalancer(java.util.Collections.<URI>emptySet());
    for (RFuture<Void> future : fs) {
        future.syncUninterruptibly();
    }
    主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化
    RFuture<Void> f = entry.setupMasterEntry(config.getMasterAddress());
    f.syncUninterruptibly();
    return entry;
}

       上面个人觉得有两处代码值得我们特别关注,特别说明如下:

  • entry.initSlaveBalancer:从节点连接池SlaveConnectionPoolPubSubConnectionPool的默认的最小连接数初始化。
  • entry.setupMasterEntry:主节点连接池MasterConnectionPoolMasterPubSubConnectionPool的默认的最小连接数初始化。

 

    2.4 MasterSlaveEntry.java

       用一张图来解释MasterSlaveEntry的组件如下:

       MasterSlaveEntry.java里正是我们一直在寻找着的四个连接池MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,这里注释MasterSlaveEntry.java的核心代码如下:

/**   
* MasterSlaveEntry的构造方法 
* @param slotRanges for Set<ClusterSlotRange>   
* @param connectionManager for ConnectionManager   
* @param config for MasterSlaveServersConfig 
*/    
public MasterSlaveEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {  
    //主从模式下0~16383加入到集合slots  
    for (ClusterSlotRange clusterSlotRange : slotRanges) {  
        for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {  
            slots.add(i);  
        }  
    }  
    //赋值MasterSlaveConnectionManager给connectionManager  
    this.connectionManager = connectionManager;  
    //赋值config  
    this.config = config;  
  
    //创建LoadBalancerManager  
    //其实LoadBalancerManager里持有者从节点的SlaveConnectionPool和PubSubConnectionPool  
    //并且此时连接池里还没有初始化默认的最小连接数  
    slaveBalancer = new LoadBalancerManager(config, connectionManager, this);  
    //创建主节点连接池MasterConnectionPool,此时连接池里还没有初始化默认的最小连接数  
    writeConnectionHolder = new MasterConnectionPool(config, connectionManager, this);  
    //创建主节点连接池MasterPubSubConnectionPool,此时连接池里还没有初始化默认的最小连接数  
    pubSubConnectionHolder = new MasterPubSubConnectionPool(config, connectionManager, this);  
}  
  
/**   
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化 
* @param disconnectedNodes for Collection<URI> 
* @return List<RFuture<Void>>   
*/   
public List<RFuture<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {  
    //这里freezeMasterAsSlave=true  
    boolean freezeMasterAsSlave = !config.getSlaveAddresses().isEmpty() && !config.checkSkipSlavesInit() && disconnectedNodes.size() < config.getSlaveAddresses().size();  
  
    List<RFuture<Void>> result = new LinkedList<RFuture<Void>>();  
    //把主节点当作从节点处理,因为默认ReadMode=ReadMode.SLAVE,所以这里不会添加针对该节点的连接池  
    RFuture<Void> f = addSlave(config.getMasterAddress(), freezeMasterAsSlave, NodeType.MASTER);  
    result.add(f);  
    //读取从节点的地址信息,然后针对每个从节点地址创建SlaveConnectionPool和PubSubConnectionPool  
    //SlaveConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】  
    //PubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】  
    for (URI address : config.getSlaveAddresses()) {  
        f = addSlave(address, disconnectedNodes.contains(address), NodeType.SLAVE);  
        result.add(f);  
    }  
    return result;  
}  
  
/**   
* 从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化 
* @param address for URI 
* @param freezed for boolean 
* @param nodeType for NodeType 
* @return RFuture<Void> 
*/   
private RFuture<Void> addSlave(URI address, boolean freezed, NodeType nodeType) {  
    //创建到从节点的连接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.SLAVE, address);  
    ClientConnectionsEntry entry = new ClientConnectionsEntry(client,  
            this.config.getSlaveConnectionMinimumIdleSize(),  
            this.config.getSlaveConnectionPoolSize(),  
            this.config.getSubscriptionConnectionMinimumIdleSize(),  
            this.config.getSubscriptionConnectionPoolSize(), connectionManager, nodeType);  
    //默认只有主节点当作从节点是会设置freezed=true  
    if (freezed) {  
        synchronized (entry) {  
            entry.setFreezed(freezed);  
            entry.setFreezeReason(FreezeReason.SYSTEM);  
        }  
    }  
    //调用slaveBalancer来对从节点连接池SlaveConnectionPool和PubSubConnectionPool的默认的最小连接数初始化  
    return slaveBalancer.add(entry);  
}  
  
/**   
* 主节点连接池MasterConnectionPool和MasterPubSubConnectionPool的默认的最小连接数初始化 
* @param address for URI 
* @return RFuture<Void> 
*/   
public RFuture<Void> setupMasterEntry(URI address) {  
    //创建到主节点的连接RedisClient  
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address);  
    masterEntry = new ClientConnectionsEntry(  
            client,   
            config.getMasterConnectionMinimumIdleSize(),   
            config.getMasterConnectionPoolSize(),  
            config.getSubscriptionConnectionMinimumIdleSize(),  
            config.getSubscriptionConnectionPoolSize(),   
            connectionManager,   
            NodeType.MASTER);  
    //如果配置的SubscriptionMode=SubscriptionMode.MASTER就初始化MasterPubSubConnectionPool  
    //默认SubscriptionMode=SubscriptionMode.SLAVE,MasterPubSubConnectionPool这里不会初始化最小连接数  
    if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {  
        //MasterPubSubConnectionPool【初始化1个RedisPubSubConnection,最大可以扩展至50个】  
        RFuture<Void> f = writeConnectionHolder.add(masterEntry);  
        RFuture<Void> s = pubSubConnectionHolder.add(masterEntry);  
        return CountListener.create(s, f);  
    }  
    //调用MasterConnectionPool使得连接池MasterConnectionPool里的对象最小个数为10个  
    //MasterConnectionPool【初始化10个RedisConnection,最大可以扩展至64个】  
    return writeConnectionHolder.add(masterEntry);  
}
       上面代码个人觉得有四个地方值得我们特别关注,它们是一个连接池创建对象的入口,列表如下:
  • writeConnectionHolder.add(masterEntry):其实writeConnectionHolder的类型就是MasterConnectionPool,这里是连接池MasterConnectionPool里添加对象 
  • pubSubConnectionHolder.add(masterEntry):其实pubSubConnectionHolder的类型是MasterPubSubConnectionPool,这里是连接池MasterPubSubConnectionPool添加对象
  • slaveConnectionPool.add(entry):这里是连接池SlaveConnectionPool里添加对象
  • pubSubConnectionPool.add(entry):这里是连接池PubSubConnectionPool里添加对象

 

 

    2.5 LoadBalancerManager.java

       图解LoadBalancerManager.java的内部组成如下:

       LoadBalancerManager.java里面有着从节点相关的两个重要的连接池SlaveConnectionPoolPubSubConnectionPool,这里注释LoadBalancerManager.java的核心代码如下:

/**   
* LoadBalancerManager的构造方法 
* @param config for MasterSlaveServersConfig  
* @param connectionManager for ConnectionManager   
* @param entry for MasterSlaveEntry 
*/    
public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {  
    //赋值connectionManager  
    this.connectionManager = connectionManager;  
    //创建连接池SlaveConnectionPool  
    slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);  
    //创建连接池PubSubConnectionPool  
    pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);  
}  
/**   
* LoadBalancerManager的连接池SlaveConnectionPool和PubSubConnectionPool里池化对象添加方法,也即池中需要对象时,调用此方法添加 
* @param entry for ClientConnectionsEntry 
* @return RFuture<Void> 
*/    
public RFuture<Void> add(final ClientConnectionsEntry entry) {  
    final RPromise<Void> result = connectionManager.newPromise();  
    //创建一个回调监听器,在池中对象创建失败时进行2次莫仍尝试  
    FutureListener<Void> listener = new FutureListener<Void>() {  
        AtomicInteger counter = new AtomicInteger(2);  
        @Override  
        public void operationComplete(Future<Void> future) throws Exception {  
            if (!future.isSuccess()) {  
                result.tryFailure(future.cause());  
                return;  
            }  
            if (counter.decrementAndGet() == 0) {  
                String addr = entry.getClient().getIpAddr();  
                ip2Entry.put(addr, entry);  
                result.trySuccess(null);  
            }  
        }  
    };  
    //调用slaveConnectionPool添加RedisConnection对象到池中  
    RFuture<Void> slaveFuture = slaveConnectionPool.add(entry);  
    slaveFuture.addListener(listener);  
    //调用pubSubConnectionPool添加RedisPubSubConnection对象到池中  
    RFuture<Void> pubSubFuture = pubSubConnectionPool.add(entry);  
    pubSubFuture.addListener(listener);  
    return result;  
}

       至此,我们已经了解了开篇提到的四个连接池是在哪里创建的。

 

3. Redisson的4类连接池

       这里我们来详细介绍下Redisson的连接池实现类,Redisson里有4种连接池,它们是MasterConnectionPool、MasterPubSubConnectionPool、SlaveConnectionPool和PubSubConnectionPool,它们的父类都是ConnectionPool,其类继承关系图如下: 

       通过上图我们了解了ConnectionPool类的继承关系图,再来一张图来了解下ConnectionPool.java类的组成,如下:

       好了,再来图就有点啰嗦了,注释ConnectionPool.java代码如下:

abstract class ConnectionPool<T extends RedisConnection> {  
    private final Logger log = LoggerFactory.getLogger(getClass());  
    //维持着连接池对应的redis节点信息  
    //比如1主2从部署MasterConnectionPool里的entries只有一个主节点(192.168.29.24 6379)  
    //比如1主2从部署MasterPubSubConnectionPool里的entries为空,因为SubscriptionMode=SubscriptionMode.SLAVE  
    //比如1主2从部署SlaveConnectionPool里的entries有3个节点(192.168.29.24 6379,192.168.29.24 7000,192.168.29.24 7001,但是注意192.168.29.24 6379冻结属性freezed=true不会参与读操作除非2个从节点全部宕机才参与读操作)  
    //比如1主2从部署PubSubConnectionPool里的entries有2个节点(192.168.29.24 7000,192.168.29.24 7001),因为SubscriptionMode=SubscriptionMode.SLAVE,主节点不会加入  
    protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();  
    //持有者RedissonClient的组件ConnectionManager  
    final ConnectionManager connectionManager;  
    //持有者RedissonClient的组件ConnectionManager里的MasterSlaveServersConfig  
    final MasterSlaveServersConfig config;  
    //持有者RedissonClient的组件ConnectionManager里的MasterSlaveEntry  
    final MasterSlaveEntry masterSlaveEntry;  
  
    //构造函数  
    public ConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {  
        this.config = config;  
        this.masterSlaveEntry = masterSlaveEntry;  
        this.connectionManager = connectionManager;  
    }  
  
    //连接池中需要增加对象时候调用此方法  
    public RFuture<Void> add(final ClientConnectionsEntry entry) {  
        final RPromise<Void> promise = connectionManager.newPromise();  
        promise.addListener(new FutureListener<Void>() {  
            @Override  
            public void operationComplete(Future<Void> future) throws Exception {  
                entries.add(entry);  
            }  
        });  
        initConnections(entry, promise, true);  
        return promise;  
    }  
  
    //初始化连接池中最小连接数  
    private void initConnections(final ClientConnectionsEntry entry, final RPromise<Void> initPromise, boolean checkFreezed) {  
        final int minimumIdleSize = getMinimumIdleSize(entry);  
  
        if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {  
            initPromise.trySuccess(null);  
            return;  
        }  
  
        final AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);  
        int startAmount = Math.min(50, minimumIdleSize);  
        final AtomicInteger requests = new AtomicInteger(startAmount);  
        for (int i = 0; i < startAmount; i++) {  
            createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);  
        }  
    }  
  
    //创建连接对象到连接池中  
    private void createConnection(final boolean checkFreezed, final AtomicInteger requests, final ClientConnectionsEntry entry, final RPromise<Void> initPromise,  
            final int minimumIdleSize, final AtomicInteger initializedConnections) {  
  
        if ((checkFreezed && entry.isFreezed()) || !tryAcquireConnection(entry)) {  
            int totalInitializedConnections = minimumIdleSize - initializedConnections.get();  
            Throwable cause = new RedisConnectionException(  
                    "Unable to init enough connections amount! Only " + totalInitializedConnections + " from " + minimumIdleSize + " were initialized. Server: "  
                                        + entry.getClient().getAddr());  
            initPromise.tryFailure(cause);  
            return;  
        }  
          
        acquireConnection(entry, new Runnable() {  
              
            @Override  
            public void run() {  
                RPromise<T> promise = connectionManager.newPromise();  
                createConnection(entry, promise);  
                promise.addListener(new FutureListener<T>() {  
                    @Override  
                    public void operationComplete(Future<T> future) throws Exception {  
                        if (future.isSuccess()) {  
                            T conn = future.getNow();  
  
                            releaseConnection(entry, conn);  
                        }  
  
                        releaseConnection(entry);  
  
                        if (!future.isSuccess()) {  
                            int totalInitializedConnections = minimumIdleSize - initializedConnections.get();  
                            String errorMsg;  
                            if (totalInitializedConnections == 0) {  
                                errorMsg = "Unable to connect to Redis server: " + entry.getClient().getAddr();  
                            } else {  
                                errorMsg = "Unable to init enough connections amount! Only " + totalInitializedConnections   
                                        + " from " + minimumIdleSize + " were initialized. Redis server: " + entry.getClient().getAddr();  
                            }  
                            Throwable cause = new RedisConnectionException(errorMsg, future.cause());  
                            initPromise.tryFailure(cause);  
                            return;  
                        }  
  
                        int value = initializedConnections.decrementAndGet();  
                        if (value == 0) {  
                            log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());  
                            if (!initPromise.trySuccess(null)) {  
                                throw new IllegalStateException();  
                            }  
                        } else if (value > 0 && !initPromise.isDone()) {  
                            if (requests.incrementAndGet() <= minimumIdleSize) {  
                                createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);  
                            }  
                        }  
                    }  
                });  
            }  
        });  
  
    }  
  
    //连接池中租借出连接对象  
    public RFuture<T> get(RedisCommand<?> command) {  
        for (int j = entries.size() - 1; j >= 0; j--) {  
            final ClientConnectionsEntry entry = getEntry();  
            if (!entry.isFreezed()   
                    && tryAcquireConnection(entry)) {  
                return acquireConnection(command, entry);  
            }  
        }  
          
        List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();  
        List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();  
        for (ClientConnectionsEntry entry : entries) {  
            if (entry.isFreezed()) {  
                freezed.add(entry.getClient().getAddr());  
            } else {  
                failedAttempts.add(entry.getClient().getAddr());  
            }  
        }  
  
        StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");  
        if (!freezed.isEmpty()) {  
            errorMsg.append(" Disconnected hosts: " + freezed);  
        }  
        if (!failedAttempts.isEmpty()) {  
            errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);  
        }  
  
        RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());  
        return connectionManager.newFailedFuture(exception);  
    }  
  
    //连接池中租借出连接对象执行操作RedisCommand  
    public RFuture<T> get(RedisCommand<?> command, ClientConnectionsEntry entry) {  
        if ((!entry.isFreezed() || entry.getFreezeReason() == FreezeReason.SYSTEM) &&   
                tryAcquireConnection(entry)) {  
            return acquireConnection(command, entry);  
        }  
  
        RedisConnectionException exception = new RedisConnectionException(  
                "Can't aquire connection to " + entry);  
        return connectionManager.newFailedFuture(exception);  
    }  
    
    //通过向redis服务端发送PING看是否返回PONG来检测连接
    private void ping(RedisConnection c, final FutureListener<String> pingListener) {  
        RFuture<String> f = c.async(RedisCommands.PING);  
        f.addListener(pingListener);  
    }  
  
    //归还连接对象到连接池  
    public void returnConnection(ClientConnectionsEntry entry, T connection) {  
        if (entry.isFreezed()) {  
            connection.closeAsync();  
        } else {  
            releaseConnection(entry, connection);  
        }  
        releaseConnection(entry);  
    }  
  
    //释放连接池中连接对象  
    protected void releaseConnection(ClientConnectionsEntry entry) {  
        entry.releaseConnection();  
    }  
  
    //释放连接池中连接对象  
    protected void releaseConnection(ClientConnectionsEntry entry, T conn) {  
        entry.releaseConnection(conn);  
    }  
}

       用一张图来解释ConnectionPool干了些啥,如下图:

       都到这里了,不介意再送一张图了解各种部署方式下的连接池分布了,如下图:


4.Redisson的读写操作句柄类RedissonObject

       对于Redisson的任何操作,都需要获取到操作句柄类RedissonObject,RedissonObject根据不同的数据类型有不同的RedissonObject实现类,RedissonObject的类继承关系图如下:

       例如想设置redis服务端的key=key的值value=123,你需要查询Redis命令和Redisson对象匹配列表,找到如下对应关系:

       然后我们就知道调用代码这么写:

Config config = new Config();// 创建配置  
  config.useMasterSlaveServers() // 指定使用主从部署方式  
  .setMasterAddress("redis://192.168.29.24:6379")  // 设置redis主节点  
  .addSlaveAddress("redis://192.168.29.24:7000") // 设置redis从节点  
  .addSlaveAddress("redis://192.168.29.24:7001"); // 设置redis从节点  
RedissonClient redisson = Redisson.create(config);// 创建客户端(发现这一操作非常耗时,基本在2秒-4秒左右) 

//任何Redisson操作首先需要获取对应的操作句柄
//RBucket是操作句柄之一,实现类是RedissonBucket
RBucket<String> rBucket = redissonClient.getBucket("key");

//通过操作句柄rBucket进行读操作
rBucket.get();

//通过操作句柄rBucket进行写操作
rBucket.set("123");

       至于其它的redis命令对应的redisson操作对象,都可以官网的Redis命令和Redisson对象匹配列表 查到。

 

6.Redisson的读写操作源码分析

       从一个读操作的代码作为入口分析代码,如下:

//任何Redisson操作首先需要获取对应的操作句柄,RBucket是操作句柄之一,实现类是RedissonBucket  
RBucket<String> rBucket = redissonClient.getBucket("key");  
  
//通过操作句柄rBucket进行读操作  
rBucket.get();  
       继续追踪上面RBucket的get方法,如下:

       上面我们看到不管是读操作还是写操作都转交CommandAsyncExecutor进行处理,那么这里我们需要看一下CommandAsyncExecutor.java里关于读写操作处理的核心代码,注释代码如下:
private NodeSource getNodeSource(String key) {
    //通过公式CRC16.crc16(key.getBytes()) % MAX_SLOT
    //计算出一个字符串key对应的分片在0~16383中哪个分片
    int slot = connectionManager.calcSlot(key);
    //之前已经将0~16383每个分片对应到唯一的一个MasterSlaveEntry,这里取出来
    MasterSlaveEntry entry = connectionManager.getEntry(slot);
    //这里将MasterSlaveEntry包装成NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    return new NodeSource(entry);
}
@Override
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
    RPromise<R> mainPromise = connectionManager.newPromise();
    //获取NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    NodeSource source = getNodeSource(key);
    调用异步执行方法async
    async(true, source, codec, command, params, mainPromise, 0);
    return mainPromise;
}
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
        final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt) {
    //操作被取消,那么直接返回
    if (mainPromise.isCancelled()) {
        free(params);
        return;
    }
    //连接管理器无法连接,释放参数所占资源,然后返回
    if (!connectionManager.getShutdownLatch().acquire()) {
        free(params);
        mainPromise.tryFailure(new RedissonShutdownException("Redisson is shutdown"));
        return;
    }
    
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    if (isRedissonReferenceSupportEnabled()) {
        try {
            for (int i = 0; i < params.length; i++) {
                RedissonReference reference = RedissonObjectFactory.toReference(getConnectionManager().getCfg(), params[i]);
                if (reference != null) {
                    params[i] = reference;
                }
            }
        } catch (Exception e) {
            connectionManager.getShutdownLatch().release();
            free(params);
            mainPromise.tryFailure(e);
            return;
        }
    }
    
    //开始从connectionManager获取池中的连接
    //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作
    final RFuture<RedisConnection> connectionFuture;
    if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行
        connectionFuture = connectionManager.connectionReadOp(source, command);
    } else {//对于写操作默认readOnlyMode=false,这里会执行
        connectionFuture = connectionManager.connectionWriteOp(source, command);
    }
    
    //创建RPromise,用于操作失败时候重试
    final RPromise<R> attemptPromise = connectionManager.newPromise();
    details.init(connectionFuture, attemptPromise, readOnlyMode, source, codec, command, params, mainPromise, attempt);
    //创建FutureListener,监测外部请求是否已经取消了之前提交的读写操作,如果取消了,那么就让正在执行的读写操作停止
    FutureListener<R> mainPromiseListener = new FutureListener<R>() {
        @Override
        public void operationComplete(Future<R> future) throws Exception {
            if (future.isCancelled() && connectionFuture.cancel(false)) {
                log.debug("Connection obtaining canceled for {}", command);
                details.getTimeout().cancel();
                if (details.getAttemptPromise().cancel(false)) {
                    free(params);
                }
            }
        }
    };

    //创建TimerTask,用于操作失败后通过定时器进行操作重试
    final TimerTask retryTimerTask = new TimerTask() {
        @Override
        public void run(Timeout t) throws Exception {
            if (details.getAttemptPromise().isDone()) {
                return;
            }
            if (details.getConnectionFuture().cancel(false)) {
                connectionManager.getShutdownLatch().release();
            } else {
                if (details.getConnectionFuture().isSuccess()) {
                    if (details.getWriteFuture() == null || !details.getWriteFuture().isDone()) {
                        if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
                            if (details.getWriteFuture().cancel(false)) {
                                if (details.getException() == null) {
                                    details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams()) + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
                                }
                                details.getAttemptPromise().tryFailure(details.getException());
                            }
                            return;
                        }
                        details.incAttempt();
                        Timeout timeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
                        details.setTimeout(timeout);
                        return;
                    }

                    if (details.getWriteFuture().isDone() && details.getWriteFuture().isSuccess()) {
                        return;
                    }
                }
            }
            if (details.getMainPromise().isCancelled()) {
                if (details.getAttemptPromise().cancel(false)) {
                    free(details);
                    AsyncDetails.release(details);
                }
                return;
            }
            if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
                if (details.getException() == null) {
                    details.setException(new RedisTimeoutException("Unable to send command: " + command + " with params: " + LogHelper.toString(details.getParams() + " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts")));
                }
                details.getAttemptPromise().tryFailure(details.getException());
                return;
            }
            if (!details.getAttemptPromise().cancel(false)) {
                return;
            }
            int count = details.getAttempt() + 1;
            if (log.isDebugEnabled()) {
                log.debug("attempt {} for command {} and params {}",
                        count, details.getCommand(), Arrays.toString(details.getParams()));
            }
            details.removeMainPromiseListener();
            async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };

    //配置对于读写操作的超时时间
    Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);
    details.setupMainPromiseListener(mainPromiseListener);

    //给connectionFuture增加监听事件,当从连接池中获取连接成功,成功的事件会被触发,通知这里执行后续读写动作
    connectionFuture.addListener(new FutureListener<RedisConnection>() {
        @Override
        public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
            if (connFuture.isCancelled()) {//从池中获取连接被取消,直接返回
                return;
            }

            if (!connFuture.isSuccess()) {//从池中获取连接失败
                connectionManager.getShutdownLatch().release();
                details.setException(convertException(connectionFuture));
                return;
            }

            if (details.getAttemptPromise().isDone() || details.getMainPromise().isDone()) {//从池中获取连接失败,并且尝试了一定次数仍然失败,默认尝试次数为0
                releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
                return;
            }

            //从池中获取连接成功,这里取出连接对象RedisConnection
            final RedisConnection connection = connFuture.getNow();
            //如果需要重定向,这里进行重定向
            //重定向的情况有:集群模式对应的slot分布在其他节点,就需要进行重定向
            if (details.getSource().getRedirect() == Redirect.ASK) {
                List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
                RPromise<Void> promise = connectionManager.newPromise();
                list.add(new CommandData<Void, Void>(promise, details.getCodec(), RedisCommands.ASKING, new Object[]{}));
                list.add(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
                RPromise<Void> main = connectionManager.newPromise();
                ChannelFuture future = connection.send(new CommandsData(main, list));
                details.setWriteFuture(future);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("acquired connection for command {} and params {} from slot {} using node {}... {}",
                            details.getCommand(), Arrays.toString(details.getParams()), details.getSource(), connection.getRedisClient().getAddr(), connection);
                }
                //发送读写操作到RedisConnection,进行执行
                ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(), details.getCodec(), details.getCommand(), details.getParams()));
                details.setWriteFuture(future);
            }
            //对于写操作增加监听事件回调,对写操作是否成功,失败原因进行日志打印
            details.getWriteFuture().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(details, connection);
                }
            });
            //返回RedisConnection连接到连接池
            releaseConnection(source, connectionFuture, details.isReadOnlyMode(), details.getAttemptPromise(), details);
        }
    });

    attemptPromise.addListener(new FutureListener<R>() {
        @Override
        public void operationComplete(Future<R> future) throws Exception {
            checkAttemptFuture(source, details, future);
        }
    });
}
       上面的代码我用一张读写操作处理流程图总结如下:

 

 

       至此,关于读写操作的源码讲解完毕。在上面的代码注释中,列出如下重点。

    6.1 分片SLOT的计算公式

       SLOT=CRC16.crc16(key.getBytes()) % MAX_SLOT

 

    6.2 每个ConnectionPool持有的ClientConnectionsEntry对象冻结判断条件

       一个节点被判断为冻结,必须同时满足以下条件:

  • 该节点有slave节点,并且从节点个数大于0;
  • 设置的配置ReadMode不为并且SubscriptionMode不为MASTER;
  • 该节点的从节点至少有一个存活着,也即如果有从节点宕机,宕机的从节点的个数小于该节点总的从节点个数

    6.3 读写负载图

 

 


 

7.Redisson的读写操作从连接池获取连接对象源码分析和Redisson里RedisClient使用netty源码分析

       读写操作首先都需要获取到一个连接对象,在上面的分析中我们知道读写操作都是通过CommandAsyncExecutor.java里的如下代码获取连接对象:

    //开始从connectionManager获取池中的连接  
    //这里采用异步方式,创建一个RFuture对象,等待池中连接,一旦获得连接,然后进行读和写操作  
    final RFuture<RedisConnection> connectionFuture;  
    if (readOnlyMode) {//对于读操作默认readOnlyMode=true,这里会执行  
        connectionFuture = connectionManager.connectionReadOp(source, command);  
    } else {//对于写操作默认readOnlyMode=false,这里会执行  
        connectionFuture = connectionManager.connectionWriteOp(source, command);  
    }  
       上面读操作调用了connectionManager.connectionReadOp从连接池获取连接对象,写操作调用了connectionManager.connectionWriteOp从连接池获取连接对象,我们继续跟进connectionManager关于connectionReadOp和connectionWriteOp的源代码,注释如下:
/**   
* 读操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand<?> command) {
    //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();
    if (entry == null && source.getSlot() != null) {//这里不会执行source里slot=null
        entry = getEntry(source.getSlot());
    }
    if (source.getAddr() != null) {//这里不会执行source里addr=null
        entry = getEntry(source.getAddr());
        if (entry == null) {
            for (MasterSlaveEntry e : getEntrySet()) {
                if (e.hasSlave(source.getAddr())) {
                    entry = e;
                    break;
                }
            }
        }
        if (entry == null) {
            RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
            return RedissonPromise.newFailedFuture(ex);
        }
        
        return entry.connectionReadOp(command, source.getAddr());
    }
    
    if (entry == null) {//这里不会执行source里entry不等于null
        RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        return RedissonPromise.newFailedFuture(ex);
    }
    //MasterSlaveEntry里从连接池获取连接对象
    return entry.connectionReadOp(command);
}
/**   
* 写操作通过ConnectionManager从连接池获取连接对象
* @param source for NodeSource 
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
    //这里之前分析过source=NodeSource【slot=null,addr=null,redirect=null,entry=MasterSlaveEntry】
    MasterSlaveEntry entry = source.getEntry();
    if (entry == null) {
        entry = getEntry(source);
    }
    if (entry == null) {//这里不会执行source里entry不等于null
        RedisNodeNotFoundException ex = new RedisNodeNotFoundException("Node: " + source.getAddr() + " for slot: " + source.getSlot() + " hasn't been discovered yet");
        return RedissonPromise.newFailedFuture(ex);
    }
    //MasterSlaveEntry里从连接池获取连接对象
    return entry.connectionWriteOp(command);
}
       我们看到上面调用ConnectionManager从连接池获取连接对象,但是ConnectionManager却将获取连接操作转交MasterSlaveEntry处理,我们再一次回顾一下MasterSlaveEntry的组成: 

       MasterSlaveEntry里持有中我们开篇所提到的四个连接池,那么这里我们继续关注MasterSlaveEntry.java的源代码:
/**   
* 写操作从MasterConnectionPool连接池里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
    //我们知道writeConnectionHolder的类型为MasterConnectionPool
    //这里就是从MasterConnectionPool里获取连接对象
    return writeConnectionHolder.get(command);
}

/**   
* 写操作从LoadBalancerManager里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<RedisConnection>
*/ 
public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
    if (config.getReadMode() == ReadMode.MASTER) {
        //我们知道默认ReadMode=ReadMode.SLAVE,所以对于读操作这里不会执行
        return connectionWriteOp(command);
    }
    //我们知道slaveBalancer里持有者SlaveConnectionPool和PubSubConnectionPool
    //这里就是从SlaveConnectionPool里获取连接对象
    return slaveBalancer.nextConnection(command);
}
       似乎又绕回来了,最终的获取连接对象都转交到了从连接池ConnectionPool里获取连接对象,注释ConnectionPool里的获取连接对象代码如下: 
/**   
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?> 
* @return RFuture<T>
*/ 
public RFuture<T> get(RedisCommand<?> command) {
    for (int j = entries.size() - 1; j >= 0; j--) {
        final ClientConnectionsEntry entry = getEntry();
        if (!entry.isFreezed() && tryAcquireConnection(entry)) {
            //遍历ConnectionPool里维持的ClientConnectionsEntry列表
            //遍历的算法默认为RoundRobinLoadBalancer
            //ClientConnectionsEntry里对应的redis节点为非冻结节点,也即freezed=false
            return acquireConnection(command, entry);
        }
    }
    
    //记录失败重试信息
    List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();
    List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
    for (ClientConnectionsEntry entry : entries) {
        if (entry.isFreezed()) {
            freezed.add(entry.getClient().getAddr());
        } else {
            failedAttempts.add(entry.getClient().getAddr());
        }
    }

    StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
    if (!freezed.isEmpty()) {
        errorMsg.append(" Disconnected hosts: " + freezed);
    }
    if (!failedAttempts.isEmpty()) {
        errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);
    }
    //获取连接失败抛出异常
    RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
    return connectionManager.newFailedFuture(exception);
}

/**   
* 读写操作从ConnectionPool.java连接池里获取连接对象
* @param command for RedisCommand<?> 
* @param entry for ClientConnectionsEntry
* @return RFuture<T>
*/ 
private RFuture<T> acquireConnection(RedisCommand<?> command, final ClientConnectionsEntry entry) {
    //创建一个异步结果获取RPromise
    final RPromise<T> result = connectionManager.newPromise();
    //获取连接前首先将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1
    //该操作成功后将调用这里的回调函数AcquireCallback<T>
    AcquireCallback<T> callback = new AcquireCallback<T>() {
        @Override
        public void run() {
            result.removeListener(this);
            //freeConnectionsCounter值减1成功,说明获取可以获取到连接
            //这里才是真正获取连接的操作
            connectTo(entry, result);
        }
        
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            entry.removeConnection(this);
        }
    };
    //异步结果获取RPromise绑定到上面的回调函数callback
    result.addListener(callback);
    //尝试将ClientConnectionsEntry里的空闲连接信号freeConnectionsCounter值减1,如果成功就调用callback从连接池获取连接
    acquireConnection(entry, callback);
    //返回异步结果获取RPromise
    return result;
}

/**   
* 真正从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
    if (promise.isDone()) {
        releaseConnection(entry);
        return;
    }
    //从连接池中取出一个连接
    T conn = poll(entry);
    if (conn != null) {
        if (!conn.isActive()) {
            promiseFailure(entry, promise, conn);
            return;
        }

        connectedSuccessful(entry, promise, conn);
        return;
    }
    //如果仍然获取不到连接,可能连接池中连接对象都被租借了,这里开始创建一个新的连接对象放到连接池中
    createConnection(entry, promise);
}

/**   
* 从连接池中获取连接
* @param entry for ClientConnectionsEntry
* @return T
*/ 
protected T poll(ClientConnectionsEntry entry) {
    return (T) entry.pollConnection();
}

/**   
* 调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
* @param entry for ClientConnectionsEntry
* @param promise for RPromise<T>
*/ 
private void createConnection(final ClientConnectionsEntry entry, final RPromise<T> promise) {
    //调用ClientConnectionsEntry创建一个连接放置到连接池中并返回此连接
    RFuture<T> connFuture = connect(entry);
    connFuture.addListener(new FutureListener<T>() {
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            if (!future.isSuccess()) {
                promiseFailure(entry, promise, future.cause());
                return;
            }

            T conn = future.getNow();
            if (!conn.isActive()) {
                promiseFailure(entry, promise, conn);
                return;
            }

            connectedSuccessful(entry, promise, conn);
        }
    });
}
       ConnectionPool.java里获取读写操作的连接,是遍历ConnectionPool里维持的ClientConnectionsEntry列表,找到一非冻结的ClientConnectionsEntry,然后调用ClientConnectionsEntry里的freeConnectionsCounter尝试将值减1,如果成功,说明连接池中可以获取到连接,那么就从ClientConnectionsEntry里获取一个连接出来,如果拿不到连接,会调用ClientConnectionsEntry创建一个新连接放置到连接池中,并返回此连接,这里回顾一下ClientConnectionsEntry的组成图
       我们继续跟进ClientConnectionsEntry.java的源代码,注释如下:
/**   
*  ClientConnectionsEntry里从freeConnections里获取一个连接并返回给读写操作使用
*/ 
public RedisConnection pollConnection() {
    return freeConnections.poll();
}

/**   
*  ClientConnectionsEntry里新创建一个连接对象返回给读写操作使用
*/ 
public RFuture<RedisConnection> connect() {
    //调用RedisClient利用netty连接redis服务端,将返回的netty的outboundchannel包装成RedisConnection并返回
    RFuture<RedisConnection> future = client.connectAsync();
    future.addListener(new FutureListener<RedisConnection>() {
        @Override
        public void operationComplete(Future<RedisConnection> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            
            RedisConnection conn = future.getNow();
            onConnect(conn);
            log.debug("new connection created: {}", conn);
        }
    });
    return future;
}
       上面的代码说明如果ClientConnectionsEntry里的freeConnections有空闲连接,那么直接返回该连接,如果没有那么调用RedisClient.connectAsync创建一个新的连接,这里我继续注释一下RedisClient.java的源代码如下:
package org.redisson.client;

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RFuture;
import org.redisson.client.handler.RedisChannelInitializer;
import org.redisson.client.handler.RedisChannelInitializer.Type;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.redisson.misc.URIBuilder;

/**
 * 使用java里的网络编程框架Netty连接redis服务端
 * 作者: Nikita Koksharov
 */
public class RedisClient {
    private final Bootstrap bootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
    private final Bootstrap pubSubBootstrap;//Netty的工具类Bootstrap,用于连接建立等作用
    private final InetSocketAddress addr;//socket连接的地址
    //channels是netty提供的一个全局对象,里面记录着当前socket连接上的所有处于可用状态的连接channel
    //channels会自动监测里面的channel,当channel断开时,会主动踢出该channel,永远保留当前可用的channel列表
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private ExecutorService executor;//REACOTR模型的java异步执行线程池
    private final long commandTimeout;//超时时间
    private Timer timer;//定时器
    private boolean hasOwnGroup;
    private RedisClientConfig config;//redis连接配置信息

    //构造方法
    public static RedisClient create(RedisClientConfig config) {
        if (config.getTimer() == null) {
            config.setTimer(new HashedWheelTimer());
        }
        return new RedisClient(config);
    }
    //构造方法
    private RedisClient(RedisClientConfig config) {
        this.config = config;
        this.executor = config.getExecutor();
        this.timer = config.getTimer();
        
        addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
        
        bootstrap = createBootstrap(config, Type.PLAIN);
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
        
        this.commandTimeout = config.getCommandTimeout();
    }

    //java的网路编程框架Netty工具类Bootstrap初始化
    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .channel(config.getSocketChannelClass())
                        .group(config.getGroup())
                        .remoteAddress(addr);
        //注册netty相关socket数据处理RedisChannelInitializer
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        //设置超时时间
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        return bootstrap;
    }
    
    //构造方法
    @Deprecated
    public RedisClient(String address) {
        this(URIBuilder.create(address));
    }
    
    //构造方法
    @Deprecated
    public RedisClient(URI address) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), address);
        hasOwnGroup = true;
    }

    //构造方法
    @Deprecated
    public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, URI address) {
        this(timer, executor, group, address.getHost(), address.getPort());
    }
    
    //构造方法
    @Deprecated
    public RedisClient(String host, int port) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, 10000, 10000);
        hasOwnGroup = true;
    }

    //构造方法
    @Deprecated
    public RedisClient(Timer timer, ExecutorService executor, EventLoopGroup group, String host, int port) {
        this(timer, executor, group, NioSocketChannel.class, host, port, 10000, 10000);
    }
    
    //构造方法
    @Deprecated
    public RedisClient(String host, int port, int connectTimeout, int commandTimeout) {
        this(new HashedWheelTimer(), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2), new NioEventLoopGroup(), NioSocketChannel.class, host, port, connectTimeout, commandTimeout);
    }

    //构造方法
    @Deprecated
    public RedisClient(final Timer timer, ExecutorService executor, EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, 
                        int connectTimeout, int commandTimeout) {
        RedisClientConfig config = new RedisClientConfig();
        config.setTimer(timer).setExecutor(executor).setGroup(group).setSocketChannelClass(socketChannelClass)
        .setAddress(host, port).setConnectTimeout(connectTimeout).setCommandTimeout(commandTimeout);
        
        this.config = config;
        this.executor = config.getExecutor();
        this.timer = config.getTimer();
        
        addr = new InetSocketAddress(config.getAddress().getHost(), config.getAddress().getPort());
        
        //java的网路编程框架Netty工具类Bootstrap初始化
        bootstrap = createBootstrap(config, Type.PLAIN);
        pubSubBootstrap = createBootstrap(config, Type.PUBSUB);
        
        this.commandTimeout = config.getCommandTimeout();
    }

    //获取连接的IP地址
    public String getIpAddr() {
        return addr.getAddress().getHostAddress() + ":" + addr.getPort();
    }
    //获取socket连接的地址
    public InetSocketAddress getAddr() {
        return addr;
    }
    //获取超时时间
    public long getCommandTimeout() {
        return commandTimeout;
    }
    //获取netty的线程池
    public EventLoopGroup getEventLoopGroup() {
        return bootstrap.config().group();
    }
    //获取redis连接配置
    public RedisClientConfig getConfig() {
        return config;
    }
    //获取连接RedisConnection
    public RedisConnection connect() {
        try {
            return connectAsync().syncUninterruptibly().getNow();
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to: " + addr, e);
        }
    }
    //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
    public RFuture<RedisConnection> connectAsync() {
        final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();
        //netty连接redis服务端
        ChannelFuture channelFuture = bootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //将netty连接上的OutBoundChannel包装成RedisConnection并返回RedisConnection
                    final RedisConnection c = RedisConnection.getFrom(future.channel());
                    c.getConnectionPromise().addListener(new FutureListener<RedisConnection>() {
                        @Override
                        public void operationComplete(final Future<RedisConnection> future) throws Exception {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (future.isSuccess()) {
                                        if (!f.trySuccess(c)) {
                                            c.closeAsync();
                                        }
                                    } else {
                                        f.tryFailure(future.cause());
                                        c.closeAsync();
                                    }
                                }
                            });
                        }
                    });
                } else {
                    bootstrap.config().group().execute(new Runnable() {
                        public void run() {
                            f.tryFailure(future.cause());
                        }
                    });
                }
            }
        });
        return f;
    }
    //获取订阅相关连接RedisPubSubConnection
    public RedisPubSubConnection connectPubSub() {
        try {
            return connectPubSubAsync().syncUninterruptibly().getNow();
        } catch (Exception e) {
            throw new RedisConnectionException("Unable to connect to: " + addr, e);
        }
    }

    //启动netty去连接redis服务端,设置java的Future尝试将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
    public RFuture<RedisPubSubConnection> connectPubSubAsync() {
        final RPromise<RedisPubSubConnection> f = new RedissonPromise<RedisPubSubConnection>();
        //netty连接redis服务端
        ChannelFuture channelFuture = pubSubBootstrap.connect();
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //将netty连接上的OutBoundChannel包装成RedisPubSubConnection并返回RedisPubSubConnection
                    final RedisPubSubConnection c = RedisPubSubConnection.getFrom(future.channel());
                    c.<RedisPubSubConnection>getConnectionPromise().addListener(new FutureListener<RedisPubSubConnection>() {
                        @Override
                        public void operationComplete(final Future<RedisPubSubConnection> future) throws Exception {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (future.isSuccess()) {
                                        if (!f.trySuccess(c)) {
                                            c.closeAsync();
                                        }
                                    } else {
                                        f.tryFailure(future.cause());
                                        c.closeAsync();
                                    }
                                }
                            });
                        }
                    });
                } else {
                    bootstrap.config().group().execute(new Runnable() {
                        public void run() {
                            f.tryFailure(future.cause());
                        }
                    });
                }
            }
        });
        return f;
    }

    //关闭netty网络连接
    public void shutdown() {
        shutdownAsync().syncUninterruptibly();
        if (hasOwnGroup) {
            timer.stop();
            executor.shutdown();
            try {
                executor.awaitTermination(15, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            bootstrap.config().group().shutdownGracefully();
            
        }
    }

    //异步关闭netty网络连接
    public ChannelGroupFuture shutdownAsync() {
        for (Channel channel : channels) {
            RedisConnection connection = RedisConnection.getFrom(channel);
            if (connection != null) {
                connection.setClosed(true);
            }
        }
        return channels.close();
    }

    @Override
    public String toString() {
        return "[addr=" + addr + "]";
    }
}
       上面就是Redisson利用java网络编程框架netty连接redis的全过程,如果你对netty比较熟悉,阅读上面的代码应该不是问题。

 

  • 大小: 165.2 KB
  • 大小: 168 KB
  • 大小: 18.3 KB
  • 大小: 865.6 KB
  • 大小: 90.9 KB
  • 大小: 37.5 KB
  • 大小: 262.9 KB
  • 大小: 568.5 KB
  • 大小: 800.4 KB
  • 大小: 892.5 KB
  • 大小: 429.1 KB
  • 大小: 1.1 MB
  • 大小: 30.8 KB
  • 大小: 71.6 KB
  • 大小: 464.3 KB
  • 大小: 1 MB
  • 大小: 1.6 MB
分享到:
评论

相关推荐

    jackson-annotations-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-annotations-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-annotations-2.10.4.pom; 包含翻译后的API文档:jackson-annotations-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven...

    jackson-databind-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-databind-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-databind-2.10.4.pom; 包含翻译后的API文档:jackson-databind-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven...

    jackson-core-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-core-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-core-2.10.4.pom; 包含翻译后的API文档:jackson-core-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:...

    joda-time-2.10.4-API文档-中文版.zip

    赠送源代码:joda-time-2.10.4-sources.jar; 赠送Maven依赖信息文件:joda-time-2.10.4.pom; 包含翻译后的API文档:joda-time-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:joda-time:joda-time:2.10.4...

    jackson-databind-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-databind-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-databind-2.10.4.pom; 包含翻译后的API文档:jackson-databind-2.10.4-javadoc-API文档-中文(简体)版.zip; Maven坐标:...

    ehcache-2.10.4.jar

    ehcache-2.10.4.jar

    joda-time-2.10.4-API文档-中英对照版.zip

    赠送源代码:joda-time-2.10.4-sources.jar; 赠送Maven依赖信息文件:joda-time-2.10.4.pom; 包含翻译后的API文档:joda-time-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:joda-time:joda-...

    jackson-core-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-core-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-core-2.10.4.pom; 包含翻译后的API文档:jackson-core-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:...

    freetype-2.10.4.tar.gz

    "freetype-2.10.4.tar.gz"是一个包含源代码的压缩包,解压后可以找到项目文件,按照官方文档的编译指南进行配置和构建。此外,FreeType提供了一套API供开发者调用,用于字体的加载、字形提取、渲染等操作。 五、...

    jackson-annotations-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-annotations-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-annotations-2.10.4.pom; 包含翻译后的API文档:jackson-annotations-2.10.4-javadoc-API文档-中文(简体)-英语-对照版.zip...

    jackson-datatype-jsr310-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-datatype-jsr310-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jsr310-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jsr310-2.10.4-javadoc-API文档-中文(简体)版....

    jackson-datatype-jdk8-2.10.4-API文档-中文版.zip

    赠送源代码:jackson-datatype-jdk8-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jdk8-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jdk8-2.10.4-javadoc-API文档-中文(简体)版.zip; ...

    zipkin-server-2.10.4-exec

    zipkin-server-2.10.4-exec zipkin实现链路追踪,使用方法请看本人博客

    jackson-datatype-jsr310-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-datatype-jsr310-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jsr310-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jsr310-2.10.4-javadoc-API文档-中文(简体)-英语...

    jackson-datatype-jdk8-2.10.4-API文档-中英对照版.zip

    赠送源代码:jackson-datatype-jdk8-2.10.4-sources.jar; 赠送Maven依赖信息文件:jackson-datatype-jdk8-2.10.4.pom; 包含翻译后的API文档:jackson-datatype-jdk8-2.10.4-javadoc-API文档-中文(简体)-英语-对照...

    jSerialComm-2.10.4.jar

    Java串口通信库JSerialComm,JSerialComm(jSerialComm-2.10.4.jar)是一个较新的Java串口通信库。

    PyPI 官网下载 | ansible-2.10.4.tar.gz

    在Python的世界里,Ansible是一个不可或缺的库,它使得基础设施即代码(IAC)的理念得以轻松实现。本文将详细探讨Ansible 2.10.4版本中的关键知识点,并通过实际应用示例来加深理解。 首先,我们来看Ansible的核心...

    scala-2.10.4.msi

    scala-2.10.4.msi,windows下的安装包,适用于scala的学习,非常实用。

    scala-2.10.4

    Scala-2.10.4是该语言的一个稳定版本,为开发者提供了高效、可扩展的编程环境。这个版本在2014年发布,是Scala 2.x系列的一个重要里程碑。 首先,让我们深入了解Scala的核心特性: 1. **类型系统**:Scala具有静态...

Global site tag (gtag.js) - Google Analytics