`
zfy421
  • 浏览: 233671 次
社区版块
存档分类
最新评论

jedis的pipeline、事务等方式

阅读更多

转自:http://www.open-open.com/lib/view/open1410485827242.html

 

jedis是一个著名的key-value存储系统,而作为其官方推荐的java版客户端jedis也非常强大和稳定,支持事务、管道及有jedis自身实现的分布式。

在这里对jedis关于事务、管道和分布式的调用方式做一个简单的介绍和对比:

一、普通同步方式

最简单和基础的调用方式,

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test1Normal() {
    Jedis jedis = new Jedis("localhost");
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        String result = jedis.set("n" + i, "n" + i);
    }
    long end = System.currentTimeMillis();
    System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");
    jedis.disconnect();
}

很简单吧,每次set之后都可以返回结果,标记是否成功。

二、事务方式(Transactions)

redis的事务很简单,他主要目的是保障,一个client发起的事务中的命令可以连续的执行,而中间不会插入其他client的命令。

看下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test2Trans() {
    Jedis jedis = new Jedis("localhost");
    long start = System.currentTimeMillis();
    Transaction tx = jedis.multi();
    for (int i = 0; i < 100000; i++) {
        tx.set("t" + i, "t" + i);
    }
    List<Object> results = tx.exec();
    long end = System.currentTimeMillis();
    System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");
    jedis.disconnect();
}

我们调用jedis.watch(…)方法来监控key,如果调用后key值发生变化,则整个事务会执行失败。另外,事务中某个操作失败,并不会回滚其他操作。这一点需要注意。还有,我们可以使用discard()方法来取消事务。

三、管道(Pipelining)

有时,我们需要采用异步方式,一次发送多个指令,不同步等待其返回结果。这样可以取得非常好的执行效率。这就是管道,调用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test3Pipelined() {
    Jedis jedis = new Jedis("localhost");
    Pipeline pipeline = jedis.pipelined();
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        pipeline.set("p" + i, "p" + i);
    }
    List<Object> results = pipeline.syncAndReturnAll();
    long end = System.currentTimeMillis();
    System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");
    jedis.disconnect();
}

四、管道中调用事务

就Jedis提供的方法而言,是可以做到在管道中使用事务,其代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void test4combPipelineTrans() {
    jedis = new Jedis("localhost"); 
    long start = System.currentTimeMillis();
    Pipeline pipeline = jedis.pipelined();
    pipeline.multi();
    for (int i = 0; i < 100000; i++) {
        pipeline.set("" + i, "" + i);
    }
    pipeline.exec();
    List<Object> results = pipeline.syncAndReturnAll();
    long end = System.currentTimeMillis();
    System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");
    jedis.disconnect();
}

但是经测试(见本文后续部分),发现其效率和单独使用事务差不多,甚至还略微差点。

五、分布式直连同步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void test5shardNormal() {
    List<JedisShardInfo> shards = Arrays.asList(
            new JedisShardInfo("localhost",6379),
            new JedisShardInfo("localhost",6380));
 
    ShardedJedis sharding = new ShardedJedis(shards);
 
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        String result = sharding.set("sn" + i, "n" + i);
    }
    long end = System.currentTimeMillis();
    System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");
 
    sharding.disconnect();
}

这个是分布式直接连接,并且是同步调用,每步执行都返回执行结果。类似地,还有异步管道调用。

六、分布式直连异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void test6shardpipelined() {
    List<JedisShardInfo> shards = Arrays.asList(
            new JedisShardInfo("localhost",6379),
            new JedisShardInfo("localhost",6380));
 
    ShardedJedis sharding = new ShardedJedis(shards);
 
    ShardedJedisPipeline pipeline = sharding.pipelined();
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        pipeline.set("sp" + i, "p" + i);
    }
    List<Object> results = pipeline.syncAndReturnAll();
    long end = System.currentTimeMillis();
    System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");
 
    sharding.disconnect();
}

七、分布式连接池同步调用

如果,你的分布式调用代码是运行在线程中,那么上面两个直连调用方式就不合适了,因为直连方式是非线程安全的,这个时候,你就必须选择连接池调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void test7shardSimplePool() {
    List<JedisShardInfo> shards = Arrays.asList(
            new JedisShardInfo("localhost",6379),
            new JedisShardInfo("localhost",6380));
 
    ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
 
    ShardedJedis one = pool.getResource();
 
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        String result = one.set("spn" + i, "n" + i);
    }
    long end = System.currentTimeMillis();
    pool.returnResource(one);
    System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");
 
    pool.destroy();
}

上面是同步方式,当然还有异步方式。

八、分布式连接池异步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Test
public void test8shardPipelinedPool() {
    List<JedisShardInfo> shards = Arrays.asList(
            new JedisShardInfo("localhost",6379),
            new JedisShardInfo("localhost",6380));
 
    ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
 
    ShardedJedis one = pool.getResource();
 
    ShardedJedisPipeline pipeline = one.pipelined();
 
    long start = System.currentTimeMillis();
    for (int i = 0; i < 100000; i++) {
        pipeline.set("sppn" + i, "n" + i);
    }
    List<Object> results = pipeline.syncAndReturnAll();
    long end = System.currentTimeMillis();
    pool.returnResource(one);
    System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");
    pool.destroy();
}

九、需要注意的地方

  1. 事务和管道都是异步模式。在事务和管道中不能同步查询结果。比如下面两个调用,都是不允许的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     Transaction tx = jedis.multi();
     for (int i = 0; i < 100000; i++) {
         tx.set("t" + i, "t" + i);
     }
     System.out.println(tx.get("t1000").get());  //不允许
     
     List<Object> results = tx.exec();
     
     
     
     
     Pipeline pipeline = jedis.pipelined();
     long start = System.currentTimeMillis();
     for (int i = 0; i < 100000; i++) {
         pipeline.set("p" + i, "p" + i);
     }
     System.out.println(pipeline.get("p1000").get()); //不允许
     
     List<Object> results = pipeline.syncAndReturnAll();
  2. 事务和管道都是异步的,个人感觉,在管道中再进行事务调用,没有必要,不如直接进行事务模式。

  3. 分布式中,连接池的性能比直连的性能略好(见后续测试部分)。

  4. 分布式调用中不支持事务。

    因为事务是在服务器端实现,而在分布式中,每批次的调用对象都可能访问不同的机器,所以,没法进行事务。

十、测试

运行上面的代码,进行测试,其结果如下:

1
2
3
4
5
6
7
8
9
10
11
Simple SET: 5.227 seconds
 
Transaction SET: 0.5 seconds
Pipelined SET: 0.353 seconds
Pipelined transaction: 0.509 seconds
 
Simple@Sharing SET: 5.289 seconds
Pipelined@Sharing SET: 0.348 seconds
 
Simple@Pool SET: 5.039 seconds
Pipelined@Pool SET: 0.401 seconds

另外,经测试分布式中用到的机器越多,调用会越慢。上面是2片,下面是5片:

1
2
3
4
Simple@Sharing SET: 5.494 seconds
Pipelined@Sharing SET: 0.51 seconds
Simple@Pool SET: 5.223 seconds
Pipelined@Pool SET: 0.518 seconds

下面是10片:

1
2
3
4
Simple@Sharing SET: 5.9 seconds
Pipelined@Sharing SET: 0.794 seconds
Simple@Pool SET: 5.624 seconds
Pipelined@Pool SET: 0.762 seconds

下面是100片:

1
2
3
4
Simple@Sharing SET: 14.055 seconds
Pipelined@Sharing SET: 8.185 seconds
Simple@Pool SET: 13.29 seconds
Pipelined@Pool SET: 7.767 seconds

分布式中,连接池方式调用不但线程安全外,根据上面的测试数据,也可以看出连接池比直连的效率更好。

十一、完整的测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.example.nosqlclient;
 
import java.util.Arrays;
import java.util.List;
 
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
 
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPipeline;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.Transaction;
 
import org.junit.FixMethodOrder;
import org.junit.runners.MethodSorters;
 
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestJedis {
 
    private static Jedis jedis;
    private static ShardedJedis sharding;
    private static ShardedJedisPool pool;
 
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        List<JedisShardInfo> shards = Arrays.asList(
                new JedisShardInfo("localhost",6379),
                new JedisShardInfo("localhost",6379)); //使用相同的ip:port,仅作测试
 
 
        jedis = new Jedis("localhost"); 
        sharding = new ShardedJedis(shards);
 
        pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
    }
 
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        jedis.disconnect();
        sharding.disconnect();
        pool.destroy();
    }
 
    @Test
    public void test1Normal() {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            String result = jedis.set("n" + i, "n" + i);
        }
        long end = System.currentTimeMillis();
        System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test2Trans() {
        long start = System.currentTimeMillis();
        Transaction tx = jedis.multi();
        for (int i = 0; i < 100000; i++) {
            tx.set("t" + i, "t" + i);
        }
        //System.out.println(tx.get("t1000").get());
 
        List<Object> results = tx.exec();
        long end = System.currentTimeMillis();
        System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test3Pipelined() {
        Pipeline pipeline = jedis.pipelined();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            pipeline.set("p" + i, "p" + i);
        }
        //System.out.println(pipeline.get("p1000").get());
        List<Object> results = pipeline.syncAndReturnAll();
        long end = System.currentTimeMillis();
        System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test4combPipelineTrans() {
        long start = System.currentTimeMillis();
        Pipeline pipeline = jedis.pipelined();
        pipeline.multi();
        for (int i = 0; i < 100000; i++) {
            pipeline.set("" + i, "" + i);
        }
        pipeline.exec();
        List<Object> results = pipeline.syncAndReturnAll();
        long end = System.currentTimeMillis();
        System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test5shardNormal() {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            String result = sharding.set("sn" + i, "n" + i);
        }
        long end = System.currentTimeMillis();
        System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test6shardpipelined() {
        ShardedJedisPipeline pipeline = sharding.pipelined();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            pipeline.set("sp" + i, "p" + i);
        }
        List<Object> results = pipeline.syncAndReturnAll();
        long end = System.currentTimeMillis();
        System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test7shardSimplePool() {
        ShardedJedis one = pool.getResource();
 
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            String result = one.set("spn" + i, "n" + i);
        }
        long end = System.currentTimeMillis();
        pool.returnResource(one);
        System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");
    }
 
    @Test
    public void test8shardPipelinedPool() {
        ShardedJedis one = pool.getResource();
 
        ShardedJedisPipeline pipeline = one.pipelined();
 
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            pipeline.set("sppn" + i, "n" + i);
        }
        List<Object> results = pipeline.syncAndReturnAll();
        long end = System.currentTimeMillis();
        pool.returnResource(one);
        System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");
    }
}
分享到:
评论

相关推荐

    jedis源码 (学习jedis)

    Jedis支持Redis的事务功能,`Multi`、`Exec`、`Discard`等方法用于开启、提交和取消事务。查看源码,了解这些方法如何与Redis的事务模型对应。 7. **发布订阅(Pub/Sub)**: Jedis提供了`subscribe`和`unsubscribe...

    jedis-2.9.0.jar

    2. **管道模式**:通过 `Pipeline pipeline = jedis.pipelined();` 创建管道,批量发送命令并一次性执行,提高效率。 3. **地理空间索引**:Jedis 提供了对 Redis 地理位置数据的支持,如 `jedis.geoadd("Sicily", ...

    jedis依赖jar包

    7. **事务操作**:`multi()`开始一个事务,`exec()`提交事务,`discard()`取消事务。 **高级特性** 1. **连接池**:为了提高性能和资源利用率,可以使用Jedis连接池,如`JedisPool`和`JedisPoolConfig`。 2. **...

    jedis.jar包

    4. **Pipeline和Transaction**:Jedis支持Pipeline和事务,可以批量发送命令,减少网络延迟,提高处理速度。 5. **Sentinel支持**:Jedis 3.0改进了对Redis Sentinel的支持,可以自动发现主从切换,提高了应用的高...

    jedis-jedis-2.7.2

    3. **事务**:Jedis支持Redis的事务机制,通过`Multi()`、`exec()`和`discard()`等方法,可以批量执行多个命令,并确保它们在一个原子操作中完成。 4. **发布/订阅**:Jedis提供了`subscribe()`和`publish()`方法,...

    jedis-3.0.0源码

    Jedis作为Java与Redis之间的桥梁,为开发者提供了简单易用的API,包括字符串、哈希、列表、集合、有序集合等数据类型的操作,以及发布/订阅、事务、脚本执行等功能。Jedis 3.0.0版本对前代进行了优化,提升了性能和...

    jedis-3.0.0.7z

    5. **事务处理**:Jedis支持事务,可以批量执行命令,如`jedis.multi()`、`jedis.exec()`等。 6. **其他操作**:Jedis还提供了对字符串、哈希表、列表、集合、有序集合等多种数据类型的CRUD操作,以及发布订阅、管道...

    jedis-3.0.0.jar、jedis-3.0.0-javadoc.jar、jedis-3.0.0-sources.jar

    开发者可以通过导入这个JAR,直接在Java程序中使用Jedis来执行Redis命令,例如设置和获取键值、执行事务、操作列表、集合和有序集合等。 2. **jedis-3.0.0-javadoc.jar**:这个文件包含了Jedis库的API文档,提供了...

    jedis-2.9.0-java

    3. **pipeline和事务**:Jedis提供了pipeline功能,允许批量发送命令并一次性接收所有响应,显著减少网络延迟。同时,它也支持Redis的事务机制,保证一组命令的原子性执行。 4. **集群支持**:Jedis从2.x版本开始...

    Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】.docx

    总结来说,Jedis通过Pipeline、分布式ID生成器以及分布式锁等特性,为Java开发者提供了强大的工具来构建高效、可靠的分布式系统。理解并善用这些特性,可以帮助我们更好地利用Redis作为数据存储和协调中心,提高系统...

    jedis-2.9.0.jar及连接池分享

    在本资源包中,我们提供了jedis-2.9.0.jar,这是Jedis的一个稳定版本,支持多种Redis操作,如键值操作、事务处理、发布订阅等。同时,还包含了commons-pool2-2.4.2.jar,这是一个通用对象池库,用于创建和管理连接池...

    Jedis-Sample

    此外,Jedis还支持事务(`multi`、`exec`)和脚本(`scriptLoad`、`eval`)功能,允许开发者以原子方式执行一系列操作或者编写自定义的Lua脚本来处理复杂的逻辑。 在处理大量并发请求时,Jedis还提供了一些优化策略...

    jedis开发使用包

    Jedis还支持其他高级特性,如事务(`Transaction`)、管道(`Pipeline`)和集群(`JedisCluster`)操作。在处理大规模并发请求或分布式环境时,这些功能会非常有用。 总之,"jedis开发使用包"为你提供了一个基础的...

    英文 Jedis API 2.9.0

    - **事务(Transaction)**:Jedis支持Redis的事务功能,通过`multi`、`exec`、`discard`方法可以实现一组操作的原子性执行。 - **发布订阅(Publish/Subscribe)**:`subscribe`和`unsubscribe`用于客户端订阅和退订...

    Jedis2.1.0源码与Jar包

    2. **全面支持**:Jedis涵盖了Redis的大部分命令,包括读写操作、事务处理、发布订阅等。 3. **连接池**:Jedis支持连接池管理,可以有效地复用连接,提高性能。 4. **Pipeline和事务**:通过Pipeline技术,可以批量...

    jedis-2.4.2版本.rar

    5. **pipeline和事务的批量操作**:Jedis的`pipeline`功能允许将多个命令打包并一次性发送到Redis服务器,减少网络通信的开销。结合事务,可以实现高效的批量操作。 6. **脚本操作**:Jedis提供了对Lua脚本的支持,...

    Jedis最新版本jar包

    1. **丰富的数据操作**:Jedis提供了完整的Redis命令支持,包括字符串、哈希、列表、集合、有序集合的操作,以及事务、发布/订阅、管道等高级功能。 2. **连接池管理**:Jedis支持连接池,可以有效地复用连接,提高...

    jedisRedis的Java客户端

    `JedisPoolConfig`配置连接池参数,如最大连接数、超时时间等,然后通过`JedisPool jedisPool = new JedisPool(config, "localhost", 6379);`创建连接池。 - 使用`jedisPool.getResource()`获取连接,用完后通过`...

    jedis-2.9.0 最新版Redis客户端CSDN下载

    如字符串操作(SET、GET、INCR等)、哈希表操作(HSET、HGET、HKEYS等)、列表操作(LPUSH、RPOP、LINDEX等)、集合操作(SADD、SMEMBERS、SREM等)、有序集合操作(ZADD、ZRANGE、ZREMRANGEBYSCORE等)以及事务处理...

Global site tag (gtag.js) - Google Analytics