- 浏览: 564680 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (618)
- java (109)
- Java web (43)
- javascript (52)
- js (15)
- 闭包 (2)
- maven (8)
- 杂 (28)
- python (47)
- linux (51)
- git (18)
- (1)
- mysql (31)
- 管理 (1)
- redis (6)
- 操作系统 (12)
- 网络 (13)
- mongo (1)
- nginx (17)
- web (8)
- ffmpeg (1)
- python安装包 (0)
- php (49)
- imagemagic (1)
- eclipse (21)
- django (4)
- 学习 (1)
- 书籍 (1)
- uml (3)
- emacs (19)
- svn (2)
- netty (9)
- joomla (1)
- css (1)
- 推送 (2)
- android (6)
- memcached (2)
- docker、 (0)
- docker (7)
- go (1)
- resin (1)
- groovy (1)
- spring (1)
最新评论
-
chokee:
...
Spring3 MVC 深入研究 -
googleyufei:
很有用, 我现在打算学学Python. 这些资料的很及时.
python的几个实用网站(转的) -
hujingwei1001:
太好了找的就是它
easy explore -
xiangtui:
例子举得不错。。。学习了
java callback -
幻影桃花源:
太好了,謝謝
Spring3 MVC 深入研究
原文地址:http://blog.chinaunix.net/uid-26443921-id-3018781.html
公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
废话少说,直接上代码:
xdrive.rar
注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;
nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:
CPP = g++
LINK = \$(CPP) ##采用g++来链接
##line=338 below was changed by kevin_zhong on 2011-11-14
ngx_obj=`echo $ngx_obj \
| sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
ngx_post_suffix=`echo $ngx_src \
| sed -e "s#^.*\(\.c\)\\$#\1#g" \
-e "s#^.*\(\.cc\)\\$#\1#g" \
-e "s#^.*\(\.cpp\)\\$#\1#g"`
if [ "$ngx_post_suffix"x = ".cpp"x ];then
ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
else
ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;
下面是具体代码分析:
/*
* Copyright (C) Igor Sysoev; kevin_zhong
* mail: qq2000zhong@gmail.com
* date: 2011-11-13
*/
//因是cpp文件,固包含c头文件需要 extern c
extern "C" {
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include "ngx_chain_util.h"
}
//与服务器内部通信二进制协议实现
#include "ngx_thrift_transport.h"
#include "ngx_xdrive_datagram.h"
#include "protocal/rc_updator_types.h"
using namespace xdrive::msg::rc_updator;
using namespace xdrive;
/*
* 扩展模块需要3个业务相关输入变量,uid,path,recusive
* 参考nginx.conf中的配置写法
*/
typedef struct
{
ngx_http_upstream_conf_t upstream;
//将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
ngx_int_t uid_index;
ngx_int_t path_index;
ngx_int_t recusive_index;
}
ngx_http_xdrive_rc_loc_conf_t;
/*
* 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
*
* 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
* 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
* 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
* 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
*
* 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
* 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
*
* 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
*/
typedef struct
{
ngx_http_request_t *request;
ngx_chain_pair_t body_buff;
ngx_chain_t * tail_buff;
uint64_t uid;
ngx_str_t path;
bool recusive;
//后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
//开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
//upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
//的 content len,导致接受http包体数据异常...
//参考 ngx_http_upstream.c:2391
int rest_length;
}
ngx_http_xdrive_rc_ctx_t;
static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);
static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
{ ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
{ ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
{ ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
{ ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
{ ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
{ ngx_null_string, 0 }
};
/*
* 参数设置,不可变,注意和变量的区别
*/
static ngx_command_t ngx_http_xdrive_rc_commands[] = {
{ ngx_string("xdrive_rc_pass"),
NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
ngx_http_xdrive_rc_pass,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("xdrive_rc_connect_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
NULL },
{ ngx_string("xdrive_rc_send_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
NULL },
{ ngx_string("xdrive_rc_buffer_size"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
NULL },
{ ngx_string("xdrive_rc_read_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
NULL },
{ ngx_string("xdrive_rc_next_upstream"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
ngx_conf_set_bitmask_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
&ngx_http_xdrive_rc_next_upstream_masks },
ngx_null_command
};
static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
ngx_http_xdrive_rc_add_variables, /* preconfiguration */
NULL, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
};
ngx_module_t ngx_http_xdrive_rc_module = {
NGX_MODULE_V1,
&ngx_http_xdrive_rc_module_ctx, /* module context */
ngx_http_xdrive_rc_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
//业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
//正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
//因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
//业务需要uid放在 query_string,这时候就只需要改配置即可了
//思路来源于 ngx_http_memcached_module.c
static ngx_http_variable_t ngx_http_proxy_vars[] = {
{ ngx_string("uid"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_string("path"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_string("recusive"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_null_string, NULL,NULL,0, 0, 0 }
};
static ngx_int_t
ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_ctx_t *ctx;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
{
return NGX_HTTP_NOT_ALLOWED;
}
//get 请求,不需要包体
rc = ngx_http_discard_request_body(r);
if (rc != NGX_OK)
{
return rc;
}
if (ngx_http_set_content_type(r) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (ngx_http_upstream_create(r) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u = r->upstream;
ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
u->conf = &mlcf->upstream;
//设置回调,网上大都只讲这里
u->create_request = ngx_http_xdrive_rc_create_request;
u->reinit_request = ngx_http_xdrive_rc_reinit_request;
u->process_header = ngx_http_xdrive_rc_process_header;
u->abort_request = ngx_http_xdrive_rc_abort_request;
u->finalize_request = ngx_http_xdrive_rc_finalize_request;
//分配context内存
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
if (ctx == NULL)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));
ctx->request = r;
ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);
u->input_filter_init = ngx_http_xdrive_rc_filter_init;
/*
* 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
* input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
* filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
* 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
u->input_filter = ngx_http_xdrive_rc_filter;
u->input_filter_ctx = ctx;
u->buffering = 0; //note, no buffering...cause too complicated !!
r->main->count++;
//不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
//调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
ngx_http_upstream_init(r);
return NGX_DONE;
}
static ngx_int_t
ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
{
size_t len;
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_xdrive_rc_ctx_t *ctx;
ngx_http_variable_value_t *vv;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
//根据配置文件uid index号从变量中获取uid的变量值
vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"the \"$uid\" variable is not set");
return NGX_ERROR;
}
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
ctx->uid = ngx_atoof(vv->data, vv->len);
if (ctx->uid == (off_t)NGX_ERROR)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"the \"$uid\" variable is err %s set", vv->data);
return NGX_ERROR;
}
//根据配置文件path index号从变量中获取path的变量值
vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ngx_str_set(&ctx->path, "/");
}
else {
ctx->path.data = vv->data;
ctx->path.len = vv->len;
}
vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ctx->recusive = false;
}
else {
ctx->recusive = ngx_atoi(vv->data, vv->len);
}
RcUpdateReq list_req;
list_req._user_id = ctx->uid;
list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
list_req._recursive = ctx->recusive;
static uint32_t seq = ngx_time();
//编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
//细节见具体代码,不表
cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
&list_req, ++seq, 0xC01);
if (cl == NULL)
return NGX_ERROR;
//准备发送
r->upstream->request_bufs = cl;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
ctx->uid, &ctx->path, ctx->recusive);
return NGX_OK;
}
static ngx_int_t
ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
{
return NGX_OK;
}
/*
* 读取二进制包体头部
*/
static ngx_int_t
ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_ctx_t *ctx;
u = r->upstream;
//因包头固定长度,所以很好判断
if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
return NGX_AGAIN;
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
ngx_xdrive_datagram_header_t header;
//解包头,获取最重要参数 : 包体长度,根据包体长度收包
if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
&header, r->connection->log) != NGX_OK)
{
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//业务代码
if (header._type != 0x08C01)
{
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"xdrive_rc ret type not legal = %d", header._type);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//业务代码
if (header._status != 0)
{
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"xdrive_rc ret status not ok in response = %d", header._status);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
ngx_http_clear_content_length(r);
//因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
//所以我们必须自己处理记录剩余包体长度;
ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;
u->headers_in.status_n = NGX_HTTP_OK;
u->state->status = NGX_HTTP_OK;
//包头数据已经处理完毕,必须丢弃;
u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;
return NGX_OK;
}
//其实没啥用
static ngx_int_t
ngx_http_xdrive_rc_filter_init(void *data)
{
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
ngx_http_upstream_t *u;
u = ctx->request->upstream;
return NGX_OK;
}
/*
* 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
static ngx_int_t
ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
{
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
u_char *last;
ngx_buf_t *b;
ngx_chain_t *cl, **ll;
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);
u = ctx->request->upstream;
b = &u->buffer;
size_t buff_size = mlcf->upstream.buffer_size;
//assert(bytes <= buff_size);
ctx->rest_length -= bytes;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
"recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);
//特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
//直接交互内存即可,不再需要内存拷贝,否则...
if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
{
cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
ctx->body_buff._chain_head = cl;
cl->buf->flush = 1;
cl->buf->memory = 1;
last = b->last;
cl->buf->pos = last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
}
else {
//做一次内存拷贝到 body buf 中去
if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
b->last, bytes) != NGX_OK)
return NGX_ERROR;
b->last += bytes;
}
//判断upstream包体是否收完整
if (ctx->rest_length > 0)
{
return NGX_OK;
}
//包体收完,进行解码
RcUpdateResp list_resp;
if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
ctx->request->connection->log,
&list_resp) != NGX_OK)
{
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"xdrive_rc RcUpdateResp decode failed");
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
"xdrive_rc RcUpdateResp list num=%d",
list_resp._action_list.size());
//内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
ngx_chain_t *busy_bufs = NULL;
ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);
//transfer...
ngx_chain_pair_t chain_pair;
ngx_memzero(&chain_pair, sizeof(chain_pair));
//转成 json 格式
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
"uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
ctx->uid, &ctx->path, ctx->recusive,
list_resp._weak_dcid.c_str(),
list_resp._used_space,
list_resp._action_list.size()
))
return NGX_ERROR;
//转成 json 格式
for (size_t i = 0; i < list_resp._action_list.size(); ++i)
{
ActionThrft *ac = &list_resp._action_list[i];
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
"[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
ac->m_path.c_str(), ac->m_node_type, ac->m_status,
ac->m_gcid.c_str(), ac->m_file_size
))
return NGX_ERROR;
}
//这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
//关后端连接,回前端包
chain_pair._chain_last->buf->last_buf = 1;
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
{
ll = &cl->next;
}
*ll = chain_pair._chain_head;
return NGX_OK;
}
static void
ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"abort http xdrive_rc request");
return;
}
static void
ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"finalize http xdrive_rc request");
return;
}
static void *
ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_xdrive_rc_loc_conf_t *conf;
conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
sizeof(ngx_http_xdrive_rc_loc_conf_t));
if (conf == NULL)
{
return NULL;
}
conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;
/* the hardcoded values */
conf->upstream.cyclic_temp_file = 0;
conf->upstream.buffering = 0;
conf->upstream.ignore_client_abort = 0;
conf->upstream.send_lowat = 0;
conf->upstream.bufs.num = 0;
conf->upstream.busy_buffers_size = 0;
conf->upstream.max_temp_file_size = 0;
conf->upstream.temp_file_write_size = 0;
conf->upstream.intercept_errors = 1;
conf->upstream.intercept_404 = 1;
conf->upstream.pass_request_headers = 0;
conf->upstream.pass_request_body = 0;
conf->uid_index = NGX_CONF_UNSET;
conf->path_index = NGX_CONF_UNSET;
conf->recusive_index = NGX_CONF_UNSET;
return conf;
}
static char *
ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;
ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
prev->upstream.connect_timeout, 60000);
ngx_conf_merge_msec_value(conf->upstream.send_timeout,
prev->upstream.send_timeout, 60000);
ngx_conf_merge_msec_value(conf->upstream.read_timeout,
prev->upstream.read_timeout, 60000);
ngx_conf_merge_size_value(conf->upstream.buffer_size,
prev->upstream.buffer_size,
(size_t)ngx_pagesize);
ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
prev->upstream.next_upstream,
(NGX_CONF_BITMASK_SET
| NGX_HTTP_UPSTREAM_FT_ERROR
| NGX_HTTP_UPSTREAM_FT_TIMEOUT));
if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
{
conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
| NGX_HTTP_UPSTREAM_FT_OFF;
}
if (conf->upstream.upstream == NULL)
{
conf->upstream.upstream = prev->upstream.upstream;
}
if (conf->uid_index == NGX_CONF_UNSET) {
conf->uid_index = prev->uid_index;
}
if (conf->path_index == NGX_CONF_UNSET) {
conf->path_index = prev->path_index;
}
if (conf->recusive_index == NGX_CONF_UNSET) {
conf->recusive_index = prev->recusive_index;
}
return NGX_CONF_OK;
}
static char *
ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;
ngx_str_t *value;
ngx_url_t u;
ngx_http_core_loc_conf_t *clcf;
if (mlcf->upstream.upstream)
{
return "is duplicate";
}
value = (ngx_str_t *)cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
u.no_resolve = 1;
mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (mlcf->upstream.upstream == NULL)
{
return (char *)(NGX_CONF_ERROR);
}
clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
clcf->handler = ngx_http_xdrive_rc_handler;
if (clcf->name.data[clcf->name.len - 1] == '/')
{
clcf->auto_redirect = 1;
}
//保存变量index用
mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
if (mlcf->uid_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
if (mlcf->path_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
if (mlcf->recusive_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
return NGX_CONF_OK;
}
static ngx_int_t
ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;
for (v = ngx_http_proxy_vars; v->name.len; v++)
{
var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL)
{
return NGX_ERROR;
}
var->get_handler = v->get_handler;
var->data = v->data;
}
return NGX_OK;
}
代码中一些有意思的地方:
//和buf差不多的思想的 buf chain
typedef struct
{
ngx_chain_t* _chain_head;
ngx_chain_t* _chain_pos;
ngx_chain_t* _chain_last;
ngx_chain_t* _chain_tail;
}
ngx_chain_pair_t;
//从buf chain中读取len长内存出来
size_t ngx_cdecl
ngx_chain_read(ngx_chain_pair_t* chain_pair
, uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
ngx_int_t ngx_cdecl
ngx_chain_write(ngx_pool_t* pool
, ngx_chain_t** free_bufs
, ngx_chain_pair_t* chain_pair
, size_t write_chunk_size
, const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
ngx_int_t ngx_cdecl
ngx_chain_sprintf(ngx_pool_t *pool
, ngx_chain_t **free_bufs
, ngx_chain_pair_t *chain_pair
, size_t write_chunk_size
, const char *fmt, ...);
下面是nginx配置文件中的关键部分
location ~* /rc_list/([0-9]+).html$ {
xdrive_rc_buffer_size 4096;
set $uid $1;
set $path /;
set $recusive 0;
if ($query_string ~* (|&)recusive=(0|1)(|&)) {
set $recusive $2;
}
xdrive_rc_pass 127.0.0.1:11001;
}
解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
公司内部协议均是固定包长的二进制协议,对于内部服务器通信来说足够了,但接口服务器还是采用了http协议,毕竟通用,况且私有二进制协议对外非常不好友,更何况还易遭防火墙拦截;写一个通用且配置功能强大的http server是比较困难的。项目组写的http框架非常难用,仅仅达到能用而已,效率低下,不灵活等等;
在接触了nginx后,被其能扩展的特性深深吸引了,于是尝试为项目组的框架写一个能一个扩展模块,需求蛮明确的:就是将http协议转成服务器内部的二进制协议;
在网上找资料,资料比较稀少,大多是一个简单的hello world例子,比较少参考性;《Emiller的Nginx模块开发心得.pdf》相对而言是一个完善的文档;但看了之后还是感觉一头雾水,不甚明了;最好的文档就是代码,下载了 nginx-1.0.8 源码;source insight 建项目,看代码,析流程;渐渐nginx流程在脑海中明晰起来;
看代码熟悉nginx花3天时间;着手写代码到代码完成1天半,测试休bug到完成目标需求花费1天,为了写这个扩展,把整个周末都搭进去了,晚上还熬了下夜,最后看着内部服务器的数据通过扩展模块中介到nginx输出,还是有点小成就感的;
废话少说,直接上代码:
xdrive.rar
注:因代码中夹杂了些公司项目的业务,这些代码在protocal文件夹下,被我从压缩包中剔除了,但绝对不影响代码整个流程完整性;
nginx 只支持c代码,扩展模块中加入了不少c++代码,也懒得去搞其他方法了,直接修改了 auto/make 文件,改动如下:
CPP = g++
LINK = \$(CPP) ##采用g++来链接
##line=338 below was changed by kevin_zhong on 2011-11-14
ngx_obj=`echo $ngx_obj \
| sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
-e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
ngx_post_suffix=`echo $ngx_src \
| sed -e "s#^.*\(\.c\)\\$#\1#g" \
-e "s#^.*\(\.cc\)\\$#\1#g" \
-e "s#^.*\(\.cpp\)\\$#\1#g"`
if [ "$ngx_post_suffix"x = ".cpp"x ];then
ngx_cc="\$(CPP) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
else
ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS) $ADDON_INCS"
fi
上面的脚本是判断源代码后缀,如果是c++则生成makefile语句采用g++,否则采用gcc;
下面是具体代码分析:
/*
* Copyright (C) Igor Sysoev; kevin_zhong
* mail: qq2000zhong@gmail.com
* date: 2011-11-13
*/
//因是cpp文件,固包含c头文件需要 extern c
extern "C" {
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include "ngx_chain_util.h"
}
//与服务器内部通信二进制协议实现
#include "ngx_thrift_transport.h"
#include "ngx_xdrive_datagram.h"
#include "protocal/rc_updator_types.h"
using namespace xdrive::msg::rc_updator;
using namespace xdrive;
/*
* 扩展模块需要3个业务相关输入变量,uid,path,recusive
* 参考nginx.conf中的配置写法
*/
typedef struct
{
ngx_http_upstream_conf_t upstream;
//将uid和path以及recusive在配置中的index找出来,以后create request的时候需要
ngx_int_t uid_index;
ngx_int_t path_index;
ngx_int_t recusive_index;
}
ngx_http_xdrive_rc_loc_conf_t;
/*
* 注明下,这个模块和网上诸多模块以及nginx特有模块差别最大的地方是:
*
* 1, 因为项目组的二进制协议不是流式协议,即必须将数据包全部收完整后,
* 才能调用decode解码,所以不能像其他模块那样采用流,即不能一边接
* 受数据,一边发送数据;只能先将数据全部缓存起来,等到收集到完整的resp包,
* 再一次性解码,然后再转换成 json 类格式一次性输出,这是这类协议最大最明显的缺点;
*
* 2,虽然从后端server收到的resp content length是确定的,但经过转换(从二进制到json类)
* 后,content len 已经变得不相等,且很不好计算;所以只能采用 chunk 方式返回给client
*
* 3,网上有的,或者<Emiller的Nginx模块开发心得.pdf>中有的,都不提,参考即可;
*/
typedef struct
{
ngx_http_request_t *request;
ngx_chain_pair_t body_buff;
ngx_chain_t * tail_buff;
uint64_t uid;
ngx_str_t path;
bool recusive;
//后端剩余接受包体长度,即还有多少个字节等待从后端读取,本来不需要这个length的
//开始代码是存储 r.out_headers.content_len_n,u->length = r.out_headers.content_len_n
//upstream通过u->length==0判断后端数据是否接受完毕,但这样client回复包将得到一个不正确
//的 content len,导致接受http包体数据异常...
//参考 ngx_http_upstream.c:2391
int rest_length;
}
ngx_http_xdrive_rc_ctx_t;
static ngx_int_t ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_xdrive_rc_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_xdrive_rc_filter_init(void *data);
static ngx_int_t ngx_http_xdrive_rc_filter(void *data, ssize_t bytes);
static void ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r);
static void ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc);
static void *ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);
static char *ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static ngx_conf_bitmask_t ngx_http_xdrive_rc_next_upstream_masks[] = {
{ ngx_string("error"), NGX_HTTP_UPSTREAM_FT_ERROR },
{ ngx_string("timeout"), NGX_HTTP_UPSTREAM_FT_TIMEOUT },
{ ngx_string("invalid_header"), NGX_HTTP_UPSTREAM_FT_INVALID_HEADER },
{ ngx_string("not_found"), NGX_HTTP_UPSTREAM_FT_HTTP_404 },
{ ngx_string("off"), NGX_HTTP_UPSTREAM_FT_OFF },
{ ngx_null_string, 0 }
};
/*
* 参数设置,不可变,注意和变量的区别
*/
static ngx_command_t ngx_http_xdrive_rc_commands[] = {
{ ngx_string("xdrive_rc_pass"),
NGX_HTTP_LOC_CONF | NGX_HTTP_LIF_CONF | NGX_CONF_TAKE1,
ngx_http_xdrive_rc_pass,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("xdrive_rc_connect_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.connect_timeout),
NULL },
{ ngx_string("xdrive_rc_send_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.send_timeout),
NULL },
{ ngx_string("xdrive_rc_buffer_size"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.buffer_size),
NULL },
{ ngx_string("xdrive_rc_read_timeout"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.read_timeout),
NULL },
{ ngx_string("xdrive_rc_next_upstream"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_CONF_1MORE,
ngx_conf_set_bitmask_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_xdrive_rc_loc_conf_t, upstream.next_upstream),
&ngx_http_xdrive_rc_next_upstream_masks },
ngx_null_command
};
static ngx_http_module_t ngx_http_xdrive_rc_module_ctx = {
ngx_http_xdrive_rc_add_variables, /* preconfiguration */
NULL, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_http_xdrive_rc_create_loc_conf, /* create location configration */
ngx_http_xdrive_rc_merge_loc_conf /* merge location configration */
};
ngx_module_t ngx_http_xdrive_rc_module = {
NGX_MODULE_V1,
&ngx_http_xdrive_rc_module_ctx, /* module context */
ngx_http_xdrive_rc_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
//业务相关变量,get_handler = NULL,因为这三个是从conf里面通过
//正则匹配得到的,为什么不直接通过 get handler 从http requeset里面获取了
//因为这样更灵活,conf可以随时改,比如现在 uid 是从 url 里面获取,但如果
//业务需要uid放在 query_string,这时候就只需要改配置即可了
//思路来源于 ngx_http_memcached_module.c
static ngx_http_variable_t ngx_http_proxy_vars[] = {
{ ngx_string("uid"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_string("path"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_string("recusive"), NULL,
NULL, 0,
NGX_HTTP_VAR_CHANGEABLE | NGX_HTTP_VAR_NOCACHEABLE | NGX_HTTP_VAR_NOHASH,
0 },
{ ngx_null_string, NULL,NULL,0, 0, 0 }
};
static ngx_int_t
ngx_http_xdrive_rc_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_ctx_t *ctx;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
if (!(r->method & (NGX_HTTP_GET | NGX_HTTP_HEAD)))
{
return NGX_HTTP_NOT_ALLOWED;
}
//get 请求,不需要包体
rc = ngx_http_discard_request_body(r);
if (rc != NGX_OK)
{
return rc;
}
if (ngx_http_set_content_type(r) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
if (ngx_http_upstream_create(r) != NGX_OK)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u = r->upstream;
ngx_str_set(&u->schema, "xdrive_rc://");//schema,没发现有什么用,打log貌似会有点用
u->output.tag = (ngx_buf_tag_t)&ngx_http_xdrive_rc_module;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
u->conf = &mlcf->upstream;
//设置回调,网上大都只讲这里
u->create_request = ngx_http_xdrive_rc_create_request;
u->reinit_request = ngx_http_xdrive_rc_reinit_request;
u->process_header = ngx_http_xdrive_rc_process_header;
u->abort_request = ngx_http_xdrive_rc_abort_request;
u->finalize_request = ngx_http_xdrive_rc_finalize_request;
//分配context内存
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_palloc(r->pool, sizeof(ngx_http_xdrive_rc_ctx_t));
if (ctx == NULL)
{
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ngx_memzero(ctx, sizeof(ngx_http_xdrive_rc_ctx_t));
ctx->request = r;
ngx_http_set_ctx(r, ctx, ngx_http_xdrive_rc_module);
u->input_filter_init = ngx_http_xdrive_rc_filter_init;
/*
* 非常关键的设置,后端服务器包体数据到达的时候,upstream 会回调 input_filter,默认的
* input_filter 是 ngx_http_upstream_non_buffered_filter(ngx_http_upstream.c:2475),默认
* filter 就是收到数据立马发送给client;而因为需求必须将包体缓存起来,所以这里替换成了我们
* 的回调函数,函数里面的功能就是: 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
u->input_filter = ngx_http_xdrive_rc_filter;
u->input_filter_ctx = ctx;
u->buffering = 0; //note, no buffering...cause too complicated !!
r->main->count++;
//不需要包体,直接初始化 upstream 即可,若需要接受包体,只需要
//调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
ngx_http_upstream_init(r);
return NGX_DONE;
}
static ngx_int_t
ngx_http_xdrive_rc_create_request(ngx_http_request_t *r)
{
size_t len;
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_xdrive_rc_ctx_t *ctx;
ngx_http_variable_value_t *vv;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_http_get_module_loc_conf(r, ngx_http_xdrive_rc_module);
//根据配置文件uid index号从变量中获取uid的变量值
vv = ngx_http_get_indexed_variable(r, mlcf->uid_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"the \"$uid\" variable is not set");
return NGX_ERROR;
}
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
ctx->uid = ngx_atoof(vv->data, vv->len);
if (ctx->uid == (off_t)NGX_ERROR)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"the \"$uid\" variable is err %s set", vv->data);
return NGX_ERROR;
}
//根据配置文件path index号从变量中获取path的变量值
vv = ngx_http_get_indexed_variable(r, mlcf->path_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ngx_str_set(&ctx->path, "/");
}
else {
ctx->path.data = vv->data;
ctx->path.len = vv->len;
}
vv = ngx_http_get_indexed_variable(r, mlcf->recusive_index);
if (vv == NULL || vv->not_found || vv->len == 0)
{
ctx->recusive = false;
}
else {
ctx->recusive = ngx_atoi(vv->data, vv->len);
}
RcUpdateReq list_req;
list_req._user_id = ctx->uid;
list_req._path.assign((char *)ctx->path.data, (char *)ctx->path.data + ctx->path.len);
list_req._recursive = ctx->recusive;
static uint32_t seq = ngx_time();
//编码,注意这里的变量使用的内存是从pool里面获取的,成功后,会将buf chain返回;
//细节见具体代码,不表
cl = ngx_datagram_encode(r->pool, r->connection->log, mlcf->upstream.buffer_size,
&list_req, ++seq, 0xC01);
if (cl == NULL)
return NGX_ERROR;
//准备发送
r->upstream->request_bufs = cl;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http xdrive_rc request uid=\"%d\", path=\"%V\", recur=%d",
ctx->uid, &ctx->path, ctx->recusive);
return NGX_OK;
}
static ngx_int_t
ngx_http_xdrive_rc_reinit_request(ngx_http_request_t *r)
{
return NGX_OK;
}
/*
* 读取二进制包体头部
*/
static ngx_int_t
ngx_http_xdrive_rc_process_header(ngx_http_request_t *r)
{
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_ctx_t *ctx;
u = r->upstream;
//因包头固定长度,所以很好判断
if (u->buffer.last - u->buffer.pos < NGX_XDRIVE_DATAGRAM_HEADER)
return NGX_AGAIN;
ctx = (ngx_http_xdrive_rc_ctx_t *)ngx_http_get_module_ctx(r, ngx_http_xdrive_rc_module);
ngx_xdrive_datagram_header_t header;
//解包头,获取最重要参数 : 包体长度,根据包体长度收包
if (ngx_decode_header(u->buffer.pos, NGX_XDRIVE_DATAGRAM_HEADER,
&header, r->connection->log) != NGX_OK)
{
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//业务代码
if (header._type != 0x08C01)
{
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"xdrive_rc ret type not legal = %d", header._type);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//业务代码
if (header._status != 0)
{
ngx_log_error(NGX_LOG_WARN, r->connection->log, 0,
"xdrive_rc ret status not ok in response = %d", header._status);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//非常关键一句,这句意思是返回client包包体长度不定,必须采用chunk filter;
ngx_http_clear_content_length(r);
//因upstream不知道该从upstream收取多少包体数据(我们故意没设置包体长度)
//所以我们必须自己处理记录剩余包体长度;
ctx->rest_length = header._length - NGX_XDRIVE_DATAGRAM_HEADER;
u->headers_in.status_n = NGX_HTTP_OK;
u->state->status = NGX_HTTP_OK;
//包头数据已经处理完毕,必须丢弃;
u->buffer.pos += NGX_XDRIVE_DATAGRAM_HEADER;
return NGX_OK;
}
//其实没啥用
static ngx_int_t
ngx_http_xdrive_rc_filter_init(void *data)
{
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
ngx_http_upstream_t *u;
u = ctx->request->upstream;
return NGX_OK;
}
/*
* 缓存包体,等待包体接受完毕,解码,然后一次回复给client
*/
static ngx_int_t
ngx_http_xdrive_rc_filter(void *data, ssize_t bytes)
{
ngx_http_xdrive_rc_ctx_t *ctx = (ngx_http_xdrive_rc_ctx_t *)data;
u_char *last;
ngx_buf_t *b;
ngx_chain_t *cl, **ll;
ngx_http_upstream_t *u;
ngx_http_xdrive_rc_loc_conf_t *mlcf;
mlcf = (ngx_http_xdrive_rc_loc_conf_t *)
ngx_http_get_module_loc_conf(ctx->request, ngx_http_xdrive_rc_module);
u = ctx->request->upstream;
b = &u->buffer;
size_t buff_size = mlcf->upstream.buffer_size;
//assert(bytes <= buff_size);
ctx->rest_length -= bytes;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
"recv resp len=%d, rest-len=%d", bytes, ctx->rest_length);
//特殊情况下,如果包体数据很短(和缓冲区长度比),很可能一次就将包体收完了,这时候
//直接交互内存即可,不再需要内存拷贝,否则...
if (ctx->rest_length == 0 && ctx->body_buff._chain_head == NULL)
{
cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
ctx->body_buff._chain_head = cl;
cl->buf->flush = 1;
cl->buf->memory = 1;
last = b->last;
cl->buf->pos = last;
b->last += bytes;
cl->buf->last = b->last;
cl->buf->tag = u->output.tag;
}
else {
//做一次内存拷贝到 body buf 中去
if (ngx_chain_write(ctx->request->pool, &u->free_bufs, &ctx->body_buff, buff_size,
b->last, bytes) != NGX_OK)
return NGX_ERROR;
b->last += bytes;
}
//判断upstream包体是否收完整
if (ctx->rest_length > 0)
{
return NGX_OK;
}
//包体收完,进行解码
RcUpdateResp list_resp;
if (ngx_datagram_decode_body(ctx->body_buff._chain_head,
ctx->request->connection->log,
&list_resp) != NGX_OK)
{
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"xdrive_rc RcUpdateResp decode failed");
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_NOTICE, ctx->request->connection->log, 0,
"xdrive_rc RcUpdateResp list num=%d",
list_resp._action_list.size());
//内容已经存入 list_resp 中,body buf失去作用,回收到free bufs里面去,刚好下面用
ngx_chain_t *busy_bufs = NULL;
ngx_chain_update_chains(&u->free_bufs, &busy_bufs, &ctx->body_buff._chain_head, b->tag);
//transfer...
ngx_chain_pair_t chain_pair;
ngx_memzero(&chain_pair, sizeof(chain_pair));
//转成 json 格式
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
"uid=%d, path=%V, recusive=%d, week_dcid=\"%s\", used_space=%d, list_num=%d\n",
ctx->uid, &ctx->path, ctx->recusive,
list_resp._weak_dcid.c_str(),
list_resp._used_space,
list_resp._action_list.size()
))
return NGX_ERROR;
//转成 json 格式
for (size_t i = 0; i < list_resp._action_list.size(); ++i)
{
ActionThrft *ac = &list_resp._action_list[i];
if (NGX_OK != ngx_chain_sprintf(ctx->request->pool, &u->free_bufs, &chain_pair, buff_size,
"[path=\"%s\", node_type=%d, status=%d, gcid=%s, size=%d]\n",
ac->m_path.c_str(), ac->m_node_type, ac->m_status,
ac->m_gcid.c_str(), ac->m_file_size
))
return NGX_ERROR;
}
//这句非常有意思,标志这是回包最后一个buf,upstraem通过这标志得知后端收据收集处理完毕
//关后端连接,回前端包
chain_pair._chain_last->buf->last_buf = 1;
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next)
{
ll = &cl->next;
}
*ll = chain_pair._chain_head;
return NGX_OK;
}
static void
ngx_http_xdrive_rc_abort_request(ngx_http_request_t *r)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"abort http xdrive_rc request");
return;
}
static void
ngx_http_xdrive_rc_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"finalize http xdrive_rc request");
return;
}
static void *
ngx_http_xdrive_rc_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_xdrive_rc_loc_conf_t *conf;
conf = (ngx_http_xdrive_rc_loc_conf_t *)ngx_pcalloc(cf->pool,
sizeof(ngx_http_xdrive_rc_loc_conf_t));
if (conf == NULL)
{
return NULL;
}
conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;
conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;
/* the hardcoded values */
conf->upstream.cyclic_temp_file = 0;
conf->upstream.buffering = 0;
conf->upstream.ignore_client_abort = 0;
conf->upstream.send_lowat = 0;
conf->upstream.bufs.num = 0;
conf->upstream.busy_buffers_size = 0;
conf->upstream.max_temp_file_size = 0;
conf->upstream.temp_file_write_size = 0;
conf->upstream.intercept_errors = 1;
conf->upstream.intercept_404 = 1;
conf->upstream.pass_request_headers = 0;
conf->upstream.pass_request_body = 0;
conf->uid_index = NGX_CONF_UNSET;
conf->path_index = NGX_CONF_UNSET;
conf->recusive_index = NGX_CONF_UNSET;
return conf;
}
static char *
ngx_http_xdrive_rc_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_xdrive_rc_loc_conf_t *prev = (ngx_http_xdrive_rc_loc_conf_t *)parent;
ngx_http_xdrive_rc_loc_conf_t *conf = (ngx_http_xdrive_rc_loc_conf_t *)child;
ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
prev->upstream.connect_timeout, 60000);
ngx_conf_merge_msec_value(conf->upstream.send_timeout,
prev->upstream.send_timeout, 60000);
ngx_conf_merge_msec_value(conf->upstream.read_timeout,
prev->upstream.read_timeout, 60000);
ngx_conf_merge_size_value(conf->upstream.buffer_size,
prev->upstream.buffer_size,
(size_t)ngx_pagesize);
ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
prev->upstream.next_upstream,
(NGX_CONF_BITMASK_SET
| NGX_HTTP_UPSTREAM_FT_ERROR
| NGX_HTTP_UPSTREAM_FT_TIMEOUT));
if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF)
{
conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
| NGX_HTTP_UPSTREAM_FT_OFF;
}
if (conf->upstream.upstream == NULL)
{
conf->upstream.upstream = prev->upstream.upstream;
}
if (conf->uid_index == NGX_CONF_UNSET) {
conf->uid_index = prev->uid_index;
}
if (conf->path_index == NGX_CONF_UNSET) {
conf->path_index = prev->path_index;
}
if (conf->recusive_index == NGX_CONF_UNSET) {
conf->recusive_index = prev->recusive_index;
}
return NGX_CONF_OK;
}
static char *
ngx_http_xdrive_rc_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_xdrive_rc_loc_conf_t *mlcf = (ngx_http_xdrive_rc_loc_conf_t *)conf;
ngx_str_t *value;
ngx_url_t u;
ngx_http_core_loc_conf_t *clcf;
if (mlcf->upstream.upstream)
{
return "is duplicate";
}
value = (ngx_str_t *)cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
u.no_resolve = 1;
mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (mlcf->upstream.upstream == NULL)
{
return (char *)(NGX_CONF_ERROR);
}
clcf = (ngx_http_core_loc_conf_t *)ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
clcf->handler = ngx_http_xdrive_rc_handler;
if (clcf->name.data[clcf->name.len - 1] == '/')
{
clcf->auto_redirect = 1;
}
//保存变量index用
mlcf->uid_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[0].name);
if (mlcf->uid_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
mlcf->path_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[1].name);
if (mlcf->path_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
mlcf->recusive_index = ngx_http_get_variable_index(cf, &ngx_http_proxy_vars[2].name);
if (mlcf->recusive_index == NGX_ERROR)
{
return (char *)(NGX_CONF_ERROR);
}
return NGX_CONF_OK;
}
static ngx_int_t
ngx_http_xdrive_rc_add_variables(ngx_conf_t *cf)
{
ngx_http_variable_t *var, *v;
for (v = ngx_http_proxy_vars; v->name.len; v++)
{
var = ngx_http_add_variable(cf, &v->name, v->flags);
if (var == NULL)
{
return NGX_ERROR;
}
var->get_handler = v->get_handler;
var->data = v->data;
}
return NGX_OK;
}
代码中一些有意思的地方:
//和buf差不多的思想的 buf chain
typedef struct
{
ngx_chain_t* _chain_head;
ngx_chain_t* _chain_pos;
ngx_chain_t* _chain_last;
ngx_chain_t* _chain_tail;
}
ngx_chain_pair_t;
//从buf chain中读取len长内存出来
size_t ngx_cdecl
ngx_chain_read(ngx_chain_pair_t* chain_pair
, uint8_t *buf, uint32_t len);
//将buf写入到buf chain中
ngx_int_t ngx_cdecl
ngx_chain_write(ngx_pool_t* pool
, ngx_chain_t** free_bufs
, ngx_chain_pair_t* chain_pair
, size_t write_chunk_size
, const uint8_t *buf, uint32_t len);
//写json或者xml之类回复有用
ngx_int_t ngx_cdecl
ngx_chain_sprintf(ngx_pool_t *pool
, ngx_chain_t **free_bufs
, ngx_chain_pair_t *chain_pair
, size_t write_chunk_size
, const char *fmt, ...);
下面是nginx配置文件中的关键部分
location ~* /rc_list/([0-9]+).html$ {
xdrive_rc_buffer_size 4096;
set $uid $1;
set $path /;
set $recusive 0;
if ($query_string ~* (|&)recusive=(0|1)(|&)) {
set $recusive $2;
}
xdrive_rc_pass 127.0.0.1:11001;
}
解释下上面配置文件意思,将url中匹配的用户数值放入uid参数,根据后缀参数判断是否递归将值放入
recusive 参数中;扩展模块将从这三个参数中将需要的值提取出来;
思路来源于:ngx_http_memcached_module.c 模块,应该还有其他的各种各样的实现方式,不知道还有没有更简单明了的途径;
发表评论
-
Nginx模块fastcgi_cache的几个注意点
2014-09-17 11:01 596原文地址:http://www.cnxct ... -
[nginx] nginx缓存cache的几种方式
2014-09-15 13:57 804原文地址:http://bbs.linuxtone.org/t ... -
NGINX的流媒体插件 nginx-rtmp-module
2014-09-15 12:03 645http://www.oschina.net/p/nginx- ... -
nginx 自定义 header
2014-09-15 11:56 789$http_HEADER The value of the H ... -
nginx反向代理proxy_set_header自定义header头无效
2014-09-15 11:52 1359原文地址:http://www.ttlsa.com/nginx ... -
Nginx模块开发入门
2014-09-14 10:05 628原文地址:http://kb.cnblogs.com/page ... -
nginx log_format 记录自定义header信息
2014-09-12 17:53 2220原文地址:http://notelifes.com/2013/ ... -
在Nginx中记录自定义Header
2014-09-12 17:48 837原文地址:http://gunner.me ... -
nginx log 记录请求的头信息
2014-09-12 14:44 1077记录访问的log,为了在 ... -
nginx官网文档地址
2014-09-10 10:33 571原文地址:http://wiki.nginx.org/Ngin ... -
Nginx配置反向代理时cache缓存的使用方法
2014-09-10 10:32 904原文地址:http://www.server110.com/n ... -
Windows下Nginx+PHP5(FastCgi)安装配置详解
2014-08-25 16:46 853源文地址:http://www.china ... -
nginx 多域名虚拟主机配置 (nginx如何绑定多个域名)
2014-08-25 14:33 391原文地址:http://wenku.baidu.com/lin ... -
实例讲解Nginx下的rewrite规则
2014-06-17 14:44 781一.正则表达式匹配, ... -
使用nginx的proxy_cache做网站缓存
2014-02-25 18:08 806使用nginx的proxy_cache做网站缓存 2012年1 ... -
nginx cache静态化+tmpfs 高性能cdn方案 原创-胡志广
2014-02-25 18:04 848nginx cache静态化+tmpfs 高性能cdn方案 原 ...
相关推荐
通过这个例子,开发者可以学习到Nginx模块开发的基本流程,从而能够根据实际需求创建自己的模块,增强Nginx的功能。这个过程涉及到了C语言编程、Nginx API理解和配置文件解析等多个方面,对提升Nginx的使用和开发...
在开始Nginx模块开发之前,开发者应具备C语言编程基础,对HTTP协议有一定理解,并熟悉基本的Unix/Linux系统操作。此外,了解Nginx的工作原理和事件模型也是必要的。 2. **Nginx模块任务委派的主要轮廓** Nginx...
书中的“模块开发”部分将引导读者学习如何编写自定义的Nginx模块。Nginx模块分为核心模块、HTTP模块、 mail模块和-stream模块,开发者可以根据需求扩展功能。编写Nginx模块涉及C语言编程,需要理解Nginx的API接口和...
《Nginx模块开发指南》是一本专注于帮助开发者深入理解并掌握Nginx模块开发的专著。本书旨在引领读者从基础知识逐步进阶至高级技术,涵盖了从基础设施到实际操作的全过程,包括核心数据结构、配置机制、框架设计以及...
Nginx-RTMP模块是一款强大的开源软件扩展,它将Nginx服务器的功能扩展到了实时传输协议(Real-Time Messaging Protocol, RTMP)领域,使得Nginx能够处理流媒体内容,如直播和点播服务。这个模块是由Alexey Kuznetsov...
#### 十二、模块开发 - Nginx支持模块化扩展,允许用户根据需求开发新的模块。 - 添加新模块:按照特定的接口规范编写新模块。 - 核心模块:预置的核心功能模块。 - 配置指令:用于配置模块行为的指令。 #### 十...
在模块开发方面,Nginx支持用户编写C语言的模块来扩展其功能。开发者需要熟悉Nginx的模块接口,如http、stream、mail三大模块框架,以及ngx_module_t、ngx_http_module_t等结构体,它们定义了模块的基本行为和生命...
在开始Nginx模块开发之前,开发者应具备C语言基础,理解结构体、预处理器指令和指针操作。还需要了解HTTP协议,因为Nginx处理的是Web服务器相关的工作。对于Nginx的配置文件结构,包括main、server、upstream和...
在模块开发方面,Nginx支持用户自定义模块,以扩展其功能。模块开发主要包括以下步骤:配置文件解析、HTTP模块、事件处理、请求处理、内存池使用等。理解Nginx的模块化体系结构对于开发高效且稳定的模块至关重要。...
此外,还讨论了如何实现自定义的HTTP协议、WebSocket支持以及与其他语言(如Python、Lua)的集成,展示了Nginx模块开发的灵活性和强大性。 在Nginx反向代理和负载均衡方面,本书深入剖析了Nginx如何实现高效的反向...
Nginx 是一个高性能的 Web 和反向代理服务器,而 nginx-http-flv-module 是 Nginx 的一个扩展模块,专门用于支持实时流媒体(RTMP)和FLV格式的视频流。让我们深入探讨这个组合的相关知识点。 1. **Nginx 概述** ...
对于开发人员来说,了解Nginx的模块开发是进阶的关键。Nginx支持模块化扩展,开发者可以通过编写C语言的模块来增加新功能。这包括自定义日志记录、实现新协议或者处理特定的HTTP请求方法。 在安全方面,Nginx提供了...
在C++编程环境下,我们可以为Nginx开发自定义模块来扩展其功能,例如"cpp-Nginx静态资源重定向模块"就是一个这样的例子。这个模块旨在优化网站性能,通过重写静态资源(如CSS、JavaScript和图片)的URL,将请求转发...
Nginx的RTMP模块是一款强大的流媒体服务器扩展,它允许Nginx处理实时传输协议(Real-Time Messaging Protocol, RTMP)流。这个模块由Arut Nazaryan开发,广泛应用于在线直播、视频点播等场景,支持H.264、AAC等编码...
HTTP模块是Nginx的核心部分,负责处理HTTP协议。其中,`ngx_http_core_module`是基础模块,负责请求路由;`ngx_http_rewrite_module`用于URL重写;`ngx_http_access_module`控制访问策略;`ngx_http_proxy_module`...
首先,我们将从安装和配置Nginx开始,学习如何编译源代码并自定义配置,以便为后续的模块开发打下基础。接下来,我们将探讨Nginx的模块化设计,理解主模块、事件模块、HTTP模块、服务器块、location块等概念,以及...
1. **模块开发**:Nginx模块开发涉及创建新的HTTP、TCP或UDP处理器,以及与Nginx内核的交互。这包括解析配置文件中的模块指令,处理网络请求,以及与其他模块通信。 2. **事件处理**:Nginx的事件处理模型是基于...