`

JedisClientUtil.java

 
阅读更多

/**************************************JedisClient.java **************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

 

import org.apache.logging.log4j.LogManager;

import org.apache.logging.log4j.Logger;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.ScanParams;

import redis.clients.jedis.ScanResult;

 

import com.avit.util.SysConfig;

 

public class JedisClient {

public static Logger log = LogManager.getLogger(JedisClient.class);

private static JedisPool  jc;

 

public final static ExecutorService batchAddRedisCache = Executors.newFixedThreadPool(40);

 

static {

String nodes = SysConfig.getSystemConfig("redis.user.group.node.ip", "10.18.24.29");

String port = SysConfig.getSystemConfig("redis.user.group.node.port", "6870");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisPool(jedisPoolConfig,nodes, Integer.parseInt(port), maxWaitMillis);

}

 

/**

* 将给定集合的并集存储在指定的集合

*/

public static long sunionstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

 

return jedis.sunionstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0;

}

/*

* 将给定集合的差集存储在指定的集合

*/

public static long sdiffstore(String dstkey, String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiffstore(dstkey, keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

 

jedis.close();

}

}

return 0;

}

 

/*

* 返回第一个集合中独有的元素

*/

public static Set<String> sdiff(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sdiff(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

 

public static void sadd(String key, String member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

}

 

/*

* 返回集合中的所有的成员

*/

public static Set<String> smembers(String key) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.smembers(key);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 给主键设置过期时间

*/

public static void expire(String key, int seconds) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.expire(key,seconds);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

public static Long delete(String... keys) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.del(keys);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

 

public static void sadd(String key, String... member) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.sadd(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 返回集合中元素的数量

*/

public static long scard(String dstkey) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long log1 =  jedis.scard(dstkey);

return log1;

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return 0L;

}

 

/*

* 返回主键是否存在

*/

public static boolean exists(String msgKey) {

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.exists(msgKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

/*

* 移除集合中的一个或多个成员元素

*/

public static void srem(String key, String value) {

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.srem(key, value);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

/*

* 修改主键名称

*/

public static void rename(String oldKey , String newKey) {

 

Jedis jedis = null;

try {

jedis = jc.getResource();

jedis.rename(oldKey, newKey);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

}

 

/*

* 批量添加,一次插入不超过1024

*/

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

//一次添加上限1024

if(dstUsers.length > 1024)  {

 

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;//总页数

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

 

try {

 

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

 

for(int i = 1; i <= pagecount; i++) {

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClient.sadd(key,strings);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

countDownLatch.countDown();

}

 

}

});

}

countDownLatch.await();

} catch (Exception ex) {

log.error("",ex);

 

} else {

 

if(dstUsers.length <= 0) {

 

} else {

JedisClient.sadd(key,dstUsers);

}

}

}

/*

* 如果超过10000条数据,需要循环读取

*/

public static Set<String> scan(String key) {

 

Jedis jedis = null;

try {

 

jedis = jc.getResource();

Long count = jedis.scard(key);

    if(count > 10000) {

      Set<String> result = new HashSet<String>();

      ScanParams scanParams = new ScanParams();

  scanParams.count(2000);

  String counter = ScanParams.SCAN_POINTER_START;

 

  while(true){

  

  ScanResult<String> ret = jedis.sscan(key, counter, scanParams);

  counter = ret.getStringCursor();

  result.addAll(ret.getResult());

  

  if(result.size() >= count || counter.equals("0")) {//遍历完成

  break;

  }

  }

  return result;

    } else {

    return jedis.smembers(key);

    }

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

return new HashSet<String>();

}

/*

* 将给定集合的交集存储在指定的集合

*/

public static Long sinterstore(String key, String... keys) {

Jedis jedis = null;

try {

 

jedis = jc.getResource();

return jedis.sinterstore(key, keys);

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return 0L;

}

/*

* 元素是否是集合的成员

*/

public static boolean sismember(String key, String member) {

 

 

Jedis jedis = null;

try {

jedis = jc.getResource();

return jedis.sismember(key, member);

 

} catch (Exception ex) {

log.error("",ex);

} finally {

if(jedis != null) {

jedis.close();

}

}

 

return false;

}

 

public static void main(String[] args) {

/*

String dstKey = "dstTestKey";

 

 

JedisClient.sadd("testKey1", "nihao");

JedisClient.sadd("testKey1", "hello");

JedisClient.sadd("testKey1", "word");

 

JedisClient.sadd("testKey3", "nihao");

JedisClient.sadd("testKey3", "this is text");

JedisClient.sadd("testKey3", "wo");

 

 

JedisClient.sunionstore(dstKey, new String[] {"testKey1", "testKey3"});

 

Set<String> reuslt1 = JedisClient.smembers(dstKey);

System.out.println(reuslt1);

 

JedisClient.sadd("testKey4", "key4");

JedisClient.sadd("testKey4", "key4 hahah");

JedisClient.sadd("testKey4", "key4 hahawo");

 

        JedisClient.sunionstore(dstKey, new String[] {"testKey4"});

 

Set<String> reuslt3 = JedisClient.smembers(dstKey);

System.out.println(reuslt3);

 

JedisClient.sadd("testKey2", "word");

JedisClient.sadd("testKey2", "xxx");

JedisClient.sadd("testKey2", "test");

 

JedisClient.sdiffstore(dstKey, new String[] {dstKey, "testKey2"});

 

Set<String> reuslt2 = JedisClient.smembers(dstKey);

System.out.println(reuslt2);*/

 

Set<String> set = new HashSet<String>();

set.add("USER_GROUP_CACHE_KEY_A3601099727_0");

set.add("USER_GROUP_CACHE_KEY_A110163759_0");

String bd = "USER_GROUP_CACHE_KEY_"+"A110163759";

for (String a : set) {

boolean flag = a.indexOf(bd) != -1;

System.out.println(flag);

}

System.out.println("jja")

}

}

 

 

 /****************************************JedisClusterClient.java***************************************************/

package com.avit.cache.redis;

 

import java.util.Arrays;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import java.util.concurrent.CountDownLatch;

 

import com.avit.common.queue.TaskConsumer;

import com.avit.util.SysConfig;

import redis.clients.jedis.HostAndPort;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

 

public class JedisClusterClient {

 

private static JedisCluster jc;

 

static {

String nodes = SysConfig.getSystemConfig("redis.nodes", "");

int maxTotal = Integer.parseInt(SysConfig.getSystemConfig("redis.maxTotal", "100"));

int maxIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.maxIdle", "20"));

int minIdle = Integer.parseInt(SysConfig.getSystemConfig("redis.minIdle", "10"));

int maxWaitMillis = Integer.parseInt(SysConfig.getSystemConfig("redis.MaxWaitMillis", "-1"));

 

Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();

for (String node : nodes.split(",")) {

String host = node.split(":")[0];

int port = Integer.parseInt(node.split(":")[1]);

jedisClusterNodes.add(new HostAndPort(host, port));

}

 

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();

// 资源池中最大连接数

jedisPoolConfig.setMaxTotal(maxTotal);

// 资源池允许最大空闲的连接数

jedisPoolConfig.setMaxIdle(maxIdle);

// 资源池确保最少空闲的连接数

jedisPoolConfig.setMinIdle(minIdle);

// 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效

jedisPoolConfig.setBlockWhenExhausted(true);

// 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)

jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);

// 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnBorrow(true);

// 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除

jedisPoolConfig.setTestOnReturn(false);

 

// 是否开启空闲资源监测

jedisPoolConfig.setTestWhileIdle(true);

// 空闲资源的检测周期(单位为毫秒),-1:不检测

jedisPoolConfig.setTimeBetweenEvictionRunsMillis(30 * 1000);

// 资源池中资源最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除

jedisPoolConfig.setMinEvictableIdleTimeMillis(60 * 1000);

// 做空闲资源检测时,每次的采样数

// 可根据自身应用连接数进行微调,如果设置为-1,就是对所有连接做空闲监测

jedisPoolConfig.setNumTestsPerEvictionRun(-1);

 

jc = new JedisCluster(jedisClusterNodes, jedisPoolConfig);

}

 

public static String buildKey(String keyPre, String code) {

        StringBuffer sb = new StringBuffer();

        sb.append(keyPre).append("_").append(code);

        return sb.toString();

    }

 

public static void set(String key, String value) {

jc.set(key, value);

}

 

public static String get(String key) {

String value = jc.get(key);

return value;

}

 

public static void sadd(String key, String member) {

jc.sadd(key, member);

}

 

public static void sadd(String key, String... member) {

jc.sadd(key, member);

}

 

public static void srem(String key, String member) {

jc.srem(key, member);

}

 

public static void srem(String key, String[] member) {

jc.srem(key, member);

}

 

public static Set<String> smembers(String key) {

return jc.smembers(key);

}

 

public static void expire(String key, int seconds) {

jc.expire(key, seconds);

}

 

public static String spop(String key) {

return jc.spop(key);

}

 

public static boolean sismember(String key, String member) {

return jc.sismember(key, member);

}

 

public static String srandmember(String key) {

 

return jc.srandmember(key);

}

 

 

public static Long lpush(String key, String content) {

 

return jc.lpush(key, content);

 

public static String rpop (String key) {

return jc.rpop(key);

}

 

public static long incr(String key) {

return jc.incr(key);

}

 

public static Long rpush(String key, String... content) {

 

return jc.rpush(key, content);

 

public static JedisClusterPipeline getJedisClusterPipeline()  {

return JedisClusterPipeline.pipelined(jc);

}

 

public static long sunionstore(String dstkey , String... keys) {

return jc.sunionstore(dstkey, keys);

}

 

public static long sdiffstore(String dstkey , String... keys) {

return jc.sdiffstore(dstkey, keys);

}

 

public static long scard(String dstkey) {

return jc.scard(dstkey);

}

 

public static long zadd(String key,double score, String member) {

return jc.zadd(key, score, member);

}

public static long zcard(String key) {

return jc.zcard(key);

}

public static long zremrangeByScore(String key, String start, String end) {

return jc.zremrangeByScore(key, start, end);

}

 

 

/**

* 判断集群是否有节点挂掉

* @return

*/

public static boolean isNodesWell(){

Map<String, JedisPool> clusterNodes = jc.getClusterNodes();

try {

for (Entry<String, JedisPool> enty : clusterNodes.entrySet()) {

Jedis jd = enty.getValue().getResource();

if (jd.isConnected()) jd.close();

}

} catch (Exception e) {

return false ;

}

return true ;

}

 

/**

* 判断key是否存在

*/

public static Boolean exists(String key) {

return jc.exists(key);

}

 

public static Long delete(String... keys) {

return jc.del(keys);

}

 

 

public static void batchSaddCache(String[] dstUsers, final String key)  {

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

 

}

 

 

try {

final CountDownLatch countDownLatch = new CountDownLatch(pagecount);

for(int i = 1; i <= pagecount; i++) {

 

 

List<String> subList = null;

if (m == 0) {

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}else{

if(i == pagecount){

subList = arrays.subList((i - 1) * pagesize, totalcount);

}else{

subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

}

}

final List<String> finalSubList = subList;

TaskConsumer.batchAddRedisCache.submit(new  Runnable() {

public void run() {

try {

String[] strings = new String[finalSubList.size()];

 

finalSubList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

} catch (Exception ex) {

System.out.println(ex);

} finally {

countDownLatch.countDown();

}

 

}

});

 

}

countDownLatch.await();

} catch (Exception ex) {

 

 

 

} else {

if(dstUsers.length <= 0) {

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

 

}

}

 

 

 

public static void batchSaddCache(String[] dstUsers, String key, int dstUserNumOneKey) {

 

 

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.sadd(key,strings);

 

}

}

}

 

} else {

JedisClusterClient.sadd(key,dstUsers);

}

}

 

 

public static void batchSrem(String offMsgKey, String[] dstUsers) {

if(dstUsers.length > 1024)  {//一次添加超过1024个元素 ,循环插入

List<String> arrays = Arrays.asList(dstUsers);

int pagesize = 1024;

int totalcount = arrays.size();

int pagecount = 0;

int m = totalcount % pagesize;

if(m > 0){

pagecount = totalcount / pagesize + 1;

}else{

pagecount = totalcount / pagesize;

}

for(int i = 1; i <= pagecount; i++) {

if (m == 0) {

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

 

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

}else{

if(i == pagecount){

List<String> subList = arrays.subList((i - 1) * pagesize, totalcount);

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}else{

List<String> subList = arrays.subList((i - 1) * pagesize, pagesize * (i));

String[] strings = new String[subList.size()];

 

subList.toArray(strings);

JedisClusterClient.srem(offMsgKey,strings);

 

}

}

}

 

} else {

JedisClusterClient.srem(offMsgKey,dstUsers);

}

 

}

 

public static void main(String[] args) {

 

 

JedisClusterClient.set("you","testMSg hello");

String foo1 = JedisClusterClient.get("you");

System.out.println(foo1);

 

String foo = JedisClusterClient.get("foo");

System.out.println(foo);

 

String umrKey = "UMR_0__VodS04194";

long t1 = System.currentTimeMillis();

Set<String> msgCodes = JedisClusterClient.smembers(umrKey);

long t2 = System.currentTimeMillis();

System.out.println("set size is " + msgCodes.size() + " cost " + (t2 - t1) + "ms");

 

long t3 = System.currentTimeMillis();

String code = JedisClusterClient.srandmember(umrKey);

long t4 = System.currentTimeMillis();

System.out.println("get random value "+ code + " cost " + (t4 - t3) + "ms");

 

}

/*************************************************************JedisClusterPipeline.java*************/

package com.avit.cache.redis;

 

import java.io.Closeable;

import java.lang.reflect.Field;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.Queue;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import redis.clients.jedis.BinaryJedisCluster;

import redis.clients.jedis.Client;

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisCluster;

import redis.clients.jedis.JedisClusterConnectionHandler;

import redis.clients.jedis.JedisClusterInfoCache;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisSlotBasedConnectionHandler;

import redis.clients.jedis.PipelineBase;

import redis.clients.jedis.exceptions.JedisMovedDataException;

import redis.clients.jedis.exceptions.JedisRedirectionException;

import redis.clients.util.JedisClusterCRC16;

import redis.clients.util.SafeEncoder;

 

/**

 * 在集群模式下提供批量操作的功能。 <br/>

 * 由于集群模式存在节点的动态添加删除,且client不能实时感知(只有在执行命令时才可能知道集群发生变更),

 * 因此,该实现不保证一定成功,建议在批量操作之前调用 refreshCluster() 方法重新获取集群信息。<br />

 * 应用需要保证不论成功还是失败都会调用close() 方法,否则可能会造成泄露。<br/>

 * 如果失败需要应用自己去重试,因此每个批次执行的命令数量需要控制。防止失败后重试的数量过多。<br />

 * 基于以上说明,建议在集群环境较稳定(增减节点不会过于频繁)的情况下使用,且允许失败或有对应的重试策略。<br />

 */

public class JedisClusterPipeline extends PipelineBase implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class);

 

// 部分字段没有对应的获取方法,只能采用反射来做

// 你也可以去继承JedisCluster和JedisSlotBasedConnectionHandler来提供访问接口

private static final Field FIELD_CONNECTION_HANDLER;

private static final Field FIELD_CACHE; 

static {

FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");

FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");

}

 

private JedisSlotBasedConnectionHandler connectionHandler;

private JedisClusterInfoCache clusterInfoCache;

private Queue<Client> clients = new LinkedList<Client>();// 根据顺序存储每个命令对应的Client

private Map<JedisPool, Jedis> jedisMap = new HashMap<JedisPool, Jedis>();// 用于缓存连接

private boolean hasDataInBuf = false;// 是否有数据在缓存区

 

/**

* 根据jedisCluster实例生成对应的JedisClusterPipeline

* @param 

* @return

*/

public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) {

JedisClusterPipeline pipeline = new JedisClusterPipeline();

    pipeline.setJedisCluster(jedisCluster);

    return pipeline;

}

 

public JedisClusterPipeline() {

}

 

public void setJedisCluster(JedisCluster jedis) {

connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER);

clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);

}

 

/**

* 刷新集群信息,当集群信息发生变更时调用

* @param 

* @return

*/

public void refreshCluster() {

connectionHandler.renewSlotCache();

}

 

/**

* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化

*/

public void sync() {

innerSync(null);

}

 

/**

* 同步读取所有数据 并按命令顺序返回一个列表

* @return 按照命令的顺序返回所有的数据

*/

public List<Object> syncAndReturnAll() {

List<Object> responseList = new ArrayList<Object>();

 

innerSync(responseList);

 

return responseList;

}

 

private void innerSync(List<Object> formatted) {

try {

for (Client client : clients) {

// 在sync()调用时其实是不需要解析结果数据的,但是如果不调用get方法,发生了JedisMovedDataException这样的错误应用是不知道的,因此需要调用get()来触发错误。

// 其实如果Response的data属性可以直接获取,可以省掉解析数据的时间,然而它并没有提供对应方法,要获取data属性就得用反射,不想再反射了,所以就这样了

Object data = generateResponse(client.getOne()).get();

if (null != formatted) {

formatted.add(data);

}

}

} catch (JedisRedirectionException jre) {

if (jre instanceof JedisMovedDataException) {

// if MOVED redirection occurred, rebuilds cluster's slot cache,

// recommended by Redis cluster specification

refreshCluster();

}

 

throw jre;

} finally {

// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染

for (Jedis jedis : jedisMap.values()) {

flushCachedData(jedis);

}

 

hasDataInBuf = false;

close();

}

}

 

@Override

public void close() {

clean();

 

clients.clear();

 

for (Jedis jedis : jedisMap.values()) {

if (hasDataInBuf) {

flushCachedData(jedis);

}

 

jedis.close();

}

 

jedisMap.clear();

 

hasDataInBuf = false;

}

 

private void flushCachedData(Jedis jedis) {

try {

jedis.getClient().getAll();

} catch (RuntimeException ex) {

// 其中一个client出问题,后面出问题的几率较大

LOGGER.error("flushCachedData error...", ex);

}

}

 

@Override

protected Client getClient(String key) {

byte[] bKey = SafeEncoder.encode(key);

 

return getClient(bKey);

}

 

@Override

protected Client getClient(byte[] key) {

Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key));

 

Client client = jedis.getClient();

clients.add(client);

 

return client;

}

 

private Jedis getJedis(int slot) {

JedisPool pool = clusterInfoCache.getSlotPool(slot);

 

// 根据pool从缓存中获取Jedis

Jedis jedis = jedisMap.get(pool);

if (null == jedis) {

jedis = pool.getResource();

jedisMap.put(pool, jedis);

}

 

hasDataInBuf = true;

return jedis;

}

 

private static Field getField(Class<?> cls, String fieldName) {

try {

Field field = cls.getDeclaredField(fieldName);

field.setAccessible(true);

 

return field;

} catch (Exception e) {

throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e);

}

}

 

@SuppressWarnings({"unchecked" })

private static <T> T getValue(Object obj, Field field) {

try {

return (T)field.get(obj);

} catch (Exception e) {

LOGGER.error("get value fail", e);

 

throw new RuntimeException(e);

}

}

}

 

 

 

分享到:
评论

相关推荐

    白色宽屏风格的芭蕾舞蹈表演企业网站模板.rar

    白色宽屏风格的芭蕾舞蹈表演企业网站模板.rar

    5个小游戏源代码和图片、音频等资源

    由一个精美的UI集成界面和5个小游戏组成(球球大作战,坦克大战,飞机大战,球球消消乐,贪吃蛇)

    基于Python和OpenCV的电梯开关门视频门位置识别技术实现

    内容概要:本文介绍了如何使用Python和OpenCV库识别电梯开关门视频中的门位置。具体步骤包括将视频帧转换为灰度图像、应用高斯模糊减少噪声、使用Canny边缘检测算法检测图像边缘、查找和筛选轮廓、确定门的位置并在视频中绘制边界框。该方法适用于门的颜色或纹理与周围环境有明显区别的场景。 适合人群:计算机视觉领域的开发者和研究人员,尤其是对图像处理感兴趣的读者。 使用场景及目标:主要用于监控和安全系统中,对电梯开关门进行自动化检测,确保安全运行。 阅读建议:在理解和实践过程中,建议读者熟悉Python和OpenCV的基本操作,并尝试调整参数以适应不同的视频环境。

    48页-智慧工地可视化解决方案.pdf

    智慧工地,作为现代建筑施工管理的创新模式,以“智慧工地云平台”为核心,整合施工现场的“人机料法环”关键要素,实现了业务系统的协同共享,为施工企业提供了标准化、精益化的工程管理方案,同时也为政府监管提供了数据分析及决策支持。这一解决方案依托云网一体化产品及物联网资源,通过集成公司业务优势,面向政府监管部门和建筑施工企业,自主研发并整合加载了多种工地行业应用。这些应用不仅全面连接了施工现场的人员、机械、车辆和物料,实现了数据的智能采集、定位、监测、控制、分析及管理,还打造了物联网终端、网络层、平台层、应用层等全方位的安全能力,确保了整个系统的可靠、可用、可控和保密。 在整体解决方案中,智慧工地提供了政府监管级、建筑企业级和施工现场级三类解决方案。政府监管级解决方案以一体化监管平台为核心,通过GIS地图展示辖区内工程项目、人员、设备信息,实现了施工现场安全状况和参建各方行为的实时监控和事前预防。建筑企业级解决方案则通过综合管理平台,提供项目管理、进度管控、劳务实名制等一站式服务,帮助企业实现工程管理的标准化和精益化。施工现场级解决方案则以可视化平台为基础,集成多个业务应用子系统,借助物联网应用终端,实现了施工信息化、管理智能化、监测自动化和决策可视化。这些解决方案的应用,不仅提高了施工效率和工程质量,还降低了安全风险,为建筑行业的可持续发展提供了有力支持。 值得一提的是,智慧工地的应用系统还围绕着工地“人、机、材、环”四个重要因素,提供了各类信息化应用系统。这些系统通过配置同步用户的组织结构、智能权限,结合各类子系统应用,实现了信息的有效触达、问题的及时跟进和工地的有序管理。此外,智慧工地还结合了虚拟现实(VR)和建筑信息模型(BIM)等先进技术,为施工人员提供了更为直观、生动的培训和管理工具。这些创新技术的应用,不仅提升了施工人员的技能水平和安全意识,还为建筑行业的数字化转型和智能化升级注入了新的活力。总的来说,智慧工地解决方案以其创新性、实用性和高效性,正在逐步改变建筑施工行业的传统管理模式,引领着建筑行业向更加智能化、高效化和可持续化的方向发展。

    基于stm32人体健康监测系统,包含pcb (心率,血氧,体温,语音播报,报警) 本设计采用STM32F103C8T6作为主控 使用MAX30102采集心率和血氧值 使用MLX90614测量体温 OL

    基于stm32人体健康监测系统,包含pcb (心率,血氧,体温,语音播报,报警) 本设计采用STM32F103C8T6作为主控 使用MAX30102采集心率和血氧值 使用MLX90614测量体温 OLED显示当前信息 语音播报使用SYN6658芯片,外围自己搭建,播放当前温度、心率、血氧 两个按键一个蜂鸣器警报,当体温、心率、血氧异常发出警报 资料包括源码,原理图,pcb,bom清单,都是原始文件

    白色简洁风格的流行音乐演奏整站网站源码下载.zip

    白色简洁风格的流行音乐演奏整站网站源码下载.zip

    白色简洁风格的透明登录界面整站网站源码下载.zip

    白色简洁风格的透明登录界面整站网站源码下载.zip

    在线教育平台:课程管理与学习分析

    随着学业负担的日益加重,越来越多的学生选择通过家教、自学或参加补习班来加强课外学习。然而,家教费用高昂,自学效率低下且难以及时解决疑难问题,而补习班则受限于时间和地点,灵活性不足。此外,国家政策也不鼓励校外补习。鉴于网络技术的成熟和各类在线平台的兴起,开发一个专业的在线辅助学习网站对于辅助学生的课外学习显得尤为重要。 本在线教育系统基于Vue.js构建,采用B/S架构设计,后端语言为Java,数据库使用MySQL。通过整合Vue.js技术,系统界面更加丰富和友好。系统主要面向课程购买用户,涉及的角色包括管理员、学生和教师。学生可以注册登录后浏览课程视频、收藏课程、留言并购买课程,同时实现订单管理。管理员负责管理学生信息、课程信息、发布班级和管理章节等。教师则可以管理课程订单、课程内容和章节。该系统允许学生利用碎片时间自主学习,具有很高的灵活性,对于难以理解的课程可以反复学习并在线提问,极大地促进了学生的学习。

    GaAs限幅器芯片:LCLM0002P1,工作频段DC-3Ghz

    GaAs限幅器芯片:LCLM0002P1,工作频段DC-3Ghz

    基于simulink的12 8开关磁阻电机电流斩波、角度位置调速控制、模型预测电流、转矩控制仿真程序

    基于simulink的12 8开关磁阻电机电流斩波、角度位置调速控制、模型预测电流、转矩控制仿真程序

    白色简洁风格的时尚室内设计整站网站源码下载.zip

    白色简洁风格的时尚室内设计整站网站源码下载.zip

    HTTP请求流程深入解析与性能优化技术指南

    内容概要:本文详细解析了HTTP请求的整个流程,包括用户请求发起、请求报文构建、服务器处理请求、响应报文生成、网络传输响应和浏览器接收响应六个阶段。每个阶段的内容均涵盖了关键步骤和技术细节,如DNS解析、TCP连接、缓存策略、HTTP/2性能提升、HTTPS加密等。通过这些内容,读者可以全面理解HTTP请求的完整流程。 适合人群:具备一定网络基础知识的前端、后端开发人员及IT运维人员。 使用场景及目标:适用于希望深入了解HTTP协议及其优化技术的技术人员,有助于提升系统的性能和安全性,优化用户体验。 阅读建议:本文内容详尽且涉及多个关键技术点,建议读者结合实际案例进行学习,逐步理解和掌握各个阶段的技术细节和优化方法。

    2023-04-06-项目笔记 - 第三百五十九阶段 - 4.4.2.357全局变量的作用域-357 -2025.12.26

    2023-04-06-项目笔记-第三百五十九阶段-课前小分享_小分享1.坚持提交gitee 小分享2.作业中提交代码 小分享3.写代码注意代码风格 4.3.1变量的使用 4.4变量的作用域与生命周期 4.4.1局部变量的作用域 4.4.2全局变量的作用域 4.4.2.1全局变量的作用域_1 4.4.2.357局变量的作用域_357- 2024-12-26

    白色简洁风格的互联网推广企业网站源码下载.zip

    白色简洁风格的互联网推广企业网站源码下载.zip

    HTTP协议基础概念解析及其演进过程

    内容概要:本文详细解析了HTTP协议的发展历程,从HTTP/1.0到HTTP/3.0的各个版本演进特点,以及HTTP请求与响应的基本概念。此外,还分析了HTTP报文的结构,包括请求报文和响应报文的具体组成部分,并介绍了HTTPS协议的安全机制。 适合人群:网络开发人员、Web开发者以及对HTTP协议有深入了解需求的技术人员。 使用场景及目标:①理解和掌握HTTP协议的基本概念和工作原理;②了解HTTP协议各版本的改进及应用场景;③学习HTTPS协议的加密机制及其重要性。 阅读建议:本文详细解析了HTTP协议的基础概念和各个版本的演进过程,适合希望深入理解HTTP协议的技术人员阅读。可以结合实际项目中遇到的问题来加深对协议的理解。

    (7483032)OA办公系统源码

    OA办公系统源码是开发企业级管理软件的重要组成部分,它基于C#编程语言,利用ASP.NET框架,并结合SQL数据库技术,为企业的日常运营提供高效、便捷的自动化办公环境。源码是开发者的心血结晶,包含了从需求分析到设计、编码、测试的全过程,体现了软件工程的实践与理论知识。下面我们将深入探讨这些关键知识点。 C#是微软公司推出的一种面向对象的编程语言,广泛应用于Windows平台的开发。在OA办公系统中,C#提供了丰富的类库和工具,支持事件驱动编程,使得代码更简洁,可读性更强。C#支持泛型、自动垃圾回收、异常处理等特性,提高了程序的稳定性和安全性。 ASP.NET是微软构建Web应用程序的开发框架,它基于.NET Framework,提供了丰富的控件和模板,简化了网页开发。在OA办公系统源码中,ASP.NET负责处理HTTP请求,呈现动态内容,实现用户交互。MVC(Model-View-Controller)模式是ASP.NET常用的开发模式,它将业务逻辑、数据模型和用户界面分离,便于维护和扩展。 SQL(Structured Query Language)是用于管理和处理关系数据库的标准语

    (179617412)永磁同步电机无位置传感器控制,采用的是龙贝格,基于模型的 定点开发,仿真效果和实际95%高度吻合,可以仿真学习,也可以直接移植到

    永磁同步电机无位置传感器控制,采用的是龙贝格,基于模型的 定点开发,仿真效果和实际95%高度吻合,可以仿真学习,也可以直接移植到项目中。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。

    三相逆变 单相 三相逆变器 SPWM -stm32主控(输入、输出具体可根据需要设定),本逆变器可以二次开发 本内容只包括 逆变程序,实现变频(0~100Hz)、变压调节,均有外接按键控制(使用

    三相逆变 单相 三相逆变器 SPWM ---stm32主控(输入、输出具体可根据需要设定),本逆变器可以二次开发。 本内容只包括 逆变程序,实现变频(0~100Hz)、变压调节,均有外接按键控制(使用C语言实现)。

    白色简洁风格的旅行记录整站网站源码下载.zip

    白色简洁风格的旅行记录整站网站源码下载.zip

    白色简洁风格的金融会计行业企业网站模板.rar

    白色简洁风格的金融会计行业企业网站模板.rar

Global site tag (gtag.js) - Google Analytics