`
m635674608
  • 浏览: 5052518 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Jedis实现Publish/Subscribe功能

 
阅读更多

Redis为我们提供了publish/subscribe(发布/订阅)功能。我们可以对某个channel(频道)进行subscribe(订阅),当有人在这个channel上publish(发布)消息时,redis就会通知我们,这样我们可以收到别人发布的消息。 

作为Java的redis客户端,Jedis提供了publish/subscribe的接口。本文讲述如何使用Jedis来实现redis的publish/subscribe。

 

定义Subscriber类

 

Jedis定义了抽象类JedisPubSub,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。

 

在以下例子中,我们定义了Subscriber类,这个类继承了JedisPubSub类,并重新实现了其中的回调方法。

 

Subscriber.java

 

import redis.clients.jedis.JedisPubSub;

 

 

public class Subscriber extends JedisPubSub {

    public Subscriber() {

    }

 

    public void onMessage(String channel, String message) {

        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));

    }

 

    public void onSubscribe(String channel, int subscribedChannels) {

        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", 

                channel, subscribedChannels));

    }

 

    public void onUnsubscribe(String channel, int subscribedChannels) {

        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", 

                channel, subscribedChannels));

 

    }

}

 

定义SubThread线程类

 

由于Jedis的subscribe操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。

 

SubThread.java

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

 

 

public class SubThread extends Thread {

    private final JedisPool jedisPool;

    private final Subscriber subscriber = new Subscriber();

 

    private final String channel = "mychannel";

 

    public SubThread(JedisPool jedisPool) {

        super("SubThread");

        this.jedisPool = jedisPool;

    }

 

    @Override

    public void run() {

        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));

        Jedis jedis = null;

        try {

            jedis = jedisPool.getResource();

            jedis.subscribe(subscriber, channel);

        } catch (Exception e) {

            System.out.println(String.format("subsrcibe channel error, %s", e));

        } finally {

            if (jedis != null) {

                jedis.close();

            }

        }

    }

}

 

在上面的代码中,我们从JedisPool获取一个Jedis实例,并使用这个Jedis实例进行subscribe的操作。 

Jedis的subscribe的声明如下:

 

public void subscribe(final JedisPubSub jedisPubSub, final String… channels)

第一个参数接受一个JedisPubSub对象,第二个参数指定对哪个频道进行订阅。上例中,我们把我们定义的Subscriber对象传给subscribe方法。 

当publish/subscribe的事件发生时,会自动调用我们Subscriber的方法。

 

定义Publisher类

 

Publisher类接受用户的输入,并将输入发布到channel。当用户输入”quit”后,输入结束。

 

Publisher.java

 

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

 

public class Publisher {

    private final JedisPool jedisPool;

 

    public Publisher(JedisPool jedisPool) {

        this.jedisPool = jedisPool;

    }

 

    public void start() {

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        Jedis jedis = jedisPool.getResource();

        while (true) {

            String line = null;

            try {

                line = reader.readLine();

                if (!"quit".equals(line)) {

                    jedis.publish("mychannel", line);

                } else {

                    break;

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

         }

    }

}

 

定义入口代码

 

如下是我们的程序入口代码。

 

PubSubDemo.java

 

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

 

 

public class PubSubDemo 

{

    public static void main( String[] args )

    {

        // 替换成你的reids地址和端口

        String redisIp = "192.168.229.154";

        int reidsPort = 6379;

        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), redisIp, reidsPort);

        System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", redisIp, reidsPort));

 

        SubThread subThread = new SubThread(jedisPool);

        subThread.start();

 

        Publisher publisher = new Publisher(jedisPool);

        publisher.start();

    }

}

 

在上面的代码中,我们首先生成了一个JedisPool的redis连接池,这是由于Jedis不是线程安全的,JedisPool是线程安全的。而我们的程序在主线程和订阅线程(SubThread)均需要使用Jedis,故在程序中我们使用JedisPool。具体也可以参考在多线程环境中使用Jedis。 

由于Jedis的subcribe操作是阻塞的,故我们另起了一个线程来进行subcribe操作。 

通过调用Publisher::start()方法,接受用户的输入,并publish到指定的channel。

 

输出

 

redis pool is starting, redis ip 192.168.229.154, redis port 6379 

subscribe redis, channel mychannel, thread will be blocked 

subscribe redis channel success, channel mychannel, subscribedChannels 1

这时输入

 

hello

控制窗口中输出

 

receive redis published message, channel mychannel, message hello

参考资料

 

https://github.com/xetorthio/jedis/wiki/AdvancedUsage

http://basrikahveci.com/a-simple-jedis-publish-subscribe-example/

http://blog.csdn.net/lihao21/article/details/48370687

http://blog.csdn.net/eguid_1/article/details/52600755

http://kingxss.iteye.com/blog/1420264

分享到:
评论

相关推荐

    Jedis的Publish/Subscribe功能的运用

    本文将深入探讨Jedis的Publish/Subscribe(发布/订阅)功能的运用,以及如何在实际项目中实现这一特性。 首先,发布/订阅是一种消息传递模式,允许发送者(Publisher)向多个接收者(Subscriber)广播消息,而无需...

    jedis-2.5.1.jar

    Publish/Subscribe Persistence control commands Remote server control commands Connection pooling Sharding (MD5, MurmurHash) Key-tags for sharding Sharding with pipelining Scripting with pipelining

    jedis-jedis-2.7.0.zip

    在2.7.0版本中,Jedis提供了丰富的API,支持了Redis的多种数据结构和操作,包括字符串(Strings)、哈希(Hashes)、列表(Lists)、集合(Sets)、有序集合(Sorted Sets)以及发布/订阅(Publish/Subscribe)等功能。...

    jedis.jar下载

    **Jedis.jar下载详解** Jedis,全称为Java Redis Client,是Java语言中用来与Redis...通过下载并使用`jedis.jar`,开发人员可以轻松地在Java应用中实现对Redis数据库的高效访问和管理,充分利用Redis的强大功能。

    jedis.2.9.jar

    Jedis还支持更复杂的操作,如哈希(Hashes)、集合(Sets)、有序集合(Sorted Sets)、列表(Lists)等数据结构的操作,以及事务(Transactions)、发布/订阅(Publish/Subscribe)等高级功能。 此外,`info.txt`...

    jedis-2.0.o

    Jedis 支持事务(Transactions)、发布/订阅(Publish/Subscribe)、 Lua 脚本(Lua Scripting)以及 Redis Sentinel 和 Cluster 的操作。 **Jedis 2.0.0 特性** 1. **全面的 Redis 命令支持**:Jedis 2.0.0 包含...

    英文 Jedis API 2.9.0

    - **发布订阅(Publish/Subscribe)**:`subscribe`和`unsubscribe`用于客户端订阅和退订频道,`publish`用于发布消息到指定频道。 - **脚本(Lua Scripting)**:Jedis支持通过`eval`和`evalsha`方法执行Lua脚本,实现...

    java redis 发布与订阅小demo jedis

    Redis提供了发布/订阅(Publish/Subscribe)功能,使得多个客户端可以订阅特定的频道,当有其他客户端向该频道发布消息时,所有订阅了该频道的客户端都会接收到消息。本示例主要探讨如何使用Jedis库来实现Redis的...

    jedis安装包

    Jedis还支持Redis的事务(Transaction)和发布/订阅(Publish-Subscribe)功能: 1. 事务: ```java jedis.multi(); jedis.set("key1", "value1"); jedis.set("key2", "value2"); List<Object> results = jedis....

    jedis-2.9.0.jar

    订阅者通过 `jedis.subscribe(subscriber, "channel")` 监听指定频道,发布者使用 `jedis.publish("channel", "message")` 发布消息。 **六、持久化与复制** Redis 支持多种持久化策略,如 RDB 和 AOF。Jedis 可以...

    jedis-jedis-2.4.0.zip

    4. **发布/订阅**:Jedis支持Redis的发布/订阅(Publish/Subscribe)模式,允许实现消息广播和监听。`subscribe`和`unsubscribe`方法用于订阅和退订频道,`publish`方法则用于发送消息。 5. **脚本支持**:Jedis ...

    jedis-jedis-3.6.0-rc1.tar.gz

    3. 高级特性:如事务(Transactions)、管道(Pipelining)、脚本执行(Lua Scripting)和发布/订阅(Publish/Subscribe)。 4. 持久化操作:支持RDB和AOF两种Redis持久化方式。 5. 主从复制:可以处理主库和从库...

    jedis-jedis-2.7.2

    4. **发布/订阅**:Jedis提供了`subscribe()`和`publish()`方法,用于实现Redis的消息发布与订阅功能,从而构建基于消息的分布式系统。 5. **Pipeline**:为了提高性能,Jedis提供了管道模式,允许一次性发送多条...

    jedis源码 (学习jedis)

    Jedis提供了`subscribe`和`unsubscribe`方法用于订阅和退订频道,以及`publish`方法来发布消息。源码揭示了这些功能的实现原理。 8. **pipeline和事务的异步优化**: 使用`Pipeline`可以批量发送命令,减少网络...

    Jedis测试Demo

    4. **发布/订阅**:Jedis也支持Redis的消息发布和订阅功能,用于实现简单的消息队列或事件通知。 ```java // 订阅 jedis.subscribe(new JedisPubSub() {...}, "channel"); // 发布 jedis.publish("channel", ...

    jedis2.x使用指南.pdf

    5. Publish/Subscribe发布订阅模式 Jedis支持Redis的发布订阅消息模式,这种模式允许客户端订阅一个或多个频道,并接收发布到这些频道的消息。通过继承`JedisPubSub`类并重写其方法,可以实现消息的接收。例如,`...

    java连接redis的jedis.jar包

    此外,Jedis也支持事务(Transaction)和发布/订阅(Publish/Subscribe)功能,可以实现原子性的操作序列以及消息传递。例如,开启一个事务: ```java Transaction tx = jedis.multi(); tx.set("key1", "value1"); ...

    jedis操作源码

    此外,Jedis还实现了发布订阅(Publish/Subscribe)功能。`subscribe`方法会创建一个`JedisPubSub`实例,并通过`connection.subscribe(pubsub, channels)`订阅指定频道。内部实现中,`Jedis`会持续监听Redis服务器的...

Global site tag (gtag.js) - Google Analytics