`
jinnianshilongnian
  • 浏览: 21499237 次
  • 性别: Icon_minigender_1
博客专栏
5c8dac6a-21dc-3466-8abb-057664ab39c7
跟我学spring3
浏览量:2417753
D659df3e-4ad7-3b12-8b9a-1e94abd75ac3
Spring杂谈
浏览量:3008041
43989fe4-8b6b-3109-aaec-379d27dd4090
跟开涛学SpringMVC...
浏览量:5638944
1df97887-a9e1-3328-b6da-091f51f886a1
Servlet3.1规范翻...
浏览量:259796
4f347843-a078-36c1-977f-797c7fc123fc
springmvc杂谈
浏览量:1597022
22722232-95c1-34f2-b8e1-d059493d3d98
hibernate杂谈
浏览量:250104
45b32b6f-7468-3077-be40-00a5853c9a48
跟我学Shiro
浏览量:5858130
Group-logo
跟我学Nginx+Lua开...
浏览量:701773
5041f67a-12b2-30ba-814d-b55f466529d5
亿级流量网站架构核心技术
浏览量:784895
社区版块
存档分类
最新评论

Flume架构与源码分析-核心组件分析-1

阅读更多

 

首先所有核心组件都会实现org.apache.flume.lifecycle.LifecycleAware接口:

public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}

start方法在整个Flume启动时或者初始化组件时都会调用start方法进行组件初始化,Flume组件出现异常停止时会调用stop,getLifecycleState返回组件的生命周期状态,有IDLE, START, STOP, ERROR四个状态。

 

如果开发的组件需要配置,如设置一些属性;可以实现org.apache.flume.conf.Configurable接口: 

public interface Configurable {
   public void configure(Context context);
}

Flume在启动组件之前会调用configure来初始化组件一些配置。

 

1、Source

Source用于采集日志数据,有两种实现方式:轮训拉取和事件驱动机制;Source接口如下:

public interface Source extends LifecycleAware, NamedComponent {
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  public ChannelProcessor getChannelProcessor();
} 

Source接口首先继承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口,也就是说它的的所有逻辑的实现应该在LifecycleAware接口的start和stop中实现;ChannelProcessor之前介绍过用来进行日志流的过滤和Channel的选择及调度。

 

Source是通过SourceFactory工厂创建,默认提供了DefaultSourceFactory,其首先通过Enum类型org.apache.flume.conf.source.SourceType查找默认实现,如exec,则找到org.apache.flume.source.ExecSource实现,如果找不到直接Class.forName(className)创建。 

 

Source提供了两种机制: PollableSource(轮训拉取)和EventDrivenSource(事件驱动):


   

PollableSource默认提供了如下实现:

比如JMSSource实现使用javax.jms.MessageConsumer.receive(pollTimeout)主动去拉取消息。

 

EventDrivenSource默认提供了如下实现:


  

比如NetcatSourceHttpSource就是事件驱动,即被动等待;比如HttpSource就是内部启动了一个内嵌的Jetty启动了一个Servlet容器,通过FlumeHTTPServlet去接收消息。

 

Flume提供了SourceRunner用来启动Source的流转:


 

public class EventDrivenSourceRunner extends SourceRunner {
  private LifecycleState lifecycleState;
  public EventDrivenSourceRunner() {
      lifecycleState = LifecycleState.IDLE; //启动之前是空闲状态
  }

  @Override
  public void start() {
    Source source = getSource(); //获取Source
    ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
    cp.initialize(); //初始化Channel处理器
    source.start();  //启动Source
    lifecycleState = LifecycleState.START; //本组件状态改成启动状态
  }
  @Override
  public void stop() {
    Source source = getSource(); //先停Source
    source.stop();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.close();//再停Channel处理器
    lifecycleState = LifecycleState.STOP; //本组件状态改成停止状态
  }
} 

从本组件也可以看出:1、首先要初始化ChannelProcessor,其实现时初始化过滤器链;2、接着启动Source并更改本组件的状态。

 

public class PollableSourceRunner extends SourceRunner {
 @Override
 public void start() {
  PollableSource source = (PollableSource) getSource();
  ChannelProcessor cp = source.getChannelProcessor();
  cp.initialize();
  source.start();

  runner = new PollingRunner();
  runner.source = source;
  runner.counterGroup = counterGroup;
  runner.shouldStop = shouldStop;

  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" + 
      source.getClass().getSimpleName() + "-" + source.getName());
  runnerThread.start(); 

  lifecycleState = LifecycleState.START;
 }
} 

 

PollingRunner首先初始化组件,但是又启动了一个线程PollingRunner,其作用就是轮训拉取数据: 

  @Override
  public void run() {
    while (!shouldStop.get()) { //如果没有停止,则一直在死循环运行
      counterGroup.incrementAndGet("runner.polls");

      try {
        //调用PollableSource的process方法进行轮训拉取,然后判断是否遇到了失败补偿
        if (source.process().equals(PollableSource.Status.BACKOFF)) {/
          counterGroup.incrementAndGet("runner.backoffs");

          //失败补偿时暂停线程处理,等待超时时间之后重试
          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
        } else {
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
                }
      }
    }
  }
} 

Flume在启动时会判断SourcePollableSource还是EventDrivenSource来选择使用PollableSourceRunner还是EventDrivenSourceRunner

 

 

比如HttpSource实现,其通过FlumeHTTPServlet接收消息然后: 

    List<Event> events = Collections.emptyList(); //create empty list
    //首先从请求中获取Event
    events = handler.getEvents(request);
    //然后交给ChannelProcessor进行处理
    getChannelProcessor().processEventBatch(events); 

到此基本的Source流程就介绍完了,其作用就是监听日志,采集,然后交给ChannelProcessor进行处理。

 

 

2Channel

Channel用于连接SourceSinkSource生产日志发送到ChannelSinkChannel消费日志;也就是说通过Channel实现了SourceSink的解耦,可以实现多对多的关联,和SourceSink的异步化。     

 

之前Source采集到日志后会交给ChannelProcessor处理,那么接下来我们先从ChannelProcessor入手,其依赖三个组件: 

  private final ChannelSelector selector;  //Channel选择器
  private final InterceptorChain interceptorChain; //拦截器链
  private ExecutorService execService; //用于实现可选Channel的ExecutorService,默认是单线程实现 

 

接下来看下其是如何处理Event的: 

public void processEvent(Event event) {
  event = interceptorChain.intercept(event); //首先进行拦截器链过滤
  if (event == null) {
    return;
  }
  List<Event> events = new ArrayList<Event>(1);
  events.add(event);

  //通过Channel选择器获取必须成功处理的Channel,然后事务中执行
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  for (Channel reqChannel : requiredChannels) { 
    executeChannelTransaction(reqChannel, events, false);
  }

  //通过Channel选择器获取可选的Channel,这些Channel失败是可以忽略,不影响其他Channel的处理
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
  }
} 

 

另外内部还提供了批处理实现方法processEventBatch;对于内部事务实现的话可以参考executeChannelTransaction方法,整体事务机制类似于JDBC

private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
  //1、获取Channel上的事务
  Transaction tx = channel.getTransaction();
  Preconditions.checkNotNull(tx, "Transaction object must not be null");
  try {
    //2、开启事务
    tx.begin();
    //3、在Channel上执行批量put操作
    for (Event event : batch) {
      channel.put(event);
    }
    //4、成功后提交事务
    tx.commit();
  } catch (Throwable t) {
    //5、异常后回滚事务
    tx.rollback();
    if (t instanceof Error) {
       LOG.error("Error while writing to channel: " +
           channel, t);
       throw (Error) t;
    } else if(!isOptional) {//如果是可选的Channel,异常忽略
       throw new ChannelException("Unable to put batch on required " +
             "channel: " + channel, t);
    }
  } finally {
    //最后关闭事务
    tx.close();
  }
}

 

Interceptor用于过滤Event,即传入一个Event然后进行过滤加工,然后返回一个新的Event,接口如下:   

public interface Interceptor {
    public void initialize();
    public Event intercept(Event event);
    public List<Event> intercept(List<Event> events);
    public void close();
} 

可以看到其提供了initializeclose方法用于启动和关闭;intercept方法用于过滤或加工Event。比如HostInterceptor拦截器用于获取本机IP然后默认添加到Event的字段为hostHeader中。

  

接下来就是ChannelSelector选择器了,其通过如下方式创建: 

//获取ChannelSelector配置,比如agent.sources.s1.selector.type = replicating
ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
//使用Source关联的Channel创建,比如agent.sources.s1.channels = c1 c2
ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig); 

 

ChannelSelector默认提供了两种实现:复制和多路复用:


默认实现是复制选择器ReplicatingChannelSelector,即把接收到的消息复制到每一个Channel;多路复用选择器MultiplexingChannelSelector会根据Event Header中的参数进行选择,以此来选择使用哪个Channel

 

ChannelEvent中转的地方,Source发布EventChannelSink消费ChannelEventChannel接口提供了如下接口用来实现Event流转:  

public interface Channel extends LifecycleAware, NamedComponent {
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  public Transaction getTransaction();
} 

put用于发布Eventtake用于消费EventgetTransaction用于事务支持。默认提供了如下Channel的实现: 



 对于Channel的实现我们后续单独章节介绍。

 

3Sink

SinkChannel消费Event,然后进行转移到收集/聚合层或存储层。Sink接口如下所示: 

public interface Sink extends LifecycleAware, NamedComponent {
  public void setChannel(Channel channel);
  public Channel getChannel();
  public Status process() throws EventDeliveryException;
  public static enum Status {
    READY, BACKOFF
  }
} 

类似于Source,其首先继承了LifecycleAware,然后提供了Channelgetter/setter方法,并提供了process方法进行消费,此方法会返回消费的状态,READYBACKOFF

 

Sink也是通过SinkFactory工厂来创建,其也提供了DefaultSinkFactory默认工厂,比如传入hdfs,会先查找Enum org.apache.flume.conf.sink.SinkType,然后找到相应的默认处理类org.apache.flume.sink.hdfs.HDFSEventSink,如果没找到默认处理类,直接通过Class.forName(className)进行反射创建。  

 

我们知道Sink还提供了分组功能,用于把多个Sink聚合为一组进行使用,内部提供了SinkGroup用来完成这个事情。此时问题来了,如何去调度多个Sink,其内部使用了SinkProcessor来完成这个事情,默认提供了故障转移和负载均衡两个策略。

 

首先SinkGroup就是聚合多个Sink为一组,然后将多个Sink传给SinkProcessorFactory进行创建SinkProcessor,而策略是根据配置文件中配置的如agent.sinkgroups.g1.processor.type = load_balance来选择的。

 

SinkProcessor提供了如下实现:

DefaultSinkProcessor:默认实现,用于单个Sink的场景使用。

FailoverSinkProcessor:故障转移实现: 

public Status process() throws EventDeliveryException {
  Long now = System.currentTimeMillis();
	//1、首先检查失败队列的头部的Sink是否已经过了失败补偿等待时间了
  while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    //2、如果可以使用了,则从失败Sink队列获取队列第一个Sink
    FailedSink cur = failedSinks.poll();
    Status s;
    try {
      s = cur.getSink().process(); //3、使用此Sink进行处理
      if (s  == Status.READY) { //4、如果处理成功
        liveSinks.put(cur.getPriority(), cur.getSink()); //4.1、放回存活Sink队列
        activeSink = liveSinks.get(liveSinks.lastKey());
      } else {
        failedSinks.add(cur); //4.2、如果此时不是READY,即BACKOFF期间,再次放回失败队列
      }
      return s;
    } catch (Exception e) {
      cur.incFails(); //5、如果遇到异常了,则增加失败次数,并放回失败队列
      failedSinks.add(cur);
    }
  }

  Status ret = null;
  while(activeSink != null) { //6、此时失败队列中没有Sink能处理了,那么需要使用存活Sink队列进行处理
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) { //7、处理失败进行转移到失败队列
      activeSink = moveActiveToDeadAndGetNext();
    }
  }

  throw new EventDeliveryException("All sinks failed to process, " +
      "nothing left to failover to");
}

 

失败队列是一个优先级队列,使用refresh属性排序,而refresh是通过如下机制计算的: 

refresh = System.currentTimeMillis()
        + Math.min(maxPenalty, (1 << sequentialFailures) * FAILURE_PENALTY); 

其中maxPenalty是最大等待时间,默认30s,而(1 << sequentialFailures) * FAILURE_PENALTY)用于实现指数级等待时间递增, FAILURE_PENALTY1s

 

LoadBalanceSinkProcessor:用于实现Sink的负载均衡,其通过SinkSelector进行实现,类似于ChannelSelectorLoadBalanceSinkProcessor在启动时会根据配置,如agent.sinkgroups.g1.processor.selector = random进行选择,默认提供了两种选择器:


  

LoadBalanceSinkProcessor使用如下机制进行负载均衡: 

public Status process() throws EventDeliveryException {
  Status status = null;
  //1、使用选择器创建相应的迭代器,也就是用来选择Sink的迭代器
  Iterator<Sink> sinkIterator = selector.createSinkIterator();
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      //2、选择器迭代Sink进行处理,如果成功直接break掉这次处理,此次负载均衡就算完成了
      status = sink.process();
      break;
    } catch (Exception ex) {
      //3、失败后会通知选择器,采取相应的失败退避补偿算法进行处理
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }
  if (status == null) {
    throw new EventDeliveryException("All configured sinks have failed");
  }
  return status;
} 

 

如上的核心就是怎么创建迭代器,如何进行失败退避补偿处理,首先我们看下RoundRobinSinkSelector实现,其内部是通过通用的RoundRobinOrderSelector选择器实现: 

public Iterator<T> createIterator() {
  //1、获取存活的Sink索引,
  List<Integer> activeIndices = getIndexList();
  int size = activeIndices.size();
  //2、如果上次记录的下一个存活Sink的位置超过了size,那么从队列头重新开始计数
  if (nextHead >= size) {
    nextHead = 0;
  }
  //3、获取本次使用的起始位置
  int begin = nextHead++;
  if (nextHead == activeIndices.size()) {
    nextHead = 0;
  }
  //4、从该位置开始迭代,其实现类似于环形队列,比如整个队列是5,起始位置是3,则按照 3、4、0、1、2的顺序进行轮训,实现了轮训算法 
  int[] indexOrder = new int[size];
  for (int i = 0; i < size; i++) {
    indexOrder[i] = activeIndices.get((begin + i) % size);
  }
  //indexOrder是迭代顺序,getObjects返回相关的Sinks;
  return new SpecificOrderIterator<T>(indexOrder, getObjects());
} 

 

getIndexList实现如下: 

protected List<Integer> getIndexList() {
  long now = System.currentTimeMillis();
  List<Integer> indexList = new ArrayList<Integer>();
  int i = 0;
  for (T obj : stateMap.keySet()) {
    if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
      indexList.add(i);
    }
    i++;
  }
  return indexList;
}

isShouldBackOff()表示是否开启退避算法支持,如果不开启,则认为每个Sink都是存活的,每次都会重试,通过agent.sinkgroups.g1.processor.backoff = true配置开启,默认falserestoreTime和之前介绍的refresh一样,是退避补偿等待时间,算法类似,就不多介绍了。 

 

那么什么时候调用Sink进行消费呢?其类似于SourceRunnerSink提供了SinkRunner进行轮训拉取处理,SinkRunner会轮训调度SinkProcessor消费Channel的消息,然后调用Sink进行转移。SinkProcessor之前介绍过,其负责消息复制/路由。

 

SinkRunner实现如下: 

public void start() {
  SinkProcessor policy = getPolicy();
  policy.start();
  runner = new PollingRunner();
  runner.policy = policy;
  runner.counterGroup = counterGroup;
  runner.shouldStop = new AtomicBoolean();
  runnerThread = new Thread(runner);
  runnerThread.setName("SinkRunner-PollingRunner-" +
      policy.getClass().getSimpleName());
  runnerThread.start();
  lifecycleState = LifecycleState.START;
} 

 

即获取SinkProcessor然后启动它,接着启动轮训线程去处理。PollingRunner线程负责轮训消息,核心实现如下: 

public void run() {
  while (!shouldStop.get()) { //如果没有停止
    try {
      if (policy.process().equals(Sink.Status.BACKOFF)) {//如果处理失败了,进行退避补偿处理
        counterGroup.incrementAndGet("runner.backoffs");
        Thread.sleep(Math.min(
            counterGroup.incrementAndGet("runner.backoffs.consecutive")
            * backoffSleepIncrement, maxBackoffSleep)); //暂停退避补偿设定的超时时间
      } else {
        counterGroup.set("runner.backoffs.consecutive", 0L);
      }
    } catch (Exception e) {
      try {
        Thread.sleep(maxBackoffSleep); //如果遇到异常则等待最大退避时间
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
  }
} 

 

整体实现类似于PollableSourceRunner实现,整体处理都是交给SinkProcessor完成的。SinkProcessor会轮训Sinkprocess方法进行处理;此处以LoggerSink为例:

@Override
public Status process() throws EventDeliveryException {
  Status result = Status.READY;
  Channel channel = getChannel();
  //1、获取事务
  Transaction transaction = channel.getTransaction();
  Event event = null;

  try {
    //2、开启事务
    transaction.begin();
    //3、从Channel获取Event
    event = channel.take();
    if (event != null) {
      if (logger.isInfoEnabled()) {
        logger.info("Event: " + EventHelper.dumpEvent(event, maxBytesToLog));
      }
    } else {//4、如果Channel中没有Event,则默认进入故障补偿机制,即防止死循环造成CPU负载高
      result = Status.BACKOFF;
    }
    //5、成功后提交事务
    transaction.commit();
  } catch (Exception ex) {
    //6、失败后回滚事务
    transaction.rollback();
    throw new EventDeliveryException("Failed to log event: " + event, ex);
  } finally {
    //7、关闭事务
    transaction.close();
  }
  return result;
} 

 

Sink中一些实现是支持批处理的,比如RollingFileSink

//1、开启事务
//2、批处理
for (int i = 0; i < batchSize; i++) {
  event = channel.take();
  if (event != null) {
    sinkCounter.incrementEventDrainAttemptCount();
    eventAttemptCounter++;
    serializer.write(event);
  }
}
//3、提交/回滚事务、关闭事务

 

定义一个批处理大小然后在事务中执行批处理。 

 

  • 大小: 9.6 KB
  • 大小: 9.2 KB
  • 大小: 8.4 KB
  • 大小: 5.1 KB
  • 大小: 8 KB
  • 大小: 14.3 KB
  • 大小: 8.5 KB
  • 大小: 7.7 KB
3
1
分享到:
评论

相关推荐

    flume-ng-sql-source-1.5.2

    - **架构**:Flume由三个主要组件构成:Sources、Channels和Sinks。Sources负责数据的摄入,Channels作为临时存储,Sinks负责将数据传输到目的地。 - **可靠性**:通过使用可配置的持久化Channels(如File Channel...

    flume-ng-1.6.0-cdh5.10.1.tar.gz的下载

    Apache Flume 是一个分布式、可靠且可用的服务,用于有效地收集、聚合和移动大量日志数据。在大数据领域,它常被用于收集来自不...通过其灵活的架构和丰富的功能,Flume使得日志数据的管理和分析变得更加简单和高效。

    apache-flume-1.8.0-src.tar.gz

    1. **Agent**:Flume的核心工作单元,它由Source、Channel和Sink三部分组成。Source负责接收数据,Channel作为临时存储,Sink则负责将数据发送到目标位置。 2. **Source**:数据流入Flume的入口,如日志文件、网络...

    flume1.7.0源码

    1. **Flume架构**: Flume 的核心架构由三个主要组件构成:Sources、Channels 和 Sinks。Sources 负责从各种数据源(如网络套接字、日志文件、应用程序API等)收集数据;Channels 作为临时存储,保证数据在传输过程...

    apache-flume-1.9.0-bin.tar,kafka_2.11-0.10.1.0,zookeeper-3.3.6_.tar

    标题中的三个文件“apache-flume-1.9.0-bin.tar”,“kafka_2.11-0.10.1.0”,以及“zookeeper-3.3.6_.tar”是三个重要的分布式系统组件,分别代表了Apache Flume、Apache Kafka和Apache ZooKeeper。这些组件在大...

    Flume ng share

    Flume NG 的架构主要由三个核心组件组成: 1. **Source**(源):负责从外部系统收集数据,并将其送入 Channel 中。 2. **Channel**(通道):作为所有数据的中转站,存储从 Source 收集的数据直到 Sink 处理。 3. ...

    flume-1.9.0.tgz

    在大数据处理领域,Flume 是一个至关重要的组件,尤其在日志管理和实时数据分析中扮演着核心角色。"flume-1.9.0.tgz" 是 Flume 的一个版本压缩包,包含了该版本的所有源码、库文件和必要的配置文档。 **Flume 的...

    Apache flume1.6_src

    Flume 的核心组件主要包括 Channel、Sink 和 Source 三部分: 1. **Source**:源,是数据流入 Flume 的入口。Flume 支持多种类型的 Source,如 TailSource(监听日志文件尾部)、AvroSource(接收 Avro 格式的数据...

    《深入理解Spark 核心思想与源码分析》耿嘉安 完整版带书签

    《深入理解Spark核心思想与源码分析》是耿嘉安撰写的一本专著,全面而深入地探讨了Apache Spark这一大数据处理框架的核心理念和技术细节。这本书不仅覆盖了Spark的基础概念,还深入到了源码层面,为读者揭示了Spark...

    word源码java-sparkstreaming:SparkStreaming实时流处理项目实战

    二、Flume架构及核心组件 三、Flume&JDK环境部署 1.前置条件 Java Runtime Environment - Java 1.8 or later Memory - Sufficient memory for configurations used by sources, channels or sinks Disk Space - ...

    基于Java的实例源码-日志服务器 Apache Flume.zip

    这个压缩包“基于Java的实例源码-日志服务器 Apache Flume.zip”包含了Apache Flume 1.2.0版本的源代码,可以为开发者提供深入理解Flume工作原理以及如何自定义Flume组件的参考。 Apache Flume的核心概念主要包括...

    flume+kafka+storm搭建

    此外,搭建过程中也需要注意各个组件之间的网络通信问题,如Zookeeper与Kafka之间的通信、Kafka集群内部的通信,以及Flume和Kafka之间的数据交互。确保通信顺畅需要做好相应的网络安全和权限设置。 总结来看,利用...

    深入理解Spark:核心思想与源码分析

    《深入理解Spark:核心思想与源码分析》是一本针对Apache Spark进行深度解析的专业书籍,旨在帮助读者透彻掌握Spark的核心理念与实现机制。Spark作为大数据处理领域的重要框架,其高性能、易用性和弹性分布式计算的...

    Flume配置双HA hdfsSink.docx

    **提取关键组件**:从Flume源码包中提取关键的Java类,这些类位于`apache-flume-1.8.0-src/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/`目录下,主要包括`BucketWriter`和`...

    基于 Apache Flume 定制的数据采集工具.zip

    1. **Flume 组件**:Flume 主要由三个基本组件构成:Source、Channel 和 Sink。Source 负责从数据源接收数据,如日志文件、网络套接字等;Channel 是一个临时存储区域,用于在 Source 和 Sink 之间传递事件;Sink ...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    《构建基于Spark+Flume+Kafka+HBase的实时日志处理分析系统》 在当前大数据时代,实时数据处理已经成为企业决策与运营的关键。本项目以“基于Spark+Flume+Kafka+HBase的实时日志处理分析系统”为主题,旨在提供一个...

    基于Flume + Kafka + Spark的电商实时访问日志分析系统.zip

    在构建大数据处理系统时,基于Flume、Kafka和Spark的架构是一种常见且高效的选择,尤其适用于电商领域的实时访问日志分析。这个“基于Flume + Kafka + Spark的电商实时访问日志分析系统”项目,提供了完整的源码,...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...

    日志服务器 Apache Flume

    1. **Flume 的核心概念** - **Agent**:Flume 的基本工作单元,负责数据的采集、传输和存储。每个 Agent 包含三个主要组件:Source、Channel 和 Sink。 - **Source**:负责接收数据,例如从应用程序服务器、网络...

Global site tag (gtag.js) - Google Analytics