`
QING____
  • 浏览: 2250681 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Redis编程实践【pub/sub】

 
阅读更多

 

    Redis或许已经在很多企业开始推广并试水,本文也根据个人的实践,简单描述一下Redis在实际开发过程中的使用(部署与架构,稍后介绍),程序执行环境为java + jedis,关于spring下如何集成redis-api,稍后介绍吧。

 

前言:下载redis-2.6.2,安装好redis之后,请在redis.conf文件中,将如下3个配置属性开启(仅供测试使用):

    ##客户端链接的端口,也是server端侦听client链接的端口  
    ##每个client实例,都将和server在此端口上建立tcp长链接  
    port 6379  
    ## server端绑定的ip地址,如果一个物理机器有多个网络接口时,可以明确指定为某个网口的ip地址  
    bind 127.0.0.1  
    ##链接中io操作空闲时间,如果在指定时间内,没有IO操作,链接将会被关闭  
    ##此属性和TCP链接中的timeout选项一样,建议设置为0,很多时候,我们一个应用也只会有一个redis实例  
    ##不过,如果你使用连接池的话,你需要对此参数做额外的考虑。  
    timeout 0  

 

Pub/Sub: "发布/订阅",对于此功能,我们将会想到很多JMS实现,Redis提供此功能显的“多此一举”;不过这个功能在redis中,被设计的非常轻量级和简洁,它做到了消息的“发布”和“订阅”的基本能力,但是尚未提供JMS中关于消息的持久化/耐久性等各种企业级的特性。

    一个Redis client发布消息,其他多个redis client订阅消息,发布的消息“即发即失”,redis不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将无从获得。这就类似于JMS中“非持久”类型的消息。

    消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)

    消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

    一旦subscribe端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失。

    如果你非常关注每个消息,那么你应该考虑使用JMS或者基于Redis做一些额外的补充工作,如果你期望订阅是持久的,那么如下的设计思路可以借鉴(如下原理基于JMS):

    1) subscribe端首先向一个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯一的订阅者,例如:sub:email,sub:web。此SET称为“活跃订阅者集合”

    2) subcribe端开启订阅操作,并基于Redis创建一个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息。此LIST称为“订阅者消息队列”

    3) publish端:每发布一条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发布的消息。

    4) 到此为止,我们可以基本保证,发布的每一条消息,都会持久保存在每个“订阅者消息队列”中。

    5) subscribe端,每收到一个订阅消息,在消费之后,必须删除自己的“订阅者消息队列”头部的一条记录。

    6) subscribe端启动时,如果发现自己的自己的“订阅者消息队列”有残存记录,那么将会首先消费这些记录,然后再去订阅。

 

--------------------------------------------------------------非持久化订阅-------------------------------------------------------

PrintListener.java:订阅者消息处理器

public class PrintListener extends JedisPubSub{

	@Override
	public void onMessage(String channel, String message) {
		String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
		System.out.println("message receive:" + message + ",channel:" + channel + "..." + time);
		//此处我们可以取消订阅
		if(message.equalsIgnoreCase("quit")){
			this.unsubscribe(channel);
		}
	}
...
}
 

PubClient.java:消息发布端

public class PubClient {

	private Jedis jedis;//
	public PubClient(String host,int port){
		jedis = new Jedis(host,port);
	}
	
	public void pub(String channel,String message){
		jedis.publish(channel, message);
	}
	
	public void close(String channel){
		jedis.publish(channel, "quit");
		jedis.del(channel);//
	}

}

 

SubClient.java:消息订阅端

public class SubClient {

	private Jedis jedis;//
	
	public SubClient(String host,int port){
		jedis = new Jedis(host,port);
	}
	
	public void sub(JedisPubSub listener,String channel){
		jedis.subscribe(listener, channel);
		//此处将会阻塞,在client代码级别为JedisPubSub在处理消息时,将会“独占”链接
		//并且采取了while循环的方式,侦听订阅的消息
		//
	}

}

 

PubSubTestMain.java:测试引导类

public class PubSubTestMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		PubClient pubClient = new PubClient(Constants.host, Constants.port);
		final String channel = "pubsub-channel";
		pubClient.pub(channel, "before1");
		pubClient.pub(channel, "before2");
		Thread.sleep(2000);
		//消息订阅着非常特殊,需要独占链接,因此我们需要为它创建新的链接;
		//此外,jedis客户端的实现也保证了“链接独占”的特性,sub方法将一直阻塞,
		//直到调用listener.unsubscribe方法
		Thread subThread = new Thread(new Runnable() {
			@Override
			public void run() {
				try{
					SubClient subClient = new SubClient(Constants.host, Constants.port);
					System.out.println("----------subscribe operation begin-------");
					JedisPubSub listener = new PrintListener();
					//在API级别,此处为轮询操作,直到unsubscribe调用,才会返回
					subClient.sub(listener, channel);
					System.out.println("----------subscribe operation end-------");
				}catch(Exception e){
					e.printStackTrace();
				}
				
			}
		});
		subThread.start();
		int i=0;
		while(i < 10){
			String message = RandomStringUtils.random(6, true, true);//apache-commons
			pubClient.pub(channel, message);
			i++;
			Thread.sleep(1000);
		}
		//被动关闭指示,如果通道中,消息发布者确定通道需要关闭,那么就发送一个“quit”
		//那么在listener.onMessage()中接收到“quit”时,其他订阅client将执行“unsubscribe”操作。
		pubClient.close(channel);
		//此外,你还可以这样取消订阅
		//listener.unsubscribe(channel);

	}

}

 

--------------------------------------------------------------持久化订阅-------------------------------------------------------

 基本思路:当订阅者订阅消息时,将此订阅者信息添加到一个列表中,此列表为“所有订阅者列表”,同时为每个订阅者都创建一个保存消息(内容或者消息ID)的队列,消息发布者将每条消息都添加到每个订阅者的队列中。

如下实现仅供参考,有很多更优的实现方式。

PPrintListener.java

public class PPrintListener extends JedisPubSub{

	private String clientId;
	private PSubHandler handler;
	
	public PPrintListener(String clientId,Jedis jedis){
		this.clientId = clientId;
		handler = new PSubHandler(jedis);
	}
	
	@Override
	public void onMessage(String channel, String message) {
		//此处我们可以取消订阅
		if(message.equalsIgnoreCase("quit")){
			this.unsubscribe(channel);
		}
		handler.handle(channel, message);//触发当前订阅者从自己的消息队列中移除消息
	}
	
	private void message(String channel,String message){
		String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
		System.out.println("message receive:" + message + ",channel:" + channel + "..." + time);
	}

	@Override
	public void onPMessage(String pattern, String channel, String message) {
		System.out.println("message receive:" + message + ",pattern channel:" + channel);
		
	}

	@Override
	public void onSubscribe(String channel, int subscribedChannels) {
		handler.subscribe(channel);
		System.out.println("subscribe:" + channel + ";total channels : " + subscribedChannels);
		
	}

	@Override
	public void onUnsubscribe(String channel, int subscribedChannels) {
		handler.unsubscribe(channel);
		System.out.println("unsubscribe:" + channel + ";total channels : " + subscribedChannels);
		
	}

	@Override
	public void onPUnsubscribe(String pattern, int subscribedChannels) {
		System.out.println("unsubscribe pattern:" + pattern + ";total channels : " + subscribedChannels);
		
	}

	@Override
	public void onPSubscribe(String pattern, int subscribedChannels) {
		System.out.println("subscribe pattern:" + pattern + ";total channels : " + subscribedChannels);		
	}
	
	@Override
	public void unsubscribe(String... channels) {
        super.unsubscribe(channels);
        for(String channel : channels){
        	handler.unsubscribe(channel);
        }
    }
	
	class PSubHandler {

		private Jedis jedis;
		PSubHandler(Jedis jedis){
			this.jedis = jedis;
		}
		public void handle(String channel,String message){
			int index = message.indexOf("/");
			if(index < 0){
				return;
			}
			Long txid = Long.valueOf(message.substring(0,index));
			String key = clientId + "/" + channel;
			while(true){
					String lm = jedis.lindex(key, 0);//获取第一个消息
					if(lm == null){
						break;
					}
					int li = lm.indexOf("/");
					//如果消息不合法,删除并处理
					if(li < 0){
						String result = jedis.lpop(key);//删除当前message
						//为空
						if(result == null){
							break;
						}
						message(channel, lm);
						continue;
					}
					Long lxid = Long.valueOf(lm.substring(0,li));//获取消息的txid
					//直接消费txid之前的残留消息
					if(txid >= lxid){
						jedis.lpop(key);//删除当前message
						message(channel, lm);
						continue;
					}else{
						break;
					}
			}
		}
		
		public void subscribe(String channel){
			String key = clientId + "/" + channel;
			boolean exist = jedis.sismember(Constants.SUBSCRIBE_CENTER,key);
			if(!exist){
				jedis.sadd(Constants.SUBSCRIBE_CENTER, key);
			}
		}
		
		public void unsubscribe(String channel){
			String key = clientId + "/" + channel;
			jedis.srem(Constants.SUBSCRIBE_CENTER, key);//从“活跃订阅者”集合中删除
			jedis.del(key);//删除“订阅者消息队列”
		}
	}
}

 

 

PPubClient.java

public class PPubClient {

	private Jedis jedis;//
	public PPubClient(String host,int port){
		jedis = new Jedis(host,port);
	}
	
	/**
	 * 发布的每条消息,都需要在“订阅者消息队列”中持久
	 * @param message
	 */
	private void put(String message){
		//期望这个集合不要太大
		Set<String> subClients = jedis.smembers(Constants.SUBSCRIBE_CENTER);
		for(String clientKey : subClients){
			jedis.rpush(clientKey, message);
		}
	}
	
	public void pub(String channel,String message){
		//每个消息,都有具有一个全局唯一的id
		//txid为了防止订阅端在数据处理时“乱序”,这就要求订阅者需要解析message
		Long txid = jedis.incr(Constants.MESSAGE_TXID);
		String content = txid + "/" + message;
		//非事务
		this.put(content);
		jedis.publish(channel, content);//为每个消息设定id,最终消息格式1000/messageContent
		
	}
	
	public void close(String channel){
		jedis.publish(channel, "quit");
		jedis.del(channel);//删除
	}
	
	public void test(){
		jedis.set("pub-block", "15");
		String tmp = jedis.get("pub-block");
		System.out.println("TEST:" + tmp);
	}


}

 

PPSubClient.java

public class PSubClient {

	private Jedis jedis;//
	private JedisPubSub listener;//单listener
	
	public PSubClient(String host,int port,String clientId){
		jedis = new Jedis(host,port);
		listener = new PPrintListener(clientId, new Jedis(host, port));
	}
	
	public void sub(String channel){
		jedis.subscribe(listener, channel);
	}
	
	public void unsubscribe(String channel){
		listener.unsubscribe(channel);
	}
	
}

 

PPubSubTestMain.java

 

public class PPubSubTestMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		PPubClient pubClient = new PPubClient(Constants.host, Constants.port);
		final String channel = "pubsub-channel-p";
		final PSubClient subClient = new PSubClient(Constants.host, Constants.port,"subClient-1");
		Thread subThread = new Thread(new Runnable() {
			
			@Override
			public void run() {
				System.out.println("----------subscribe operation begin-------");
				//在API级别,此处为轮询操作,直到unsubscribe调用,才会返回
				subClient.sub(channel);
				System.out.println("----------subscribe operation end-------");
				
			}
		});
		subThread.setDaemon(true);
		subThread.start();
		int i = 0;
		while(i < 2){
			String message = RandomStringUtils.random(6, true, true);//apache-commons
			pubClient.pub(channel, message);
			i++;
			Thread.sleep(1000);
		}
		subClient.unsubscribe(channel);
	}
	
}

 

分享到:
评论
5 楼 QING____ 2018-12-10  
xiatiandebaofengyu 写道
有个问题:发布者在发布消息时,是往所有的订阅者消息队列上存放该消息的(类似群发),但按道理是应该往指定channel的订阅者存放消息才对吧,不然订阅者就会收到所有发布者的消息,而不是自己订阅的channel中的消息了。是否可以改为如下:
 
    private void put(String message,String channel){ 
        //期望这个集合不要太大 
        Set<String> subClients = jedis.smembers(Constants.SUBSCRIBE_CENTER); 
        for(String clientKey : subClients){ 
            //只存进需要消费的队列中,而不是所有
            if(clientKey.contains(channel)){
              jedis.rpush(clientKey, message); 
           }
           
        } 
    }

本blog的部分知识点已经比较陈旧,建议你根据最新官方信息结合源码进行梳理,谢谢!
4 楼 xiatiandebaofengyu 2018-12-02  
有个问题:发布者在发布消息时,是往所有的订阅者消息队列上存放该消息的(类似群发),但按道理是应该往指定channel的订阅者存放消息才对吧,不然订阅者就会收到所有发布者的消息,而不是自己订阅的channel中的消息了。是否可以改为如下:
 
    private void put(String message,String channel){ 
        //期望这个集合不要太大 
        Set<String> subClients = jedis.smembers(Constants.SUBSCRIBE_CENTER); 
        for(String clientKey : subClients){ 
            //只存进需要消费的队列中,而不是所有
            if(clientKey.contains(channel)){
              jedis.rpush(clientKey, message); 
           }
           
        } 
    }
3 楼 it_eye_imjamin 2018-04-14  
谢谢!有用!
2 楼 文艺吧网 2017-05-22  
hbb239 写道
可以使用,不过有个小Bug。

你这种人,我只能呵呵!
1 楼 hbb239 2016-07-07  
可以使用,不过有个小Bug。

相关推荐

    whiteboard-redis-pub-sub-JonathanElluin:GitHub Classroom创建的whiteboard-redis-pub-sub-JonathanElluin

    标题中的“whiteboard-redis-pub-sub-JonathanElluin”是一个项目名称,它与GitHub Classroom有关,表明这是一个教育环境下的编程练习。这个项目聚焦于使用Redis实现发布/订阅(Pub/Sub)功能,由Jonathan Elluin...

    redis技术实践

    Redis支持发布/订阅(Pub/Sub)模式,允许程序之间通过频道(Channel)进行消息传递。这一特性可以实现消息的实时推送、广播等功能,常用于构建实时应用程序。 - **PUBLISH**: 将信息(message)发送到指定的频道(channel...

    redis资料.zip

    此外,Redis还支持发布/订阅(Pub/Sub)模式,实现简单的消息传递。 接着,Redis的集群功能是为了解决单实例的性能和容量瓶颈。Redis Cluster是官方提供的分布式解决方案,通过分片(Sharding)技术将数据分散到多...

    c#使用Redis缓存

    除了基础操作,Redis还支持发布/订阅(Pub/Sub)模式,可用于实现消息广播和事件驱动。在C#中,可以通过`ISubscriber`接口订阅和发布消息: ```csharp var subscriber = connection.GetSubscriber(); subscriber....

    Redis使用+redis工具

    8. **发布/订阅(Pub/Sub)**:Redis的发布/订阅模式允许发送者(Publisher)向订阅者(Subscriber)广播消息,常用于实现异步通信。 9. **Lua脚本**:Redis支持在服务器端执行Lua脚本,可以利用Lua的计算能力实现...

    redis2-nginx-module-0.15

    - **发布/订阅**:支持 Redis 的 Pub/Sub(发布/订阅)模式,用于实现消息传递和实时通信。 - **事务处理**:能够执行 Redis 的事务,确保多个操作的原子性。 - **管道**:使用 Redis 的 Pipeline 技术,批量发送...

    redis学习笔记。

    此外,Redis 还提供了发布/订阅(pub/sub)功能,支持简单的主从复制和分片,以及丰富的客户端库,使其在各种编程语言中都能方便地使用。 #### 1.2 应用场景 - **缓存**:Redis 可以作为高速缓存,减少数据库的...

    redis 快速学习demo

    以上是Redis与Java编程的基本知识,通过学习和实践这些内容,你可以快速上手Redis并应用于实际项目中。在压缩包中的"redis"文件可能包含了更多具体的操作示例,建议结合这些示例深入理解和掌握Redis的用法。

    windows下c#操作redis类

    除了基本的数据操作,StackExchange.Redis还支持发布/订阅(Pub/Sub)模式,事务(Transactions),以及脚本执行(Lua scripts)等高级功能。例如,你可以创建一个发布者和订阅者来实现消息传递: ```csharp // 创建...

    ServiceStack.Redis 源码

    总的来说,ServiceStack.Redis 是一个强大且功能丰富的 Redis 客户端,通过深入理解其源代码,开发者不仅可以提升对 Redis 协议和操作的理解,还能学习到优秀的 C# 编程实践和设计模式,对于提升个人技能和解决实际...

    C#+Redis供初学者学习使用

    此外,Redis还支持发布/订阅(pub/sub)模式,可以作为消息中间件。在C#中,你可以创建发布者和订阅者对象来发送和接收消息,这对于实现分布式系统的异步通信非常有用。 标签中的“分布式框架”可能指的是利用Redis...

    Redis.zip C# WINDOWS

    - 发布订阅:利用 Redis 的 pub/sub 功能实现消息传递。 总的来说,这个 `Redis.zip` 包含了一个 C# 应用程序使用 Redis 的基础示例,通过学习和理解其中的代码,你可以快速掌握如何在 .NET 环境下利用 Redis 的...

    java中运用redis的demo

    2. **消息订阅与发布(Pub/Sub)**: Redis支持发布/订阅模式,用于实现轻量级的消息传递。首先创建一个`JedisPubSub`子类并重写`onMessage`方法,然后使用`subscribe`方法订阅频道。 ```java class ...

    StackExchange.Redis-1.2.6

    - **发布/订阅功能**:实现Redis的pub/sub(发布/订阅)机制,支持消息的实时传递,可用于构建事件驱动的应用。 3. **Redis数据类型**: - **字符串**:基本的键值对存储,可以存储字符串、数字等。 - **哈希**...

    Spring+Struts2+hibernate+Redis整合

    - 使用Redis的发布/订阅(Pub/Sub)功能进行消息通信,例如通知缓存更新。 - 利用Redis的事务和Lua脚本功能,确保多步操作的原子性。 - 结合Spring AOP进行更细粒度的缓存控制,如按用户、按条件动态缓存。 - 注意...

    RedisDemo_redis_

    在.NET开发环境中,C#是常用的编程语言,而利用C#与Redis交互可以极大地提升应用程序的性能。本示例"RedisDemo_redis_"将带你入门如何在C#中使用Redis进行数据的存储和读取。 首先,我们需要安装`StackExchange....

    C# RedisDemo Redisdll 全部DLL

    3. 高级功能:可能包含发布/订阅(Pub/Sub)模式,用于实现简单的消息传递;事务(Transactions),确保多条命令的原子性执行;或者 Lua 脚本,允许在服务器端执行自定义逻辑。 4. 连接池管理:Redis连接池能有效地...

    redis design and implementation

    虽然Redis不支持ACID(原子性、一致性、隔离性、持久性)中的隔离性,但通过正确的编程实践,仍可以实现高一致性的应用。 5. **发布订阅**:Redis的发布/订阅(Pub/Sub)模式允许客户端订阅特定的频道,当有消息...

    windows 64位 redis 3.2.1

    7. **发布/订阅(Pub/Sub)**:Redis的发布/订阅功能提供了一种异步通信模式。发布者将消息发送到频道,订阅者可以监听并接收这些消息,无需直接交互。 8. **事务**:Redis支持简单的事务,用户可以打包多个命令,...

Global site tag (gtag.js) - Google Analytics