- 浏览: 129840 次
- 性别:
- 来自: 深圳
-
文章分类
最新评论
SpoolDirectorySource使用及源码分析
Spooling Directory Source简介
Spooling Directory Source可以获取硬盘上“spooling”目录的数据,这个Source将监视指定目录是否有新文件,如果有新文件的话,就解析这个新文件。事件的解析逻辑是可插拔的。在文件的内容所有的都读取到Channel之后,Spooling Directory Source会重名或者是删除该文件以表示文件已经读取完成。不像Exec Source,这个Source是可靠的,且不会丢失数据。即使Flume重启或者被Kill。但是需要注意如下两点:
1,如果文件在放入spooling目录之后还在写,那么Flume会打印错误日志,并且停止处理该文件。
2,如果文件之后重复使用,Flume将打印错误日志,并且停止处理。
为了避免以上问题,我们可以使用唯一的标识符来命令文件,例如:时间戳。
尽管这个Source是可靠的,但是如果下游发生故障,也会导致Event重复,这种情况就需要通过Flume的其他组件提供保障了。
channels | – |
|
type | – |
组件名:spooldir .
|
spoolDir | – | 读取文件的目录。 |
fileSuffix | .COMPLETED | Spooling读取过的文件,添加的后缀。 |
deletePolicy | never |
完成后的文件是否删除。never:不删除 或immediate:立即删除
|
fileHeader | false | 是不把路径加入到Heander |
fileHeaderKey | file | 路径加入到Header的Key是什么 |
basenameHeader | false | 是不把文件名加入到Heander |
basenameHeaderKey | basename | 文件名加入到Header的Key是什么 |
ignorePattern | ^$ | 采用正则表达是去过滤一些文件。只有符合正则表达式的文件才会被使用。 |
trackerDir | .flumespool | 被处理文件的元数据的存储目录,如果不是绝对路径,就被会解析到spoolDir目录下。 |
consumeOrder | oldest |
消费spooling目录文件的规则,分别有:oldest,youngest和random。在oldest 和 youngest的情况下, 通过文件的最后修改时间来比较文件。如果最后修改时间相同,就根据字典的序列从小开始。在随机的情况 下,就随意读取文件。如果文件列表很长,采用oldest/youngest可能会很慢,因为用oldest/youngest要 扫描文件。但是如果采用random的话,就可能造成新的文件消耗的很快,老的文件一直都没有被消费。 |
maxBackoff | 4000 | 如果Channel已经满了,那么该Source连续尝试写入该Channel的最长时间(单位:毫秒)。 |
batchSize | 100 | 批量传输到Channel的粒度。 |
inputCharset | UTF-8 | 字符集 |
decodeErrorPolicy |
FAIL
|
在文件中有不可解析的字符时的解析策略。FAIL : 抛出一个异常,并且不能解析该文件。REPLACE :
取代不可解析的字符,通常用Unicode U+FFFD. IGNORE : 丢弃不可能解析字符序列。
|
deserializer |
LINE
|
自定序列化的方式,自定的话,必须实现EventDeserializer.Builder .
|
deserializer.* |
|
|
bufferMaxLines | – | 已废弃。 |
bufferMaxLineLength | 5000 | (不推荐使用) 一行中最大的长度,可以使用deserializer.maxLineLength代替。 |
selector.type | replicating | replicating(复制) 或 multiplexing(复用) |
selector.* |
|
取决于selector.type的值 |
interceptors | – | 空格分割的interceptor列表。 |
interceptors.* |
|
|
SpoolDirectorySource示例
读取文件写入到file_roll中
a1.sources = source1 a1.sinks = sink1 a1.channels = channel1 #resources a1.sources.source1.type = spooldir a1.sources.source1.channels = channel1 a1.sources.source1.spoolDir = E:\\home\\spooling a1.sources.source1.fileHeader = true a1.sources.source1.fileHeaderKey = fishfile a1.sources.source1.basenameHeader = true a1.sources.source1.basenameHeaderKey = fishbasename a1.sinks.sink1.type = file_roll a1.sinks.sink1.sink.directory = E:\\home\\file_roll a1.sinks.sink1.sink.rollInterval = 300 a1.sinks.sink1.sink.serializer = TEXT a1.sinks.sink1.sink.batchSize = 100 a1.channels.channel1.type = memory a1.channels.channel1.capacity = 1000 a1.channels.channel1.transactionCapacity = 100 a1.sources.source1.channels = channel1 a1.sinks.sink1.channel = channel1
SpoolDirectorySource源码分析
一,调用configure(Context context)方法初始化:
@Override public synchronized void configure(Context context) { //spool目录 spoolDirectory = context.getString(SPOOL_DIRECTORY); Preconditions.checkState(spoolDirectory != null, "Configuration must specify a spooling directory"); //完成后的文件后缀 completedSuffix = context.getString(SPOOLED_FILE_SUFFIX, DEFAULT_SPOOLED_FILE_SUFFIX); //删除策略,never:不删除 或 immediate:立即删除 deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY); //以下四个参数是是否在header中加入文件名和文件路径。 fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER); fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY); basenameHeader = context.getBoolean(BASENAME_HEADER, DEFAULT_BASENAME_HEADER); basenameHeaderKey = context.getString(BASENAME_HEADER_KEY, DEFAULT_BASENAME_HEADER_KEY); //批量处理的数量 batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); //字符集 inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET); //在文件中有不可解析的字符时的解析策略 decodeErrorPolicy = DecodeErrorPolicy.valueOf( context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY) .toUpperCase(Locale.ENGLISH)); //过滤文件的正则表达式 ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT); //被处理文件的元数据的存储目录 trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR); //序列化 deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER); deserializerContext = new Context(context.getSubProperties(DESERIALIZER + ".")); //消费spooling目录文件的规则 consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH)); // "Hack" to support backwards compatibility with previous generation of // spooling directory source, which did not support deserializers Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH); if (bufferMaxLineLength != null && deserializerType != null && deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) { deserializerContext.put(LineDeserializer.MAXLINE_KEY, bufferMaxLineLength.toString()); } maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } }
start方法:
@Override public synchronized void start() { logger.info("SpoolDirectorySource source starting with directory: {}", spoolDirectory); executor = Executors.newSingleThreadScheduledExecutor(); File directory = new File(spoolDirectory); //构建ReliableSpoolingFileEventReader对象 try { reader = new ReliableSpoolingFileEventReader.Builder() .spoolDirectory(directory) .completedSuffix(completedSuffix) .ignorePattern(ignorePattern) .trackerDirPath(trackerDirPath) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) .annotateBaseName(basenameHeader) .baseNameHeader(basenameHeaderKey) .deserializerType(deserializerType) .deserializerContext(deserializerContext) .deletePolicy(deletePolicy) .inputCharset(inputCharset) .decodeErrorPolicy(decodeErrorPolicy) .consumeOrder(consumeOrder) .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", ioe); } //构建SpoolDirectoryRunnable线程。 Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter); //每隔POLL_DELAY_MS(500ms)执行以下SpoolDirectoryRunnable线程。 executor.scheduleWithFixedDelay( runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS); super.start(); logger.debug("SpoolDirectorySource source started"); sourceCounter.start(); }构建ReliableSpoolingFileEventReader类,构造方法功能:
1,spooling目录是否存在,是否是目录
2,通过创建零时文件测试spooling目录的权限。
3,创建trackerDir目录和.flumespool-main.meta文件
SpoolDirectoryRunnable线程,主要用于发送和读取Event:
private class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader reader; private SourceCounter sourceCounter; public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader, SourceCounter sourceCounter) { this.reader = reader; this.sourceCounter = sourceCounter; } @Override public void run() { int backoffInterval = 250; try { while (!Thread.interrupted()) { //ReliableSpoolingFileEventReader读取batchSize大小的Event List<Event> events = reader.readEvents(batchSize); if (events.isEmpty()) { break; } //统计 sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); try { //将Event数组发送到Channel getChannelProcessor().processEventBatch(events); //commit会记录最后一次读取的行数,以便下次知道从哪里开始读 reader.commit(); } catch (ChannelException ex) { //ChannelProcessor批量提交Event出错,会抛出ChannelException异常,此时reader.commit是没有执行的 //所以在接下来的continue后,继续通过reader读取文件的话,还是从原来的位置读取,以保证数据不会丢失。 logger.warn("The channel is full, and cannot write data now. The " + "source will try again after " + String.valueOf(backoffInterval) + " milliseconds"); hitChannelException = true; if (backoff) { TimeUnit.MILLISECONDS.sleep(backoffInterval); backoffInterval = backoffInterval << 1; backoffInterval = backoffInterval >= maxBackoff ? maxBackoff : backoffInterval; } continue; } backoffInterval = 250; sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); } } catch (Throwable t) { logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " + "Uncaught exception in SpoolDirectorySource thread. " + "Restart or reconfigure Flume to continue processing.", t); hasFatalError = true; Throwables.propagate(t); } } }ReliableSpoolingFileEventReader读取Event:
public List<Event> readEvents(int numEvents) throws IOException { //committed初始化为true if (!committed) { if (!currentFile.isPresent()) { throw new IllegalStateException("File should not roll when " + "commit is outstanding."); } logger.info("Last read was never committed - resetting mark position."); //正常情况下,会在SpoolDirectorySource类中记录读取的字节数之后,将commited设置为true //没有设置为true,可能是因为发送到Channel异常了,调用下面reset方法可以保证数据不丢失。 currentFile.get().getDeserializer().reset(); } else { // Check if new files have arrived since last call if (!currentFile.isPresent()) { //读取文件,读取文件过程中使用FileFilter过滤掉completedSuffix后缀的文件,然后根据消费文件的规则(consumeOrder)去消费文件。 currentFile = getNextFile(); } // Return empty list if no new files if (!currentFile.isPresent()) { return Collections.emptyList(); } } EventDeserializer des = currentFile.get().getDeserializer(); //根据序列化类读取Event List<Event> events = des.readEvents(numEvents); /* It's possible that the last read took us just up to a file boundary. * If so, try to roll to the next file, if there is one. * Loop until events is not empty or there is no next file in case of 0 byte files */ while (events.isEmpty()) { logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one."); retireCurrentFile(); currentFile = getNextFile(); if (!currentFile.isPresent()) { return Collections.emptyList(); } events = currentFile.get().getDeserializer().readEvents(numEvents); } //添加<span style="font-family:Microsoft Yahei;">文件路径到Header</span> if (annotateFileName) { String filename = currentFile.get().getFile().getAbsolutePath(); for (Event event : events) { event.getHeaders().put(fileNameHeader, filename); } } //添加文件名到Header if (annotateBaseName) { String basename = currentFile.get().getFile().getName(); for (Event event : events) { event.getHeaders().put(baseNameHeader, basename); } } committed = false; lastFileRead = currentFile; return events; }消费文件的规则,以OLDEST为例:
for (File candidateFile: candidateFiles) { //已选择文件的最后修改时间,减去文件列表取的文件最后修改时间 long compare = selectedFile.lastModified() - candidateFile.lastModified(); if (compare == 0) { // ts is same pick smallest lexicographically. //时间一样就根据字典序列排序 selectedFile = smallerLexicographical(selectedFile, candidateFile); } else if (compare > 0) { // candidate is older (cand-ts < selec-ts). selectedFile = candidateFile; } }
在ReliableSpoolingFileEventReader类读取Events之后,调用commit方法提交,这里有个问题,batchSize为200,我们每次也就读取200行,那么SpoolingDirectorySource是如何标记我们读到文件的那个位置的呢?
其实SpoolingDirectorySource在commit时,会调用EventDeserializer的mark方法标记此时读取在文件的什么位置。源码如下:
public synchronized void storePosition(long position) throws IOException { metaCache.setOffset(position); writer.append(metaCache); writer.sync(); writer.flush(); }上面的方法会把文件的读取到什么位置记录到.flumespool\.flumespool-main.meta(默认情况下)文件中。
public void commit() throws IOException { if (!committed && currentFile.isPresent()) { currentFile.get().getDeserializer().mark(); //mark成功后,将committed设置为true committed = true; } }
相关推荐
# 使用SpoolDirectorySource,它会监控指定目录下新产生的文件 agent.sources.logSource.type = spoolDir # 指定日志文件的输入目录 agent.sources.logSource.spoolDir = /path/to/log/files # 文件被读取后自动...
内容概要:本文详细介绍了基于SpringBoot和Vue开发的养老院管理系统的具体实现细节。该系统采用前后端不分离的架构,旨在快速迭代并满足中小项目的开发需求。文中涵盖了多个关键技术点,如数据库设计(组合唯一约束、触发器)、定时任务(@Scheduled、@Async)、前端数据绑定(Vue的条件渲染和动态class绑定)、权限控制(RBAC模型、自定义注解)以及报表导出(SXSSFWorkbook流式导出)。此外,还讨论了开发过程中遇到的一些常见问题及其解决方案,如CSRF防护、静态资源配置、表单提交冲突等。 适合人群:具备一定Java和前端开发经验的研发人员,尤其是对SpringBoot和Vue有一定了解的开发者。 使用场景及目标:适用于需要快速开发中小型管理系统的团队,帮助他们理解如何利用SpringBoot和Vue进行全栈开发,掌握前后端不分离架构的优势和注意事项。 其他说明:文章不仅提供了详细的代码示例和技术要点,还分享了许多实用的小技巧和避坑指南,有助于提高开发效率和系统稳定性。
家族企业如何应对人才流失问题?
员工关怀制度.doc
内容概要:本文详细探讨了对传统蚁群算法进行改进的方法,特别是在路径规划领域的应用。主要改进措施包括:采用排序搜索机制,即在每轮迭代后对所有路径按长度排序并只强化前20%的优质路径;调整信息素更新规则,如引入动态蒸发系数和分级强化策略;优化路径选择策略,增加排序权重因子;以及实现动态地图调整,使算法能够快速适应环境变化。实验结果显示,改进后的算法在收敛速度上有显著提升,在复杂地形中的表现更加稳健。 适合人群:从事路径规划研究的技术人员、算法工程师、科研工作者。 使用场景及目标:适用于需要高效路径规划的应用场景,如物流配送、机器人导航、自动驾驶等领域。目标是提高路径规划的效率和准确性,减少不必要的迂回路径,确保在动态环境中快速响应变化。 其他说明:改进后的蚁群算法不仅提高了收敛速度,还增强了对复杂环境的适应能力。建议在实际应用中结合可视化工具进行调参,以便更好地观察和优化蚂蚁的探索轨迹。此外,还需注意避免过度依赖排序机制而导致的过拟合问题。
内容概要:本文详细介绍了利用粒子群优化(PSO)算法解决配电网中分布式光伏系统的选址与定容问题的方法。首先阐述了问题背景,即在复杂的配电网环境中选择合适的光伏安装位置和确定合理的装机容量,以降低网损、减小电压偏差并提高光伏消纳效率。接着展示了具体的PSO算法实现流程,包括粒子初始化、适应度函数构建、粒子位置更新规则以及越界处理机制等关键技术细节。文中还讨论了目标函数的设计思路,将多个相互制约的目标如网损、电压偏差和光伏消纳通过加权方式整合为单一评价标准。此外,作者分享了一些实践经验,例如采用前推回代法进行快速潮流计算,针对特定应用场景调整权重系数,以及引入随机波动模型模拟光伏出力特性。最终实验结果显示,经过优化后的方案能够显著提升系统的整体性能。 适用人群:从事电力系统规划与设计的专业人士,尤其是那些需要处理分布式能源集成问题的研究人员和技术人员。 使用场景及目标:适用于希望深入了解如何运用智能优化算法解决实际工程难题的人士;旨在帮助读者掌握PSO算法的具体应用方法,从而更好地应对配电网中分布式光伏系统的选址定容挑战。 其他说明:文中提供了完整的Matlab源代码片段,便于读者理解和复现研究结果;同时也提到了一些潜在改进方向,鼓励进一步探索和创新。
内容概要:本文详细介绍了丰田Prius2004永磁同步电机的设计流程,涵盖从初始参数计算到最终温升仿真的各个环节。首先利用Excel进行基本参数计算,如铁芯叠厚、定子外径等,确保设计符合预期性能。接着使用Maxwell进行参数化仿真,通过Python脚本自动化调整磁钢尺寸和其他关键参数,优化电机性能并减少齿槽转矩。随后借助橡树岭实验室提供的实测数据验证仿真结果,确保模型准确性。最后采用MotorCAD进行温升仿真,优化冷却系统设计,确保电机运行安全可靠。文中还分享了许多实用技巧,如如何正确设置材料参数、避免常见的仿真错误等。 适合人群:从事电机设计的专业工程师和技术人员,尤其是对永磁同步电机设计感兴趣的读者。 使用场景及目标:适用于希望深入了解永磁同步电机设计全过程的技术人员,帮助他们在实际工作中提高设计效率和精度,解决常见问题,优化设计方案。 其他说明:文章提供了丰富的实战经验和具体的操作步骤,强调了理论与实践相结合的重要性。同时提醒读者注意一些容易忽视的细节,如材料参数的选择和仿真模型的准确性。
内容概要:本文详细介绍了基于DSP28335的单相逆变器的设计与实现,涵盖了多个关键技术模块。首先,ADC采样模块用于获取输入电压和电流的数据,确保后续控制的准确性。接着,PWM控制模块负责生成精确的脉宽调制信号,控制逆变器的工作状态。液晶显示模块则用于实时展示电压、电流等重要参数。单相锁相环电路实现了电网电压的频率和相位同步,确保逆变器输出的稳定性。最后,电路保护程序提供了过流保护等功能,保障系统的安全性。每个模块都有详细的代码示例和技术要点解析。 适合人群:具备一定嵌入式系统和电力电子基础知识的研发人员,尤其是对DSP28335感兴趣的工程师。 使用场景及目标:适用于单相逆变器项目的开发,帮助开发者理解和掌握各个模块的具体实现方法,提高系统的可靠性和性能。 其他说明:文中不仅提供了具体的代码实现,还分享了许多调试经验和常见问题的解决方案,有助于读者更好地理解和应用相关技术。
SecureCRT安装包
内容概要:本文详细介绍了如何利用C#、WPF和MVVM模式构建一个大屏看板3D可视化系统。主要内容涵盖WPF编程设计、自定义工业控件、数据库设计、MVVM架构应用以及典型的三层架构设计。文中不仅提供了具体的代码实例,还讨论了数据库连接配置、3D模型绑定、依赖属性注册等关键技术细节。此外,文章强调了项目开发过程中需要注意的问题,如3D坐标系换算、MVVM中命令传递、数据库连接字符串加密等。 适合人群:具备一定C#编程基础,对WPF和MVVM模式有一定了解的研发人员。 使用场景及目标:适用于希望深入了解WPF和MVVM模式在实际项目中应用的开发者,特别是那些从事工业控制系统、数据可视化平台开发的专业人士。通过学习本文,读者可以掌握如何构建高效、稳定的大屏看板3D可视化系统。 其他说明:本文提供的设计方案和技术实现方式,可以帮助开发者更好地理解和应用WPF和MVVM模式,同时也能为相关领域的项目开发提供有价值的参考。
基于ssm的系统设计,包含sql文件(Spring+SpringMVC+MyBatis)
内容概要:本文详细介绍了利用COMSOL进行非厄米超表面双参数传感器的设计与实现。首先,通过构建超表面单元并引入虚部折射率,实现了PT对称系统的增益-损耗交替分布。接着,通过频域扫描和参数化扫描,捕捉到了复频率空间中的能级劈裂现象,并找到了奇异点(Exceptional Point),从而显著提高了传感器对微小扰动的敏感度。此外,文章探讨了双参数检测的独特优势,如解耦温度和折射率变化的能力,并展示了其在病毒检测、工业流程监控等领域的潜在应用。 适合人群:从事光学传感器研究的专业人士,尤其是对非厄米系统和COMSOL仿真感兴趣的科研人员。 使用场景及目标:适用于需要高精度、多参数检测的应用场合,如生物医学检测、环境监测等。目标是提高传感器的灵敏度和分辨率,解决传统传感器中存在的参数交叉敏感问题。 其他说明:文中提供了详细的建模步骤和代码片段,帮助读者理解和重现实验结果。同时,强调了在建模过程中需要注意的关键技术和常见问题,如网格划分、参数设置等。
怎样健全员工福利体系.docx
离职证明范本.doc
6538b79724855900a9c930904a302920.part6
员工离职单.doc
内容概要:本文详细介绍了在COMSOL中进行超材料异常折射仿真的关键技术。首先解释了异常折射现象及其产生的原因,接着通过具体代码展示了如何利用相位梯度和结构色散精确计算折射角。文中还讨论了边界条件的设置、网格划分的优化以及参数化扫描的应用。此外,提供了多个实用脚本和技巧,帮助提高仿真的精度和效率。最后强调了验证结果的重要性和一些常见的注意事项。 适合人群:从事电磁仿真研究的专业人士,尤其是对超材料和异常折射感兴趣的科研人员和技术开发者。 使用场景及目标:适用于需要深入理解和解决超材料中异常折射问题的研究项目。主要目标是掌握COMSOL中异常折射仿真的完整流程,确保仿真结果的准确性并优化计算性能。 其他说明:文章不仅提供了详细的代码示例和技术细节,还分享了许多实践经验,有助于读者更好地应对实际仿真过程中可能出现的问题。
招聘工作数据分析表.xls
platform-tools-latest-windows.zip
个人资料临时存储QT资源