`
WAMING5
  • 浏览: 10891 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

ELK+kafka集成

阅读更多
1、因为本项目采用的log4j2,所以在log4j2中直接配置
	    <Kafka name="Kafka" topic="XX_log">
	      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>
	        <Property name="bootstrap.servers">127.0.0.1:9092</Property>
	        <Property name="timeout.ms">500</Property>
	    </Kafka>

PatternLayout 中格式采用了||将内容连接起来目的为了logstash进行切分,其中增加timeout.ms属性为了保证日志系统挂掉的情况不会对业务系统产生较大影响,当然kafka可以采用集群的方式,bootstrap.servers多个地址用“,”分隔。XX_web代表当前业务平台。
2、搭建kafka集群这里就不多介绍了官网很全,
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183


3、创建logstash动态模板
{
    "template": "*",
    "settings": {
        "index.refresh_interval": "5s",
        "number_of_replicas": "0",
        "number_of_shards": "3"
    },
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": false
            },
            "dynamic_templates": [
                {
                    "message_field": {
                        "match": "message",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "index": "analyzed"
                        }
                    }
                },
                {
                    "string_fields": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "index": "not_analyzed"
                        }
                    }
                }
            ],
            "properties": {
                "dateTime": {
                    "type": "date",
                    "format": "yyy-MM-dd HH:mm:ss"
                },
                "@version": {
                    "type": "integer",
                    "index": "not_analyzed"
                },
                "context": {
                    "type": "string",
                    "index": "analyzed"
                },
                "level": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "class": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "server": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}

4、配置logstash
input{
       kafka {
                zk_connect =>"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
                group_id =>"logstash"
                topic_id =>"XX_log"
                reset_beginning => false
                consumer_threads => 5
                decorate_events => true
       }
}
filter {
   mutate{
        split=>["message","||"]
        add_field => {
             "dateTime" => "%{[message][0]}"
        }
        add_field => {
              "level" => "%{[message][1]}"
        }
        add_field => {
               "class" => "%{[message][2]}"
        }
        add_field => {
                "server" => "%{[message][3]}"
         }
        add_field => {
                "context" => "%{[message][4]}"
         }
         remove_field => ["message"]
    }
    date {
        match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
    }
}
output{
	  elasticsearch {
		    hosts => ["127.0.0.1:9200"]
		    index => "XX_log-%{+YYYY-MM}"
		    codec => "json"
		    manage_template => true
		    template_overwrite => true
		    flush_size => 50000
		    idle_flush_time => 10
		    workers => 2
		    template => "E:\logstash\template\template_log.json"  
	}
}

按照年月将日志保存进ES索引中index => "XX_log-%{+YYYY-MM}",logstash从kafka集群中读取日志信息。

5、搭建ZK集群,这里就不多介绍了,网上资料比较多----http://blog.csdn.net/shirdrn/article/details/7183503

6、搭建ES集群,ES集群比较简单,设置的参数不要太多就可以使用。http://blog.csdn.net/xgjianstart/article/details/52192675

7、配置kibana
server.port: 5601 # 服务端口
# The host to bind the server to.
server.host: "115.28.240.113"
elasticsearch.url: http://127.0.0.1:9200   ES地址-集群
kibana.index: "kibana"


8、版本 JKD 1.7  ES-2.4, logstash 2.4, kafka-2.10,kibana-4.6.4
0
1
分享到:
评论
1 楼 happysoul 2017-07-14  
logstash 内置配置
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

自定义的匹配规则(日期、log类型、括号、日志文本)
引用
JAVA_DATE %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND},%{INT}
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
KUOHAO (\[[^\[]*\])
TEXT (.*)


采集例子(重点是多行要合并为一条记录)
input{ 
file { 
                type => "log4j" 
#星号匹配多个文件
                path => ["/home/admin/logs/weblog-*.log","/home/admin/logs/stderr.log"] 
#可以不扫描某个文件
#	exclude => ["1.log"] 
 
start_position => "beginning" 
         
                codec => multiline { 
#多行代码分割标识(时间戳)
                        pattern => "^%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND},%{INT}" 
                        negate => true 
                        what => previous 
                } 
        } 
} 
 
 
filter { 
  if [type] == "log4j" { 
    grok { 
#使用这个目录下的匹配规则(上面一段代码里的东西)
        patterns_dir => ["./patterns"] 
        match => ["message", "%{JAVA_DATE:@timestamp} %{LOGLEVEL:LEVEL} %{KUOHAO:THREAD} %{KUOHAO:CLASS} %{KUOHAO:TRACEID} - %{TEXT:MSG}"] 
    } 
    date { 
        match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss,Z" ] 
                                                                                   
    } 
  } 
 
} 
 
output { 
        elasticsearch { 
                hosts => ["10.10.10.10:9200"] 
                sniffing => true 
                manage_template => false 
                template_overwrite => true 
                index => "ailicai-%{+YYYY.MM.dd}" 
        } 
 #控制台输出 正式使用的时候这个没用
        stdout{ 
                codec => rubydebug 
        } 
}

相关推荐

Global site tag (gtag.js) - Google Analytics