`
dacoolbaby
  • 浏览: 1263290 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

开发自定义同步到MongoDB的OutputFormat

阅读更多

需求需要将Hadoop的数据插入到MongoDB。

 

数据类型是将字符串转换成一个类似Map的对象,插入到数据库中。以替换原有的单线程接口。

 

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoOutputFormat implements OutputFormat<Text, Text> {

    @Override
    public void checkOutputSpecs(FileSystem arg0, JobConf arg1) throws IOException {
        // TODO Auto-generated method stub
        System.out.println("OutputFormat CheckOutpuSpecs() function is not supported~!");
    }
	
	//实现OutputFormat接口的时候,返回一个RecordWriter对象。
	//这里可以实例化数据库连接JDBC对象,和RecordWriter共一个生命周期。
	//数据库连接串的相关对象,通过JobConf传入。
    @Override
    public RecordWriter<Text, Text> getRecordWriter(FileSystem arg0, JobConf conf, String arg2, Progressable arg3) throws IOException {
//        Configuration conf = jobconf.g.getConfiguration() ;
        String ip = conf.get("mongoIp");
        String port = conf.get("mongoPort");
        Mongo mongo = new Mongo(ip,Integer.parseInt(port));
        
        String username = conf.get("muser");
        String password = conf.get("mpwd");
        
        
        String dbname = conf.get("mongoDb");
        String collectionName = conf.get("mongoCollection");
        
        try {

            return new MongoDBRecordWriter(mongo,dbname,collectionName,username,password);
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }
    
    /**
     * A RecordWriter that writes the reduce output to a SQL table or MongoDB Collection!
     */
    public static class MongoDBRecordWriter implements RecordWriter<Text, Text> {

        private DBCollection coll;
        private Mongo mongo;
        
        public MongoDBRecordWriter() throws SQLException {

        }

        //使用这个构造函数
        public MongoDBRecordWriter(DBCollection coll) {
            this.coll = coll;
        }
        
        public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName, String username, String password) {
            this.mongo = mongo;
            DB d = this.mongo.getDB(dbname);
            d.authenticate(username, password.toCharArray());
            this.coll = d.getCollection(collectionName);
        }
        
        public DBCollection getCollection() {
            return coll;
        }

        // public PreparedStatement getStatement() {
        // return statement;
        // }

        @Override
        /** Close函数,用于关闭OutputFormat中用到的资源对象 */
        public void close(Reporter arg0) throws IOException {
            try {
                this.mongo.close();
            }
            catch (Exception e) {
                try {
                    System.out.println("Close() is not supported here...");
                }
                catch (Exception ex) {
					ex.printStackTrace();
                }
                throw new IOException(e);
            } finally {
                try {
                    System.out.println("Close() is not supported here...");
                }
                catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }

		//RecordWriter中输出的方法,必须实现的。
        @Override
        public void write(Text key, Text value) throws IOException {
            try {
                String line = value.toString();
                String[] rs = line.split("\001");
                Map m = new HashMap();

                m.put("created_by", rs[7]);
                m.put("created_date", rs[8]);
                m.put("updated_by", rs[9]);
                m.put("updated_date", rs[10]);

                DBObject dbObj = new BasicDBObject();
                dbObj.putAll(m);
                coll.save(dbObj);
            }
            catch (Exception e) {
                // LoggingUtils.logAll(LOG, "Exception encountered", e);.
                System.err.print(e);
                e.printStackTrace();
            }
        }


    }
}

 

 

分享到:
评论

相关推荐

    go-mysql-mongodb:将MySQL数据同步到MongoDB

    go-mysql-mongodb是一项将MySQL数据自动同步到MongoDB的服务。 它首先使用mysqldump来获取原始数据,然后与binlog增量同步数据。 安装 安装Go( )并设置您的 go get github.com/WangXiangUSTC/go-mysql-mongodb ...

    Python-同步MongoDB数据到ElasticSearch

    本文将详细介绍如何使用Python来实现MongoDB数据到ElasticSearch的同步,并探讨全量同步、增量同步和实时同步的实现方法,以及如何处理中间数据。 **Python与数据库交互** Python作为一种强大且易用的编程语言,...

    canal 的 mysql 与 redis/memcached/mongodb 的 nosql 数据实时同步方案

    标题中的“canal”的MySQL与“redis/memcached/mongodb”的NoSQL数据实时同步方案,主要涉及了数据库间的数据迁移和实时同步技术。这个话题涵盖了多个关键知识点,包括: 1. **Canal**: Canal是阿里巴巴开源的一个...

    jdbc java mongodb mysql 相互同步

    4. **数据迁移工具**:例如Flyway和 Liquibase,用于数据库版本管理和迁移,它们可以在特定条件下运行SQL脚本或自定义Java代码,实现数据同步。 5. **手动编程**:直接编写Java代码,通过JDBC和MongoDB Java Driver...

    windows服务自动定时启动SQLServer同步数据到MongoDB.zip(c#源代码)

    本软件使用c#编写,是SQL转存MongoDB的工具,可独立运行,也可定时运行,利用sql数据库时间戳字段进行更新采集区分。 本软件综合了,windows服务控制(安装卸载等),windows服务启动程序(服务控制定时运行程序),...

    python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库

    python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库 python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库python开发,基于...

    同步Mongodb数据库.zip_MongoDB_Mongodb 同步数据库

    在本文中,我们将深入探讨如何在局域网内同步MongoDB数据库。 一、MongoDB概述 MongoDB是一款NoSQL数据库,它采用灵活的数据模型,支持JSON格式的文档存储,具有强大的查询能力。其主要特点包括: 1. 高性能:...

    C#\MongoDB应用开发实战\MongoDB

    MongoDB是一种流行的开源、分布式文档数据库,以其灵活性、高性能和可扩展性而受到广大开发者喜爱。在C#环境中,MongoDB提供了丰富的驱动程序,使得在.NET应用程序中集成和操作MongoDB变得十分便捷。本课程旨在深入...

    关系型到MongoDB实时数据同步解决方案.pdf

    关系型到MongoDB实时数据同步解决方案 本解决方案旨在解决关系型数据库到MongoDB实时数据同步的挑战。传统的关系型数据库架构难以扩展,成本高昂且开发困难,直接更换核心系统更加困难。为了解决这些问题,我们提出...

    node.js将MongoDB数据同步到MySQL的步骤

    最近由于业务需要,APP端后台需要将MongoDB中的数据同步到Java端后台的MySQL中,然后又将MySQL中算好的数据,同步到MongoDB数据库。 这个过程看是很繁琐,实际上这就是一个互相写表的过程。 接下来就看看node.js将...

    前端 Vue+Node+MongoDB高级全栈开发

    通过以上介绍,我们了解到 Vue.js、Node.js 和 MongoDB 在全栈开发中的重要作用及其各自的特性。这些技术结合在一起,可以构建出功能强大、易于维护的现代 Web 应用。希望本文能为您提供有价值的信息,并帮助您更好...

    SqlServer数据导入MongoDB

    本篇文章将详细探讨如何将数据从SQL Server迁移到MongoDB,这一过程通常被称为数据导入或数据迁移。 首先,SQL Server是一款由微软开发的关系型数据库系统,以其强大的事务处理能力、数据完整性以及对ACID(原子性...

    全栈开发之道:MongoDB Express AngularJS Node.js

    MongoDB、Express、AngularJS和Node.js构成了广受欢迎的全栈开发框架——MEAN,它为开发者提供了一种高效、灵活且现代化的解决方案,用于构建动态Web应用。在这个框架中,每个组件都扮演着关键角色,共同促进快速的...

    关系型到MongoDB实时数据同步解决方案.pptx

    关系型到 MongoDB 实时数据同步解决方案 本解决方案主要介绍了关系型到 MongoDB 实时数据同步的解决方案,旨在解决传统关系型数据库的扩展性和高可用性问题。该解决方案基于 Tapdata 的 CDC 和 RDM 技术,实时同步...

    Laravel开发-passport-mongodb

    总结,Laravel开发中使用Passport与MongoDB的集成涉及替换默认的Eloquent ORM、配置MongoDB连接、创建适应MongoDB的Passport服务提供者、更新认证配置以及创建适应的模型。这样,我们就能在保持Laravel的优雅和强大...

    Laravel开发-mongodb MongoDB 数据库驱动

    标题"laravel-mongodb"表明我们将关注的是将MongoDB集成到Laravel项目中的技术细节。 首先,我们需要了解Laravel。Laravel是一个基于PHP的开源Web应用框架,它遵循MVC(模型-视图-控制器)架构模式,旨在提升开发...

    mongodb与oralce的数据同步

    MongoDB与Oracle数据库的数据同步是现代企业数据管理中常见的需求,尤其在大数据处理和分布式系统中。MongoDB是一个流行的文档型数据库,而Oracle则是一款成熟的SQL关系型数据库。本示例将探讨如何通过Java实现...

    mongodb 编译开发库

    MongoDB的编译开发涉及到多个步骤,首先需要确保你的系统满足编译MongoDB所需的最低要求,包括一个兼容的C++编译器(如GCC或Visual Studio)、Python 2.6或更高版本,以及SCons构建工具。在Windows环境下,通常会...

    slf4j输入日志到mongodb

    在Java开发中,将SLF4J与MongoDB结合可以创建一个高效、灵活的日志管理系统,特别适合处理大量实时或近实时的日志数据。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一个API,它允许我们编写日志语句而不直接...

    Laravel开发-laravel-mongodb-log

    现在,Laravel会将所有日志记录到MongoDB的指定集合中。你可以使用MongoDB的查询工具如MongoDB Compass或者编程方式来查看和分析这些日志数据。 此外,`laravel-mongodb-log`还允许开发者自定义日志记录行为,比如...

Global site tag (gtag.js) - Google Analytics