`

JedisClientUtil.java

 
阅读更多

/**************************************JedisClient.java **************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.apache.logging.log4j.LogManager;

import org.apache.logging.log4j.Logger;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.ScanParams;

import redis.clients.jedis.ScanResult;

 

import com.avit.util.SysConfig;

 

public class JedisClient {

public static Logger log = LogManager.getLogger(JedisClient.class);

private static JedisPool  jc;

 

public final static ExecutorService batchAddRedisCache = Executors.newFixedThreadPool(40);

 

static {

String nodes = SysConfig.getSystemConfig("redis.user.group.node.ip", "10.18.24.29");

String port = SysConfig.getSystemConfig("redis.user.group.node.port", "6870");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisPool(jedisPoolConfig,nodes, Integer.parseInt(port), maxWaitMillis);

}

 

/**

* 将给定集合的并集存储在指定的集合

*/

public static long sunionstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

 

return jedis.sunionstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0;

}

/*

* 将给定集合的差集存储在指定的集合

*/

public static long sdiffstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiffstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

 

jedis.close();

}

}

return 0;

}

 

/*

* 返回第一个集合中独有的元素

*/

public static Set<String> sdiff(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiff(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

 

public static void sadd(String key, String member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

}

 

/*

* 返回集合中的所有的成员

*/

public static Set<String> smembers(String key) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.smembers(key);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 给主键设置过期时间

*/

public static void expire(String key, int seconds) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.expire(key,seconds);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

public static Long delete(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.del(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

 

public static void sadd(String key, String... member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 返回集合中元素的数量

*/

public static long scard(String dstkey) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long log1 =  jedis.scard(dstkey);

return log1;

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0L;

}

 

/*

* 返回主键是否存在

*/

public static boolean exists(String msgKey) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.exists(msgKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

/*

* 移除集合中的一个或多个成员元素

*/

public static void srem(String key, String value) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.srem(key, value);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

/*

* 修改主键名称

*/

public static void rename(String oldKey , String newKey) {

 

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.rename(oldKey, newKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 批量添加,一次插入不超过1024

*/

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

//一次添加上限1024

if(dstUsers.length > 1024)  {

 

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;//总页数

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

 

try {

 

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

 

for(int i = 1; i <= pagecount; i++) {

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClient.sadd(key,strings);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

countDownLatch.countDown();

}

 

}

});

}

countDownLatch.await();

} catch (Exception ex) {

log.error("",ex);

 

} else {

 

if(dstUsers.length <= 0) {

 

} else {

JedisClient.sadd(key,dstUsers);

}

}

}

/*

* 如果超过10000条数据,需要循环读取

*/

public static Set<String> scan(String key) {

 

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long count = jedis.scard(key);

    if(count > 10000) {

      Set<String> result = new HashSet<String>();

      ScanParams scanParams = new ScanParams();

  scanParams.count(2000);

  String counter = ScanParams.SCAN_POINTER_START;

 

  while(true){

  

  ScanResult<String> ret = jedis.sscan(key, counter, scanParams);

  counter = ret.getStringCursor();

  result.addAll(ret.getResult());

  

  if(result.size() >= count || counter.equals("0")) {//遍历完成

  break;

  }

  }

  return result;

    } else {

    return jedis.smembers(key);

    }

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 将给定集合的交集存储在指定的集合

*/

public static Long sinterstore(String key, String... keys) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

return jedis.sinterstore(key, keys);

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

/*

* 元素是否是集合的成员

*/

public static boolean sismember(String key, String member) {

 

 

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sismember(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

 

public static void main(String[] args) {

/*

String dstKey = "dstTestKey";

 

 

JedisClient.sadd("testKey1", "nihao");

JedisClient.sadd("testKey1", "hello");

JedisClient.sadd("testKey1", "word");

 

JedisClient.sadd("testKey3", "nihao");

JedisClient.sadd("testKey3", "this is text");

JedisClient.sadd("testKey3", "wo");

 

 

JedisClient.sunionstore(dstKey, new String[] {"testKey1", "testKey3"});

 

Set<String> reuslt1 = JedisClient.smembers(dstKey);

System.out.println(reuslt1);

 

JedisClient.sadd("testKey4", "key4");

JedisClient.sadd("testKey4", "key4 hahah");

JedisClient.sadd("testKey4", "key4 hahawo");

 

        JedisClient.sunionstore(dstKey, new String[] {"testKey4"});

 

Set<String> reuslt3 = JedisClient.smembers(dstKey);

System.out.println(reuslt3);

 

JedisClient.sadd("testKey2", "word");

JedisClient.sadd("testKey2", "xxx");

JedisClient.sadd("testKey2", "test");

 

JedisClient.sdiffstore(dstKey, new String[] {dstKey, "testKey2"});

 

Set<String> reuslt2 = JedisClient.smembers(dstKey);

System.out.println(reuslt2);*/

 

Set<String> set = new HashSet<String>();

set.add("USER_GROUP_CACHE_KEY_A3601099727_0");

set.add("USER_GROUP_CACHE_KEY_A110163759_0");

String bd = "USER_GROUP_CACHE_KEY_"+"A110163759";

for (String a : set) {

boolean flag = a.indexOf(bd) != -1;

System.out.println(flag);

}

System.out.println("jja")

}

}

 

 

 /****************************************JedisClusterClient.java***************************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

 

import com.avit.common.queue.TaskConsumer;

import com.avit.util.SysConfig;

import redis.clients.jedis.HostAndPort;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

 

public class JedisClusterClient {

 

private static JedisCluster jc;

 

static {

String nodes = SysConfig.getSystemConfig("redis.nodes", "");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();

for (String node : nodes.split(",")) {

String host = node.split(":")[0];

int port = Integer.parseInt(node.split(":")[1]);

jedisClusterNodes.add(new HostAndPort(host, port));

}

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisCluster(jedisClusterNodes, jedisPoolConfig);

}

 

public static String buildKey(String keyPre, String code) {

        StringBuffer sb = new StringBuffer();

        sb.append(keyPre).append("_").append(code);

        return sb.toString();

    }

 

public static void set(String key, String value) {

jc.set(key, value);

}

 

public static String get(String key) {

String value = jc.get(key);

return value;

}

 

public static void sadd(String key, String member) {

jc.sadd(key, member);

}

 

public static void sadd(String key, String... member) {

jc.sadd(key, member);

}

 

public static void srem(String key, String member) {

jc.srem(key, member);

}

 

public static void srem(String key, String[] member) {

jc.srem(key, member);

}

 

public static Set<String> smembers(String key) {

return jc.smembers(key);

}

 

public static void expire(String key, int seconds) {

jc.expire(key, seconds);

}

 

public static String spop(String key) {

return jc.spop(key);

}

 

public static boolean sismember(String key, String member) {

return jc.sismember(key, member);

}

 

public static String srandmember(String key) {

 

return jc.srandmember(key);

}

 

 

public static Long lpush(String key, String content) {

 

return jc.lpush(key, content);

 

public static String rpop (String key) {

return jc.rpop(key);

}

 

public static long incr(String key) {

return jc.incr(key);

}

 

public static Long rpush(String key, String... content) {

 

return jc.rpush(key, content);

 

public static JedisClusterPipeline getJedisClusterPipeline()  {

return JedisClusterPipeline.pipelined(jc);

}

 

public static long sunionstore(String dstkey , String... keys) {

return jc.sunionstore(dstkey, keys);

}

 

public static long sdiffstore(String dstkey , String... keys) {

return jc.sdiffstore(dstkey, keys);

}

 

public static long scard(String dstkey) {

return jc.scard(dstkey);

}

 

public static long zadd(String key,double score, String member) {

return jc.zadd(key, score, member);

}

public static long zcard(String key) {

return jc.zcard(key);

}

public static long zremrangeByScore(String key, String start, String end) {

return jc.zremrangeByScore(key, start, end);

}

 

 

/**

* 判断集群是否有节点挂掉

* @return

*/

public static boolean isNodesWell(){

Map<String, JedisPool> clusterNodes = jc.getClusterNodes();

try {

for (Entry<String, JedisPool> enty : clusterNodes.entrySet()) {

Jedis jd = enty.getValue().getResource();

if (jd.isConnected()) jd.close();

}

} catch (Exception e) {

return false ;

}

return true ;

}

 

/**

* 判断key是否存在

*/

public static Boolean exists(String key) {

return jc.exists(key);

}

 

public static Long delete(String... keys) {

return jc.del(keys);

}

 

 

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

 

}

 

 

try {

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

for(int i = 1; i <= pagecount; i++) {

 

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

TaskConsumer.batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

} catch (Exception ex) {

System.out.println(ex);

} finally {

countDownLatch.countDown();

}

 

}

});

 

}

countDownLatch.await();

} catch (Exception ex) {

 

 

 

} else {

if(dstUsers.length <= 0) {

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

 

}

}

 

 

 

public static void batchSaddCache(String[] dstUsers, String key, int dstUserNumOneKey) {

 

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}

}

}

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

}

 

 

public static void batchSrem(String offMsgKey, String[] dstUsers) {

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}

}

}

 

} else {

JedisClusterClient.srem(offMsgKey,dstUsers);

}

 

}

 

public static void main(String[] args) {

 

 

JedisClusterClient.set("you","testMSg hello");

String foo1 = JedisClusterClient.get("you");

System.out.println(foo1);

 

String foo = JedisClusterClient.get("foo");

System.out.println(foo);

 

String umrKey = "UMR_0__VodS04194";

long t1 = System.currentTimeMillis();

Set<String> msgCodes = JedisClusterClient.smembers(umrKey);

long t2 = System.currentTimeMillis();

System.out.println("set size is " + msgCodes.size() + " cost " + (t2 - t1) + "ms");

 

long t3 = System.currentTimeMillis();

String code = JedisClusterClient.srandmember(umrKey);

long t4 = System.currentTimeMillis();

System.out.println("get random value "+ code + " cost " + (t4 - t3) + "ms");

 

}

/*************************************************************JedisClusterPipeline.java*************/

package com.avit.cache.redis;

 

import java.io.Closeable;

import java.lang.reflect.Field;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.Queue;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import redis.clients.jedis.BinaryJedisCluster;

import redis.clients.jedis.Client;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisClusterConnectionHandler;

import redis.clients.jedis.JedisClusterInfoCache;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisSlotBasedConnectionHandler;

import redis.clients.jedis.PipelineBase;

import redis.clients.jedis.exceptions.JedisMovedDataException;

import redis.clients.jedis.exceptions.JedisRedirectionException;

import redis.clients.util.JedisClusterCRC16;

import redis.clients.util.SafeEncoder;

 

/**

 * 在集群模式下提供批量操作的功能。 <br/>

 * 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),

 * 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />

 * 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>

 * 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />

 * 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />

 */

public class JedisClusterPipeline extends PipelineBase implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);

 

// 部分字段没有对应的获取方法,只能采用反射来做

// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口

private static final Field FIELD_CONNECTION_HANDLER;

private static final Field FIELD_CACHE; 

static {

FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");

FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");

}

 

private JedisSlotBasedConnectionHandler connectionHandler;

private JedisClusterInfoCache clusterInfoCache;

private Queue<Client> clients = new LinkedList<Client>();// 根据顺序存储每个命令对应的Client

private Map<JedisPool, Jedis> jedisMap = new HashMap<JedisPool, Jedis>();// 用于缓存连接

private boolean hasDataInBuf = false;// 是否有数据在缓存区

 

/**

* 根据jedisCluster实例生成对应的JedisClusterPipeline

* @param 

* @return

*/

public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {

JedisClusterPipeline pipeline = new JedisClusterPipeline();

    pipeline.setJedisCluster(jedisCluster);

    return pipeline;

}

 

public JedisClusterPipeline() {

}

 

public void setJedisCluster(JedisCluster jedis) {

connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);

clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);

}

 

/**

* 刷新集群信息,当集群信息发生变更时调用

* @param 

* @return

*/

public void refreshCluster() {

connectionHandler.renewSlotCache();

}

 

/**

* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化

*/

public void sync() {

innerSync(null);

}

 

/**

* 同步读取所有数据 并按命令顺序返回一个列表

* @return 按照命令的顺序返回所有的数据

*/

public List<Object> syncAndReturnAll() {

List<Object> responseList = new ArrayList<Object>();

 

innerSync(responseList);

 

return responseList;

}

 

private void innerSync(List<Object> formatted) {

try {

for (Client client : clients) {

// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。

// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了

Object data = generateResponse(client.getOne()).get();

if (null != formatted) {

formatted.add(data);

}

}

} catch (JedisRedirectionException jre) {

if (jre instanceof JedisMovedDataException) {

// if MOVED redirection occurred, rebuilds cluster's slot cache,

// recommended by Redis cluster specification

refreshCluster();

}

 

throw jre;

} finally {

// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染

for (Jedis jedis : jedisMap.values()) {

flushCachedData(jedis);

}

 

hasDataInBuf = false;

close();

}

}

 

@Override

public void close() {

clean();

 

clients.clear();

 

for (Jedis jedis : jedisMap.values()) {

if (hasDataInBuf) {

flushCachedData(jedis);

}

 

jedis.close();

}

 

jedisMap.clear();

 

hasDataInBuf = false;

}

 

private void flushCachedData(Jedis jedis) {

try {

jedis.getClient().getAll();

} catch (RuntimeException ex) {

// 其中一个client出问题,后面出问题的几率较大

LOGGER.error("flushCachedData error...", ex);

}

}

 

@Override

protected Client getClient(String key) {

byte[] bKey = SafeEncoder.encode(key);

 

return getClient(bKey);

}

 

@Override

protected Client getClient(byte[] key) {

Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

 

Client client = jedis.getClient();

clients.add(client);

 

return client;

}

 

private Jedis getJedis(int slot) {

JedisPool pool = clusterInfoCache.getSlotPool(slot);

 

// 根据pool从缓存中获取Jedis

Jedis jedis = jedisMap.get(pool);

if (null == jedis) {

jedis = pool.getResource();

jedisMap.put(pool, jedis);

}

 

hasDataInBuf = true;

return jedis;

}

 

private static Field getField(Class<?> cls, String fieldName) {

try {

Field field = cls.getDeclaredField(fieldName);

field.setAccessible(true);

 

return field;

} catch (Exception e) {

throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);

}

}

 

@SuppressWarnings({"unchecked" })

private static <T> T getValue(Object obj, Field field) {

try {

return (T)field.get(obj);

} catch (Exception e) {

LOGGER.error("get value fail", e);

 

throw new RuntimeException(e);

}

}

}

 

 

 

分享到:
评论

相关推荐

    智能家居_物联网_环境监控_多功能应用系统_1741777957.zip

    人脸识别项目实战

    PLC热反应炉仿真程序和报告 ,PLC; 热反应炉; 仿真程序; 报告,PLC热反应炉仿真程序报告

    PLC热反应炉仿真程序和报告 ,PLC; 热反应炉; 仿真程序; 报告,PLC热反应炉仿真程序报告

    C++函数全解析:从基础入门到高级特性的编程指南

    内容概要:本文详细介绍了 C++ 函数的基础概念及其实战技巧。内容涵盖了函数的基本结构(定义、声明、调用)、多种参数传递方式(值传递、引用传递、指针传递),各类函数类型(无参无返、有参无返、无参有返、有参有返),以及高级特性(函数重载、函数模板、递归函数)。此外,通过实际案例展示了函数的应用,如统计数组元素频次和实现冒泡排序算法。最后,总结了C++函数的重要性及未来的拓展方向。 适合人群:有一定编程基础的程序员,特别是想要深入了解C++编程特性的开发人员。 使用场景及目标:① 学习C++中函数的定义与调用,掌握参数传递方式;② 掌握不同类型的C++函数及其应用场景;③ 深入理解函数重载、函数模板和递归函数的高级特性;④ 提升实际编程能力,通过实例强化所学知识。 其他说明:文章以循序渐进的方式讲解C++函数的相关知识点,并提供了实际编码练习帮助理解。阅读过程中应当边思考边实践,动手实验有助于更好地吸收知识点。

    `计算机视觉_Python_PyQt5_Opencv_综合图像处理与识别跟踪系统`.zip

    人脸识别项目实战

    Ultra Ethernet Consortium规范介绍与高性能AI网络优化

    内容概要:本文主要介绍了Ultra Ethernet Consortium(UEC)提出的下一代超高性能计算(HPC)和人工智能(AI)网络解决方案及其关键技术创新。文中指出,现代AI应用如大型语言模型(GPT系列)以及HPC对集群性能提出了更高需求。为了满足这一挑战,未来基于超乙太网络的新规格将采用包喷射传输、灵活数据报排序和改进型流量控制等机制来提高尾部延迟性能和整个通信系统的稳定度。同时UEC也在研究支持高效远程直接内存访问的新一代协议,确保能更好地利用现成以太网硬件设施的同时还增强了安全性。 适合人群:网络架构师、数据中心管理员、高性能运算从业人员及相关科研人员。 使用场景及目标:①为构建高效能的深度学习模型训练平台提供理论指导和技术路线;②帮助企业选择最合适的网络技术和优化现有IT基础设施;③推动整个行业内关于大规模分布式系统网络层面上的设计创新。 阅读建议:本文档重点在于展示UEC如何解决目前RDMA/RoCE所面临的问题并提出了一套全新的设计理念用于未来AI和HPC环境下的通信效率提升。在阅读时需要注意理解作者对于当前网络瓶颈分析背后的原因以及新设计方案所能带来的具体好处

    (参考GUI)MATLAB道路桥梁裂缝检测.zip

    (参考GUI)MATLAB道路桥梁裂缝检测.zip

    pygeos-0.14.0-cp311-cp311-win-amd64.whl

    pygeos-0.14.0-cp311-cp311-win_amd64.whl

    微信小程序_人脸识别_克隆安装_社交娱乐用途_1741777709.zip

    人脸识别项目实战

    基于Matlab的模拟光子晶体光纤中的电磁波传播特性 对模式场的分布和有效折射率的计算 模型使用有限差分时域(FDTD)方法来求解光波在PCF中的传播模式 定义物理参数、光纤材料参数、光波参数、PC

    基于Matlab的模拟光子晶体光纤中的电磁波传播特性 对模式场的分布和有效折射率的计算 模型使用有限差分时域(FDTD)方法来求解光波在PCF中的传播模式 定义物理参数、光纤材料参数、光波参数、PCF参数及几何结构等参数 有限差分时域(FDTD)方法:这是一种数值模拟方法,用于求解麦克斯韦方程,模拟电磁波在不同介质中的传播 特征值问题求解:使用eigs函数求解矩阵的特征值问题,以确定光波的传播模式和有效折射率 模式场分布的可视化:通过绘制模式场的分布图,直观地展示光波在PCF中的传播特性 程序已调通,可直接运行 ,基于Matlab模拟; 光子晶体光纤; 电磁波传播特性; 模式场分布; 有效折射率计算; 有限差分时域(FDTD)方法; 物理参数定义; 几何结构参数; 特征值问题求解; 程序运行。,基于Matlab的PCF电磁波传播模拟与特性分析

    知识图谱与大模型融合实践研究报告:技术路径、挑战及行业应用实例分析

    内容概要:《知识图谱与大模型融合实践研究报告》详细探讨了知识图谱和大模型在企业级落地应用的现状、面临的挑战及融合发展的潜力。首先,介绍了知识图谱与大模型的基本概念和发展历史,并对比分析了两者的优点和缺点,随后重点讨论了两者结合的可行性和带来的具体收益。接下来,报告详细讲解了两者融合的技术路径、关键技术及系统评估方法,并通过多个行业实践案例展示了融合的实际成效。最后提出了对未来的展望及相应的政策建议。 适合人群:对人工智能技术和其应用有兴趣的企业技术人员、研究人员及政策制定者。 使用场景及目标:①帮助企业理解知识图谱与大模型融合的关键技术和实际应用场景;②指导企业在实际应用中解决技术难题,优化系统性能;③推动相关领域技术的进步和发展,为政府决策提供理论依据。 其他说明:报告不仅强调了技术和应用场景的重要性,还关注了安全性和法律法规方面的要求,鼓励各界积极参与到这项新兴技术的研究和开发当中。

    (参考GUI)MATLAB BP神经网络的火焰识别.zip

    神经网络火焰识别,神经网络火焰识别,神经网络火焰识别,神经网络火焰识别,神经网络火焰识别

    人脸识别_实时_ArcFace_多路识别技术_JavaScr_1741771263.zip

    人脸识别项目实战

    telepathy-farstream-0.6.0-5.el7.x64-86.rpm.tar.gz

    1、文件内容:telepathy-farstream-0.6.0-5.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/telepathy-farstream-0.6.0-5.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    基于Springboot框架的购物推荐网站的设计与实现(Java项目编程实战+完整源码+毕设文档+sql文件+学习练手好项目).zip

    本东大每日推购物推荐网站管理员和用户两个角色。管理员功能有,个人中心,用户管理,商品类型管理,商品信息管理,商品销售排行榜管理,系统管理,订单管理。 用户功能有,个人中心,查看商品,查看购物资讯,购买商品,查看订单,我的收藏,商品评论。因而具有一定的实用性。 本站是一个B/S模式系统,采用Spring Boot框架作为开发技术,MYSQL数据库设计开发,充分保证系统的稳定性。系统具有界面清晰、操作简单,功能齐全的特点,使得东大每日推购物推荐网站管理工作系统化、规范化。 关键词:东大每日推购物推荐网站;Spring Boot框架;MYSQL数据库 东大每日推购物推荐网站的设计与实现 1 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想 1 2相关技术 3 2.1 MYSQL数据库 3 2.2 B/S结构 3 2.3 Spring Boot框架简介 4 3系统分析 4 3.1可行性分析 4 3.1.1技术可行性 5 3.1.2经济可行性 5 3.1.3操作可行性 5 3.2系统性能分析 5 3.2.1 系统安全性 5 3.2.2 数据完整性 6 3.3系统界面

    使用C语言编程设计实现的平衡二叉树的源代码

    二叉树实现。平衡二叉树(Balanced Binary Tree)是一种特殊的二叉树,其特点是树的高度(depth)保持在一个相对较小的范围内,以确保在进行插入、删除和查找等操作时能够在对数时间内完成。平衡二叉树的主要目的是提高二叉树的操作效率,避免由于不平衡而导致的最坏情况(例如,形成链表的情况)。本资源是使用C语言编程设计实现的平衡二叉树的源代码。

    基于扩张状态观测器eso扰动补偿和权重因子调节的电流预测控制,相比传统方法,增加了参数鲁棒性 降低电流脉动,和误差 基于扩张状态观测器eso补偿的三矢量模型预测控制 ,基于扩张状态观测器; 扰动补

    基于扩张状态观测器eso扰动补偿和权重因子调节的电流预测控制,相比传统方法,增加了参数鲁棒性 降低电流脉动,和误差 基于扩张状态观测器eso补偿的三矢量模型预测控制 ,基于扩张状态观测器; 扰动补偿; 权重因子调节; 电流预测控制; 参数鲁棒性; 电流脉动降低; 误差降低; 三矢量模型预测控制,基于鲁棒性增强和扰动补偿的电流预测控制方法

    永磁同步电机全速域控制高频方波注入法、滑模观测器法SMO、加权切矢量控制Simulink仿真模型 低速域采用高频方波注入法HF,高速域采用滑膜观测器法SMO,期间采用加权形式切 送前方法 1、零低速

    永磁同步电机全速域控制高频方波注入法、滑模观测器法SMO、加权切矢量控制Simulink仿真模型 低速域采用高频方波注入法HF,高速域采用滑膜观测器法SMO,期间采用加权形式切 送前方法 1、零低速域,来用无数字滤波器高频方波注入法, 2.中高速域采用改进的SMO滑模观测器,来用的是sigmoid函数,PLL锁相环 3、转速过渡区域采用加权切法 该仿真各个部分清晰分明,仿真波形效果良好内附详细控制方法资料lunwen 带有参考文献和说明文档,仿真模型 ,核心关键词: 1. 永磁同步电机; 2. 全速域控制; 3. 高频方波注入法; 4. 滑模观测器法SMO; 5. 加权切换矢量控制; 6. Simulink仿真模型; 7. 零低速域控制; 8. 中高速域控制; 9. 转速过渡区域控制; 10. 仿真波形效果; 11. 详细控制方法资料; 12. 参考文献和说明文档。,永磁同步电机多域控制策略的仿真研究

    Buck变器二阶LADRC线性自抗扰控制matlab仿真 包括电压电流双闭环和ladrc控制外环加电流内环控制两种 并进行了对比,ladrc控制超调更小,追踪更快 参考文献 版本为2018b

    Buck变器二阶LADRC线性自抗扰控制matlab仿真 包括电压电流双闭环和ladrc控制外环加电流内环控制两种 并进行了对比,ladrc控制超调更小,追踪更快 参考文献 版本为2018b ,关键词:Buck变换器;二阶LADRC;线性自抗扰控制;Matlab仿真;电压电流双闭环;LADRC控制外环;电流内环控制;对比;超调;追踪;2018b版本。,Matlab仿真二阶LADRC控制的Buck变换器:外环LADRC+内环电流控制对比

    2024全球工程前沿.pdf

    2024全球工程前沿.pdf

Global site tag (gtag.js) - Google Analytics