`

logstash 过滤采集nginx日志

阅读更多

 

 

      在生产环境中,nginx日志格式往往使用的是自定义的格式,我们需要把logstash中的message结构化后再存储,方便kibana的搜索和统计,因此需要对message进行解析。

  本文采用grok过滤器,使用match正则表达式解析,根据自己的log_format定制。

1、nginx日志格式

  log_format配置如下:

log_format  main  '$remote_addr - $remote_user [$time_local] $http_host $request_method "$uri" "$query_string" '
                  '$status $body_bytes_sent "$http_referer" $upstream_status $upstream_addr $request_time $upstream_response_time '
                  '"$http_user_agent" "$http_x_forwarded_for"' ;

对应的日志如下:

192.172.2.1 - - [06/Jun/2016:00:00:01 +0800] test.changh.com GET "/api/index" "?cms=0&rnd=1692442321" 200 4 "http://www.test.com/?cp=sfwefsc" 200 192.168.0.122:80 0.004 0.004 "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-"

 

2、编写正则表达式

  logstash中默认存在一部分正则让我们来使用,可以访问Grok Debugger来查看,可以在logstash/

vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/ 目录中查看

      基本定义在grok-patterns中,我们可以使用其中的正则,当然并不是所有的都适合nginx字段,这时就需要我们自定义正则,然后通过指定patterns_dir来调用。

  同时在写正则的时候可以使用Grok Debugger或者Grok Comstructor工具来帮助我们更快的调试。在不知道如何使用logstash中的正则的时候也可使用Grok Debugger的Descover来自动匹配。(注意网络是否通,需要墙)

  1)nginx标准日志格式

    logstash自带的grok正则中有Apache的标准日志格式:

 COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

对于nginx标准日志格式,可以发现只是最后多了一个 $http_x_forwarded_for 变量。则nginx标准日志的grok正则定义为:

MAINNGINXLOG %{COMBINEDAPACHELOG} %{QS:x_forwarded_for}

2)自定义格式

    通过log_format来匹配对应的正则如下:

%{IPV4:remote_addr} - (%{USERNAME:user}|-) \[%{HTTPDATE:log_timestamp}\] (%{HOSTNAME1:http_host}|-) (%{WORD:request_method}|-) \"(%{URIPATH1:uri}|-|)\" \"(%{URIPARM1:param}|-)\" %{STATUS:http_status} (?:%{BASE10NUM:body_bytes_sent}|-) \"(?:%{GREEDYDATA:http_referrer}|-)\" (%{STATUS:upstream_status}|-) (?:%{HOSTPORT1:upstream_addr}|-) (%{BASE16FLOAT:upstream_response_time}|-) (%{STATUS:request_time}|-) \"(%{GREEDYDATA:user_agent}|-)\" \"(%{FORWORD:x_forword_for}|-)\"

这里面有几个是我自定义的正则:

       URIPARM1 [A-Za-z0-9$.+!*'|(){},~@#%&/=:;^\\_<>`?\-\[\]]*

URIPATH1 (?:/[\\A-Za-z0-9$.+!*'(){},~:;=@#% \[\]_<>^\-&?]*)+

HOSTNAME1 \b(?:[0-9A-Za-z_\-][0-9A-Za-z-_\-]{0,62})(?:\.(?:[0-9A-Za-z_\-][0-9A-Za-z-:\-_]{0,62}))*(\.?|\b)

STATUS ([0-9.]{0,3}[, ]{0,2})+

HOSTPORT1 (%{IPV4}:%{POSINT}[, ]{0,2})+

FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

      logstash中的message是每段读进来的日志,IPORHOST、USERNAME、HTTPDATE等都是patterns/grok-patterns中定义好的正则格式名称,对照日志进行编写。

  grok pattren的语法为:%{SYNTAX:semantic},":" 前面是grok-pattrens中定义的变量,后面可以自定义变量的名称。(?:%{SYNTAX:semantic}|-)这种形式是条件判断。

  如果有双引号""或者中括号[],需要加 \ 进行转义。

  详解自定义正则:

 URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]* 

 URIPARM1 [A-Za-z0-9$.+!*'|(){},~@#%&/=:;^\\_<>`?\-\[\]]* grok-patterns中正则表达式,可以看到grok-patterns中是以“?”开始的参数,在nginx的 $query_string 中已经把“?”去掉了,所以我们这里不再需要“?”。另外单独加入日志中出现的  ^ \ _ < > ` 特殊符号 

 URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\-]*)+ 

 URIPATH1 (?:/[\\A-Za-z0-9$.+!*'(){},~:;=@#% \[\]_<>^\-&?]*)+ grok-patterns中正则表达式,grok-patterns中的URIPATH不能匹配带空格的URI,于是在中间加一个空格。另外还有 \ [ ] < > ^ 特殊符号。

 HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b) 

 HOSTNAME1 \b(?:[0-9A-Za-z_\-][0-9A-Za-z-_\-]{0,62})(?:\.(?:[0-9A-Za-z_\-][0-9A-Za-z-:\-_]{0,62}))*(\.?|\b) 添加匹配 http_host 中带有 "-" 的字符。

 HOSTPORT %{IPORHOST}:%{POSINT} 

 HOSTPORT1 (%{IPV4}:%{POSINT}[, ]{0,2})+ 在匹配 upstream_addr 字段时发现,会出现多个IP地址的情况出现,匹配多个IP地址。

 STATUS ([0-9.]{0,3}[, ]{0,2})+ 该字段是当出现多个 upstream_addr 字段时匹配多个 http_status 。

 FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD} 当 x_forword_for 字段出现多个IP地址时匹配。

  nginx左右字段都定义完成,可以使用Grok Debugger或者Grok Comstructor工具来测试。添加自定义正则的时候,在Grok Debugger中可以勾选“Add custom patterns”。

  以上日志匹配结果为:

