/**************************************JedisClient.java **************************************/
package com.avit.cache.redis;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import com.avit.util.SysConfig;
public class JedisClient {
public static Logger log = LogManager.getLogger(JedisClient.class);
private static JedisPool jc;
public final static ExecutorService batchAddRedisCache = Executors.newFixedThreadPool(40);
static {
String nodes = SysConfig.getSystemConfig("redis.user.group.node.ip", "10.18.24.29");
String port = SysConfig.getSystemConfig("redis.user.group.node.port", "6870");
int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));
int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));
int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));
int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 资源池中最大连接数
jedisPoolConfig.setMaxTotal(maxTotal);
// 资源池允许最大空闲的连接数
jedisPoolConfig.setMaxIdle(maxIdle);
// 资源池确保最少空闲的连接数
jedisPoolConfig.setMinIdle(minIdle);
// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效
jedisPoolConfig.setBlockWhenExhausted(true);
// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除
jedisPoolConfig.setTestOnBorrow(true);
// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除
jedisPoolConfig.setTestOnReturn(false);
// 是否开启空闲资源监测
jedisPoolConfig.setTestWhileIdle(true);
// 空闲资源的检测周期(单位为毫秒),-1:不检测
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);
// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除
jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);
// 做空闲资源检测时,每次的采样数
// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测
jedisPoolConfig.setNumTestsPerEvictionRun(-1);
jc = new JedisPool(jedisPoolConfig,nodes, Integer.parseInt(port), maxWaitMillis);
}
/**
* 将给定集合的并集存储在指定的集合
*/
public static long sunionstore(String dstkey, String... keys) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.sunionstore(dstkey, keys);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return 0;
}
/*
* 将给定集合的差集存储在指定的集合
*/
public static long sdiffstore(String dstkey, String... keys) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.sdiffstore(dstkey, keys);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return 0;
}
/*
* 返回第一个集合中独有的元素
*/
public static Set<String> sdiff(String... keys) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.sdiff(keys);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return new HashSet<String>();
}
public static void sadd(String key, String member) {
Jedis jedis = null;
try {
jedis = jc.getResource();
jedis.sadd(key, member);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
}
/*
* 返回集合中的所有的成员
*/
public static Set<String> smembers(String key) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.smembers(key);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return new HashSet<String>();
}
/*
* 给主键设置过期时间
*/
public static void expire(String key, int seconds) {
Jedis jedis = null;
try {
jedis = jc.getResource();
jedis.expire(key,seconds);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
}
public static Long delete(String... keys) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.del(keys);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return 0L;
}
public static void sadd(String key, String... member) {
Jedis jedis = null;
try {
jedis = jc.getResource();
jedis.sadd(key, member);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
}
/*
* 返回集合中元素的数量
*/
public static long scard(String dstkey) {
Jedis jedis = null;
try {
jedis = jc.getResource();
Long log1 = jedis.scard(dstkey);
return log1;
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return 0L;
}
/*
* 返回主键是否存在
*/
public static boolean exists(String msgKey) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.exists(msgKey);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return false;
}
/*
* 移除集合中的一个或多个成员元素
*/
public static void srem(String key, String value) {
Jedis jedis = null;
try {
jedis = jc.getResource();
jedis.srem(key, value);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
}
/*
* 修改主键名称
*/
public static void rename(String oldKey , String newKey) {
Jedis jedis = null;
try {
jedis = jc.getResource();
jedis.rename(oldKey, newKey);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
}
/*
* 批量添加,一次插入不超过1024
*/
public static void batchSaddCache(String[] dstUsers, final String key) {
//一次添加上限1024
if(dstUsers.length > 1024) {
List<String> arrays = Arrays.asList(dstUsers);
int pagesize = 1024;
int totalcount = arrays.size();
int pagecount = 0;//总页数
int m = totalcount % pagesize;
if(m > 0){
pagecount = totalcount / pagesize + 1;
}else{
pagecount = totalcount / pagesize;
}
try {
final CountDownLatch countDownLatch = new CountDownLatch(pagecount);
for(int i = 1; i <= pagecount; i++) {
List<String> subList = null;
if (m == 0) {
subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
}else{
if(i == pagecount){
subList = arrays.subList((i - 1) * pagesize, totalcount);
}else{
subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
}
}
final List<String> finalSubList = subList;
batchAddRedisCache.submit(new Runnable() {
public void run() {
try {
String[] strings = new String[finalSubList.size()];
finalSubList.toArray(strings);
JedisClient.sadd(key,strings);
} catch (Exception ex) {
log.error("",ex);
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
} catch (Exception ex) {
log.error("",ex);
}
} else {
if(dstUsers.length <= 0) {
} else {
JedisClient.sadd(key,dstUsers);
}
}
}
/*
* 如果超过10000条数据,需要循环读取
*/
public static Set<String> scan(String key) {
Jedis jedis = null;
try {
jedis = jc.getResource();
Long count = jedis.scard(key);
if(count > 10000) {
Set<String> result = new HashSet<String>();
ScanParams scanParams = new ScanParams();
scanParams.count(2000);
String counter = ScanParams.SCAN_POINTER_START;
while(true){
ScanResult<String> ret = jedis.sscan(key, counter, scanParams);
counter = ret.getStringCursor();
result.addAll(ret.getResult());
if(result.size() >= count || counter.equals("0")) {//遍历完成
break;
}
}
return result;
} else {
return jedis.smembers(key);
}
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return new HashSet<String>();
}
/*
* 将给定集合的交集存储在指定的集合
*/
public static Long sinterstore(String key, String... keys) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.sinterstore(key, keys);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return 0L;
}
/*
* 元素是否是集合的成员
*/
public static boolean sismember(String key, String member) {
Jedis jedis = null;
try {
jedis = jc.getResource();
return jedis.sismember(key, member);
} catch (Exception ex) {
log.error("",ex);
} finally {
if(jedis != null) {
jedis.close();
}
}
return false;
}
public static void main(String[] args) {
/*
String dstKey = "dstTestKey";
JedisClient.sadd("testKey1", "nihao");
JedisClient.sadd("testKey1", "hello");
JedisClient.sadd("testKey1", "word");
JedisClient.sadd("testKey3", "nihao");
JedisClient.sadd("testKey3", "this is text");
JedisClient.sadd("testKey3", "wo");
JedisClient.sunionstore(dstKey, new String[] {"testKey1", "testKey3"});
Set<String> reuslt1 = JedisClient.smembers(dstKey);
System.out.println(reuslt1);
JedisClient.sadd("testKey4", "key4");
JedisClient.sadd("testKey4", "key4 hahah");
JedisClient.sadd("testKey4", "key4 hahawo");
JedisClient.sunionstore(dstKey, new String[] {"testKey4"});
Set<String> reuslt3 = JedisClient.smembers(dstKey);
System.out.println(reuslt3);
JedisClient.sadd("testKey2", "word");
JedisClient.sadd("testKey2", "xxx");
JedisClient.sadd("testKey2", "test");
JedisClient.sdiffstore(dstKey, new String[] {dstKey, "testKey2"});
Set<String> reuslt2 = JedisClient.smembers(dstKey);
System.out.println(reuslt2);*/
Set<String> set = new HashSet<String>();
set.add("USER_GROUP_CACHE_KEY_A3601099727_0");
set.add("USER_GROUP_CACHE_KEY_A110163759_0");
String bd = "USER_GROUP_CACHE_KEY_"+"A110163759";
for (String a : set) {
boolean flag = a.indexOf(bd) != -1;
System.out.println(flag);
}
System.out.println("jja")
}
}
/****************************************JedisClusterClient.java***************************************************/
package com.avit.cache.redis;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import com.avit.common.queue.TaskConsumer;
import com.avit.util.SysConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisClusterClient {
private static JedisCluster jc;
static {
String nodes = SysConfig.getSystemConfig("redis.nodes", "");
int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));
int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));
int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));
int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));
Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
for (String node : nodes.split(",")) {
String host = node.split(":")[0];
int port = Integer.parseInt(node.split(":")[1]);
jedisClusterNodes.add(new HostAndPort(host, port));
}
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 资源池中最大连接数
jedisPoolConfig.setMaxTotal(maxTotal);
// 资源池允许最大空闲的连接数
jedisPoolConfig.setMaxIdle(maxIdle);
// 资源池确保最少空闲的连接数
jedisPoolConfig.setMinIdle(minIdle);
// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效
jedisPoolConfig.setBlockWhenExhausted(true);
// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除
jedisPoolConfig.setTestOnBorrow(true);
// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除
jedisPoolConfig.setTestOnReturn(false);
// 是否开启空闲资源监测
jedisPoolConfig.setTestWhileIdle(true);
// 空闲资源的检测周期(单位为毫秒),-1:不检测
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);
// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除
jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);
// 做空闲资源检测时,每次的采样数
// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测
jedisPoolConfig.setNumTestsPerEvictionRun(-1);
jc = new JedisCluster(jedisClusterNodes, jedisPoolConfig);
}
public static String buildKey(String keyPre, String code) {
StringBuffer sb = new StringBuffer();
sb.append(keyPre).append("_").append(code);
return sb.toString();
}
public static void set(String key, String value) {
jc.set(key, value);
}
public static String get(String key) {
String value = jc.get(key);
return value;
}
public static void sadd(String key, String member) {
jc.sadd(key, member);
}
public static void sadd(String key, String... member) {
jc.sadd(key, member);
}
public static void srem(String key, String member) {
jc.srem(key, member);
}
public static void srem(String key, String[] member) {
jc.srem(key, member);
}
public static Set<String> smembers(String key) {
return jc.smembers(key);
}
public static void expire(String key, int seconds) {
jc.expire(key, seconds);
}
public static String spop(String key) {
return jc.spop(key);
}
public static boolean sismember(String key, String member) {
return jc.sismember(key, member);
}
public static String srandmember(String key) {
return jc.srandmember(key);
}
public static Long lpush(String key, String content) {
return jc.lpush(key, content);
}
public static String rpop (String key) {
return jc.rpop(key);
}
public static long incr(String key) {
return jc.incr(key);
}
public static Long rpush(String key, String... content) {
return jc.rpush(key, content);
}
public static JedisClusterPipeline getJedisClusterPipeline() {
return JedisClusterPipeline.pipelined(jc);
}
public static long sunionstore(String dstkey , String... keys) {
return jc.sunionstore(dstkey, keys);
}
public static long sdiffstore(String dstkey , String... keys) {
return jc.sdiffstore(dstkey, keys);
}
public static long scard(String dstkey) {
return jc.scard(dstkey);
}
public static long zadd(String key,double score, String member) {
return jc.zadd(key, score, member);
}
public static long zcard(String key) {
return jc.zcard(key);
}
public static long zremrangeByScore(String key, String start, String end) {
return jc.zremrangeByScore(key, start, end);
}
/**
* 判断集群是否有节点挂掉
* @return
*/
public static boolean isNodesWell(){
Map<String, JedisPool> clusterNodes = jc.getClusterNodes();
try {
for (Entry<String, JedisPool> enty : clusterNodes.entrySet()) {
Jedis jd = enty.getValue().getResource();
if (jd.isConnected()) jd.close();
}
} catch (Exception e) {
return false ;
}
return true ;
}
/**
* 判断key是否存在
*/
public static Boolean exists(String key) {
return jc.exists(key);
}
public static Long delete(String... keys) {
return jc.del(keys);
}
public static void batchSaddCache(String[] dstUsers, final String key) {
if(dstUsers.length > 1024) {//一次添加超过1024个元素 ,循环插入
List<String> arrays = Arrays.asList(dstUsers);
int pagesize = 1024;
int totalcount = arrays.size();
int pagecount = 0;
int m = totalcount % pagesize;
if(m > 0){
pagecount = totalcount / pagesize + 1;
}else{
pagecount = totalcount / pagesize;
}
try {
final CountDownLatch countDownLatch = new CountDownLatch(pagecount);
for(int i = 1; i <= pagecount; i++) {
List<String> subList = null;
if (m == 0) {
subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
}else{
if(i == pagecount){
subList = arrays.subList((i - 1) * pagesize, totalcount);
}else{
subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
}
}
final List<String> finalSubList = subList;
TaskConsumer.batchAddRedisCache.submit(new Runnable() {
public void run() {
try {
String[] strings = new String[finalSubList.size()];
finalSubList.toArray(strings);
JedisClusterClient.sadd(key,strings);
} catch (Exception ex) {
System.out.println(ex);
} finally {
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
} catch (Exception ex) {
}
} else {
if(dstUsers.length <= 0) {
} else {
JedisClusterClient.sadd(key,dstUsers);
}
}
}
public static void batchSaddCache(String[] dstUsers, String key, int dstUserNumOneKey) {
if(dstUsers.length > 1024) {//一次添加超过1024个元素 ,循环插入
List<String> arrays = Arrays.asList(dstUsers);
int pagesize = 1024;
int totalcount = arrays.size();
int pagecount = 0;
int m = totalcount % pagesize;
if(m > 0){
pagecount = totalcount / pagesize + 1;
}else{
pagecount = totalcount / pagesize;
}
for(int i = 1; i <= pagecount; i++) {
if (m == 0) {
List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.sadd(key,strings);
}else{
if(i == pagecount){
List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.sadd(key,strings);
}else{
List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.sadd(key,strings);
}
}
}
} else {
JedisClusterClient.sadd(key,dstUsers);
}
}
public static void batchSrem(String offMsgKey, String[] dstUsers) {
if(dstUsers.length > 1024) {//一次添加超过1024个元素 ,循环插入
List<String> arrays = Arrays.asList(dstUsers);
int pagesize = 1024;
int totalcount = arrays.size();
int pagecount = 0;
int m = totalcount % pagesize;
if(m > 0){
pagecount = totalcount / pagesize + 1;
}else{
pagecount = totalcount / pagesize;
}
for(int i = 1; i <= pagecount; i++) {
if (m == 0) {
List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.srem(offMsgKey,strings);
}else{
if(i == pagecount){
List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.srem(offMsgKey,strings);
}else{
List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));
String[] strings = new String[subList.size()];
subList.toArray(strings);
JedisClusterClient.srem(offMsgKey,strings);
}
}
}
} else {
JedisClusterClient.srem(offMsgKey,dstUsers);
}
}
public static void main(String[] args) {
JedisClusterClient.set("you","testMSg hello");
String foo1 = JedisClusterClient.get("you");
System.out.println(foo1);
String foo = JedisClusterClient.get("foo");
System.out.println(foo);
String umrKey = "UMR_0__VodS04194";
long t1 = System.currentTimeMillis();
Set<String> msgCodes = JedisClusterClient.smembers(umrKey);
long t2 = System.currentTimeMillis();
System.out.println("set size is " + msgCodes.size() + " cost " + (t2 - t1) + "ms");
long t3 = System.currentTimeMillis();
String code = JedisClusterClient.srandmember(umrKey);
long t4 = System.currentTimeMillis();
System.out.println("get random value "+ code + " cost " + (t4 - t3) + "ms");
}
/*************************************************************JedisClusterPipeline.java*************/
package com.avit.cache.redis;
import java.io.Closeable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;
/**
* 在集群模式下提供批量操作的功能。 <br/>
* 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),
* 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />
* 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>
* 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />
* 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />
*/
public class JedisClusterPipeline extends PipelineBase implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);
// 部分字段没有对应的获取方法,只能采用反射来做
// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口
private static final Field FIELD_CONNECTION_HANDLER;
private static final Field FIELD_CACHE;
static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
}
private JedisSlotBasedConnectionHandler connectionHandler;
private JedisClusterInfoCache clusterInfoCache;
private Queue<Client> clients = new LinkedList<Client>();// 根据顺序存储每个命令对应的Client
private Map<JedisPool, Jedis> jedisMap = new HashMap<JedisPool, Jedis>();// 用于缓存连接
private boolean hasDataInBuf = false;// 是否有数据在缓存区
/**
* 根据jedisCluster实例生成对应的JedisClusterPipeline
* @param
* @return
*/
public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {
JedisClusterPipeline pipeline = new JedisClusterPipeline();
pipeline.setJedisCluster(jedisCluster);
return pipeline;
}
public JedisClusterPipeline() {
}
public void setJedisCluster(JedisCluster jedis) {
connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
}
/**
* 刷新集群信息,当集群信息发生变更时调用
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
/**
* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
*/
public void sync() {
innerSync(null);
}
/**
* 同步读取所有数据 并按命令顺序返回一个列表
*
* @return 按照命令的顺序返回所有的数据
*/
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<Object>();
innerSync(responseList);
return responseList;
}
private void innerSync(List<Object> formatted) {
try {
for (Client client : clients) {
// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。
// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了
Object data = generateResponse(client.getOne()).get();
if (null != formatted) {
formatted.add(data);
}
}
} catch (JedisRedirectionException jre) {
if (jre instanceof JedisMovedDataException) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster();
}
throw jre;
} finally {
// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
for (Jedis jedis : jedisMap.values()) {
flushCachedData(jedis);
}
hasDataInBuf = false;
close();
}
}
@Override
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
if (hasDataInBuf) {
flushCachedData(jedis);
}
jedis.close();
}
jedisMap.clear();
hasDataInBuf = false;
}
private void flushCachedData(Jedis jedis) {
try {
jedis.getClient().getAll();
} catch (RuntimeException ex) {
// 其中一个client出问题,后面出问题的几率较大
LOGGER.error("flushCachedData error...", ex);
}
}
@Override
protected Client getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}
@Override
protected Client getClient(byte[] key) {
Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));
Client client = jedis.getClient();
clients.add(client);
return client;
}
private Jedis getJedis(int slot) {
JedisPool pool = clusterInfoCache.getSlotPool(slot);
// 根据pool从缓存中获取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool, jedis);
}
hasDataInBuf = true;
return jedis;
}
private static Field getField(Class<?> cls, String fieldName) {
try {
Field field = cls.getDeclaredField(fieldName);
field.setAccessible(true);
return field;
} catch (Exception e) {
throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);
}
}
@SuppressWarnings({"unchecked" })
private static <T> T getValue(Object obj, Field field) {
try {
return (T)field.get(obj);
} catch (Exception e) {
LOGGER.error("get value fail", e);
throw new RuntimeException(e);
}
}
}
相关推荐
linux基础进阶笔记,配套视频:https://www.bilibili.com/list/474327672?sid=4493093&spm_id_from=333.999.0.0&desc=1
IMG20241115211541.jpg
GEE训练教程——Landsat5、8和Sentinel-2、DEM和各2哦想指数下载
该资源内项目源码是个人的课程设计、毕业设计,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! ## 项目备注 1、该资源内项目代码都经过严格测试运行成功才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。
基于springboot家政预约平台源码数据库文档.zip
Ucharts添加stack和折线图line的混合图
基于springboot员工在线餐饮管理系统源码数据库文档.zip
新能源汽车进出口数据 1、时间跨度:2018-2020年 2、指标说明:包含如下指标的进出口数据:混合动力客车(10座及以上)、纯电动客车(10座及以上)、非插电式混合动力乘用车、插电式混合动力乘用车、纯电动乘用车 二、新能源汽车进出口月销售数据(分地区、分类型、分 级别) 1、数据来源:见资料内说明 2、时间跨度:2014年1月-2021年5月 4、指标说明: 包含如下指标 2015年1月-2021年5月新能源乘用车终端月度销量(分类型)部分内容如下: 新能源乘用车(单月值、累计值 )、插电式混合动力 月度销量合计(狭义乘用车轿车、SUV、MPV、交叉型乘用车); 月度销量同比增速(狭义乘用车轿车、SUV、MPV、交叉型乘用车); 累计销量合计(狭义乘用车轿车、SUV、IPV、交叉型乘用车); 累计销量同比增速(狭义乘用车轿车、SUV、MPV、交叉型乘用车); 累计结构变化(狭义乘用车轿车、SUV、IPV、交叉型乘用车); 2015年1月-2021年5月新能源乘用车终端月度销量(分地区)内容如下: 更多见资源内
中心主题-241121215200.pdf
内容概要:本文档提供了多个蓝奏云下载链接及其对应解压密码,帮助用户快速获取所需文件。 适合人群:需要从蓝奏云下载文件的互联网用户。 使用场景及目标:方便地记录并分享蓝奏云上文件的下载地址和密码,提高下载效率。 阅读建议:直接查看并使用提供的链接和密码即可。若遇到失效情况,请尝试联系上传者确认更新后的链接。
基于Java web 实现的仓库管理系统源码,适用于初学者了解Java web的开发过程以及仓库管理系统的实现。
资源名称:Python-文件重命名-自定义添加文字-重命名 类型:windows—exe可执行工具 环境:Windows10或以上系统 功能: 1、点击按钮 "源原文"【浏览】表示:选择重命名的文件夹 2、点击按钮 "保存文件夹"【浏览】表示:保存的路径(为了方便可选择保存在 源文件中 ) 3、功能①:在【头部】添加自定义文字 4、功能②:在【尾部】添加自定义文字 5、功能③:输入源字符 ;输入替换字符 可以将源文件中的字符替换自定义的 6、功能④:自动加上编号_1 _2 _3 优点: 1、非常快的速度! 2、已打包—双击即用!无需安装! 3、自带GUI界面方便使用!
JDK8安装包
配合作者 一同使用 作者地址没有次下载路径 https://blog.csdn.net/weixin_52372189/article/details/127471149?fromshare=blogdetail&sharetype=blogdetail&sharerId=127471149&sharerefer=PC&sharesource=weixin_45375332&sharefrom=from_link
GEE训练教程
该资源内项目源码是个人的课程设计、毕业设计,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! ## 项目备注 1、该资源内项目代码都经过严格测试运行成功才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。
基于springboot交通感知与车路协同系统源码数据库文档.zip
基于springboot+vue 雅妮电影票购买系统源码数据库文档.zip
为了更好地理解 HTML5 的拖放功能,我们设计了一个简单有趣的示例:将水果从水果区拖放到购物笼中,实时更新数量和价格,并在所有水果被成功放置后,播放音效并显示提示。
该资源内项目源码是个人的课程设计、毕业设计,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! ## 项目备注 1、该资源内项目代码都经过严格测试运行成功才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。