This took some effort to write; please credit the source when reposting: http://shihlei.iteye.com/blog/2306151
Project requirement:

Flume collects logs that should land on the local file system, split by the hour and stored compressed.

Flume's built-in File Roll Sink can only split files by a fixed time interval; it cannot customize the target directory, compress the rolled files, and so on. I therefore wrote a custom Sink that combines Flume with the rolling features of Log4j2's RollingFileAppender to provide this functionality.
1. Dynamic configuration with the Log4j2 Logger: filePattern-based file rolling and compression
package light.flume;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.rolling.TimeBasedTriggeringPolicy;
import org.apache.logging.log4j.core.appender.rolling.TriggeringPolicy;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;

/**
 * RollingFileLogger: built on Log4j2's RollingFileAppender. It modifies the
 * running Log4j2 configuration so the output directory and compression rule
 * can be specified programmatically at runtime.
 *
 * @author shilei
 */
public class RollingFileLogger {

    private static final Logger logger = LogManager.getLogger(RollingFileLogger.class);

    private Logger fileWriter;

    private String fileName;
    private String filePattern;
    private String appenderName;
    private String loggerName;

    /**
     * Create a logger.
     *
     * @param loggerId
     *            unique id of the logger; an existing logger with the same id is overwritten
     * @param fileName
     *            log4j2 fileName, e.g. logs/main.log
     * @param filePattern
     *            log4j2 filePattern, e.g. logs/$${date:yyyy-MM}/main-%d{yyyy-MM-dd_hh_mm_ss}.log.gz
     */
    public RollingFileLogger(String loggerId, String fileName, String filePattern) {
        this.fileName = fileName;
        this.filePattern = filePattern;
        appenderName = loggerId + "_appender";
        loggerName = loggerId + "_logger";

        logger.info("fileName : " + fileName);
        logger.info("filePattern : " + filePattern);
        logger.info("appenderName : " + appenderName);
        logger.info("loggerName : " + loggerName);

        updateLoggerConfig();
        fileWriter = LogManager.getLogger(loggerName);
    }

    /**
     * Update the running Log4j2 configuration: register the appender and a logger that uses it.
     */
    private void updateLoggerConfig() {
        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        final Configuration config = ctx.getConfiguration();

        // add RollingFileAppender (roll based on the smallest time unit in filePattern, gzip if the pattern ends in .gz)
        TriggeringPolicy policy = TimeBasedTriggeringPolicy.createPolicy("1", "true");
        Layout<?> layout = PatternLayout.createLayout("%m%n", null, config, null, Charset.forName("utf-8"), true, false, null, null);
        Appender appender = RollingFileAppender.createAppender(fileName, filePattern, "true", appenderName, "true", "", "true",
                policy, null, layout, null, "true", "false", null, config);
        appender.start();
        config.addAppender(appender);

        // add a LoggerConfig that references the new appender
        AppenderRef ref = AppenderRef.createAppenderRef(appenderName, null, null);
        AppenderRef[] refs = new AppenderRef[] { ref };
        LoggerConfig loggerConfig = LoggerConfig.createLogger(false, Level.INFO, loggerName, "true", refs, null, config, null);
        loggerConfig.addAppender(appender, null, null);
        config.addLogger(loggerName, loggerConfig);

        ctx.updateLoggers();
    }

    public void write(String msg) {
        fileWriter.info("{}", msg);
    }

    public static void main(String[] args) throws Exception {
        RollingFileLogger writer = new RollingFileLogger("1", "test/test.data",
                "test/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");
        RollingFileLogger writer2 = new RollingFileLogger("2", "hehe/hehe.data",
                "hehe/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");

        for (int i = 0; true; i++) {
            writer.write("hh" + i);
            writer2.write("hehe" + i);
            TimeUnit.MICROSECONDS.sleep(100);
        }
    }
}
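One thing the class above never does is tear down what it adds: every RollingFileLogger leaves its logger and appender in the running Log4j2 configuration for the life of the JVM. Below is a minimal cleanup sketch of my own (not part of the original post; the class and method names are hypothetical), assuming the Configuration.removeLogger and LoggerConfig.removeAppender methods of the Log4j2 2.x core API:

package light.flume;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;

/**
 * Hypothetical helper (not from the original post): removes a logger and
 * appender that RollingFileLogger added to the running Log4j2 configuration.
 */
public class RollingFileLoggerCleaner {

    public static void remove(String loggerName, String appenderName) {
        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
        final Configuration config = ctx.getConfiguration();

        // detach the appender from the logger config, then drop the logger itself
        LoggerConfig loggerConfig = config.getLoggerConfig(loggerName);
        loggerConfig.removeAppender(appenderName);
        config.removeLogger(loggerName);

        // stop the appender so its underlying file is flushed and closed
        Appender appender = config.getAppender(appenderName);
        if (appender != null) {
            appender.stop();
        }

        ctx.updateLoggers();
    }
}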
2. The custom Flume Sink
package light.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom Flume sink that writes events through RollingFileLogger.
 *
 * a1.channels = c1
 * a1.sinks = k1
 * a1.sinks.k1.type = light.flume.RollingFileFlumeSink
 * a1.sinks.k1.channel = c1
 * a1.sinks.k1.sink.id = test
 * a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
 * a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz
 *
 * @author shilei
 */
public class RollingFileFlumeSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(RollingFileFlumeSink.class);

    private static final String SINK_ID = "sink.id";
    private static final String SINK_FILENAME = "sink.filename";
    private static final String SINK_FILEPATTERN = "sink.filepattern";

    private RollingFileLogger rollingFileLogger;

    @Override
    public void configure(Context context) {
        String sinkId = context.getString(SINK_ID, "1");
        String sinkFileName = context.getString(SINK_FILENAME, "/tmp/flume_rollingfile_sink/rolling_file.log");
        String sinkFilePattern = context.getString(SINK_FILEPATTERN,
                "/tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz");

        logger.info("{} : {} ", SINK_ID, sinkId);
        logger.info("{} : {} ", SINK_FILENAME, sinkFileName);
        logger.info("{} : {} ", SINK_FILEPATTERN, sinkFilePattern);

        rollingFileLogger = new RollingFileLogger(sinkId, sinkFileName, sinkFilePattern);
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to do
            Event event = ch.take();

            // Send the Event to the external repository.
            handleEvent(event.getBody());

            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();

            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
        // close the transaction; without this the next begin() throws IllegalStateException (see the note below)
        txn.close();
        return status;
    }

    public void handleEvent(byte[] msg) {
        try {
            String msgStr = new String(msg, "utf-8");
            rollingFileLogger.write(msgStr);
        } catch (Exception e) {
            logger.error("Handle event error : {}", e.getMessage(), e);
        }
    }
}
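For a quick local check without a full agent, the sink can be driven from a plain main() against a MemoryChannel. This is only a sketch of mine, not from the original post; the class name and the /tmp paths are placeholders:

package light.flume;

import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

public class RollingFileFlumeSinkDemo {

    public static void main(String[] args) throws Exception {
        // in-memory channel standing in for the real agent channel
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        channel.start();

        // configure the sink the same way the agent properties file would
        RollingFileFlumeSink sink = new RollingFileFlumeSink();
        Context sinkContext = new Context();
        sinkContext.put("sink.id", "demo");
        sinkContext.put("sink.filename", "/tmp/flume_rollingfile_sink/rolling_file.log");
        sinkContext.put("sink.filepattern",
                "/tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz");
        sink.configure(sinkContext);
        sink.setChannel(channel);
        sink.start();

        // put one event into the channel, then let the sink drain it
        Transaction txn = channel.getTransaction();
        txn.begin();
        channel.put(EventBuilder.withBody("hello rolling file sink".getBytes("utf-8")));
        txn.commit();
        txn.close();

        sink.process();
        sink.stop();
        channel.stop();
    }
}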
Note:

The official custom sink example never closes the transaction, which produces the following exception:
2016-06-17 15:28:40,486 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
    at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
    at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
    at drizzt.injector.flume.InjectFlumeSink.process(InjectFlumeSink.java:22)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Fix: add txn.close(); after the try/catch, as shown in process() above.
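Calling txn.close() after the try/catch works, but if an Error is re-thrown from the catch block the close is skipped. A variant I would suggest (my own sketch, not in the original post) closes the transaction in a finally block and also backs off when the channel is empty; it drops into RollingFileFlumeSink in place of the process() shown above:

@Override
public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
        Event event = ch.take();
        if (event != null) {
            handleEvent(event.getBody());
        } else {
            // channel is empty; back off so the SinkRunner does not spin
            status = Status.BACKOFF;
        }
        txn.commit();
    } catch (Throwable t) {
        txn.rollback();
        status = Status.BACKOFF;
        // re-throw all Errors
        if (t instanceof Error) {
            throw (Error) t;
        }
    } finally {
        // close the transaction in all cases, including re-thrown Errors
        txn.close();
    }
    return status;
}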
3. Using the Sink in a Flume agent
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = light.flume.RollingFileFlumeSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.id = test
a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_hh_mm_ss}.log.gz
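The lines above only configure the sink. A runnable agent also needs a source and a channel; the following is a minimal sketch in which the netcat source, port, and channel sizing are placeholder choices of mine, not from the original post:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# placeholder source for testing; any source can be used
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# the custom rolling-file sink
a1.sinks.k1.type = light.flume.RollingFileFlumeSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.id = test
a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_hh_mm_ss}.log.gz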
The compiled jar is attached; it can be dropped directly into Flume's lib directory and used as-is.