`

使用logstash的logstash-input-kafka插件读取kafka中的数据

阅读更多

logstash版本为5.5.3,kafka版本为2.11,此版本默认内置了kafka插件,可直接配置使用,不需要重新安装插件;注意logstash5.x版本前后配置不太一样,注意甄别,必要时可去elasticsearch官网查看最新版配置参数的变化,例如logstash5.x版本以前kafka插件配置的是zookeeper地址,5.x以后配置的是kafka实例地址。

input{
      kafka{
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest" //从最新的偏移量开始消费
        consumer_threads => 5
        decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["logq","loge"] //数组类型,可配置多个topic
        type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用
      }
}

 使用了decorate_events属性,注意看logstash控制台打印的信息,会输出如下

"kafka":{"consumer_group":"test","partition":0,"offset":10430232,"topic":"logq","key":null}

 另外一个input里面可设置多个kafka,

input{
      kafka{
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test1"
        group_id => "test1"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["loge"]
        type => "classroom"
      }
      kafka{
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test2"
        group_id => "test2"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["logq"]
        type => "student"
      }
}

 假如你在filter模块中还要做其他过滤操作,并且针对input里面的每个数据源做得操作不一样,那你就可以根据各自定义的type来匹配

filter{
        if[type] == "classroom"{
            grok{
               ........
            }
        }
        if[type] == "student"{
            mutate{
               ........
            }
        }
}

 不只filter中可以这样,output里面也可以这样;并且当output为elasticsearch的时候,input里面的定义的type将会成为elasticsearch的你定义的index下的type

output {
        if[type] == "classroom"{
          elasticsearch{
               hosts => ["192.168.110.31:9200"]
               index => "school"
               timeout => 300
               user => "elastic"
               password => "changeme"
          }

        }
        if[type] == "student"{
            ........
        }
 }

 对于第一个存储到elasticsearch的路径为localhost:9200/school/classroom;第二个存储到elasticsearch的路径为localhost:9200/school/student。假如从来没有定义过type,默认的type为logs,访问路径为第一个存储到elasticsearch的路径为localhost:9200/school/logs,默认的type也可不加。

分享到:
评论

相关推荐

    logstash-input-jdbc-1.0.0.zip

    Logstash 是一个强大的数据收集、处理和转发工具...总之,Logstash-input-jdbc 插件是将关系数据库集成到 Logstash 数据流中的关键组件,帮助用户轻松地从数据库中获取并处理数据,进而实现更高效的数据分析和可视化。

    最新版linux logstash-8.5.0-linux-x86-64.tar.gz

    在这个例子中,Logstash 从指定的日志文件中读取数据,使用Grok过滤器解析日志格式,然后将处理后的数据发送到本地Elasticsearch实例的特定索引。 值得注意的是,Logstash支持众多插件,如: - 输入插件:file、...

    logstash-input-kafka-9.1.0.gem

    logstash接收kafka插件,已经压缩成zip格式,可以直接集成到logstash

    logstash-output-jdbc.zip

    在实际应用中,这个插件常常与其他Logstash插件如input和filter结合使用,形成数据流水线。例如,你可以先使用file input插件读取日志文件,然后通过grok filter解析日志内容,最后由jdbc output将解析后的数据存入...

    最新版linux logstash-7.12.1-linux-x86_64.tar.gz

    Linux Logstash是一款强大的数据收集、处理和转发工具,广泛应用于日志管理和监控系统中。它属于Elastic Stack(以前称为ELK Stack)的一部分,与Elasticsearch和Kibana一起,构建了一个完整的日志分析解决方案。...

    最新版linux logstash-7.10.0-linux-x86_64.tar.gz

    最后,输出插件将处理后的数据发送到目标位置,比如 Elasticsearch 进行存储和分析,或者直接输出到文件、syslog、Kafka 等。 在 Logstash 7.10.0 版本中,可能包含以下特性与改进: 1. 性能提升:Logstash 团队...

    logstash-output-kafka:Logstash 的 Kafka 输出

    Logstash插件 这是的插件。 它是完全免费和完全开源的。 许可证是 Apache 2.0,这意味着您可以随意以任何方式使用它。 Kafka 输出插件已移动 这个 Kafka 输出插件现在是的一部分。 在可能的情况下,该项目仍对该...

    最新版linux logstash-8.5.2-linux-x86-64.tar.gz

    Logstash有多种输入插件,例如file input用于读取本地文件系统的日志文件,tcp input允许通过TCP端口接收数据,还有syslog input可以处理syslog协议的日志。 2. **过滤器插件**:对数据进行处理和转换。例如,grok ...

    logstash-input-kafka:Logstash 的 Kafka 输入

    Kafka 日志不遵守 Log4J2 根记录器级别并默认为 INFO,对于其他级别,您必须在 Logstash 部署的log4j2.properties文件中明确设置日志级别,例如: logger.kafka.name=org.apache.kafka logger.kafka.appenderRef....

    最新版windows logstash-7.10.0-windows-x86_64.zip

    Logstash 是一个开源的数据收集和传输工具,广泛用于日志管理和大数据分析的生态系统中。它遵循“收集、处理、存储”的工作模式,能够从各种数据源接收数据,对其进行过滤、转换,并将处理后的数据发送到各种目标,...

    最新版linux logstash-8.1.1-linux-x86_64.tar.gz

    3. **输出插件(Output Plugins)**:输出插件决定了数据的去向,可以是Elasticsearch、Kafka、MySQL、stdout等。Elasticsearch output 插件是最常见的,将处理后的数据存储到Elasticsearch索引中。 **安装与配置:...

    logstash-8.2.3-linux-x86_64.tar.gz

    3. **Outputs**:经过处理的数据将被发送到各种目的地,如 Elasticsearch、Logstash Indexer、Kafka、文件等。 在8.2.3版本中,可能包括了对Java的依赖,因为 Logstash 是基于 Java 开发的。这意味着在运行 ...

    最新版windows logstash-8.1.1-windows-x86_64.zip

    Logstash 是一个强大的开源数据收集、转换和...总之,Logstash 8.1.1 for Windows 提供了一个强大而灵活的工具,用于管理和分析Windows环境中的日志数据,通过其丰富的插件生态系统,可以适应各种复杂的数据处理场景。

    最新版linux logstash-8.0.1-linux-x86_64.tar.gz

    Logstash 是一个强大的开源数据收集、转换和分发系统,广泛应用于日志管理和ELK(Elasticsearch, Logstash, Kibana)堆栈中。最新版的 Logstash 8.0.1 针对 Linux x86_64 架构提供了优化的性能和新特性,旨在更高效...

    logstash-integration-kafka:Kafka Integration for Logstash,提供输入和输出插件

    Kafka日志不遵守Log4J2根记录器级别,默认为INFO,对于其他级别,必须在Logstash部署的log4j2.properties文件中显式设置日志级别,例如: logger.kafka.name=org.apache.kafka logger.kafka.appenderRef.console....

    最新版windows logstash-8.5.3-windows-x86-64.zip

    2. 数据处理:在收集到数据后,Logstash 使用过滤插件(如 grok、mutate、date 等)进行清洗、转换和格式化。例如,grok 插件可以解析复杂日志格式,mutate 可以修改字段值,date 可以解析时间戳并标准化日期格式。 ...

    Logstash(logstash-8.6.0-linux-aarch64.tar.gz)

    - **输入插件(Input Plugins)**:Logstash能够通过多种输入插件从各种来源收集数据,例如文件、网络套接字、JMX指标、数据库等。 - **过滤器插件(Filter Plugins)**:在收集的数据传递到输出之前,过滤器可以...

    最新版windows logstash-8.1.3-windows-x86_64.zip

    2. 过滤器(Filters):在输入阶段之后,Logstash可以使用过滤器插件对数据进行处理。这些插件可以用来解析日志格式、提取关键字段、转换数据类型、删除或添加字段,以及执行更复杂的操作,如正则表达式匹配和脚本...

    最新版linux logstash-7.11.1-linux-x86_64.tar.gz

    输入插件允许 Logstash 从各种数据源收集数据。对于 Linux 系统,常见的输入插件包括文件输入、syslog 输入、JDBC 输入等。在7.11.1版本中,用户可能已经发现对不同输入源的增强支持,如更高效地读取和解析日志文件...

    最新版windows logstash-7.17.1-windows-x86_64.zip

    Logstash 是一个强大的开源数据收集、转换和分发工具,广泛应用于日志管理和ELK (Elasticsearch, Logstash, Kibana)堆栈中。这个压缩包 "logstash-7.17.1-windows-x86_64.zip" 提供了适用于Windows操作系统的最新版...

Global site tag (gtag.js) - Google Analytics