搭建完redis集群后,可以通过jedis的JedisCluster来访问Redis集群,这里列出使用jedisCluster的spring bean配置方式:
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="5" /> <property name="maxTotal" value="1024" /> <property name="maxWaitMillis" value="5000" /> <property name="testOnBorrow" value="true" /> </bean> <bean id="redisCluster" class="redis.clients.jedis.JedisCluster" <constructor-arg name=“nodes"> <set> <ref bean="hostport1" /> <ref bean="hostport2" /> <ref bean="hostport3" /> <ref bean="hostport4" /> <ref bean="hostport5" /> <ref bean="hostport6" /> </set> </constructor-arg> <constructor-arg name="timeout" value="6000" /> <constructor-arg name="poolConfig" <ref bean="jedisPoolConfig" /> </constructor-arg> </bean>
//此处省略hostport1....6的配置
本质上,JedisCluster中的JedisPool同样也是实现了apache common pool2的对象池,其中的getResource可以拿到对应的Jedis连接。
JedisCluster是如何构建整个连接池
正如类图中所呈现的,JedisClusterConnectionHandler中使用了JedisClusterInfoCache作为缓存初始化容器,将Set<HostAndPort>作为JedisClusterConnectionHandler的构造函数参数传递过去之后,进行初始化slotsCache操作,
public JedisClusterConnectionHandler(Set<HostAndPort> nodes, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) { this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout); initializeSlotsCache(nodes, poolConfig); } private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) { for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); try { cache.discoverClusterNodesAndSlots(jedis); break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } for (HostAndPort node : startNodes) { cache.setNodeIfNotExist(node); } }
在cache.discoverClusterNodesAndSlots中,用到了Jedis.clusterNodes,它可以通过该Redis连接找到其他连接的相关配置,例如可以发现整个集群的配置,其中三个master,三个slave,并且能够识别出自身连接,可参考文档:http://redis.io/commands/cluster-nodes:
./redis-cli -c -h xxx -p 63xx cluster nodes e54b82fd2b5ab238906cff7fc6250a7bc66c6fec 192.168.1.1xx:6389 master - 0 1469151811362 31 connected 0-5460 166baa38c8ab56339c11f0446257c7a6059a219b 192.168.1.1xx:6389 slave 1609b090dfaaac702449b72d30b2330521ce2506 0 1469151812364 29 connected 1609b090dfaaac702449b72d30b2330521ce2506 192.168.1.1xx:6390 master - 0 1469151811362 29 connected 10923-16383 539627a393aa43e82ca8c16d1e935611fec4e709 192.168.1.1xx:6388 myself,master - 0 0 28 connected 5461-10922 d9b3738ff16e99075242b865a0b6cc137c20d502 192.168.1.1xx:6390 slave 539627a393aa43e82ca8c16d1e935611fec4e709 0 1469151810859 28 connected 101227d3cb13f08a47ad2afe1b348d0efc3cb3b0 192.168.1.1xx:6388 slave e54b82fd2b5ab238906cff7fc6250a7bc66c6fec 0 1469151810357 31 connected
命令返回的结果中,可以看到有3个master,3个slave,而且每个slave都有对应的masterid,每个master都有对应的slot范围。
在ClusterNodeInformationParser中,去解析每一行并将对应的slot填充进去,因为只有master上有slot,因此不会填充slave的slot:
public void discoverClusterSlots(Jedis jedis) { w.lock(); try { this.slots.clear(); List<Object> slots = jedis.clusterSlots(); for (Object slotInfoObj : slots) { List<Object> slotInfo = (List<Object>) slotInfoObj; if (slotInfo.size() <= 2) { continue; } List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos List<Object> hostInfos = (List<Object>) slotInfo.get(2); if (hostInfos.size() <= 0) { continue; } // at this time, we just use master, discard slave information HostAndPort targetNode = generateHostAndPort(hostInfos); setNodeIfNotExist(targetNode); assignSlotsToNode(slotNums, targetNode); } } finally { w.unlock(); } }
因此,当我们正常地通过访问JedisCluster的get/set时,通过计算key的slot来获取对应的Jedis Connection,根本不会使用到slave,只会访问master节点。只有一种情况,在tryRandomMode开启时(此时,正常通过slot无法获取有效连接时,可能考虑重新排序)。
@Override public Jedis getConnection() { // In antirez's redis-rb-cluster implementation, // getRandomConnection always return valid connection (able to // ping-pong) // or exception if all connections are invalid List<JedisPool> pools = getShuffledNodesPool(); for (JedisPool pool : pools) { Jedis jedis = null; try { jedis = pool.getResource(); if (jedis == null) { continue; } String result = jedis.ping(); if (result.equalsIgnoreCase("pong")) return jedis; pool.returnBrokenResource(jedis); } catch (JedisConnectionException ex) { if (jedis != null) { pool.returnBrokenResource(jedis); } } } throw new JedisConnectionException("no reachable node in cluster"); }
但此时拿到一个slave节点的可用Connection是非常危险的,加入当前的操作为写操作,将某个字段写入Redis时,由于master不会从slave节点进行复制,会导致该数据操作没有被持久化至master上。
开发基于redis key统一批量处理的中间层
根据redis cluster nodes命令来进行,该命令可以识别出当前集群其余节点的所有状态,master/slave,以及检测的slot位置。
cluster nodes e54b82fd2b5ab238906cff7fc6250a7bc66c6fec 192.168.1.163:6389 master - 0 1469600305090 31 connected 0-5460 166baa38c8ab56339c11f0446257c7a6059a219b 192.168.1.165:6389 slave 1609b090dfaaac702449b72d30b2330521ce2506 0 1469600304588 29 connected 1609b090dfaaac702449b72d30b2330521ce2506 192.168.1.163:6390 master - 0 1469600305592 29 connected 10923-16383 539627a393aa43e82ca8c16d1e935611fec4e709 192.168.1.163:6388 myself,master - 0 0 28 connected 5461-10922 d9b3738ff16e99075242b865a0b6cc137c20d502 192.168.1.165:6390 slave 539627a393aa43e82ca8c16d1e935611fec4e709 0 1469600305090 28 connected 101227d3cb13f08a47ad2afe1b348d0efc3cb3b0 192.168.1.165:6388 slave e54b82fd2b5ab238906cff7fc6250a7bc66c6fec 0 1469600304088 31 connected
cluster nodes 命令的输出有点儿复杂, 它的每一行都是由以下信息组成的:
- 节点 ID :例如 3fc783611028b1707fd65345e763befb36454d73 。
- ip:port :节点的 IP 地址和端口号, 例如 127.0.0.1:7000 , 其中 :0 表示的是客户端当前连接的 IP 地址和端口号。
- flags :节点的角色(例如 master 、 slave 、 myself )以及状态(例如 fail ,等等)。
- 如果节点是一个从节点的话, 那么跟在 flags 之后的将是主节点的节点 ID : 例如 127.0.0.1:7002 的主节点的节点 ID 就是 3c3a0c74aae0b56170ccb03a76b60cfe7dc1912e 。
- 集群最近一次向节点发送 PING 命令之后, 过去了多长时间还没接到回复。
- 节点最近一次返回 PONG 回复的时间。
- 节点的配置纪元(configuration epoch):详细信息请参考 Redis 集群规范 。
- 本节点的网络连接情况:例如 connected 。
- 节点目前包含的槽:例如 127.0.0.1:7001 目前包含号码为 5960 至 10921 的哈希槽。
可以看出redis cluster的slot范围:0-16383,可以采用二分查找的方式,以上面为例,可以分成3个部分的范围slot,以其开头为标识,通过Collections.binarySearch来进行二分查找搜索:
0——5460,5461——10922,10923——16383;
通过JedisPool/JedisConnection初始化客户端连接,并建立slotStarts,其中的ClusterNodeObject作为描述cluster nodes命令返回的行对象:
public void initCluster() { if (jedis instanceof BinaryJedisCluster) { BinaryJedisCluster jedisCluster = (BinaryJedisCluster) jedis; Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes(); Map<String, ClusterNodeObject> hpToNodeObjectMap = new HashMap<>(clusterNodes.size()); for (Map.Entry<String, JedisPool> entry : clusterNodes.entrySet()) { JedisPool jedisPool = entry.getValue(); Jedis jedis = jedisPool.getResource(); String clusterNodesCommand = jedis.clusterNodes(); String[] allNodes = clusterNodesCommand.split("\n"); for (String allNode : allNodes) { String[] splits = allNode.split(" "); String hostAndPort = splits[1]; ClusterNodeObject clusterNodeObject = new ClusterNodeObject(splits[0], splits[1], splits[2].contains("master"), splits[3], Long.parseLong(splits[4]), Long.parseLong(splits[5]), splits[6], splits[7].equalsIgnoreCase("connected"), splits.length == 9 ? splits[8] : null); hpToNodeObjectMap.put(hostAndPort, clusterNodeObject); } } List<Integer> slotStarts = new ArrayList<>(); for (ClusterNodeObject clusterNodeObject : hpToNodeObjectMap.values()) { if (clusterNodeObject.isConnected() && clusterNodeObject.isMaster()) { String slot = clusterNodeObject.getSlot(); String[] slotSplits = slot.split("-"); int slotStart = Integer.parseInt(slotSplits[0]); // int slotEnd = Integer.parseInt(slotSplits[1]); slotStarts.add(slotStart); } } Collections.sort(slotStarts); this.slotStarts = slotStarts; } }
在拿到各个redis key后,通过getSlotByKey方法,获得对应的node编号:
private int getSlotByKey(String key) { int slot = JedisClusterCRC16.getSlot(key); int slotInsertion = Collections.binarySearch(slotStarts, slot); if (slotInsertion < 0) { slotInsertion = Math.abs(slotInsertion + 2); } return slotInsertion; }
最后,当批量查询的keys数组>2时,再进行批量出,否则,只进行单独查询。
if (keys.length > 2 && jedis instanceof JedisCluster) { //如果批量请求key长度大于2,启动批量查询方式 Map<Integer, List<String>> keySlotsMapList = new HashMap<>(); for (String key : keys) { int slotByKey = getSlotByKey(key); if (!keySlotsMapList.containsKey(slotByKey)) { keySlotsMapList.put(slotByKey, new ArrayList<String>()); } keySlotsMapList.get(slotByKey).add(key); } for (Map.Entry<Integer, List<String>> entry : keySlotsMapList.entrySet()) { List<String> slotSameKeys = entry.getValue(); List<String> mgetValues = ((ZhenJedisCluster) jedis) .mget(slotSameKeys.toArray(new String[slotSameKeys.size()])); for (int i = 0; i < slotSameKeys.size(); i++) { result.set(keyList.indexOf(slotSameKeys.get(i)), mgetValues.get(i)); } } } else { for (String key : keys) { result.add(jedis.get(key)); } }
但不能跳过Jedis客户端的slot key检查,其中的批量操作依赖slot是否相同:
public T run(int keyCount, String... keys) { if (keys == null || keys.length == 0) { throw new JedisClusterException("No way to dispatch this command to Redis Cluster."); } // For multiple keys, only execute if they all share the // same connection slot. if (keys.length > 1) { int slot = JedisClusterCRC16.getSlot(keys[0]); for (int i = 1; i < keyCount; i++) { int nextSlot = JedisClusterCRC16.getSlot(keys[i]); if (slot != nextSlot) { throw new JedisClusterException("No way to dispatch this command to Redis Cluster " + "because keys have different slots."); } } } return runWithRetries(SafeEncoder.encode(keys[0]), this.redirections, false, false); }
虽然可以通过重写JedisCluster以及JedisClusterCommand类型(由于有一些依赖包访问权限的类型,需要将这些重写的类型同样放到redis.clients.jedis包中),跳过了JedisCluster的校验,仍然出现连接错误:
Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: CROSSSLOT Keys in request don't hash to the same slot at redis.clients.jedis.Protocol.processError(Protocol.java:117) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:205) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:233) at redis.clients.jedis.Connection.getMultiBulkReply(Connection.java:226) at redis.clients.jedis.Jedis.mget(Jedis.java:355) at redis.clients.jedis.ZhenJedisCluster$129.execute(ZhenJedisCluster.java:1382) at redis.clients.jedis.ZhenJedisCluster$129.execute(ZhenJedisCluster.java:1) at redis.clients.jedis.ZhenJedisClusterCommand.runWithRetries(ZhenJedisClusterCommand.java:119) at redis.clients.jedis.ZhenJedisClusterCommand.run(ZhenJedisClusterCommand.java:51) at redis.clients.jedis.ZhenJedisCluster.mget(ZhenJedisCluster.java:1384) at com.api.pub.cache.JedisClient.batchGet(JedisClient.java:525) at com.zhen.commons.redis.test.RedisTest.main(RedisTest.java:46)
可以看出,尽管两个slot在同一个连接上能够get到值,但是在cluster模式下,是通过slot判断而非节点node判断是否可以进行mget操作,不能靠跳过jedis客户端的方案来完成类似分组操作。
我们可以通过命令行操作,同样来验证这一点,注意redis-cli连接时保证在cluster模式下运行,加入-c参数:
redis-cli -c -h 192.168.1.138 -p 6388 192.168.1.138:6388> set key1 "key1" -> Redirected to slot [9189] located at 192.168.1.137:6390 OK 192.168.1.137:6390> set key2 "key2" -> Redirected to slot [4998] located at 192.168.1.137:6389 OK 192.168.1.137:6389> set key3 "key3" OK 192.168.1.137:6389> mget key2 key3 (error) CROSSSLOT Keys in request don't hash to the same slot 192.168.1.137:6389> get key2 "key2" 192.168.1.137:6389> get key3 "key3" 192.168.1.137:6389> get key1 -> Redirected to slot [9189] located at 192.168.1.137:6390 "key1" 192.168.1.137:6390> set {aaa}1 "1" OK 192.168.1.137:6390> set {aaa}2 "2" OK 192.168.1.137:6390> mget {aaa}1 {aaa}2 1) "1" 2) "2"
因此,暂且不能在RedisCluster模式下,通过增加中间层来对批量请求进行分组,并处理到对应的slot中,理想很好,但是不能够实现,因为服务端会进行一定的限制。
只能通过HASH_TAG来实现cluster模式下的mget/mset批量操作,我们可以在命令行中通过cluster keyslot ${key}来查看某个key对应的slot,可以从Jedis客户端的源码查看对应的key slot算法:
public static int getSlot(String key) { int s = key.indexOf("{"); if (s > -1) { int e = key.indexOf("}", s + 1); if (e > -1 && e != s + 1) { key = key.substring(s + 1, e); } } // optimization with modulo operator with power of 2 // equivalent to getCRC16(key) % 16384 return getCRC16(key) & (16384 - 1); }
可以看出,keySlot算法中,如果key包含{},就会使用第一个{}内部的字符串作为hash key,这样就可以保证拥有同样{}内部字符串的key就会拥有相同slot。
相关推荐
同时,了解JedisCluster内部实现也能帮助我们理解如何在分布式环境中有效地执行批量操作。 总结起来,要在Java中连接Redis集群并批量插入String类型数据,你需要: 1. 配置JedisCluster实例,包含所有集群节点的...
对jedischangyongApi的一些简单封装和分类,全部标有中文注释,可直接放入项目中使用,jedis集群配置可参考 https://blog.csdn.net/qq_31256487/article/details/83144088;
JedisCluster jedisCluster = new JedisCluster(nodes); ``` 2. **在集群中操作**:大部分命令适用于集群环境,但注意某些操作(如 keys 和 scan)可能无法在集群中正确工作,因为它们可能跨多个节点。 ```java ...
适用于redis集群,3.0版本以上支持集群
3. **客户端断连**: 当节点故障或网络问题导致客户端与某个节点断开连接时,JedisCluster会尝试重新连接,但可能会影响部分操作。 4. **数据一致性**: 由于主从复制的存在,读取操作可能会返回旧值,需要根据业务...
JedisCluster是Jedis库提供的一个类,用于连接和操作Redis集群。在SpringBoot中,我们可以通过配置`application.yml`或`application.properties`文件来设置JedisCluster的相关参数。以下是一个基本的YAML配置示例: ...
JedisClusterUtil.java
6. **关闭连接**:在完成所有操作后,记得关闭`JedisCluster`实例,释放资源: ```java jedisCluster.close(); ``` 在压缩包"redis-Demo-3.4.9"中,可能包含了演示如何使用Jedis操作Redis集群的具体示例代码,...
解决redis集群模式下,jedis不能批量操作问题..............................................................................
4. 执行操作:通过jedisCluster对象的API执行读写操作,如get、set、hgetall等。 源码示例中可能包含以下内容: - 创建JedisCluster实例:`new JedisCluster(new HostAndPort("192.168.1.1", 7000), new HostAnd...
集群操作的基本逻辑与单机版类似,只是需要使用`JedisCluster`对象。 ### Jedis接口工具类设计 为了提高代码的可复用性和可维护性,我们可以定义一个`RedisClient`接口,包含Redis操作的关键方法,然后针对单机和...
- **键操作**:通过`JedisCluster`实例,可以执行基本的键值操作,如设置、获取、删除键等。 ```java jedisCluster.set("key", "value"); String value = jedisCluster.get("key"); jedisCluster.del("key"); ``` ...
Jedis是Java开发的一款高效的Redis客户端库,广泛用于与Redis服务器进行交互,提供丰富的API来操作Redis中的数据结构。在学习Jedis的过程中,了解其源码对于深入理解Redis操作的底层实现以及提升Java编程能力非常有...
一旦有了JedisCluster实例,你就可以进行插入、读取等操作。例如,插入一个字符串键值对: ```java jedisCluster.set("key", "value"); String value = jedisCluster.get("key"); ``` 在SpringBoot应用中,你可以...
7. **批量操作**:使用mset、mget等批量处理多个键值对。 8. **pipeline和事务**:通过pipeline优化网络通信,减少IO次数,提升性能。 9. **异常处理**:捕获并处理JedisException等相关异常,提供友好的错误信息。 ...
JedisCluster jedisCluster = new JedisCluster(nodes, poolConfig); // 在集群中执行命令 String result = jedisCluster.get("key"); jedisCluster.set("key", "value"); // 关闭JedisCluster实例 jedisCluster....
在实际应用中,可以利用插件提供的API进行操作,如`set`、`get`、`del`等,这些方法应该已经封装了与Redis Cluster通信的逻辑。需要注意的是,由于Redis Cluster的数据分布特性,插入和查询数据可能会涉及到多个节点...
然后,可以通过`jedisCluster`实例进行相应的数据操作。 ### 注意事项 1. **关闭资源**:在完成操作后,记得关闭Jedis实例或归还到连接池: ```java jedis.close(); // 单个连接 jedisPool.returnResource...
Java中,JedisCluster库用于与Redis Cluster通信,它能自动发现集群节点并均衡请求。工程中的代码展示了如何连接到Redis Cluster,并进行数据的插入和查询。 4. **数据读取和插入操作**:在这些模式下,工程可能...
SpringBoot整合Redis(单机集群) SpringBoot整合Redis是指在SpringBoot项目中使用Redis作为缓存...在集群模式下,使用JedisCluster机制可以让SpringBoot项目自动 discovers Redis集群节点,自动负载均衡和故障转移。