`

5.Redis消息订阅/发布

阅读更多

Redis可以很容的实现消息订阅/发布功能

 

一.JedisPubSub

需要实现一个JedisPubSub,相当于Redis消息的Listener

package com.gqshao.redis.channels;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

public class MyJedisPubSub extends JedisPubSub {

    protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class);

    // 取得订阅的消息后的处理  
    public void onMessage(String channel, String message) {
        logger.info("取得订阅的消息后的处理 : " + channel + "=" + message);
    }

    // 初始化订阅时候的处理  
    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels);
    }

    // 取消订阅时候的处理  
    public void onUnsubscribe(String channel, int subscribedChannels) {
        logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels);
    }

    // 初始化按表达式的方式订阅时候的处理  
    public void onPSubscribe(String pattern, int subscribedChannels) {
        logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
    }

    // 取消按表达式的方式订阅时候的处理  
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
    }

    // 取得按表达式的方式订阅的消息后的处理  
    public void onPMessage(String pattern, String channel, String message) {
        logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message);
    }
}

 

 

二.消息订阅/发布

1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中

2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService

package com.gqshao.redis.channels;


import com.gqshao.redis.JedisTest;
import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 发布/订阅
 */
public class MessageTest extends JedisTest {

    /**
     * SUBSCRIBE [channel...] 订阅一个匹配的通道
     * PSUBSCRIBE [pattern...] 订阅匹配的通道
     * PUBLISH [channel] [message] 将value推送到channelone通道中
     * UNSUBSCRIBE [channel...] 取消订阅消息
     * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅
     * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听
     * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅
     */
    @Test
    public void testSubscribe() {
        final MyJedisPubSub listener = new MyJedisPubSub();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("subscribe channelA.test channelB.send_message");
                jedis.subscribe(listener, "channelA.test", "channelB.send_message");
            }
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(thread);

        // 测试发送
        Jedis pubJedis = pool.getResource();
        logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK"));
        logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!"));
        listener.unsubscribe("channelA.test", "channelB.send_message");
        try {
            executor.shutdownNow();
            logger.info("executor.shutdownNow");
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.warn("Pool did not terminated");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        logger.info("完成subscribe测试");
    }


    /**
     * SUBSCRIBE channelone 订阅一个通道
     * PSUBSCRIBE channel* 订阅一批通道
     * PUBLISH channelone value 将value推送到channelone通道中
     * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听
     */
    @Test
    public void testPsubscribe() {
        final MyJedisPubSub listener = new MyJedisPubSub();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("psubscribe channel*");
                jedis.psubscribe(listener, "channel*");
            }
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(thread);

        // 测试发送
        Jedis pubJedis = pool.getResource();

        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
        logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!"));

        pool.returnResource(pubJedis);
        listener.punsubscribe();
        try {
            executor.shutdownNow();
            logger.info("executor.shutdownNow");
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.warn("Pool did not terminated");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        logger.info("完成psubscribe测试");
        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
    }

}

 

 

 

1
1
分享到:
评论
1 楼 weijs 2014-12-31  
看完了5篇REIDS的文章,写很很好,希望能共享DEMO,非常感谢。

相关推荐

    ServiceStack.Redis-5.8无限制.zip

    6. **发布/订阅(Pub/Sub)**:ServiceStack.Redis提供了丰富的订阅者和发布者接口,支持实时消息传递,这对于构建分布式事件系统或实现实时通知非常有用。 7. **持久化策略**:Redis提供了多种数据持久化策略,如...

    StackExchange.Redis .NET4.0

    6. **发布/订阅(Pub/Sub)模式**:StackExchange.Redis支持Redis的发布/订阅功能,允许开发人员实现事件驱动的通信。`Subscribe`和`Unsubscribe`方法可以用来注册和取消订阅频道,而`Publish`方法则用于发送消息。 ...

    C# Redis发布与订阅系统源码

    在这个模式下,发送者(Publisher)将消息发布到一个频道(Channel),而订阅者(Subscriber)可以订阅这些频道并接收发布的消息。这种方式使得多个客户端可以实时共享信息,而无需直接通信,简化了分布式系统中的...

    ServiceStack.Redis操作工具类

    4. **发布/订阅(Pub/Sub)**:ServiceStack.Redis实现了Redis的发布/订阅功能,允许你创建频道并发送或接收消息。`IRedisSubscription`接口提供了订阅和取消订阅的方法,如`Subscribe`和`Unsubscribe`。 5. **事务...

    ServiceStack.Redis 3.9

    4. **发布/订阅(Pub/Sub)模式**:支持Redis的发布/订阅模式,允许应用程序之间进行实时通信,这对于构建事件驱动的架构非常有用。 5. **事务支持**:ServiceStack.Redis允许用户在Redis中执行原子的事务操作,...

    ServiceStack.Redis 目前最新版6.9.0.0绕过6000限制

    ServiceStack.Redis作为.NET客户端,提供了丰富的API,可以方便地执行各种Redis操作,如设置和获取键值、发布订阅消息、操作集合类型(如列表、集合、有序集合)等。 在.NET 6控制台项目中使用ServiceStack.Redis,...

    StackExchange.Redis-1.2.6精简,附64位dll

    3. **发布/订阅(Pub/Sub)**:StackExchange.Redis支持Redis的发布订阅模式,允许程序作为发布者发送消息到频道,其他订阅者可以接收这些消息。这对于实现解耦的实时消息传递非常有用。 4. **事务支持**:Redis...

    ServiceStack.redis v3.9.71

    开发者可以利用它来执行各种操作,如设置和获取键值、处理哈希表、发布订阅消息、操作集合和有序集合等。此外,它还支持事务、持久化和主从复制等Redis特性。 StackExchange.Redis.1.2.6是另一个Redis客户端库,...

    StackExchange.Redis-1.2.6

    Redis是一个高性能的键值对数据库,广泛应用于缓存、消息队列、事件发布/订阅等多种场景。版本1.2.6是该库的一个稳定版本,包含了丰富的功能和优化。 1. **Redis介绍**:Redis是一个开源的、基于内存的数据结构存储...

    C# 使用 ServiceStack.Redis 必须的4个dll

    它提供了一个高级、易用的API,用于执行各种Redis操作,如设置和获取键值、操作集合、发布订阅消息等。通过这个库,开发者可以方便地在C#代码中管理Redis的数据结构,如字符串、哈希、列表、集合和有序集合。 3. **...

    StackExchange.Redis缓存扩展

    Redis支持两种主要的消息队列模式:发布/订阅(Pub/Sub)和列表(List)。发布/订阅模式用于广播消息,而列表模式则支持工作队列模型,其中生产者添加任务到列表的一端,消费者从另一端取出并处理任务。 在...

    基于SSM实现Websocket+redis订阅/发布,消息实时推送

    基于ssm实现websocket长连接+redis发布/订阅消息,服务端实时推送消息至前端页面,实时通信。内含前端代码,如需sql文件请下载https://download.csdn.net/download/gmetbtgbki/10824890

    基于netcore 3.0的redis发布订阅示例代码

    发布/订阅是Redis的一个重要特性,它允许应用程序以广播消息的方式进行通信,订阅者可以监听特定频道并接收发送到该频道的消息。以下是一个简单的.NET Core 3.0应用,展示了如何使用StackExchange.Redis创建发布者和...

    NServiceKit.Redis.rar

    除了基本操作,NServiceKit.Redis还支持发布/订阅(Pub/Sub)模式,这是Redis的消息传递机制,可用于实现简单的消息队列。例如,你可以订阅一个频道并接收消息: ```csharp var sub = redis.CreateSubscription(); ...

    ServiceStack.Redis.3.9.29.0

    4. **发布/订阅(Pub/Sub)模式**:提供了实现Redis的发布/订阅功能的API,可用于构建实时消息传递系统。 5. **事务支持**:支持Redis的事务操作,允许在单个命令中执行多个操作,确保数据一致性。 6. **连接池管理**...

    Java实现Redis的消息订阅和发布

    在这一模式下,Redis服务器作为消息的中间人,允许多个客户端(订阅者)订阅特定的主题,当有发布者向该主题发布消息时,所有订阅了该主题的客户端都会收到消息。这种模式提供了一种轻量级的通信方式,适合实时通知...

    ServiceStack.Redis 源码

    5. **发布/订阅(Pub/Sub)**:实现了 Redis 的发布/订阅功能,使得客户端可以订阅感兴趣的消息主题,实现解耦的通信模式。 6. **事务处理**:支持 Redis 事务,允许在一个原子操作中执行多个命令,确保数据的一致性。...

    StackExchange.Redis文档翻译.pdf

    使用`ISubscriber`接口,可以实现发布消息到一个频道,而所有订阅该频道的客户端会收到这些消息。消息顺序通常在单个Redis实例内是保持的,但在分布式环境中则不保证。 8. **命令如KEYS,SCAN,FLUSHDB**: 这些...

    ServiceStack.Redis.dll

    3. **发布/订阅(Pub/Sub)**:提供消息发布与订阅功能,实现事件驱动的通信模式,适合构建实时应用或广播消息。 4. **事务(Transactions)**:支持原子操作的事务处理,确保多个命令的执行顺序和一致性。 5. **...

    (5.9.3.0)ServiceStack.Redis(已取消6000限制)

    7. 发布/订阅消息:利用`Subscribe`和`Publish`功能实现消息传递。 8. 事务处理:使用`BeginTransaction`、`Execute`和`Commit`进行多条命令的原子操作。 此外,ServiceStack.Redis还支持高级特性,如Lua脚本执行、...

Global site tag (gtag.js) - Google Analytics