`
szjian
  • 浏览: 74748 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

用MapReduce操作mongodb与hdfs的读写例子

阅读更多
需要引入的类包:mongo-java-driver-2.11.2.jar、mongo-hadoop-core_1.0.4-1.1.0.jar

一、从MongoDB上读数据,进行MapReduce后,把结果在在HDFS上。
1、Job的配置启动类:
package com.test.similarity.dataimport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;

import com.test.similarity.util.Constants;
import com.test.similarity.util.HdfsUtil;
import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

/**
 * 从mongondb中读取数据,导入到指定的hdfs路径上.
 * @author 907897
 */

public class ImportJob {
	private static final Logger logger = Logger.getLogger(ImportJob.class);
	public boolean run(String import_source,String import_data){
		try{
			Configuration conf = new Configuration();
			MongoConfigUtil.setInputURI(conf,import_source);
			conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER);
			conf.set("fs.default.name", Constants.FS_DEFAULT_NAME);
			Job job = new Job(conf, "MongoDBJob job");
			
			// job 处理类配置
			job.setJarByClass(ImportJob.class);
			job.setMapperClass(ImportMapper.class);
			job.setReducerClass(ImportReducer.class);
			
			FileSystem fileSys = FileSystem.get(conf);
			
			// job 类型配置
			job.setOutputKeyClass(Text.class);
	                job.setOutputValueClass(Text.class);
			job.setInputFormatClass(MongoInputFormat.class);
			job.setOutputFormatClass(TextOutputFormat.class);
			
			// 判断输出路径relation_data是否已存在。如存在:则删除之。
			if (HdfsUtil.checkFileExist(fileSys, import_data)) {
				logger.info("输出路径:" + import_data + "已存在。程序已对其进行删除。");
				HdfsUtil.deleteFileOrPath(fileSys, import_data);
			}		
			
			// job 输入输出路径配置
			
			FileOutputFormat.setOutputPath(job, new Path(import_data));
			
			// job 运行
			boolean runOK = job.waitForCompletion(true) ? true : false;
			return runOK;
		}catch(Exception e){
			e.printStackTrace();
		}
		return false;
	}
}



2、对应的Mapper处理函数
package com.test.similarity.dataimport;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

import com.test.similarity.asset.AssetUtil;
import com.test.similarity.asset.AssetVO;

public class ImportMapper  extends Mapper<Object, BSONObject, Text, Text> {
	public void map(Object key, BSONObject value, Context context) throws IOException, InterruptedException{
//		System.out.println("value:"+value);
		AssetVO assetVO = AssetUtil.trunBSONObjectToAssetVO(value);
		value = null;
        context.write(new Text(assetVO.getAssetId()), new Text(assetVO.toString()));
    }
}



3、对应的Reducer处理函数
package com.test.similarity.dataimport;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ImportReducer extends Reducer<Text, Text, Text, Text>{
	public void reduce(Text key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {
		
		Iterator<Text> iterator = values.iterator(); 
		while(iterator.hasNext()) { 
			context.write(iterator.next(),null);
		}
	}
}


二、从HDFS上读数据,通过MapReduce处理后,结果存放于MongoDB中。
1、Job的配置启动类:
package com.test.similarity.dataexport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

import com.test.similarity.util.Constants;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class ExportJob {
//private static Log logger = LogFactory.getLog(ExportJob.class);
	
	public boolean run(String relation_data,String export_dest){
		try{
			Configuration conf = new Configuration();
			MongoConfigUtil.setOutputURI(conf,export_dest);
			
			conf.set("mapred.job.tracker", Constants.MAPRED_JOB_TRACKER);
			conf.set("fs.default.name", Constants.FS_DEFAULT_NAME);
			Job job = new Job(conf, "MongoDBJob job");
			
			// job 配置
			job.setJarByClass(ExportJob.class);
			job.setMapperClass(ExportMapper.class);
			job.setReducerClass(ExportReducer.class);
			
			job.setOutputKeyClass(Text.class); // 输出的key类型,在OutputFormat会检查
			job.setOutputValueClass(Text.class);// 输出的value类型,在OutputFormat会检查
	        
	        job.setInputFormatClass(KeyValueTextInputFormat.class);
	        job.setOutputFormatClass(MongoOutputFormat.class);
	        
	        FileInputFormat.setInputPaths(job, new Path(relation_data));
			
			// job 运行
			boolean runOK = job.waitForCompletion(true) ? true : false;
			return runOK;
		}catch(Exception e){
			e.printStackTrace();
		}
		return false;
	}
}

2、对应的Mapper处理函数
package com.test.similarity.dataexport;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ExportMapper  extends Mapper<Text, Text, Text, Text> {
//private static Log logger = LogFactory.getLog(ExportMapper.class);

    public void map(Text key, Text value, Context context ) throws IOException, InterruptedException{
        System.out.println("key:"+key.toString());
        System.out.println("value:"+value.toString());
    	context.write(key, value);
    }
}

3、对应的Reducer处理函数
package com.test.similarity.dataexport;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.types.ObjectId;

import com.test.similarity.util.Constants;
import com.test.similarity.util.DateUtil;
import com.test.similarity.util.StringUtil;
import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;

public class ExportReducer extends Reducer<Text, Text, Text, BSONWritable>{
	private static final String splitFlag = Constants.FIELD_SPLIT; // ;
	private static final String keyValueFlag = Constants.KEY_VALUE_SIGN; // =
	private static final String itemSplitFlag = Constants.ITEM_SPLIT_SIGN; // #@#
	
	public void reduce(Text key, Iterable<Text> values,
			Context context) throws IOException, InterruptedException {
		String text = key.toString();
		if (StringUtil.isNull(text)) {
			return;
		}
		
		BSONWritable bsonWritable = new BSONWritable();
		BasicDBObject document = new BasicDBObject();
		String[] result = text.toString().trim().split(itemSplitFlag);
		int pos = 0;
		String content = "";
		document.put("_id",new ObjectId());
		for (int i = 0; i < result.length; i++) {
			content = result[i];
			pos = content.indexOf(keyValueFlag);
			document.put(content.substring(0, pos), content.substring(pos + 1));
		}
//		document.put("createTime", DateUtil.getCurDate(1));
		bsonWritable.setDoc(document);
		// System.out.println("bsonWritable:"+bsonWritable);
		context.write(key, bsonWritable);
	}
}

 

分享到:
评论
3 楼 aqi915 2015-12-08  
aqi915 写道
可以发下代码么,你的其它类没有呢

929228748@qq.com
2 楼 aqi915 2015-12-08  
可以发下代码么,你的其它类没有呢
1 楼 linux_yao 2014-03-26  
你好,可以贡献一下你的这个程序么?我是初学Hadoop,正想了解一下这方面的资料。你的这个我复制eclipse里面缺少很多类。谢谢了(mywiki95@gmail.com)。

相关推荐

    基于Hadoop与MongoDB整合技术的大数据处理分析.pdf

    Hadoop与MongoDB都是在大数据时代被广泛使用的技术,它们在处理和分析大规模数据方面拥有各自的优势。为了更好地处理日益增长的数据量,Hadoop与MongoDB之间的整合技术变得越来越重要。 Hadoop是一个由Apache基金会...

    MongoDB权威指南 + Hadoop权威指南

    MongoDB与Hadoop的结合使用也是常见的解决方案,例如,可以使用MongoDB进行实时查询和数据预处理,然后通过Hadoop进行深度分析和挖掘。这种混合架构可以充分利用两者的优势,实现高效的大数据处理流程。 总之,...

    [文档]Mongodb&Hadoop技术交流.pptx

    其核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,HDFS提供分布式文件存储,MapReduce则负责数据处理。 Hadoop的特点: 1. **分布式存储**:HDFS允许数据在多台机器上分散存储,提供高容错性和高...

    MongoDB Spark - Mongo首席技術架構師唐建法

    4. 高性能:MongoDB在读写性能上有出色表现,尤其是对于大数据集,可以实现毫秒级的响应时间。 5. 全局部署:MongoDB支持全球部署,能够轻松地在多个数据中心之间进行数据复制和同步。 Spark是一个开源的集群计算...

    distributeTemplate 可以进行所有mysql mongodb file rab分布式上的SQL语法进行 增删改查

    分布式模板(distributeTemplate)是一种高效且灵活的工具,它允许开发者执行SQL操作,包括增、删、改、查(CRUD),不仅限于单一的数据库系统,而是扩展到多种数据库环境,如MySQL、MongoDB、File、RabbitMQ、Redis...

    大数据技术原理与应用第二版(林子雨) 5个实验答案

    《大数据技术原理与应用第二版》是林子雨教授的一本深入探讨大数据技术的专业书籍,其配套的5个实验答案为学习者提供了实践操作的指导。在这个领域,大数据不仅仅是一种技术,更是一种处理海量信息的新思维方式。...

    Hadoop框架和生态圈介绍.docx

    **Shell命令操作HDFS**: - `copyFromLocal`:将本地文件系统中的文件复制到HDFS。 - `cat`:查看HDFS中文件的内容。 - `copyToLocal`:将HDFS中的文件复制回本地文件系统。 - `chmod`:更改HDFS文件或目录的权限。 ...

    hadoop应用开发技术详解代码

    - Hadoop命令行工具:如`hdfs dfs`用于与HDFS交互,`hadoop fs`进行文件操作。 2. **HDFS深入理解**(第4章): - HDFS设计理念:高容错性、可扩展性和数据本地化。 - 数据块与副本策略:了解数据是如何被切分成...

    Hadoop大数据管理实验详细参考文档

    "大数据管理实验之三Hadoop基础命令与编程初步.docx"则深入到Hadoop的使用层面,包括HDFS的基本命令,如上传、下载、查看文件等,以及编写MapReduce程序的初步概念,如Mapper和Reducer的工作原理,以及使用Java API...

    大数据技术交流78.pptx

    与批处理的MapReduce不同,流计算能够在数据到达时立即处理,确保了低延迟的响应时间。 【内存数据库】 内存数据库将数据存储在内存中,以实现更快的读写速度和更高的并发处理能力,特别适合于需要快速响应的交易...

    基于hadoop的云盘系统

    Hadoop是Apache软件基金会的一个开源项目,主要由HDFS(Hadoop Distributed File System)和MapReduce两部分组成,为海量数据提供了存储和计算能力。 【描述】中的技术选型 1. **SpringBoot**: SpringBoot是Java...

    bigdata_springboot,springboot整合

    例如,Spring Data Hadoop允许我们轻松地连接到Hadoop集群,执行MapReduce任务或者操作HDFS。Spring Data Spark提供了对Spark API的封装,简化了数据处理代码。 对于Spring Boot与Hadoop的整合,我们需要在项目中...

    移动网络数据中大数据处理的关键技术探索.zip

    1. NoSQL数据库:面对非结构化和半结构化的移动网络数据,NoSQL数据库如MongoDB、Cassandra等提供了灵活的数据模型和高效的读写性能。 2. 数据仓库:例如Hive和Pig,它们基于Hadoop,提供SQL-like查询语言,简化了...

    大数据技术原理与应用课程标准.pdf

    6. 实践操作与应用:课程特别强调实践操作的重要性,尤其是对Hadoop、HDFS、HBase和MapReduce等关键章节安排了入门级的实践操作,使学生能更好地掌握大数据关键技术,并了解这些技术在互联网、生物医学、物流等领域...

    教学大纲厦门大学-林子雨-大数据技术原理与应用

    3. HDFS:深入讲解分布式文件系统的基本概念、HDFS的架构、存储机制和读写操作,让学生熟练掌握其使用。 4. HBase:介绍分布式数据库HBase的访问接口、数据模型、工作原理和实际操作。 5. NoSQL数据库:对比传统...

    大数据技术原理与应用课件.7z

    第三章“分布式文件系统HDFS”会详细解析HDFS的工作原理,包括数据块的概念、副本策略、NameNode和DataNode的角色,以及HDFS的读写流程。理解HDFS对于开发和优化Hadoop应用至关重要。 第四章和第五章分别讨论了...

    《大数据技术原理与应用》课程标准.pdf

    3. 探索NoSQL数据库与传统关系型数据库的区别,了解NoSQL的四大类型(键值对、列族、文档型、图形数据库)和三大基石,掌握Redis和MongoDB等NoSQL数据库的使用。 4. 了解云数据库的基本概念、工作原理和实际应用,...

    基于Hadoop的大数据处理关键技术综述22.pptx

    - 大数据的特点包括高并发读写、海量存储和高可扩展性与高可用性需求。 - 大数据与云计算的关系密切,云计算为大数据提供了基础架构支持。 2. 大数据市场分析 - 2011年是中国大数据市场的起点,此后市场规模迅速...

    Hadoop+Zookeeper+HBase部署指南

    **Hadoop**:一款开源的大数据处理框架,主要由HDFS(Hadoop Distributed File System)和MapReduce组成,用于存储和处理大规模数据集。 **Zookeeper**:分布式协调服务,用于解决分布式应用中常见的一致性问题,如...

Global site tag (gtag.js) - Google Analytics