需求需要将Hadoop的数据插入到MongoDB。
数据类型是将字符串转换成一个类似Map的对象,插入到数据库中。以替换原有的单线程接口。
import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.Mongo; public class MongoOutputFormat implements OutputFormat<Text, Text> { @Override public void checkOutputSpecs(FileSystem arg0, JobConf arg1) throws IOException { // TODO Auto-generated method stub System.out.println("OutputFormat CheckOutpuSpecs() function is not supported~!"); } //实现OutputFormat接口的时候,返回一个RecordWriter对象。 //这里可以实例化数据库连接JDBC对象,和RecordWriter共一个生命周期。 //数据库连接串的相关对象,通过JobConf传入。 @Override public RecordWriter<Text, Text> getRecordWriter(FileSystem arg0, JobConf conf, String arg2, Progressable arg3) throws IOException { // Configuration conf = jobconf.g.getConfiguration() ; String ip = conf.get("mongoIp"); String port = conf.get("mongoPort"); Mongo mongo = new Mongo(ip,Integer.parseInt(port)); String username = conf.get("muser"); String password = conf.get("mpwd"); String dbname = conf.get("mongoDb"); String collectionName = conf.get("mongoCollection"); try { return new MongoDBRecordWriter(mongo,dbname,collectionName,username,password); } catch (Exception ex) { throw new IOException(ex); } } /** * A RecordWriter that writes the reduce output to a SQL table or MongoDB Collection! */ public static class MongoDBRecordWriter implements RecordWriter<Text, Text> { private DBCollection coll; private Mongo mongo; public MongoDBRecordWriter() throws SQLException { } //使用这个构造函数 public MongoDBRecordWriter(DBCollection coll) { this.coll = coll; } public MongoDBRecordWriter(Mongo mongo, String dbname, String collectionName, String username, String password) { this.mongo = mongo; DB d = this.mongo.getDB(dbname); d.authenticate(username, password.toCharArray()); this.coll = d.getCollection(collectionName); } public DBCollection getCollection() { return coll; } // public PreparedStatement getStatement() { // return statement; // } @Override /** Close函数,用于关闭OutputFormat中用到的资源对象 */ public void close(Reporter arg0) throws IOException { try { this.mongo.close(); } catch (Exception e) { try { System.out.println("Close() is not supported here..."); } catch (Exception ex) { ex.printStackTrace(); } throw new IOException(e); } finally { try { System.out.println("Close() is not supported here..."); } catch (Exception ex) { ex.printStackTrace(); } } } //RecordWriter中输出的方法,必须实现的。 @Override public void write(Text key, Text value) throws IOException { try { String line = value.toString(); String[] rs = line.split("\001"); Map m = new HashMap(); m.put("created_by", rs[7]); m.put("created_date", rs[8]); m.put("updated_by", rs[9]); m.put("updated_date", rs[10]); DBObject dbObj = new BasicDBObject(); dbObj.putAll(m); coll.save(dbObj); } catch (Exception e) { // LoggingUtils.logAll(LOG, "Exception encountered", e);. System.err.print(e); e.printStackTrace(); } } } }
相关推荐
go-mysql-mongodb是一项将MySQL数据自动同步到MongoDB的服务。 它首先使用mysqldump来获取原始数据,然后与binlog增量同步数据。 安装 安装Go( )并设置您的 go get github.com/WangXiangUSTC/go-mysql-mongodb ...
标题中的“canal”的MySQL与“redis/memcached/mongodb”的NoSQL数据实时同步方案,主要涉及了数据库间的数据迁移和实时同步技术。这个话题涵盖了多个关键知识点,包括: 1. **Canal**: Canal是阿里巴巴开源的一个...
本文将详细介绍如何使用Python来实现MongoDB数据到ElasticSearch的同步,并探讨全量同步、增量同步和实时同步的实现方法,以及如何处理中间数据。 **Python与数据库交互** Python作为一种强大且易用的编程语言,...
MongoDB与Oracle数据库的数据同步是现代企业数据管理中常见的需求,尤其在大数据处理和分布式系统中。MongoDB是一个流行的文档型数据库,而Oracle则是一款成熟的SQL关系型数据库。本示例将探讨如何通过Java实现...
4. **数据迁移工具**:例如Flyway和 Liquibase,用于数据库版本管理和迁移,它们可以在特定条件下运行SQL脚本或自定义Java代码,实现数据同步。 5. **手动编程**:直接编写Java代码,通过JDBC和MongoDB Java Driver...
本软件使用c#编写,是SQL转存MongoDB的工具,可独立运行,也可定时运行,利用sql数据库时间戳字段进行更新采集区分。 本软件综合了,windows服务控制(安装卸载等),windows服务启动程序(服务控制定时运行程序),...
python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库 python开发,基于flask-restful 的中小型项目,restful风格API接口开发实例,以mongodb作为数据库python开发,基于...
在本文中,我们将深入探讨如何在局域网内同步MongoDB数据库。 一、MongoDB概述 MongoDB是一款NoSQL数据库,它采用灵活的数据模型,支持JSON格式的文档存储,具有强大的查询能力。其主要特点包括: 1. 高性能:...
MongoDB是一种流行的开源、分布式文档数据库,以其灵活性、高性能和可扩展性而受到广大开发者喜爱。在C#环境中,MongoDB提供了丰富的驱动程序,使得在.NET应用程序中集成和操作MongoDB变得十分便捷。本课程旨在深入...
关系型到MongoDB实时数据同步解决方案 本解决方案旨在解决关系型数据库到MongoDB实时数据同步的挑战。传统的关系型数据库架构难以扩展,成本高昂且开发困难,直接更换核心系统更加困难。为了解决这些问题,我们提出...
最近由于业务需要,APP端后台需要将MongoDB中的数据同步到Java端后台的MySQL中,然后又将MySQL中算好的数据,同步到MongoDB数据库。 这个过程看是很繁琐,实际上这就是一个互相写表的过程。 接下来就看看node.js将...
通过以上介绍,我们了解到 Vue.js、Node.js 和 MongoDB 在全栈开发中的重要作用及其各自的特性。这些技术结合在一起,可以构建出功能强大、易于维护的现代 Web 应用。希望本文能为您提供有价值的信息,并帮助您更好...
本篇文章将详细探讨如何将数据从SQL Server迁移到MongoDB,这一过程通常被称为数据导入或数据迁移。 首先,SQL Server是一款由微软开发的关系型数据库系统,以其强大的事务处理能力、数据完整性以及对ACID(原子性...
在Java开发中,将SLF4J与MongoDB结合可以创建一个高效、灵活的日志管理系统,特别适合处理大量实时或近实时的日志数据。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一个API,它允许我们编写日志语句而不直接...
MongoDB、Express、AngularJS和Node.js构成了广受欢迎的全栈开发框架——MEAN,它为开发者提供了一种高效、灵活且现代化的解决方案,用于构建动态Web应用。在这个框架中,每个组件都扮演着关键角色,共同促进快速的...
关系型到 MongoDB 实时数据同步解决方案 本解决方案主要介绍了关系型到 MongoDB 实时数据同步的解决方案,旨在解决传统关系型数据库的扩展性和高可用性问题。该解决方案基于 Tapdata 的 CDC 和 RDM 技术,实时同步...
MongoDB/SQL Server增量同步方案是一项复杂而关键的任务,尤其考虑到SQL Server的特性,目前市场上缺乏成熟的开源解决方案。本文探讨了三种可行的同步方法:基于时间戳更新、使用触发器和利用SQL Server的CDC...
总结,Laravel开发中使用Passport与MongoDB的集成涉及替换默认的Eloquent ORM、配置MongoDB连接、创建适应MongoDB的Passport服务提供者、更新认证配置以及创建适应的模型。这样,我们就能在保持Laravel的优雅和强大...
标题"laravel-mongodb"表明我们将关注的是将MongoDB集成到Laravel项目中的技术细节。 首先,我们需要了解Laravel。Laravel是一个基于PHP的开源Web应用框架,它遵循MVC(模型-视图-控制器)架构模式,旨在提升开发...
MongoDB的编译开发涉及到多个步骤,首先需要确保你的系统满足编译MongoDB所需的最低要求,包括一个兼容的C++编译器(如GCC或Visual Studio)、Python 2.6或更高版本,以及SCons构建工具。在Windows环境下,通常会...