`

elk集成kafka配置

阅读更多

Input 配置示例

以下配置可以实现对 kafka 读取端(consumer)的基本使用。

消费端更多详细的配置请查看 http://kafka.apache.org/documentation.html#consumerconfigs kafka 官方文档的消费者部分配置文档。

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        codec => plain
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
    }
}

Input 解释

作为 Consumer 端,插件使用的是 High-level-consumer API请结合上述 kafka 基本概念进行设置

  • group_id

消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。

  • topic_id

指定消费话题,也是必填项目,指定消费某个 topic ,这个其实就是订阅某个主题,然后去消费。

  • reset_beginning

logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 "true", logstash 进程就从头开始读取.有点类似 cat ,但是读到最后一行不会终止,而是变成tail -F ,继续监听相应数据。

  • decorate_events

在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。

  • rebalance_max_retries

当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance ,此后将会有 partitions 的消费端迁移到新的 consumer 上,如果一个 consumer 获得了某个 partition 的消费权限,那么它将会向zookeeper 注册, Partition Owner registry 节点信息,但是有可能此时旧的 consumer 尚没有释放此节点,此值用于控制,注册节点的重试次数。

  • consumer_timeout_ms

指定时间内没有消息到达就抛出异常,一般不需要改。

以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 input 默认参数。

注意

1.想要使用多个 logstash 端协同消费同一个 topic 的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_id 和 topic_id, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。

这里解释下,为什么要分多个 partitions(区), kafka 的消息模型是对 topic 分区以达到分布式效果。每个 topic 下的不同的 partitions (区)只能有一个 Owner 去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是消息的消费则是无序的

总结:保证消息的顺序,那就用一个 partition。 kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费

Output 配置

以下配置可以实现对 kafka 写入端 (producer) 的基本使用。

生产端更多详细的配置请查看 http://kafka.apache.org/documentation.html#producerconfigs kafka 官方文档的生产者部分配置文档。

 output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    }
}

Output 解释

作为 Producer 端使用,以下仅为重要概念解释,请结合上述 kafka 基本概念进行设置

  • compression_codec

消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。

  • compressed_topics

可以针对特定的 topic 进行压缩,设置这个参数为 topic ,表示此 topic 进行压缩。

  • request_required_acks

消息的确认模式:

可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失) 可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失) 可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。

  • partitioner_class

分区的策略,默认是 hash 取模

  • send_buffer_bytes

socket 的缓存大小设置,其实就是缓冲区的大小

消息模式相关

  • serializer_class

消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class使用相同的类型

  • key_serializer_class

默认的是与 serializer_class 相同

  • producer_type

生产者的类型 async 异步执行消息的发送 sync 同步执行消息的发送

  • queue_buffering_max_ms

异步模式下,那么就会在设置的时间缓存消息,并一次性发送

  • queue_buffering_max_messages

异步的模式下,最长等待的消息数

  • queue_enqueue_timeout_ms

异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃

  • batch_num_messages

异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages 或是queue_enqueue_timeout_ms 的限制

以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 output 默认参数。

小贴士

logstash-kafka 插件输入和输出默认 codec 为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:

 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

作为 Consumer 从kafka中读数据,如果为非 json 格式的话需要进行相关解码,例如:

input {
    kafka {
        zk_connect => "xxx:xxx"
        group_id => "test"
        topic_id => "test-topic"
        codec => "line"
        ...........
    }
}

性能

队列监控

其实 logstash 的 kafka 插件性能并不是很突出,可以通过使用以下命令查看队列积压消费情况:

$/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test

队列积压严重,性能跟不上的情况下,结合实际服务器资源,可以适当增加 topic 的 partition 多实例化 Consumer 进行消费处理消息。

input-kafka 的 JSON 序列化性能

此外,跟 logstash-input-syslog 改在 filter 阶段 grok 的优化手段类似,也可以将 logstash-input-kafka 的默认 JSON 序列化操作从 codec 阶段后移到 filter 阶段。如下:

input {
    kafka {
        codec => plain
    }
}
filter {
    json {
        source => "message"
    }
} 

然后通过 bin/logstash -w $num_cpus 运行,利用多核计算优势,可以获得大约一倍左右的性能提升。 

分享到:
评论

相关推荐

    debj_SpringBoot_ELK+Kafka_resources.zip

    2. Spring Kafka配置:在SpringBoot应用中,我们可以通过`application.properties`或`application.yml`配置文件设置Kafka的相关属性,如服务器地址、主题等。同时,可以创建`@KafkaListener`注解的监听器方法来接收...

    ELK+KAFKA详细安装文档和资源包.7z

    ELK与Kafka的集成通常涉及Logstash的配置,例如使用kafka_input插件接收Kafka主题的数据,kafka_output插件将处理结果发送回Kafka或直接写入Elasticsearch。需要在Logstash的配置文件中明确指定Kafka的服务器地址、...

    elk+springboot+kafka日志跟踪配置1

    ELK+SringBoot+Kafka 日志跟踪配置详解 ELK(Elasticsearch、Logstash、Kibana)是一款流行的日志分析平台,SpringBoot 是一个流行的 Java 框架,Kafka 是一个流行的消息队列系统。本文将详细介绍如何将 SpringBoot...

    ELK5.5+rsyslog+kafka步骤详细说明.rar

    这个ELK5.5+rsyslog+kafka的详细说明应该包含了如何集成这些组件,以及在实际环境中如何进行配置和优化,帮助你构建一个强大的日志管理和分析系统。通过这个系统,你可以实时监控系统状态,快速定位问题,提升运维...

    Spring Cloud集成ELK完成日志收集实战(elasticsearch、logstash、kibana)

    对于日志来说,最常见的需求就是收集、存储、查询、展示,开源社区正好有相对应的开源项目:logstash(收集)、elasticsearch(存储+搜索)、kibana(展示),我们将这三个组合起来的技术称之为ELK,所以说ELK指的是...

    ELK+kafaka日志集群项目.zip

    此外,Kafka还可以与其他系统(如Storm、Spark)集成,实现更复杂的数据处理和分析。 Kibana是Elasticsearch的数据可视化工具,通过简洁的界面,用户可以创建和分享数据仪表板,直观地展示日志数据的趋势、异常和...

    Kafka+Log4j实现日志集中管理

    在配置Log4j时,我们需要设置KafkaAppender的相关属性,包括Kafka的broker列表、主题名以及消息编码方式等。例如: ```xml <appender name="KAFKA" class="org.apache.log4j.kafka.KafkaAppender"> ``` ...

    扩展logback将日志输出到Kafka实例源码

    通过将日志输出到Kafka,可以方便地将这些日志与其他系统集成,如ELK(Elasticsearch、Logstash、Kibana)堆栈,实现日志的集中收集、分析和检索。 总之,扩展Logback将日志输出到Kafka是一种常见的日志管理实践,...

    microservice-scaffold:基于Spring Cloud(Greenwich.SR2)构建的微服务脚手架(适用于在线系统),已集成注册中心(Nacos Config),配置中心(Nacos Discovery),认证授权(Oauth 2 + JWT),日志处理(ELK + Kafka),限流熔断(AliBaba Sentinel),应用指标监控(Prometheus + Grafana),调用链监控(Pinpoint),以及Spring Boot Admin

    微服务支架基于Spring Cloud(Greenwich.SR2)构建的微服务脚手架,已集成注册中心(Nacos Config),配置中心(Nacos Discovery),认证授权(Oauth 2 + JWT),日志处理(ELK + Kafka),限流熔断(AliBaba ...

    kafka-doc-zh-2.2.0.zip

    1. 实时日志收集:Kafka 可以作为日志聚合系统的前端,收集应用程序产生的日志数据,然后转发给日志分析工具如 ELK Stack(Elasticsearch, Logstash, Kibana)。 2. 数据流处理:结合 Apache Spark 或 Flink,Kafka...

    ELk知识详解.zip

    6. **实战案例**:"ELK收集应用系统日志实战案例.pptx"可能包含具体的实施步骤和技巧,如如何配置Logstash来处理特定类型的日志,如何优化Elasticsearch集群的性能,以及如何使用Kibana创建实用的可视化界面。...

    基于Golang+Kratos+MySQL+Redis+Kafka+elk+Opentracing实现的微服务项目

    6. **ELK Stack**:ELK是Elasticsearch、Logstash和Kibana的组合,提供了一个强大的日志分析和可视化解决方案。Elasticsearch用于存储和搜索日志,Logstash收集、处理和转发日志,Kibana则用于日志的可视化展示和...

    ELK日志平台搭建安装包和文档

    在日志处理场景中,ELK常常与Kafka集成。Kafka是一个分布式流处理平台,它可以作为日志消息队列,收集来自不同应用的日志数据,然后由Logstash消费并转发到Elasticsearch。这种架构增强了系统的可靠性和可扩展性。 ...

    kafka_2.10-0.8.2.0

    此外,这个版本还增强了稳定性,提升了与其他系统的集成能力,使得在ELK堆栈中使用Kafka更加顺畅。 在实际部署时,需要注意配置Kafka的broker设置,如broker.id、zookeeper.connect等,以及生产者和消费者的配置,...

    ELK-7.2.1版本安装包

    尽管Kafka不是ELK堆栈的一部分,但在这个组合中,它可以作为Logstash的输入源,或者直接与Elasticsearch集成。`kafka_2.12-2.3.1.tgz`是Kafka的安装包,安装后需配置主题、生产者和消费者,以实现数据流的创建和消费...

    kafka_2.13-3.0.0.tgz.7z

    标题中的"kafka_2.13-3.0.0.tgz.7z"是一个压缩文件,它包含的是Apache Kafka的特定版本——2.13兼容的3.0.0版。...对于开发和运维人员来说,理解Kafka的架构和配置选项,以及如何与其他系统集成,是至关重要的技能。

    kafka-eagle

    在部署过程中,需要将Kafka配置文件中的相关参数告知Kafka Eagle,以便它能正确地连接和管理Kafka集群。完成部署后,通过浏览器访问Kafka Eagle的Web界面,就可以开始监控和管理你的Kafka集群了。 总之,Kafka ...

Global site tag (gtag.js) - Google Analytics