ExecSource会readLine()
读取日志中的每一行,把其作为每一个flume event的body放进去,这对于大部分这种每行就可以结束的日志记录,是完全可以的:
1 2 |
2016-03-18 17:53:48,374 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks. 2016-03-18 17:53:48,278 INFO namenode.FSNamesystem (FSNamesystem.java:listCorruptFileBlocks(7217)) - there are no corrupt file blocks. |
但是,对于有stacktrace
的ERROR
日志记录,如果把一行的内容当作一个flume event会有很大的问题,直观上来看,肯定需要把若干行看作是一个flume event,比如下面这样的日志记录,要作为一个flume event,而不是27个(一共27行):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2016-03-18 17:53:40,278 ERROR [HiveServer2-Handler-Pool: Thread-26]: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:268) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:129) at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.thrift.transport.TSaslTransport.receiveSaslMessage(TSaslTransport.java:178) at org.apache.thrift.transport.TSaslServerTransport.handleSaslStartMessage(TSaslServerTransport.java:125) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more Caused by: java.net.SocketException: Connection reset at java.net.SocketInputStream.read(SocketInputStream.java:196) at java.net.SocketInputStream.read(SocketInputStream.java:122) at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) at java.io.BufferedInputStream.read1(BufferedInputStream.java:275) at java.io.BufferedInputStream.read(BufferedInputStream.java:334) at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) ... 10 more |
我这里的实现方式是:识别每行的开头部分,如果满足某种条件,就当作一条日志,否则,视作是上一条的日志的一部分。
比如:
对于上面举的例子来说(即符合标准log4j的日志),如果每一行开头满足下面这条正则表达式:
1
|
\s?\d\d\d\d-\d\d-\d\d\s\d\d:\d\d:\d\d,\d\d\d
|
就当作一条新的日志,如果不满足,就说明该行内容是上一条日志(已规定格式开头的那条)的一部分。
当然,我增加了可以自定义配置以哪种方式开头视为一条日志的regex配置,可以对不通的source进行不通的配置,已满足要求。
有了这样的约束,就可以写出将某些多行看作一个flume event的ExecSource,我把它开源到了github上,如有兴趣,欢迎前去试用,如有任何建议,欢迎提出与指正:MultiLineExecSource
1
|
github.com/qwurey/flume-source-multiline
|
该版本基于flume-ng-core 1.6.0
转自:http://blog.csdn.net/asia_kobe/article/details/51003173
相关推荐
Flume 是 Apache 开源项目中的一款分布式、可靠且可用于有效收集、聚合和移动大量日志数据的工具。...同时,由于其开源和可扩展性,Flume JDBC Source 也适合各种规模的企业和项目,以满足不同层次的数据处理需求。
由于flume官方并未提供ftp,source的支持; 因此想使用ftp文件服务器的资源作为数据的来源就需要自定义ftpsource,根据github:https://github.com/keedio/flume-ftp-source,提示下载相关jar,再此作为记录。
Flume-ng-sql-source的核心功能在于,它能够定期查询指定的SQL数据库,并将查询结果作为事件流传输到Flume的数据通道中,进一步处理或存储。这一特性使得Flume可以方便地整合各种结构化数据源,如MySQL、PostgreSQL...
总结来说,通过这次对Flume SQL Source的改造,我们不仅提高了数据抽取的灵活性,还增强了其在大数据实时处理场景下的适用性。用户现在可以根据业务需求自由选择增量字段,而不仅仅是依赖于数据库的主键,这无疑为...
**优点与挑战** 使用"flume-ng-sql-source"的主要优点在于其能够无缝集成Oracle和Kafka,提供实时数据流处理能力。然而,也存在一些挑战,例如性能优化(如批处理和并行处理)、错误处理、以及确保在数据库更新频繁...
Flume配置文件kafkaSource 包含Intercepter,包含正则表达式。
Flume-ng-sql-source的版本1.5.2可能包含了对早期版本的改进和修复,例如性能优化、错误修复或增加了对更多数据库类型的兼容性。为了使用这个版本,你需要确保你的Flume环境支持此插件,并正确地配置了相关依赖。 ...
flume断点续传覆盖jar,使用组件flume-taildir-source-1.9.0覆盖flume/bin目录下的jar即可
flume-ng-sql-source 该项目用于与sql数据库进行通信 当前支持SQL数据库引擎 在最后一次更新之后,该代码已与hibernate集成在一起,因此该技术支持的所有数据库均应正常工作。 编译与包装 $ mvn package 部署方式 ...
综上所述,Flume 的 SQL Source 插件是连接传统数据库和大数据生态系统的关键组件,它简化了从 RDBMS 实时抽取数据的过程,使得数据的实时处理和分析成为可能。通过适当配置和优化,用户可以构建高效、可靠的实时...
flume-ng-sql-source-1.5.1 flume连接数据库 很好用的工具
flume-ng-sql-source-1.5.2.jar从数据库中增量读取数据到hdfs中的jar包
Flume自定义Source,数据不丢失,一致性,可以根据自己开发情况选择
agent.sources.mysql-source.type = org.apache.flume.source.sql.SQLSource agent.sources.mysql-source.connection.url = jdbc:mysql://localhost:3306/databasename agent.sources.mysql-source.username = ...
flume-ng-sql-source-1.5.3.jar,flume采集mysql数据jar包,将此文件拖入FLUME_HOME/lib目录下,如果是CM下CDH版本的flume,则放到/opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib下,同样需要的包还有mysql-...