转自:http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/
通过kafka传输
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。
如果打算新建 Kafka 系统的,请参考 Kafka 官方入门文档:http://kafka.apache.org/documentation.html#quickstart
kafka 基本概念
以下仅对相关基本概念说明,更多概念见官方文档:
- Topic 主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费
- Partition 每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。 消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费。
- Consumer 消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。
simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。
high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:
partition1 ---> A进程consumerid1
partition2 ---> A进程consumerid1
partition3 ---> A进程consumerid2
partition4 ---> B进程consumer1
partition5 ---> B进程consumer2
- Group High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。
由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。
小提示
以上概念是 logstash 的 kafka 插件的必要参数,请理解阅读,对后续使用 kafka 插件有重要作用。logstash-kafka-input 插件使用的是 High-level-consumer API
。
插件安装
logstash-1.4 安装
如果你使用的还是 1.4 版本,需要自己单独安装 logstash-kafka 插件。插件地址见:https://github.com/joekiller/logstash-kafka。
插件本身内容非常简单,其主要依赖同一作者写的 jruby-kafka 模块。需要注意的是:该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。
安装按照官方文档完全自动化的安装。或是可以通过以下方式手动自己安装插件,不过重点注意的是 kafka 的版本,上面已经指出了。
- 下载 logstash 并解压重命名为
./logstash-1.4.0
文件目录。 - 下载 kafka 相关组件,以下示例选的为 kafka_2.8.0-0.8.1.1-src,并解压重命名为
./kafka_2.8.0-0.8.1.1
。 - 从 releases 页下载 logstash-kafka v0.4.2 版,并解压重命名为
./logstash-kafka-0.4.2
。 - 从
./kafka_2.8.0-0.8.1.1/libs
目录下复制所有的 jar 文件拷贝到./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs
下,其中你需要创建kafka_2.8.0-0.8.1.1/libs
相关文件夹及目录。 - 分别复制
./logstash-kafka-0.4.2/logstash
里的inputs
和outputs
下的kafka.rb
,拷贝到对应的./logstash-1.4.0/lib/logstash
里的inputs
和outputs
对应目录下。 - 切换到
./logstash-1.4.0
目录下,现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库,执行以下命令:GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec
。 - 现在可以使用 logstash-kafka 插件运行 logstash 了。
logstash-1.5 安装
logstash 从 1.5 版本开始才集成了 Kafka 支持。1.5 版本开始所有插件的目录和命名都发生了改变,插件发布地址见:https://github.com/logstash-plugins。 安装和更新插件都可以使用官方提供的方式:
$bin/plugin install OR $bin/plugin update
小贴士
对于插件的安装和更新,默认走的Gem源为 https://rubygems.org,对于咱们国内网络来说是出奇的慢或是根本无法访问(爬梯子除外),在安装或是更新插件是,可以尝试修改目录下 Gemfile
文件中的 source
为淘宝源https://ruby.taobao.org,这样会使你的安装或是更新顺畅很多。
插件配置
Input 配置示例
以下配置可以实现对 kafka 读取端(consumer)的基本使用。
消费端更多详细的配置请查看 http://kafka.apache.org/documentation.html#consumerconfigs kafka 官方文档的消费者部分配置文档。
input {
kafka {
zk_connect => "localhost:2181"
group_id => "logstash"
topic_id => "test"
reset_beginning => false # boolean (optional), default: false
consumer_threads => 5 # number (optional), default: 1
decorate_events => true # boolean (optional), default: false
}
}
Input 解释
作为 Consumer 端,插件使用的是 High-level-consumer API
,请结合上述 kafka 基本概念进行设置:
- group_id
消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。
- topic_id
指定消费话题,也是必填项目,指定消费某个 topic
,这个其实就是订阅某个主题,然后去消费。
- reset_beginning
logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 "true", logstash 进程就从头开始读取.有点类似 cat
,但是读到最后一行不会终止,而是变成 tail -F
,继续监听相应数据。
- decorate_events
在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。
- rebalance_max_retries
当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance
,此后将会有 partitions
的消费端迁移到新的 consumer
上,如果一个 consumer
获得了某个 partition
的消费权限,那么它将会向 zookeeper
注册, Partition Owner registry
节点信息,但是有可能此时旧的 consumer
尚没有释放此节点,此值用于控制,注册节点的重试次数。
- consumer_timeout_ms
指定时间内没有消息到达就抛出异常,一般不需要改。
以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 input 默认参数。
注意
1.想要使用多个 logstash 端协同消费同一个 topic
的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_id
和 topic_id
, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。
- 这里解释下,为什么要分多个 partitions(区), kafka 的消息模型是对 topic 分区以达到分布式效果。每个
topic
下的不同的 partitions (区)只能有一个 Owner 去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是消息的消费则是无序的。
总结:保证消息的顺序,那就用一个 partition。 kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费。
Output 配置
以下配置可以实现对 kafka 写入端 (producer) 的基本使用。
生产端更多详细的配置请查看 http://kafka.apache.org/documentation.html#producerconfigs kafka 官方文档的生产者部分配置文档。
output {
kafka {
broker_list => "localhost:9092"
topic_id => "test"
compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
}
}
Output 解释
作为 Producer 端使用,以下仅为重要概念解释,请结合上述 kafka 基本概念进行设置:
- compression_codec
消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。
- compressed_topics
可以针对特定的 topic 进行压缩,设置这个参数为 topic
,表示此 topic
进行压缩。
- request_required_acks
消息的确认模式:
可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失) 可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失) 可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。
- partitioner_class
分区的策略,默认是 hash 取模
- send_buffer_bytes
socket 的缓存大小设置,其实就是缓冲区的大小
消息模式相关
- serializer_class
消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class
使用相同的类型。
- key_serializer_class
默认的是与 serializer_class
相同
- producer_type
生产者的类型 async
异步执行消息的发送 sync
同步执行消息的发送
- queue_buffering_max_ms
异步模式下,那么就会在设置的时间缓存消息,并一次性发送
- queue_buffering_max_messages
异步的模式下,最长等待的消息数
- queue_enqueue_timeout_ms
异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃
- batch_num_messages
异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages
或是 queue_enqueue_timeout_ms
的限制
以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 output 默认参数。
小贴士
logstash-kafka 插件输入和输出默认 codec
为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:
output {
kafka {
codec => plain {
format => "%{message}"
}
}
}
作为 Consumer 从kafka中读数据,如果为非 json 格式的话需要进行相关解码,例如:
input {
kafka {
zk_connect => "xxx:xxx"
group_id => "test"
topic_id => "test-topic"
codec => "line"
...........
}
}
关于性能:
其实 logstash 的 kafka 插件性能并不是很突出,可以通过使用以下命令查看队列积压消费情况:
$/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test
队列积压严重,性能跟不上的情况下,结合实际服务器资源,可以适当增加 topic 的 partition 多实例化 Consumer 进行消费处理消息。
更多高性能方案可以选择 https://github.com/reachkrishnaraj/kafka-elasticsearch-standalone-consumer
- See more at: http://bigbo.github.io/pages/2015/08/07/logstash_kafka_new/#sthash.dUA3BNd0.dpuf
相关推荐
logstash-kafka地位 该项目仅为 logstash 1.4.X 实现了 Kafka 0.8.2.1 输入和输出。 对于 1.5 支持,请阅读以下内容。 有关 logstash 的更多信息,请参阅 logstash-kafka 已集成到和。 它将与 1.5 版本的 logstash ...
Logstash插件 这是的插件。 它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。 Kafka 输入插件已移动 这个 Kafka 输入插件现在是的一部分。 在可能的情况下,该项目仍对该...
Logstash插件 这是的插件。 它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。 Kafka 输出插件已移动 这个 Kafka 输出插件现在是的一部分。 在可能的情况下,该项目仍对该...
logstash接收kafka插件,已经压缩成zip格式,可以直接集成到logstash
在这个场景中,我们将探讨如何使用Scala编程语言通过Logstash将应用日志发送到Kafka消息队列。首先,让我们理解涉及的技术栈: 1. **Scala**: Scala是一种多范式编程语言,结合了面向对象和函数式编程的特点。在...
Logstash插件 这是的插件。 它是完全免费和完全开源的。 该许可证是Apache 2.0,这意味着您可以随意使用它,但是您可以通过任何方式使用它。 记录中 Kafka日志不遵守Log4J2根记录器级别,默认为INFO,对于其他级别...
本篇文章详细介绍了如何使用Elasticsearch5.3.2、Logstash5.3.2、Kibana5.3.2以及Kafka-*.**.*.*打造一个稳定高效的数据收集、存储和分析平台。 首先,我们需要了解ELK系统架构中的各个组件。Elasticsearch是一个...
- **Kafka 输入**:Logstash 通过 Kafka 输入插件可以从 Kafka 主题消费消息,使其成为数据流处理链路的一部分。 - **Kafka 输出**:同样,Logstash 也可以将处理过的数据发布到 Kafka 的特定主题,提供数据的可靠...
输出插件则可以将处理后的数据发送到各种地方,如 Elasticsearch、Kafka、文件系统、甚至其他 Logstash 实例,形成数据处理链路。 在版本 7.9.2 中,可能有以下值得关注的更新: - 性能提升:Logstash 团队可能对...
Logstash的输出插件种类繁多,包括文件、Elasticsearch、Kafka等,它们使得Logstash能够适应各种不同的数据流向。 总的来说,"logstash-output-jdbc.zip" 插件是Logstash系统中不可或缺的一部分,它提供了将结构化...
Logstash 可以将数据发送到 Elasticsearch、Kafka、Redis、HTTP 服务器等。Elasticsearch 是常见的输出目标,因为它与 Logstash 和 Kibana 配合良好,提供强大的搜索和分析功能。 7. **插件管理**:Logstash 的插件...
3. **Logstash输出插件**:输出插件决定了数据的去向,可以将处理后的数据发送到Elasticsearch、Kafka、文件系统、MySQL等。 在描述中提到的"sqoop-1.4.6.bin__hadoop-0.23.tar"是Apache Sqoop的安装包,它是一个...
2019年elaticsearch6.6.0的安装教程,kibana6.6.0安装教程,logstash6.6.0安装教程,ik分词器,head插件,bigdesk等插件安装教程,x-pack使用等。
4. **数据输出**:输出阶段,数据可以被发送到 Elasticsearch 进行索引和搜索,或者通过 stdout 输出到控制台,或者使用其他输出插件,如 Kafka 或 Redis,进一步传递到其他系统。 5. **性能优化**:Logstash 支持...
通过配置Kafka输出插件,Logstash可以将处理后的数据发送到Kafka主题,进一步实现数据的分布式处理和存储。 当然,Logstash的主要目标是将数据送入Elasticsearch,一个强大的分布式搜索和分析引擎。Elasticsearch...
5. **插件使用** - 插件的使用需要在配置文件中指定,例如: ``` input { file { path => ["/var/log/app.log"] } } filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{WORD:...
4. `plugins` 目录:存放Logstash的插件,这些插件扩展了Logstash的功能,如输入插件(如file input,用于读取文件中的日志)、过滤插件(如grok filter,用于解析日志格式)和输出插件(如elasticsearch output,...
例如,你可以使用 `file` 输入插件来监听日志文件的变化,一旦有新的日志行被写入,Logstash 就会读取并处理这些数据。 2. **过滤插件**(Filters):在数据被收集后,过滤插件允许用户对数据进行清洗、转换或解析...
一个JMeter插件,使您可以将测试结果发送到Kafka服务器。 总览 描述 JMeter后端监听器Kafka是一个JMeter插件,使您可以将测试结果发送到Kafka服务器。 它受JMeter 后端侦听器插件的启发。 产品特点 筛选器 仅使用...
此外,Logstash 也可以将数据推送到 Kafka、RabbitMQ、MySQL、Syslog 等其他系统,实现数据的进一步处理和存储。 **4. 源码分析** Logstash 是用 Ruby 编写的,其源码对于开发者来说是一份宝贵的资源。通过阅读源码...