`
lzfying
  • 浏览: 10775 次
  • 性别: Icon_minigender_1
  • 来自: 济南
社区版块
存档分类
最新评论

Fluentd+mongodb实现log文件的过滤

 
阅读更多

 

Fluentd+mongodb 的安装不多说了,官网上很详细各种操作系统版本的都有。由于,我的项目中需要将多个log文件进行初步的过滤,所以,官网上提供的in_tail 插件不能够满足需求,需要基于in_tail插件写一个适合自己需求的插件。 

 

直接上代码:

 

module Fluent
    require 'fluent/plugin/in_tail'
	class LtAbcInput < Fluent::TailInput
	  Fluent::Plugin.register_input('lt_abc', self)

	  # Override the 'configure_parser(conf)' method.
	  # You can get config parameters in this method.
	  def configure_parser(conf)
		@time_format = conf['time_format'] || '%Y%m%d%H%M%S'
	  end

	  # Override the 'parse_line(line)' method that returns the time and record.
	  # This example method assumes the following log format:
	  #   %Y-%m-%d %H:%M:%S\tkey1\tvalue1\tkey2\tvalue2...
	  #   %Y-%m-%d %H:%M:%S\tkey1\tvalue1\tkey2\tvalue2...
	  #   ...
	  def parse_line(line)
		elements = line.split("|") //获取输入的每一行的信息

		time = elements.shift 
		time = Time.strptime(time, @time_format).to_i

		# [k1, v1, k2, v2, ...] -> {k1=>v1, k2=>v2, ...}
		record = {}
		
		mark = elements.shift
		record["mark"] = mark //这里是我的log中每行数据的类型
		
		while (p = elements.shift)
			pair = p.split(":")
			if (k = pair.shift) && (v = pair.shift)
				record[k] = v
			end
			
			if (p == "E")
				record["E"] = "E";
			end
			
			if (p == "N")
				record["N"] = "N";
			end
		end

		return time, record
	  end
	end
end

 

 

下面是log文件的格式

 

20131228182935|HEART|id:40354|obu:102198|type:0x1
20131228182935|GPS|id:40444|obu:104751|type:0|lat:363936|lon:1170118|speed:0.0000|direction:0|time:182935|E|N|mile:27647448
20131228182935|TIME SYNC|id:46576|obu:105450|termtime:1388255374:20131229022934
20131228182935|GPS|id:40886|obu:103611|type:0|lat:363819|lon:1165236|speed:0.0000|direction:0|time:182933|E|N|mile:3140237
20131228182935|GPS|id:50446|obu:104756|type:0|lat:363936|lon:1170135|speed:31.687|direction:90|time:182934|E|N|mile:12300444
20131228182935|GPS|id:41464|obu:103569|type:0|lat:364212|lon:1165313|speed:0.0000|direction:0|time:182934|E|N|mile:4211590
20131228182935|GPS|id:47602|obu:103170|type:0|lat:364309|lon:1170235|speed:13.575|direction:310|time:192249|E|N|mile:17175434
20131228182935|HEART|id:43991|obu:102578|type:0x1
20131228182935|GPS|id:40221|obu:104703|type:0|lat:364051|lon:1165917|speed:36.632|direction:181|time:182934|E|N|mile:938709
20131228182935|HEART|id:40303|obu:104036|type:0x1
20131228182935|INOUT|id:42517|obu:102431|io:0|auto^hand:1|route:00027|service:00w|station:00271048|time:182934

 



 

分享到:
评论

相关推荐

    Elasticsearch+Fluentd+Kafka搭建日志系统

    【Elasticsearch+Fluentd+Kafka搭建日志系统】 在日志管理领域,传统的ELK(Elasticsearch, Logstash, Kibana)堆栈正逐渐被EFK(Elasticsearch, Fluentd, Kafka)所取代,原因是Logstash在处理大量日志时可能消耗...

    fluentd+filebeat

    fluentd+filebeat技术资料分享,欢迎大家交流,指正错误

    vagrant-centos-kibana:在Vagrant上使用Fluentd + Elasticsearch + Kibana的CentOS

    在Vagrant上使用Fluentd + Elasticsearch + Kibana的CentOS。 用法 运行vagrant up启动两个来宾VM。 打开以显示Kibana仪表板。 登录到发送方VM并生成Apache的虚拟访问日志。 $ vagrant ssh sender [vagrant@...

    fluent-plugin-mongo:Fluentd的MongoDB输入和输出插件

    Fluentd通过插件系统实现了这一目标,用户可以根据需要安装和配置各种输入、输出和过滤插件。 ### 二、MongoDB介绍 MongoDB是一款文档型数据库,支持JSON格式的文档存储,适合处理大量的半结构化数据。其特点是高...

    EFKKsetup:Elasticsearch,Fluentd,Kafka,Kibana

    编辑文件/opt/kafka_2.10-0.8.2.2/config/server.properties 。 在第20行中设置broker.id。 在第28行中添加您的hostname:port。 在第118行中添加Zookeeper主机的地址。 编辑文件/opt/kibana-4.1.2-linux-x64/...

    记一次K8s EFK(elasticsearch+fluentd+kibana)搭建排错经历

    部署教程:...yaml地址:https://github.com/kubernetes/kubernetes/tree/master/cluster/addons/fluentd-elasticsearch 部署教程里面的两个注意 Elasticsearch 数据持久化:默认 EmptyDir 的方

    Kafka+Log4j实现日志集中管理

    本主题将深入探讨如何使用Apache Kafka和Log4j来实现日志的集中管理和处理。Kafka是一个高吞吐量、分布式的消息发布订阅系统,而Log4j则是一款广泛使用的Java日志框架,二者结合能有效提升日志处理效率和分析能力。 ...

    fluentd日志收集组件yaml文件Daemonset资源清单

    原文链接:https://blog.csdn.net/m0_37814112/article/details/120762517 说明:测试资源,包含fluentd-v3.1.0.tar.gz、fluentd-v3.2.0.tar.gz镜像和ds-fluentd.yaml文件

    mongodb高可用所需yaml

    MongoDB的YAML文件可能包含Deployment(定义MongoDB的Pod规格)、StatefulSet、Service、PV和PVC等对象的定义。YAML文件会详细指定Pod的镜像、环境变量、端口映射、存储需求等。 7. **健康检查与生命周期管理**:在...

    log处理

    1. **日志收集**:这一步通常通过日志收集工具完成,如Fluentd、Logstash或 Beats系列(如Filebeat、Metricbeat)。这些工具可以实时地从各种来源(如文件、网络端口)收集日志数据。 2. **日志存储**:收集到的...

    制作带有kafka插件和es插件的fluentd镜像

    前言 Fluentd是用于统一日志记录层的开源数据收集器,是继Kubernetes、Prometheus、Envoy 、CoreDNS 和...Dockerfile文件编写 Dockerfile FROM fluent/fluentd:v1.3.2 ADD fluent.conf /etc/fluent/ RUN echo sourc

    Fluentd 输出过滤器插件,用于重写与指定属性匹配的标签_Ruby_代码_相关文件_下载

    为Fluentd重写标签过滤器。它旨在重写像 mod_rewrite 这样的标签。 当值与正则表达式匹配/不匹配时,重新发出带有重写标记的记录。 您还可以使用正则表达式按域、状态代码(例如 500 错误)、 用户代理、请求 uri、...

    Ruby-Fluentd收集来自各种数据源的事件然后写文件数据库或其他类型的存储

    Ruby-Fluentd是一款强大的日志管理和处理工具,它被设计用于收集来自各种不同数据源的事件,并将这些事件转发到各种目标存储,如文件、数据库或其他类型的数据存储系统。这个工具采用Ruby语言编写,因此在标签中被...

    fluent-logger-golang:Fluentd(Golang)的结构化记录器

    fluent-logger-golangFluentd(Golang)的结构化事件记录器如何安装go get github.com/fluent/fluent-logger-golang/fluent用法使用go get安装该软件包,并使用import将其包括在您的项目中。... New (fluent....

    Ansible-ansible-fluentd.zip

    在“Ansible-ansible-fluentd.zip”中,"ansible-fluentd-master"目录可能包含了Ansible的playbooks和其他相关文件,用于自动配置和部署Fluentd。使用这些资源,你可以实现以下操作: 1. **安装Fluentd**:Ansible...

    fluentd-v1.8-1.tar

    fluentd镜像,版本1.8.1,arm版本,需要的请使用,fluentd是基于fluent协议实现的

    k8s搭建EFK日志中心需要的部署文件

    k8s搭建EFK日志中心需要的部署文件,适用于搭建elasticsearch + fluentd + kibana组成的日志分析平台。适合新手小白直接拿来执行使用的k8s部署脚本文件。因为镜像文件较大,所以需要对应镜像文件的请私聊。此种方案...

    log4j.zip 项目日志

    4. **日志监控**:使用工具(如Logstash、Fluentd等)收集、解析并转发日志,实现集中式日志管理和分析。 5. **性能优化**:避免在高并发场景下频繁地写入日志,可以使用异步日志记录或日志缓冲机制来提高性能。 ...

    efk-docker:elasticsearch +流利的+ kibana

    ---version: '2.4'services: elasticsearch: image: ${ELASTICSEARCH_IMAGE} restart: always environment: - 'node.name=HEYJUDE' - 'discovery.type=single-node' - 'bootstrap.memory_lock=true' - 'ES_JAVA_OPTS...

Global site tag (gtag.js) - Google Analytics