Requirement:
Use a Flume exec source to extract a specified column from each line of incoming data (the original post illustrated this with a figure; the examples below use comma-separated lines and extract the second field).
Implementation:
1. Approach 1: the exec source accepts a Linux command, so the column extraction can be done with awk.
Command: tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'
Note: fflush() is essential. When awk's stdout is a pipe (as it is here), its output is block-buffered, so without fflush() nothing is emitted until the buffer fills.
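A quick way to verify the pipeline outside of Flume (a minimal sketch; the sample line is made up for illustration):

```sh
# Append a sample comma-separated line, then watch the pipeline print
# only the second field ("alice"). Without fflush(), awk would hold the
# output in its stdio buffer because stdout is a pipe. (Ctrl-C to stop.)
echo "1001,alice,beijing" >> /root/test.log
tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'
# expected output: alice
```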
The complete flume-exec.properties looks like this:
```properties
# (Standard Apache 2.0 license header omitted)
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'hd1'

hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c
hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transactionCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

# Bind the source and the sink to the channel.
# One source can feed multiple channels (important).
hd1.sources.s1.channels=c1
# One sink reads from exactly one channel (important).
hd1.sinks.sk1.channel=c1
```
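With that file in place, the agent can be started as usual. A typical invocation, assuming the file lives in Flume's conf directory and the agent name hd1 matches the config:

```sh
bin/flume-ng agent \
  --conf conf \
  --conf-file conf/flume-exec.properties \
  --name hd1 \
  -Dflume.root.logger=INFO,console
```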
2. Approach 2: modify the source code and extend ExecSource
(1) What exactly needs to change?
Reading ExecSource.java shows that ExecSource reads the process's InputStream through a BufferedReader, wraps each line it reads into an event, and sends it to the channel. So we can replace the line's content with just the column we need right before it is wrapped into an event, as sketched below.
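Condensed, the relevant part of ExecRunnable.run() boils down to the loop below (excerpted and simplified from the full source that follows; batching details, error handling, and the timed flush are omitted). The marked line is where the modification hooks in:

```java
// Condensed view of ExecSource's read loop (see the full, modified source below)
BufferedReader reader = new BufferedReader(
    new InputStreamReader(process.getInputStream(), charset));
String line;
while ((line = reader.readLine()) != null) {
  // <-- hook point: transform `line` here, before it becomes an Event
  eventList.add(EventBuilder.withBody(line.getBytes(charset)));
  if (eventList.size() >= bufferCount || timeout()) {
    flushEventBatch(eventList);  // hands the batch to the ChannelProcessor
  }
}
```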
(2) Implementation:
Modified ExecSource.java (the custom changes are marked with "Custom" comments; the Apache license header and the long upstream class Javadoc are unchanged and trimmed here for brevity):
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. (Standard Apache 2.0 license header,
 * unchanged from upstream, trimmed for brevity.)
 */
package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * A {@link Source} implementation that executes a Unix process and turns each
 * line of text into an event.
 *
 * (The rest of the upstream class Javadoc -- the delivery-guarantee warning
 * and the configuration-option table -- is unchanged and trimmed for brevity.)
 */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {

  private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);

  private String shell;
  private String command;
  private SourceCounter sourceCounter;
  private ExecutorService executor;
  private Future<?> runnerFuture;
  private long restartThrottle;
  private boolean restart;
  private boolean logStderr;
  private Integer bufferCount;
  private long batchTimeout;
  private ExecRunnable runner;
  private Charset charset;
  // Custom: switch controlling whether each line is split
  private boolean customSplitSwitchOn;
  // Custom: delimiter used for the split
  private String customSplitDelimiter;
  // Custom: which column to keep after the split
  private Integer customFetchColId;

  @Override
  public void start() {
    logger.info("Exec source starting with command: {}", command);

    // Start the counter before starting any threads that may access it.
    sourceCounter.start();

    executor = Executors.newSingleThreadExecutor();

    // Custom: pass the three new parameters on to the runnable
    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset,
        customSplitSwitchOn, customSplitDelimiter, customFetchColId);

    // Start the runner thread.
    runnerFuture = executor.submit(runner);

    // Mark the Source as RUNNING.
    super.start();

    logger.debug("Exec source started");
  }

  @Override
  public void stop() {
    logger.info("Stopping exec source with command: {}", command);

    if (runner != null) {
      runner.setRestart(false);
      runner.kill();
    }

    if (runnerFuture != null) {
      logger.debug("Stopping exec runner");
      runnerFuture.cancel(true);
      logger.debug("Exec runner stopped");
    }
    executor.shutdown();

    while (!executor.isTerminated()) {
      logger.debug("Waiting for exec executor service to stop");
      try {
        executor.awaitTermination(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        logger.debug("Interrupted while waiting for exec executor service "
            + "to stop. Just exiting.");
        Thread.currentThread().interrupt();
      }
    }

    sourceCounter.stop();
    super.stop();

    logger.debug("Exec source with command:{} stopped. Metrics:{}", command, sourceCounter);
  }

  @Override
  public void configure(Context context) {
    command = context.getString("command");

    Preconditions.checkState(command != null, "The parameter command must be specified");

    restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
        ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

    restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
        ExecSourceConfigurationConstants.DEFAULT_RESTART);

    logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
        ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);

    bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
        ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

    batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
        ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);

    charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
        ExecSourceConfigurationConstants.DEFAULT_CHARSET));

    shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);

    // Custom: read the split switch
    customSplitSwitchOn = context.getBoolean(
        ExecSourceConfigurationConstants.CUSTOM_SPLIT_SWITCH_ON,
        ExecSourceConfigurationConstants.DEFAULT_CUSTOM_SWITCH_ON);
    // Custom: read the delimiter
    customSplitDelimiter = context.getString(
        ExecSourceConfigurationConstants.CUSTOM_SPLIT_DELIMITER,
        ExecSourceConfigurationConstants.DEFAULT_CUSTOM_SPLIT_DELIMITER);
    // Custom: read the column to keep after the split
    customFetchColId = context.getInteger(
        ExecSourceConfigurationConstants.CUSTOM_FETCH_COL,
        ExecSourceConfigurationConstants.DEFAULT_CUSTOM_FETCH_COL_ID);

    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

  private static class ExecRunnable implements Runnable {

    // Custom: constructor extended with the three new parameters
    public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
        SourceCounter sourceCounter, boolean restart, long restartThrottle,
        boolean logStderr, int bufferCount, long batchTimeout, Charset charset,
        boolean customSplitSwitchOn, String customSplitDelimiter, Integer customFetchColId) {
      this.command = command;
      this.channelProcessor = channelProcessor;
      this.sourceCounter = sourceCounter;
      this.restartThrottle = restartThrottle;
      this.bufferCount = bufferCount;
      this.batchTimeout = batchTimeout;
      this.restart = restart;
      this.logStderr = logStderr;
      this.charset = charset;
      this.shell = shell;
      // Custom properties
      this.customSplitSwitchOn = customSplitSwitchOn;
      this.customSplitDelimiter = customSplitDelimiter;
      this.customFetchColId = customFetchColId;
    }

    private final String shell;
    private final String command;
    private final ChannelProcessor channelProcessor;
    private final SourceCounter sourceCounter;
    private volatile boolean restart;
    private final long restartThrottle;
    private final int bufferCount;
    private long batchTimeout;
    private final boolean logStderr;
    private final Charset charset;
    // Custom: delimiter used for the split
    private String customSplitDelimiter;
    // Custom: switch (whether splitting is enabled)
    private boolean customSplitSwitchOn;
    // Custom: column id to keep after the split
    private int customFetchColId;
    private Process process = null;
    private SystemClock systemClock = new SystemClock();
    private Long lastPushToChannel = systemClock.currentTimeMillis();
    ScheduledExecutorService timedFlushService;
    ScheduledFuture<?> future;

    @Override
    public void run() {
      do {
        String exitCode = "unknown";
        BufferedReader reader = null;
        String line = null;
        final List<Event> eventList = new ArrayList<Event>();

        timedFlushService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat(
                "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
        try {
          if (shell != null) {
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          } else {
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));

          // StderrLogger dies as soon as the input stream is invalid
          StderrReader stderrReader = new StderrReader(new BufferedReader(
              new InputStreamReader(process.getErrorStream(), charset)), logStderr);
          stderrReader.setName("StderrReader-[" + command + "]");
          stderrReader.setDaemon(true);
          stderrReader.start();

          future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
              try {
                synchronized (eventList) {
                  if (!eventList.isEmpty() && timeout()) {
                    flushEventBatch(eventList);
                  }
                }
              } catch (Exception e) {
                logger.error("Exception occurred when processing event batch", e);
                if (e instanceof InterruptedException) {
                  Thread.currentThread().interrupt();
                }
              }
            }
          }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

          String[] splits;
          while ((line = reader.readLine()) != null) {
            sourceCounter.incrementEventReceivedCount();
            synchronized (eventList) {
              // Custom: if the split switch is on, split the line on the
              // configured delimiter and keep only the configured column
              if (customSplitSwitchOn) {
                try {
                  splits = line.split(customSplitDelimiter);
                  if (splits.length > customFetchColId) {
                    line = splits[customFetchColId];
                  } else {
                    logger.error("customFetchCol " + customFetchColId
                        + " is out of range; the line only has " + splits.length + " columns");
                    continue;
                  }
                } catch (Exception e) {
                  logger.error("Failed while splitting line: ", e);
                  continue;
                }
              }
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if (eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

          synchronized (eventList) {
            if (!eventList.isEmpty()) {
              flushEventBatch(eventList);
            }
          }
        } catch (Exception e) {
          logger.error("Failed while running command: " + command, e);
          if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
          }
        } finally {
          if (reader != null) {
            try {
              reader.close();
            } catch (IOException ex) {
              logger.error("Failed to close reader for exec source", ex);
            }
          }
          exitCode = String.valueOf(kill());
        }
        if (restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while (restart);
    }

    private void flushEventBatch(List<Event> eventList) {
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

    private boolean timeout() {
      return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
    }

    private static String[] formulateShellCommand(String shell, String command) {
      String[] shellArgs = shell.split("\\s+");
      String[] result = new String[shellArgs.length + 1];
      System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
      result[shellArgs.length] = command;
      return result;
    }

    public int kill() {
      if (process != null) {
        synchronized (process) {
          process.destroy();
          try {
            int exitValue = process.waitFor();

            // Stop the Thread that flushes periodically
            if (future != null) {
              future.cancel(true);
            }

            if (timedFlushService != null) {
              timedFlushService.shutdown();
              while (!timedFlushService.isTerminated()) {
                try {
                  timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                  logger.debug("Interrupted while waiting for exec executor service "
                      + "to stop. Just exiting.");
                  Thread.currentThread().interrupt();
                }
              }
            }
            return exitValue;
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        }
        return Integer.MIN_VALUE;
      }
      return Integer.MIN_VALUE / 2;
    }

    public void setRestart(boolean restart) {
      this.restart = restart;
    }
  }

  private static class StderrReader extends Thread {
    private BufferedReader input;
    private boolean logStderr;

    protected StderrReader(BufferedReader input, boolean logStderr) {
      this.input = input;
      this.logStderr = logStderr;
    }

    @Override
    public void run() {
      try {
        int i = 0;
        String line = null;
        while ((line = input.readLine()) != null) {
          if (logStderr) {
            // There is no need to read 'line' with a charset
            // as we do not propagate it.
            // It is in UTF-16 and would be printed in UTF-8 format.
            logger.info("StderrLogger[{}] = '{}'", ++i, line);
          }
        }
      } catch (IOException e) {
        logger.info("StderrLogger exiting", e);
      } finally {
        try {
          if (input != null) {
            input.close();
          }
        } catch (IOException ex) {
          logger.error("Failed to close stderr reader for exec source", ex);
        }
      }
    }
  }
}
```
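One caveat worth noting about the logic above: String.split() interprets the delimiter as a regular expression, so a delimiter such as "|" or "." must be escaped (e.g. "\\|") in customSplitDelimiter. A tiny self-contained sketch of the extraction logic, using made-up sample data:

```java
// Standalone illustration of the column-extraction logic used above.
public class SplitDemo {
  public static void main(String[] args) {
    String line = "1001,alice,beijing";  // hypothetical input line
    String delimiter = ",";              // customSplitDelimiter
    int colId = 1;                       // customFetchCol (0-based)

    // split() takes a regex: "," is safe, but "|" would need to be "\\|"
    String[] splits = line.split(delimiter);
    if (splits.length > colId) {
      System.out.println(splits[colId]); // prints "alice"
    } else {
      System.err.println("column " + colId + " is out of range");
    }
  }
}
```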
Modified ExecSourceConfigurationConstants.java (license header likewise trimmed):
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. (Standard Apache 2.0 license header,
 * unchanged from upstream, trimmed for brevity.)
 */
package org.apache.flume.source;

import com.google.common.annotations.VisibleForTesting;

public class ExecSourceConfigurationConstants {

  /**
   * Should the exec'ed command be restarted if it dies: default false
   */
  public static final String CONFIG_RESTART = "restart";
  public static final boolean DEFAULT_RESTART = false;

  /**
   * Amount of time to wait before attempting a restart: default 10000 ms
   */
  public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";
  public static final long DEFAULT_RESTART_THROTTLE = 10000L;

  /**
   * Should stderr from the command be logged: default false
   */
  public static final String CONFIG_LOG_STDERR = "logStdErr";
  public static final boolean DEFAULT_LOG_STDERR = false;

  /**
   * Number of lines to read at a time
   */
  public static final String CONFIG_BATCH_SIZE = "batchSize";
  public static final int DEFAULT_BATCH_SIZE = 20;

  /**
   * Amount of time to wait, if the buffer size was not reached, before
   * data is pushed downstream: default 3000 ms
   */
  public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout";
  public static final long DEFAULT_BATCH_TIME_OUT = 3000L;

  /**
   * Charset for reading input
   */
  public static final String CHARSET = "charset";
  public static final String DEFAULT_CHARSET = "UTF-8";

  /**
   * Optional shell/command processor used to run command
   */
  public static final String CONFIG_SHELL = "shell";

  /**
   * Custom: delimiter used for the split; defaults to a comma
   */
  public static final String CUSTOM_SPLIT_DELIMITER = "customSplitDelimiter";
  public static final String DEFAULT_CUSTOM_SPLIT_DELIMITER = ",";

  /**
   * Custom: switch enabling the split; off by default
   */
  public static final String CUSTOM_SPLIT_SWITCH_ON = "customSwitchOn";
  public static final boolean DEFAULT_CUSTOM_SWITCH_ON = false;

  /**
   * Custom: which column to keep after the split; 0-based, like an array index
   */
  public static final String CUSTOM_FETCH_COL = "customFetchCol";
  public static final int DEFAULT_CUSTOM_FETCH_COL_ID = 0;
}
```
Modified flume-custom.properties:
```properties
# (Standard Apache 2.0 license header omitted)
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'hd1'

hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c

# Approach 1
#hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

# Approach 2: modified source code
hd1.sources.s1.command=tail -F /root/test.log
hd1.sources.s1.customSwitchOn=true
hd1.sources.s1.customFetchCol=1

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transactionCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

# Bind the source and the sink to the channel.
# One source can feed multiple channels (important).
hd1.sources.s1.channels=c1
# One sink reads from exactly one channel (important).
hd1.sinks.sk1.channel=c1
```
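Once the rebuilt jar is in place (next step) and the agent is started with this file, customSwitchOn=true and customFetchCol=1 mean a test line like the one below should reach the logger sink with only its second column as the event body (the exact log line format may vary by Flume version):

```sh
echo "1001,alice,beijing" >> /root/test.log
# The logger sink should then print something like:
#   Event: { headers:{} body: 61 6C 69 63 65  alice }
```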
After making the changes above, rebuild the flume-ng-core module and replace flume-ng-core-1.8.0.jar under apache-flume-1.8.0-bin/lib on the server with the newly built jar (match the version number to your actual installation). Then start Flume with the flume-custom.properties above and test.
One catch when rebuilding: Flume's pom.xml enables a code style check plugin, and since the modified code no longer matches the enforced style, the build will fail unless you comment that plugin out or skip the check, as sketched below.
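A possible rebuild sequence (a sketch only; the module name, skip flags, and paths depend on your Flume source checkout and version — verify against your pom.xml):

```sh
# From the root of the Flume source tree: build only flume-ng-core.
# -Dcheckstyle.skip / -Drat.skip bypass the style/license checks mentioned
# above, if your pom.xml wires in those plugins.
mvn clean package -pl flume-ng-core -DskipTests -Dcheckstyle.skip=true -Drat.skip=true

# Replace the jar on the server (adjust versions and paths to your installation):
cp flume-ng-core/target/flume-ng-core-1.8.0.jar \
   /path/to/apache-flume-1.8.0-bin/lib/flume-ng-core-1.8.0.jar
```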
Reflections:
- Common Linux commands are worth mastering; analyze problems patiently and step by step — fflush() is a good example.
- Don't stop at merely solving the problem at hand; branch out and explore alternative approaches.
- Source code is not as hard to modify as it seems. What holds us back is often our own patience and attention to detail — compiling the source takes time, dependencies go missing everywhere, and it is easy to give up before you start.
- Read the official docs, look things up, and exchange ideas with experienced people.