Redis可以很容的实现消息订阅/发布功能
一.JedisPubSub
需要实现一个JedisPubSub,相当于Redis消息的Listener
package com.gqshao.redis.channels; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; public class MyJedisPubSub extends JedisPubSub { protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class); // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { logger.info("取得订阅的消息后的处理 : " + channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message); } }
二.消息订阅/发布
1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中
2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService
package com.gqshao.redis.channels; import com.gqshao.redis.JedisTest; import org.junit.Test; import redis.clients.jedis.Jedis; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 发布/订阅 */ public class MessageTest extends JedisTest { /** * SUBSCRIBE [channel...] 订阅一个匹配的通道 * PSUBSCRIBE [pattern...] 订阅匹配的通道 * PUBLISH [channel] [message] 将value推送到channelone通道中 * UNSUBSCRIBE [channel...] 取消订阅消息 * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅 * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听 * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅 */ @Test public void testSubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("subscribe channelA.test channelB.send_message"); jedis.subscribe(listener, "channelA.test", "channelB.send_message"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!")); listener.unsubscribe("channelA.test", "channelB.send_message"); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成subscribe测试"); } /** * SUBSCRIBE channelone 订阅一个通道 * PSUBSCRIBE channel* 订阅一批通道 * PUBLISH channelone value 将value推送到channelone通道中 * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听 */ @Test public void testPsubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("psubscribe channel*"); jedis.psubscribe(listener, "channel*"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!")); pool.returnResource(pubJedis); listener.punsubscribe(); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成psubscribe测试"); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); } }
相关推荐
6. **发布/订阅(Pub/Sub)**:ServiceStack.Redis提供了丰富的订阅者和发布者接口,支持实时消息传递,这对于构建分布式事件系统或实现实时通知非常有用。 7. **持久化策略**:Redis提供了多种数据持久化策略,如...
6. **发布/订阅(Pub/Sub)模式**:StackExchange.Redis支持Redis的发布/订阅功能,允许开发人员实现事件驱动的通信。`Subscribe`和`Unsubscribe`方法可以用来注册和取消订阅频道,而`Publish`方法则用于发送消息。 ...
在这个模式下,发送者(Publisher)将消息发布到一个频道(Channel),而订阅者(Subscriber)可以订阅这些频道并接收发布的消息。这种方式使得多个客户端可以实时共享信息,而无需直接通信,简化了分布式系统中的...
4. **发布/订阅(Pub/Sub)**:ServiceStack.Redis实现了Redis的发布/订阅功能,允许你创建频道并发送或接收消息。`IRedisSubscription`接口提供了订阅和取消订阅的方法,如`Subscribe`和`Unsubscribe`。 5. **事务...
4. **发布/订阅(Pub/Sub)模式**:支持Redis的发布/订阅模式,允许应用程序之间进行实时通信,这对于构建事件驱动的架构非常有用。 5. **事务支持**:ServiceStack.Redis允许用户在Redis中执行原子的事务操作,...
ServiceStack.Redis作为.NET客户端,提供了丰富的API,可以方便地执行各种Redis操作,如设置和获取键值、发布订阅消息、操作集合类型(如列表、集合、有序集合)等。 在.NET 6控制台项目中使用ServiceStack.Redis,...
3. **发布/订阅(Pub/Sub)**:StackExchange.Redis支持Redis的发布订阅模式,允许程序作为发布者发送消息到频道,其他订阅者可以接收这些消息。这对于实现解耦的实时消息传递非常有用。 4. **事务支持**:Redis...
开发者可以利用它来执行各种操作,如设置和获取键值、处理哈希表、发布订阅消息、操作集合和有序集合等。此外,它还支持事务、持久化和主从复制等Redis特性。 StackExchange.Redis.1.2.6是另一个Redis客户端库,...
Redis是一个高性能的键值对数据库,广泛应用于缓存、消息队列、事件发布/订阅等多种场景。版本1.2.6是该库的一个稳定版本,包含了丰富的功能和优化。 1. **Redis介绍**:Redis是一个开源的、基于内存的数据结构存储...
它提供了一个高级、易用的API,用于执行各种Redis操作,如设置和获取键值、操作集合、发布订阅消息等。通过这个库,开发者可以方便地在C#代码中管理Redis的数据结构,如字符串、哈希、列表、集合和有序集合。 3. **...
Redis支持两种主要的消息队列模式:发布/订阅(Pub/Sub)和列表(List)。发布/订阅模式用于广播消息,而列表模式则支持工作队列模型,其中生产者添加任务到列表的一端,消费者从另一端取出并处理任务。 在...
基于ssm实现websocket长连接+redis发布/订阅消息,服务端实时推送消息至前端页面,实时通信。内含前端代码,如需sql文件请下载https://download.csdn.net/download/gmetbtgbki/10824890
发布/订阅是Redis的一个重要特性,它允许应用程序以广播消息的方式进行通信,订阅者可以监听特定频道并接收发送到该频道的消息。以下是一个简单的.NET Core 3.0应用,展示了如何使用StackExchange.Redis创建发布者和...
除了基本操作,NServiceKit.Redis还支持发布/订阅(Pub/Sub)模式,这是Redis的消息传递机制,可用于实现简单的消息队列。例如,你可以订阅一个频道并接收消息: ```csharp var sub = redis.CreateSubscription(); ...
4. **发布/订阅(Pub/Sub)模式**:提供了实现Redis的发布/订阅功能的API,可用于构建实时消息传递系统。 5. **事务支持**:支持Redis的事务操作,允许在单个命令中执行多个操作,确保数据一致性。 6. **连接池管理**...
在这一模式下,Redis服务器作为消息的中间人,允许多个客户端(订阅者)订阅特定的主题,当有发布者向该主题发布消息时,所有订阅了该主题的客户端都会收到消息。这种模式提供了一种轻量级的通信方式,适合实时通知...
5. **发布/订阅(Pub/Sub)**:实现了 Redis 的发布/订阅功能,使得客户端可以订阅感兴趣的消息主题,实现解耦的通信模式。 6. **事务处理**:支持 Redis 事务,允许在一个原子操作中执行多个命令,确保数据的一致性。...
使用`ISubscriber`接口,可以实现发布消息到一个频道,而所有订阅该频道的客户端会收到这些消息。消息顺序通常在单个Redis实例内是保持的,但在分布式环境中则不保证。 8. **命令如KEYS,SCAN,FLUSHDB**: 这些...
3. **发布/订阅(Pub/Sub)**:提供消息发布与订阅功能,实现事件驱动的通信模式,适合构建实时应用或广播消息。 4. **事务(Transactions)**:支持原子操作的事务处理,确保多个命令的执行顺序和一致性。 5. **...
7. 发布/订阅消息:利用`Subscribe`和`Publish`功能实现消息传递。 8. 事务处理:使用`BeginTransaction`、`Execute`和`Commit`进行多条命令的原子操作。 此外,ServiceStack.Redis还支持高级特性,如Lua脚本执行、...