`
lzfying
  • 浏览: 10922 次
  • 性别: Icon_minigender_1
  • 来自: 济南
社区版块
存档分类
最新评论

Fluentd+mongodb实现log文件的过滤

 
阅读更多

 

Fluentd+mongodb 的安装不多说了,官网上很详细各种操作系统版本的都有。由于,我的项目中需要将多个log文件进行初步的过滤,所以,官网上提供的in_tail 插件不能够满足需求,需要基于in_tail插件写一个适合自己需求的插件。 

 

直接上代码:

 

module Fluent
    require 'fluent/plugin/in_tail'
	class LtAbcInput < Fluent::TailInput
	  Fluent::Plugin.register_input('lt_abc', self)

	  # Override the 'configure_parser(conf)' method.
	  # You can get config parameters in this method.
	  def configure_parser(conf)
		@time_format = conf['time_format'] || '%Y%m%d%H%M%S'
	  end

	  # Override the 'parse_line(line)' method that returns the time and record.
	  # This example method assumes the following log format:
	  #   %Y-%m-%d %H:%M:%S\tkey1\tvalue1\tkey2\tvalue2...
	  #   %Y-%m-%d %H:%M:%S\tkey1\tvalue1\tkey2\tvalue2...
	  #   ...
	  def parse_line(line)
		elements = line.split("|") //获取输入的每一行的信息

		time = elements.shift 
		time = Time.strptime(time, @time_format).to_i

		# [k1, v1, k2, v2, ...] -> {k1=>v1, k2=>v2, ...}
		record = {}
		
		mark = elements.shift
		record["mark"] = mark //这里是我的log中每行数据的类型
		
		while (p = elements.shift)
			pair = p.split(":")
			if (k = pair.shift) && (v = pair.shift)
				record[k] = v
			end
			
			if (p == "E")
				record["E"] = "E";
			end
			
			if (p == "N")
				record["N"] = "N";
			end
		end

		return time, record
	  end
	end
end

 

 

下面是log文件的格式

 

20131228182935|HEART|id:40354|obu:102198|type:0x1
20131228182935|GPS|id:40444|obu:104751|type:0|lat:363936|lon:1170118|speed:0.0000|direction:0|time:182935|E|N|mile:27647448
20131228182935|TIME SYNC|id:46576|obu:105450|termtime:1388255374:20131229022934
20131228182935|GPS|id:40886|obu:103611|type:0|lat:363819|lon:1165236|speed:0.0000|direction:0|time:182933|E|N|mile:3140237
20131228182935|GPS|id:50446|obu:104756|type:0|lat:363936|lon:1170135|speed:31.687|direction:90|time:182934|E|N|mile:12300444
20131228182935|GPS|id:41464|obu:103569|type:0|lat:364212|lon:1165313|speed:0.0000|direction:0|time:182934|E|N|mile:4211590
20131228182935|GPS|id:47602|obu:103170|type:0|lat:364309|lon:1170235|speed:13.575|direction:310|time:192249|E|N|mile:17175434
20131228182935|HEART|id:43991|obu:102578|type:0x1
20131228182935|GPS|id:40221|obu:104703|type:0|lat:364051|lon:1165917|speed:36.632|direction:181|time:182934|E|N|mile:938709
20131228182935|HEART|id:40303|obu:104036|type:0x1
20131228182935|INOUT|id:42517|obu:102431|io:0|auto^hand:1|route:00027|service:00w|station:00271048|time:182934

 



 

分享到:
评论

相关推荐

    fluent-plugin-mongo:Fluentd的MongoDB输入和输出插件

    Fluentd通过插件系统实现了这一目标,用户可以根据需要安装和配置各种输入、输出和过滤插件。 ### 二、MongoDB介绍 MongoDB是一款文档型数据库,支持JSON格式的文档存储,适合处理大量的半结构化数据。其特点是高...

    graylog:执行Graylog搜索

    这可以通过配置syslog、logstash、fluentd或其他日志收集代理来实现。 2. **流配置**:在Graylog界面创建或编辑“流”,这是日志数据的逻辑分组。你可以根据源IP、日志级别、特定关键词等条件定义流规则,确保数据...

    Python-Logpara一个对常见的web日志进行解析处理的粗糙DEMO

    7. **日志存储**:日志数据量大,DEMO可能探讨了如何有效地存储日志,如使用文件系统、数据库(如SQLite)或者NoSQL存储(如MongoDB)。 8. **日志级别和过滤**:在实际应用中,我们可能只关心特定级别的日志,如...

    java日志数据的采集显示

    这可能涉及文件系统、数据库(如MySQL、MongoDB)或专门的日志存储服务(如Elasticsearch)。 5. **日志显示与分析**:为了方便查看和分析日志,我们可以使用日志可视化工具,如Kibana、Grafana或自建的Web界面。...

    log2nosql:热部署组件,用于查看存储在noSQL数据库中的日志

    首先,log2nosql的核心功能是将日志数据实时或按需地从各种日志源导入到NoSQL数据库中,例如MongoDB、Cassandra或HBase等。通过这种方式,开发者可以利用NoSQL数据库的查询能力,对日志数据进行快速检索、分析和统计...

    基于Storm的日志收集系统

    【MongoDB】是一个流行的文档型数据库,支持分布式文件存储,适合存储非结构化和半结构化的数据,如日志。在基于Storm的日志系统中,MongoDB可以作为数据存储后端,存储处理过的日志数据,便于后续的查询和分析。 ...

    java实时日志解析处理

    6. **持久化和存储**:处理后的数据可能需要保存到数据库、缓存或文件系统中,可以使用JDBC、NoSQL数据库(如MongoDB)、搜索引擎(如Elasticsearch)等技术。 7. **可视化展示**:通过数据可视化工具(如Grafana、...

    日志管理系统

    日志的收集是日志管理系统的基础,这一步通常通过日志代理实现,如Logstash、Fluentd等,它们可以从各种源(如服务器、应用、设备)收集日志,支持多种协议和格式,如TCP、UDP、HTTP、JSON、CSV等。收集到的日志数据...

Global site tag (gtag.js) - Google Analytics