- 浏览: 202144 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
budairenqin:
budairenqin 写道carlosfu 写道膜拜一下,要 ...
写个RPC娱乐一下 -
budairenqin:
carlosfu 写道膜拜一下,要是把实现过程写个博客或者文档 ...
写个RPC娱乐一下 -
dengchang1:
好文章。 详细看了《Netty源码细节1--IO线程(Even ...
Netty源码细节3--accept(Linux os层 + Netty层代码细节) -
carlosfu:
膜拜一下,要是把实现过程写个博客或者文档就更赞了
写个RPC娱乐一下 -
budairenqin:
I_am_rookie 写道你好!能把安装包发我一下吗?我找了 ...
CentOS 6.3 X86_64安装MySQL 5.5.28 64-Bit RPM以及my.cnf配置
最近项目中使用redis,学习了一下,client端使用jedis-2.1.0
首先是一个redis实现的跨jvm的lock,
接着是一个简单封装的工具类,也对pipeline处理进行了几个常用的封装
然后是对应Spring的相关配置
Spring配置文件:
超时部分我用的redis.expire(),超时是redis来计算的,上面代码是自己计算的(System.currentTimeMillis()),但这么计算应该是有偏差的,因为在集群等环境中不同机器的时间不一定一样的,所以超时应该交给redis来处理,我只是简单看了代码,可能有说错的地方,见谅
redis.clients.jedis.Jedis 有
ShardedJedis肯定没有了
哦哦,官网上说是为了效率,所以分片的没有keys
redis.clients.jedis.Jedis 有
ShardedJedis肯定没有了
首先是一个redis实现的跨jvm的lock,
接着是一个简单封装的工具类,也对pipeline处理进行了几个常用的封装
然后是对应Spring的相关配置
public class RedisLock { /** 加锁标志 */ public static final String LOCKED = "TRUE"; /** 毫秒与毫微秒的换算单位 1毫秒 = 1000000毫微秒 */ public static final long MILLI_NANO_CONVERSION = 1000 * 1000L; /** 默认超时时间(毫秒) */ public static final long DEFAULT_TIME_OUT = 1000; public static final Random RANDOM = new Random(); /** 锁的超时时间(秒),过期删除 */ public static final int EXPIRE = 3 * 60; private ShardedJedisPool shardedJedisPool; private ShardedJedis jedis; private String key; // 锁状态标志 private boolean locked = false; /** * This creates a RedisLock * @param key key * @param shardedJedisPool 数据源 */ public RedisLock(String key, ShardedJedisPool shardedJedisPool) { this.key = key + "_lock"; this.shardedJedisPool = shardedJedisPool; this.jedis = this.shardedJedisPool.getResource(); } /** * 加锁 * 应该以: * lock(); * try { * doSomething(); * } finally { * unlock(); * } * 的方式调用 * @param timeout 超时时间 * @return 成功或失败标志 */ public boolean lock(long timeout) { long nano = System.nanoTime(); timeout *= MILLI_NANO_CONVERSION; try { while ((System.nanoTime() - nano) < timeout) { if (this.jedis.setnx(this.key, LOCKED) == 1) { this.jedis.expire(this.key, EXPIRE); this.locked = true; return this.locked; } // 短暂休眠,避免出现活锁 Thread.sleep(3, RANDOM.nextInt(500)); } } catch (Exception e) { throw new RuntimeException("Locking error", e); } return false; } /** * 加锁 * 应该以: * lock(); * try { * doSomething(); * } finally { * unlock(); * } * 的方式调用 * @param timeout 超时时间 * @param expire 锁的超时时间(秒),过期删除 * @return 成功或失败标志 */ public boolean lock(long timeout, int expire) { long nano = System.nanoTime(); timeout *= MILLI_NANO_CONVERSION; try { while ((System.nanoTime() - nano) < timeout) { if (this.jedis.setnx(this.key, LOCKED) == 1) { this.jedis.expire(this.key, expire); this.locked = true; return this.locked; } // 短暂休眠,避免出现活锁 Thread.sleep(3, RANDOM.nextInt(500)); } } catch (Exception e) { throw new RuntimeException("Locking error", e); } return false; } /** * 加锁 * 应该以: * lock(); * try { * doSomething(); * } finally { * unlock(); * } * 的方式调用 * @return 成功或失败标志 */ public boolean lock() { return lock(DEFAULT_TIME_OUT); } /** * 解锁 * 无论是否加锁成功,都需要调用unlock * 应该以: * lock(); * try { * doSomething(); * } finally { * unlock(); * } * 的方式调用 */ public void unlock() { try { if (this.locked) { this.jedis.del(this.key); } } finally { this.shardedJedisPool.returnResource(this.jedis); } } }
/** * 内存数据库Redis的辅助类,负责对内存数据库的所有操作 * @version V1.0 * @author fengjc */ public class RedisUtil { // 数据源 private ShardedJedisPool shardedJedisPool; /** * 执行器,{@link com.futurefleet.framework.base.redis.RedisUtil}的辅助类, * 它保证在执行操作之后释放数据源returnResource(jedis) * @version V1.0 * @author fengjc * @param <T> */ abstract class Executor<T> { ShardedJedis jedis; ShardedJedisPool shardedJedisPool; public Executor(ShardedJedisPool shardedJedisPool) { this.shardedJedisPool = shardedJedisPool; jedis = this.shardedJedisPool.getResource(); } /** * 回调 * @return 执行结果 */ abstract T execute(); /** * 调用{@link #execute()}并返回执行结果 * 它保证在执行{@link #execute()}之后释放数据源returnResource(jedis) * @return 执行结果 */ public T getResult() { T result = null; try { result = execute(); } catch (Throwable e) { throw new RuntimeException("Redis execute exception", e); } finally { if (jedis != null) { shardedJedisPool.returnResource(jedis); } } return result; } } /** * 删除模糊匹配的key * @param likeKey 模糊匹配的key * @return 删除成功的条数 */ public long delKeysLike(final String likeKey) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { Collection<Jedis> jedisC = jedis.getAllShards(); Iterator<Jedis> iter = jedisC.iterator(); long count = 0; while (iter.hasNext()) { Jedis _jedis = iter.next(); Set<String> keys = _jedis.keys(likeKey + "*"); count += _jedis.del(keys.toArray(new String[keys.size()])); } return count; } }.getResult(); } /** * 删除 * @param key 匹配的key * @return 删除成功的条数 */ public Long delKey(final String key) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.del(key); } }.getResult(); } /** * 删除 * @param keys 匹配的key的集合 * @return 删除成功的条数 */ public Long delKeys(final String[] keys) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { Collection<Jedis> jedisC = jedis.getAllShards(); Iterator<Jedis> iter = jedisC.iterator(); long count = 0; while (iter.hasNext()) { Jedis _jedis = iter.next(); count += _jedis.del(keys); } return count; } }.getResult(); } /** * 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。 * 在 Redis 中,带有生存时间的 key 被称为『可挥发』(volatile)的。 * @param key key * @param expire 生命周期,单位为秒 * @return 1: 设置成功 0: 已经超时或key不存在 */ public Long expire(final String key, final int expire) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.expire(key, expire); } }.getResult(); } /** * 一个跨jvm的id生成器,利用了redis原子性操作的特点 * @param key id的key * @return 返回生成的Id */ public long makeId(final String key) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { long id = jedis.incr(key); if ((id + 75807) >= Long.MAX_VALUE) { // 避免溢出,重置,getSet命令之前允许incr插队,75807就是预留的插队空间 jedis.getSet(key, "0"); } return id; } }.getResult(); } /* ======================================Strings====================================== */ /** * 将字符串值 value 关联到 key 。 * 如果 key 已经持有其他值, setString 就覆写旧值,无视类型。 * 对于某个原本带有生存时间(TTL)的键来说, 当 setString 成功在这个键上执行时, 这个键原有的 TTL 将被清除。 * 时间复杂度:O(1) * @param key key * @param value string value * @return 在设置操作成功完成时,才返回 OK 。 */ public String setString(final String key, final String value) { return new Executor<String>(shardedJedisPool) { @Override String execute() { return jedis.set(key, value); } }.getResult(); } /** * 将值 value 关联到 key ,并将 key 的生存时间设为 expire (以秒为单位)。 * 如果 key 已经存在, 将覆写旧值。 * 类似于以下两个命令: * SET key value * EXPIRE key expire # 设置生存时间 * 不同之处是这个方法是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成,在 Redis 用作缓存时,非常实用。 * 时间复杂度:O(1) * @param key key * @param value string value * @param expire 生命周期 * @return 设置成功时返回 OK 。当 expire 参数不合法时,返回一个错误。 */ public String setString(final String key, final String value, final int expire) { return new Executor<String>(shardedJedisPool) { @Override String execute() { return jedis.setex(key, expire, value); } }.getResult(); } /** * 将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 setStringIfNotExists 不做任何动作。 * 时间复杂度:O(1) * @param key key * @param value string value * @return 设置成功,返回 1 。设置失败,返回 0 。 */ public Long setStringIfNotExists(final String key, final String value) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.setnx(key, value); } }.getResult(); } /** * 返回 key 所关联的字符串值。如果 key 不存在那么返回特殊值 nil 。 * 假如 key 储存的值不是字符串类型,返回一个错误,因为 getString 只能用于处理字符串值。 * 时间复杂度: O(1) * @param key key * @return 当 key 不存在时,返回 nil ,否则,返回 key 的值。如果 key 不是字符串类型,那么返回一个错误。 */ public String getString(final String key) { return new Executor<String>(shardedJedisPool) { @Override String execute() { return jedis.get(key); } }.getResult(); } /** * 批量的 {@link #setString(String, String)} * @param pairs 键值对数组{数组第一个元素为key,第二个元素为value} * @return 操作状态的集合 */ public List<Object> batchSetString(final List<Pair<String, String>> pairs) { return new Executor<List<Object>>(shardedJedisPool) { @Override List<Object> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); for (Pair<String, String> pair : pairs) { pipeline.set(pair.getKey(), pair.getValue()); } return pipeline.syncAndReturnAll(); } }.getResult(); } /** * 批量的 {@link #getString(String)} * @param keys key数组 * @return value的集合 */ public List<String> batchGetString(final String[] keys) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<String> result = new ArrayList<String>(keys.length); List<Response<String>> responses = new ArrayList<Response<String>>(keys.length); for (String key : keys) { responses.add(pipeline.get(key)); } pipeline.sync(); for (Response<String> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); } /* ======================================Hashes====================================== */ /** * 将哈希表 key 中的域 field 的值设为 value 。 * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。 * 如果域 field 已经存在于哈希表中,旧值将被覆盖。 * 时间复杂度: O(1) * @param key key * @param field 域 * @param value string value * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。 */ public Long hashSet(final String key, final String field, final String value) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.hset(key, field, value); } }.getResult(); } /** * 将哈希表 key 中的域 field 的值设为 value 。 * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。 * 如果域 field 已经存在于哈希表中,旧值将被覆盖。 * @param key key * @param field 域 * @param value string value * @param expire 生命周期,单位为秒 * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。 */ public Long hashSet(final String key, final String field, final String value, final int expire) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<Long> result = pipeline.hset(key, field, value); pipeline.expire(key, expire); pipeline.sync(); return result.get(); } }.getResult(); } /** * 返回哈希表 key 中给定域 field 的值。 * 时间复杂度:O(1) * @param key key * @param field 域 * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。 */ public String hashGet(final String key, final String field) { return new Executor<String>(shardedJedisPool) { @Override String execute() { return jedis.hget(key, field); } }.getResult(); } /** * 返回哈希表 key 中给定域 field 的值。 如果哈希表 key 存在,同时设置这个 key 的生存时间 * @param key key * @param field 域 * @param expire 生命周期,单位为秒 * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。 */ public String hashGet(final String key, final String field, final int expire) { return new Executor<String>(shardedJedisPool) { @Override String execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<String> result = pipeline.hget(key, field); pipeline.expire(key, expire); pipeline.sync(); return result.get(); } }.getResult(); } /** * 同时将多个 field-value (域-值)对设置到哈希表 key 中。 * 时间复杂度: O(N) (N为fields的数量) * @param key key * @param hash field-value的map * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。 */ public String hashMultipleSet(final String key, final Map<String, String> hash) { return new Executor<String>(shardedJedisPool) { @Override String execute() { return jedis.hmset(key, hash); } }.getResult(); } /** * 同时将多个 field-value (域-值)对设置到哈希表 key 中。同时设置这个 key 的生存时间 * @param key key * @param hash field-value的map * @param expire 生命周期,单位为秒 * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。 */ public String hashMultipleSet(final String key, final Map<String, String> hash, final int expire) { return new Executor<String>(shardedJedisPool) { @Override String execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<String> result = pipeline.hmset(key, hash); pipeline.expire(key, expire); pipeline.sync(); return result.get(); } }.getResult(); } /** * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。 * 时间复杂度: O(N) (N为fields的数量) * @param key key * @param fields field的数组 * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。 */ public List<String> hashMultipleGet(final String key, final String... fields) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { return jedis.hmget(key, fields); } }.getResult(); } /** * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。 * 同时设置这个 key 的生存时间 * @param key key * @param fields field的数组 * @param expire 生命周期,单位为秒 * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。 */ public List<String> hashMultipleGet(final String key, final int expire, final String... fields) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<List<String>> result = pipeline.hmget(key, fields); pipeline.expire(key, expire); pipeline.sync(); return result.get(); } }.getResult(); } /** * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行 * @param pairs 多个hash的多个field * @return 操作状态的集合 */ public List<Object> batchHashMultipleSet(final List<Pair<String, Map<String, String>>> pairs) { return new Executor<List<Object>>(shardedJedisPool) { @Override List<Object> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); for (Pair<String, Map<String, String>> pair : pairs) { pipeline.hmset(pair.getKey(), pair.getValue()); } return pipeline.syncAndReturnAll(); } }.getResult(); } /** * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行 * @param data Map<String, Map<String, String>>格式的数据 * @return 操作状态的集合 */ public List<Object> batchHashMultipleSet(final Map<String, Map<String, String>> data) { return new Executor<List<Object>>(shardedJedisPool) { @Override List<Object> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); for (Map.Entry<String, Map<String, String>> iter : data.entrySet()) { pipeline.hmset(iter.getKey(), iter.getValue()); } return pipeline.syncAndReturnAll(); } }.getResult(); } /** * 批量的{@link #hashMultipleGet(String, String...)},在管道中执行 * @param pairs 多个hash的多个field * @return 执行结果的集合 */ public List<List<String>> batchHashMultipleGet(final List<Pair<String, String[]>> pairs) { return new Executor<List<List<String>>>(shardedJedisPool) { @Override List<List<String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<List<String>> result = new ArrayList<List<String>>(pairs.size()); List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(pairs.size()); for (Pair<String, String[]> pair : pairs) { responses.add(pipeline.hmget(pair.getKey(), pair.getValue())); } pipeline.sync(); for (Response<List<String>> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); } /** * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。 * 时间复杂度: O(N) * @param key key * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。 */ public Map<String, String> hashGetAll(final String key) { return new Executor<Map<String, String>>(shardedJedisPool) { @Override Map<String, String> execute() { return jedis.hgetAll(key); } }.getResult(); } /** * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。 * 同时设置这个 key 的生存时间 * @param key key * @param expire 生命周期,单位为秒 * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。 */ public Map<String, String> hashGetAll(final String key, final int expire) { return new Executor<Map<String, String>>(shardedJedisPool) { @Override Map<String, String> execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<Map<String, String>> result = pipeline.hgetAll(key); pipeline.expire(key, expire); pipeline.sync(); return result.get(); } }.getResult(); } /** * 批量的{@link #hashGetAll(String)} * @param keys key的数组 * @return 执行结果的集合 */ public List<Map<String, String>> batchHashGetAll(final String... keys) { return new Executor<List<Map<String, String>>>(shardedJedisPool) { @Override List<Map<String, String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); List<Map<String, String>> result = new ArrayList<Map<String, String>>(keys.length); List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length); for (String key : keys) { responses.add(pipeline.hgetAll(key)); } pipeline.sync(); for (Response<Map<String, String>> resp : responses) { result.add(resp.get()); } return result; } }.getResult(); } /** * 批量的{@link #hashMultipleGet(String, String...)},与{@link #batchHashGetAll(String...)}不同的是,返回值为Map类型 * @param keys key的数组 * @return 多个hash的所有filed和value */ public Map<String, Map<String, String>> batchHashGetAllForMap(final String... keys) { return new Executor<Map<String, Map<String, String>>>(shardedJedisPool) { @Override Map<String, Map<String, String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); // 设置map容量防止rehash int capacity = 1; while ((int) (capacity * 0.75) <= keys.length) { capacity <<= 1; } Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(capacity); List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length); for (String key : keys) { responses.add(pipeline.hgetAll(key)); } pipeline.sync(); for (int i = 0; i < keys.length; ++i) { result.put(keys[i], responses.get(i).get()); } return result; } }.getResult(); } /* ======================================List====================================== */ /** * 将一个或多个值 value 插入到列表 key 的表尾(最右边)。 * @param key key * @param values value的数组 * @return 执行 listPushTail 操作后,表的长度 */ public Long listPushTail(final String key, final String... values) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.rpush(key, values); } }.getResult(); } /** * 将一个或多个值 value 插入到列表 key 的表头 * @param key key * @param value string value * @return 执行 listPushHead 命令后,列表的长度。 */ public Long listPushHead(final String key, final String value) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.lpush(key, value); } }.getResult(); } /** * 将一个或多个值 value 插入到列表 key 的表头, 当列表大于指定长度是就对列表进行修剪(trim) * @param key key * @param value string value * @param size 链表超过这个长度就修剪元素 * @return 执行 listPushHeadAndTrim 命令后,列表的长度。 */ public Long listPushHeadAndTrim(final String key, final String value, final long size) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { Pipeline pipeline = jedis.getShard(key).pipelined(); Response<Long> result = pipeline.lpush(key, value); // 修剪列表元素, 如果 size - 1 比 end 下标还要大,Redis将 size 的值设置为 end 。 pipeline.ltrim(key, 0, size - 1); pipeline.sync(); return result.get(); } }.getResult(); } /** * 批量的{@link #listPushTail(String, String...)},以锁的方式实现 * @param key key * @param values value的数组 * @param delOld 如果key存在,是否删除它。true 删除;false: 不删除,只是在行尾追加 */ public void batchListPushTail(final String key, final String[] values, final boolean delOld) { new Executor<Object>(shardedJedisPool) { @Override Object execute() { if (delOld) { RedisLock lock = new RedisLock(key, shardedJedisPool); lock.lock(); try { Pipeline pipeline = jedis.getShard(key).pipelined(); pipeline.del(key); for (String value : values) { pipeline.rpush(key, value); } pipeline.sync(); } finally { lock.unlock(); } } else { jedis.rpush(key, values); } return null; } }.getResult(); } /** * 同{@link #batchListPushTail(String, String[], boolean)},不同的是利用redis的事务特性来实现 * @param key key * @param values value的数组 * @return null */ public Object updateListInTransaction(final String key, final List<String> values) { return new Executor<Object>(shardedJedisPool) { @Override Object execute() { Transaction transaction = jedis.getShard(key).multi(); transaction.del(key); for (String value : values) { transaction.rpush(key, value); } transaction.exec(); return null; } }.getResult(); } /** * 在key对应list的尾部部添加字符串元素,如果key存在,什么也不做 * @param key key * @param values value的数组 * @return 执行insertListIfNotExists后,表的长度 */ public Long insertListIfNotExists(final String key, final String[] values) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { RedisLock lock = new RedisLock(key, shardedJedisPool); lock.lock(); try { if (!jedis.exists(key)) { return jedis.rpush(key, values); } } finally { lock.unlock(); } return 0L; } }.getResult(); } /** * 返回list所有元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表 * @param key key * @return list所有元素 */ public List<String> listGetAll(final String key) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { return jedis.lrange(key, 0, -1); } }.getResult(); } /** * 返回指定区间内的元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表 * @param key key * @param beginIndex 下标开始索引(包含) * @param endIndex 下标结束索引(不包含) * @return 指定区间内的元素 */ public List<String> listRange(final String key, final long beginIndex, final long endIndex) { return new Executor<List<String>>(shardedJedisPool) { @Override List<String> execute() { return jedis.lrange(key, beginIndex, endIndex - 1); } }.getResult(); } /** * 一次获得多个链表的数据 * @param keys key的数组 * @return 执行结果 */ public Map<String, List<String>> batchGetAllList(final List<String> keys) { return new Executor<Map<String, List<String>>>(shardedJedisPool) { @Override Map<String, List<String>> execute() { ShardedJedisPipeline pipeline = jedis.pipelined(); Map<String, List<String>> result = new HashMap<String, List<String>>(); List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(keys.size()); for (String key : keys) { responses.add(pipeline.lrange(key, 0, -1)); } pipeline.sync(); for (int i = 0; i < keys.size(); ++i) { result.put(keys.get(i), responses.get(i).get()); } return result; } }.getResult(); } /* ======================================Pub/Sub====================================== */ /** * 将信息 message 发送到指定的频道 channel。 * 时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。 * @param channel 频道 * @param message 信息 * @return 接收到信息 message 的订阅者数量。 */ public Long publish(final String channel, final String message) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { Jedis _jedis = jedis.getShard(channel); return _jedis.publish(channel, message); } }.getResult(); } /** * 订阅给定的一个频道的信息。 * @param jedisPubSub 监听器 * @param channel 频道 */ public void subscribe(final JedisPubSub jedisPubSub, final String channel) { new Executor<Object>(shardedJedisPool) { @Override Object execute() { Jedis _jedis = jedis.getShard(channel); // 注意subscribe是一个阻塞操作,因为当前线程要轮询Redis的响应然后调用subscribe _jedis.subscribe(jedisPubSub, channel); return null; } }.getResult(); } /** * 取消订阅 * @param jedisPubSub 监听器 */ public void unSubscribe(final JedisPubSub jedisPubSub) { jedisPubSub.unsubscribe(); } /* ======================================Sorted set================================= */ /** * 将一个 member 元素及其 score 值加入到有序集 key 当中。 * @param key key * @param score score 值可以是整数值或双精度浮点数。 * @param member 有序集的成员 * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。 */ public Long addWithSortedSet(final String key, final double score, final String member) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.zadd(key, score, member); } }.getResult(); } /** * 将多个 member 元素及其 score 值加入到有序集 key 当中。 * @param key key * @param scoreMembers score、member的pair * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。 */ public Long addWithSortedSet(final String key, final Map<Double, String> scoreMembers) { return new Executor<Long>(shardedJedisPool) { @Override Long execute() { return jedis.zadd(key, scoreMembers); } }.getResult(); } /** * 返回有序集 key 中, score 值介于 max 和 min 之间(默认包括等于 max 或 min )的所有的成员。 * 有序集成员按 score 值递减(从大到小)的次序排列。 * @param key key * @param max score最大值 * @param min score最小值 * @return 指定区间内,带有 score 值(可选)的有序集成员的列表 */ public Set<String> revrangeByScoreWithSortedSet(final String key, final double max, final double min) { return new Executor<Set<String>>(shardedJedisPool) { @Override Set<String> execute() { return jedis.zrevrangeByScore(key, max, min); } }.getResult(); } /* ======================================Other====================================== */ /** * 设置数据源 * @param shardedJedisPool 数据源 */ public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) { this.shardedJedisPool = shardedJedisPool; } /** * 构造Pair键值对 * @param key key * @param value value * @return 键值对 */ public <K, V> Pair<K, V> makePair(K key, V value) { return new Pair<K, V>(key, value); } /** * 键值对 * @version V1.0 * @author fengjc * @param <K> key * @param <V> value */ public class Pair<K, V> { private K key; private V value; public Pair(K key, V value) { this.key = key; this.value = value; } public K getKey() { return key; } public void setKey(K key) { this.key = key; } public V getValue() { return value; } public void setValue(V value) { this.value = value; } } }
Spring配置文件:
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <!-- POOL配置 --> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxActive" value="${redis.jedisPoolConfig.maxActive}" /> <property name="maxIdle" value="${redis.jedisPoolConfig.maxIdle}" /> <property name="maxWait" value="${redis.jedisPoolConfig.maxWait}" /> <property name="testOnBorrow" value="${redis.jedisPoolConfig.testOnBorrow}" /> </bean> <!-- jedis shard信息配置 --> <bean id="jedis.shardInfoCache1" class="redis.clients.jedis.JedisShardInfo"> <constructor-arg index="0" value="${redis.jedis.shardInfoCache1.host}" /> <constructor-arg index="1" type="int" value="${redis.jedis.shardInfoCache1.port}" /> </bean> <bean id="jedis.shardInfoCache2" class="redis.clients.jedis.JedisShardInfo"> <constructor-arg index="0" value="${redis.jedis.shardInfoCache2.host}" /> <constructor-arg index="1" type="int" value="${redis.jedis.shardInfoCache2.port}" /> </bean> <!-- jedis shard pool配置 --> <bean id="shardedJedisPoolCache" class="redis.clients.jedis.ShardedJedisPool"> <constructor-arg index="0" ref="jedisPoolConfig" /> <constructor-arg index="1"> <list> <ref bean="jedis.shardInfoCache1" /> <ref bean="jedis.shardInfoCache2" /> </list> </constructor-arg> </bean> <bean id="redisCache" class="com.**.RedisUtil"> <property name="shardedJedisPool" ref="shardedJedisPoolCache" /> </bean> </beans>
评论
9 楼
qingfeng197
2015-05-22
你这种获取锁和解锁方式 有可能会删除其他客户端获取的锁。因为你是靠redis自身的超时机制删除key 这就有个问题--在这个时间到期后 当前客户端还没有释放锁 但是其他客户端就有可能通过setnx方式返回1获取到锁 此时 当前客户端再执行释放锁 释放的就是其他客户端获取到的锁了
8 楼
hyf_0528
2015-01-28
写的很牛逼的样子!
7 楼
hardPass
2013-11-15
不错,是这么个意思
6 楼
budairenqin
2013-11-15
hardPass 写道
好吧,楼主这个lock,看起来很简单的样子。在网上找到了个这样的,比较复杂罗嗦,大家看看,多出来的判断是啥意思
http://grepcode.com/file_/repo1.maven.org/maven2/com.github.jedis-lock/jedis-lock/1.0.0/com/github/jedis/lock/JedisLock.java/?v=source
public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires);
if (jedis.setnx(lockKey, expiresStr) == 1) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// lock is expired
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// lock acquired
locked = true;
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
return false;
}
http://grepcode.com/file_/repo1.maven.org/maven2/com.github.jedis-lock/jedis-lock/1.0.0/com/github/jedis/lock/JedisLock.java/?v=source
public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires);
if (jedis.setnx(lockKey, expiresStr) == 1) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// lock is expired
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// lock acquired
locked = true;
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
return false;
}
超时部分我用的redis.expire(),超时是redis来计算的,上面代码是自己计算的(System.currentTimeMillis()),但这么计算应该是有偏差的,因为在集群等环境中不同机器的时间不一定一样的,所以超时应该交给redis来处理,我只是简单看了代码,可能有说错的地方,见谅
5 楼
hardPass
2013-11-15
好吧,楼主这个lock,看起来很简单的样子。在网上找到了个这样的,比较复杂罗嗦,大家看看,多出来的判断是啥意思
http://grepcode.com/file_/repo1.maven.org/maven2/com.github.jedis-lock/jedis-lock/1.0.0/com/github/jedis/lock/JedisLock.java/?v=source
public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires);
if (jedis.setnx(lockKey, expiresStr) == 1) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// lock is expired
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// lock acquired
locked = true;
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
return false;
}
http://grepcode.com/file_/repo1.maven.org/maven2/com.github.jedis-lock/jedis-lock/1.0.0/com/github/jedis/lock/JedisLock.java/?v=source
public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires);
if (jedis.setnx(lockKey, expiresStr) == 1) {
// lock acquired
locked = true;
return true;
}
String currentValueStr = jedis.get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// lock is expired
String oldValueStr = jedis.getSet(lockKey, expiresStr);
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
// lock acquired
locked = true;
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
return false;
}
4 楼
SE_XiaoFeng
2013-07-26
好文章呀,放在util包下,当工具类,拿走了哈,非常感谢.
3 楼
znlyj
2013-07-06
budairenqin 写道
znlyj 写道
jedis有keys函数?
redis.clients.jedis.Jedis 有
ShardedJedis肯定没有了
哦哦,官网上说是为了效率,所以分片的没有keys
2 楼
budairenqin
2013-05-28
znlyj 写道
jedis有keys函数?
redis.clients.jedis.Jedis 有
ShardedJedis肯定没有了
1 楼
znlyj
2013-05-27
jedis有keys函数?
相关推荐
这些jar包可能包括`spring-session-data-redis`,`spring-boot-starter-data-redis`,以及Redis客户端库(如`lettuce`或`jedis`)。 2. **配置Redis连接**:接着,需要在Spring Boot的配置文件(如`application....
JUC(Java Concurrency Utilities)是Java并发包,提供了高级并发工具类,如线程池、并发容器、锁、信号量等,帮助开发者更高效、安全地编写多线程程序。ConcurrentHashMap是线程安全的哈希表,ReentrantLock是可重...
以下是一个简单的 Redis 工具类的代码片段,展示了如何使用 Redis 的 List 实现等待序列: ```java public class RedisUcUitl { // ... public static Long lpush(Jedis jedis, final byte[] key, final byte[] ...
`jedis-2.8.1.jar`则是Jedis,这是一个Java客户端,用于连接和操作Redis服务器。Jedis提供了丰富的命令支持,包括基本的键值操作、事务处理、脚本执行等。它是一个轻量级的库,易于集成到任何Java项目中。版本2.8.1...
Java中通常使用Jedis或Lettuce客户端来与Redis交互。 - **缓存策略**:了解如何使用Redis作为缓存,包括LRU(Least Recently Used)最近最少使用淘汰策略、TTL(Time To Live)生存时间等。 3. **JVM运行时数据区*...
它通过将Session数据从传统的JVM内存中移出,存储到一个共享的数据存储(如Redis)中,实现了Session的跨服务器共享。这样,无论用户访问哪个服务器,都能获取到相同的Session信息,保证了用户体验的一致性。 接...
- **SpringIOC和AOP的概念以及实现方式**: - **IOC(Inversion of Control,控制反转)**是Spring的核心特性之一,它通过依赖注入(DI)实现对对象创建和管理的控制反转。 - **AOP(Aspect Oriented Programming...
- 如需在Java应用中使用Redis,还需安装Jedis等Java客户端库,配置连接参数,实现数据交互。 7. Nginx作为反向代理服务: - Nginx是一款高性能的HTTP和反向代理服务器,用于负载均衡和内容缓存。 - 反向代理配置...
│ 09.FastDFS工具类的使用.avi │ 10.图片上传过程分析.avi │ 11.图片上传Service.avi │ 12.图片上传完成.avi │ 13.解决火狐兼容性问题.avi │ 14.spring的父子容器.avi │ 淘淘商城第三天笔记.docx │ ├─04....