一、概述
发布:发布者将MSG post到某一特定通道上,channel将信息缓存
在说明发布流程之前有必要说明下channel和msg的数据结构。
二、数据结构
2.1 MSG
发布时,模块先将消息转化为ngx_http_push_stream_msg_t的数据结构进行存储。
// message queue
typedef struct {
ngx_queue_t queue; // this MUST be first
time_t expires;//消息过期时间
time_t time;//消息创建时间
ngx_flag_t deleted;//是否已删除
ngx_int_t id;
ngx_str_t *raw;//纯文本
ngx_int_t tag;
ngx_str_t *event_id;//支持event source
ngx_str_t *event_id_message;
ngx_str_t *formatted_messages;//格式化后消息
ngx_int_t workers_ref_count;//待发送该消息ngx worker计数
} ngx_http_push_stream_msg_t;
@queue:每个channel会维护一个消息链表,每向channel发布一条消息,channel将其添加到自身的消息链表中。
@expires,@deleted:消息的过期时间,待介绍过订阅流程后,我会整理出一条消息的生产周期,到时会详细阐述该字段的意义。
@raw,@formatted_messages:该模块允许自定义三种消息模板——header模板:当收到订阅请求后发送模板消息;message模板:对消息体格式化;footer模板:断开连接时发送该模板。raw为消息原始内容,后者为应用message模板格式化后的信息
2.2 channel
channel作为发布订阅的中间载体,理解理解它的存储至关重要。
typedef struct {
ngx_rbtree_node_t node; // this MUST be first
ngx_str_t id;
ngx_uint_t last_message_id;
time_t last_message_time;
ngx_int_t last_message_tag;
ngx_uint_t stored_messages;//# of messages
ngx_uint_t subscribers;//# of subscribers
ngx_http_push_stream_pid_queue_t workers_with_subscribers;//处理该channel上订阅者的ngx worker进程链表
ngx_http_push_stream_msg_t message_queue;//消息链表
time_t expires;//过期时间
ngx_flag_t deleted;//是否已删除
ngx_flag_t broadcast;//是否为广播通道
ngx_http_push_stream_msg_t *channel_deleted_message;//已删除消息链表
} ngx_http_push_stream_channel_t;
2.3 worker msg
// messages to worker processes
typedef struct {
ngx_queue_t queue;
ngx_http_push_stream_msg_t *msg; // ->shared memory
ngx_pid_t pid;
ngx_http_push_stream_channel_t *channel; // ->shared memory
ngx_http_push_stream_queue_elem_t *subscribers_sentinel; // ->a worker's local pool
} ngx_http_push_stream_worker_msg_t;
模块初始化时为每个ngx worker分配一片独立的工作区,工作区中维护一份消息链表。
三、流程
发布流程总的流程图如图所示:
对于删除channel和获取channel info的流程比较简单,不做阐述,具体说明下发布消息流程,流程图如图所示:
需要说明的是“向所有订阅者发送MSG”的过程:
- 向每个有该channel订阅者的worker(workers_with_subscriber)的消息链表中插入一条消息
- 向上述worker发送CHECK_MESSAGES指令,触发msg发送流程(ngx_http_push_stream_process_worker_message)
MSG发送(ngx_http_push_stream_process_worker_message):
// now let's respond to some requests!
//对于该channel上的所有订阅者
while ((cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur->queue)) != subscribers_sentinel) {
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
//如果订阅者为longpolling模式
if (subscriber->longpolling) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&cur-
>queue);
//发送longpolling头(last Modified/Etag)
ngx_http_push_stream_add_polling_headers(subscriber->request, msg->time, msg->tag, subscriber->reque
st->pool);
ngx_http_send_header(subscriber->request);
//发送模块配置header模板
ngx_http_push_stream_send_response_content_header(subscriber->request, ngx_http_get_module_loc_conf(
subscriber->request, ngx_http_push_stream_module));
//发送响应MSG
ngx_http_push_stream_send_response_message(subscriber->request, channel, msg);
//发送footer模板,last chunck("\0"CRLF CRLF)
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
} else {//stream或polling模式
if (ngx_http_push_stream_send_response_message(subscriber->request, channel, msg) == NGX_ERROR) {
ngx_http_push_stream_queue_elem_t *prev = (ngx_http_push_stream_queue_elem_t *) ngx_queue_prev(&
cur->queue);
ngx_http_push_stream_send_response_finalize(subscriber->request);
cur = prev;
}
}
说明:
可以看出push stream模块在发布过程中针对longpolling和stream两种模式的不同:
- Longpolling模式下,每次发布消息时会发送longpolling头:last modified和etag,使得客户端下次请求时可据此判断服务端是否有更新的消息待发布。
- longpolling模式下,订阅者每次请求都会在获得数据后断开重连,所以每次发布时都会发送header模板
- ngx_http_push_stream_send_response_finalize同时会清理订阅者
- 大小: 41.1 KB
分享到:
相关推荐
1. 下载 Nginx 源码和 Nginx-RTMP 模块源码。 2. 使用 `./configure` 命令配置 Nginx,添加 RTMP 模块。 3. 编译并安装 Nginx。 4. 编写 Nginx 配置文件,定义 RTMP 直播应用、流处理规则等。 5. 启动 Nginx 服务。 ...
**三、Nginx-RTMP 模块功能** 1. **直播**:Nginx-RTMP 可以接收来自各种 RTMP 发布工具(如 OBS Studio、FFmpeg)的直播流,并将其分发到多个客户端。 2. **点播**:支持存储和重放已发布的流,实现点播功能。 3. *...
Nginx-RTMP是Nginx的一个扩展模块,由Adobe Systems开发,用于支持Real-Time Messaging Protocol (RTMP)。RTMP是一种协议,常用于在线流媒体传输,如视频直播服务。Nginx-RTMP模块允许Nginx接收来自Flash Player或...
本资源提供的是一款针对Windows平台的Nginx,其中已经集成了`nginx-http-flv-module`模块,这个模块主要用于支持HTTP实时流(HTTP Live Streaming, HLS)和Flash视频流(Flash Video, FLV)。现在我们将深入探讨这一...
nginx-upload-module模块源码,用于nginx配置文件上传功能
官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装
2. 下载并解压 `nginx-upload-progress` 模块源码。 3. 配置 Nginx 时,指定模块源码路径。 4. 编译并安装 Nginx。 5. 配置 Nginx 配置文件,启用模块并设置相关参数。 6. 重启 Nginx 使配置生效。 **应用场景** 1...
`nginx-http-flv-module`是Nginx的一个第三方模块,由张洪君开发,用于处理FLV格式的流媒体数据。通过这个模块,Nginx可以支持RTMP协议,实现实时流媒体的推拉流,为Flash Player和其他支持RTMP的客户端提供服务。...
离线安装包,亲测可用
Nginx-Module-VTS是Nginx的一个增强模块,主要功能是提供详细的Web服务器访问统计和性能监控。Prometheus是一款流行的开源监控和警报工具,广泛用于收集和分析各种系统的指标。在本场景中,Nginx-Module-VTS与...
而nginx-rtmp-module则是一个额外的Nginx模块,它增加了对Real-Time Messaging Protocol (RTMP)的支持,使Nginx能够作为RTMP服务器接收并分发直播流。 1. **HTTP FLV 模块详解** HTTP FLV模块使得Nginx可以处理FLV...
4. 下载 RTMP 模块源码:`git clone https://github.com/arut/nginx-rtmp-module.git` 5. 配置并编译:`./configure --add-module=../nginx-rtmp-module && make && sudo make install` 6. 配置 Nginx 配置文件...
1. 采用nginx最新版编译,包含最新的nginx-http-flv-module,以及基础模块openssl、prce、zlib 2. 整体打包,已配置好nginx.conf的http-flv直播流,以及http web环境。无需任何配置即可使用 3. 自带windows的服务...
`nginx-http-flv-module`是Nginx的一个第三方模块,它扩展了Nginx处理FLV流的能力。FLV是一种流行的视频格式,常用于在线视频服务,尤其是与Adobe Flash Player兼容的平台。通过这个模块,Nginx可以接收并分发FLV流...
nginx-http-flv-module 是由 nginx 开发社区创建的一个第三方模块,用于在 Nginx 上实现 HTTP 直播(HTTP Live Streaming,HLS)和FLV格式的视频流。FLV(Flash Video)是 Adobe Flash 平台广泛使用的视频格式,...
【标题】: "带nginx-rtmp-module模块的Nginx" 在当今互联网技术日新月异的时代,实时流媒体传输已经成为在线视频分享、直播、远程教育等应用场景不可或缺的一部分。Nginx,作为一款高性能的HTTP和反向代理服务器,...
Nginx-1.19.6是Nginx的一个版本,发布于2020年11月27日。这个版本可能包含了性能优化、错误修复以及新的特性和功能。Nginx的更新通常旨在提升服务质量和安全性,确保它能适应不断变化的网络环境和需求。 **Nginx-...
Nginx 是一款高性能、轻量级的 Web 服务器/反向代理服务器,而 Nginx-RTMP-Module 是一个用于扩展 Nginx 功能的模块,它使 Nginx 能够处理 Real-Time Messaging Protocol (RTMP) 流,从而支持音频和视频的实时传输。...
`nginx-http-flv-module`是一个Nginx的第三方模块,由Arut开发,用于处理HTTP实时流媒体(HTTP Live Streaming,HLS)和Flash Video(FLV)流。通过这个模块,Nginx可以作为RTMP服务器,接收和分发FLV流,使得网页...
在 `nginx-sticky-module-ng-1.2.6` 压缩包中,通常包含以下组件: 1. `src`: 这是源代码目录,包含了模块的核心代码,如 `ngx_http_sticky_module.c`,它是实现会话保持功能的主要源文件。 2. `config`: 配置脚本...