
Custom Flume Sink: Splitting Files by Day or Hour and Saving Them Compressed


   Writing this took effort; please credit the source when reposting: http://shihlei.iteye.com/blog/2306151

 

    Project requirement:

    Flume collects logs that should land on the file system, split by hour and saved compressed.

    Flume's built-in File Roll Sink can only split files at a fixed time interval; it cannot write to a custom directory layout or compress the rolled files. So this post builds a custom Sink that leverages the rolling features of Log4j2's RollingFileAppender to provide that functionality.

 

1. Dynamic configuration via the Log4j2 Logger; the filePattern drives file splitting and compression

 

package light.flume;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.RollingFileAppender;
import org.apache.logging.log4j.core.appender.rolling.TimeBasedTriggeringPolicy;
import org.apache.logging.log4j.core.appender.rolling.TriggeringPolicy;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;

/**
 * RollingFileLogger, built on Log4j2's RollingFileAppender. It rewrites the
 * live Log4j2 configuration at runtime, so the output directory and the
 * rolling/compression rules can be set programmatically.
 * 
 * @author shilei
 *
 */
public class RollingFileLogger {

	private static final Logger logger = LogManager.getLogger(RollingFileLogger.class);

	private Logger fileWriter;

	private String fileName;
	private String filePattern;
	private String appenderName;
	private String loggerName;

	/**
	 * Create a logger.
	 * 
	 * @param loggerId
	 *            unique id for the logger; reusing an id overwrites the
	 *            existing configuration
	 * @param fileName
	 *            log4j2 fileName, e.g. logs/main.log
	 * @param filePattern
	 *            log4j2 filePattern, e.g.
	 *            logs/$${date:yyyy-MM}/main-%d{yyyy-MM-dd_hh_mm_ss}.log.gz
	 */
	public RollingFileLogger(String loggerId, String fileName, String filePattern) {
		this.fileName = fileName;
		this.filePattern = filePattern;

		appenderName = loggerId + "_appender";
		loggerName = loggerId + "_logger";

		logger.info("fileName : " + fileName);
		logger.info("filePattern : " + filePattern);
		logger.info("appenderName : " + appenderName);
		logger.info("loggerName : " + loggerName);

		updateLoggerConfig();
		fileWriter = LogManager.getLogger(loggerName);
	}

	/**
	 * Register the appender and logger in the live Log4j2 configuration.
	 */
	private void updateLoggerConfig() {
		final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
		final Configuration config = ctx.getConfiguration();

		// add RollingFileAppender
		TriggeringPolicy policy = TimeBasedTriggeringPolicy.createPolicy("1", "true");
		Layout<?> layout = PatternLayout.createLayout("%m%n", null, config, null, Charset.forName("utf-8"), true, false, null, null);
		Appender appender = RollingFileAppender.createAppender(fileName, filePattern, "true", appenderName, "true", "", "true", policy, null, layout, null, "true", "false", null, config);
		appender.start();
		config.addAppender(appender);

		// add a LoggerConfig (additivity disabled) that references the appender
		AppenderRef ref = AppenderRef.createAppenderRef(appenderName, null, null);
		AppenderRef[] refs = new AppenderRef[] { ref };

		LoggerConfig loggerConfig = LoggerConfig.createLogger(false, Level.INFO, loggerName, "true", refs, null, config, null);

		loggerConfig.addAppender(appender, null, null);
		config.addLogger(loggerName, loggerConfig);
		ctx.updateLoggers();
	}

	public void write(String msg) {
		fileWriter.info("{}", msg);
	}

	public static void main(String[] args) throws Exception {
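		// Demo: two independent loggers; the per-second %d pattern below makes
		// each of them roll and gzip a new file every second.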
		RollingFileLogger writer = new RollingFileLogger("1", "test/test.data", "test/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");
		RollingFileLogger writer2 = new RollingFileLogger("2", "hehe/hehe.data", "hehe/data/${date:yyyy-MM}/test-%d{yyyy-MM-dd_hh_mm_ss}.log.gz");
		for (int i = 0; true; i++) {
			writer.write("hh" + i);
			writer2.write("hehe" + i);
			TimeUnit.MICROSECONDS.sleep(100);
		}
	}
}
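
The rolling granularity is determined entirely by the filePattern: with TimeBasedTriggeringPolicy (interval 1, modulate true), Log4j2 rolls whenever the most specific time unit inside %d{...} changes, and the .gz suffix makes it gzip each rolled file. A minimal sketch of the two patterns the title refers to (paths and logger ids are illustrative; note that HH is the 24-hour clock, while the lowercase hh used in the demo above is the 12-hour clock):

// Roll and gzip once per day: dd is the finest unit in %d{...}
RollingFileLogger daily = new RollingFileLogger("daily", "logs/daily.log",
		"logs/${date:yyyy-MM}/daily-%d{yyyy-MM-dd}.log.gz");

// Roll and gzip once per hour: HH is the finest unit in %d{...}
RollingFileLogger hourly = new RollingFileLogger("hourly", "logs/hourly.log",
		"logs/${date:yyyy-MM}/hourly-%d{yyyy-MM-dd-HH}.log.gz");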

 

 

2. Integrating it as a custom Flume Sink

 

package light.flume;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume sink that writes events to rolling, compressed files through
 * RollingFileLogger. Example configuration:
 * 
 * a1.channels = c1
 * a1.sinks = k1
 * a1.sinks.k1.type = light.flume.RollingFileFlumeSink
 * a1.sinks.k1.channel = c1
 * a1.sinks.k1.sink.id = test
 * a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
 * a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz
 * 
 * 
 * @author shilei
 *
 */
public class RollingFileFlumeSink extends AbstractSink implements Configurable {
	private static final Logger logger = LoggerFactory.getLogger(RollingFileFlumeSink.class);

	private static final String SINK_ID = "sink.id";
	private static final String SINK_FILENAME = "sink.filename";
	private static final String SINK_FILEPATTERN = "sink.filepattern";

	private RollingFileLogger rollingFileLogger;

	@Override
	public void configure(Context context) {
		String sinkId = context.getString(SINK_ID, "1");
		String sinkFileName = context.getString(SINK_FILENAME, "/tmp/flume_rollingfile_sink/rolling_file.log");
		String sinkFilePattern = context.getString(SINK_FILEPATTERN, "/tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd}.log.gz");

		logger.info("{} : {} ", SINK_ID, sinkId);
		logger.info("{} : {} ", SINK_FILENAME, sinkFileName);
		logger.info("{} : {} ", SINK_FILEPATTERN, sinkFilePattern);

		rollingFileLogger = new RollingFileLogger(sinkId, sinkFileName, sinkFilePattern);
	}

	@Override
	public Status process() throws EventDeliveryException {
		Status status = null;
		// Start transaction
		Channel ch = getChannel();
		Transaction txn = ch.getTransaction();
		txn.begin();
		try {
			Event event = ch.take();
			// take() returns null when the channel is empty;
			// back off instead of treating that as a failure
			if (event != null) {
				handleEvent(event.getBody());
				status = Status.READY;
			} else {
				status = Status.BACKOFF;
			}
			txn.commit();
		} catch (Throwable t) {
			txn.rollback();
			logger.error("Unable to deliver event", t);
			status = Status.BACKOFF;

			// re-throw all Errors
			if (t instanceof Error) {
				throw (Error) t;
			}
		} finally {
			// Always close the transaction; without this the next begin()
			// fails with "begin() called when transaction is COMPLETED!"
			txn.close();
		}
		return status;
	}

	public void handleEvent(byte[] msg) {
		try {
			String msgStr = new String(msg, "utf-8");
			rollingFileLogger.write(msgStr);
		} catch (Exception e) {
			logger.error("Failed to write event body", e);
		}
	}
}
 

 

 Note:

    The official custom sink example never closes the transaction, which leads to the following exception:

 

2016-06-17 15:28:40,486 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: begin() called when transaction is COMPLETED!
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
	at drizzt.injector.flume.InjectFlumeSink.process(InjectFlumeSink.java:22)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)
 

 

 Solution: add txn.close(); in the code above it lives in a finally block so that it also runs when an Error is re-thrown.
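
A minimal sketch of the resulting transaction skeleton (mirroring the process() method above):

Transaction txn = getChannel().getTransaction();
txn.begin();
try {
	// take an event from the channel and deliver it
	txn.commit();
} catch (Throwable t) {
	txn.rollback();
	if (t instanceof Error) {
		throw (Error) t;
	}
} finally {
	// always close, otherwise the next begin() throws
	// "begin() called when transaction is COMPLETED!"
	txn.close();
}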

 

3. Using the Sink in Flume

 

  a1.channels = c1
  a1.sinks = k1
  a1.sinks.k1.type = light.flume.RollingFileFlumeSink
  a1.sinks.k1.channel = c1
  a1.sinks.k1.sink.id = test
  a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
  a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_hh_mm_ss}.log.gz
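
For a quick end-to-end test, the sink can be wired to any source and channel. A minimal agent config, assuming a netcat source and a memory channel (the source and channel settings, and the config file name rolling.conf below, are illustrative):

  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1

  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 44444
  a1.sources.r1.channels = c1

  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000

  a1.sinks.k1.type = light.flume.RollingFileFlumeSink
  a1.sinks.k1.channel = c1
  a1.sinks.k1.sink.id = test
  a1.sinks.k1.sink.filename = /tmp/flume_rollingfile_sink/rolling_file.log
  a1.sinks.k1.sink.filepattern = /tmp/flume_rollingfile_sink/${date:yyyy-MM}/rolling_file-%d{yyyy-MM-dd_hh_mm_ss}.log.gz

Start the agent with "flume-ng agent --conf conf --conf-file rolling.conf --name a1" and feed it lines with "nc localhost 44444".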

 

The compiled jar is attached to the original post; drop it into Flume's lib directory and it can be used directly.

 
