`

理解队列、消息队列--用redis实现消息队列

阅读更多

理解队列和消息队列

 

队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

 

消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。

 

从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决生产者和“消费者”问题,在二者这间建立桥梁,it中专业术语是对生产者和“消费者”进行解耦。可以动态的通过调整生产者和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。

 

队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)LinkedBlockingQueue(无界)DelayQueue(延迟队列 无界)PriorityBlockingQueue(优先 无界)SynchronousQueue(没有容量的队列)。可以看到javaapi已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。



 

 

另外上面提到的有界无界,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用无界队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了消息队列

 

消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQkafkarocketMQActiveMQzeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到分布式环境中而已。



 

 

可以看到这里这里提到的传统消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。

 

redis中的消息队列

 

redis中可以使用自带的publishsubscribe命令完成消息推送消息拉取功能,实现消息队列。但这种方式有一个缺陷就是,消费者必须一致在线,否则会出现消费遗漏。

 

redis中的list(本质上是个双向链表)、zset(有序set)都可以用做消息队列的容器,稍加处理就可以实现一个高可用的消息队列。使用redis实现的“轻量化”消息队列有三大优势:

1、现在redis已经广泛运用于各大系统中,无需再次引入其他第三方框架和api

2、并且redis是基于内存存储的,生产者和消费者的存取速度都非常快。

3、使用redis集群的的容量,可以通过添加实例进行扩展。

 

首先思考下做一个轻量化的消息队列,需要满足些什么基本要求:

1、消费顺序保持跟生产顺序一致。

2、对于广播消息,某个消费者实例重启后,能重新收到消息。

3、定时清理 所有消费者都已经消费过的数据,防止容量无限增长。

 

满足以上三点要求,就可以实现一个简单的消息队列了。

 

使用zset实现“消息队列”示例展示

 

这是一个真实的业务场景,为了实现java中的jar包热更新,在web页面中录入一个jar包的下载地址,某个server接受到请求后,首先把这个地址存入到mysql数据à然后向消息队列中发布一条消息à其他所有server都会监听这个消息队列(广播模式),接受队列中的消息à解析出消息队列中的jar包下载地址,下载jar包到自己服务器的某个目录下,通过自定义classLoader执行热更新操作。本示例中主要展示的是使用redis实现消息队列jar包热更新部分会省略。整个实现过程都是采用了多个rediszset实现,这里简单说下zset的数据结构:zset对应的keyzset中的每个成员;每个成员对应的分值(可以用于排序)。

 

消息队列zset:这个有点类似传统消息队列中的topic(主题),不同的消息类型可以放到不同的zset中。本示例中只有一种业务类型:jar包上传,只需定义一个zset即可:szetkeyupload_msg;每个成员为jar包链接;成员对应的分值为一个自增的id(可以通过redisincr方法实现)。该zset结构示意图如下:



 

 

主题已消费记录zset:这个zset用于存储各个消费者(ip地址),已经消费的消息id。主要用途是用于清理“消息队列zset”中所有消费者都已经消费的消息。该zset数据结构:szetkey topic_upload;每个成员为消费者服务器ip;成员分值为消息id。该zset结构示意图如下:



 

 

消费者已消费记录zset:这“类”zset用于存储每个消费者(ip地址) 已消费的各个topic中的消息id。主要用途是,程序重启时,重新继续消费每个topic中剩余的消息。本示例中有两个消费者ip: 192.168.1.100192.168.1.101,对应有两个szset,key分别为:server_192.168.1.100server_192.168.1.100;成员为topickey,这里只有1topic对应的key upload_msg;成员分值为该ip已消费指定topic的消息id。这“类”zset的结构示意图如下:




各个数据结构设计完成后,下面开始来具体实现,这里采用的是java伪代码实现(没办法 就只擅长这个)。实现过程分三步:redis数据结构初始化、发送消息、接收消息,下面分别进行实现,首先定义redis中的key

    private static Jedis redis;
 
    public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key
    public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key
    public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀
    public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key

redis数据结构初始化:程序启动时(一般是生成者)首先检查 “题已消费记录zset(或者消息队列zset)是否已经创建,如果没有则进行初始化,java伪代码如下:

//每个ip 启动时执行
    public static void init(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr !=null){
            Long ret = redis.zrank(TOPIC_ZSET, ipAddr);
            if(ret == null){//如果未创建,就开始初始化
                String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号
                Long score = 0l;
                if(max_id != null){
                    score = Long.valueOf(max_id);
                }
                redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset
                redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)
            }
        }
}
 

 

发送消息:本过程比较简单,就是生产者服务器接受到jar包上传请求后,首先入库,然后向消息队列中发送一条消息,java伪代码实现如下:

 

    public static void sendMsg(){
        //省略入mysql库等业务方法
        String upload_url = "xxxx";
        Long now = System.currentTimeMillis();
        //加时间搓,可以是实现重复上传同一个jar,也可以去掉
        String msg = upload_url+"|"+now;
 
        //生成消息Id
        Long msg_id = redis.incr(MSG_SEQ_ID);
        System.out.println(msg_id);
 
        //发送消息 想消息队列中添加一条新消息
        redis.zadd(MSG_ZSET,msg_id,msg);
}
 

 

接收消息:该过程首先获取当前消费者ip已经执行消息id,到消息队列中获取该id之后的所有新消息进行业务处理;处理完成后更新“已消费记录”;最后清理所有消费者都消费过的消息。Java实现伪代码如下:

 
public static void receiveMsg(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr!=null){
            while (true){
                //获取当前已消费的msg_id
                Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);
                System.out.println(score);
 
                //获取未读消息进行处理
                Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");
                Double lastMsg_id = 0d;
                for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作
                    lastMsg_id = t.getScore();
                    System.out.println(t.getElement() + ":" + t.getScore());
                }
 
                if(tuples2.size()>0){
                    //处理完成后,更新该服务器的已处理列表
                    redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);
                    redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);
 
                    //找出所以ip都消费过的消息id,其实就是zset的第一个成员
                    Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0);
                    Double allReceive_id = 0d;
                    if(first.iterator().hasNext()){
                        Tuple temp = first.iterator().next();
                        if(temp!=null){
                            allReceive_id = temp.getScore();
                            redis.zremrangeByScore(MSG_ZSET,0,allReceive_id);
                        }
                    }
                }
 
 
 
                try {
                    Thread.sleep(1000);//每隔1秒钟消费一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
 
}

完整示例展示

 

完整代码实现如下,可以执行main方法进行测试:

public class RedisQueue {
    private static Jedis redis;
 
    public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key
    public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key
    public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀
    public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key
 
    public static void main(String[] args) {
        redis = new Jedis("192.168.26.128", 6379);
        init();//初始化,每个服务启动时
        sendMsg();//模拟发送消息
        receiveMsg();//消费消息
 
    }
 
    //每个ip 启动时执行
    public static void init(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr !=null){
            Long ret = redis.zrank(TOPIC_ZSET, ipAddr);
            if(ret == null){//如果未创建,就开始初始化
                String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号
                Long score = 0l;
                if(max_id != null){
                    score = Long.valueOf(max_id);
                }
                redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset
                redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)
            }
        }
    }
 
    public static void sendMsg(){
        //省略入mysql库等业务方法
        String upload_url = "xxxx";
        Long now = System.currentTimeMillis();
        //加时间搓,可以是实现重复上传同一个jar,也可以去掉
        String msg = upload_url+"|"+now;
 
        //生成消息Id
        Long msg_id = redis.incr(MSG_SEQ_ID);
        System.out.println(msg_id);
 
        //发送消息 想消息队列中添加一条新消息
        redis.zadd(MSG_ZSET,msg_id,msg);
    }
 
    public static void receiveMsg(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr!=null){
            while (true){
                //获取当前已消费的msg_id
                Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);
                System.out.println(score);
 
                //获取未读消息进行处理
                Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");
                Double lastMsg_id = 0d;
                for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作
                    lastMsg_id = t.getScore();
                    System.out.println(t.getElement() + ":" + t.getScore());
                }
 
                if(tuples2.size()>0){
                    //处理完成后,更新该服务器的已处理列表
                    redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);
                    redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);
 
                    //找出所以ip都消费过的消息id,其实就是zset的第一个成员
                    Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0);
                    Double allReceive_id = 0d;
                    if(first.iterator().hasNext()){
                        Tuple temp = first.iterator().next();
                        if(temp!=null){
                            allReceive_id = temp.getScore();
                            redis.zremrangeByScore(MSG_ZSET,0,allReceive_id);
                        }
                    }
                }
 
                try {
                    Thread.sleep(1000);//每隔1秒钟消费一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
 
    }
 
    private static String geIp(){
        //获取本机ip
        String ipAddr = null;
        try {
            ipAddr = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return ipAddr;
    }
}
 

 

 

一个轻量级的redis消息队列实现,上述代码基本都可以满足。如果生产者比较繁忙的话,又要保证消费顺序的前提下,在sendMsg()方法上需要使用redis分布式锁,来解决线程安全问题。关于redis实现的分布式锁 这里不细讲,后面有时间再单独总结(注意和前一章讲的jvm内部的类锁和对象锁区别开来,他们的关系有点类似于队列和消息队列的关系,也就是在单实例和分布式环境下的区别)

 

出处:

http://moon-walker.iteye.com/blog/2401516

  • 大小: 15.7 KB
  • 大小: 30.7 KB
  • 大小: 13.6 KB
  • 大小: 13.7 KB
  • 大小: 11.7 KB
  • 大小: 11.5 KB
0
1
分享到:
评论

相关推荐

    php队列+php-redis队列+php-redis扩展

    通过这些资源,你可以更好地理解和实践PHP-Redis队列的用法,从而提升你的PHP应用性能。 总结起来,PHP队列结合PHP-Redis扩展,提供了一种高效、灵活的后台任务处理方式,适用于各种高并发和异步需求。了解并掌握这...

    07-Redis队列Stream、Redis多线程详解-ev.rar

    07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-...

    redis++使用说明,windows下编译redis-plus-plus

    同时,Redis++也可以用于消息队列,用于异步处理任务。除此之外,Redis++也可以作为数据存储系统,用于存储结构化数据。 Redis++是一个功能强大且灵活的缓存系统,它可以满足各种应用场景的需求。通过这篇文章,...

    微服务SpringBoot整合Redis基于Redis的Stream消息队列实现异步秒杀下单

    二、基于Redis List实现消息队列 Redis的List数据结构可以模拟消息队列,通过LPUSH/RPOP、RPUSH/LPOP操作进行消息的入队和出队。为了处理无消息时的阻塞问题,可以使用BLPOP和BRPOP命令。List作为消息队列的优点是...

    Redis稳定版 Redis-x64-5.0.14.1.zip

    5. **发布/订阅**: Redis的发布/订阅功能允许客户端订阅特定的频道,当有消息发布到该频道时,所有订阅者都会收到消息,常用于实现消息通知或者异步处理。 6. **Lua脚本**: Redis支持在服务端执行Lua脚本,可以进行...

    Redis-x64-5.0.14.1

    - **发布/订阅**:支持消息订阅和发布功能,可用于实现简单的消息队列。 2. **Windows上的安装和配置**: - `redis.windows-service.conf`和`redis.windows.conf`是Redis的配置文件。前者用于以服务方式启动Redis...

    Another-Redis-Desktop-Manager.1.5.5

    - **Redis**:是一个开源的、内存中的数据结构存储系统,可以用作数据库、缓存和消息队列。它的特点是数据结构丰富,包括字符串、列表、集合、哈希表和有序集合,支持原子操作,适合处理高并发场景。 - **数据库**...

    spring-redis-mq, 基于 Spring 和 Redis 的分布式消息队列(MessageQueue)实现.zip

    2. **消息生产者**:生产者是向消息队列发送消息的组件,它通常使用Spring的`@Autowired`注解注入消息模板,并调用相关方法将消息发布到Redis队列。 3. **消息消费者**:消费者负责从消息队列中取出并处理消息。...

    Qt 应用Redis 实现消息队列

    本篇文章将详细讲解如何在Qt应用程序中利用Redis来实现一个高效的消息队列,以实现点对点的生产者-消费者模式。 首先,我们需要了解Qt和Redis的基础知识。Qt是一个跨平台的C++图形用户界面库,它提供了丰富的API...

    scrapy-redis-master_scrapy-redis_juzi1122_scrapy_

    3. **Scrapy-Redis架构**: Scrapy-Redis通过将待爬取URLs和请求放入Redis队列,实现多个Scrapy爬虫实例并行工作,从而提高整体爬取效率。其主要组件包括:Request Queue(请求队列)、Scheduler(调度器)、Spider ...

    redis-windows-redis7.0.5.zip

    Redis,全称Remote Dictionary Server,是一款开源的、高性能的键值对存储系统,常被用作数据缓存、消息队列以及数据库等角色。它的设计目标是速度和数据持久化,支持多种数据结构,如字符串、哈希表、列表、集合、...

    redis-windows-Redis7.0.0.zip

    Redis,全称Remote Dictionary Server,是一款开源的、高性能的键值存储系统,广泛应用于缓存、消息队列、数据持久化等多种场景。它以其高效、轻量级的特性,在IT行业中备受青睐,尤其是在互联网领域。在Windows环境...

    基于redis实现的消息队列

    要深入了解这个项目,你需要查看其源代码和文档,理解如何设置生产者和消费者,以及如何利用Redis的数据结构和命令来实现消息的发送与接收。 **应用场景** 消息队列在很多场景下都非常实用,如: - **任务调度**...

    redis-windows-7.0.10.zip

    列表支持两端插入和弹出元素,适用于实现消息队列;集合是无序的唯一元素集合,而有序集合则按分数排序元素,可用于排行榜等场景。 在Windows上运行Redis,用户需要编辑`redis.windows.conf`配置文件,根据实际需求...

    Redis延时消息队列基于swoole实现的多进程消费端

    标题中的“Redis延时消息队列基于swoole实现的多进程消费端”是指使用Redis作为消息队列,结合Swoole的多进程特性来构建一个高效、可扩展的延迟消息处理系统。在这个系统中,Redis作为一个可靠的键值存储,用于暂存...

    redis桌面连接工具 Another-Redis-Desktop-manager

    Redis是一款高性能的键值数据库,广泛应用于缓存、消息队列以及数据存储等领域。为了方便开发者管理和操作Redis服务器,出现了各种桌面管理工具,其中Another-Redis-Desktop-manager就是其中之一。这款工具提供了一...

    redis-windows-7.0.8.zip

    - **列表**:按照插入顺序存储元素,可以实现消息队列的功能。 - **集合**:不包含重复元素的集合,可用于去重。 - **有序集合**:与集合类似,但每个元素都有分数,可以通过分数进行排序。 6. **持久化机制**:...

    mvc - redis队列dome

    **标题解析:** "mvc - redis队列dome" 暗示了这是一个关于使用Model-View-Controller(MVC)架构模式与Redis构建队列的示例项目。MVC是一种广泛应用于Web开发的设计模式,它将应用程序分为三个主要部分:模型、视图...

    redis-mac-6.2.2

    在实际应用中,Redis可以用来存储Session信息、实现分布式锁、作为消息队列或者作为高性能的缓存系统。它的数据类型包括字符串、哈希、列表、集合和有序集合,使得它能够适应各种复杂的应用场景。此外,Redis还支持...

    Redis-x64-3.2.100.zip

    5. **发布/订阅**:Redis支持发布订阅模式,允许客户端订阅特定的频道,当有消息发布到这些频道时,所有订阅者都会收到通知,这使得Redis可以用作简单的消息队列。 6. **Lua脚本**:Redis内置了对Lua脚本的支持,...

Global site tag (gtag.js) - Google Analytics