Mongodb sink
import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cloudera.flume.conf.Context; import com.cloudera.flume.conf.SinkFactory.SinkBuilder; import com.cloudera.flume.core.Event; import com.cloudera.flume.core.EventSink; import com.cloudera.util.Pair; import com.google.common.base.Preconditions; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.Mongo; import com.mongodb.MongoException; public class MongoDBSink extends EventSink.Base { static final Logger LOG = LoggerFactory.getLogger(MongoDBSink.class); private String serverName; private int serverPort; private String dbName; private String collName; private Mongo mongo; private DB db; private DBCollection collection; public MongoDBSink(String server, String port, String dbName, String collName) { this.serverName = server; this.serverPort = Integer.parseInt(port); this.dbName = dbName; this.collName = collName; } @Override public synchronized void append(Event e) throws IOException { BasicDBObject entry = new BasicDBObject(); entry.put("timestamp", new Date(e.getTimestamp())); entry.put("hostname", e.getHost()); entry.put("priority", e.getPriority().name()); entry.put("message", new String(e.getBody())); Map<String, byte[]> metadata = e.getAttrs(); if (!metadata.isEmpty()) { BasicDBObject metadataEntry = new BasicDBObject(); for (String key: metadata.keySet()) { metadataEntry.put(key, new String(metadata.get(key))); } entry.put("metadata", metadataEntry); } collection.insert(entry); } @Override public void close() throws IOException { mongo.close(); } @Override public void open() throws IOException { try { mongo = new Mongo(serverName, serverPort); db = mongo.getDB(dbName); collection = db.getCollection(collName); } catch (UnknownHostException e) { LOG.error("Could not find specified server.", e); } catch (MongoException e) { LOG.error("Error connecting to server.", e); } } public static SinkBuilder builder() { return new SinkBuilder() { // construct a new parameterized sink @Override public EventSink build(Context context, String... argv) { Preconditions.checkArgument(argv.length == 4, "usage: mongoDBSink(\"server\",\"port\",\"db\",\"collection\")"); return new MongoDBSink(argv[0], argv[1], argv[2], argv[3]); } }; } /** * 将sink注入SourceFactory * 别名mongoDBSink */ public static List<Pair<String, SinkBuilder>> getSinkBuilders() { List<Pair<String, SinkBuilder>> builders = new ArrayList<Pair<String, SinkBuilder>>(); builders.add(new Pair<String, SinkBuilder>("mongoDBSink", builder())); return builders; } }
相关推荐
FlumeNG是Apache Flume的一个分支版本,旨在通过重写和重构来解决现有版本中的一些已知问题和限制。Flume是Cloudera开发的一个分布式、可靠且可用的系统,用于有效地收集、聚合和移动大量日志数据。它的主要用途是将...
### Flume NG 分享资料详解 #### Flume NG 概述 Flume NG 是一个分布式、可靠且可用的服务,用于高效地收集、聚合并移动大量的日志数据。它具有简单而灵活的架构,基于流式数据流。Flume NG 非常健壮且能够容忍...
`Mvn Flume NG SDK` 是一个用于Apache Flume集成开发的重要工具,它基于Maven构建系统,使得在Java环境中开发、管理和部署Flume插件变得更加便捷。Apache Flume是一款高度可配置的数据收集系统,广泛应用于日志聚合...
Flume NG是Cloudera提供的一个分布式、可靠、可用的系统,它能够将不同数据源的海量日志数据进行高效收集、聚合、移动,最后存储到一个中心化数据存储系统中。由原来的Flume OG到现在的Flume NG,进行了架构重构,...
标题中的"flumeng for streaming spark"涉及到两个关键的开源技术:Flume和Spark Streaming。Flume是Apache软件基金会的一个项目,主要用于收集、聚合和移动大量日志数据,而Spark Streaming是Apache Spark的一个...
Flume-ng 在 Windows 环境搭建并测试 + Log4j 日志通过 Flume 输出到 HDFS Flume-ng 是一个高可用、可靠、分布式的日志聚合系统,可以实时地从各种数据源(如日志文件、网络 socket、数据库等)中收集数据,并将其...
Kafka+FlumeNG+Storm+HBase实时处理系统介绍
《Flume与Kafka集成:深入理解flumeng-kafka-plugin》 在大数据处理领域,Apache Flume 和 Apache Kafka 都扮演着至关重要的角色。Flume 是一款用于收集、聚合和移动大量日志数据的工具,而 Kafka 则是一个分布式流...
### Kafka+FlumeNG+Storm+HBase 架构设计详解 #### 一、概述 在当前的大数据处理场景下,构建一个既能处理历史数据又能实时处理新增数据的架构至关重要。本文将详细介绍如何利用Kafka、FlumeNG、Storm与HBase搭建...
Flume-NG 安装与配置指南 Flume-NG 是一个分布式日志收集系统,能够从各种数据源中实时采集数据,并将其传输到集中式存储系统中。本文将指导您完成 Flume-NG 的安装和基本配置。 安装 Flume-NG 1. 先决条件:...
《Flume NG 1.6.0 在 CDH 5.5.2 中的应用与解析》 Flume NG,全称为“Next Generation Flume”,是Apache Hadoop项目中用于高效、可靠、分布式地收集、聚合和移动大量日志数据的工具。在CDH(Cloudera Distribution...
《Flume NG 1.5.0-cdh5.3.6:大数据日志收集利器》 Apache Flume,作为一款高效、可靠且分布式的海量日志聚合工具,是大数据处理领域的重要组件之一。在CDH(Cloudera Distribution Including Apache Hadoop)5.3.6...
《Flume NG与Elasticsearch 6.5.4集成详解》 Flume NG,全称为Apache Flume,是一款由Apache软件基金会开发的数据收集系统,主要用于日志聚合、监控和数据传输。它设计的目标是高效、可靠且易于扩展,特别适合...
《Flume NG 1.6.0-cdh5.12.0在大数据生态中的角色与应用》 Apache Flume,作为一个分布式、可靠且可用的数据收集系统,是大数据处理链路中不可或缺的一部分。Flume NG(Next Generation)是其发展到第二代后的版本...
使用mvn clean package检出和构建将flumeng-plugins-udp-[VERSION].jar或flumeng-plugins-udp-[VERSION]-jar-with-dependencies.jar复制到您的flume 库路径中。 将配置文件或示例配置文件复制或创建到某个位置。 ...
`flume-ng-1.6.0-cdh5.10.1.tar.gz` 是一个针对Cloudera Distribution Including Apache Hadoop (CDH) 5.10.1 版本优化的Flume NG的打包文件,Flume NG是Flume的下一代版本,提供更先进的特性和性能。 Flume的核心...
flume-ng-mongodb-sink Flume NG MongoDB接收器。 该源已实现为将JSON填充到MongoDB中。入门克隆存储库安装最新的Maven并通过“ mvn软件包”构建源通过'MVN依赖:生成类路径'生成类路径在$ FLUME_HOME / conf / ...
Flumeng简介 Apache Flume是从不同数据源收集、聚合、传输大量数据、日志到数据中心的分布式系统,具有可靠、可伸缩、可定制、高可用、高性能等明显优点。其主要特点有:声明式配置,可动态更新;提供上下文路由,...
Flume-ng-sql-source-1.5.2是Apache Flume的一个扩展,它允许Flume从SQL数据库中收集数据。Apache Flume是一个分布式、可靠且可用于有效聚合、移动大量日志数据的系统。"ng"代表"next generation",表明这是Flume的...