`

redis 带有分片的哨兵连接池(ShardedJedisSentinelPool)

 
阅读更多

ShardedJedisSentinelPool.java

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;

/**
 * 带有sharded的sentinelpool
 * 
 * @author guweiqiang
 */
public class ShardedJedisSentinelPool extends Pool<ShardedJedis> {

	public static final int MAX_RETRY_SENTINEL = 10;

	protected final Logger log = Logger.getLogger(getClass().getName());

	protected GenericObjectPoolConfig poolConfig;

	protected int timeout = Protocol.DEFAULT_TIMEOUT;

	private int sentinelRetry = 0;

	protected String password;

	protected int database = Protocol.DEFAULT_DATABASE;

	protected Set<MasterListener> masterListeners = new HashSet<MasterListener>();

	private volatile List<HostAndPort> currentHostMasters;

	public ShardedJedisSentinelPool(List<String> masters, Set<String> sentinels) {
		this(masters, sentinels, new GenericObjectPoolConfig(),
				Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE);
	}

	public ShardedJedisSentinelPool(List<String> masters,
			Set<String> sentinels, String password) {
		this(masters, sentinels, new GenericObjectPoolConfig(),
				Protocol.DEFAULT_TIMEOUT, password);
	}

	public ShardedJedisSentinelPool(final GenericObjectPoolConfig poolConfig,
			List<String> masters, Set<String> sentinels) {
		this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
				Protocol.DEFAULT_DATABASE);
	}

	public ShardedJedisSentinelPool(List<String> masters,
			Set<String> sentinels, final GenericObjectPoolConfig poolConfig,
			int timeout, final String password) {
		this(masters, sentinels, poolConfig, timeout, password,
				Protocol.DEFAULT_DATABASE);
	}

	public ShardedJedisSentinelPool(List<String> masters,
			Set<String> sentinels, final GenericObjectPoolConfig poolConfig,
			final int timeout) {
		this(masters, sentinels, poolConfig, timeout, null,
				Protocol.DEFAULT_DATABASE);
	}

	public ShardedJedisSentinelPool(List<String> masters,
			Set<String> sentinels, final GenericObjectPoolConfig poolConfig,
			final String password) {
		this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
	}

	public ShardedJedisSentinelPool(List<String> masters,
			Set<String> sentinels, final GenericObjectPoolConfig poolConfig,
			int timeout, final String password, final int database) {
		this.poolConfig = poolConfig;
		this.timeout = timeout;
		this.password = password;
		this.database = database;

		List<HostAndPort> masterList = initSentinels(sentinels, masters);
		initPool(masterList);
	}

	public void destroy() {
		for (MasterListener m : masterListeners) {
			m.shutdown();
		}

		super.destroy();
	}

	public List<HostAndPort> getCurrentHostMaster() {
		return currentHostMasters;
	}

	private void initPool(List<HostAndPort> masters) {
		if (!equals(currentHostMasters, masters)) {
			StringBuffer sb = new StringBuffer();
			for (HostAndPort master : masters) {
				sb.append(master.toString());
				sb.append(" ");
			}
			log.info("Created ShardedJedisPool to master at [" + sb.toString()
					+ "]");
			List<JedisShardInfo> shardMasters = makeShardInfoList(masters);
			initPool(poolConfig, new ShardedJedisFactory(shardMasters,
					Hashing.MURMUR_HASH, null));
			currentHostMasters = masters;
		}
	}

	private boolean equals(List<HostAndPort> currentShardMasters,
			List<HostAndPort> shardMasters) {
		if (currentShardMasters != null && shardMasters != null) {
			if (currentShardMasters.size() == shardMasters.size()) {
				for (int i = 0; i < currentShardMasters.size(); i++) {
					if (!currentShardMasters.get(i).equals(shardMasters.get(i)))
						return false;
				}
				return true;
			}
		}
		return false;
	}

	private List<JedisShardInfo> makeShardInfoList(List<HostAndPort> masters) {
		List<JedisShardInfo> shardMasters = new ArrayList<JedisShardInfo>();
		for (HostAndPort master : masters) {
			JedisShardInfo jedisShardInfo = new JedisShardInfo(
					master.getHost(), master.getPort(), timeout);
			jedisShardInfo.setPassword(password);

			shardMasters.add(jedisShardInfo);
		}
		return shardMasters;
	}

	private List<HostAndPort> initSentinels(Set<String> sentinels,
			final List<String> masters) {

		Map<String, HostAndPort> masterMap = new HashMap<String, HostAndPort>();
		List<HostAndPort> shardMasters = new ArrayList<HostAndPort>();

		log.info("Trying to find all master from available Sentinels...");

		for (String masterName : masters) {
			HostAndPort master = null;
			boolean fetched = false;

			while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {
				for (String sentinel : sentinels) {
					final HostAndPort hap = toHostAndPort(Arrays
							.asList(sentinel.split(":")));

					log.fine("Connecting to Sentinel " + hap);

					Jedis jedis = null;
					try {
						jedis = new Jedis(hap.getHost(), hap.getPort());
						master = masterMap.get(masterName);
						if (master == null) {
							List<String> hostAndPort = jedis
									.sentinelGetMasterAddrByName(masterName);
							if (hostAndPort != null && hostAndPort.size() > 0) {
								master = toHostAndPort(hostAndPort);
								log.fine("Found Redis master at " + master);
								shardMasters.add(master);
								masterMap.put(masterName, master);
								fetched = true;
								jedis.disconnect();
								break;
							}
						}
					} catch (JedisConnectionException e) {
						log.warning("Cannot connect to sentinel running @ "
								+ hap + ". Trying next one.");
					} finally {
						if(jedis != null){
							jedis.close();
						}
					}
				}

				if (null == master) {
					try {
						log.severe("All sentinels down, cannot determine where is "
								+ masterName
								+ " master is running... sleeping 1000ms, Will try again.");
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					fetched = false;
					sentinelRetry++;
				}
			}

			// Try MAX_RETRY_SENTINEL times.
			if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {
				log.severe("All sentinels down and try " + MAX_RETRY_SENTINEL
						+ " times, Abort.");
				throw new JedisConnectionException(
						"Cannot connect all sentinels, Abort.");
			}
		}

		// All shards master must been accessed.
		if (masters.size() != 0 && masters.size() == shardMasters.size()) {

			log.info("Starting Sentinel listeners...");
			for (String sentinel : sentinels) {
				final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
						.split(":")));
				MasterListener masterListener = new MasterListener(masters,
						hap.getHost(), hap.getPort());
				masterListeners.add(masterListener);
				masterListener.start();
			}
		}

		return shardMasters;
	}

	private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
		String host = getMasterAddrByNameResult.get(0);
		int port = Integer.parseInt(getMasterAddrByNameResult.get(1));

		return new HostAndPort(host, port);
	}

	/**
	 * PoolableObjectFactory custom impl.
	 */
	protected static class ShardedJedisFactory implements
			PooledObjectFactory<ShardedJedis> {
		private List<JedisShardInfo> shards;
		private Hashing algo;
		private Pattern keyTagPattern;

		public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo,
				Pattern keyTagPattern) {
			this.shards = shards;
			this.algo = algo;
			this.keyTagPattern = keyTagPattern;
		}

		public PooledObject<ShardedJedis> makeObject() throws Exception {
			ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
			return new DefaultPooledObject<ShardedJedis>(jedis);
		}

		public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis)
				throws Exception {
			final ShardedJedis shardedJedis = pooledShardedJedis.getObject();
			for (Jedis jedis : shardedJedis.getAllShards()) {
				try {
					try {
						jedis.quit();
					} catch (Exception e) {

					}
					jedis.disconnect();
				} catch (Exception e) {

				}
			}
		}

		public boolean validateObject(
				PooledObject<ShardedJedis> pooledShardedJedis) {
			try {
				ShardedJedis jedis = pooledShardedJedis.getObject();
				for (Jedis shard : jedis.getAllShards()) {
					if (!shard.ping().equals("PONG")) {
						return false;
					}
				}
				return true;
			} catch (Exception ex) {
				return false;
			}
		}

		public void activateObject(PooledObject<ShardedJedis> p)
				throws Exception {

		}

		public void passivateObject(PooledObject<ShardedJedis> p)
				throws Exception {

		}
	}

	protected class JedisPubSubAdapter extends JedisPubSub {
		@Override
		public void onMessage(String channel, String message) {
		}

		@Override
		public void onPMessage(String pattern, String channel, String message) {
		}

		@Override
		public void onPSubscribe(String pattern, int subscribedChannels) {
		}

		@Override
		public void onPUnsubscribe(String pattern, int subscribedChannels) {
		}

		@Override
		public void onSubscribe(String channel, int subscribedChannels) {
		}

		@Override
		public void onUnsubscribe(String channel, int subscribedChannels) {
		}
	}

	protected class MasterListener extends Thread {

		protected List<String> masters;
		protected String host;
		protected int port;
		protected long subscribeRetryWaitTimeMillis = 5000;
		protected Jedis jedis;
		protected AtomicBoolean running = new AtomicBoolean(false);

		protected MasterListener() {
		}

		public MasterListener(List<String> masters, String host, int port) {
			this.masters = masters;
			this.host = host;
			this.port = port;
		}

		public MasterListener(List<String> masters, String host, int port,
				long subscribeRetryWaitTimeMillis) {
			this(masters, host, port);
			this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
		}

		public void run() {

			running.set(true);

			while (running.get()) {

				jedis = new Jedis(host, port);

				try {
					jedis.subscribe(new JedisPubSubAdapter() {
						@Override
						public void onMessage(String channel, String message) {
							log.fine("Sentinel " + host + ":" + port
									+ " published: " + message + ".");

							String[] switchMasterMsg = message.split(" ");

							if (switchMasterMsg.length > 3) {

								int index = masters.indexOf(switchMasterMsg[0]);
								if (index >= 0) {
									HostAndPort newHostMaster = toHostAndPort(Arrays
											.asList(switchMasterMsg[3],
													switchMasterMsg[4]));
									List<HostAndPort> newHostMasters = new ArrayList<HostAndPort>();
									for (int i = 0; i < masters.size(); i++) {
										newHostMasters.add(null);
									}
									Collections.copy(newHostMasters,
											currentHostMasters);
									newHostMasters.set(index, newHostMaster);

									initPool(newHostMasters);
								} else {
									StringBuffer sb = new StringBuffer();
									for (String masterName : masters) {
										sb.append(masterName);
										sb.append(",");
									}
									log.fine("Ignoring message on +switch-master for master name "
											+ switchMasterMsg[0]
											+ ", our monitor master name are ["
											+ sb + "]");
								}

							} else {
								log.severe("Invalid message received on Sentinel "
										+ host
										+ ":"
										+ port
										+ " on channel +switch-master: "
										+ message);
							}
						}
					}, "+switch-master");

				} catch (JedisConnectionException e) {

					if (running.get()) {
						log.severe("Lost connection to Sentinel at " + host
								+ ":" + port
								+ ". Sleeping 5000ms and retrying.");
						try {
							Thread.sleep(subscribeRetryWaitTimeMillis);
						} catch (InterruptedException e1) {
							e1.printStackTrace();
						}
					} else {
						log.fine("Unsubscribing from Sentinel at " + host + ":"
								+ port);
					}
				}
			}
		}

		public void shutdown() {
			try {
				log.fine("Shutting down listener on " + host + ":" + port);
				running.set(false);
				// This isn't good, the Jedis object is not thread safe
				jedis.disconnect();
			} catch (Exception e) {
				log.severe("Caught exception while shutting down: "
						+ e.getMessage());
			}
		}
	}
}

 

TestShardedSentinelPool.java

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.ShardedJedis;

/**
 * 带有sharded的sentinelpool
 * 
 * @author guweiqiang
 */
public class TestShardedSentinelPool {

	public static void main(String[] args) {
		GenericObjectPoolConfig config = new GenericObjectPoolConfig();
		config.setMaxTotal(10000);
		config.setMaxIdle(100);  
		config.setMinIdle(0);  
		config.setMaxWaitMillis(15000);
		
		List<String> masters = new ArrayList<String>();
		masters.add("mymaster47");
		masters.add("mymaster50");
		
		Set<String> sentinels = new HashSet<String>();
		sentinels.add("172.19.59.47:26379");
		sentinels.add("172.19.59.48:26379");
		sentinels.add("172.19.59.50:26379");
		sentinels.add("172.19.59.50:36379");
		
		ShardedJedisSentinelPool shardedJedisSentinelPool = new ShardedJedisSentinelPool(masters, sentinels, config, 60000);
		ShardedJedis shardedJedis = null;
		
		try{  
			shardedJedis = shardedJedisSentinelPool.getResource(); // 从pool里获取一个jedis
			shardedJedis.set("k8", "88888888888");
			System.out.println(shardedJedis.get("k8"));  
			shardedJedis = shardedJedisSentinelPool.getResource(); // 从pool里获取一个jedis
			shardedJedis.set("k7", "77777777777");
		    System.out.println(shardedJedis.get("k7"));  
		}catch(Exception e){  
		    e.printStackTrace();  
		}finally{  
			shardedJedisSentinelPool.returnResource(shardedJedis);  // 归还一个jedis到pool中
			if(shardedJedisSentinelPool != null) {
				shardedJedisSentinelPool.destroy();
			}
		}  	
	}

}

 

 

运行结果:

2017-8-24 10:25:24 com.suning.test.redispool.ShardedJedisSentinelPool initSentinels
信息: Trying to find all master from available Sentinels...
2017-8-24 10:25:24 com.suning.test.redispool.ShardedJedisSentinelPool initSentinels
信息: Starting Sentinel listeners...
2017-8-24 10:25:24 com.suning.test.redispool.ShardedJedisSentinelPool initPool
信息: Created ShardedJedisPool to master at [172.19.59.47:6379 172.19.59.50:6379 ]
88888888888
77777777777

 

 

分享到:
评论

相关推荐

    redis源码以及哨片分片连接池实现

    总之,了解Redis源码、哨兵系统和分片连接池的实现对于开发高效、可靠的分布式应用至关重要。通过研究这些内容,我们可以更好地优化系统性能,提升服务的稳定性和可扩展性。在实际项目中,结合使用哨兵和分片策略,...

    Spring mvc整合redis实例(redis连接池)

    有了连接工厂后,我们可以创建一个`RedisTemplate`,它是Spring Data Redis提供的模板类,简化了操作Redis的API。 ```xml &lt;bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"&gt; ...

    Redis哨兵模式配置文件

    3. **sentinel-1.conf, sentinel-2.conf**:由于你提到的是"一主二从三哨兵"的配置,这意味着有三个哨兵节点,每个哨兵节点都会有自己的sentinel.conf配置文件。这些文件的内容与主sentinel.conf类似,但可能包含...

    redis连接池

    总之,Redis连接池在Ecmall框架中的应用有助于提升系统性能,优化资源管理,确保系统稳定性,尤其是在高并发场景下,它的价值更为突出。通过合理配置连接池参数,可以更好地适应不同业务需求,提供更加顺畅的用户...

    一个简单的支持多个db的redis连接池

    一个简单的支持多个db的redis连接池一个简单的支持多个db的redis连接池一个简单的支持多个db的redis连接池一个简单的支持多个db的redis连接池一个简单的支持多个db的redis连接池一个简单的支持多个db的redis连接池一...

    Redis哨兵主从模式+keepalived.docx

    "Redis哨兵主从模式+keepalived" Redis哨兵主从模式是指 Redis 的一种高可用性解决方案,通过哨兵模式实现自动故障切换和虚拟IP漂移,以提供高可用性的Redis服务。下面是相关知识点的详细解释: 一、Redis主从复制...

    redis 哨兵模式集群文档

    ### Redis哨兵模式集群部署详解 #### 一、Redis哨兵模式概述 Redis哨兵(Sentinel)是一种用于实现高可用性的解决方案。它通过一组哨兵进程来监控主服务器和从服务器是否工作正常,在主服务器出现问题时自动进行故障...

    Redis分片池 代码

    综上所述,`Redis分片池`是为了应对大数据量存储和高并发访问的需求,通过合理的数据分布和连接管理,实现Redis集群的高效运作。提供的代码示例`custom_system`可能是实现这些功能的一个自定义系统,帮助开发者更好...

    spring整合redis(spring模板+连接池+哨兵+json序列化+集群).rar

    本教程将深入探讨如何在Spring环境中整合Redis,包括使用Spring Redis模板、连接池、哨兵系统以及JSON序列化,并进一步讨论在集群环境中的应用。 首先,Spring Redis模板是Spring Data Redis项目的一部分,它为操作...

    redis工具类以及redis 连接池配置

    创建jedis池配置实例,redis各种crud方法,包括使用主从同步,读写分离。工具类中包括存放hash表键值对,键值对以map的方式储存,删除键值对,永久存放键值对,设置过期时间,需要的直接去gitHab上看...

    Python Redis连接 存取数据 示例代码

    资源为使用Python语言实现redis数据库的连接,包含连接池的连接、进行数据的存取,相关示例代码资源为使用Python语言实现redis数据库的连接,包含连接池的连接、进行数据的存取,相关示例代码资源为使用Python语言...

    spring集成redis单节点、集群、哨兵配置

    在IT行业中,Redis是一个广泛应用的高性能键值存储系统,它以内存存储...Spring Data Redis提供了ClusterConnectionFactory来处理集群连接。配置需要指定所有节点的IP和端口,以及集群配置的相关属性。例如: ```xml ...

    redis(哨兵模式配置)

    哨兵(Sentinel)系统是Redis提供的一种高可用性解决方案,能够监控、故障检测以及自动故障迁移,确保服务的稳定运行。 哨兵模式配置主要包括以下几个关键部分: 1. **哨兵(Sentinel)启动配置**:每个Sentinel节点...

    redis连接池jar jedis+common

    3. **配置JedisPool**:创建JedisPool时需要指定Redis服务器的IP和端口、密码(如有)、数据库编号等信息,以及连接池相关参数。 4. **使用JedisPool**:在需要使用Jedis时,从池中borrow一个实例,执行操作后返回给...

    jfinal redis cluster plugin-JFinal redis cluster集群插件 带连接池和Jedis包

    在使用这个插件时,开发者需要按照readme.txt的指导进行配置,包括设置Redis集群的节点信息、连接池参数等。然后,通过JFinal提供的拦截器或者Service等方式,就可以在业务代码中方便地调用Redis的操作,如设置、...

    redis主从配置+sentinel哨兵模式

    ### Redis 主从配置与 Sentinel 哨兵模式详解 #### 一、Redis 主从配置 在分布式系统中,为了提高数据处理能力和系统的可用性,通常会采用 Redis 的主从复制模型。通过设置一个主节点(Master)和一个或多个从节点...

    Linux下安装配置redis详细教程,并配置哨兵模式,redis中文详解

    Linux下安装配置redis详细教程,并配置哨兵模式,redis中文详解. 注意:对只使用redis服务,只需要在主Redis里面配置requirepass,在从Redis里面配置masterauth,密码保持一致,密码尽可能复杂,以免被攻击破解。 ...

    Redis单机、主从、哨兵Jave-Jedis连接代码

    本篇文章将探讨如何使用Java的Jedis库连接Redis的不同部署模式:单机、主从复制和哨兵(Sentinel)系统。 首先,我们从单机模式开始。在单机模式下,Redis服务器独立运行,没有备份或故障转移机制。使用Jedis连接...

    redis集群搭建(一主一从一哨兵)

    在本教程中,我们将关注如何构建一个基础的 Redis 集群,其中包括一个主节点、一个从节点以及一个哨兵(Sentinel)系统,以实现高可用性。 1. **Redis 集群概念** Redis 集群是一种分布式解决方案,将数据分散存储...

    redis jedis 单机、集群、分片存储连接操作redis

    单机版:该类主要完成redis...分片存储:redis分片存储,该方式通过计算hash把数据均匀的存储在相应的redis库中,该方式只能操作通过分片存储的数据,其他单独客户端存储的数据,因hash定位 问题,无法找到键值数据。

Global site tag (gtag.js) - Google Analytics