`
小野bupt
  • 浏览: 14456 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

在MapReduce远程提交输出结果边为空

 
阅读更多

今天在做hadoop小实验(类似于倒排索引),

在本地运行时本地job“Running job: job_local_0001” 的得到的数据格式为为:

Hello	file3.txt:1;
MapReduce	file3.txt:2;file1.txt:1;file2.txt:1;
bye	file3.txt:1;
is	file1.txt:1;file2.txt:2;
powerful	file2.txt:1;
simple	file2.txt:1;file1.txt:1;


而 提交到集群上运行“Running job: job_201405091426_0019”得到数据格式为空值。

输入文件内容为:

file1.txt
	MapReduce is simple
file2.txt
	MapReduce is powerful is simple
file3.txt
	Hello MapReduce bye MapReduce
这三个文件。


搞了半天不知道什么问题。记录下来 以后解决。

程序源码如下:

package org.edu.bupt.xiaoye.hadooptest;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyInverseIndex {
	public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
	public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUTPUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		conf.set("hadoop.job.user","hadoop");
		conf.set("mapred.job.tracker", "10.103.240.160:9001");
		final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, outPath);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(1);//设置个数为1
		job.waitForCompletion(true);
		
	}

	/**
	 * 只适用于文档中只出现一行 (可以一次读取整个文档)
	 * 
	 * @author hadoop
	 * 
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
		Map<String, Integer> map = new HashMap();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] words = line.split(" ");
			for (String s : words) {
				if (map.containsKey(s)) {
					map.put(s, map.get(s) + 1);
				} else {
					map.put(s, 1);
				}
			}
			Set<String> keys = map.keySet();
			for (Iterator it = keys.iterator(); it.hasNext();) {
				String s = (String) it.next();
				context.write(new Text(s),
						new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
			}
			map.clear();
		}

	}

	public static class MyReduce extends Reducer<Text, Text, Text, Text> {
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException 
		{
			StringBuffer files = new StringBuffer();
			for(Text fileName : values){
				files.append(fileName+";");
			}
			context.write(key , new Text(files.toString()));			
		}
	}

}

今天在检查的时候,重写了一遍,把MyReduce改成这样就好了,奇怪。

public static class MyReduce extends Reducer<Text, Text, Text, Text> {		
		private Text result = new Text();
		// 实现reduce函数
		public void reduce(Text key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException {
			StringBuffer fileList = new StringBuffer();
			for(Text value : values){
				fileList.append(value.toString()+";");
			}
			result.set(fileList.toString());
			context.write(key, result);
		}
	}


分享到:
评论

相关推荐

    MapReduce的实现细节

    在Hadoop MapReduce中,服务器间的通信主要依赖于远程过程调用(RPC)机制。具体来说: - 客户端通过RPC接口向作业服务器提交作业。 - 作业服务器通过RPC接口分配任务给任务服务器。 - 任务服务器通过RPC接口向作业...

    window下eclipse中运行mapreduce程序所需要的Hadoop全部jar包

    总之,要在Windows下的Eclipse环境中成功运行MapReduce程序,关键在于正确配置Hadoop环境,导入所有必要的jar包,并理解如何设置和提交MapReduce作业。这个过程可能需要一些时间和实践,但一旦配置完成,将为高效...

    大数据Hadoop核心之MapReduce详解

    MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 MapReduce的优点: 1. 易于编程:MapReduce简单的实现一些接口,就可以完成一个...

    大数据技术 ODPS MapReduce对外开放实践 共20页.pptx

    总的来说,ODPS MapReduce是大数据处理领域的一个强大工具,它的开放实践不仅展示了其在大规模数据处理中的能力,也为开发者提供了丰富的工具和接口,促进了大数据应用的创新与发展。随着技术的不断进步,我们期待...

    eclipse運行mapreduce的插件

    Eclipse中的Hadoop插件通常包含了Hadoop的模拟器或者与Hadoop集群的连接功能,允许开发者在本地环境中模拟MapReduce作业的执行流程,或者直接提交到远程集群上运行。这样,开发者可以在不离开Eclipse IDE的情况下...

    详细介绍Hadoop家族中的MapReduce原理

    3. 不擅长 DAG(有向图)计算:MapReduce 并不是不能做,但是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。 MapReduce 的核心思想是将分布式运算程序分成至少...

    hadoop mapreduce

    它借鉴了Google的MapReduce编程模型,将大型数据集分解为小块,然后在集群中的多台机器上并行处理这些小块。MapReduce包含两个主要阶段:Map阶段和Reduce阶段,它们共同实现了数据的分布式处理。 Map阶段: 在这个...

    mapreduce_plugin

    Eclipse MapReduce Plugin则是专门为在Eclipse集成开发环境中进行Hadoop MapReduce应用程序开发而设计的工具。这款插件极大地简化了开发者的工作流程,提供了丰富的功能,帮助开发者编写、调试和运行MapReduce程序。...

    MapReduce - WordCount案例 - 含各种部署方式源码

    本文将详细解析MapReduce在实现WordCount案例中的原理、步骤以及如何通过Java进行编程,并涵盖本地提交和远程调用的不同部署方式。 1. **MapReduce基本原理** MapReduce分为两个主要阶段:Map阶段和Reduce阶段。...

    Hadoop_MapReduce_HDFS示例代码

    5. **remote_test**:这个可能涉及到远程Hadoop集群的测试代码,如提交MapReduce作业到远程集群运行,检查作业状态,或者对HDFS上的文件进行远程操作。 学习这些示例代码,不仅可以理解Hadoop生态的基本组件和它们...

    远程调用执行Hadoop Map/Reduce

    7. **工具集成**:有许多开源工具可以帮助我们远程提交和管理Hadoop作业,如Hadoop命令行工具、Hadoop的Web UI、Apache Oozie工作流管理系统等。这些工具提供了方便的接口,使开发者能便捷地与集群交互。 8. **安全...

    基于RPC的MapReduce简单实现.zip

    MapReduce是Google提出的一种分布式计算模型,它将大规模数据处理任务分解为两个主要阶段:Map阶段和Reduce阶段,使得在分布式集群中并行处理成为可能。 Hadoop是Apache软件基金会开发的一个开源框架,它实现了...

    分布式计算(MapReduce)参考.pdf

    Hadoop MapReduce的通信基于远程过程调用(RPC)接口,JobTracker作为RPC服务器,TaskTracker和客户端通过RPC代理进行通信。与HDFS相比,MapReduce的通信简化了客户端与TaskTracker以及TaskTracker之间的直接交互,...

    Hadoop技术内幕:深入解析MapReduce架构设计与实现原理

    客户端用于提交MapReduce作业,监控作业执行状态,并且获取作业的执行结果。JobTracker则是Hadoop集群中的主节点,负责资源管理、任务调度和作业监控。TaskTracker运行在从节点上,执行实际的数据处理任务,包括Map...

    myEclipse10.0与hadoop集群远程连接

    在IT行业中,myEclipse10.0是一款广泛使用的Java集成开发环境,它为开发者提供了丰富的功能,包括代码编辑、调试、构建以及项目管理等。Hadoop则是一个开源的大数据处理框架,它允许分布式存储和处理大规模数据集。...

    Hadoop系统安装运行与程序开发

    在集群环境中,远程提交作业涉及到使用Hadoop提供的API或者命令行工具将作业提交到Hadoop集群上运行。这涉及到对HDFS的读写操作以及MapReduce任务的调度和执行。Hadoop集群通过HDFS存储数据,通过YARN(Yet Another ...

    基于Java和ssh在Hadoop平台上完成文件操作

    结合SSH,可以在远程节点上执行数据分析脚本,对MapReduce的结果进行进一步处理。 文件名为“ssh_v3-1.1”的压缩包可能包含的是JSch库的一个版本,它是Java实现SSH连接的常用库,版本可能是v3.1.1。这个库可以帮助...

    hadoop mr程序0.20之后版本所需jar包

    在MapReduce程序中,它支持与HDFS(Hadoop Distributed File System)的交互,数据输入/输出,以及节点间的通信。 2. **hadoop-mapreduce-client-core-2.0.5-alpha.jar**:这个JAR文件包含了Hadoop MapReduce客户端...

    HadoopMapReduceArch.pdf

    - **Combiner 输出示例**:假设输入为 "hi Owen bye Owen",Mapper 生成的输出为 (hi, 1), (Owen, 1), (bye, 1), (Owen, 1),而 Combiner 的输出则为 (Owen, 2), (bye, 1), (hi, 1)。 #### 七、进程间通信 ...

Global site tag (gtag.js) - Google Analytics