遇到的问题
在使用redis的过程中,尤其是在做大数据“实时计算”的过程中,也许会经常遇到下列场景:比如网站每个页面的实时pv运算,使用storm(或者spark streaming)从kafka中消费实时点击流数据进行统计计算;并计算好的结果放入redis进行存储,如果redis中已经存在 需要先取出来与最新统计数据相加再放入redis;另外为了减少与redis的交互次数,降低redis的存取压力,一般不会消费一条数据后就立即放入redis,而是处理一批数据后再存入redis,大致流程如下:
网站实时pv计算中可能比这个图的处理要更复杂些,但大致流程也就这样了。这里我们重点看下redis的相关处理,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 //省略storm计算过程,这里假设已经计算好,并放入一个MAP中 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); for(String key:pagesPv.keySet()){ redis.incrBy(key,pagesPv.get(key));//incrBy是redis中的自增操作 } }
首先说下redis中的incrBy方法,可以实现数字类型的自增操作。也可以用redis的get方法先获取到老数据,然后加上新增的值,再调用redis的set方法写入redis。但这就跟redis有两次操作,这里更推荐是用incrBy。
咋一看上述代码其实没啥问题,这里Map里只有4条数据,也就是说上述的for循环需要与redis服务器之间做4次远程调用。但一个稍微知名一点的网站,每分钟可能会数万级别的访问量,对应到这个Map里可能会有数百个页面,也就是说这个真实的情况下,如果使用上述代码的话,这个for循环中与redis服务之间会有数百次远程调用操作,对redis服务来说这无疑是一笔较大的开销。讲到这里该是pipeline登场的时候了。
Pipeline是什么
Pipeline其实是redis的java客户端对redis的基本事务的调用封装。这里所谓的“基本事务”与传统的关系型数据库事务回滚不同,而是按照顺序执行一批命令,在redis中是使用MULTI和EXEC命令实现的,而Pipeline只是对这两个命令的java客户端封装而已。
在redis的客户端中执行事务分三步:首先执行MULTI命令;然后输入一批我们想要执行的其他操作(比如一批set操作),这批操作会被写到一个队列里;最后再执行EXEC命令,客户端会把这上述队列“一次性”的发生到服务端,并等待服务端返回。
再来看服务端:服务端接收到这个命令操作队列后,按照顺序一个一个执行,每个命令操作都会对应一个返回值,待队列中所有的命令都执行完成后“一次性”把这些返回值拼装成list返回给客户端。并且这个返回值list的数量和顺序与命令队列一致。整个pipeline的执行过程结束,在有redis需要命令批量执行的情况下,使用pipeline可以大大减少redis客户端与服务端交互次数,从而提升多命令的执行性能。
回到文章开头的场景,假设Map里有500个值,for循环中就是执行500次incrBy操作,对应的与服务端的交互就需要500次。如果使用pipeline,只需要交互两次即可完成,java伪代码实现如下:
public static void main(String[] args) { Redis redis = null;//省略redis实例化过程 Map<String,Integer> pagesPv = new HashMap(); pagesPv.put("page_index",4); pagesPv.put("page_product",2); pagesPv.put("page_cart",3); pagesPv.put("page_order",1); Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 for(String key:pagesPv.keySet()){ //注意这里调用Pipeline的incrBy方法,此时命令并没有执行 pipeline.incrBy(key,pagesPv.get(key)); } //执行sync方法,此时批量向服务端提交命令,并等待返回 pipeline.sync(); }
整个过程大致分为三步:
1、通过redis.pipelined()方法,获取pipeline实例,与上述讲解中的MULTI命令对应。
2、以前调用的是redis. incrBy()方法,现在改为调用pipeline的incrBy方法,注意此时不会向服务端发起调用请求,只是把命令写入队列。
3、执行pipeline.sync()方法,与上述流程中的EXEC命令对应,此时会把第2步中的命令队列一次性的提交给服务端,并等待服务端返回。
Pipeline是非线程安全的
Pipeline能提升性能,而且使用起来也非常方便,但使用的时候一定要注意一点“Pipeline是非线程安全的”。也就是说多个线程如果公用一个Pipeline实例,会出现线程安全问题,典型的就是数据返回结果错乱。正确的用法是在每次需要用到Pipeline的地方,都新建一个实例即:
Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例
为什么在多线程下是线程不安全的,其实很好理解,看下图:
A、B两个线程共用一个pipeline实例,同时向redis服务端提交5个命令,各自都期望收到5个返回值。但真实的结果是有一个收到10个结果,有一个会失败,这其实不是我们期望的。
在使用Spring框架的java程序中,redis的客户端对象是线程安全的,可以单例注入spring容器,在需要redis客户端的地方直接使用@Resource直接获取即可。
@Resource private Redis redis; public void method(){ redis.xxx; }
但pipeline是非线程安全的,每次都必须新建实例,如果你的代码中出现了下列代码,请注意会有线程安全问题,请及时修正:
//错误的写法,pipeline被单例注入了spring容器,全局复用 @Resource private Pipeline pipeline; public void method(){ pipeline.xxx; }
Spring中Pipeline的正确用法:
@Resource private Redis redis; public void method(){ Pipeline pipeline = redis.pipelined();//生成新的Pipeline实例 pipeline.xxx;//多个操作 pipeline. sync(); }
最后提一点,Pipeline是一次提交一个队列给服务端,这个队列如果太大会占用更多内存,以及增加网络传输时间。所以 Pipeline里一次提交的命令数也不要太多,根据实际数据量大小 一般几百条还是可以的。
好了,关于redis中的pipeline就总结到这里。
出处
http://moon-walker.iteye.com/blog/2397962
相关推荐
为了解决这一问题,Redis 提供了一种名为 Pipeline 的技术,用于提高查询和操作的效率。 1. Redis 的工作原理: Redis 采用客户端-服务端模型,客户端发送命令到服务端,服务端处理命令并返回结果。这个过程包括...
Redis Pipeline 是 Redis 数据库操作中的一个高效特性,它允许客户端一次性发送多个命令到服务器,而无需等待每个命令的响应。这种技术显著提高了批量处理和高并发环境下的性能,因为减少了网络通信的开销。 首先,...
在处理大量数据导入时,为了提高效率,Redis 提供了一种称为 Pipeline 的技术,允许一次性发送多条命令,减少网络通信的开销。批量导入数据可以显著提升数据处理速度,尤其在需要初始化大量数据或进行数据迁移时。 ...
简单介绍 redis pipeline 的机制,结合一段实例说明pipeline 在提升吞吐量方面发生的效用。 案例背景 应用系统在数据推送或事件处理过程中,往往出现数据流经过多个网元; 然而在某些服务中,数据操作对redis 是强...
kettle如何3秒内写入100万条数据到Redis https://blog.csdn.net/huryer/article/details/106889792
这段代码中,p.hmset()方法不会立即执行,而是将命令添加到Pipeline中,最后通过p.sync()触发实际执行。这种方式显著减少了网络延迟,提高了效率。 其次,批量读取数据时,我们可以使用hgetall来获取整个哈希表的...
6. 在Pipeline中执行批量插入命令,例如`p.hset(key, data)`。 7. 最后,调用`sync()`方法提交所有命令并关闭Jedis连接。 示例代码如下: ```java String key = "key"; Set<HostAndPort> nodes = ... // 集群节点...
标题中的“python使用pipeline批量读写redis的方法”指的是在Python编程环境下,利用Redis数据库的pipeline功能进行高效的数据批量读取和写入操作。这在处理大量数据时能显著提高性能,减少网络通信的延迟。 首先,...
"Go-RedisPipe-具有隐式流水线操作的Go开发的高吞吐量Redis客户端" 指的是一款名为 "RedisPipe" 的高性能 Redis 客户端库,它用 Go 语言编写,并且实现了隐式流水线(Pipeline)功能。流水线在 Redis 操作中是一种...
- **Key空间通知**(Keyspace Notifications):监控Redis中的键事件,如`keyspaceEvents`配置。 ### 6. 应用场景 - **缓存**:利用Redis的高速读写能力,存储经常访问的数据,减少数据库负载。 - **会话管理**:将...
java客户端不是很好支持redis cluster,spring-date-redis和jedis批量提交还不支持,单个提交都是可以的。 为了批量解决批量提交 网上有几个方案,本示例使用了其中一种,demo里的JedisClusterPipeline类是网上找的...
5. **Pipeline**:通过Pipeline技术可以一次性发送多个命令到Redis,减少网络延迟,提高性能。 6. **Pub/Sub(发布/订阅)**:提供实时的消息传递机制,允许客户端订阅特定频道并接收来自服务器的推送消息。 7. **...
Redis 是一款开源的键值存储系统,以其高性能、低延迟的特点在 IT 行业中广泛应用于缓存、消息队列等多种场景。它支持多种数据结构如字符串、哈希、列表等,能够满足不同业务需求。 ### 二、PHPRedis 扩展介绍 ...
同意-Redis-Wrappers 这些是Appgree使用的类,用于封装和改进Jedis客户端,从而增加了流水线功能,主从控制和本地内存缓存,从而减少了不同线程重复读取的需求。 这些类可以单独使用或嵌套使用。执照该软件受Apache ...
3. **Redis命令操作**:书中会详细介绍每种数据类型的命令,如`SET`、`GET`、`HSET`、`LPOP`、`SADD`、`ZADD`等,以及事务(Transaction)、管道(Pipeline)等高级操作。 4. **Redis持久化**:为了防止数据丢失,...
在PHP中,可以通过`$redis->subscribe()`或`$redis->psubscribe()`订阅频道,`$redis->publish('channel', 'message')`发送消息。 6. Redis持久化(Persistence): Redis有两种持久化方式:RDB快照和AOF日志。RDB...