环境:Vmware 8.0 和Ubuntu11.04
Hadoop 实战之MapReduce链接作业之预处理
第一步:首先创建一个工程命名为HadoopTest.目录结构如下图:
第二步: 在/home/tanglg1987目录下新建一个start.sh脚本文件,每次启动虚拟机都要删除/tmp目录下的全部文件,重新格式化namenode,代码如下:
- sudo rm -rf /tmp/*
- rm -rf /home/tanglg1987/hadoop-0.20.2/logs
- hadoop namenode -format
- hadoop datanode -format
- start-all.sh
- hadoop fs -mkdir input
- hadoop dfsadmin -safemode leave
第三步:给start.sh增加执行权限并启动hadoop伪分布式集群,代码如下:
- chmod 777 /home/tanglg1987/start.sh
- ./start.sh
执行过程如下:
第四步:上传本地文件到hdfs
在/home/tanglg1987目录下新建Customer.txt内容如下:
- 100 tom 90
- 101 mary 85
- 102 kate 60
上传本地文件到hdfs:
- hadoop fs -put /home/tanglg1987/ChainMapper.txt input
第五步:新建一个ChainMapperDemo.java,代码如下:
- package com.baison.action;
- import java.io.IOException;
- import java.util.*;
- import java.lang.String;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.util.*;
- import org.apache.hadoop.mapred.lib.*;
- publicclass ChainMapperDemo {
- publicstaticclass Map00 extends MapReduceBase implements
- Mapper<Text, Text, Text, Text> {
- publicvoid map(Text key, Text value, OutputCollector output,
- Reporter reporter) throws IOException {
- Text ft = new Text("100");
- if (!key.equals(ft)) {
- output.collect(key, value);
- }
- }
- }
- publicstaticclass Map01 extends MapReduceBase implements
- Mapper<Text, Text, Text, Text> {
- publicvoid map(Text key, Text value, OutputCollector output,
- Reporter reporter) throws IOException {
- Text ft = new Text("101");
- if (!key.equals(ft)) {
- output.collect(key, value);
- }
- }
- }
- publicstaticclass Reduce extends MapReduceBase implements
- Reducer<Text, Text, Text, Text> {
- publicvoid reduce(Text key, Iterator values, OutputCollector output,
- Reporter reporter) throws IOException {
- while (values.hasNext()) {
- output.collect(key, values.next());
- }
- }
- }
- publicstaticvoid main(String[] args) throws Exception {
- String[] arg = { "hdfs://localhost:9100/user/tanglg1987/input/ChainMapper.txt",
- "hdfs://localhost:9100/user/tanglg1987/output" };
- JobConf conf = new JobConf(ChainMapperDemo.class);
- conf.setJobName("ChainMapperDemo");
- conf.setInputFormat(KeyValueTextInputFormat.class);
- conf.setOutputFormat(TextOutputFormat.class);
- ChainMapper cm = new ChainMapper();
- JobConf mapAConf = new JobConf(false);
- cm.addMapper(conf, Map00.class, Text.class, Text.class, Text.class,
- Text.class, true, mapAConf);
- JobConf mapBConf = new JobConf(false);
- cm.addMapper(conf, Map01.class, Text.class, Text.class, Text.class,
- Text.class, true, mapBConf);
- conf.setReducerClass(Reduce.class);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
- FileInputFormat.setInputPaths(conf, new Path(arg[0]));
- FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
- JobClient.runJob(conf);
- }
- }
第六步:Run On Hadoop,运行过程如下:
12/10/17 21:05:53 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/10/17 21:05:53 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/10/17 21:05:53 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
12/10/17 21:05:54 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/17 21:05:54 INFO mapred.JobClient: Running job: job_local_0001
12/10/17 21:05:54 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/17 21:05:54 INFO mapred.MapTask: numReduceTasks: 1
12/10/17 21:05:54 INFO mapred.MapTask: io.sort.mb = 100
12/10/17 21:05:54 INFO mapred.MapTask: data buffer = 79691776/99614720
12/10/17 21:05:54 INFO mapred.MapTask: record buffer = 262144/327680
12/10/17 21:05:54 INFO mapred.MapTask: Starting flush of map output
12/10/17 21:05:54 INFO mapred.MapTask: Finished spill 0
12/10/17 21:05:54 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/10/17 21:05:54 INFO mapred.LocalJobRunner: hdfs://localhost:9100/user/tanglg1987/input/ChainMapper.txt:0+35
12/10/17 21:05:54 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/10/17 21:05:54 INFO mapred.LocalJobRunner:
12/10/17 21:05:54 INFO mapred.Merger: Merging 1 sorted segments
12/10/17 21:05:54 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 16 bytes
12/10/17 21:05:54 INFO mapred.LocalJobRunner:
12/10/17 21:05:54 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/10/17 21:05:54 INFO mapred.LocalJobRunner:
12/10/17 21:05:54 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/10/17 21:05:54 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9100/user/tanglg1987/output
12/10/17 21:05:54 INFO mapred.LocalJobRunner: reduce > reduce
12/10/17 21:05:54 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/10/17 21:05:55 INFO mapred.JobClient: map 100% reduce 100%
12/10/17 21:05:55 INFO mapred.JobClient: Job complete: job_local_0001
12/10/17 21:05:55 INFO mapred.JobClient: Counters: 15
12/10/17 21:05:55 INFO mapred.JobClient: FileSystemCounters
12/10/17 21:05:55 INFO mapred.JobClient: FILE_BYTES_READ=36152
12/10/17 21:05:55 INFO mapred.JobClient: HDFS_BYTES_READ=70
12/10/17 21:05:55 INFO mapred.JobClient: FILE_BYTES_WRITTEN=73202
12/10/17 21:05:55 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=12
12/10/17 21:05:55 INFO mapred.JobClient: Map-Reduce Framework
12/10/17 21:05:55 INFO mapred.JobClient: Reduce input groups=1
12/10/17 21:05:55 INFO mapred.JobClient: Combine output records=0
12/10/17 21:05:55 INFO mapred.JobClient: Map input records=3
12/10/17 21:05:55 INFO mapred.JobClient: Reduce shuffle bytes=0
12/10/17 21:05:55 INFO mapred.JobClient: Reduce output records=1
12/10/17 21:05:55 INFO mapred.JobClient: Spilled Records=2
12/10/17 21:05:55 INFO mapred.JobClient: Map output bytes=12
12/10/17 21:05:55 INFO mapred.JobClient: Map input bytes=35
12/10/17 21:05:55 INFO mapred.JobClient: Combine input records=0
12/10/17 21:05:55 INFO mapred.JobClient: Map output records=1
12/10/17 21:05:55 INFO mapred.JobClient: Reduce input records=1
第七步:查看结果集,运行结果如下:
- sudo rm -rf /tmp/*
- rm -rf /home/tanglg1987/hadoop-0.20.2/logs
- hadoop namenode -format
- hadoop datanode -format
- start-all.sh
- hadoop fs -mkdir input
- hadoop dfsadmin -safemode leave
第三步:给start.sh增加执行权限并启动hadoop伪分布式集群,代码如下:
- chmod 777 /home/tanglg1987/start.sh
- ./start.sh
相关推荐
由于HBase是建立在Hadoop生态系统之上,因此它可以和Hadoop的其他组件,如MapReduce,很好地协同工作。 Hadoop MapReduce是Hadoop的核心组件之一,它提供了编程模型,允许开发者以分布式方式进行复杂的数据处理。在...
标题中的“Hadoop MapReduce:数据集链接的Hadoop MapReduce实践问题”表明我们将探讨如何在Hadoop MapReduce框架中处理数据集之间的连接操作。在大数据处理领域,数据集链接是常见的任务,例如用于合并来自不同来源...
map函数是MapReduce模型中的核心组件之一。它的输入是一个键值对,通常表示为(key, value),输出也是一组键值对。map函数的主要目的是对输入数据进行预处理,例如拆分文本文件中的行、解析XML文档或执行其他形式的...
7. **运行MapReduce作业**:最后,通过Hadoop的JobTracker或YARN提交并运行MapReduce作业。作业完成后,话单数据将成功导入到HBase。 文件“LoadDataToHBase.sh”可能是一个bash脚本,用于自动化上述过程,包括启动...
“对”和“条纹”是两种用于组织MapReduce作业输出的有效方式。通过对输出结果进行适当的组织,可以进一步简化后续处理步骤,提高整体效率。 - **对**:是指将相关联的数据放在一起,便于后续处理。 - **条纹**:则...
10. **案例应用**:MapReduce常用于网页链接分析、日志处理、机器学习模型训练等场景。例如,Google的PageRank算法就是MapReduce的一个典型应用,通过Map阶段计算网页之间的链接关系,Reduce阶段计算每个网页的...
Java作为实现语言,提供了Hadoop库支持MapReduce编程,开发人员需要熟悉Hadoop API,如`org.apache.hadoop.mapreduce`包中的类和接口,以及如何配置和运行MapReduce作业。 总结来说,本项目是使用Java和MapReduce...
在这个项目中,它可能会详细解释如何获取和预处理新冠疫情数据,以及如何配置和执行Hadoop MapReduce作业。 `pom.xml` 是一个Maven项目的配置文件,用于管理项目依赖。在这个项目中,它可能列出了Hadoop和其他相关...
Pagerank是Google搜索引擎早期的关键技术之一,它通过分析网页之间的链接关系来确定其在整体网络中的权重。在这个项目中,我们将Pagerank应用于微博数据,以便理解用户影响力、话题热度和社交网络结构。 **Pagerank...
在实际运行中,我们可能需要多次执行MapReduce作业,直到PageRank值的改变小于某个阈值,或者达到预定的最大迭代次数。最后,Reduce任务的输出将包含每篇文章的最终PageRank值,可以进一步存储到数据库或文件系统中...
在Hadoop Eclipse Plugin 2.7.3的帮助下,开发者可以直接在Eclipse中创建、编辑和运行Hadoop MapReduce作业,无需离开熟悉的IDE。这大大减少了在不同工具间切换的时间成本,提高了开发效率。此外,该插件还支持HDFS...
6. **优化技巧**:MapReduceCase中的例子可能还涵盖了如何优化MapReduce作业,如减少数据传输、利用Combiner减少网络负载、优化Reduce数量以及使用自定义分区策略等。 通过深入学习和实践MapReduceCase中的每个案例...
在Hadoop集群上提交MapReduce作业,指定输入路径(TXT文件所在目录)和输出路径(生成的ORC文件目录)。 5. **ORC文件的优势**: - **压缩**:ORC文件支持多种压缩算法,如ZLIB和SNAPPY,可以有效减小存储空间。 ...
在Google,MapReduce被广泛应用于各种日常任务,每天有成千上万的MapReduce作业在运行。 然而,随着大数据技术的发展,Hadoop也在不断进化,例如YARN(Yet Another Resource Negotiator)取代了JobTracker,提供了...
目录:网盘文件永久链接 000 大数据学习要求-王艳芝 001 大数据行业与技术趋势-李申浩 002 Fusion Insight HD解决方案介绍-李申浩 003 HDFS技术原理01-李申浩 004 HDFS技术原理02-李申浩 005 大数据平台架构和...
5. **图计算系统**:专门用于处理社交网络、网页链接等图数据。 6. **内存计算系统**:利用内存的高速特性处理大规模数据。 #### 六、大数据处理流程 1. **数据采集**:获取原始数据的过程。 2. **数据清理**:...
通过PyHadoop,开发者可以直接在Python环境中编写MapReduce作业。 5. **Data Science Libraries**:Python拥有众多的数据科学库,如NumPy、Pandas和Scikit-learn,这些库能够很好地与Hadoop结合,进行数据预处理、...
1. **数据预处理**:首先,需要从Common Crawl Dataset中提取出所有网页的URL和它们之间的链接关系。这通常通过MapReduce的map阶段完成,map函数读取WARC文件,解析出链接结构,并将数据格式化为`(url1, [url2, url3...
- **MapReduce**:默认的执行引擎,处理数据时使用 MapReduce 作业。 - **Tez** 和 **Spark**:作为替代的执行引擎,提供更快的查询性能。 5. **Hive 源码分析**: - **解析器(Parser)**:将 SQL 语句转化为...