{

  "remote_addr": [

    "1.1.1.1"

  ],

  "user": [

    "-"

  ],

  "log_timestamp": [

    "06/Jun/2016:00:00:01 +0800"

  ],

  "http_host": [

    "www.test.com"

  ],

  "request_method": [

    "GET"

  ],

  "uri": [

    "/api/index"

  ],

  "param": [

    "?cms=0&rnd=1692442321"

  ],

  "http_status": [

    "200"

  ],

  "body_bytes_sent": [

    "4"

  ],

  "http_referrer": [

    "http://www.test.com/?cp=sfwefsc"

  ],

  "port": [

    null

  ],

  "upstream_status": [

    "200"

  ],

  "upstream_addr": [

    "192.168.0.122:80"

  ],

  "upstream_response_time": [

    "0.004"

  ],

  "request_time": [

    "0.004"

  ],

  "user_agent": [

    ""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36""

  ],

  "client_ip": [

    "2.2.2.2"

  ],

  "x_forword_for": [

    null

  ]

}

 

但是、我们的环境中 nginx 的log_format  定义如下:

log_format  access  '$remote_addr - $remote_user [$time_local] "$request" '

                        '$status $body_bytes_sent "$http_referer" '

                        '"$http_user_agent" $http_x_forwarded_for'

                        '$upstream_addr $upstream_response_time $request_time ';

 

故,我在grokdubug调试配置就必须这样写 ,同时要添加自定义的正则表达式:

      %{IPV4:remote_addr} - (%{USERNAME:user}|-) \[%{HTTPDATE:log_timestamp}\] \"%{WORD:request_method} %{URIPATH1:uri}\" %{BASE10NUM:http_status} (?:%{BASE10NUM:body_bytes_sent}|-) \"(?:%{GREEDYDATA:http_referrer}|-)\" \"(%{GREEDYDATA:user_agent}|-)\"

 

3、logstash的配置文件

  创建自定义正则目录

# mkdir -p /usr/local/logstash/patterns
# vi /usr/local/logstash/patterns/nginx

然后写入上面自定义的正则

 

