`

logstash的kafka插件使用

 
阅读更多

转自:http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/

通过kafka传输

Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。

如果打算新建 Kafka 系统的,请参考 Kafka 官方入门文档:http://kafka.apache.org/documentation.html#quickstart

kafka 基本概念

以下仅对相关基本概念说明,更多概念见官方文档:

  • Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费
  • Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费。
  • Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。

simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。

high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:

    partition1 ---> A进程consumerid1
    partition2 ---> A进程consumerid1
    partition3 ---> A进程consumerid2
    partition4 ---> B进程consumer1
    partition5 ---> B进程consumer2
  • Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。

由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。

小提示

以上概念是 logstash 的 kafka 插件的必要参数,请理解阅读,对后续使用 kafka 插件有重要作用。logstash-kafka-input 插件使用的是 High-level-consumer API

插件安装

logstash-1.4 安装

如果你使用的还是 1.4 版本,需要自己单独安装 logstash-kafka 插件。插件地址见:https://github.com/joekiller/logstash-kafka

插件本身内容非常简单,其主要依赖同一作者写的 jruby-kafka 模块。需要注意的是:该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。

安装按照官方文档完全自动化的安装。或是可以通过以下方式手动自己安装插件,不过重点注意的是 kafka 的版本,上面已经指出了。

  1. 下载 logstash 并解压重命名为 ./logstash-1.4.0 文件目录。
  2. 下载 kafka 相关组件,以下示例选的为 kafka_2.8.0-0.8.1.1-src,并解压重命名为 ./kafka_2.8.0-0.8.1.1
  3.  releases 页下载 logstash-kafka v0.4.2 版,并解压重命名为 ./logstash-kafka-0.4.2
  4.  ./kafka_2.8.0-0.8.1.1/libs 目录下复制所有的 jar 文件拷贝到 ./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs 下,其中你需要创建 kafka_2.8.0-0.8.1.1/libs 相关文件夹及目录。
  5. 分别复制 ./logstash-kafka-0.4.2/logstash 里的 inputs  outputs 下的 kafka.rb,拷贝到对应的 ./logstash-1.4.0/lib/logstash 里的 inputs  outputs 对应目录下。
  6. 切换到 ./logstash-1.4.0 目录下,现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库,执行以下命令: GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec
  7. 现在可以使用 logstash-kafka 插件运行 logstash 了。

logstash-1.5 安装

logstash 从 1.5 版本开始才集成了 Kafka 支持。1.5 版本开始所有插件的目录和命名都发生了改变,插件发布地址见:https://github.com/logstash-plugins。 安装和更新插件都可以使用官方提供的方式:

$bin/plugin install OR $bin/plugin update

小贴士

对于插件的安装和更新,默认走的Gem源为 https://rubygems.org,对于咱们国内网络来说是出奇的慢或是根本无法访问(爬梯子除外),在安装或是更新插件是,可以尝试修改目录下 Gemfile 文件中的 source 为淘宝源https://ruby.taobao.org,这样会使你的安装或是更新顺畅很多。

插件配置

Input 配置示例

以下配置可以实现对 kafka 读取端(consumer)的基本使用。

消费端更多详细的配置请查看 http://kafka.apache.org/documentation.html#consumerconfigs kafka 官方文档的消费者部分配置文档。

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
    }
}

Input 解释

作为 Consumer 端,插件使用的是 High-level-consumer API请结合上述 kafka 基本概念进行设置

  • group_id

消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。

  • topic_id

指定消费话题,也是必填项目,指定消费某个 topic ,这个其实就是订阅某个主题,然后去消费。

  • reset_beginning

logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 "true", logstash 进程就从头开始读取.有点类似 cat ,但是读到最后一行不会终止,而是变成 tail -F ,继续监听相应数据。

  • decorate_events

在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。

  • rebalance_max_retries

当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance ,此后将会有 partitions 的消费端迁移到新的 consumer 上,如果一个 consumer 获得了某个 partition 的消费权限,那么它将会向 zookeeper 注册, Partition Owner registry 节点信息,但是有可能此时旧的 consumer 尚没有释放此节点,此值用于控制,注册节点的重试次数。

  • consumer_timeout_ms

指定时间内没有消息到达就抛出异常,一般不需要改。

以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 input 默认参数。

注意

1.想要使用多个 logstash 端协同消费同一个 topic 的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_id  topic_id, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。

  • 这里解释下,为什么要分多个 partitions(区), kafka 的消息模型是对 topic 分区以达到分布式效果。每个 topic 下的不同的 partitions (区)只能有一个 Owner 去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是消息的消费则是无序的

总结:保证消息的顺序,那就用一个 partition kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费

Output 配置

以下配置可以实现对 kafka 写入端 (producer) 的基本使用。

生产端更多详细的配置请查看 http://kafka.apache.org/documentation.html#producerconfigs kafka 官方文档的生产者部分配置文档。

 output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    }
}

Output 解释

作为 Producer 端使用,以下仅为重要概念解释,请结合上述 kafka 基本概念进行设置

  • compression_codec

消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。

  • compressed_topics

可以针对特定的 topic 进行压缩,设置这个参数为 topic ,表示此 topic 进行压缩。

  • request_required_acks

消息的确认模式:

可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失) 可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失) 可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。

  • partitioner_class

分区的策略,默认是 hash 取模

  • send_buffer_bytes

socket 的缓存大小设置,其实就是缓冲区的大小

消息模式相关

  • serializer_class

消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class 使用相同的类型

  • key_serializer_class

默认的是与 serializer_class 相同

  • producer_type

生产者的类型 async 异步执行消息的发送 sync 同步执行消息的发送

  • queue_buffering_max_ms

异步模式下,那么就会在设置的时间缓存消息,并一次性发送

  • queue_buffering_max_messages

异步的模式下,最长等待的消息数

  • queue_enqueue_timeout_ms

异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃

  • batch_num_messages

异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages 或是 queue_enqueue_timeout_ms 的限制

以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 output 默认参数。

小贴士

logstash-kafka 插件输入和输出默认 codec 为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:

 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

作为 Consumer 从kafka中读数据,如果为非 json 格式的话需要进行相关解码,例如:

input {
    kafka {
        zk_connect => "xxx:xxx"
        group_id => "test"
        topic_id => "test-topic"
        codec => "line"
        ...........
    }
}

关于性能:

其实 logstash 的 kafka 插件性能并不是很突出,可以通过使用以下命令查看队列积压消费情况:

$/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test

队列积压严重,性能跟不上的情况下,结合实际服务器资源,可以适当增加 topic 的 partition 多实例化 Consumer 进行消费处理消息。

更多高性能方案可以选择 https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer

- See more at: http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/#sthash.dUA3BNd0.dpuf

分享到:
评论

相关推荐

    logstash-kafka:Logstash 的 Kafka 插件

    logstash-kafka地位 该项目仅为 logstash 1.4.X 实现了 Kafka 0.8.2.1 输入和输出。 对于 1.5 支持,请阅读以下内容。 有关 logstash 的更多信息,请参阅 logstash-kafka 已集成到和。 它将与 1.5 版本的 logstash ...

    logstash-input-kafka:Logstash 的 Kafka 输入

    Logstash插件 这是的插件。 它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。 Kafka 输入插件已移动 这个 Kafka 输入插件现在是的一部分。 在可能的情况下,该项目仍对该...

    logstash-output-kafka:Logstash 的 Kafka 输出

    Logstash插件 这是的插件。 它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。 Kafka 输出插件已移动 这个 Kafka 输出插件现在是的一部分。 在可能的情况下,该项目仍对该...

    logstash-input-kafka-9.1.0.gem

    logstash接收kafka插件,已经压缩成zip格式,可以直接集成到logstash

    scala通过logstash发送日志到kafka的Demo

    在这个场景中,我们将探讨如何使用Scala编程语言通过Logstash将应用日志发送到Kafka消息队列。首先,让我们理解涉及的技术栈: 1. **Scala**: Scala是一种多范式编程语言,结合了面向对象和函数式编程的特点。在...

    logstash-integration-kafka:Kafka Integration for Logstash,提供输入和输出插件

    Logstash插件 这是的插件。 它是完全免费和完全开源的。 该许可证是Apache 2.0,这意味着您可以随意使用它,但是您可以通过任何方式使用它。 记录中 Kafka日志不遵守Log4J2根记录器级别,默认为INFO,对于其他级别...

    Elasticsearch5.3.2Logstash5.3.2Kibana5.3.2Kafka-0.10.0.1打造开源实时日志分析系统v1.4

    本篇文章详细介绍了如何使用Elasticsearch5.3.2、Logstash5.3.2、Kibana5.3.2以及Kafka-*.**.*.*打造一个稳定高效的数据收集、存储和分析平台。 首先,我们需要了解ELK系统架构中的各个组件。Elasticsearch是一个...

    elasticsearch或kafka的数据抽取工具:logstash-5.6.1

    - **Kafka 输入**:Logstash 通过 Kafka 输入插件可以从 Kafka 主题消费消息,使其成为数据流处理链路的一部分。 - **Kafka 输出**:同样,Logstash 也可以将处理过的数据发布到 Kafka 的特定主题,提供数据的可靠...

    最新版windows logstash-7.9.2.zip

    输出插件则可以将处理后的数据发送到各种地方,如 Elasticsearch、Kafka、文件系统、甚至其他 Logstash 实例,形成数据处理链路。 在版本 7.9.2 中,可能有以下值得关注的更新: - 性能提升:Logstash 团队可能对...

    logstash-output-jdbc.zip

    Logstash的输出插件种类繁多,包括文件、Elasticsearch、Kafka等,它们使得Logstash能够适应各种不同的数据流向。 总的来说,"logstash-output-jdbc.zip" 插件是Logstash系统中不可或缺的一部分,它提供了将结构化...

    logstash6.2.3 zip包

    Logstash 可以将数据发送到 Elasticsearch、Kafka、Redis、HTTP 服务器等。Elasticsearch 是常见的输出目标,因为它与 Logstash 和 Kibana 配合良好,提供强大的搜索和分析功能。 7. **插件管理**:Logstash 的插件...

    flume和logstash.zip

    3. **Logstash输出插件**:输出插件决定了数据的去向,可以将处理后的数据发送到Elasticsearch、Kafka、文件系统、MySQL等。 在描述中提到的"sqoop-1.4.6.bin__hadoop-0.23.tar"是Apache Sqoop的安装包,它是一个...

    2019年最新版elasticSearch+kibana+logstash+各种常用插件安装教程

    2019年elaticsearch6.6.0的安装教程,kibana6.6.0安装教程,logstash6.6.0安装教程,ik分词器,head插件,bigdesk等插件安装教程,x-pack使用等。

    logstash汇总整理.rar

    4. **数据输出**:输出阶段,数据可以被发送到 Elasticsearch 进行索引和搜索,或者通过 stdout 输出到控制台,或者使用其他输出插件,如 Kafka 或 Redis,进一步传递到其他系统。 5. **性能优化**:Logstash 支持...

    logstash-7.6.1.tar.gz

    通过配置Kafka输出插件,Logstash可以将处理后的数据发送到Kafka主题,进一步实现数据的分布式处理和存储。 当然,Logstash的主要目标是将数据送入Elasticsearch,一个强大的分布式搜索和分析引擎。Elasticsearch...

    logstash-5.3.1 下载

    5. **插件使用** - 插件的使用需要在配置文件中指定,例如: ``` input { file { path => ["/var/log/app.log"] } } filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:...

    最新版windows logstash-7.8.0.zip

    4. `plugins` 目录:存放Logstash的插件,这些插件扩展了Logstash的功能,如输入插件(如file input,用于读取文件中的日志)、过滤插件(如grok filter,用于解析日志格式)和输出插件(如elasticsearch output,...

    logstash-7.17.7-windows-x86-64.zip

    例如,你可以使用 `file` 输入插件来监听日志文件的变化,一旦有新的日志行被写入,Logstash 就会读取并处理这些数据。 2. **过滤插件**(Filters):在数据被收集后,过滤插件允许用户对数据进行清洗、转换或解析...

    jmeter-backend-listener-kafka:一个JMeter插件,使您可以将测试结果发送到Kafka服务器

    一个JMeter插件,使您可以将测试结果发送到Kafka服务器。 总览 描述 JMeter后端监听器Kafka是一个JMeter插件,使您可以将测试结果发送到Kafka服务器。 它受JMeter 后端侦听器插件的启发。 产品特点 筛选器 仅使用...

    logstash

    此外,Logstash 也可以将数据推送到 Kafka、RabbitMQ、MySQL、Syslog 等其他系统,实现数据的进一步处理和存储。 **4. 源码分析** Logstash 是用 Ruby 编写的,其源码对于开发者来说是一份宝贵的资源。通过阅读源码...

Global site tag (gtag.js) - Google Analytics