需要引入的类包:mongo-java-driver-2.11.2.jar、mongo-hadoop-core_1.0.4-1.1.0.jar 一、从MongoDB上读数据,进行MapReduce后,把结果在在HDFS上。 1、Job的配置启动类: package com.test.similarity.dataimport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.log4j.Logger; import com.test.similarity.util.Constants; import com.test.similarity.util.HdfsUtil; import com.mongodb.hadoop.MongoInputFormat; import com.mongodb.hadoop.util.MongoConfigUtil; /** * 从mongondb中读取数据,导入到指定的hdfs路径上. * @author 907897 */ public class ImportJob { private static final Logger logger = Logger.getLogger(ImportJob.class); public boolean run(String import_source,String import_data){ try{ Configuration conf = new Configuration(); MongoConfigUtil.setInputURI(conf,import_source); conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER); conf.set("fs.default.name", Constants.FS_DEFAULT_NAME); Job job = new Job(conf, "MongoDBJob job"); // job 处理类配置 job.setJarByClass(ImportJob.class); job.setMapperClass(ImportMapper.class); job.setReducerClass(ImportReducer.class); FileSystem fileSys = FileSystem.get(conf); // job 类型配置 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(MongoInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 判断输出路径relation_data是否已存在。如存在:则删除之。 if (HdfsUtil.checkFileExist(fileSys, import_data)) { logger.info("输出路径:" + import_data + "已存在。程序已对其进行删除。"); HdfsUtil.deleteFileOrPath(fileSys, import_data); } // job 输入输出路径配置 FileOutputFormat.setOutputPath(job, new Path(import_data)); // job 运行 boolean runOK = job.waitForCompletion(true) ? true : false; return runOK; }catch(Exception e){ e.printStackTrace(); } return false; } } 2、对应的Mapper处理函数 package com.test.similarity.dataimport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.bson.BSONObject; import com.test.similarity.asset.AssetUtil; import com.test.similarity.asset.AssetVO; public class ImportMapper extends Mapper<Object, BSONObject, Text, Text> { public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{ // System.out.println("value:"+value); AssetVO assetVO = AssetUtil.trunBSONObjectToAssetVO(value); value = null; context.write(new Text(assetVO.getAssetId()), new Text(assetVO.toString())); } } 3、对应的Reducer处理函数 package com.test.similarity.dataimport; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class ImportReducer extends Reducer<Text, Text, Text, Text>{ public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Iterator<Text> iterator = values.iterator(); while(iterator.hasNext()) { context.write(iterator.next(),null); } } } 二、从HDFS上读数据,通过MapReduce处理后,结果存放于MongoDB中。 1、Job的配置启动类: package com.test.similarity.dataexport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import com.test.similarity.util.Constants; import com.mongodb.hadoop.MongoOutputFormat; import com.mongodb.hadoop.util.MongoConfigUtil; public class ExportJob { //private static Log logger = LogFactory.getLog(ExportJob.class); public boolean run(String relation_data,String export_dest){ try{ Configuration conf = new Configuration(); MongoConfigUtil.setOutputURI(conf,export_dest); conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER); conf.set("fs.default.name", Constants.FS_DEFAULT_NAME); Job job = new Job(conf, "MongoDBJob job"); // job 配置 job.setJarByClass(ExportJob.class); job.setMapperClass(ExportMapper.class); job.setReducerClass(ExportReducer.class); job.setOutputKeyClass(Text.class); // 输出的key类型,在OutputFormat会检查 job.setOutputValueClass(Text.class);// 输出的value类型,在OutputFormat会检查 job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(MongoOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(relation_data)); // job 运行 boolean runOK = job.waitForCompletion(true) ? true : false; return runOK; }catch(Exception e){ e.printStackTrace(); } return false; } } 2、对应的Mapper处理函数 package com.test.similarity.dataexport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class ExportMapper extends Mapper<Text, Text, Text, Text> { //private static Log logger = LogFactory.getLog(ExportMapper.class); public void map(Text key, Text value, Context context ) throws IOException, InterruptedException{ System.out.println("key:"+key.toString()); System.out.println("value:"+value.toString()); context.write(key, value); } } 3、对应的Reducer处理函数 package com.test.similarity.dataexport; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.bson.types.ObjectId; import com.test.similarity.util.Constants; import com.test.similarity.util.DateUtil; import com.test.similarity.util.StringUtil; import com.mongodb.BasicDBObject; import com.mongodb.hadoop.io.BSONWritable; public class ExportReducer extends Reducer<Text, Text, Text, BSONWritable>{ private static final String splitFlag = Constants.FIELD_SPLIT; // ; private static final String keyValueFlag = Constants.KEY_VALUE_SIGN; // = private static final String itemSplitFlag = Constants.ITEM_SPLIT_SIGN; // #@# public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String text = key.toString(); if (StringUtil.isNull(text)) { return; } BSONWritable bsonWritable = new BSONWritable(); BasicDBObject document = new BasicDBObject(); String[] result = text.toString().trim().split(itemSplitFlag); int pos = 0; String content = ""; document.put("_id",new ObjectId()); for (int i = 0; i < result.length; i++) { content = result[i]; pos = content.indexOf(keyValueFlag); document.put(content.substring(0, pos), content.substring(pos + 1)); } // document.put("createTime", DateUtil.getCurDate(1)); bsonWritable.setDoc(document); // System.out.println("bsonWritable:"+bsonWritable); context.write(key, bsonWritable); } }
相关推荐
Hadoop与MongoDB都是在大数据时代被广泛使用的技术,它们在处理和分析大规模数据方面拥有各自的优势。为了更好地处理日益增长的数据量,Hadoop与MongoDB之间的整合技术变得越来越重要。 Hadoop是一个由Apache基金会...
MongoDB与Hadoop的结合使用也是常见的解决方案,例如,可以使用MongoDB进行实时查询和数据预处理,然后通过Hadoop进行深度分析和挖掘。这种混合架构可以充分利用两者的优势,实现高效的大数据处理流程。 总之,...
其核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,HDFS提供分布式文件存储,MapReduce则负责数据处理。 Hadoop的特点: 1. **分布式存储**:HDFS允许数据在多台机器上分散存储,提供高容错性和高...
4. 高性能:MongoDB在读写性能上有出色表现,尤其是对于大数据集,可以实现毫秒级的响应时间。 5. 全局部署:MongoDB支持全球部署,能够轻松地在多个数据中心之间进行数据复制和同步。 Spark是一个开源的集群计算...
分布式模板(distributeTemplate)是一种高效且灵活的工具,它允许开发者执行SQL操作,包括增、删、改、查(CRUD),不仅限于单一的数据库系统,而是扩展到多种数据库环境,如MySQL、MongoDB、File、RabbitMQ、Redis...
《大数据技术原理与应用第二版》是林子雨教授的一本深入探讨大数据技术的专业书籍,其配套的5个实验答案为学习者提供了实践操作的指导。在这个领域,大数据不仅仅是一种技术,更是一种处理海量信息的新思维方式。...
**Shell命令操作HDFS**: - `copyFromLocal`:将本地文件系统中的文件复制到HDFS。 - `cat`:查看HDFS中文件的内容。 - `copyToLocal`:将HDFS中的文件复制回本地文件系统。 - `chmod`:更改HDFS文件或目录的权限。 ...
- Hadoop命令行工具:如`hdfs dfs`用于与HDFS交互,`hadoop fs`进行文件操作。 2. **HDFS深入理解**(第4章): - HDFS设计理念:高容错性、可扩展性和数据本地化。 - 数据块与副本策略:了解数据是如何被切分成...
"大数据管理实验之三Hadoop基础命令与编程初步.docx"则深入到Hadoop的使用层面,包括HDFS的基本命令,如上传、下载、查看文件等,以及编写MapReduce程序的初步概念,如Mapper和Reducer的工作原理,以及使用Java API...
与批处理的MapReduce不同,流计算能够在数据到达时立即处理,确保了低延迟的响应时间。 【内存数据库】 内存数据库将数据存储在内存中,以实现更快的读写速度和更高的并发处理能力,特别适合于需要快速响应的交易...
Hadoop是Apache软件基金会的一个开源项目,主要由HDFS(Hadoop Distributed File System)和MapReduce两部分组成,为海量数据提供了存储和计算能力。 【描述】中的技术选型 1. **SpringBoot**: SpringBoot是Java...
例如,Spring Data Hadoop允许我们轻松地连接到Hadoop集群,执行MapReduce任务或者操作HDFS。Spring Data Spark提供了对Spark API的封装,简化了数据处理代码。 对于Spring Boot与Hadoop的整合,我们需要在项目中...
1. NoSQL数据库:面对非结构化和半结构化的移动网络数据,NoSQL数据库如MongoDB、Cassandra等提供了灵活的数据模型和高效的读写性能。 2. 数据仓库:例如Hive和Pig,它们基于Hadoop,提供SQL-like查询语言,简化了...
6. 实践操作与应用:课程特别强调实践操作的重要性,尤其是对Hadoop、HDFS、HBase和MapReduce等关键章节安排了入门级的实践操作,使学生能更好地掌握大数据关键技术,并了解这些技术在互联网、生物医学、物流等领域...
3. HDFS:深入讲解分布式文件系统的基本概念、HDFS的架构、存储机制和读写操作,让学生熟练掌握其使用。 4. HBase:介绍分布式数据库HBase的访问接口、数据模型、工作原理和实际操作。 5. NoSQL数据库:对比传统...
第三章“分布式文件系统HDFS”会详细解析HDFS的工作原理,包括数据块的概念、副本策略、NameNode和DataNode的角色,以及HDFS的读写流程。理解HDFS对于开发和优化Hadoop应用至关重要。 第四章和第五章分别讨论了...
3. 探索NoSQL数据库与传统关系型数据库的区别,了解NoSQL的四大类型(键值对、列族、文档型、图形数据库)和三大基石,掌握Redis和MongoDB等NoSQL数据库的使用。 4. 了解云数据库的基本概念、工作原理和实际应用,...
- 大数据的特点包括高并发读写、海量存储和高可扩展性与高可用性需求。 - 大数据与云计算的关系密切,云计算为大数据提供了基础架构支持。 2. 大数据市场分析 - 2011年是中国大数据市场的起点,此后市场规模迅速...
**Hadoop**:一款开源的大数据处理框架,主要由HDFS(Hadoop Distributed File System)和MapReduce组成,用于存储和处理大规模数据集。 **Zookeeper**:分布式协调服务,用于解决分布式应用中常见的一致性问题,如...