URIPARM1 [A-Za-z0-9$.+!*'|(){},~@#%&/=:;^\\_<>`?\-\[\]]*

URIPATH1 (?:/[\\A-Za-z0-9$.+!*'(){},~:;=@#% \[\]_<>^\-&?]*)+

HOSTNAME1 \b(?:[0-9A-Za-z_\-][0-9A-Za-z-_\-]{0,62})(?:\.(?:[0-9A-Za-z_\-][0-9A-Za-z-:\-_]{0,62}))*(\.?|\b)

STATUS ([0-9.]{0,3}[, ]{0,2})+

HOSTPORT1 (%{IPV4}:%{POSINT}[, ]{0,2})+

FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}

URIPARM [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*

URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_\- ]*)+

URI1 (%{URIPROTO}://)?(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

NGINXACCESS %{IPORHOST:remote_addr} - (%{USERNAME:user}|-) \[%{HTTPDATE:log_timestamp}\]  \"{WORD:request_method} %{URIPATH1:uri}\" %{BASE10NUM:http_status} (?:%{BASE10NUM:body_bytes_sent}|-) \"(?:%{GREEDYDATA:http_referrer}|-)\" \"(%{GREEDYDATA:user_agent}|-)\" (%{FORWORD:x_forword_for}|-) (?:%{HOSTPORT1:upstream_addr}|-) ({BASE16FLOAT:upstream_response_time}|-) (%{STATUS:request_time}|-)

 

logstash.conf配置文件内容

input { 

  beats {

    port => 5044

    type => "nginx-log"

  }

}

 

filter {

  if [type] == "nginx-log"{

     grok {

        patterns_dir => "/usr/local/logstash/patterns"

        match => {"message" => "%{NGINXACCESS}" }

     }

     date {

        match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]

     }

     geoip {

        source => "clientip"

     }

  }

}

 

output {

  elasticsearch {

    hosts => ["10.129.11.87:9200","10.129.11.88:9200"]

    index => "logstash-custom-nginx%{+YYYY.MM.dd}"

    document_type => "%{type}"

    flush_size => 20000

    idle_flush_time => 10

    sniffing => true

    template_overwrite => true

  }

}

 

 4、启动logstash,然后就可以查看日志是否写入elasticsearch中。

 

==========================

如果用  grafana 读取es日志好看监控数据:

则可以将nginx 配置为:

log_format main   '{"@timestamp":"$time_iso8601",'
                        '"@source":"$server_addr",'
                        '"hostname":"$hostname",'
                        '"ip":"$http_x_forwarded_for",'
                        '"client":"$remote_addr",'
                        '"request_method":"$request_method",'
                        '"scheme":"$scheme",'
                        '"domain":"$server_name",'
                        '"referer":"$http_referer",'
                        '"request":"$request_uri",'
                        '"args":"$args",'
                        '"size":$body_bytes_sent,'
                        '"status": $status,'
                        '"responsetime":$request_time,'
                        '"upstreamtime":"$upstream_response_time",'
                        '"upstreamaddr":"$upstream_addr",'
                        '"http_user_agent":"$http_user_agent",'
                        '"https":"$https"'
                        '}';

这样的字段可以用 grafana 相对的模板数据;

input {
    file {
        #这里根据自己日志命名使用正则匹配所有域名访问日志
        path => [ "/usr/local/nginx/logs/*_access.log" ]
        ignore_older => 0
    codec => json
    }
}

filter {
    mutate {
      convert => [ "status","integer" ]
      convert => [ "size","integer" ]
      convert => [ "upstreatime","float" ]
      remove_field => "message"
    }
    geoip {
        source => "ip"
    }


}
output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-nginx-access-%{+YYYY.MM.dd}"
    }
#    stdout {codec => rubydebug}
}

https://grafana.com/dashboards/2292 (grafana 的nginx-access 模板)  

 

我用的filebeat采集的则如下配置

input {

  beats {

    port => 5044

  }

}

 

filter {

   if [fields][doc_type] == "nginx_access_log" {

      mutate {

         convert => [ "status","integer" ]

         convert => [ "size","integer" ]

         convert => [ "upstreatime","float" ]

      }

      geoip {

         source => "ip"

      }

   }

}

 

output {

  if [fields][doc_type] == "nginx_access_log"{

     elasticsearch {

        hosts => ["10.129.11.87:9200","10.129.11.88:9200"]

        index => "logstash-nginx-access%{+YYYY.MM.dd}"

        document_type => "%{type}"

        flush_size => 20000

        idle_flush_time => 10

        sniffing => true

        template_overwrite => true

     }

     stdout{codec => rubydebug}

  }

  if [fields][doc_type] == "nginx_error_log" {

     elasticsearch {

        hosts => ["10.129.11.87:9200","10.129.11.88:9200"]

        index => "logstash-nginx-error%{+YYYY.MM.dd}"

        document_type => "%{type}"

        flush_size => 20000

        idle_flush_time => 10

        sniffing => true

        template_overwrite => true

     }

  }

}

 

 

分享到:
评论

