`
kavy
  • 浏览: 888391 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Flume Source对多行的处理

 
阅读更多

ExecSource会readLine()读取日志中的每一行,把其作为每一个flume event的body放进去,这对于大部分这种每行就可以结束的日志记录,是完全可以的:

1
2
2016-03-18 17:53:48,374 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks.
2016-03-18 17:53:48,278 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks.

 

但是,对于有stacktraceERROR日志记录,如果把一行的内容当作一个flume event会有很大的问题,直观上来看,肯定需要把若干行看作是一个flume event,比如下面这样的日志记录,要作为一个flume event,而不是27个(一共27行):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2016-03-18 17:53:40,278 ERROR [HiveServer2-Handler-Pool: Thread-26]: Error occurred during processing of message.
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
	at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:268)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
	at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178)
	at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125)
	at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
	at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41)
	at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216)
	... 4 more
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:196)
	at java.net.SocketInputStream.read(SocketInputStream.java:122)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
	... 10 more

 

我这里的实现方式是:识别每行的开头部分,如果满足某种条件,就当作一条日志,否则,视作是上一条的日志的一部分。

比如:

对于上面举的例子来说(即符合标准log4j的日志),如果每一行开头满足下面这条正则表达式:

1
\s?\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d,\d\d\d

 

就当作一条新的日志,如果不满足,就说明该行内容是上一条日志(已规定格式开头的那条)的一部分。

当然,我增加了可以自定义配置以哪种方式开头视为一条日志的regex配置,可以对不通的source进行不通的配置,已满足要求。

有了这样的约束,就可以写出将某些多行看作一个flume event的ExecSource,我把它开源到了github上,如有兴趣,欢迎前去试用,如有任何建议,欢迎提出与指正:MultiLineExecSource

1
github.com/qwurey/flume-source-multiline

 

该版本基于flume-ng-core 1.6.0

 
 

 

 

 

 

 

转自:http://blog.csdn.net/asia_kobe/article/details/51003173

分享到:
评论

相关推荐

    flume抽取数据库数据的source

    Flume 是 Apache 开源项目中的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。...同时,由于其开源和可扩展性,Flume JDBC Source 也适合各种规模的企业和项目,以满足不同层次的数据处理需求。

    flume-ftp-source 相关jar包

    由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。

    flume-ng-sql-source-1.5.2

    Flume-ng-sql-source的核心功能在于,它能够定期查询指定的SQL数据库,并将查询结果作为事件流传输到Flume的数据通道中,进一步处理或存储。这一特性使得Flume可以方便地整合各种结构化数据源,如MySQL、PostgreSQL...

    flume-sql-source-jar.zip

    总结来说,通过这次对Flume SQL Source的改造,我们不仅提高了数据抽取的灵活性,还增强了其在大数据实时处理场景下的适用性。用户现在可以根据业务需求自由选择增量字段,而不仅仅是依赖于数据库的主键,这无疑为...

    flume-ng-sql-source

    **优点与挑战** 使用"flume-ng-sql-source"的主要优点在于其能够无缝集成Oracle和Kafka,提供实时数据流处理能力。然而,也存在一些挑战,例如性能优化(如批处理和并行处理)、错误处理、以及确保在数据库更新频繁...

    Flume配置文件kafkaSource

    Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。

    flume-ng-sql-source-release-1.5.2.zip

    Flume-ng-sql-source的版本1.5.2可能包含了对早期版本的改进和修复,例如性能优化、错误修复或增加了对更多数据库类型的兼容性。为了使用这个版本,你需要确保你的Flume环境支持此插件,并正确地配置了相关依赖。 ...

    flume-taildir-source-1.9.0.jar

    flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可

    flume-ng-sql-source:Flume Source从SQL数据库导入数据

    flume-ng-sql-source 该项目用于与sql数据库进行通信 当前支持SQL数据库引擎 在最后一次更新之后,该代码已与hibernate集成在一起,因此该技术支持的所有数据库均应正常工作。 编译与包装 $ mvn package 部署方式 ...

    flume-ng-sql-source-1.4.3.jar

    综上所述,Flume 的 SQL Source 插件是连接传统数据库和大数据生态系统的关键组件,它简化了从 RDBMS 实时抽取数据的过程,使得数据的实时处理和分析成为可能。通过适当配置和优化,用户可以构建高效、可靠的实时...

    flume-ng-sql-source-1.5.1

    flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具

    flume-ng-sql-source-1.5.2.jar

    flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包

    Flume自定义source

    Flume自定义Source,数据不丢失,一致性,可以根据自己开发情况选择

    flume-mysql.zip

    agent.sources.mysql-source.type = org.apache.flume.source.sql.SQLSource agent.sources.mysql-source.connection.url = jdbc:mysql://localhost:3306/databasename agent.sources.mysql-source.username = ...

    flume-ng-sql-source-1.5.3.jar

    flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...

Global site tag (gtag.js) - Google Analytics