1、因为本项目采用的log4j2,所以在log4j2中直接配置
<Kafka name="Kafka" topic="XX_log">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss}||%p||%c{1}||XX_web||%m%n"/>
<Property name="bootstrap.servers">127.0.0.1:9092</Property>
<Property name="timeout.ms">500</Property>
</Kafka>
PatternLayout 中格式采用了||将内容连接起来目的为了logstash进行切分,其中增加timeout.ms属性为了保证日志系统挂掉的情况不会对业务系统产生较大影响,当然kafka可以采用集群的方式,bootstrap.servers多个地址用“,”分隔。XX_web代表当前业务平台。
2、搭建kafka集群这里就不多介绍了官网很全,
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
3、创建logstash动态模板
{
"template": "*",
"settings": {
"index.refresh_interval": "5s",
"number_of_replicas": "0",
"number_of_shards": "3"
},
"mappings": {
"_default_": {
"_all": {
"enabled": false
},
"dynamic_templates": [
{
"message_field": {
"match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "string",
"index": "analyzed"
}
}
},
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "string",
"index": "not_analyzed"
}
}
}
],
"properties": {
"dateTime": {
"type": "date",
"format": "yyy-MM-dd HH:mm:ss"
},
"@version": {
"type": "integer",
"index": "not_analyzed"
},
"context": {
"type": "string",
"index": "analyzed"
},
"level": {
"type": "string",
"index": "not_analyzed"
},
"class": {
"type": "string",
"index": "not_analyzed"
},
"server": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
4、配置logstash
input{
kafka {
zk_connect =>"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"
group_id =>"logstash"
topic_id =>"XX_log"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
filter {
mutate{
split=>["message","||"]
add_field => {
"dateTime" => "%{[message][0]}"
}
add_field => {
"level" => "%{[message][1]}"
}
add_field => {
"class" => "%{[message][2]}"
}
add_field => {
"server" => "%{[message][3]}"
}
add_field => {
"context" => "%{[message][4]}"
}
remove_field => ["message"]
}
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss"]
}
}
output{
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "XX_log-%{+YYYY-MM}"
codec => "json"
manage_template => true
template_overwrite => true
flush_size => 50000
idle_flush_time => 10
workers => 2
template => "E:\logstash\template\template_log.json"
}
}
按照年月将日志保存进ES索引中index => "XX_log-%{+YYYY-MM}",logstash从kafka集群中读取日志信息。
5、搭建ZK集群,这里就不多介绍了,网上资料比较多----http://blog.csdn.net/shirdrn/article/details/7183503
6、搭建ES集群,ES集群比较简单,设置的参数不要太多就可以使用。http://blog.csdn.net/xgjianstart/article/details/52192675
7、配置kibana
server.port: 5601 # 服务端口
# The host to bind the server to.
server.host: "115.28.240.113"
elasticsearch.url: http://127.0.0.1:9200 ES地址-集群
kibana.index: "kibana"
8、版本 JKD 1.7 ES-2.4, logstash 2.4, kafka-2.10,kibana-4.6.4
分享到:
相关推荐
二、SpringBoot与Kafka集成 1. Apache Kafka:是一个高吞吐量的分布式消息系统,常用于构建实时数据管道和流应用程序。SpringBoot应用可以通过Spring for Apache Kafka库来轻松实现Kafka的生产者和消费者。 2. ...
ELK栈(Elasticsearch、Logstash、Kibana)正是解决这一需求的开源解决方案,尤其适用于Java和其他开发语言的日志集成。本文将详细阐述如何在CentOS7系统上利用ELK、logback、kafka和nginx搭建分布式日志分析平台。 ...
本文将详细介绍如何将 SpringBoot 集成 ELK 和 Kafka,以实现日志跟踪和分析。 ELK 环境配置 在开始之前,需要确保已经安装了 ELK 环境,包括 Elasticsearch、Logstash 和 Kibana。ELK 环境可以通过 Docker 容器或...
这个ELK5.5+rsyslog+kafka的详细说明应该包含了如何集成这些组件,以及在实际环境中如何进行配置和优化,帮助你构建一个强大的日志管理和分析系统。通过这个系统,你可以实时监控系统状态,快速定位问题,提升运维...
此外,Kafka还可以与其他系统(如Storm、Spark)集成,实现更复杂的数据处理和分析。 Kibana是Elasticsearch的数据可视化工具,通过简洁的界面,用户可以创建和分享数据仪表板,直观地展示日志数据的趋势、异常和...
ELK与Kafka的集成通常涉及Logstash的配置,例如使用kafka_input插件接收Kafka主题的数据,kafka_output插件将处理结果发送回Kafka或直接写入Elasticsearch。需要在Logstash的配置文件中明确指定Kafka的服务器地址、...
6. **ELK Stack**:ELK是Elasticsearch、Logstash和Kibana的组合,提供了一个强大的日志分析和可视化解决方案。Elasticsearch用于存储和搜索日志,Logstash收集、处理和转发日志,Kibana则用于日志的可视化展示和...
对于日志来说,最常见的需求就是收集、存储、查询、展示,开源社区正好有相对应的开源项目:logstash(收集)、elasticsearch(存储+搜索)、kibana(展示),我们将这三个组合起来的技术称之为ELK,所以说ELK指的是...
夕颜博客(集成K8S一键部署和Docker...分配任务调度XXL-JOB,使用ELK + Kafka + Filebeat日志收集,文件上传使用七牛云,数据加密AES,SEO优化快速爬虫抓取,个人免签支付系统采用的技术是SpringBoot + Layui + DB2 + J
3. 分析友好:日志数据在Kafka中可被下游系统(如ELK Stack或Hadoop)进行进一步分析和存储,方便进行故障排查和业务监控。 总之,Log4j2+kafka的组合提供了一种强大且灵活的日志处理解决方案,它将日志管理提升到...
微服务支架基于Spring Cloud(Greenwich.SR2)构建的微服务脚手架,已集成注册中心(Nacos Config),配置中心(Nacos Discovery),认证授权(Oauth 2 + JWT),日志处理(ELK + Kafka),限流熔断(AliBaba ...
在实际操作中,可能还需要结合其他工具,如Logstash用于日志解析和Elasticsearch进行日志存储与搜索,形成ELK(Elasticsearch, Logstash, Kibana)或EFK(Elasticsearch, Fluentd, Kibana)栈,以提供更强大的日志...
6. **日志可视化和分析**:日志数据在Kafka中流转后,可以使用诸如Elasticsearch、Logstash和Kibana(ELK栈)或者Grafana等工具进行可视化和分析,以便于实时监控和故障排查。 通过这样的架构,我们可以构建一个...
Logstash以其强大的过滤插件著称,可以对数据进行解析、重格式化和添加元数据,最终将处理后的数据发送到Elasticsearch、Kafka、MongoDB等目标。 【Kibana】 Kibana 是一个开源的可视化工具,与Elasticsearch紧密...
3. **监控与日志**:集成监控工具(如Grafana、Prometheus)和日志系统(如Logstash、ELK Stack),实时监控系统性能和错误,及时发现问题并进行优化。 总的来说,"storm-kafka实时趋势分析"是一种强大的实时数据...
10. **扩展与集成**:学习如何与其他系统集成,如Beats(如Filebeat、Metricbeat等)、Apache Kafka、RabbitMQ等。 这个中文指南将帮助用户全面地了解ELKstack,无论你是初学者还是经验丰富的开发者,都能从中受益...
Kafka Connect是Kafka提供的一种用于集成外部系统的工具,它可以方便地导入和导出数据到各种数据源,如数据库、HDFS或ELK(Elasticsearch、Logstash、Kibana)堆栈。Kafka Streams是Kafka内置的轻量级流处理库,用于...
技术组件包括:elasticsearch + logstash + kafka + filebeat + head + zookeeper + kibana 部署方式:docker + docker-compose 文件内容包括: 1.ELK部署资源方案(用于申请资源) 2.ELK部署指南 3.elasticsearch...
标题"扩展logback将日志输出到Kafka实例源码"涉及的技术点主要集中在如何将Logback与Kafka集成,使得日志可以被有效地发送到Kafka集群。这个过程通常涉及到以下几个步骤: 1. **添加依赖**:首先,你需要在项目的...
"ELK Filebeat Kafka ZooKeeper构建数据日志分析平台.pptx"可能涉及如何整合Filebeat和其他组件构建完整的日志处理系统。 5. **Kafka** 和 **ZooKeeper**:这两个组件在大型日志处理系统中经常一起出现。Apache ...