今天在做hadoop小实验(类似于倒排索引),
在本地运行时本地job“Running job: job_local_0001” 的得到的数据格式为为:
Hello file3.txt:1;
MapReduce file3.txt:2;file1.txt:1;file2.txt:1;
bye file3.txt:1;
is file1.txt:1;file2.txt:2;
powerful file2.txt:1;
simple file2.txt:1;file1.txt:1;
而 提交到集群上运行“Running job: job_201405091426_0019”得到数据格式为空值。
输入文件内容为:
file1.txt
MapReduce is simple
file2.txt
MapReduce is powerful is simple
file3.txt
Hello MapReduce bye MapReduce
这三个文件。
搞了半天不知道什么问题。记录下来 以后解决。
程序源码如下:
package org.edu.bupt.xiaoye.hadooptest;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyInverseIndex {
public static final String INPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_in";
public static final String OUTPUT_PATH = "hdfs://10.103.240.160:9000/usr/hadoop/MyInverseIndex_out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUTPUT_PATH);
if (fileSystem.exists(outPath)) {
fileSystem.delete(outPath, true);
}
conf.set("hadoop.job.user","hadoop");
conf.set("mapred.job.tracker", "10.103.240.160:9001");
final Job job = new Job(conf, MyInverseIndex.class.getSimpleName());
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job, outPath);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);//设置个数为1
job.waitForCompletion(true);
}
/**
* 只适用于文档中只出现一行 (可以一次读取整个文档)
*
* @author hadoop
*
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
Map<String, Integer> map = new HashMap();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String s : words) {
if (map.containsKey(s)) {
map.put(s, map.get(s) + 1);
} else {
map.put(s, 1);
}
}
Set<String> keys = map.keySet();
for (Iterator it = keys.iterator(); it.hasNext();) {
String s = (String) it.next();
context.write(new Text(s),
new Text(((FileSplit) context.getInputSplit()).getPath().getName().toString() + ":" + map.get(s)));
}
map.clear();
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
StringBuffer files = new StringBuffer();
for(Text fileName : values){
files.append(fileName+";");
}
context.write(key , new Text(files.toString()));
}
}
}
今天在检查的时候,重写了一遍,把MyReduce改成这样就好了,奇怪。
public static class MyReduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer fileList = new StringBuffer();
for(Text value : values){
fileList.append(value.toString()+";");
}
result.set(fileList.toString());
context.write(key, result);
}
}
分享到:
相关推荐
在Hadoop MapReduce中,服务器间的通信主要依赖于远程过程调用(RPC)机制。具体来说: - 客户端通过RPC接口向作业服务器提交作业。 - 作业服务器通过RPC接口分配任务给任务服务器。 - 任务服务器通过RPC接口向作业...
总之,要在Windows下的Eclipse环境中成功运行MapReduce程序,关键在于正确配置Hadoop环境,导入所有必要的jar包,并理解如何设置和提交MapReduce作业。这个过程可能需要一些时间和实践,但一旦配置完成,将为高效...
MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 MapReduce的优点: 1. 易于编程:MapReduce简单的实现一些接口,就可以完成一个...
总的来说,ODPS MapReduce是大数据处理领域的一个强大工具,它的开放实践不仅展示了其在大规模数据处理中的能力,也为开发者提供了丰富的工具和接口,促进了大数据应用的创新与发展。随着技术的不断进步,我们期待...
Eclipse中的Hadoop插件通常包含了Hadoop的模拟器或者与Hadoop集群的连接功能,允许开发者在本地环境中模拟MapReduce作业的执行流程,或者直接提交到远程集群上运行。这样,开发者可以在不离开Eclipse IDE的情况下...
3. 不擅长 DAG(有向图)计算:MapReduce 并不是不能做,但是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。 MapReduce 的核心思想是将分布式运算程序分成至少...
7. **工具集成**:有许多开源工具可以帮助我们远程提交和管理Hadoop作业,如Hadoop命令行工具、Hadoop的Web UI、Apache Oozie工作流管理系统等。这些工具提供了方便的接口,使开发者能便捷地与集群交互。 8. **安全...
它借鉴了Google的MapReduce编程模型,将大型数据集分解为小块,然后在集群中的多台机器上并行处理这些小块。MapReduce包含两个主要阶段:Map阶段和Reduce阶段,它们共同实现了数据的分布式处理。 Map阶段: 在这个...
Eclipse MapReduce Plugin则是专门为在Eclipse集成开发环境中进行Hadoop MapReduce应用程序开发而设计的工具。这款插件极大地简化了开发者的工作流程,提供了丰富的功能,帮助开发者编写、调试和运行MapReduce程序。...
本文将详细解析MapReduce在实现WordCount案例中的原理、步骤以及如何通过Java进行编程,并涵盖本地提交和远程调用的不同部署方式。 1. **MapReduce基本原理** MapReduce分为两个主要阶段:Map阶段和Reduce阶段。...
5. **remote_test**:这个可能涉及到远程Hadoop集群的测试代码,如提交MapReduce作业到远程集群运行,检查作业状态,或者对HDFS上的文件进行远程操作。 学习这些示例代码,不仅可以理解Hadoop生态的基本组件和它们...
MapReduce是Google提出的一种分布式计算模型,它将大规模数据处理任务分解为两个主要阶段:Map阶段和Reduce阶段,使得在分布式集群中并行处理成为可能。 Hadoop是Apache软件基金会开发的一个开源框架,它实现了...
Hadoop MapReduce的通信基于远程过程调用(RPC)接口,JobTracker作为RPC服务器,TaskTracker和客户端通过RPC代理进行通信。与HDFS相比,MapReduce的通信简化了客户端与TaskTracker以及TaskTracker之间的直接交互,...
在IT行业中,myEclipse10.0是一款广泛使用的Java集成开发环境,它为开发者提供了丰富的功能,包括代码编辑、调试、构建以及项目管理等。Hadoop则是一个开源的大数据处理框架,它允许分布式存储和处理大规模数据集。...
在集群环境中,远程提交作业涉及到使用Hadoop提供的API或者命令行工具将作业提交到Hadoop集群上运行。这涉及到对HDFS的读写操作以及MapReduce任务的调度和执行。Hadoop集群通过HDFS存储数据,通过YARN(Yet Another ...
结合SSH,可以在远程节点上执行数据分析脚本,对MapReduce的结果进行进一步处理。 文件名为“ssh_v3-1.1”的压缩包可能包含的是JSch库的一个版本,它是Java实现SSH连接的常用库,版本可能是v3.1.1。这个库可以帮助...
在MapReduce程序中,它支持与HDFS(Hadoop Distributed File System)的交互,数据输入/输出,以及节点间的通信。 2. **hadoop-mapreduce-client-core-2.0.5-alpha.jar**:这个JAR文件包含了Hadoop MapReduce客户端...
- **Combiner 输出示例**:假设输入为 "hi Owen bye Owen",Mapper 生成的输出为 (hi, 1), (Owen, 1), (bye, 1), (Owen, 1),而 Combiner 的输出则为 (Owen, 2), (bye, 1), (hi, 1)。 #### 七、进程间通信 ...
在远程模式下,Hive on Spark 会将 Task 提交到 Spark 集群中执行,然后等待执行结果。 在任务执行过程中,Hive on Spark 会将 Task 分配到多个 executor 中执行,每个 executor 负责执行一个 Task。 executor 会将...