BinaryJedisCluster public String set(final byte[] key, final byte[] value) { return new JedisClusterCommand<String>(connectionHandler, maxRedirections) { @Override public String execute(Jedis connection) { return connection.set(key, value); } }.runBinary(key); }
JedisClusterCommand public T runBinary(byte[] key) { if (key == null) { throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); } return runWithRetries(key, this.redirections, false, false); } private T runWithRetries(byte[] key, int redirections, boolean tryRandomNode, boolean asking) { if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { //获取 连接 connection = connectionHandler.getConnectionFromSlot( //获取槽 JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } // release current connection before recursion releaseConnection(connection); connection = null; // retry with random connection return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { // if MOVED redirection occurred, if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(connection); } // release current connection before recursion or renewing releaseConnection(connection); connection = null; if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { } else { throw new JedisClusterException(jre); } return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection); } } @Override public Jedis getConnectionFromSlot(int slot) { JedisPool connectionPool = cache.getSlotPool(slot); if (connectionPool != null) { // It can't guaranteed to get valid connection because of node // assignment return connectionPool.getResource(); } else { return getConnection(); } }
public abstract class JedisClusterConnectionHandler { protected final JedisClusterInfoCache cache; public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout); //通过slot 初始化集群信息 initializeSlotsCache(nodes, poolConfig); } abstract Jedis getConnection(); abstract Jedis getConnectionFromSlot(int slot); public Jedis getConnectionFromNode(HostAndPort node) { cache.setNodeIfNotExist(node); return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource(); } public class JedisClusterInfoCache { private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); private final GenericObjectPoolConfig poolConfig; private int connectionTimeout; private int soTimeout; private static final int MASTER_NODE_INDEX = 2; public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, int timeout) { this(poolConfig, timeout, timeout); } public JedisClusterInfoCache(final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout) { this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; } public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { this.nodes.clear(); this.slots.clear(); List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } HostAndPort targetNode = generateHostAndPort(hostInfos); setNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } } finally { w.unlock(); } } //初始化集群信息 public void discoverClusterSlots(Jedis jedis) { w.lock(); try { this.slots.clear(); //通过 slots 命令获取集群信息 List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= 2) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos List<Object> hostInfos = (List<Object>) slotInfo.get(2); if (hostInfos.size() <= 0) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); setNodeIfNotExist(targetNode); assignSlotsToNode(slotNums, targetNode); } } finally { w.unlock(); } } private HostAndPort generateHostAndPort(List<Object> hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); } public void setNodeIfNotExist(HostAndPort node) { w.lock(); try { String nodeKey = getNodeKey(node); if (nodes.containsKey(nodeKey)) return; JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, null, 0, null); nodes.put(nodeKey, nodePool); } finally { w.unlock(); } } public void assignSlotToNode(int slot, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = nodes.get(getNodeKey(targetNode)); if (targetPool == null) { setNodeIfNotExist(targetNode); targetPool = nodes.get(getNodeKey(targetNode)); } slots.put(slot, targetPool); } finally { w.unlock(); } } public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = nodes.get(getNodeKey(targetNode)); if (targetPool == null) { setNodeIfNotExist(targetNode); targetPool = nodes.get(getNodeKey(targetNode)); } for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } } public JedisPool getNode(String nodeKey) { r.lock(); try { return nodes.get(nodeKey); } finally { r.unlock(); } } public JedisPool getSlotPool(int slot) { r.lock(); try { return slots.get(slot); } finally { r.unlock(); } } public Map<String, JedisPool> getNodes() { r.lock(); try { return new HashMap<String, JedisPool>(nodes); } finally { r.unlock(); } } public static String getNodeKey(HostAndPort hnp) { return hnp.getHost() + ":" + hnp.getPort(); } public static String getNodeKey(Client client) { return client.getHost() + ":" + client.getPort(); } public static String getNodeKey(Jedis jedis) { return getNodeKey(jedis.getClient()); } private List<Integer> getAssignedSlotArray(List<Object> slotInfo) { List<Integer> slotNums = new ArrayList<Integer>(); for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)) .intValue(); slot++) { slotNums.add(slot); } return slotNums; } } public Map<String, JedisPool> getNodes() { return cache.getNodes(); } private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } for (HostAndPort node : startNodes) { cache.setNodeIfNotExist(node); } } public void renewSlotCache() { for (JedisPool jp : getShuffledNodesPool()) { Jedis jedis = null; try { jedis = jp.getResource(); cache.discoverClusterSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } } public void renewSlotCache(Jedis jedis) { try { cache.discoverClusterSlots(jedis); } catch (JedisConnectionException e) { renewSlotCache(); } } protected List<JedisPool> getShuffledNodesPool() { List<JedisPool> pools = new ArrayList<JedisPool>(); pools.addAll(cache.getNodes().values()); Collections.shuffle(pools); return pools; } }
总结:
1.JedisCluster 会初始化一个 连接获取集群信息通过 solts 命令。(JedisClusterInfoCache 构造方法初始化)
2.get ,set 的时候。会通过key JedisClusterCRC16.getSlot(key) 定位到solt
3. 然后根据solt获取 jedis
public Jedis getConnectionFromSlot(int slot) {
JedisPool connectionPool = cache.getSlotPool(slot);
4.执行操作
读操作:主库,从库都会读
写操作:主库写
相关推荐
JedisCluster jedisCluster = new JedisCluster(nodes, poolConfig); // 在集群中执行命令 String result = jedisCluster.get("key"); jedisCluster.set("key", "value"); // 关闭JedisCluster实例 jedisCluster....
redis命令实践 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器
redis基本命令 Redis数据结构、原理分析、应用实战 什么是Redis Redis的作用 ...Redis实战及源码分析 分布式锁实战 管道模式 Redis的应用架构 缓存与数据一致性问题 缓存雪崩与缓存穿透 布隆过滤器
JedisCluster jedisCluster = new JedisCluster(new HostAndPort("node1_ip", node1_port), 2000); ``` 3. **编写业务逻辑**: - 使用注入的`StringRedisTemplate`或`JedisCluster`对象进行数据的读写操作。 4....
**Jedis:Redis的Java客户端** Jedis是Java开发者用于操作Redis数据库的开源客户端库。Redis是一个高性能的键值...通过阅读和分析源码,开发者可以更好地定制Jedis以满足特定项目需求,或为社区贡献自己的改进和优化。
Redis 是一个基于键值对的高性能内存数据存储系统,由 C 语言编写,常被用于缓存和数据...Java 开发者可以使用 Jedis 或 JedisCluster 进行连接和操作,同时需要关注缓存管理和数据同步策略以确保系统的稳定性和性能。
10-14 JedisCluster执行源码分析.mp4 10-13 smart客户端实现原理.mp4 10-12 ask重定向.mp4 10-11 moved异常说明和操作.mp4 10-10 客户端路由-目录.mp4 10-1 集群伸缩目录.mp4 1-9 特性5-功能丰富.mp4 1-8 ...
通过源码分析,我们可以看到它在设计上的灵活性和实用性,以及在处理高并发场景时的优秀性能。无论是开发者还是运维人员,理解并掌握这套解决方案的内在机制,都能在构建大型Web应用时提供有力支持。
在“狂神redis源码笔记”中,我们可以期待学习到Redis的源码分析、内部机制以及如何通过Java进行高效操作。源码分析是深入理解Redis工作原理的关键,有助于开发者优化使用方式或进行定制化开发。 Redis的核心知识点...
深入源码分析,我们可以看到Spring-data-redis如何巧妙地封装了Jedis客户端,提供了抽象的RedisOperations接口,使得操作Redis变得简单。例如,RedisTemplate中的opsForValue()方法返回一个ValueOperations实例,...
Redis是一款高性能的键值对数据库,它以内存存储为主,数据持久...通过阅读本学习笔记和源码分析,你可以深入了解Redis的工作原理,掌握如何在实际项目中高效地使用Redis。不断实践和探索,你将成为Redis的熟练驾驭者。
3. 源码分析: Repoll的源代码可以帮助学习者深入了解如何与Redis服务器进行通信,以及如何构建一个完整的管理平台。源代码可能包含以下几个部分: - 客户端库:使用如Jedis(Java)或redis-py(Python)等客户端...
它以C语言编写,支持网络,是完全开源的,拥有丰富的数据结构,如字符串、哈希、列表、集合、有序集合等,同时还提供了发布/订阅、事务、持久化、主从复制等功能,广泛应用于缓存、消息中间件、实时分析、社交网络等...
- **Jedis (Java Cluster Client)**: Java应用程序可以通过Jedis客户端库来连接Redis集群。 - **r3c (C++ Cluster Client)**: 对于使用C++开发的应用程序,可以使用r3c作为客户端库。 #### 11. 新增节点 - **添加...
4. 示例代码分析:在TestRedisSerializer-master这个项目中,`TestRedisSerializer`可能是测试类,它演示了如何序列化和反序列化Java对象并存储到Redis。序列化是将Java对象转换为字节流的过程,以便于存储或传输,...
《Redis in Action:Java+Python》是一本深入...通过阅读《Redis in Action》并结合提供的Java和Python源码,开发者不仅能掌握Redis的基本操作,还能深入了解其在实际项目中的应用和优化技巧,提升开发效率和系统性能。
最后,手册可能还会涉及Redis与其他技术的集成,如与Spring框架的整合,使用Jedis或Lettuce等客户端库进行开发,以及在微服务架构中的应用案例。 总之,学习Redis不仅需要理解其核心概念和数据结构,还要掌握持久化...
- **10.3 Java 客户端 Jedis**:使用 Java 编写的 Redis 客户端,支持集群模式。 - **10.4 C++ 客户端 r3c**:适用于 C++ 应用程序的 Redis 集群客户端。 #### 十一、节点管理 - **11.1 添加新主节点**:通过 `...
- 要实现集群,需配置`server.xml`中的`Cluster`元素,设置`Channel`和`Manager`。 2. **Redis**: - Redis是一个高性能的键值数据库,常被用作缓存服务。 - 在Session共享中,Redis作为Session仓库,存储所有...