`

JedisClientUtil.java

 
阅读更多

/**************************************JedisClient.java **************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.apache.logging.log4j.LogManager;

import org.apache.logging.log4j.Logger;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.ScanParams;

import redis.clients.jedis.ScanResult;

 

import com.avit.util.SysConfig;

 

public class JedisClient {

public static Logger log = LogManager.getLogger(JedisClient.class);

private static JedisPool  jc;

 

public final static ExecutorService batchAddRedisCache = Executors.newFixedThreadPool(40);

 

static {

String nodes = SysConfig.getSystemConfig("redis.user.group.node.ip", "10.18.24.29");

String port = SysConfig.getSystemConfig("redis.user.group.node.port", "6870");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisPool(jedisPoolConfig,nodes, Integer.parseInt(port), maxWaitMillis);

}

 

/**

* 将给定集合的并集存储在指定的集合

*/

public static long sunionstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

 

return jedis.sunionstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0;

}

/*

* 将给定集合的差集存储在指定的集合

*/

public static long sdiffstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiffstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

 

jedis.close();

}

}

return 0;

}

 

/*

* 返回第一个集合中独有的元素

*/

public static Set<String> sdiff(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiff(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

 

public static void sadd(String key, String member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

}

 

/*

* 返回集合中的所有的成员

*/

public static Set<String> smembers(String key) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.smembers(key);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 给主键设置过期时间

*/

public static void expire(String key, int seconds) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.expire(key,seconds);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

public static Long delete(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.del(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

 

public static void sadd(String key, String... member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 返回集合中元素的数量

*/

public static long scard(String dstkey) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long log1 =  jedis.scard(dstkey);

return log1;

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0L;

}

 

/*

* 返回主键是否存在

*/

public static boolean exists(String msgKey) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.exists(msgKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

/*

* 移除集合中的一个或多个成员元素

*/

public static void srem(String key, String value) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.srem(key, value);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

/*

* 修改主键名称

*/

public static void rename(String oldKey , String newKey) {

 

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.rename(oldKey, newKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 批量添加,一次插入不超过1024

*/

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

//一次添加上限1024

if(dstUsers.length > 1024)  {

 

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;//总页数

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

 

try {

 

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

 

for(int i = 1; i <= pagecount; i++) {

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClient.sadd(key,strings);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

countDownLatch.countDown();

}

 

}

});

}

countDownLatch.await();

} catch (Exception ex) {

log.error("",ex);

 

} else {

 

if(dstUsers.length <= 0) {

 

} else {

JedisClient.sadd(key,dstUsers);

}

}

}

/*

* 如果超过10000条数据,需要循环读取

*/

public static Set<String> scan(String key) {

 

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long count = jedis.scard(key);

    if(count > 10000) {

      Set<String> result = new HashSet<String>();

      ScanParams scanParams = new ScanParams();

  scanParams.count(2000);

  String counter = ScanParams.SCAN_POINTER_START;

 

  while(true){

  

  ScanResult<String> ret = jedis.sscan(key, counter, scanParams);

  counter = ret.getStringCursor();

  result.addAll(ret.getResult());

  

  if(result.size() >= count || counter.equals("0")) {//遍历完成

  break;

  }

  }

  return result;

    } else {

    return jedis.smembers(key);

    }

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 将给定集合的交集存储在指定的集合

*/

public static Long sinterstore(String key, String... keys) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

return jedis.sinterstore(key, keys);

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

/*

* 元素是否是集合的成员

*/

public static boolean sismember(String key, String member) {

 

 

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sismember(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

 

public static void main(String[] args) {

/*

String dstKey = "dstTestKey";

 

 

JedisClient.sadd("testKey1", "nihao");

JedisClient.sadd("testKey1", "hello");

JedisClient.sadd("testKey1", "word");

 

JedisClient.sadd("testKey3", "nihao");

JedisClient.sadd("testKey3", "this is text");

JedisClient.sadd("testKey3", "wo");

 

 

JedisClient.sunionstore(dstKey, new String[] {"testKey1", "testKey3"});

 

Set<String> reuslt1 = JedisClient.smembers(dstKey);

System.out.println(reuslt1);

 

JedisClient.sadd("testKey4", "key4");

JedisClient.sadd("testKey4", "key4 hahah");

JedisClient.sadd("testKey4", "key4 hahawo");

 

        JedisClient.sunionstore(dstKey, new String[] {"testKey4"});

 

Set<String> reuslt3 = JedisClient.smembers(dstKey);

System.out.println(reuslt3);

 

JedisClient.sadd("testKey2", "word");

JedisClient.sadd("testKey2", "xxx");

JedisClient.sadd("testKey2", "test");

 

JedisClient.sdiffstore(dstKey, new String[] {dstKey, "testKey2"});

 

Set<String> reuslt2 = JedisClient.smembers(dstKey);

System.out.println(reuslt2);*/

 

Set<String> set = new HashSet<String>();

set.add("USER_GROUP_CACHE_KEY_A3601099727_0");

set.add("USER_GROUP_CACHE_KEY_A110163759_0");

String bd = "USER_GROUP_CACHE_KEY_"+"A110163759";

for (String a : set) {

boolean flag = a.indexOf(bd) != -1;

System.out.println(flag);

}

System.out.println("jja")

}

}

 

 

 /****************************************JedisClusterClient.java***************************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

 

import com.avit.common.queue.TaskConsumer;

import com.avit.util.SysConfig;

import redis.clients.jedis.HostAndPort;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

 

public class JedisClusterClient {

 

private static JedisCluster jc;

 

static {

String nodes = SysConfig.getSystemConfig("redis.nodes", "");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();

for (String node : nodes.split(",")) {

String host = node.split(":")[0];

int port = Integer.parseInt(node.split(":")[1]);

jedisClusterNodes.add(new HostAndPort(host, port));

}

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisCluster(jedisClusterNodes, jedisPoolConfig);

}

 

public static String buildKey(String keyPre, String code) {

        StringBuffer sb = new StringBuffer();

        sb.append(keyPre).append("_").append(code);

        return sb.toString();

    }

 

public static void set(String key, String value) {

jc.set(key, value);

}

 

public static String get(String key) {

String value = jc.get(key);

return value;

}

 

public static void sadd(String key, String member) {

jc.sadd(key, member);

}

 

public static void sadd(String key, String... member) {

jc.sadd(key, member);

}

 

public static void srem(String key, String member) {

jc.srem(key, member);

}

 

public static void srem(String key, String[] member) {

jc.srem(key, member);

}

 

public static Set<String> smembers(String key) {

return jc.smembers(key);

}

 

public static void expire(String key, int seconds) {

jc.expire(key, seconds);

}

 

public static String spop(String key) {

return jc.spop(key);

}

 

public static boolean sismember(String key, String member) {

return jc.sismember(key, member);

}

 

public static String srandmember(String key) {

 

return jc.srandmember(key);

}

 

 

public static Long lpush(String key, String content) {

 

return jc.lpush(key, content);

 

public static String rpop (String key) {

return jc.rpop(key);

}

 

public static long incr(String key) {

return jc.incr(key);

}

 

public static Long rpush(String key, String... content) {

 

return jc.rpush(key, content);

 

public static JedisClusterPipeline getJedisClusterPipeline()  {

return JedisClusterPipeline.pipelined(jc);

}

 

public static long sunionstore(String dstkey , String... keys) {

return jc.sunionstore(dstkey, keys);

}

 

public static long sdiffstore(String dstkey , String... keys) {

return jc.sdiffstore(dstkey, keys);

}

 

public static long scard(String dstkey) {

return jc.scard(dstkey);

}

 

public static long zadd(String key,double score, String member) {

return jc.zadd(key, score, member);

}

public static long zcard(String key) {

return jc.zcard(key);

}

public static long zremrangeByScore(String key, String start, String end) {

return jc.zremrangeByScore(key, start, end);

}

 

 

/**

* 判断集群是否有节点挂掉

* @return

*/

public static boolean isNodesWell(){

Map<String, JedisPool> clusterNodes = jc.getClusterNodes();

try {

for (Entry<String, JedisPool> enty : clusterNodes.entrySet()) {

Jedis jd = enty.getValue().getResource();

if (jd.isConnected()) jd.close();

}

} catch (Exception e) {

return false ;

}

return true ;

}

 

/**

* 判断key是否存在

*/

public static Boolean exists(String key) {

return jc.exists(key);

}

 

public static Long delete(String... keys) {

return jc.del(keys);

}

 

 

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

 

}

 

 

try {

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

for(int i = 1; i <= pagecount; i++) {

 

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

TaskConsumer.batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

} catch (Exception ex) {

System.out.println(ex);

} finally {

countDownLatch.countDown();

}

 

}

});

 

}

countDownLatch.await();

} catch (Exception ex) {

 

 

 

} else {

if(dstUsers.length <= 0) {

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

 

}

}

 

 

 

public static void batchSaddCache(String[] dstUsers, String key, int dstUserNumOneKey) {

 

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}

}

}

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

}

 

 

public static void batchSrem(String offMsgKey, String[] dstUsers) {

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}

}

}

 

} else {

JedisClusterClient.srem(offMsgKey,dstUsers);

}

 

}

 

public static void main(String[] args) {

 

 

JedisClusterClient.set("you","testMSg hello");

String foo1 = JedisClusterClient.get("you");

System.out.println(foo1);

 

String foo = JedisClusterClient.get("foo");

System.out.println(foo);

 

String umrKey = "UMR_0__VodS04194";

long t1 = System.currentTimeMillis();

Set<String> msgCodes = JedisClusterClient.smembers(umrKey);

long t2 = System.currentTimeMillis();

System.out.println("set size is " + msgCodes.size() + " cost " + (t2 - t1) + "ms");

 

long t3 = System.currentTimeMillis();

String code = JedisClusterClient.srandmember(umrKey);

long t4 = System.currentTimeMillis();

System.out.println("get random value "+ code + " cost " + (t4 - t3) + "ms");

 

}

/*************************************************************JedisClusterPipeline.java*************/

package com.avit.cache.redis;

 

import java.io.Closeable;

import java.lang.reflect.Field;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.Queue;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import redis.clients.jedis.BinaryJedisCluster;

import redis.clients.jedis.Client;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisClusterConnectionHandler;

import redis.clients.jedis.JedisClusterInfoCache;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisSlotBasedConnectionHandler;

import redis.clients.jedis.PipelineBase;

import redis.clients.jedis.exceptions.JedisMovedDataException;

import redis.clients.jedis.exceptions.JedisRedirectionException;

import redis.clients.util.JedisClusterCRC16;

import redis.clients.util.SafeEncoder;

 

/**

 * 在集群模式下提供批量操作的功能。 <br/>

 * 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),

 * 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />

 * 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>

 * 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />

 * 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />

 */

public class JedisClusterPipeline extends PipelineBase implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);

 

// 部分字段没有对应的获取方法,只能采用反射来做

// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口

private static final Field FIELD_CONNECTION_HANDLER;

private static final Field FIELD_CACHE; 

static {

FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");

FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");

}

 

private JedisSlotBasedConnectionHandler connectionHandler;

private JedisClusterInfoCache clusterInfoCache;

private Queue<Client> clients = new LinkedList<Client>();// 根据顺序存储每个命令对应的Client

private Map<JedisPool, Jedis> jedisMap = new HashMap<JedisPool, Jedis>();// 用于缓存连接

private boolean hasDataInBuf = false;// 是否有数据在缓存区

 

/**

* 根据jedisCluster实例生成对应的JedisClusterPipeline

* @param 

* @return

*/

public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {

JedisClusterPipeline pipeline = new JedisClusterPipeline();

    pipeline.setJedisCluster(jedisCluster);

    return pipeline;

}

 

public JedisClusterPipeline() {

}

 

public void setJedisCluster(JedisCluster jedis) {

connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);

clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);

}

 

/**

* 刷新集群信息,当集群信息发生变更时调用

* @param 

* @return

*/

public void refreshCluster() {

connectionHandler.renewSlotCache();

}

 

/**

* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化

*/

public void sync() {

innerSync(null);

}

 

/**

* 同步读取所有数据 并按命令顺序返回一个列表

* @return 按照命令的顺序返回所有的数据

*/

public List<Object> syncAndReturnAll() {

List<Object> responseList = new ArrayList<Object>();

 

innerSync(responseList);

 

return responseList;

}

 

private void innerSync(List<Object> formatted) {

try {

for (Client client : clients) {

// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。

// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了

Object data = generateResponse(client.getOne()).get();

if (null != formatted) {

formatted.add(data);

}

}

} catch (JedisRedirectionException jre) {

if (jre instanceof JedisMovedDataException) {

// if MOVED redirection occurred, rebuilds cluster's slot cache,

// recommended by Redis cluster specification

refreshCluster();

}

 

throw jre;

} finally {

// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染

for (Jedis jedis : jedisMap.values()) {

flushCachedData(jedis);

}

 

hasDataInBuf = false;

close();

}

}

 

@Override

public void close() {

clean();

 

clients.clear();

 

for (Jedis jedis : jedisMap.values()) {

if (hasDataInBuf) {

flushCachedData(jedis);

}

 

jedis.close();

}

 

jedisMap.clear();

 

hasDataInBuf = false;

}

 

private void flushCachedData(Jedis jedis) {

try {

jedis.getClient().getAll();

} catch (RuntimeException ex) {

// 其中一个client出问题,后面出问题的几率较大

LOGGER.error("flushCachedData error...", ex);

}

}

 

@Override

protected Client getClient(String key) {

byte[] bKey = SafeEncoder.encode(key);

 

return getClient(bKey);

}

 

@Override

protected Client getClient(byte[] key) {

Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

 

Client client = jedis.getClient();

clients.add(client);

 

return client;

}

 

private Jedis getJedis(int slot) {

JedisPool pool = clusterInfoCache.getSlotPool(slot);

 

// 根据pool从缓存中获取Jedis

Jedis jedis = jedisMap.get(pool);

if (null == jedis) {

jedis = pool.getResource();

jedisMap.put(pool, jedis);

}

 

hasDataInBuf = true;

return jedis;

}

 

private static Field getField(Class<?> cls, String fieldName) {

try {

Field field = cls.getDeclaredField(fieldName);

field.setAccessible(true);

 

return field;

} catch (Exception e) {

throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);

}

}

 

@SuppressWarnings({"unchecked" })

private static <T> T getValue(Object obj, Field field) {

try {

return (T)field.get(obj);

} catch (Exception e) {

LOGGER.error("get value fail", e);

 

throw new RuntimeException(e);

}

}

}

 

 

 

分享到:
评论

相关推荐

    "三菱PLC与触摸屏联合开发气压传动焊条包装线技术详解",No.945 三菱PLC和触摸屏基于气压传动的焊条包装线的研发 ,核心关键词:三菱PLC; 触摸屏; 气压传动; 焊条包装线; 研

    "三菱PLC与触摸屏联合开发气压传动焊条包装线技术详解",No.945 三菱PLC和触摸屏基于气压传动的焊条包装线的研发 ,核心关键词:三菱PLC; 触摸屏; 气压传动; 焊条包装线; 研发; No.945,"三菱PLC与触摸屏在气压传动焊条包装线研发项目No.945中的应用"

    vb图书馆管理系统(源代码+论文).rar

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于人工智能、计算机科学与技术等相关专业,更为适合; 4、下载使用后,可先查看README.md文件(如有),本项目仅用作交流学习参考,请切勿用于商业用途。

    [matlab系统程序]MATLAB危险区域预警系统.zip

    本项目是自己做的设计,有GUI界面,完美运行,适合小白及有能力的同学进阶学习,大家可以下载使用,整体有非常高的借鉴价值,大家一起交流学习。该资源主要针对计算机、通信、人工智能、自动化等相关专业的学生、老师或从业者下载使用,亦可作为期末课程设计、课程大作业、毕业设计等。 项目整体具有较高的学习借鉴价值!基础能力强的可以在此基础上修改调整,以实现不同的功能。

    [matlab系统程序]MATLAB图像去雾.zip

    本项目是自己做的设计,有GUI界面,完美运行,适合小白及有能力的同学进阶学习,大家可以下载使用,整体有非常高的借鉴价值,大家一起交流学习。该资源主要针对计算机、通信、人工智能、自动化等相关专业的学生、老师或从业者下载使用,亦可作为期末课程设计、课程大作业、毕业设计等。 项目整体具有较高的学习借鉴价值!基础能力强的可以在此基础上修改调整,以实现不同的功能。

    基于文献知识与知识图谱补全方法用于COVID-19药物再利用的创新算法

    内容概要:文章介绍了针对COVID-19的药物再利用的创新方法,这种方法融合了基于文献的知识(LitCovid和CORD-19数据集)及先进的知识图谱补全技术。具体采用了基于神经网络的TransE、RotatE等多种算法预测药物再利用的潜力,并通过开放和封闭的发现模式为预测结果提供合理的机制解释,包括发现模式、准确性分类及定性评估等手段,增强了方法的实用性。研究表明,TransE表现最优,并成功预测并验证了一系列药物作为COVID-19的治疗候选人选。此外,方法不仅适用于COVID-19,还具备应用于其他疾病药物再利用及其他临床问题解决的潜力。此研究为快速高效地推进药物再利用提供了一个新的计算框架。 适合人群:生物医学科研人员,从事药品再利用、人工智能药物筛选的专业研究人员,对生物信息数据分析和处理感兴趣的学者或技术人员。 使用场景及目标:① 利用计算模型预测药物能否被重新应用于新的适应症,尤其是在面对突发公共卫生事件时加快新药物的研发进程。② 对现有药物进行再评价,以发现更广泛、安全、有效的治疗用途,为临床治疗提供依据和理论指导。③ 探讨通过自动化手段发掘药物作用机理的技术路径。 其他说明:作者团队来自多个国家和地区,研究获得了多项国家级基金支持,论文详尽描述了实验细节,并附上了全部代码和数据资源供后续拓展和重复研究使用。

    "基于三菱PLC与组态王技术的智能交通灯车辆监测系统:No.808的实践与应用",No.808 基于三菱PLC和组态王的智能交通灯车辆监测 ,关键词: 基于三菱PLC; 组态王; 智能交通

    "基于三菱PLC与组态王技术的智能交通灯车辆监测系统:No.808的实践与应用",No.808 基于三菱PLC和组态王的智能交通灯车辆监测 ,关键词: 基于三菱PLC; 组态王; 智能交通灯; 车辆监测; No.808,"三菱PLC与组态王协同的智能交通灯车辆监测系统No.808"

    minecraft1.16.1生存基地 搭配了1.16.1的BSL着色器 BSL光影:https://cdn.modrinth.com/data/Q1vvjJYV/versions/oGcsNfpD/

    在湖上建造的生存基地,希望大家喜欢

    基于西门子S7-1200 PLC与Wincc组态技术的智能路口交通指挥系统解决方案 ,No.698 西门子S7-1200 和Wincc组态基于PLC的路口交通指挥系统 ,No.698; 西门子S7-1

    基于西门子S7-1200 PLC与Wincc组态技术的智能路口交通指挥系统解决方案。,No.698 西门子S7-1200 和Wincc组态基于PLC的路口交通指挥系统 ,No.698; 西门子S7-1200; Wincc组态; PLC; 路口交通指挥系统; 交通控制系统。,基于PLC与Wincc组态的西门子S7-1200交通指挥系统

    电子设计大赛+C题+FPGA+省级获奖

    本资源为无线传输信号模拟系统的完整设计报告,基于ZYNQ7020开发平台实现,包含硬件设计、FPGA算法逻辑、软件控制及详细测试方案。系统可生成直达信号、多径信号及合路信号,支持参数动态调节,适用于通信系统仿真、教学实验及科研开发。 资源内容 设计报告全文:方案论证、理论分析、电路设计、程序流程图、测试结果。 附录数据:AM调制频谱、载波有效值测量、多径时延/衰减/初相实测数据。 配套资料:系统架构图、DAC模块电路图、FPGA算法逻辑框图(PDF+高清图)。 适用场景 设计参考 FPGA数字信号处理开发 无线信道模拟与通信系统仿真 科研项目中的信号生成与测试

    毕业设计&课程设计&毕设&课设-java-ssm网络视频播放器

    项目均经过测试,可正常运行! 环境说明: 开发语言:java JDK版本:jdk1.8 框架:springboot 数据库:mysql 5.7/8 数据库工具:navicat 开发软件:eclipse/idea

    sqllite查询数据库的语句

    sqllite查询数据库的语句

    (源码)基于物联网的Buddy康复激励系统.zip

    # 基于物联网的Buddy康复激励系统 ## 项目简介 Buddy是一个旨在支持和激励个人在日常生活中的身体活动,从而促进康复和保持健康的系统。它由两部分组成可穿戴设备和名为“Wrfel”的游戏组件。通过可穿戴设备追踪用户的步数和心率等身体数据,并在显示屏上展示。名为“Motivationsbuddy”的角色会在每次活动时陪伴用户,并通过提醒和小提示激励用户保持活动。此外,用户还可以通过设备与其他人员进行网络联系。每周用户可以通过掷骰子的方式选择新的活动。收集到的可穿戴设备数据也会在骰子游戏的界面上进行展示。 ## 项目的主要特性和功能 1. 穿戴设备的步数和心率监测功能实时追踪用户的步数和心率,并在显示屏上展示数据。 2. 激励功能通过提醒和小提示激励用户保持活动。 3. 网络联系功能用户可以与其他人员进行网络联系,分享活动数据和经验。 4. 掷骰子活动选择功能每周用户可以通过掷骰子的方式选择新的活动,增加活动的多样性和趣味性。

    (源码)基于MFC框架的指纹识别系统.zip

    # 基于MFC框架的指纹识别系统 ## 项目简介 本项目是一个基于MFC(Microsoft Foundation Classes)框架的指纹识别系统,主要用于指纹的采集、预处理、特征提取、特征过滤、特征匹配和入库等操作。系统通过本地文件夹存储指纹库信息,并提供分步测试、登记和识别功能。 ## 项目的主要特性和功能 1. 指纹采集与预处理 使用指纹采集器(中控ZK4500)进行指纹图像的采集。 通过中值滤波、高斯锐化、均值化等方法对指纹图像进行预处理。 2. 特征提取与过滤 使用Sobel算法进行方向计算,提取图像梯度信息。 通过掩码计算和Gabor滤波增强指纹图像。 使用基于边界的特征过滤算法,减少特征点数量,提高识别速度。 3. 指纹识别与登记 提供指纹登记功能,用户可以通过采集指纹并输入姓名进行登记。 提供指纹识别功能,通过采集指纹并与指纹库中的信息进行匹配,识别用户身份。

    基于Unet技术的医学图像分割系统-DL00366:以皮肤病数据训练的自动分割模型,DL00366-基于Unet的医学图像分割系统 用Unet来做医学图像分割 我们将会以皮肤病的数据作为示范,训练

    基于Unet技术的医学图像分割系统——DL00366:以皮肤病数据训练的自动分割模型,DL00366-基于Unet的医学图像分割系统 用Unet来做医学图像分割。 我们将会以皮肤病的数据作为示范,训练一个皮肤病分割的模型出来,用户输入图像,模型可以自动分割去皮肤病的区域和正常的区域。 ,DL00366; 基于Unet的医学图像分割系统; 皮肤疾病数据; 模型训练; 图像自动分割。,基于Unet的皮肤病图像分割系统

    毕业设计&课程设计&毕设&课设-java-旅游景点线路网站

    项目均经过测试,可正常运行! 环境说明: 开发语言:java JDK版本:jdk1.8 框架:springboot 数据库:mysql 5.7/8 数据库工具:navicat 开发软件:eclipse/idea

    前端Node:第四章:大事件

    前端Node:第四章:大事件

    (源码)基于区块链的金融管理系统.zip

    # 基于区块链的金融管理系统 ## 项目简介 本项目是一个基于区块链技术的金融管理系统,旨在提供一个去中心化、安全可靠的平台,用于处理公司间的财务交易。通过使用智能合约和Python SDK,用户可以进行银行操作、注册公司、登录系统以及进行各种财务操作。 ## 项目的主要特性和功能 ### 主要特性 1. 去中心化利用区块链技术,实现数据的去中心化管理。 2. 安全性通过智能合约和区块链技术,保障数据的安全性和不可篡改性。 3. 可靠性确保交易的可靠性和持久性。 ### 功能 1. 银行界面展示银行相关的数据,如存款、贷款等。 2. 注册与登录允许用户注册新账户并登录系统。 3. 公司管理允许用户创建公司账户,管理公司的财务信息。 4. 财务操作包括转账、购买、融资、还款等操作。 5. 智能合约交互通过Python SDK与智能合约进行交互,实现各种功能。 ## 安装使用步骤 ### 假设用户已经下载了本项目的源码文件

    西门子S7-200 PLC与组态王联合楼宇消防系统电气控制设计解决方案 No.950,No.950 基于西门子S7-200 PLC和组态王楼宇消防系统电气控制系统设计 ,核心关键词:西门子

    西门子S7-200 PLC与组态王联合楼宇消防系统电气控制设计解决方案 No.950,No.950 基于西门子S7-200 PLC和组态王楼宇消防系统电气控制系统设计 ,核心关键词:西门子S7-200 PLC;组态王楼宇消防系统;电气控制系统设计;No.950,基于西门子S7-200 PLC的楼宇消防电气控制系统设计

    java JDK11版本安装包

    Java Development Kit (JDK) 11 版本简介 Java Development Kit (JDK) 作为Java平台的核心组件之一,是开发人员用来构建、运行和测试Java应用程序的必备工具集。JDK 11 是Oracle公司于2018年9月发布的长期支持(LTS)版本,标志着Java语言发展的一个重要里程碑。它不仅继承了之前版本的优点,还引入了一系列新特性与改进,以更好地适应现代软件开发的需求。 主要特点: 性能提升:通过优化垃圾回收机制等手段,JDK 11在性能方面取得了显著进步。 模块化系统:基于Java 9中引入的模块化系统进一步优化,使得开发者能够更高效地组织代码结构,提高安全性及可维护性。 增强的安全性:新增了多项安全功能,比如TLS 1.3的支持,以及对现有加密算法的加强。 新的APIs:增加了许多实用的新APIs,如用于处理HTTP请求的HttpClient API正式版、本地变量类型推断var关键字等。 移除过时元素:为了保持框架的简洁性和现代化,JDK 11移除了部分不推荐使用的API和选项。

    2008-2020年各省国内发明专利申请授权量数据.xlsx

    2008-2020年各省国内发明专利申请授权量数据 1、时间:2008-2020年 2、来源:国家统计J、统计nj 3、指标:行政区划代码、地区、年份、国内发明专利申请授权量(项) 4、范围:31省

Global site tag (gtag.js) - Google Analytics