- 浏览: 25438 次
- 性别:
- 来自: 深圳
文章分类
最新评论
想找到一个消息推送的方案,隐约觉得Pub/Sub是一种解决问题的途径,但没在项目实践中用到。最新在了解学习阿里云,里面有demo。摘录记之。
消息的发布与订阅
场景介绍
ApsaraDB for Redis也提供了与Redis相同的消息发布(pub)与订阅(sub)功能。即一个client发布消息,其他多个client订阅消息。
需要注意的是,ApsaraDB for Redis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外消息发布者(即publish客户端),无需独占与服务器端的连接,你可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是消息订阅者(即subscribe客户端),需要独占与服务器端的连接,即进行subscribe期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
消息发布者 (即publish client)
消息订阅者 (即subscribe client)
消息监听者
示例主程序
运行结果
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
>>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A
----------订阅消息SUBSCRIBE 结束-------
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)
>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
消息的发布与订阅
场景介绍
ApsaraDB for Redis也提供了与Redis相同的消息发布(pub)与订阅(sub)功能。即一个client发布消息,其他多个client订阅消息。
需要注意的是,ApsaraDB for Redis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。
此外消息发布者(即publish客户端),无需独占与服务器端的连接,你可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是消息订阅者(即subscribe客户端),需要独占与服务器端的连接,即进行subscribe期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。
消息发布者 (即publish client)
package message.kvstore.aliyun.com; import redis.clients.jedis.Jedis; public class KVStorePubClient { private Jedis jedis;// public KVStorePubClient(String host,int port, String password){ jedis = new Jedis(host,port); //KVStore的实例ID及密码 String authString = jedis.auth(password);//kvstore_instance_id:password if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void pub(String channel,String message){ System.out.println(" >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message); jedis.publish(channel, message); } public void close(String channel){ System.out.println(" >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit"); //消息发布者结束发送,即发送一个“quit”消息; jedis.publish(channel, "quit"); } }
消息订阅者 (即subscribe client)
package message.kvstore.aliyun.com; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; public class KVStoreSubClient extends Thread{ private Jedis jedis; private String channel; private JedisPubSub listener; public KVStoreSubClient(String host,int port, String password){ jedis = new Jedis(host,port); //ApsaraDB for Redis的实例ID及密码 String authString = jedis.auth(password);//kvstore_instance_id:password if (!authString.equals("OK")) { System.err.println("AUTH Failed: " + authString); return; } } public void setChannelAndListener(JedisPubSub listener,String channel){ this.listener=listener; this.channel=channel; } private void subscribe(){ if(listener==null || channel==null){ System.err.println("Error:SubClient> listener or channel is null"); } System.out.println(" >>> 订阅(SUBSCRIBE) > Channel:"+channel); System.out.println(); //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅 jedis.subscribe(listener, channel); } public void unsubscribe(String channel){ System.out.println(" >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel); System.out.println(); listener.unsubscribe(channel); } @Override public void run() { try{ System.out.println(); System.out.println("----------订阅消息SUBSCRIBE 开始-------"); subscribe(); System.out.println("----------订阅消息SUBSCRIBE 结束-------"); System.out.println(); }catch(Exception e){ e.printStackTrace(); } } }
消息监听者
package message.kvstore.aliyun.com; import redis.clients.jedis.JedisPubSub; public class KVStoreMessageListener extends JedisPubSub{ @Override public void onMessage(String channel, String message) { System.out.println(" <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message ); System.out.println(); //当接收到的message为quit时,取消订阅(被动方式) if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub } @Override public void onSubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onUnsubscribe(String channel, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String pattern, int subscribedChannels) { // TODO Auto-generated method stub } }
示例主程序
package message.kvstore.aliyun.com; import java.util.UUID; import redis.clients.jedis.JedisPubSub; public class KVStorePubSubTest { //ApsaraDB for Redis的连接信息,从控制台可以获得 static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com"; static final int port = 6379; static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:password public static void main(String[] args) throws Exception{ KVStorePubClient pubClient = new KVStorePubClient(host, port,password); final String channel = "KVStore频道-A"; //消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收 pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)"); //消息接收者 KVStoreSubClient subClient = new KVStoreSubClient(host, port,password); JedisPubSub listener = new KVStoreMessageListener(); subClient.setChannelAndListener(listener, channel); //消息接收者开始订阅 subClient.start(); //消息发送者继续发消息 for (int i = 0; i < 5; i++) { String message=UUID.randomUUID().toString(); pubClient.pub(channel, message); Thread.sleep(1000); } //消息接收者主动取消订阅 subClient.unsubscribe(channel); Thread.sleep(1000); pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)"); //消息发布者结束发送,即发送一个“quit”消息; //此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。 pubClient.close(channel); } }
运行结果
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)
----------订阅消息SUBSCRIBE 开始-------
>>> 订阅(SUBSCRIBE) > Channel:KVStore频道-A
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< 订阅(SUBSCRIBE)< Channel:KVStore频道-A >接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> 取消订阅(UNSUBSCRIBE) > Channel:KVStore频道-A
----------订阅消息SUBSCRIBE 结束-------
>>> 发布(PUBLISH) > Channel:KVStore频道-A > 发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)
>>> 发布(PUBLISH)结束 > Channel:KVStore频道-A > Message:quit
发表评论
-
Canal相关理解
2017-12-29 16:18 459转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 7191.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 859设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+kafka+Spark Steaming demo2
2017-11-22 13:15 459一,flume配置 # Name the components ... -
Flume+Kafka+Spark Steaming demo
2017-11-21 15:21 441一.准备flume配置 a1.sources = r1 a1. ... -
HBase表导出成HDFS
2017-10-19 19:40 898导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 361一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 905一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3601. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 1023为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 472package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 466#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 4151.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1352一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 351192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 391物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 439public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 1017将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1654当在windows下运行MR程序时,会报各种错误。现把这次碰到 ...
相关推荐
4. **订阅消息** 创建一个线程安全的`JedisPubSub`子类,重写`onMessage`方法来处理接收到的消息: ```java class MySubscriber extends JedisPubSub { @Override public void onMessage(String channel, ...
综上所述,C# Redis发布订阅Demo涵盖了Redis Pub/Sub机制的使用以及Key过期通知的处理。通过这个Demo,开发者可以学习到如何在C#应用中利用Redis进行实时数据通信和监控,这对于构建分布式系统和实时应用非常有价值...
本项目提供了一个基于C#的Redis发布与订阅(Publish/Subscribe)系统的源码,帮助开发者了解如何在Windows Forms(Winform)应用中实现这一功能。 首先,我们需要了解Redis的发布/订阅模式。在这个模式下,发送者...
在这个“redis发布订阅小案例”中,我们将探讨如何使用Java来实现Redis的发布订阅功能。首先,我们需要引入Jedis库,这是Java操作Redis的常用客户端库。确保在项目中已经添加了Jedis的依赖,例如通过Maven或Gradle。...
发布/订阅是Redis的一个重要特性,它允许应用程序以广播消息的方式进行通信,订阅者可以监听特定频道并接收发送到该频道的消息。以下是一个简单的.NET Core 3.0应用,展示了如何使用StackExchange.Redis创建发布者和...
标题中的“redis绑定webSocket发布订阅连接推送”指的是在Web应用中使用Redis作为消息中间件,结合WebSocket技术来实现实时的数据推送。Redis是一个高性能的键值存储系统,支持多种数据结构,如字符串、哈希、列表、...
在众多功能中,Redis的消息订阅发布(Pub/Sub)机制是其重要的特性之一,常用于实现轻量级的消息队列和实时通信。 **Redis消息订阅发布机制** 1. **基本概念** - **Publisher(发布者)**:负责发送消息的客户端...
标题"Redis发布订阅.net实现"所涉及的关键知识点包括: 1. Redis的发布订阅机制: 发布订阅模式在Redis中由`PUBLISH`和`SUBSCRIBE`命令支持。发布者通过`PUBLISH`命令将消息发送到一个特定的频道,而订阅者则通过`...
**Redis发布订阅模式详解** Redis 是一款高性能的键值存储系统,它支持多种数据结构,如字符串、哈希、列表、集合和有序集合等。在实际应用中,Redis 不仅可以作为缓存服务,还提供了丰富的消息通信机制,其中之一...
这个项目可能包含了启动 Redis 客户端,订阅频道,以及处理收到的订阅消息的示例。 2. **SMSForm** 和 **OrderForm**:这两个文件名暗示了可能有两个不同的应用场景,比如 SMSForm 可能是处理短信通知的界面或服务...
基于muduo网络库的集群聊天服务器和客户端源码,使用nginx tcp负载均衡,mysql数据库,redis发布-订阅数据库,redis发布-订阅 基于muduo网络库的集群聊天服务器和客户端源码,使用nginx tcp负载均衡,mysql数据库,...
在这个模式下,Redis服务器作为一个消息中间件,允许客户端订阅特定的频道,并在有新消息发布到这些频道时接收到通知。这种模式常用于构建实时通信系统,如聊天应用或实时通知服务。 接下来,我们将介绍如何在Qt...
redis订阅机制,一方面推送消息,另一方面同时接收消息。
通过这种方式,SpringBoot结合Redis的发布订阅功能,可以在分布式系统中有效地传递消息,提高系统的响应速度和扩展性。同时,由于解耦了发布者和订阅者,系统更加灵活,易于维护和扩展。 总的来说,结合SpringBoot...
在 Redis 中,发布/订阅(pub/sub)是一种消息通信模式,允许生产者(发布者)向多个消费者(订阅者)广播消息,而无需直接知道对方的存在。这种模式对于实现实时通知、聊天系统或者构建事件驱动的应用非常有用。 *...
本案例包含redis的发布订阅功能,以及dotnet core+SignalR实现的简单即时通信,并提供文档笔记。本案例初衷是想结合redis的发布订阅功能+websocket实现消息客户端页面订阅指定的消息,并在客户端页面进行显示;
在探讨.NET Core如何使用Redis进行发布订阅之前,首先需要了解Redis发布订阅的基本概念。Redis的发布订阅(pub/sub)是一种消息通信模式,它包括发布者(publisher)和订阅者(subscriber)。发布者负责发布消息,而...
本文将深入探讨如何使用Java实现Redis的消息订阅(Subscribe)和发布(Publish)功能,这对于构建实时通信系统或者分布式事件驱动架构至关重要。 首先,我们需要了解Redis的Pub/Sub(发布/订阅)模式。在该模式下,...
本项目"redis订阅发布实现websocket集群.zip"利用SpringBoot框架,结合Redis的发布/订阅功能,来实现实时的消息传递,从而达到WebSocket集群的通信目的。 首先,让我们深入了解WebSocket和Redis各自的作用: **...
基于Java环境下的Redis发布订阅设计与实现 本文研究了基于Java环境下的Redis发布订阅设计与实现,以满足实时聊天系统、微博以及分布式架构等应用对数据通信中间件的需求。Redis的发布订阅功能可以实现实时通信、...