`
yugouai
  • 浏览: 498596 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Flume NG

 
阅读更多

Mongodb sink

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBSink extends EventSink.Base {
  static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class);
  
  private String serverName;
  private int serverPort;
  private String dbName;
  private String collName;
  private Mongo mongo;
  private DB db;
  private DBCollection collection;
  
  public MongoDBSink(String server, String port, String dbName, String collName) {
    this.serverName = server;
    this.serverPort = Integer.parseInt(port);
    this.dbName = dbName;
    this.collName = collName;
  }

  @Override
  public synchronized void append(Event e) throws IOException {
    BasicDBObject entry = new BasicDBObject();
    entry.put("timestamp", new Date(e.getTimestamp()));
    entry.put("hostname", e.getHost());
    entry.put("priority", e.getPriority().name());
    entry.put("message", new String(e.getBody()));
    Map<String, byte[]> metadata = e.getAttrs();
    if (!metadata.isEmpty()) {
      BasicDBObject metadataEntry = new BasicDBObject();
      for (String key: metadata.keySet()) {
        metadataEntry.put(key, new String(metadata.get(key)));
      }
      entry.put("metadata", metadataEntry);
    }
    collection.insert(entry);
  }

  @Override
  public void close() throws IOException {
    mongo.close();
  }

  @Override
  public void open() throws IOException {
    try {
      mongo = new Mongo(serverName, serverPort);
      db = mongo.getDB(dbName);
      collection = db.getCollection(collName);
    } catch (UnknownHostException e) {
      LOG.error("Could not find specified server.", e);
    } catch (MongoException e) {
      LOG.error("Error connecting to server.", e);
    }
  }
  
  public static SinkBuilder builder() {
    return new SinkBuilder() {
      // construct a new parameterized sink
      @Override
      public EventSink build(Context context, String... argv) {
        Preconditions.checkArgument(argv.length == 4,
            "usage: mongoDBSink(\"server\",\"port\",\"db\",\"collection\")");

        return new MongoDBSink(argv[0], argv[1], argv[2], argv[3]);
      }
    };
  }

  /**
   * 将sink注入SourceFactory
   * 别名mongoDBSink
   */
  public static List<Pair<String, SinkBuilder>> getSinkBuilders() {
    List<Pair<String, SinkBuilder>> builders =
      new ArrayList<Pair<String, SinkBuilder>>();
    builders.add(new Pair<String, SinkBuilder>("mongoDBSink", builder()));
    return builders;
  }
}

 

分享到:
评论

相关推荐

    FLUME-FlumeNG-210517-1655-5858

    FlumeNG是Apache Flume的一个分支版本,旨在通过重写和重构来解决现有版本中的一些已知问题和限制。Flume是Cloudera开发的一个分布式、可靠且可用的系统,用于有效地收集、聚合和移动大量日志数据。它的主要用途是将...

    Flume ng share

    ### Flume NG 分享资料详解 #### Flume NG 概述 Flume NG 是一个分布式、可靠且可用的服务,用于高效地收集、聚合并移动大量的日志数据。它具有简单而灵活的架构,基于流式数据流。Flume NG 非常健壮且能够容忍...

    mvn flume ng sdk

    `Mvn Flume NG SDK` 是一个用于Apache Flume集成开发的重要工具,它基于Maven构建系统,使得在Java环境中开发、管理和部署Flume插件变得更加便捷。Apache Flume是一款高度可配置的数据收集系统,广泛应用于日志聚合...

    Flume-ng资料合集

    Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,...

    flumeng for streaming spark

    标题中的"flumeng for streaming spark"涉及到两个关键的开源技术:Flume和Spark Streaming。Flume是Apache软件基金会的一个项目,主要用于收集、聚合和移动大量日志数据,而Spark Streaming是Apache Spark的一个...

    Flume-ng在windows环境搭建并测试+log4j日志通过Flume输出到HDFS.docx

    Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...

    Kafka+FlumeNG+Storm+HBase

    Kafka+FlumeNG+Storm+HBase实时处理系统介绍

    flumeng-kafka-plugin:flumeng-kafka-plugin

    《Flume与Kafka集成:深入理解flumeng-kafka-plugin》 在大数据处理领域,Apache Flume 和 Apache Kafka 都扮演着至关重要的角色。Flume 是一款用于收集、聚合和移动大量日志数据的工具,而 Kafka 则是一个分布式流...

    Kafka+FlumeNG+Storm+HBase构架设计

    ### Kafka+FlumeNG+Storm+HBase 架构设计详解 #### 一、概述 在当前的大数据处理场景下,构建一个既能处理历史数据又能实时处理新增数据的架构至关重要。本文将详细介绍如何利用Kafka、FlumeNG、Storm与HBase搭建...

    flume-ng安装

    Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...

    flume-ng-1.6.0-cdh5.5.2-src.tar.gz

    《Flume NG 1.6.0 在 CDH 5.5.2 中的应用与解析》 Flume NG,全称为“Next Generation Flume”,是Apache Hadoop项目中用于高效、可靠、分布式地收集、聚合和移动大量日志数据的工具。在CDH(Cloudera Distribution...

    flume-ng-1.5.0-cdh5.3.6.tar.gz

    《Flume NG 1.5.0-cdh5.3.6:大数据日志收集利器》 Apache Flume,作为一款高效、可靠且分布式的海量日志聚合工具,是大数据处理领域的重要组件之一。在CDH(Cloudera Distribution Including Apache Hadoop)5.3.6...

    flume-ng-elasticsearch-sink-6.5.4.jar.zip

    《Flume NG与Elasticsearch 6.5.4集成详解》 Flume NG,全称为Apache Flume,是一款由Apache软件基金会开发的数据收集系统,主要用于日志聚合、监控和数据传输。它设计的目标是高效、可靠且易于扩展,特别适合...

    flume-ng-1.6.0-cdh5.12.0.tar.gz

    《Flume NG 1.6.0-cdh5.12.0在大数据生态中的角色与应用》 Apache Flume,作为一个分布式、可靠且可用的数据收集系统,是大数据处理链路中不可或缺的一部分。Flume NG(Next Generation)是其发展到第二代后的版本...

    flumeng-plugins-udp:使用java nio消费udp消息的flume-ng源码插件

    使用mvn clean package检出和构建将flumeng-plugins-udp-[VERSION].jar或flumeng-plugins-udp-[VERSION]-jar-with-dependencies.jar复制到您的flume 库路径中。 将配置文件或示例配置文件复制或创建到某个位置。 ...

    flume-ng-1.6.0-cdh5.10.1.tar.gz的下载

    `flume-ng-1.6.0-cdh5.10.1.tar.gz` 是一个针对Cloudera Distribution Including Apache Hadoop (CDH) 5.10.1 版本优化的Flume NG的打包文件,Flume NG是Flume的下一代版本,提供更先进的特性和性能。 Flume的核心...

    flume-ng-mongodb-sink:Flume NG MongoDB源

    flume-ng-mongodb-sink Flume NG MongoDB接收器。 该源已实现为将JSON填充到MongoDB中。入门克隆存储库安装最新的Maven并通过“ mvn软件包”构建源通过'MVN依赖:生成类路径'生成类路径在$ FLUME_HOME / conf / ...

    Flume-ng搭建及sink配置

    Flume­ng简介 Apache Flume是从不同数据源收集、聚合、传输大量数据、日志到数据中心的分布式系统,具有可靠、可伸缩、可定制、高可用、高性能等明显优点。其主要特点有:声明式配置,可动态更新;提供上下文路由,...

    flume-ng-sql-source-1.5.2

    Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...

Global site tag (gtag.js) - Google Analytics