`
qindongliang1922
  • 浏览: 2184538 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117546
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125937
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59934
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71308
社区版块
存档分类
最新评论

Logstash与Kafka集成

阅读更多

在ELKK的架构中,各个框架的角色分工如下:
ElasticSearch1.7.2:数据存储+全文检索+聚合计算+服务端
Logstasch2.2.2:日志收集与分发推送
Kafka0.9.0.0:分布式高可靠消息队列+数据中转存储(失效期默认7天,可配置时间或大小来控制删除策略)
Kibana4.1.2:全文检索+查询+图形化页面展示+客户端

拓扑架构如下:






本篇主要讲logstash与kafka的集成:
(1)logstash作为kafka的生产者,就是logstash收集的日志发送到kafka中
(2)logstash作为kafka的消费者,消费kafka里面的数据打印到终端


(一)安装kafka集群,请参考散仙上篇文章:
http://qindongliang.iteye.com/blog/2278194
(二)安装logstash
这个非常简单,直接下载最新版的logstash,经测试logstash1.5.4有问题,不能正常安装插件
wget https://download.elastic.co/logstash/logstash/logstash-2.2.2.tar.gz

为了能够快速下载logstash的相关插件,然后修改logstash的代理
(方案一)
安装ruby的gem
yum -y install ruby rubygems

安装国内淘宝的代理源:
gem sources --remove http://rubygems.org/
gem sources -a https://ruby.taobao.org/
gem sources -l

*** CURRENT SOURCES ***
https://ruby.taobao.org/


(方案二)
修改logstash目录下的Gemfile里面的source的url为
https://ruby.taobao.org/
然后就不用用方案一的方法了
最新版的logstash2.2支持修改Gemfile里面的地址为淘宝的镜像地址
使用的是最新版本2.2.2的logstash

//安装logstash输出到kafka的插件:
bin/plugin install logstash-output-kafka
//安装logstash从kafka读取的插件:
bin/plugin install logstash-input-kafka



logstash-consume-kafka.conf消费者配置

input{
    kafka{
//zk的链接地址
     zk_connect=>"h1:2181,h2:2181,h3:2181/kafka"
//topic_id,必须提前在kafka中建好
     topic_id=>'logstash'
//解码方式json,
     codec => json   
//消费者id,多个消费者消费同一个topic时,做身份标识
     consumer_id => "187"  
//消费者组
     group_id=> "logstash"
//重新负载时间
     rebalance_backoff_ms=>5000
//最大重试次数
    rebalance_max_retries=>10
    } 
}

output{
   stdout{
     codec=>line 
        } 
}



procuder_kafka_es.conf生产者配置:

input{
 //监听log文件
  file{
    path=> ["/ROOT/server/logstash-2.2.2/t.log"]   
   }
}

output{
 //输出端1=>Kafka
   kafka{
    bootstrap_servers=> 'h1:9092,h2:9092,h3:9092'
    topic_id=> 'logstash'
   }

//输出端2=>ElasticSearch
  elasticsearch{
  hosts=> ["192.168.1.187:9200","192.168.1.184:9200","192.168.1.186:9200"]
   }

}



如果想用Logstash读取kafka某个topic的所有数据,需要加上下面2个配置:

     auto_offset_reset => 'smallest'
     reset_beginning => true


但需要注意的是,如果是读取所有的数据,那么此时,对于kafka的消费者同时只能有一个,如果有多个
那么会报错,因为读取所有的数据,保证顺序还不能重复读取消息,只能使用一个消费者,如果不是
读取所有,仅仅读取最新传过来的消息,那么可以启动多个消费者,但建议消费者的数目,与该topic的
partition的个数一致,这样效果最佳且能保证partition内的数据顺序一致,如果不需要保证partition分区内数据
有序,可以接受乱序,那就无所谓了




参考资料

http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/

http://soft.dog/2016/01/08/logstash-plugins/

http://www.rittmanmead.com/2015/10/forays-into-kafka-01-logstash-transport-centralisation/


有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园







  • 大小: 48.1 KB
1
2
分享到:
评论
2 楼 qindongliang1922 2016-02-25  
ronin47 写道
很棒。目前es版本升级到2.2
logstash一些语法有变动

最新版本的一些插件用的有问题,head和bigdesk,当然不用这些也就没关系了
1 楼 ronin47 2016-02-25  
很棒。目前es版本升级到2.2
logstash一些语法有变动

相关推荐

    logstash-kafka:Logstash 的 Kafka 插件

    有关 logstash 的更多信息,请参阅 logstash-kafka 已集成到和。 它将与 1.5 版本的 logstash 一起发布。 谢谢您的支持。 我的目标是在这里关闭门票,但在大多数情况下,问题和问题应该通过以下方式解决: logstash...

    scala通过logstash发送日志到kafka的Demo

    使用`slf4j` API与logback集成,例如: ```scala import org.slf4j.Logger import org.slf4j.LoggerFactory object LogProducer { val logger: Logger = LoggerFactory.getLogger(classOf[LogProducer]) def ...

    logstash-input-kafka-9.1.0.gem

    logstash接收kafka插件,已经压缩成zip格式,可以直接集成到logstash

    elasticsearch或kafka的数据抽取工具:logstash-5.6.1

    在标题和描述中提到的 "elasticsearch或kafka的数据抽取工具:logstash-5.6.1",我们主要关注的是 Logstash 在数据处理流程中的角色以及其与 Elasticsearch 和 Kafka 的集成。 1. **Logstash 的基本概念**: - **...

    logstash-7.6.1.tar.gz

    在输出端,Logstash 7.6.1版本的一个关键特性是与Kafka的集成。Kafka作为一个高吞吐量的消息中间件,能够提供消息队列服务,确保数据的可靠传输。通过配置Kafka输出插件,Logstash可以将处理后的数据发送到Kafka主题...

    ELK+KAFKA详细安装文档和资源包.7z

    五、集成与配置 ELK与Kafka的集成通常涉及Logstash的配置,例如使用kafka_input插件接收Kafka主题的数据,kafka_output插件将处理结果发送回Kafka或直接写入Elasticsearch。需要在Logstash的配置文件中明确指定Kafka...

    Spring Cloud集成ELK完成日志收集实战(elasticsearch、logstash、kibana)

    对于日志来说,最常见的需求就是收集、存储、查询、展示,开源社区正好有相对应的开源项目:logstash(收集)、elasticsearch(存储+搜索)、kibana(展示),我们将这三个组合起来的技术称之为ELK,所以说ELK指的是...

    debj_SpringBoot_ELK+Kafka_resources.zip

    二、SpringBoot与Kafka集成 1. Apache Kafka:是一个高吞吐量的分布式消息系统,常用于构建实时数据管道和流应用程序。SpringBoot应用可以通过Spring for Apache Kafka库来轻松实现Kafka的生产者和消费者。 2. ...

    skywalking+es+kafka部署文档.docx

    本文将详细介绍如何在环境中集成 SkyWalking 8.4.0、Kafka 和 Elasticsearch 7.12.0 的配置和搭建过程。 首先,确保你已下载了 Apache SkyWalking 的特定于 Elasticsearch 7 的版本,例如 `apache-skywalking-apm-...

    47_Flume、Logstash、Filebeat调研报告

    这些框架可以与Flume、Logstash和Filebeat集成,接收它们收集的数据并进行实时计算,以提供即时洞察和快速响应。 综上所述,Flume、Logstash和Filebeat是大数据领域中重要的日志采集工具,它们各有特点,可以满足...

    Spring Boot 使用 logback、logstash、ELK 记录日志文件的方法

    Spring Boot 使用 logback、logstash、ELK 记录日志文件的方法 Spring Boot 是一个基于 Java 的框架,用于构建可靠的、可维护的、灵活的 web 应用程序。在日志记录方面,Spring Boot 默认使用 logback 记录日志,...

    logstash-5.6.8.z logstash-5.6.8.z

    - **安全性和兼容性**:可能支持与当时流行的 Elasticsearch 和 Kibana 版本的集成,但需要注意的是,随着版本的迭代,新版本可能会提供更好的安全性和性能改进。 3. **解压和使用 "logstash-5.6.8.zip":** - **...

    logstash汇总整理.rar

    8. **集成与扩展**:Logstash 可以轻松与其他工具集成,如 Beats(轻量级数据发送者)和 Kibana(数据可视化工具)。此外,通过编写自定义插件,用户可以扩展 Logstash 的功能以满足特定需求。 9. **监控与管理**:...

    logstash-7.0.0.zip

    7. **与Elasticsearch和Kibana的集成**:作为Elastic Stack的一部分,Logstash与Elasticsearch无缝集成,确保数据能够高效地被索引和查询。Kibana作为可视化工具,可以基于Logstash处理后的数据创建直观的仪表板,...

    logstash.rar

    输出插件(Outputs Plugins)提供了这种灵活性,使得 Logstash 能够轻松集成到现有的数据基础设施中。 在 "logs" 文件夹中,可能包含了由 Logstash 处理的日志文件。这些日志可能来自不同来源,经过 Logstash 收集...

    logstash-5.4.2

    **Kibana集成与可视化:** Logstash与Kibana的集成使得日志分析结果可以通过可视化界面展示。在Kibana中创建仪表板,展示日志数据的统计信息、时间序列图表和搜索查询结果,帮助监控系统状态、排查问题。 总结来说...

    logstash-7.6.2.zip

    Logstash是Elastic Stack(又称ELK Stack,包括Elasticsearch、Logstash和Kibana)的关键组件,与Elasticsearch和Kibana紧密集成。Elasticsearch负责存储和检索数据,Kibana则提供了可视化界面,让用户可以直观地...

    logstash-5.3.1 下载

    - 监控:集成日志和指标监控,确保 Logstash 的健康运行。 8. **应用场景** - 网络日志分析:收集并分析服务器、应用程序和网络设备的日志。 - 事件检测:通过过滤器检测异常行为,例如入侵检测或安全事件。 - ...

    Kafka技术内幕(带书签).pdf

    Kafka Connect是Kafka提供的一种用于集成外部系统的工具,它可以方便地导入和导出数据到各种数据源,如数据库、HDFS或ELK(Elasticsearch、Logstash、Kibana)堆栈。Kafka Streams是Kafka内置的轻量级流处理库,用于...

    logstash-6.3.2.zip

    通过编写配置文件,你可以处理几乎任何格式的日志数据,并且能够方便地与其他系统集成。此外,Logstash 支持横向扩展,通过增加更多实例来处理更大的数据量。 总结来说,Logstash 6.3.2 是 ELK 技术栈中的数据处理...

Global site tag (gtag.js) - Google Analytics