
Redis Sentinel Read/Write Splitting: A Slave Connection Pool

 
For a better reading experience, see: http://www.jack-yin.com/coding/spring-boot/2683.html

0. Background

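Besides running Redis as a cluster, a standalone (non-clustered) Redis deployment can also be made highly available by combining a Master-Slave architecture with the Sentinel mechanism,
and clients can then fail over automatically.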

Much like JdbcTemplate for JDBC, Spring uses RedisTemplate to work with Redis. In Spring Boot, adding the following Maven dependencies is enough to have
a RedisTemplate instance auto-configured.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

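RedisTemplate needs a RedisConnectionFactory to manage its Redis connections. If you define a RedisSentinelConfiguration in the project and pass it to the
RedisConnectionFactory, you get a Sentinel-based connection pool with automatic failover: when the master fails, Sentinel promotes one of the slaves
to master, which keeps the master connection highly available.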

Below is a typical configuration of a Sentinel-based RedisConnectionFactory:

@Value("${spring.redis.password}")
    private String redisPasswd;

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(sentinelConfig);
        System.out.println(jedisConnectionFactory.getClientConfiguration().getClientName());
        return jedisConnectionFactory;
    }

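Reading the source of org.springframework.data.redis.connection.jedis.JedisConnectionFactory shows that,
once a RedisSentinelConfiguration is supplied, the factory is backed by a JedisSentinelPool. Every connection in that pool
goes to the master. JedisSentinelPool also subscribes to the +switch-master channel on each Sentinel. A message on that channel
means that a master switch has happened and a new master exists, so the pool re-initializes itself against the new master.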

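So far we know that a RedisConnectionFactory can be built on top of Sentinel and that it fails over automatically,
but it only creates connections to the master. If every connection goes to the master, the slaves serve purely as backups and their capacity is wasted.
A slave normally just replicates the master's data. To avoid inconsistency nothing should be written to a slave, and setting slave-read-only yes in the Redis configuration file makes a slave reject all writes.
For a Sentinel-managed Master-Slave deployment, then, the master can serve reads and writes while every slave serves reads only. This read/write splitting makes fuller use of the servers
and improves the performance of the Redis system as a whole.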

1. The Problem

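All connections in JedisSentinelPool go to the master, so how do we get a pool of slave connections? After going through spring-boot-starter-data-redis and jedis,
I found no ready-made slave connection pool, so I decided to write one.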

2. Analysis

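RedisSentinelConfiguration gives us the IP and port of each sentinel, so we can connect to a sentinel and run the sentinel slaves mymaster command to obtain the IP and port of every slave.
With those we can open connections to the slaves.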
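To make the shape of that reply concrete, here is a minimal sketch that turns a sentinel slaves reply into usable addresses. It models the reply as the List<Map<String,String>> that Jedis returns from sentinelSlaves(), and relies on the ip, port and flags fields that Sentinel reports for each slave. The class and method names are made up for illustration, the sample addresses are invented, and skipping entries flagged s_down, o_down or disconnected is an optional refinement that the listings later in this article do not perform.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: pick usable slave addresses out of a "sentinel slaves <master>" reply.
// SlaveAddressFilter is a hypothetical helper, not part of Jedis or Spring.
class SlaveAddressFilter {

    // Returns "ip:port" for every entry whose flags do not mark it as down or disconnected.
    static List<String> usableSlaves(List<Map<String, String>> reply) {
        List<String> result = new ArrayList<>();
        for (Map<String, String> slave : reply) {
            String flags = slave.getOrDefault("flags", "");
            if (flags.contains("s_down") || flags.contains("o_down") || flags.contains("disconnected")) {
                continue; // Sentinel currently considers this slave unreachable
            }
            String ip = slave.get("ip");
            String port = slave.get("port");
            if (ip != null && port != null) {
                result.add(ip + ":" + Integer.parseInt(port));
            }
        }
        return result;
    }

    // Builds one reply entry; used here only to create sample data.
    static Map<String, String> entry(String ip, String port, String flags) {
        Map<String, String> m = new HashMap<>();
        m.put("ip", ip);
        m.put("port", port);
        m.put("flags", flags);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, String>> reply = new ArrayList<>();
        reply.add(entry("192.168.0.2", "6479", "slave"));
        reply.add(entry("192.168.0.3", "6479", "s_down,slave,disconnected"));
        System.out.println(usableSlaves(reply)); // prints [192.168.0.2:6479]
    }
}
```

In a real client the list would come from a live sentinel connection rather than from hand-built maps.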

Reading further into the JedisFactory source shows that it implements the PooledObjectFactory<Jedis> interface from org.apache.commons.pool2,
so the Jedis connection pool is built on Apache commons-pool2.

[-----------------UML-1---------------------------]


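As the diagram shows, JedisConnectionFactory creates a JedisSentinelPool, JedisSentinelPool creates a JedisFactory, and JedisFactory implements the PooledObjectFactory interface,
producing new Redis connections in its makeObject() method. JedisSentinelPool also defines a MasterListener that subscribes to the +switch-master channel; as soon as a master switch occurs, it fails over automatically
and re-initializes the master connection pool.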
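The division of labour between a pool and its factory can be sketched without any library. The code below is not the commons-pool2 API: ObjectFactory, TinyPool and CountingFactory are hypothetical, heavily simplified stand-ins, and a StringBuilder plays the role of a connection. It only illustrates that the pool decides when to create, reuse or drop objects, while the factory knows how.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the factory/pool split (hypothetical types, not commons-pool2 itself).
class TinyPoolDemo {

    // Simplified counterpart of PooledObjectFactory<T>.
    interface ObjectFactory<T> {
        T makeObject();
        boolean validateObject(T obj);
        void destroyObject(T obj);
    }

    static class TinyPool<T> {
        private final Deque<T> idle = new ArrayDeque<>();
        private final ObjectFactory<T> factory;

        TinyPool(ObjectFactory<T> factory) {
            this.factory = factory;
        }

        // Reuse a valid idle object if there is one, otherwise ask the factory for a new one.
        synchronized T borrow() {
            while (!idle.isEmpty()) {
                T candidate = idle.pop();
                if (factory.validateObject(candidate)) {
                    return candidate;
                }
                factory.destroyObject(candidate);
            }
            return factory.makeObject();
        }

        synchronized void giveBack(T obj) {
            idle.push(obj);
        }

        // Drop all idle objects, as a pool would after a topology change.
        synchronized void clear() {
            while (!idle.isEmpty()) {
                factory.destroyObject(idle.pop());
            }
        }
    }

    // Factory that counts creations and destructions.
    static class CountingFactory implements ObjectFactory<StringBuilder> {
        int created = 0;
        int destroyed = 0;
        public StringBuilder makeObject() { created++; return new StringBuilder("conn-" + created); }
        public boolean validateObject(StringBuilder obj) { return obj.length() > 0; }
        public void destroyObject(StringBuilder obj) { destroyed++; obj.setLength(0); }
    }

    public static void main(String[] args) {
        CountingFactory factory = new CountingFactory();
        TinyPool<StringBuilder> pool = new TinyPool<>(factory);
        StringBuilder first = pool.borrow();   // created by the factory
        pool.giveBack(first);
        StringBuilder again = pool.borrow();   // same instance, reused
        pool.giveBack(again);
        pool.clear();                          // idle instance destroyed
        StringBuilder fresh = pool.borrow();   // factory is asked again
        System.out.println(first == again);    // prints true
        System.out.println(fresh);             // prints conn-2
    }
}
```

Clearing the idle objects and letting the factory build fresh ones is the same idea the Jedis pool relies on after a master switch, although the real pool also has to deal with connections that are currently borrowed.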

3. Solution

Modeled on JedisConnectionFactory, JedisSentinelPool, and JedisFactory, we create JedisSentinelSlaveConnectionFactory, JedisSentinelSlavePool, and JedisSentinelSlaveFactory.
Their relationships are shown in diagram UML-2.

[-----------------UML-2---------------------------]

Of these, JedisSentinelSlaveConnectionFactory is the one that is handed to RedisTemplate. It extends JedisConnectionFactory
and overrides the createRedisSentinelPool method. In JedisConnectionFactory that method returns a JedisSentinelPool; the overriding method returns a JedisSentinelSlavePool instead.
Both JedisSentinelSlavePool and JedisSentinelPool extend Pool<Jedis>. JedisSentinelSlavePool creates a JedisSentinelSlaveFactory,
which implements the PooledObjectFactory<Jedis> interface. In its public PooledObject<Jedis> makeObject() method it uses a sentinel connection
to run the sentinel slaves command, obtains the ip and port of the slaves, and then opens and returns a connection to one slave chosen at random.

The createRedisSentinelPool method of JedisSentinelSlaveConnectionFactory:
@Override
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

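1) The sentinel configuration and the master name are passed to JedisSentinelSlaveConnectionFactory through RedisSentinelConfiguration, and from there
on to JedisSentinelSlavePool and JedisSentinelSlaveFactory.
2) JedisSentinelSlavePool is created and starts listeners on the "+switch-master" channel; as soon as a new master appears, the connection pool is re-initialized.
3) Creating connections for the pool is delegated to JedisSentinelSlaveFactory, which implements PooledObjectFactory<Jedis>.
In makeObject() it first connects to an available sentinel from the configured Sentinel set, then runs sentinel slaves master_name to get the list of all slaves
and picks one slave at random to connect to. If the connection fails it retries, up to 5 times; if a connection still cannot be made, it throws an exception.
4) As diagram UML-2 shows, JedisConnectionFactory implements InitializingBean, so after the bean has been initialized Spring calls the interface method void afterPropertiesSet() throws Exception;
the connection pool is created in this method.
5) JedisConnectionFactory also implements DisposableBean, so when the Spring container shuts down, public void destroy() is called to destroy the connection pool.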
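The selection step in item 3 can be tried in isolation with the sketch below. It is not the factory code itself: Connector is a hypothetical stand-in for opening a Jedis connection, the addresses are sample values, and the method works on a copy of the list so the caller's data is left alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Sketch of "pick a random slave, retry on failure, give up after N attempts".
class RandomSlavePicker {

    // Hypothetical stand-in for opening a connection; returns true on success.
    interface Connector {
        boolean tryConnect(String address);
    }

    // Returns the first address that connects, or null when the attempt budget
    // is used up or every candidate has failed.
    static String pick(List<String> slaves, int maxAttempts, Connector connector, Random random) {
        List<String> candidates = new ArrayList<>(slaves); // leave the caller's list untouched
        int attempts = 0;
        while (attempts < maxAttempts && !candidates.isEmpty()) {
            attempts++;
            int index = random.nextInt(candidates.size());
            String address = candidates.get(index);
            if (connector.tryConnect(address)) {
                return address;
            }
            candidates.remove(index); // a failed slave is not tried again
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> slaves = Arrays.asList("192.168.0.2:6479", "192.168.0.3:6479");
        // Pretend that only 192.168.0.3 accepts connections.
        Connector onlyThird = address -> address.startsWith("192.168.0.3");
        System.out.println(pick(slaves, 5, onlyThird, new Random())); // prints 192.168.0.3:6479
        System.out.println(pick(slaves, 5, address -> false, new Random())); // prints null
    }
}
```

Checking that the candidate list is not empty matters as much as the attempt counter: once every slave has failed there is nothing left to choose from, and Random.nextInt(0) would throw.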

4. In Practice
4.1 Structure of the redis-sentinel-slave-connection-factory project
1) pom file
---------------------------pom.xml-------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jack.yin</groupId>
  <artifactId>redis-sentinel-slave-connection-factory</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>spring-boot-starter-redis-readonly-connection-factory</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>


    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2) JedisSentinelSlaveFactory.java
----------------------JedisSentinelSlaveFactory.java----------------------
package redis.clients.jedis;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.JedisURIHelper;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.net.URI;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class JedisSentinelSlaveFactory implements PooledObjectFactory<Jedis> {
    private final  String masterName;
    private final int retryTimeWhenRetrieveSlave = 5;

    private final AtomicReference<HostAndPort> hostAndPortOfASentinel = new AtomicReference<HostAndPort>();
    private final int connectionTimeout;
    private final int soTimeout;
    private final String password;
    private final int database;
    private final String clientName;
    private final boolean ssl;
    private final SSLSocketFactory sslSocketFactory;
    private SSLParameters sslParameters;
    private HostnameVerifier hostnameVerifier;

    public JedisSentinelSlaveFactory(final String host, final int port, final int connectionTimeout,
                        final int soTimeout, final String password, final int database, final String clientName,
                        final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters,
                        final HostnameVerifier hostnameVerifier,String masterName) {
        this.hostAndPortOfASentinel.set(new HostAndPort(host, port));
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        this.ssl = ssl;
        this.sslSocketFactory = sslSocketFactory;
        this.sslParameters = sslParameters;
        this.hostnameVerifier = hostnameVerifier;
        this.masterName = masterName;
    }

    public JedisSentinelSlaveFactory(final URI uri, final int connectionTimeout, final int soTimeout,
                        final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory,
                        final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier,String masterName) {
        if (!JedisURIHelper.isValid(uri)) {
            throw new InvalidURIException(String.format(
                "Cannot open Redis connection due invalid URI. %s", uri.toString()));
        }

        this.hostAndPortOfASentinel.set(new HostAndPort(uri.getHost(), uri.getPort()));
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = JedisURIHelper.getPassword(uri);
        this.database = JedisURIHelper.getDBIndex(uri);
        this.clientName = clientName;
        this.ssl = ssl;
        this.sslSocketFactory = sslSocketFactory;
        this.sslParameters = sslParameters;
        this.hostnameVerifier = hostnameVerifier;
        this.masterName = masterName;
    }

    public void setHostAndPortOfASentinel(final HostAndPort hostAndPortOfASentinel) {
        this.hostAndPortOfASentinel.set(hostAndPortOfASentinel);
    }

    @Override
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.getDB() != database) {
            jedis.select(database);
        }

    }

    @Override
    public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.isConnected()) {
            try {
                try {
                    jedis.quit();
                } catch (Exception e) {
                }
                jedis.disconnect();
            } catch (Exception e) {

            }
        }

    }

    @Override
    public PooledObject<Jedis> makeObject() throws Exception {
        final Jedis jedisSentinel = getASentinel();

        List<Map<String,String>> slaves;
        try {
            slaves = jedisSentinel.sentinelSlaves(this.masterName);
        } finally {
            // the sentinel connection is only needed for this one query; close it to avoid a leak
            jedisSentinel.close();
        }
        if(slaves == null || slaves.isEmpty()) {
            throw new JedisException(String.format("No valid slave for master: %s",this.masterName));
        }

        DefaultPooledObject<Jedis> result = tryToGetSlave(slaves);

        if(null != result) {
            return result;
        } else {
            throw new JedisException(String.format("No valid slave for master: %s, after try %d times.",
                this.masterName,retryTimeWhenRetrieveSlave));
        }

    }

    private DefaultPooledObject<Jedis> tryToGetSlave(List<Map<String,String>> slaves) {
        SecureRandom sr = new SecureRandom();
        int retry = retryTimeWhenRetrieveSlave;
        // stop after retryTimeWhenRetrieveSlave attempts, or earlier once every candidate has failed
        // (nextInt(0) would throw on an empty list)
        while(retry > 0 && !slaves.isEmpty()) {
            retry--;
            int randomIndex = sr.nextInt(slaves.size());
            String host = slaves.get(randomIndex).get("ip");
            String port = slaves.get(randomIndex).get("port");
            final Jedis jedisSlave = new Jedis(host,Integer.valueOf(port), connectionTimeout,soTimeout,
                ssl, sslSocketFactory,sslParameters, hostnameVerifier);
            try {
                jedisSlave.connect();
                if (null != this.password) {
                    jedisSlave.auth(this.password);
                }
                if (database != 0) {
                    jedisSlave.select(database);
                }
                if (clientName != null) {
                    jedisSlave.clientSetname(clientName);
                }
                return  new DefaultPooledObject<Jedis>(jedisSlave);

            } catch (Exception e) {
                jedisSlave.close();
                // do not pick this slave again
                slaves.remove(randomIndex);
            }
        }

        return null;
    }

    private Jedis getASentinel() {
        final HostAndPort hostAndPort = this.hostAndPortOfASentinel.get();
        final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
            soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

        try {
            jedis.connect();
        } catch (JedisException je) {
            jedis.close();
            throw je;
        }
        return jedis;
    }

    @Override
    public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        // TODO maybe should select db 0? Not sure right now.
    }

    @Override
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        final BinaryJedis jedis = pooledJedis.getObject();
        try {
            // The pooled connection points at a slave, so comparing its address with the
            // sentinel address would always fail; check liveness only.
            return jedis.isConnected() && jedis.ping().equals("PONG");
        } catch (final Exception e) {
            return false;
        }
    }
}

3) JedisSentinelSlavePool.java
-----------------------------------JedisSentinelSlavePool.java-------------------------------
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.Pool;

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class JedisSentinelSlavePool extends Pool<Jedis> {
    private final  String masterName;

    protected GenericObjectPoolConfig poolConfig;

    protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
    protected int soTimeout = Protocol.DEFAULT_TIMEOUT;

    protected String password;

    protected int database = Protocol.DEFAULT_DATABASE;

    protected String clientName;

    protected final Set<JedisSentinelSlavePool.MasterListener> masterListeners = new HashSet<JedisSentinelSlavePool.MasterListener>();

    protected Logger logger = LoggerFactory.getLogger(JedisSentinelSlavePool.class.getName());

    private volatile JedisSentinelSlaveFactory factory;
    private volatile HostAndPort currentSentinel;

    private Set<String> sentinels;

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig) {
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
            Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
            Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, String password) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password) {
        this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int timeout) {
        this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final String password) {
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password,
                             final int database) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password,
                             final int database, final String clientName) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout,
                             final String password, final int database) {
        this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
                             final String password, final int database, final String clientName) {
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        this.masterName = masterName;
        this.sentinels = sentinels;

        HostAndPort aSentinel = initsentinels(this.sentinels, masterName);
        initPool(aSentinel);
    }

    public void destroy() {
        for (JedisSentinelSlavePool.MasterListener m : masterListeners) {
            m.shutdown();
        }

        super.destroy();
    }

    public HostAndPort getCurrentSentinel() {
        return currentSentinel;
    }

    private synchronized void initPool(HostAndPort sentinel) {
        currentSentinel = sentinel;
        if (factory == null) {
            factory = new JedisSentinelSlaveFactory(sentinel.getHost(), sentinel.getPort(), connectionTimeout,
                soTimeout, password, database, clientName, false, null, null, null,masterName);
            initPool(poolConfig, factory);
        } else {
            factory.setHostAndPortOfASentinel(currentSentinel);
            // Clear the pool even when the sentinel itself is unchanged, because the set of
            // slaves may have changed. We still have to check the returned object in
            // getResource: this call only clears idle instances, not borrowed instances.
            internalPool.clear();
        }

        logger.info("Created JedisPool to sentinel at " + sentinel);
    }

    private HostAndPort initsentinels(Set<String> sentinels, final String masterName) {

        HostAndPort aSentinel = null;
        boolean sentinelAvailable = false;

        logger.info("Trying to find a valid sentinel from available Sentinels...");

        for (String sentinelStr : sentinels) {
            final HostAndPort hap = HostAndPort.parseString(sentinelStr);

            logger.info("Connecting to Sentinel " + hap);

            Jedis jedis = null;
            try {
                jedis = new Jedis(hap.getHost(), hap.getPort());
                sentinelAvailable = true;

                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
                if (masterAddr == null || masterAddr.size() != 2) {
                    logger.warn("Can not get master addr from sentinel, master name: " + masterName
                        + ". Sentinel: " + hap + ".");
                    continue;
                }

                aSentinel = hap;
                logger.info("Found a Redis Sentinel at " + aSentinel);
                break;
            } catch (JedisException e) {
                logger.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
                    + ". Trying next one.");
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }

        if (aSentinel == null) {
            if (sentinelAvailable) {
                // can connect to sentinel, but master name seems to not monitored
                throw new JedisException("Can connect to sentinel, but " + masterName
                    + " seems to be not monitored...");
            } else {
                throw new JedisConnectionException("All sentinels down, cannot determine where is "
                    + masterName + " master is running...");
            }
        }

        logger.info("Found Redis sentinel running at " + aSentinel + ", starting Sentinel listeners...");

        // Start the listeners only once. This method is called again from the listener
        // callback and must not spawn a new set of threads on every Sentinel event.
        if (masterListeners.isEmpty()) {
            for (String sentinel : sentinels) {
                final HostAndPort hap = HostAndPort.parseString(sentinel);
                JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort());
                // whether MasterListener threads are alive or not, process can be stopped
                masterListener.setDaemon(true);
                masterListeners.add(masterListener);
                masterListener.start();
            }
        }

        return aSentinel;
    }


    /**
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
     *             done using @see {@link redis.clients.jedis.Jedis#close()}
     */
    @Override
    @Deprecated
    public void returnBrokenResource(final Jedis resource) {
        if (resource != null) {
            returnBrokenResourceObject(resource);
        }
    }

    /**
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
     *             done using @see {@link redis.clients.jedis.Jedis#close()}
     */
    @Override
    @Deprecated
    public void returnResource(final Jedis resource) {
        if (resource != null) {
            resource.resetState();
            returnResourceObject(resource);
        }
    }

    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1));

        return new HostAndPort(host, port);
    }

    protected class MasterListener extends Thread {

        protected String masterName;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000;
        protected volatile Jedis j;
        protected AtomicBoolean running = new AtomicBoolean(false);

        protected MasterListener() {
        }

        public MasterListener(String masterName, String host, int port) {
            super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
            this.masterName = masterName;
            this.host = host;
            this.port = port;
        }

        public MasterListener(String masterName, String host, int port,
                              long subscribeRetryWaitTimeMillis) {
            this(masterName, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        @Override
        public void run() {

            running.set(true);

            while (running.get()) {

                j = new Jedis(host, port);

                try {
                    // double check that it is not being shutdown
                    if (!running.get()) {
                        break;
                    }

                    j.subscribe(new SentinelSlaveChangePubSub(), "+switch-master","+slave","+sdown","+odown","+reboot");

                } catch (JedisConnectionException e) {

                    if (running.get()) {
                        logger.error("Lost connection to Sentinel at " + host + ":" + port
                            + ". Sleeping 5000ms and retrying.", e);
                        try {
                            Thread.sleep(subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException e1) {
                            logger.info( "Sleep interrupted: ", e1);
                        }
                    } else {
                        logger.info("Unsubscribing from Sentinel at " + host + ":" + port);
                    }
                } finally {
                    j.close();
                }
            }
        }

        public void shutdown() {
            try {
                logger.info("Shutting down listener on " + host + ":" + port);
                running.set(false);
                // This isn't good, the Jedis object is not thread safe
                if (j != null) {
                    j.disconnect();
                }
            } catch (Exception e) {
                logger.error("Caught exception while shutting down: ", e);
            }
        }

        private class SentinelSlaveChangePubSub extends JedisPubSub {
            @Override
            public void onMessage(String channel, String message) {
                if(masterName==null) {
                    logger.error("Master Name is null!");
                    throw new InvalidParameterException("Master Name is null!");
                }
                logger.info("Get message on channel: "  + channel + " published: " + message + "." +   " current sentinel " + host + ":" + port );

                String[] msg = message.split(" ");
                List<String> msgList = Arrays.asList(msg);
                if(msgList.isEmpty()) {return;}
                boolean needResetPool = false;
                if( masterName.equalsIgnoreCase(msgList.get(0))) { //message from channel +switch-master
                    //message looks like [+switch-master mymaster 192.168.0.2 6479 192.168.0.1 6479]
                    needResetPool = true;
                }
                int tmpIndex = msgList.indexOf("@") + 1;
                //message looks like  [+reboot slave 192.168.0.3:6479 192.168.0.3 6479 @ mymaster 192.168.0.1 6479]
                if(tmpIndex >0 && masterName.equalsIgnoreCase(msgList.get(tmpIndex)) ) { //message from other channels
                    needResetPool = true;
                }
                if(needResetPool) {
                    HostAndPort aSentinel = initsentinels(sentinels, masterName);
                    initPool(aSentinel);
                } else {
                    logger.info("message is not for master " + masterName);
                }

            }
        }
    }
}
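
The message handling in SentinelSlaveChangePubSub depends on two payload shapes: +switch-master carries the master name first, while instance events such as +slave, +sdown or +reboot carry it after an @ separator. The standalone sketch below shows that distinction in isolation. SentinelEventParser and masterNameOf are hypothetical names, the sample payloads are the ones quoted in the comments of the listing above, and this is an illustration rather than a replacement for the listener.

```java
// Sketch: decide which master a Sentinel pub/sub payload refers to.
class SentinelEventParser {

    // Returns the master name the payload is about, or null if it cannot be determined.
    // channel is the pub/sub channel name, payload the message body without the channel.
    static String masterNameOf(String channel, String payload) {
        String[] parts = payload.trim().split("\\s+");
        if ("+switch-master".equals(channel)) {
            // <master-name> <old-ip> <old-port> <new-ip> <new-port>
            return parts.length == 5 ? parts[0] : null;
        }
        // <instance-type> <name> <ip> <port> @ <master-name> <master-ip> <master-port>
        for (int i = 0; i < parts.length - 1; i++) {
            if ("@".equals(parts[i])) {
                return parts[i + 1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(masterNameOf("+switch-master",
            "mymaster 192.168.0.2 6479 192.168.0.1 6479")); // prints mymaster
        System.out.println(masterNameOf("+reboot",
            "slave 192.168.0.3:6479 192.168.0.3 6479 @ mymaster 192.168.0.1 6479")); // prints mymaster
        System.out.println(masterNameOf("+sdown", "master mymaster 192.168.0.1 6479")); // prints null
    }
}
```

Looking at the channel name first avoids guessing the payload shape from its first token.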
4) JedisSentinelSlaveConnectionFactory.java
-------------------------------------JedisSentinelSlaveConnectionFactory.java------------------
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import redis.clients.util.Pool;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.time.Duration;
import java.util.*;

public class JedisSentinelSlaveConnectionFactory extends JedisConnectionFactory {
    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig) {
        super(sentinelConfig);
    }

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisClientConfiguration clientConfig){
        super(sentinelConfig,clientConfig);
    }

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisPoolConfig poolConfig) {
        super(sentinelConfig,poolConfig);
    }


    @Override
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    private int getConnectTimeout() {
        return Math.toIntExact(getClientConfiguration().getConnectTimeout().toMillis());
    }

    private Set<String> convertToJedisSentinelSet(Collection<RedisNode> nodes) {

        if (CollectionUtils.isEmpty(nodes)) {
            return Collections.emptySet();
        }

        Set<String> convertedNodes = new LinkedHashSet<>(nodes.size());
        for (RedisNode node : nodes) {
            if (node != null) {
                convertedNodes.add(node.asString());
            }
        }
        return convertedNodes;
    }

    private int getReadTimeout() {
        return Math.toIntExact(getClientConfiguration().getReadTimeout().toMillis());
    }

    static class MutableJedisClientConfiguration implements JedisClientConfiguration {

        private boolean useSsl;
        private @Nullable
        SSLSocketFactory sslSocketFactory;
        private @Nullable
        SSLParameters sslParameters;
        private @Nullable
        HostnameVerifier hostnameVerifier;
        private boolean usePooling = true;
        private GenericObjectPoolConfig poolConfig = new JedisPoolConfig();
        private @Nullable
        String clientName;
        private Duration readTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);
        private Duration connectTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);

        public static JedisClientConfiguration create(JedisShardInfo shardInfo) {

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
            configuration.setShardInfo(shardInfo);
            return configuration;
        }

        public static JedisClientConfiguration create(GenericObjectPoolConfig jedisPoolConfig) {

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
            configuration.setPoolConfig(jedisPoolConfig);
            return configuration;
        }

        /* (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUseSsl()
         */
        @Override
        public boolean isUseSsl() {
            return useSsl;
        }

        public void setUseSsl(boolean useSsl) {
            this.useSsl = useSsl;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslSocketFactory()
         */
        @Override
        public Optional<SSLSocketFactory> getSslSocketFactory() {
            return Optional.ofNullable(sslSocketFactory);
        }

        public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
            this.sslSocketFactory = sslSocketFactory;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters()
         */
        @Override
        public Optional<SSLParameters> getSslParameters() {
            return Optional.ofNullable(sslParameters);
        }

        public void setSslParameters(SSLParameters sslParameters) {
            this.sslParameters = sslParameters;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getHostnameVerifier()
         */
        @Override
        public Optional<HostnameVerifier> getHostnameVerifier() {
            return Optional.ofNullable(hostnameVerifier);
        }

        public void setHostnameVerifier(HostnameVerifier hostnameVerifier) {
            this.hostnameVerifier = hostnameVerifier;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUsePooling()
         */
        @Override
        public boolean isUsePooling() {
            return usePooling;
        }

        public void setUsePooling(boolean usePooling) {
            this.usePooling = usePooling;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getPoolConfig()
         */
        @Override
        public Optional<GenericObjectPoolConfig> getPoolConfig() {
            return Optional.ofNullable(poolConfig);
        }

        public void setPoolConfig(GenericObjectPoolConfig poolConfig) {
            this.poolConfig = poolConfig;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getClientName()
         */
        @Override
        public Optional<String> getClientName() {
            return Optional.ofNullable(clientName);
        }

        public void setClientName(String clientName) {
            this.clientName = clientName;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getReadTimeout()
         */
        @Override
        public Duration getReadTimeout() {
            return readTimeout;
        }

        public void setReadTimeout(Duration readTimeout) {
            this.readTimeout = readTimeout;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getConnectTimeout()
         */
        @Override
        public Duration getConnectTimeout() {
            return connectTimeout;
        }

        public void setConnectTimeout(Duration connectTimeout) {
            this.connectTimeout = connectTimeout;
        }

        public void setShardInfo(JedisShardInfo shardInfo) {

            setSslSocketFactory(shardInfo.getSslSocketFactory());
            setSslParameters(shardInfo.getSslParameters());
            setHostnameVerifier(shardInfo.getHostnameVerifier());
            setUseSsl(shardInfo.getSsl());
            setConnectTimeout(Duration.ofMillis(shardInfo.getConnectionTimeout()));
            setReadTimeout(Duration.ofMillis(shardInfo.getSoTimeout()));
        }
    }
}


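4.2 Testing

In the application, you only need to configure JedisSentinelSlaveConnectionFactory as shown below. Spring Boot then auto-configures
a RedisTemplate<String,String> redisTemplate and a StringRedisTemplate stringRedisTemplate,
which can be injected in your code with @Autowired.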

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
            .clientName("MyRedisClient")
            .build();
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory;
    }

1) pom.xml
------------------pom.xml-------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>demo</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<dependency>
<groupId>com.jack.yin</groupId>
<artifactId>redis-sentinel-slave-connection-factory</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>


</project>
2) RedisConfiguration.java
----------------------------------RedisConfiguration.java-------------------------------
package com.jack.yin.redis.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.JedisSentinelSlaveConnectionFactory;

@Configuration
public class RedisConfiguration {
    @Value("${spring.redis.password}")
    private String redisPasswd;

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
            .clientName("MyRedisClient")
            .build();
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory;
    }

}

3) RedisDemoApplication.java
-----------------------------RedisDemoApplication.java-------------------------------
package com.jack.yin.redis.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages = "com.jack.yin.redis")
public class RedisDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisDemoApplication.class, args);
    }
}

4) DemoApplicationTests.java
-------------------------DemoApplicationTests.java-------------------------------
package com.jack.yin.redis.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.logging.Logger;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisDemoApplication.class)
public class DemoApplicationTests {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    protected Logger log = Logger.getLogger(getClass().getName());

    @Test
    public void testGetAndSet() throws Exception {
        // Reads are served by a connection to one of the slaves.
        System.out.println(redisTemplate.opsForValue().get("hello"));
        try {
            // Every pooled connection points at a read-only slave, so this write is rejected.
            redisTemplate.opsForValue().set("set-key", "don't allowed to set");
        } catch (InvalidDataAccessApiUsageException e) {
            // Expected: READONLY You can't write against a read only slave.
            log.info(e.getMessage());
        }
        // Catching the exception lets the remaining reads run.
        System.out.println(redisTemplate.opsForValue().get("sss"));
        System.out.println(redisTemplate.opsForValue().get("bbb"));
    }

}


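5. Summary
Advantages:
Connections in the pool are established to randomly chosen slaves, so they are spread across all slaves.
When a master failover is detected, the pool is re-initialized automatically, which ensures that no connection points at the master.
Newly added slaves are discovered automatically.
A slave going offline is detected automatically and the pool is re-initialized, which ensures that no connection points at an offline slave.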
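
The random choice of a slave described above can be sketched roughly as follows. This is a minimal illustration and not the code of JedisSentinelSlavePool: the class SlavePicker and its methods are hypothetical, and the input is assumed to have the shape of a SENTINEL slaves reply (one map per slave with "ip", "port" and "flags" entries), with slaves flagged s_down, o_down or disconnected skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical helper: picks one reachable slave at random from a Sentinel-style slave list.
final class SlavePicker {

    private final Random random;

    SlavePicker(Random random) {
        this.random = random;
    }

    /** Returns "ip:port" for every slave whose flags do not mark it as down or disconnected. */
    static List<String> availableSlaves(List<Map<String, String>> slaves) {
        List<String> result = new ArrayList<>();
        for (Map<String, String> slave : slaves) {
            String flags = slave.getOrDefault("flags", "");
            if (flags.contains("s_down") || flags.contains("o_down") || flags.contains("disconnected")) {
                continue; // assumed meaning: Sentinel considers this slave unreachable
            }
            result.add(slave.get("ip") + ":" + slave.get("port"));
        }
        return result;
    }

    /** Picks one available slave at random; fails if none is available. */
    String pick(List<Map<String, String>> slaves) {
        List<String> candidates = availableSlaves(slaves);
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no available slave");
        }
        return candidates.get(random.nextInt(candidates.size()));
    }
}
```

Calling pick each time the pool creates a new connection is one way to spread the connections across all reachable slaves.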
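Drawbacks:
The Redis slaves must be configured with slave-read-only yes.
Replicating data from the master to a slave takes time, so for a short period the master and the slave can hold inconsistent data.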
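
The replication-lag drawback can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: ReadWriteSplitDemo and Node are hypothetical stand-ins (plain maps, no Redis involved), and replicate() simulates the asynchronous copy from master to slave. In a real application the write path would use a connection factory that points at the master and the read path would use the slave connection factory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of read/write splitting: writes go to the master, reads go to a slave.
final class ReadWriteSplitDemo {

    /** Stand-in for a Redis node; holds its data in a plain map. */
    static final class Node {
        final Map<String, String> data = new HashMap<>();
    }

    private final Node master;
    private final Node slave;

    ReadWriteSplitDemo(Node master, Node slave) {
        this.master = master;
        this.slave = slave;
    }

    /** Writes always go to the master. */
    void set(String key, String value) {
        master.data.put(key, value);
    }

    /** Reads always go to the slave and may be stale until replication catches up. */
    String get(String key) {
        return slave.data.get(key);
    }

    /** Simulates replication by copying the master's entries to the slave. */
    void replicate() {
        slave.data.putAll(master.data);
    }
}
```

A get issued right after a set returns the old value (here null) until replicate() has run, which is the short inconsistency window mentioned above.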