Flume sources are responsible for collecting log data. Their common parent class is AbstractSource, and every source class discussed below extends AbstractSource.

AvroSource
The lifecycle framework (LifecycleAware) calls the start() method to bring up the AvroSource. start() mainly launches a NettyServer to receive data, which is then handed back to the AvroSource for processing.
@Override
public void start() {
logger.info("Starting {}...", this);
//when data arrives, the Avro responder parses it according to AvroSourceProtocol and calls back into this AvroSource to process it
Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
//creates a NioServerSocketChannelFactory, sized according to maxThreads
NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
//creates either an SSL/compression-aware ChannelPipelineFactory or a plain one
ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
socketChannelFactory, pipelineFactory, null);
connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
server.start();
sourceCounter.start();
super.start();
final NettyServer srv = (NettyServer)server;
connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){
@Override
public void run() {
//periodically report the number of open connections to the source counter
sourceCounter.setOpenConnectionCount(
Long.valueOf(srv.getNumActiveConnections()));
}
}, 0, 60, TimeUnit.SECONDS);
logger.info("Avro source {} started.", getName());
}
When the AvroSource receives data, its append() method is invoked; append() calls getChannelProcessor().processEvent() to handle the received event.
@Override
public Status append(AvroFlumeEvent avroEvent) {
logger.debug("Avro source {}: Received avro event: {}", getName(),
avroEvent);
sourceCounter.incrementAppendReceivedCount();
sourceCounter.incrementEventReceivedCount();
Event event = EventBuilder.withBody(avroEvent.getBody().array(),
toStringMap(avroEvent.getHeaders()));
try {
// hand the received event to the channel processor
getChannelProcessor().processEvent(event);
} catch (ChannelException ex) {
logger.warn("Avro source " + getName() + ": Unable to process event. " +
"Exception follows.", ex);
return Status.FAILED;
}
sourceCounter.incrementAppendAcceptedCount();
sourceCounter.incrementEventAcceptedCount();
return Status.OK;
}
AvroLegacySource
AvroLegacySource is a source implementation that receives Avro events from the Avro sink of Flume OG (the legacy, pre-NG Flume). Its start() method sets up an Avro HttpServer, and append() copies the OG event headers onto an NG event before handing it to the channel processor.
@Override
public void start() {
// setup http server to receive OG events
res = new SpecificResponder(FlumeOGEventAvroServer.class, this);
try {
http = new HttpServer(res, host, port);
} catch (IOException eI) {
LOG.warn("Failed to start server", eI);
return;
}
http.start();
super.start();
}
@Override
public Void append( AvroFlumeOGEvent evt ) throws AvroRemoteException {
counterGroup.incrementAndGet("rpc.received");
Map<String, String> headers = new HashMap<String, String>();
// extract Flume OG event headers
headers.put(HOST, evt.getHost().toString());
headers.put(TIMESTAMP, evt.getTimestamp().toString());
headers.put(PRIORITY, evt.getPriority().toString());
headers.put(NANOS, evt.getNanos().toString());
for (Entry<CharSequence, ByteBuffer> entry : evt.getFields().entrySet()) {
headers.put(entry.getKey().toString(), entry.getValue().toString());
}
headers.put(OG_EVENT, "yes");
Event event = EventBuilder.withBody(evt.getBody().array(), headers);
try {
getChannelProcessor().processEvent(event);
counterGroup.incrementAndGet("rpc.events");
} catch (ChannelException ex) {
return null;
}
counterGroup.incrementAndGet("rpc.successful");
return null;
}
EmbeddedSource
EmbeddedSource is a simple source that gives the embedded agent direct access to the channel. When an application calls put() on an EmbeddedAgent, the agent delegates to EmbeddedSource.put(), which calls processEvent() on the channel processor directly.
public class EmbeddedSource extends AbstractSource
implements EventDrivenSource, Configurable {
@Override
public void configure(Context context) {
}
public void put(Event event) throws ChannelException {
getChannelProcessor().processEvent(event);
}
public void putAll(List<Event> events) throws ChannelException {
getChannelProcessor().processEventBatch(events);
}
}
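The EmbeddedAgent side can be sketched as follows; this is only an illustration of the call path into EmbeddedSource.put(). The configuration keys follow the embedded agent's property convention, and the sink hostname and port are placeholders.
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.agent.embedded.EmbeddedAgent;
import org.apache.flume.event.EventBuilder;

public class EmbeddedAgentSketch {
  public static void main(String[] args) throws EventDeliveryException {
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("channel.type", "memory");
    conf.put("channel.capacity", "200");
    conf.put("sinks", "sink1");
    conf.put("sink1.type", "avro");
    conf.put("sink1.hostname", "collector.example.com"); // placeholder
    conf.put("sink1.port", "4141");                      // placeholder
    conf.put("processor.type", "default");

    EmbeddedAgent agent = new EmbeddedAgent("demo-agent");
    agent.configure(conf);
    agent.start();
    try {
      // put() delegates to EmbeddedSource.put(), i.e. processEvent() shown above
      Event event = EventBuilder.withBody("hello", Charset.forName("UTF-8"));
      agent.put(event);
    } finally {
      agent.stop();
    }
  }
}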
ExecSource
ExecSource's start() method submits an ExecRunnable to a single-thread executor to run the configured command.
public void start() {
logger.info("Exec source starting with command:{}", command);
executor = Executors.newSingleThreadExecutor();
runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);
/*
* NB: This comes at the end rather than the beginning of the method because
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
sourceCounter.start();
super.start();
logger.debug("Exec source started");
}
Below is ExecRunnable's run() method. It first schedules a timed task that periodically flushes buffered events to the channel, then reads lines from the process's input stream and submits them to the channel in batches. When the process exits, a new process is started if restart is enabled.
public void run() {
do {
String exitCode = "unknown";
BufferedReader reader = null;
String line = null;
final List<Event> eventList = new ArrayList<Event>();
timedFlushService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(
"timedFlushExecService" +
Thread.currentThread().getId() + "-%d").build());
try {
if(shell != null) {
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
} else {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(new BufferedReader(
new InputStreamReader(process.getErrorStream(), charset)), logStderr);
stderrReader.setName("StderrReader-[" + command + "]");
stderrReader.setDaemon(true);
stderrReader.start();
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if(!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occured when processing event batch", e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if(eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
synchronized (eventList) {
if(!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Failed while running command: " + command, e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
logger.error("Failed to close reader for exec source", ex);
}
}
exitCode = String.valueOf(kill());
}
if(restart) {
logger.info("Restarting in {}ms, exit code {}", restartThrottle,
exitCode);
try {
Thread.sleep(restartThrottle);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
logger.info("Command [" + command + "] exited with " + exitCode);
}
} while(restart);
}
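flushEventBatch() is not shown above; a plausible sketch of what it does, inferred from how it is called, is given below. The fields systemClock and lastPushToChannel are assumptions about how timeout() measures the time since the last flush.
// Sketch of a method inside ExecRunnable; channelProcessor, sourceCounter,
// systemClock and lastPushToChannel are fields of the surrounding class.
private void flushEventBatch(List<Event> eventList) {
  channelProcessor.processEventBatch(eventList);        // hand the batch to the channel(s)
  sourceCounter.addToEventAcceptedCount(eventList.size());
  eventList.clear();                                     // start collecting the next batch
  lastPushToChannel = systemClock.currentTimeMillis();   // so timeout() counts from this flush
}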
HTTPSource
HTTPSource accepts Flume events via HTTP POST (and GET, which should be used for experimentation only). HTTP requests are converted into Flume events by a pluggable handler that must implement the HTTPSourceHandler interface.
The start() method starts a Jetty server, and received requests are handled by FlumeHTTPServlet's doPost() method.
@Override
public void start() {
Preconditions.checkState(srv == null,
"Running HTTP Server found in source: " + getName()
+ " before I started one."
+ "Will not attempt to start.");
srv = new Server();
SocketConnector connector = new SocketConnector();
connector.setPort(port);
connector.setHost(host);
srv.setConnectors(new Connector[] { connector });
try {
org.mortbay.jetty.servlet.Context root =
new org.mortbay.jetty.servlet.Context(
srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
srv.start();
Preconditions.checkArgument(srv.getHandler().equals(root));
} catch (Exception ex) {
LOG.error("Error while starting HTTPSource. Exception follows.", ex);
Throwables.propagate(ex);
}
Preconditions.checkArgument(srv.isRunning());
sourceCounter.start();
super.start();
}
@Override
public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
List<Event> events = Collections.emptyList(); //create empty list
try {
events = handler.getEvents(request);
} catch (HTTPBadRequestException ex) {
LOG.warn("Received bad request from client. ", ex);
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Bad request from client. "
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Deserializer threw unexpected exception. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Deserializer threw unexpected exception. "
+ ex.getMessage());
return;
}
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
try {
getChannelProcessor().processEventBatch(events);
} catch (ChannelException ex) {
LOG.warn("Error appending event to channel. "
+ "Channel might be full. Consider increasing the channel "
+ "capacity or make sure the sinks perform faster.", ex);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Error appending event to channel. Channel might be full."
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Unexpected error appending event to channel. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Unexpected error while appending event to channel. "
+ ex.getMessage());
return;
}
response.setCharacterEncoding(request.getCharacterEncoding());
response.setStatus(HttpServletResponse.SC_OK);
response.flushBuffer();
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
}
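With the default JSONHandler, a client only needs to POST a JSON array of events to this servlet. Below is a minimal sketch using HttpURLConnection; the port (5140) is an assumption, since HTTPSource takes its port from the configuration.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class HttpSourceClientSketch {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://localhost:5140/"); // assumed host and port
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setDoOutput(true);
    conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");

    // the default JSONHandler expects an array of {"headers": ..., "body": ...} objects
    String payload = "[{\"headers\":{\"host\":\"demo\"},\"body\":\"hello flume\"}]";
    OutputStream out = conn.getOutputStream();
    out.write(payload.getBytes("UTF-8"));
    out.close();

    System.out.println("HTTPSource responded with " + conn.getResponseCode()); // 200 on success
  }
}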
MultiportSyslogTCPSource
MultiportSyslogTCPSource is a multi-port version of SyslogTCPSource that receives TCP syslog messages on several ports. Its start() method creates an acceptor and binds it to each configured port (the transport is implemented with Apache MINA). When a message arrives, MultiportSyslogHandler's messageReceived() method converts each line into an event and submits it to the channel.
@Override
public void start() {
logger.info("Starting {}...", this);
// allow user to specify number of processors to use for thread pool
if (numProcessors != null) {
acceptor = new NioSocketAcceptor(numProcessors);
} else {
acceptor = new NioSocketAcceptor();
}
acceptor.setReuseAddress(true);
acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.setHandler(new MultiportSyslogHandler(maxEventSize, batchSize,
getChannelProcessor(), sourceCounter, portHeader, defaultDecoder,
portCharsets));
for (int port : ports) {
InetSocketAddress addr;
if (host != null) {
addr = new InetSocketAddress(host, port);
} else {
addr = new InetSocketAddress(port);
}
try {
//Not using the one that takes an array because we won't want one bind
//error affecting the next.
acceptor.bind(addr);
} catch (IOException ex) {
logger.error("Could not bind to address: " + String.valueOf(addr), ex);
}
}
sourceCounter.start();
super.start();
logger.info("{} started.", this);
}
public void messageReceived(IoSession session, Object message) {
IoBuffer buf = (IoBuffer) message;
IoBuffer savedBuf = (IoBuffer) session.getAttribute(SAVED_BUF);
ParsedBuffer parsedLine = new ParsedBuffer();
List<Event> events = Lists.newArrayList();
// the character set can be specified per-port
CharsetDecoder decoder = defaultDecoder.get();
int port =
((InetSocketAddress) session.getLocalAddress()).getPort();
if (portCharsets.containsKey(port)) {
decoder = portCharsets.get(port).get();
}
// while the buffer is not empty
while (buf.hasRemaining()) {
events.clear();
// take number of events no greater than batchSize
for (int num = 0; num < batchSize && buf.hasRemaining(); num++) {
if (lineSplitter.parseLine(buf, savedBuf, parsedLine)) {
Event event = parseEvent(parsedLine, decoder);
if (portHeader != null) {
event.getHeaders().put(portHeader, String.valueOf(port));
}
events.add(event);
} else {
logger.trace("Parsed null event");
}
}
// don't try to write anything if we didn't get any events somehow
if (events.isEmpty()) {
logger.trace("Empty set!");
return;
}
int numEvents = events.size();
sourceCounter.addToEventReceivedCount(numEvents);
// write the events to the downstream channel
try {
channelProcessor.processEventBatch(events);
sourceCounter.addToEventAcceptedCount(numEvents);
} catch (Throwable t) {
logger.error("Error writing to channel, event dropped", t);
if (t instanceof Error) {
Throwables.propagate(t);
}
}
}
}
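To exercise this handler, a client simply writes newline-terminated syslog lines to one of the bound ports. The sketch below sends a single RFC 3164 style line over TCP; the port 5140 is an assumption and must match one of the configured ports.
import java.io.OutputStream;
import java.net.Socket;

public class SyslogTcpClientSketch {
  public static void main(String[] args) throws Exception {
    Socket socket = new Socket("localhost", 5140); // assumed host and port
    try {
      OutputStream out = socket.getOutputStream();
      // one newline-terminated syslog line; lineSplitter breaks the buffer on '\n'
      String line = "<13>Feb  5 17:32:18 myhost myapp: hello syslog\n";
      out.write(line.getBytes("UTF-8"));
      out.flush();
    } finally {
      socket.close();
    }
  }
}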
NetcatSource
NetcatSource opens a ServerSocketChannel to accept client connections, using blocking I/O. When data arrives, a NetcatSocketHandler's run() method parses the incoming line strings into events.
@Override
public void start() {
logger.info("Source starting");
counterGroup.incrementAndGet("open.attempts");
handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat("netcat-handler-%d").build());
try {
SocketAddress bindPoint = new InetSocketAddress(hostName, port);
serverSocket = ServerSocketChannel.open();
serverSocket.socket().setReuseAddress(true);
serverSocket.socket().bind(bindPoint);
logger.info("Created serverSocket:{}", serverSocket);
} catch (IOException e) {
counterGroup.incrementAndGet("open.errors");
logger.error("Unable to bind to socket. Exception follows.", e);
throw new FlumeException(e);
}
AcceptHandler acceptRunnable = new AcceptHandler(maxLineLength);
acceptThreadShouldStop.set(false);
acceptRunnable.counterGroup = counterGroup;
acceptRunnable.handlerService = handlerService;
acceptRunnable.shouldStop = acceptThreadShouldStop;
acceptRunnable.ackEveryEvent = ackEveryEvent;
acceptRunnable.source = this;
acceptRunnable.serverSocket = serverSocket;
acceptThread = new Thread(acceptRunnable);
acceptThread.start();
logger.debug("Source started");
super.start();
}
@Override
public void run() {
logger.debug("Starting connection handler");
Event event = null;
try {
Reader reader = Channels.newReader(socketChannel, "utf-8");
Writer writer = Channels.newWriter(socketChannel, "utf-8");
CharBuffer buffer = CharBuffer.allocate(maxLineLength);
buffer.flip(); // flip() so fill() sees buffer as initially empty
while (true) {
// this method blocks until new data is available in the socket
int charsRead = fill(buffer, reader);
logger.debug("Chars read = {}", charsRead);
// attempt to process all the events in the buffer
int eventsProcessed = processEvents(buffer, writer);
logger.debug("Events processed = {}", eventsProcessed);
if (charsRead == -1) {
// if we received EOF before last event processing attempt, then we
// have done everything we can
break;
} else if (charsRead == 0 && eventsProcessed == 0) {
if (buffer.remaining() == buffer.capacity()) {
// If we get here it means:
// 1. Last time we called fill(), no new chars were buffered
// 2. After that, we failed to process any events => no newlines
// 3. The unread data in the buffer == the size of the buffer
// Therefore, we are stuck because the client sent a line longer
// than the size of the buffer. Response: Drop the connection.
logger.warn("Client sent event exceeding the maximum length");
counterGroup.incrementAndGet("events.failed");
writer.write("FAILED: Event exceeds the maximum length (" +
buffer.capacity() + " chars, including newline)\n");
writer.flush();
break;
}
}
}
socketChannel.close();
counterGroup.incrementAndGet("sessions.completed");
} catch (IOException e) {
counterGroup.incrementAndGet("sessions.broken");
}
logger.debug("Connection handler exiting");
}
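A NetcatSource client is just a plain TCP client that writes newline-terminated lines; the sketch below also reads the acknowledgement that the handler writes back when ack-every-event is enabled. The port 44444 is an assumption.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;

public class NetcatClientSketch {
  public static void main(String[] args) throws Exception {
    Socket socket = new Socket("localhost", 44444); // assumed host and port
    try {
      Writer writer = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
      BufferedReader reader = new BufferedReader(
          new InputStreamReader(socket.getInputStream(), "UTF-8"));

      writer.write("hello netcat source\n"); // one line becomes one event
      writer.flush();

      // with ack-every-event enabled, the handler answers each accepted line with "OK"
      System.out.println("Source replied: " + reader.readLine());
    } finally {
      socket.close();
    }
  }
}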
ScribeSource
ScribeSource accepts the Scribe LogEntry type so that Flume can plug into an existing Scribe deployment; typically messages come from a local Scribe instance, with Flume playing the role of the central Scribe. Scribe is a distributed log-collection system widely adopted at Facebook, where Scribe is commonly used to collect data, HDFS to store it, and MapReduce to process it. The start() method spawns a Startup thread that launches a Thrift THsHaServer; when a message is received, the server calls the Receiver's Log() method.
private class Startup extends Thread {
public void run() {
try {
Scribe.Processor processor = new Scribe.Processor(new Receiver());
TNonblockingServerTransport transport = new TNonblockingServerSocket(port);
THsHaServer.Args args = new THsHaServer.Args(transport);
args.workerThreads(workers);
args.processor(processor);
args.transportFactory(new TFramedTransport.Factory());
args.protocolFactory(new TBinaryProtocol.Factory(false, false));
server = new THsHaServer(args);
LOG.info("Starting Scribe Source on port " + port);
server.serve();
} catch (Exception e) {
LOG.warn("Scribe failed", e);
}
}
}
@Override
public void start() {
Startup startupThread = new Startup();
startupThread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {}
if (!server.isServing()) {
throw new IllegalStateException("Failed initialization of ScribeSource");
}
sourceCounter.start();
super.start();
}
class Receiver implements Iface {
public ResultCode Log(List<LogEntry> list) throws TException {
if (list != null) {
sourceCounter.addToEventReceivedCount(list.size());
try {
List<Event> events = new ArrayList<Event>(list.size());
for (LogEntry entry : list) {
Map<String, String> headers = new HashMap<String, String>(1, 1);
headers.put(SCRIBE_CATEGORY, entry.getCategory());
Event event = EventBuilder.withBody(entry.getMessage().getBytes(), headers);
events.add(event);
}
if (events.size() > 0) {
getChannelProcessor().processEventBatch(events);
}
sourceCounter.addToEventAcceptedCount(list.size());
return ResultCode.OK;
} catch (Exception e) {
LOG.warn("Scribe source handling failure", e);
}
}
return ResultCode.TRY_LATER;
}
}
SequenceGeneratorSource
SequenceGeneratorSource is a source whose events are simply an auto-incrementing counter.
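The idea fits in a few lines. The sketch below is a simplified illustration of such a pollable source, not the exact Flume implementation (the real class also supports a configurable batch size).
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

// Simplified sketch: each call to process() emits the next number as an event.
public class CountingSourceSketch extends AbstractSource
    implements Configurable, PollableSource {

  private long sequence = 0;

  @Override
  public void configure(Context context) {
    // the real SequenceGeneratorSource also reads a batchSize here
  }

  @Override
  public Status process() throws EventDeliveryException {
    try {
      Event event = EventBuilder.withBody(String.valueOf(sequence++).getBytes());
      getChannelProcessor().processEvent(event);
      return Status.READY;
    } catch (ChannelException ex) {
      sequence--; // the event was not taken, replay it on the next call
      return Status.BACKOFF;
    }
  }
}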
SpoolDirectorySource
SpoolDirectorySource monitors the files in a spooling directory. It requires that files placed there are never modified afterwards, so files should be moved into the directory rather than written in place. The start() method creates a reader object and schedules a timer that runs SpoolDirectoryRunnable every 500 milliseconds; its run() method calls the reader's readEvents() method.
public void start() {
logger.info("SpoolDirectorySource source starting with directory: {}",
spoolDirectory);
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
File directory = new File(spoolDirectory);
try {
reader = new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(directory)
.completedSuffix(completedSuffix)
.ignorePattern(ignorePattern)
.trackerDirPath(trackerDirPath)
.annotateFileName(fileHeader)
.fileNameHeader(fileHeaderKey)
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
.inputCharset(inputCharset)
.build();
} catch (IOException ioe) {
throw new FlumeException("Error instantiating spooling event parser",
ioe);
}
Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
executor.scheduleWithFixedDelay(
runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
super.start();
logger.debug("SpoolDirectorySource source started");
sourceCounter.start();
}
public List<Event> readEvents(int numEvents) throws IOException {
currentFile = getNextFile();
EventDeserializer des = currentFile.get().getDeserializer();
List<Event> events = des.readEvents(numEvents);
/* It's possible that the last read took us just up to a file boundary.
* If so, try to roll to the next file, if there is one. */
if (events.isEmpty()) {
retireCurrentFile();
currentFile = getNextFile();
if (!currentFile.isPresent()) {
return Collections.emptyList();
}
events = currentFile.get().getDeserializer().readEvents(numEvents);
}
if (annotateFileName) {
String filename = currentFile.get().getFile().getAbsolutePath();
for (Event event : events) {
event.getHeaders().put(fileNameHeader, filename);
}
}
committed = false;
lastFileRead = currentFile;
return events;
}
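The SpoolDirectoryRunnable driving this reader is not shown; a plausible sketch of its run() method, based on the description above, is given below (batchSize and the exact counter calls are assumptions).
// Sketch of the scheduled runnable; reader, sourceCounter and logger are fields
// of the surrounding source, and batchSize is an assumed per-poll read size.
@Override
public void run() {
  try {
    while (true) {
      List<Event> events = reader.readEvents(batchSize);
      if (events.isEmpty()) {
        break; // nothing to read right now, wait for the next scheduled poll
      }
      sourceCounter.addToEventReceivedCount(events.size());
      getChannelProcessor().processEventBatch(events);
      reader.commit(); // mark the events as durably delivered before reading more
      sourceCounter.addToEventAcceptedCount(events.size());
    }
  } catch (Throwable t) {
    logger.error("Exception while processing spooled events", t);
  }
}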
SyslogUDPSource and SyslogTcpSource
SyslogUDPSource and SyslogTcpSource collect syslog messages. The start() method starts a server, and the messageReceived() handler calls syslogUtils.extractEvent() to turn the received bytes into an Event.
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
ChannelBuffer buff = (ChannelBuffer) mEvent.getMessage();
while (buff.readable()) {
Event e = syslogUtils.extractEvent(buff);
if (e == null) {
logger.debug("Parsed partial event, event will be generated when " +
"rest of the event is received.");
continue;
}
try {
getChannelProcessor().processEvent(e);
counterGroup.incrementAndGet("events.success");
} catch (ChannelException ex) {
counterGroup.incrementAndGet("events.dropped");
logger.error("Error writting to channel, event dropped", ex);
}
}
}
}
@Override
public void start() {
ChannelFactory factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
syslogTcpHandler handler = new syslogTcpHandler();
handler.setEventSize(eventSize);
handler.setFormater(formaterProp);
return Channels.pipeline(handler);
}
});
logger.info("Syslog TCP Source starting...");
if (host == null) {
nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
} else {
nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
}
super.start();
}
ThriftLegacySource and ThriftSource
ThriftLegacySource and ThriftSource transport log data over Thrift.
Reference: http://blog.csdn.net/amuseme_lu/article/details/6262572. Thrift is an open-source project from Facebook, primarily a cross-language service-development framework: the server exposes an interface and clients invoke it remotely.