`
hot66hot
  • 浏览: 459506 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

nginx+lua+kafka实现日志统一收集汇总

阅读更多

转载请注明出处:http://hot66hot.iteye.com/blog/2291916

一:场景描述
对于线上大流量服务或者需要上报日志的nginx服务,每天会产生大量的日志,这些日志非常有价值。可用于计数上报、用户行为分析、接口质量、性能监控等需求。但传统nginx记录日志的方式数据会散落在各自nginx上,而且大流量日志本身对磁盘也是一种冲击。
我们需要把这部分nginx日志统一收集汇总起来,收集过程和结果需要满足如下需求:

  • 支持不同业务获取数据,如监控业务,数据分析统计业务,推荐业务等。
  • 数据实时性
  • 高性能保证


二:技术方案
得益于openresty和kafka的高性能,我们可以非常轻量高效的实现当前需求,架构如下:

方案描述:

  • 1:线上请求打向nginx后,使用lua完成日志整理:如统一日志格式,过滤无效请求,分组等。
  • 2:根据不同业务的nginx日志,划分不同的topic。
  • 3:lua实现producter异步发送到kafka集群。
  • 4:对不同日志感兴趣的业务组实时消费获取日志数据。


三:相关技术


四:安装配置
为了简单直接,我们采用单机形式配置部署,集群情况类似。
1)准备openresty依赖:

apt-get install libreadline-dev libncurses5-dev libpcre3-dev libssl-dev perl make build-essential
# 或者
yum install readline-devel pcre-devel openssl-devel gcc


2)安装编译openresty:

#1:安装openresty:
cd /opt/nginx/ # 安装文件所在目录
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz
tar -xzf openresty-1.9.7.4.tar.gz /opt/nginx/
 
#配置:
# 指定目录为/opt/openresty,默认在/usr/local。
./configure --prefix=/opt/openresty \
            --with-luajit \
            --without-http_redis2_module \
            --with-http_iconv_module
make
make install


3)安装lua-resty-kafka

#下载lua-resty-kafka:
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
unzip lua-resty-kafka-master.zip -d /opt/nginx/
  
#拷贝lua-resty-kafka到openresty
mkdir /opt/openresty/lualib/kafka
cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/


4):安装单机kafka

cd /opt/nginx/
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
tar xvf kafka_2.10-0.9.0.1.tgz
 
# 开启单机zookeeper
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 &
# 绑定broker ip,必须绑定
#在config/servier.properties下修改host.name
#host.name={your_server_ip}
# 启动kafka服务
nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 &
# 创建测试topic
sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1


五:配置运行
开发编辑/opt/openresty/nginx/conf/nginx.conf 实现kafka记录nginx日志功能,源码如下:

worker_processes  12;
 
events {
    use epoll;
    worker_connections  65535;
}
 
http {
    include       mime.types;
    default_type  application/octet-stream;
    sendfile        on;
    keepalive_timeout  0;
    gzip on;
    gzip_min_length  1k;
    gzip_buffers     4 8k;
    gzip_http_version 1.1;
    gzip_types       text/plain application/x-javascript text/css application/xml application/X-JSON;
    charset UTF-8;
    # 配置后端代理服务
    upstream rc{
        server 10.10.*.15:8080 weight=5 max_fails=3;
        server 10.10.*.16:8080 weight=5 max_fails=3;
        server 10.16.*.54:8080 weight=5 max_fails=3;
        server 10.16.*.55:8080 weight=5 max_fails=3;
        server 10.10.*.113:8080 weight=5 max_fails=3;
        server 10.10.*.137:8080 weight=6 max_fails=3;
        server 10.10.*.138:8080 weight=6 max_fails=3;
        server 10.10.*.33:8080 weight=4 max_fails=3;
        # 最大长连数
        keepalive 32;
    }
    # 配置lua依赖库地址
    lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";
 
    server {
        listen       80;
        server_name  localhost;
        location /favicon.ico {
            root   html;
                index  index.html index.htm;
        }
        location / {
            proxy_connect_timeout 8;
            proxy_send_timeout 8;
            proxy_read_timeout 8;
            proxy_buffer_size 4k;
            proxy_buffers 512 8k;
            proxy_busy_buffers_size 8k;
            proxy_temp_file_write_size 64k;
            proxy_next_upstream http_500 http_502  http_503 http_504  error timeout invalid_header;
            root   html;
            index  index.html index.htm;
            proxy_pass http://rc;
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            # 使用log_by_lua 包含lua代码,因为log_by_lua指令运行在请求最后且不影响proxy_pass机制
            log_by_lua '
                -- 引入lua所有api
                local cjson = require "cjson"
                local producer = require "resty.kafka.producer"
                -- 定义kafka broker地址,ip需要和kafka的host.name配置一致
                local broker_list = {
                    { host = "10.10.78.52", port = 9092 },
                }
                -- 定义json便于日志数据整理收集
                local log_json = {}
                log_json["uri"]=ngx.var.uri
                log_json["args"]=ngx.var.args
                log_json["host"]=ngx.var.host
                log_json["request_body"]=ngx.var.request_body
                log_json["remote_addr"] = ngx.var.remote_addr
                log_json["remote_user"] = ngx.var.remote_user
                log_json["time_local"] = ngx.var.time_local
                log_json["status"] = ngx.var.status
                log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
                log_json["http_referer"] = ngx.var.http_referer
                log_json["http_user_agent"] = ngx.var.http_user_agent
                log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
                log_json["upstream_response_time"] = ngx.var.upstream_response_time
                log_json["request_time"] = ngx.var.request_time
                -- 转换json为字符串
                local message = cjson.encode(log_json);
                -- 定义kafka异步生产者
                local bp = producer:new(broker_list, { producer_type = "async" })
                -- 发送日志消息,send第二个参数key,用于kafka路由控制:
                -- key为nill(空)时,一段时间向同一partition写入数据
                -- 指定key,按照key的hash写入到对应的partition
                local ok, err = bp:send("test1", nil, message)
 
                if not ok then
                    ngx.log(ngx.ERR, "kafka send err:", err)
                    return
                end
            ';
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}


六:检测&运行

# 检测配置,只检测nginx配置是否正确,lua错误日志在nginx的error.log文件中
./nginx -t /opt/openresty/nginx/conf/nginx.conf
# 启动
./nginx -c /opt/openresty/nginx/conf/nginx.conf
# 重启
./nginx -s reload


七:测试
1:使用任意http请求发送给当前nginx,如:

引用

http://10.10.78.52/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10


2:查看upstream代理是否工作正常
3:查看kafka 日志对应的topic是否产生消息日志,如下:

引用

# 从头消费topic数据命令
sh kafka-console-consumer.sh --zookeeper 10.10.78.52:2181 --topic test1 --from-beginning


效果监测:

4:ab压力测试

引用

#单nginx+upstream测试:
ab -n 10000 -c 100 -k http://10.10.34.15/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10

#结果
Server Software:        nginx
Server Hostname:        10.10.34.15
Server Port:            80
Document Path:          /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com
Document Length:        13810 bytes
Concurrency Level:      100
Time taken for tests:   2.148996 seconds
Complete requests:      10000
Failed requests:        9982
   (Connect: 0, Length: 9982, Exceptions: 0)
Write errors:           0
Keep-Alive requests:    0
Total transferred:      227090611 bytes
HTML transferred:       225500642 bytes
Requests per second:    4653.34 [#/sec] (mean)
Time per request:       21.490 [ms] (mean)
Time per request:       0.215 [ms] (mean, across all concurrent requests)
Transfer rate:          103196.10 [Kbytes/sec] received
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       2
Processing:     5   20  23.6     16     701
Waiting:        4   17  20.8     13     686
Total:          5   20  23.6     16     701
Percentage of the requests served within a certain time (ms)
  50%     16
  66%     20
  75%     22
  80%     25
  90%     33
  95%     41
  98%     48
  99%     69
100%    701 (longest request)

 

引用

#单nginx+upstream+log_lua_kafka接入测试:
ab -n 10000 -c 100 -k http://10.10.78.52/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10

#结果
Server Software:        openresty/1.9.7.4
Server Hostname:        10.10.78.52
Server Port:            80
Document Path:          /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com
Document Length:        34396 bytes
Concurrency Level:      100
Time taken for tests:   2.234785 seconds
Complete requests:      10000
Failed requests:        9981
   (Connect: 0, Length: 9981, Exceptions: 0)
Write errors:           0
Keep-Alive requests:    0
Total transferred:      229781343 bytes
HTML transferred:       228071374 bytes
Requests per second:    4474.70 [#/sec] (mean)
Time per request:       22.348 [ms] (mean)
Time per request:       0.223 [ms] (mean, across all concurrent requests)
Transfer rate:          100410.10 [Kbytes/sec] received
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       3
Processing:     6   20  27.6     17    1504
Waiting:        5   15  12.0     14     237
Total:          6   20  27.6     17    1504
Percentage of the requests served within a certain time (ms)
  50%     17
  66%     19
  75%     21
  80%     23
  90%     28
  95%     34
  98%     46
  99%     67
100%   1004 (longest request)

 

  • 大小: 107.6 KB
  • 大小: 445.3 KB
分享到:
评论
3 楼 Aceslup 2017-05-29  
学习博主的lua脚本。感谢分享
2 楼 hot66hot 2017-02-23  
a13575018718 写道
你好,博主,我这出现了
[error] 18278#0: *309346 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 192.168.1.68, server: 0.0.0.0:80
2016/12/19 10:25:34 [error] 18278#0: *309346 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 192.168.1.68, server: 0.0.0.0:80
2016/12/19 10:25:34 [error] 18278#0: *309346 [lua] producer.lua:258: buffered messages send to kafka err: not found topic, retryable: true, topic: test2, partition_id: -1, length: 1, context: ngx.timer, client: 192.168.1.68, server: 0.0
.0.0:80

进不去kafka


kafka err: not found topic,
1 楼 a13575018718 2016-12-19  
你好,博主,我这出现了
[error] 18278#0: *309346 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 192.168.1.68, server: 0.0.0.0:80
2016/12/19 10:25:34 [error] 18278#0: *309346 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, context: ngx.timer, client: 192.168.1.68, server: 0.0.0.0:80
2016/12/19 10:25:34 [error] 18278#0: *309346 [lua] producer.lua:258: buffered messages send to kafka err: not found topic, retryable: true, topic: test2, partition_id: -1, length: 1, context: ngx.timer, client: 192.168.1.68, server: 0.0
.0.0:80

进不去kafka

相关推荐

    lua-resty-kafka:基于cosocket API的Openresty的Lua kafka客户端驱动程序

    描述这个Lua库是ngx_lua nginx模块的Kafka客户端驱动程序: 这个Lua库利用了ngx_lua的cosocket API,该API确保100%的非阻塞行为。 请注意,至少需要或 ,不幸的是仅支持LuaJIT(-- --with-luajit )。 请注意,对于...

    master.zip

    标题 "master.zip" 提供的是一个实现了 OpenResty、Lua、Nginx 与 Kafka 集成的项目。这个项目的核心是通过 Lua 脚本在 Nginx 中处理请求,并将数据实时写入 Apache Kafka 集群。下面我们将深入探讨这些技术及其集成...

    Java思维导图xmind文件+导出图片

    基于kafka实现应用日志实时上报统计分析 RabbitMQ 初步认识RabbitMQ及高可用集群部署 详解RabbitMQ消息分发机制及主题消息分发 RabbitMQ消息路由机制分析 RabbitMQ消息确认机制 Redis redis数据结构分析 ...

    taotao-weblog-analysis基于openresty kafka hadoop hive 日志点击流数据分析

    综合以上,这个项目展示了大数据处理的一条典型流程:通过OpenResty收集日志,利用Kafka进行数据传输,借助Hadoop进行数据存储和计算,最后通过Hive进行数据查询和分析。这种架构对于处理大规模的点击流数据非常有效...

    《APM演进之路》-胡彪.pdf

    - 使用Nginx+lua模块完成解压、Protobuf等工作,并将数据写入access.log。 - 通过Flume读取access.log,再将数据写入Kafka。 - Kafka只有一个Topic,ETLService直接消费Kafka。根据不同的消息类型进行业务逻辑处理,...

    http转发kafka服务

    例如,可以利用Nginx的反向代理功能,结合lua脚本或者OpenResty扩展,将接收到的HTTP请求转换为Kafka的Produce Request。另一种方法是使用开源工具,如Apache Kafka的Connect框架,配合HTTP Source Connector,例如...

    容器技术在互联网企业的实践.pptx

    同时,利用负载均衡器如Nginx+Lua或Finagle,以及日志收集系统如Flume和Kafka,以及监控系统如Graphite和Grafana,对整个系统进行监控和故障排查。 尽管自研容器管理平台带来了流程控制和权限控制的改善,但也存在...

    apache kafka0.10.20 搭建简单环境并运行javaDemo

    例如,设置 broker.id(每个 Kafka 实例的唯一标识)、zookeeper.connect(Zookeeper 集群的连接字符串)和 log.dirs(日志文件的存储路径)。 2. **启动 Zookeeper**: Kafka 使用 Zookeeper 进行集群协调,所以...

    基于分布式地理围栏的管道舆情监控平台的设计与实现.pdf

    网络分发服务,基于Nginx+Lua脚本实现,负责对用户请求进行哈希路由;采集数据上传服务,利用Apache Kafka消息队列对终端设备采集的数据进行上传;围栏数据管理服务,基于SpringBoot框架实现地理围栏数据的CRUD操作...

    基于storm实时热点统计的分布式并行缓存预热

    一、基于nginx+lua完成商品详情页访问流量实时上报kafka的开发 ==================================== 在nginx这一层,接收到访问请求的时候,就把请求的流量上报发送给kafka 这样的话,storm才能去消费kafka中的...

    OpenResty可伸缩的Web平台 v1.19.9.1 rc1.gz

    OpenResty是一款基于Nginx和LuaJIT的高性能Web平台,它将强大的Nginx服务器与Lua脚本语言紧密集成,提供了丰富的功能和高度的灵活性,使得开发人员能够快速构建可伸缩的Web应用程序和服务。这个压缩包“OpenResty可...

    图说企业安全建设.pdf

    - 安全日志分析:通过工具如Kafka(分布式流处理平台)、Storm(实时计算系统)和Redis(内存数据结构存储)来收集、处理和分析安全事件日志,以便快速响应安全威胁。 以上内容通过图形化的界面展示,便于理解和...

    培训班学java学到什么程度可以出去工作了?(csdn)————程序.pdf

    - **缓存架构设计,如Nginx+Lua的多级缓存** - **MySQL数据库索引** - **DevOps实践,持续集成/持续部署(CI/CD)** 对于没有工作经验的培训班学员,找工作的关键是: 1. **简历优化**:确保简历内容真实且具有...

    lightning:服务收集app web h5埋点信息,openresty接受,按下到kafka

    这可能包括lightning服务的配置文件、lua脚本(用于OpenResty)、与Kafka交互的接口实现、以及可能的测试用例和部署指南。通过这些资料,开发者可以了解到如何配置和运行此系统,以及如何根据自己的需求进行定制。 ...

    基于OpenResty的百万级长连接推送.zip

    OpenResty,作为一个强大的Web服务框架,结合了Nginx的高性能和Lua的灵活性,为实现百万级长连接推送提供了可能。本文将深入探讨如何利用OpenResty构建高效稳定的长连接推送系统。 首先,OpenResty基于Nginx,其...

    HR心声:面试过众多Java程序员之后有这些感受.pdf,这是一份不错的文件

    这包括Nginx的lua配置、session粘滞,Dubbo的Zookeeper整合,以及Kafka的集群配置和消息发送机制。掌握这些技术可以展示候选人具备处理高并发、高可用场景的能力。 数据库优化是另一个重要领域。面试官会期望候选人...

Global site tag (gtag.js) - Google Analytics