理解队列和消息队列
队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。
从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。
队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue(延迟队列 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueue(没有容量的队列)。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。
另外上面提到的“有界”和“无界”,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“无界”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列”。
消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到“分布式”环境中而已。
可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。
redis中的消息队列
redis中可以使用自带的publish和subscribe命令完成“消息推送”和“消息拉取”功能,实现消息队列。但这种方式有一个缺陷就是,消费者必须一致在线,否则会出现消费遗漏。
redis中的list(本质上是个双向链表)、zset(有序set)都可以用做“消息队列”的容器,稍加处理就可以实现一个高可用的“消息队列”。使用redis实现的“轻量化”“消息队列”有三大优势:
1、现在redis已经广泛运用于各大系统中,无需再次引入其他第三方框架和api。
2、并且redis是基于内存存储的,生产者和消费者的存取速度都非常快。
3、使用redis集群的的容量,可以通过添加实例进行扩展。
首先思考下做一个轻量化的“消息队列”,需要满足些什么基本要求:
1、消费顺序保持跟生产顺序一致。
2、对于广播消息,某个消费者实例重启后,能重新收到消息。
3、定时清理 所有消费者都已经消费过的数据,防止容量无限增长。
满足以上三点要求,就可以实现一个简单的“消息队列”了。
使用zset实现“消息队列”示例展示
这是一个真实的业务场景,为了实现java中的jar包热更新,在web页面中录入一个jar包的下载地址,某个server接受到请求后,首先把这个地址存入到mysql数据à然后向消息队列中发布一条消息à其他所有server都会监听这个“消息队列”(广播模式),接受队列中的消息à解析出消息队列中的jar包下载地址,下载jar包到自己服务器的某个目录下,通过自定义classLoader执行热更新操作。本示例中主要展示的是使用redis实现“消息队列”,jar包热更新部分会省略。整个实现过程都是采用了多个redis的zset实现,这里简单说下zset的数据结构:zset对应的key;zset中的每个成员;每个成员对应的分值(可以用于排序)。
消息队列zset:这个有点类似传统消息队列中的topic(主题),不同的消息类型可以放到不同的zset中。本示例中只有一种业务类型:jar包上传,只需定义一个zset即可:szet的key为upload_msg;每个成员为jar包链接;成员对应的分值为一个自增的id(可以通过redis的incr方法实现)。该zset结构示意图如下:
主题已消费记录zset:这个zset用于存储各个消费者(ip地址),已经消费的消息id。主要用途是用于清理“消息队列zset”中所有消费者都已经消费的消息。该zset数据结构:szet可key为 topic_upload;每个成员为消费者服务器ip;成员分值为消息id。该zset结构示意图如下:
消费者已消费记录zset:这“类”zset用于存储每个消费者(ip地址) 已消费的各个topic中的消息id。主要用途是,程序重启时,重新继续消费每个topic中剩余的消息。本示例中有两个消费者ip: 192.168.1.100、192.168.1.101,对应有两个szset,key分别为:server_192.168.1.100、server_192.168.1.100;成员为topic的key,这里只有1个topic对应的key为 upload_msg;成员分值为该ip已消费指定topic的消息id。这“类”zset的结构示意图如下:
各个数据结构设计完成后,下面开始来具体实现,这里采用的是java伪代码实现(没办法 就只擅长这个)。实现过程分三步:redis数据结构初始化、发送消息、接收消息,下面分别进行实现,首先定义redis中的key:
private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀 public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key
redis数据结构初始化:程序启动时(一般是生成者)首先检查 “题已消费记录zset”(或者消息队列zset)是否已经创建,如果没有则进行初始化,java伪代码如下:
//每个ip 启动时执行 public static void init(){ //获取本机ip String ipAddr = geIp(); if(ipAddr !=null){ Long ret = redis.zrank(TOPIC_ZSET, ipAddr); if(ret == null){//如果未创建,就开始初始化 String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号 Long score = 0l; if(max_id != null){ score = Long.valueOf(max_id); } redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题) } } }
发送消息:本过程比较简单,就是生产者服务器接受到jar包上传请求后,首先入库,然后向“消息队列”中发送一条消息,java伪代码实现如下:
public static void sendMsg(){ //省略入mysql库等业务方法 String upload_url = "xxxx"; Long now = System.currentTimeMillis(); //加时间搓,可以是实现重复上传同一个jar,也可以去掉 String msg = upload_url+"|"+now; //生成消息Id Long msg_id = redis.incr(MSG_SEQ_ID); System.out.println(msg_id); //发送消息 想消息队列中添加一条新消息 redis.zadd(MSG_ZSET,msg_id,msg); }
接收消息:该过程首先获取当前消费者ip已经执行消息id,到“消息队列”中获取该id之后的所有新消息进行业务处理;处理完成后更新“已消费记录”;最后清理所有消费者都消费过的消息。Java实现伪代码如下:
public static void receiveMsg(){ //获取本机ip String ipAddr = geIp(); if(ipAddr!=null){ while (true){ //获取当前已消费的msg_id Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET); System.out.println(score); //获取未读消息进行处理 Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf"); Double lastMsg_id = 0d; for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作 lastMsg_id = t.getScore(); System.out.println(t.getElement() + ":" + t.getScore()); } if(tuples2.size()>0){ //处理完成后,更新该服务器的已处理列表 redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr); redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET); //找出所以ip都消费过的消息id,其实就是zset的第一个成员 Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } }
完整示例展示
完整代码实现如下,可以执行main方法进行测试:
public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀 public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key public static void main(String[] args) { redis = new Jedis("192.168.26.128", 6379); init();//初始化,每个服务启动时 sendMsg();//模拟发送消息 receiveMsg();//消费消息 } //每个ip 启动时执行 public static void init(){ //获取本机ip String ipAddr = geIp(); if(ipAddr !=null){ Long ret = redis.zrank(TOPIC_ZSET, ipAddr); if(ret == null){//如果未创建,就开始初始化 String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号 Long score = 0l; if(max_id != null){ score = Long.valueOf(max_id); } redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题) } } } public static void sendMsg(){ //省略入mysql库等业务方法 String upload_url = "xxxx"; Long now = System.currentTimeMillis(); //加时间搓,可以是实现重复上传同一个jar,也可以去掉 String msg = upload_url+"|"+now; //生成消息Id Long msg_id = redis.incr(MSG_SEQ_ID); System.out.println(msg_id); //发送消息 想消息队列中添加一条新消息 redis.zadd(MSG_ZSET,msg_id,msg); } public static void receiveMsg(){ //获取本机ip String ipAddr = geIp(); if(ipAddr!=null){ while (true){ //获取当前已消费的msg_id Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET); System.out.println(score); //获取未读消息进行处理 Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf"); Double lastMsg_id = 0d; for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作 lastMsg_id = t.getScore(); System.out.println(t.getElement() + ":" + t.getScore()); } if(tuples2.size()>0){ //处理完成后,更新该服务器的已处理列表 redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr); redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET); //找出所以ip都消费过的消息id,其实就是zset的第一个成员 Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } private static String geIp(){ //获取本机ip String ipAddr = null; try { ipAddr = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ipAddr; } }
一个轻量级的redis消息队列实现,上述代码基本都可以满足。如果生产者比较繁忙的话,又要保证消费顺序的前提下,在sendMsg()方法上需要使用redis分布式锁,来解决“线程安全”问题。关于redis实现的分布式锁 这里不细讲,后面有时间再单独总结(注意和前一章讲的jvm内部的类锁和对象锁区别开来,他们的关系有点类似于队列和消息队列的关系,也就是在单实例和分布式环境下的区别)。
出处:
http://moon-walker.iteye.com/blog/2401516
相关推荐
通过这些资源,你可以更好地理解和实践PHP-Redis队列的用法,从而提升你的PHP应用性能。 总结起来,PHP队列结合PHP-Redis扩展,提供了一种高效、灵活的后台任务处理方式,适用于各种高并发和异步需求。了解并掌握这...
07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-...
同时,Redis++也可以用于消息队列,用于异步处理任务。除此之外,Redis++也可以作为数据存储系统,用于存储结构化数据。 Redis++是一个功能强大且灵活的缓存系统,它可以满足各种应用场景的需求。通过这篇文章,...
二、基于Redis List实现消息队列 Redis的List数据结构可以模拟消息队列,通过LPUSH/RPOP、RPUSH/LPOP操作进行消息的入队和出队。为了处理无消息时的阻塞问题,可以使用BLPOP和BRPOP命令。List作为消息队列的优点是...
- **Redis**:是一个开源的、内存中的数据结构存储系统,可以用作数据库、缓存和消息队列。它的特点是数据结构丰富,包括字符串、列表、集合、哈希表和有序集合,支持原子操作,适合处理高并发场景。 - **数据库**...
5. **发布/订阅**: Redis的发布/订阅功能允许客户端订阅特定的频道,当有消息发布到该频道时,所有订阅者都会收到消息,常用于实现消息通知或者异步处理。 6. **Lua脚本**: Redis支持在服务端执行Lua脚本,可以进行...
- **发布/订阅**:支持消息订阅和发布功能,可用于实现简单的消息队列。 2. **Windows上的安装和配置**: - `redis.windows-service.conf`和`redis.windows.conf`是Redis的配置文件。前者用于以服务方式启动Redis...
2. **消息生产者**:生产者是向消息队列发送消息的组件,它通常使用Spring的`@Autowired`注解注入消息模板,并调用相关方法将消息发布到Redis队列。 3. **消息消费者**:消费者负责从消息队列中取出并处理消息。...
本篇文章将详细讲解如何在Qt应用程序中利用Redis来实现一个高效的消息队列,以实现点对点的生产者-消费者模式。 首先,我们需要了解Qt和Redis的基础知识。Qt是一个跨平台的C++图形用户界面库,它提供了丰富的API...
Redis,全称Remote Dictionary Server,是一款开源的、高性能的键值存储系统,广泛应用于缓存、消息队列、数据持久化等多种场景。它以其高效、轻量级的特性,在IT行业中备受青睐,尤其是在互联网领域。在Windows环境...
3. **Scrapy-Redis架构**: Scrapy-Redis通过将待爬取URLs和请求放入Redis队列,实现多个Scrapy爬虫实例并行工作,从而提高整体爬取效率。其主要组件包括:Request Queue(请求队列)、Scheduler(调度器)、Spider ...
Redis,全称Remote Dictionary Server,是一款开源的、高性能的键值对存储系统,常被用作数据缓存、消息队列以及数据库等角色。它的设计目标是速度和数据持久化,支持多种数据结构,如字符串、哈希表、列表、集合、...
要深入了解这个项目,你需要查看其源代码和文档,理解如何设置生产者和消费者,以及如何利用Redis的数据结构和命令来实现消息的发送与接收。 **应用场景** 消息队列在很多场景下都非常实用,如: - **任务调度**...
列表支持两端插入和弹出元素,适用于实现消息队列;集合是无序的唯一元素集合,而有序集合则按分数排序元素,可用于排行榜等场景。 在Windows上运行Redis,用户需要编辑`redis.windows.conf`配置文件,根据实际需求...
标题中的“Redis延时消息队列基于swoole实现的多进程消费端”是指使用Redis作为消息队列,结合Swoole的多进程特性来构建一个高效、可扩展的延迟消息处理系统。在这个系统中,Redis作为一个可靠的键值存储,用于暂存...
Redis是一款高性能的键值数据库,广泛应用于缓存、消息队列以及数据存储等领域。为了方便开发者管理和操作Redis服务器,出现了各种桌面管理工具,其中Another-Redis-Desktop-manager就是其中之一。这款工具提供了一...
5. **发布/订阅**:Redis支持发布订阅模式,允许客户端订阅特定的频道,当有消息发布到这些频道时,所有订阅者都会收到通知,这使得Redis可以用作简单的消息队列。 6. **Lua脚本**:Redis内置了对Lua脚本的支持,...
- **列表**:按照插入顺序存储元素,可以实现消息队列的功能。 - **集合**:不包含重复元素的集合,可用于去重。 - **有序集合**:与集合类似,但每个元素都有分数,可以通过分数进行排序。 6. **持久化机制**:...
Redis是一种开源的、基于键值对的数据存储系统,常用于数据缓存、消息队列以及数据库功能。在Windows环境下,Redis的安装和使用通常需要经过编译和配置过程。"redis-6.2.14-win-amd64"是专门为Windows AMD64架构编译...
**标题解析:** "mvc - redis队列dome" 暗示了这是一个关于使用Model-View-Controller(MVC)架构模式与Redis构建队列的示例项目。MVC是一种广泛应用于Web开发的设计模式,它将应用程序分为三个主要部分:模型、视图...