参考:http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/
附件是我编译和下载好的jar包(hadoop版本:hadoop-0.20.2-cdh3u3),需要的可下载直接使用。
首先,编译MongoDB Adapter
1.下载源码:
https://github.com/mongodb/mongo-hadoop
2.修改build.sbt
hadoopRelease in ThisBuild := "cdh3"
3.编译:
./sbt package
4.编译好后
在target目录生成mongo-hadoop_cdh3u3-1.1.0-SNAPSHOT.jar
在core/target目录生成mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar
5.下载mongo-2.7.3.jar:
wget --no-check-certificate https://github.com/downloads/mongodb/mongo-java-driver/mongo-2.7.3.jar
6. 将这两个jar包拷贝至hadoop集群每个节点的$HADOOP_HOME/lib目录
cp mongo-2.7.3.jar $HADOOP_HOME/lib/
cp core/target/mongo-hadoop-core_cdh3u3-1.1.0-SNAPSHOT.jar $HADOOP_HOME/lib/
7.准备input文件:hadoop fs -put input.txt /tmp/input/
vi input.txt
Hello,MongoDB! MongoDB Hello Hello,Hadoop Hadoop with MongoDB
8.wordcount程序:(我是在windows上通过eclipse连接hadoop的)
import java.io.*; import java.util.*; import org.apache.commons.logging.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.bson.*; import com.mongodb.hadoop.*; import com.mongodb.hadoop.util.*; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private final IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (final IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { final Configuration conf = new Configuration(); conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml")); conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml")); conf.addResource(new Path("F:/lxw-hadoop/core-site.xml")); conf.set("mapred.job.tracker", "10.133.103.21:50021"); MongoConfigUtil.setOutputURI(conf, "mongodb://10.133.103.23/test.out"); System.out.println("Conf: " + conf); final Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(MongoOutputFormat.class); FileInputFormat.addInputPath(job, new Path("/tmp/input/")); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
9.运行结果:
10. 在MongoDB中查看结果:
> use test switched to db test > show collections blog out system.indexes > db.out.find() { "_id" : "Hadoop", "value" : 2 } { "_id" : "Hello", "value" : 4 } { "_id" : "MongoDB", "value" : 2 } { "_id" : "MongoDB!", "value" : 1 } { "_id" : "with", "value" : 1 } { "_id" : "world!", "value" : 1 }
相关推荐
在这个"阿里云EMR spark kafka redis MongoDB例子demo"中,我们看到了如何整合这些技术,构建一个实时数据处理系统,从Kafka获取数据,利用Spark Streaming进行实时分析,然后将结果存储到Redis和MongoDB,实现了...
1. **大数据的写入方法**:大数据写入通常涉及到分布式存储系统,如Hadoop HDFS(Hadoop Distributed File System)。在这样的系统中,数据被分割成多个块,并在多台服务器上并行写入,以提高写入速度和容错能力。...
文件写入时,Client将文件切分成Block并分别写入DataNode。MapReduce则是处理大数据的计算模型,数据先通过Map阶段进行拆分处理,然后由Reduce阶段汇总结果。 【Hbase详解】 Hbase是基于Hadoop的分布式NoSQL数据库...
- **优点**:高度可扩展、成本效益高、可靠且高效,尤其适用于一次写入多次读取的场景。 - **Hadoop生态系统**:包括HBase、Pig、Hive、Chukwa和ZooKeeper等组件,提供数据存储、分析和协调等功能。 5. **Hadoop...
HDFS遵循“一次写入,多次读取”的原则,确保数据的完整性。NameNode是HDFS的主节点,负责元数据管理,DataNodes则是存储数据的实际节点。 2. MapReduce MapReduce是一种编程模型,用于大规模数据集的并行计算。它...
2. Sqoop生成一个MapReduce任务,其中Mapper负责读取数据库中的记录,Reducer则将数据写入HDFS。 3. 数据传输过程中,Sqoop会自动处理分页和分块,以提高效率。 4. 对于增量导入,Sqoop可以根据时间戳、序列号或...
- **特点**:高容错性、适合一次写入多次读取的场景、可构建在廉价硬件上。 - **应用场景**:日志收集、流式数据处理、数据仓库等。 #### MapReduce - **原理**:将复杂问题分解成多个子任务进行并行计算。 - **...
数据通常以增量方式一次性写入,很少需要修改,读取时以顺序方式为主,同时支持并发的文件读写。分布式存储系统应该具备良好的可扩展性,以支持数据量的不断增长。 HDFS是一个分布式、可扩展、可移植的文件系统,用...
1. **键值数据库**:以键值对的形式存储数据,如Amazon的Dynamo,适用于需要快速读取和写入的场景,常用于缓存系统,如Memcached和Redis。 2. **列式数据库**:如Google的BigTable,适用于数据分析和处理,因为它们...
- 适用于一次写入多次读取的场景。 **3. 计算框架** - **MapReduce**:通过将计算任务分解为Map阶段和Reduce阶段,实现并行处理。适用于离线批处理场景。 - **Spark**:相比于MapReduce,Spark更加高效,尤其在...
Java API使得开发者能够方便地实现数据的读取、处理和写入。 2. **Spark与Java**:Apache Spark是另一个大数据处理框架,它提供了一种快速、通用和可扩展的数据处理方法。Spark支持多种编程语言,包括Java。Java ...
- **关键特性:** HDFS采用了一次写入多次读取的简单文件模型。 - **应用场景:** 这种模型非常适合用于存储静态或半静态数据,如日志文件、文档存储等。 **5. 跨平台兼容性** - **关键特性:** 由于HDFS基于...
- **文件写流程**:客户端向NameNode请求写入文件,NameNode返回DataNode信息,客户端将文件分割为Block后,按序写入各DataNode。 - **MapReduce**:一种分布式计算框架,用于处理大规模数据集,通过Map(映射)和...
Hadoop的优点在于其可扩展性、经济性、可靠性和高效性,尤其适用于批处理和“一次写入多次读取”的应用场景。然而,Hadoop不适合处理小文件、频繁的随机读取或需要频繁修改文件的场景。 Hadoop体系架构中还包括其他...
- **Hive**:通过类似SQL的语言(HiveQL)来读取、写入和管理Hadoop中的数据,降低了使用MapReduce进行数据分析的难度。 - **HBase**:基于Hadoop的一个分布式、可扩展的列存储数据库,适用于需要随机读取和更新数据...
- **结果输出**:将最终结果写入文件、数据库或可视化展示。 5. **大数据技术**:可能涉及到的技术和概念有: - **MapReduce**:一种分布式计算模型,用于处理大规模数据集。 - **Hadoop**:一个开源框架,支持...