`
侯上校
  • 浏览: 225721 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

flume加载配置

 
阅读更多
PropertiesFileConfigurationProvider.java
@Override
  public FlumeConfiguration getFlumeConfiguration() {
    BufferedReader reader = null;
    try {
      reader = new BufferedReader(new FileReader(file));
      Properties properties = new Properties();
      properties.load(reader);
      return new FlumeConfiguration(toMap(properties));
    } catch (IOException ex) {
      LOGGER.error("Unable to load file:" + file
          + " (I/O failure) - Exception follows.", ex);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException ex) {
          LOGGER.warn(
              "Unable to close file reader for file: " + file, ex);
        }
      }
    }
    return new FlumeConfiguration(new HashMap<String, String>());
  }


private Map<String, String> toMap(Properties properties) {
    Map<String, String> result = Maps.newHashMap();
    Enumeration<?> propertyNames = properties.propertyNames();
    while (propertyNames.hasMoreElements()) {
      String name = (String) propertyNames.nextElement();
      String value = properties.getProperty(name);
      result.put(name, value);
    }
    return result;
  }

  /**
   * Creates a populated Flume Configuration object.
   */
  public FlumeConfiguration(Map<String, String> properties) {
    agentConfigMap = new HashMap<String, AgentConfiguration>();
    errors = new LinkedList<FlumeConfigurationError>();
    // Construct the in-memory component hierarchy
    for(String name : properties.keySet()) {
      String value = properties.get(name);

      if (!addRawProperty(name, value)) {
        logger.warn("Configuration property ignored: " + name + " = " + value);
      }
    }
    // Now iterate thru the agentContext and create agent configs and add them
    // to agentConfigMap

    // validate and remove improperly configured components
    validateConfiguration();
  }

 private boolean addRawProperty(String name, String value) {
    // Null names and values not supported
    if (name == null || value == null) {
      errors
      .add(new FlumeConfigurationError("", "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    // Empty values are not supported
    if (value.trim().length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
          ErrorOrWarning.ERROR));
      return false;
    }

    // Remove leading and trailing spaces
    name = name.trim();
    value = value.trim();

    int index = name.indexOf('.');

    // All configuration keys must have a prefix defined as agent name
    if (index == -1) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    String agentName = name.substring(0, index);

    // Agent name must be specified for all properties
    if (agentName.length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.AGENT_NAME_MISSING,
          ErrorOrWarning.ERROR));
      return false;
    }

    String configKey = name.substring(index + 1);

    // Configuration key must be specified for every property
    if (configKey.length() == 0) {
      errors
      .add(new FlumeConfigurationError(name, "",
          FlumeConfigurationErrorType.PROPERTY_NAME_NULL,
          ErrorOrWarning.ERROR));
      return false;
    }

    AgentConfiguration aconf = agentConfigMap.get(agentName);

    if (aconf == null) {
      aconf = new AgentConfiguration(agentName, errors);
      agentConfigMap.put(agentName, aconf);
    }

    // Each configuration key must begin with one of the three prefixes:
    // sources, sinks, or channels.
    return aconf.addProperty(configKey, value);
  }

 

分享到:
评论

相关推荐

    flume采集日志存入MySQL,支持分库分表,动态加载配置文件-flume-mysql.zip

    总之,"flume-mysql.zip"项目展示了如何利用 Flume 实现高效日志收集,结合 MySQL 的分库分表策略提升数据处理能力,并通过动态加载配置文件实现灵活的运维管理。对于大型分布式系统来说,这样的解决方案是监控和...

    大数据采集技术-Flume配置.pptx

    使用上述命令启动名为`a1`的Flume Agent,加载配置文件`conf/hdfs.conf`。 ### Flume工作原理 Flume的工作流程通常包括以下步骤: 1. Source监控数据源,如日志文件,当检测到新的数据时,将数据发送到Channel。 2...

    apache-flume-1.8.0

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式存储系统,如Hadoop HDFS。它设计为容错性强,可扩展,允许从多个源收集数据,并将其流向目标,如数据仓库或...

    kafka+flume 实时采集oracle数据到hive中.docx

    然后,需要配置Flume的配置文件flume.conf,指定Kafka Topic和HDFS的路径。 首先,需要创建一个Kafka Topic,使用命令./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions...

    flume_jars.zip

    Flume是Apache Hadoop生态体系中的一个数据收集系统,它被设计用来高效、可靠地聚合、转换和加载大规模日志数据。在这个“flume_jars.zip”压缩包中,包含了98个与Flume相关的JAR文件,这些文件是开发和运行Flume...

    Flume-HDFS-驱动-jar包

    这样,Flume启动时会自动加载这些库,使得Flume可以识别并正确操作HDFS。 配置Flume来使用HDFS通常涉及以下步骤: 1. 安装Hadoop并确保其正确运行。 2. 将Hadoop的相关JAR文件(如上述压缩包内容)添加到Flume的`...

    apache-flume-1.7.0-bin.zip

    Apache Flume 是一个分布式、可靠且可用的数据收集系统,用于高效地聚合、移动和加载大量日志数据到集中式数据存储系统,如Hadoop HDFS。这个“apache-flume-1.7.0-bin.zip”文件是Apache Flume 1.7.0版本的二进制...

    flume-es5.X依赖.zip

    2. 配置与加载:在Flume配置文件中,通过`type`属性指定自定义Sink的类全名,同时提供必要的配置参数。 3. 实现数据处理逻辑:在`process()`方法中,根据业务需求对事件数据进行处理,然后写入目标系统,如Elastic...

    flume所需要的hdfs包.zip

    在标题和描述中提到的 "flume所需要的hdfs包.zip" 暗示了这个压缩包可能包含了一些 Flume 与 HDFS(Hadoop 分布式文件系统)集成所必需的组件或配置。 HDFS 是 Hadoop 的核心组件之一,它是一个分布式文件系统,...

    Flume对接Spark Streaming的相关jar包

    配置 Flume 配置文件(通常是 conf/flume.conf),设置 sink 类型为 `org.apache.spark.streaming.flume.sink.SparkStreamingSink`,并指定相应的 Spark 配置,如 Master URL、批次间隔等,完成 Flume 与 Spark ...

    flume-demo_大数据_flume_DEMO_自定义拦截器_

    - `initialize()` 方法用于初始化拦截器,可以加载配置信息等。 - `intercept(List)` 方法是核心,它接收一个事件列表,根据业务逻辑进行处理,如过滤掉某些事件或修改事件内容。 - `close()` 方法在拦截器不再...

    Apache flume1.6_src

    - 配置文件解析:理解如何加载和解析 Flume 配置文件,以构建 Agent 结构。 - 事务管理:深入研究 Channel 如何提供事务支持,确保数据不丢失。 - 错误处理和恢复策略:学习 Flume 如何处理故障和恢复数据流。 - ...

    flume-ng-1.6.0 cdh5.7.0安装包

    5. 动态配置和重新加载:运行时可以修改Flume配置,无需停止服务,便于系统调整和优化。 在安装和配置Flume-ng-1.6.0-cdh5.7.0时,用户需要注意以下几点: 1. 确保系统环境满足CDH 5.7.0的硬件和软件要求。 2. 安装...

    flume-chd版本

    在大数据分析场景中,Flume-chd常用于收集网站访问日志、应用程序日志、社交媒体数据等,为后续的ETL(提取、转换、加载)和数据分析提供稳定的数据来源。 总的来说,Flume-chd是CDH生态中的重要一环,它简化了大...

    flume1.9, win10, code.zip

    然后,你可以按照通常的方式启动 Flume,此时 Flume 将会加载这个修改过的 `.jar` 文件,从而支持在 Windows 10 上运行 `exec` 和 `taildir` 源。 在实际操作中,以下是你可能需要遵循的步骤: 1. 下载并解压 `...

    flume log4f示例源码

    `run.sh` 是一个启动 Flume 代理的脚本,它可能包含了启动 Flume 并加载 `flume-conf.properties` 配置的命令。例如: ```bash #!/bin/bash export JAVA_HOME=/path/to/java flume-ng agent --conf conf --conf-...

    flume-hadoop-jar.zip

    在Flume的配置文件中,我们需要指定这些jar包的路径,以便Flume在运行时能够正确加载并使用。 配置Flume以将数据输出到HDFS,我们通常需要修改Flume配置文件(如flume.conf),设置Sink为HDFS类型,并提供HDFS相关...

    mvn flume ng sdk

    5. **部署与运行**:将生成的JAR包放入Flume的lib目录下,然后启动Flume Agent,Flume会自动加载并运行自定义组件。 通过`flume-ng-sdk`,开发者能够快速地为Apache Flume构建定制化的数据收集解决方案,适应不同的...

    Flume构建高可用、可扩展的海量日志采集系统.zip

    Flume还支持动态配置和重新加载,这意味着可以在运行时添加、删除或修改Flume agent的配置,而无需停止服务。这对于应对不断变化的数据源和需求非常有用。 为了实现可扩展性,Flume可以通过级联多个agent来处理大...

    大数据采集技术-flume监控httpsources.pdf

    Flume是一个高度可定制化和灵活的数据收集系统,设计用于高效地聚集、传输和加载大规模日志数据。它支持多种数据源(如Syslog、HTTP、Thrift等),并能够将这些数据流式传输到各种目的地,如HDFS、HBase、Solr等。...

Global site tag (gtag.js) - Google Analytics