相关推荐

    Logstash收集Tomcat集群日志的解决方案.txt

    - `filter`:对日志进行过滤处理,这里使用grok插件解析日志格式。由于示例中的配置被截断,我们假设使用标准的`COMBINEDAPACHELOG`模式进行匹配。 - `output`:定义日志输出目标为Elasticsearch。 通过以上步骤,...

    linux平台centos7系统 - ELK+logback+kafka+nginx 搭建分布式日志分析平台.doc

    - 在每个服务主机上部署Logstash,负责日志采集、过滤和推送至Elasticsearch。 - Elasticsearch接收并存储Logstash传递的结构化数据,供Kibana使用。 - Kibana提供友好的Web界面,用于数据分析和结果呈现。 **4. ...

    logstash-2.3.1.tar.zip

    Logstash 是一个强大的开源日志管理和...随着需求的增长,还可以扩展Logstash的功能,例如添加更多的过滤器来处理更复杂的数据,或者使用Beats(如Filebeat)作为轻量级的日志发送代理,以提高日志采集的效率和可靠性。

    部署ELK分布式日志分析平台

    在具体的应用案例中,使用ELK平台对nginx日志进行分析是一个典型的应用场景。通过ELK的集中式日志管理和分析能力,可以快速定位到问题的具体服务器和服务模块,大大提高了日志分析的效率和准确性,为运维和开发人员...

    第四章:项目:ELK+Filebeat+Redis部署海量日志分析平台1

    Logstash是一款强大的日志收集、分析和过滤工具,支持多种数据采集方式。在本项目中,Logstash作为客户端在需要收集日志的服务器上安装,接收Redis中的日志数据,然后进行过滤、转换等操作,再将处理后的数据发送到...

    elk日志分析系统、elk速成宝典、elk新手晋级大神

    配置Filebeat来采集Nginx日志,并使用Grok来解析这些日志。 ```bash # 配置示例 filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/*.log fields: type: nginx_access_log fields_under_...

    日志文件采集工具filebeat,版本8.2.2

    3. **Logstash**(可选):如果需要进一步处理日志数据,例如添加字段、过滤、转换等,可以先将数据发送到Logstash进行预处理,然后再送入Elasticsearch。 五、安装与运行 下载`filebeat-8.2.2-linux-x86_64`压缩包...

    数据采集架构所需组件

    接着,`logstash` 是一个灵活的日志管理和处理工具,它可以接收来自 `nginx` 或其他数据源的原始数据,然后通过过滤、解析、转换等操作将数据转化为结构化的格式。`logstash` 支持多种输入和输出插件,使得数据采集...

    ELK日志收集系统完全实现

    在本案例中,我们将基于VMware搭建一个完整的ELK日志管理系统,其中包括Elasticsearch集群、Logstash集群、Kibana以及Kafka集群,以及用于日志采集的Filebeat。 首先,搭建环境需要准备VMware Workstation 15 Pro...

    ELK集群的部署、使用以及备份与版本升级.pdf

    例如,使用Filebeat采集nginx日志,通过JSON解析器将日志json化,然后发送到Elasticsearch。 **四、Kibana的安装与使用** Kibana安装完成后,需要配置Elasticsearch的连接信息,并创建索引来展示数据。通过Kibana...

    filebeat-6.7.0-windows-x86_64.zip

    在ELK Stack中,Filebeat主要负责收集日志,Logstash用于进一步处理和过滤日志(如果需要),然后将清洗后的数据发送至Elasticsearch存储。Kibana则提供了用户友好的界面,用于查询、分析和展示这些日志数据。 总的...

    Elasticsearch+Fluentd+Kafka搭建日志系统

    在日志管理领域,传统的ELK(Elasticsearch, Logstash, Kibana)堆栈正逐渐被EFK(Elasticsearch, Fluentd, Kafka)所取代,原因是Logstash在处理大量日志时可能消耗过多内存,而Fluentd在轻量级日志收集方面表现更...

    ELK日志收集系统讲析

    Filebeat 是一种轻量级的日志收集器,可以部署在各个服务器上,用于采集这些服务器上的日志文件。它支持多种格式的日志输入,并可以将数据发送到Elasticsearch、Logstash 或 Kafka 等目标。 **3. 组合使用** - **...

    filebeat-7.6.0-linux-x86_64.tar.gz

    **Filebeat:ELK生态中的日志采集利器** Filebeat是ELK(Elasticsearch、Logstash、Kibana)堆栈中不可或缺的一部分,主要负责日志数据的收集与传输。在ELK解决方案中,Filebeat扮演着轻量级日志代理的角色,它能够...

    filebeat-5.6.16-linux-x86_64.tar.gz

    1. **Filebeat模块**:Filebeat的核心功能是通过模块化设计,方便地配置和管理对不同类型的日志或数据源的采集。例如,它内置了对Apache、Nginx等常见服务器日志的支持。 2. **Prospectors**:Prospectors是...

    filebeat-5.6.8-windows-x86_64.zip

    9. **日志过滤与处理**:虽然Filebeat主要负责日志的收集,但也可以配合grok patterns或其他过滤工具对收集的日志进行初步解析和筛选。 10. **监控与管理**:Filebeat可以通过Elastic Stack中的Kibana仪表板进行...

    filebeat-6.2.2-linux-x86_64.tar.gz下载

    在ELK堆栈中,Filebeat收集的日志数据会被发送到Logstash(如果使用了Logstash),然后由Logstash进一步处理和过滤,最后存储到Elasticsearch。Kibana可以从Elasticsearch中获取数据,创建各种图表和仪表板,以便...

    ELK6.3和Filebeat6.3在Docker-Compose环境下安装和使用

    Filebeat是ELK生态系统中的另一个重要组件,属于轻量级的日志数据采集器,通常用于代替Logstash作为日志收集端。Filebeat部署在数据源所在的服务器上,能够监控指定的日志文件,捕获新增的日志数据,并将其安全地...

Global site tag (gtag.js) - Google Analytics