`
godlovesdog
  • 浏览: 99955 次
社区版块
存档分类
最新评论

nginx-push-stream模块源码学习(三)——发布

 
阅读更多
一、概述
    发布:发布者将MSG post到某一特定通道上,channel将信息缓存
在说明发布流程之前有必要说明下channel和msg的数据结构。
二、数据结构
2.1 MSG
    发布时,模块先将消息转化为ngx_http_push_stream_msg_t的数据结构进行存储。
// message queue
typedef struct {
    ngx_queue_t                     queue; // this MUST be first
    time_t                          expires;//消息过期时间
    time_t                          time;//消息创建时间
    ngx_flag_t                      deleted;//是否已删除
    ngx_int_t                       id;
    ngx_str_t                      *raw;//纯文本
    ngx_int_t                       tag;
    ngx_str_t                      *event_id;//支持event source
    ngx_str_t                      *event_id_message;
    ngx_str_t                      *formatted_messages;//格式化后消息
    ngx_int_t                       workers_ref_count;//待发送该消息ngx worker计数
} ngx_http_push_stream_msg_t;

    @queue:每个channel会维护一个消息链表,每向channel发布一条消息,channel将其添加到自身的消息链表中。
    @expires,@deleted:消息的过期时间,待介绍过订阅流程后,我会整理出一条消息的生产周期,到时会详细阐述该字段的意义。
    @raw,@formatted_messages:该模块允许自定义三种消息模板——header模板:当收到订阅请求后发送模板消息;message模板:对消息体格式化;footer模板:断开连接时发送该模板。raw为消息原始内容,后者为应用message模板格式化后的信息

2.2 channel
    channel作为发布订阅的中间载体,理解理解它的存储至关重要。
typedef struct {
    ngx_rbtree_node_t                   node; // this MUST be first
    ngx_str_t                           id;
    ngx_uint_t                          last_message_id;
    time_t                              last_message_time;
    ngx_int_t                           last_message_tag;
    ngx_uint_t                          stored_messages;//# of messages
    ngx_uint_t                          subscribers;//# of subscribers
    ngx_http_push_stream_pid_queue_t    workers_with_subscribers;//处理该channel上订阅者的ngx worker进程链表
    ngx_http_push_stream_msg_t          message_queue;//消息链表
    time_t                              expires;//过期时间
    ngx_flag_t                          deleted;//是否已删除
    ngx_flag_t                          broadcast;//是否为广播通道
    ngx_http_push_stream_msg_t         *channel_deleted_message;//已删除消息链表
} ngx_http_push_stream_channel_t;


2.3 worker msg
// messages to worker processes
typedef struct {
    ngx_queue_t                         queue;
    ngx_http_push_stream_msg_t         *msg; // ->shared memory
    ngx_pid_t                           pid;
    ngx_http_push_stream_channel_t     *channel; // ->shared memory
    ngx_http_push_stream_queue_elem_t  *subscribers_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t;
 
    模块初始化时为每个ngx worker分配一片独立的工作区,工作区中维护一份消息链表。
三、流程
发布流程总的流程图如图所示:

    对于删除channel和获取channel info的流程比较简单,不做阐述,具体说明下发布消息流程,流程图如图所示:




    需要说明的是“向所有订阅者发送MSG”的过程:
  • 向每个有该channel订阅者的worker(workers_with_subscriber)的消息链表中插入一条消息
  • 向上述worker发送CHECK_MESSAGES指令,触发msg发送流程(ngx_http_push_stream_process_worker_message)

MSG发送(ngx_http_push_stream_process_worker_message):
        // now let's respond to some requests!
        //对于该channel上的所有订阅者
        while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) {
            ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
            //如果订阅者为longpolling模式
            if (subscriber->longpolling) {
                ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur-
>queue);
                //发送longpolling头(last Modified/Etag)
                ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->reque
st->pool);
                ngx_http_send_header(subscriber->request);
                //发送模块配置header模板
                ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(
subscriber->request, ngx_http_push_stream_module));
                //发送响应MSG
                ngx_http_push_stream_send_response_message(subscriber->request, channel, msg);
                //发送footer模板,last chunck("\0"CRLF CRLF)
                ngx_http_push_stream_send_response_finalize(subscriber->request);

                cur = prev;
            } else {//stream或polling模式
                if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) == NGX_ERROR) {
                    ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&
cur->queue);
                    ngx_http_push_stream_send_response_finalize(subscriber->request);
                    cur = prev;
                }
            }

说明:
    可以看出push stream模块在发布过程中针对longpolling和stream两种模式的不同:
  • Longpolling模式下,每次发布消息时会发送longpolling头:last modified和etag,使得客户端下次请求时可据此判断服务端是否有更新的消息待发布。
  • longpolling模式下,订阅者每次请求都会在获得数据后断开重连,所以每次发布时都会发送header模板
  • ngx_http_push_stream_send_response_finalize同时会清理订阅者
  • 大小: 41.1 KB
分享到:
评论

相关推荐

    nginx-rtmp-module-master源码

    1. 下载 Nginx 源码和 Nginx-RTMP 模块源码。 2. 使用 `./configure` 命令配置 Nginx,添加 RTMP 模块。 3. 编译并安装 Nginx。 4. 编写 Nginx 配置文件,定义 RTMP 直播应用、流处理规则等。 5. 启动 Nginx 服务。 ...

    nginx-rtmp模块源码包nginx-rtmp-module-master

    **三、Nginx-RTMP 模块功能** 1. **直播**:Nginx-RTMP 可以接收来自各种 RTMP 发布工具(如 OBS Studio、FFmpeg)的直播流,并将其分发到多个客户端。 2. **点播**:支持存储和重放已发布的流,实现点播功能。 3. *...

    nginx带nginx-http-flv模块windows编译版rtmp

    Nginx-RTMP是Nginx的一个扩展模块,由Adobe Systems开发,用于支持Real-Time Messaging Protocol (RTMP)。RTMP是一种协议,常用于在线流媒体传输,如视频直播服务。Nginx-RTMP模块允许Nginx接收来自Flash Player或...

    添加nginx-http-flv-module模块并重新编译后的nginx(windows版)

    本资源提供的是一款针对Windows平台的Nginx,其中已经集成了`nginx-http-flv-module`模块,这个模块主要用于支持HTTP实时流(HTTP Live Streaming, HLS)和Flash视频流(Flash Video, FLV)。现在我们将深入探讨这一...

    nginx-upload-module模块源码

    nginx-upload-module模块源码,用于nginx配置文件上传功能

    rh-nginx118-nginx-mod-stream-1.18.0-3.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    nginx-upload-progress模块源码

    2. 下载并解压 `nginx-upload-progress` 模块源码。 3. 配置 Nginx 时,指定模块源码路径。 4. 编译并安装 Nginx。 5. 配置 Nginx 配置文件,启用模块并设置相关参数。 6. 重启 Nginx 使配置生效。 **应用场景** 1...

    集成了nginx-http-flv-module 1.2.9模块的64位nginx-1.21.4程序

    `nginx-http-flv-module`是Nginx的一个第三方模块,由张洪君开发,用于处理FLV格式的流媒体数据。通过这个模块,Nginx可以支持RTMP协议,实现实时流媒体的推拉流,为Flash Player和其他支持RTMP的客户端提供服务。...

    nginx-mod-stream-1.14.1-9.module_el8.0.0+184+e34fea82.x86_64.rpm

    离线安装包,亲测可用

    nginx-module-vts.tar.gz

    Nginx-Module-VTS是Nginx的一个增强模块,主要功能是提供详细的Web服务器访问统计和性能监控。Prometheus是一款流行的开源监控和警报工具,广泛用于收集和分析各种系统的指标。在本场景中,Nginx-Module-VTS与...

    nginx-http-flv-module-1.2.10(包含nginx-rtmp-module)

    而nginx-rtmp-module则是一个额外的Nginx模块,它增加了对Real-Time Messaging Protocol (RTMP)的支持,使Nginx能够作为RTMP服务器接收并分发直播流。 1. **HTTP FLV 模块详解** HTTP FLV模块使得Nginx可以处理FLV...

    nginx-rtmp-module-1.2.1.zip

    4. 下载 RTMP 模块源码:`git clone https://github.com/arut/nginx-rtmp-module.git` 5. 配置并编译:`./configure --add-module=../nginx-rtmp-module && make && sudo make install` 6. 配置 Nginx 配置文件...

    nginx-1.19.3-http-flv.zip

    1. 采用nginx最新版编译,包含最新的nginx-http-flv-module,以及基础模块openssl、prce、zlib 2. 整体打包,已配置好nginx.conf的http-flv直播流,以及http web环境。无需任何配置即可使用 3. 自带windows的服务...

    nginx-1.19.3_nginx-http-flv-module.rar

    `nginx-http-flv-module`是Nginx的一个第三方模块,它扩展了Nginx处理FLV流的能力。FLV是一种流行的视频格式,常用于在线视频服务,尤其是与Adobe Flash Player兼容的平台。通过这个模块,Nginx可以接收并分发FLV流...

    nginx + nginx-http-flv-module-1.2.9

    nginx-http-flv-module 是由 nginx 开发社区创建的一个第三方模块,用于在 Nginx 上实现 HTTP 直播(HTTP Live Streaming,HLS)和FLV格式的视频流。FLV(Flash Video)是 Adobe Flash 平台广泛使用的视频格式,...

    带nginx-rtmp-module模块的Nginx

    【标题】: "带nginx-rtmp-module模块的Nginx" 在当今互联网技术日新月异的时代,实时流媒体传输已经成为在线视频分享、直播、远程教育等应用场景不可或缺的一部分。Nginx,作为一款高性能的HTTP和反向代理服务器,...

    nginx-1.19.6_nginx-http-flv-module(64位)

    Nginx-1.19.6是Nginx的一个版本,发布于2020年11月27日。这个版本可能包含了性能优化、错误修复以及新的特性和功能。Nginx的更新通常旨在提升服务质量和安全性,确保它能适应不断变化的网络环境和需求。 **Nginx-...

    nginx-rtmp-module

    Nginx 是一款高性能、轻量级的 Web 服务器/反向代理服务器,而 Nginx-RTMP-Module 是一个用于扩展 Nginx 功能的模块,它使 Nginx 能够处理 Real-Time Messaging Protocol (RTMP) 流,从而支持音频和视频的实时传输。...

    集成了nginx-http-flv-module 1.2.7模块的64位nginx程序

    `nginx-http-flv-module`是一个Nginx的第三方模块,由Arut开发,用于处理HTTP实时流媒体(HTTP Live Streaming,HLS)和Flash Video(FLV)流。通过这个模块,Nginx可以作为RTMP服务器,接收和分发FLV流,使得网页...

    nginx-sticky-module-ng-1.2.6.tar.gz

    在 `nginx-sticky-module-ng-1.2.6` 压缩包中,通常包含以下组件: 1. `src`: 这是源代码目录,包含了模块的核心代码,如 `ngx_http_sticky_module.c`,它是实现会话保持功能的主要源文件。 2. `config`: 配置脚本...

Global site tag (gtag.js) - Google Analytics