`
退役的龙弟弟
  • 浏览: 453752 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop1.2.1 MultipleOutputs将结果输出到多个文件或文件夹

 
阅读更多

hadoop1.2.1中使用MultipleOutputs将结果输出到多个文件或文件夹

使用步骤主要有三步:

1、在reduce或map类中创建MultipleOutputs对象,将结果输出

class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{

	//将结果输出到多个文件或多个文件夹
	private MultipleOutputs<Text,IntWritable> mos;
    //创建对象
    protected void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
     }
    	
        //关闭对象
	protected void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
	}
}

 2、在map或reduce方法中使用MultipleOutputs对象输出数据,代替congtext.write()

protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		IntWritable V = new IntWritable();
		int sum = 0;
		for(IntWritable value : values){
			sum = sum + value.get();
		}
		System.out.println("word:" + key.toString() + "     sum = " + sum);
		V.set(sum);

		//使用MultipleOutputs对象输出数据
		if(key.toString().equals("hello")){
			mos.write("hello", key, V);
		}else if(key.toString().equals("world")){
			mos.write("world", key, V);
		}else if(key.toString().equals("hadoop")){
			//输出到hadoop/hadoopfile-r-00000文件
			mos.write("hadoopfile", key, V, "hadoop/");
		}
		
	}

 

 3、在创建job时,定义附加的输出文件,这里的文件名称与第二步设置的文件名相同

//定义附加的输出文件
			MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);
			MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);
			MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);

 

完整代码:

 

package com.ru.hadoop.wordcount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class WordCount2 extends Configured{

	public static void main(String[] args) {
		String in = "/home/nange/work/test/word/";
		String out = "hdfs://localhost:9000/hdfs/test/wordcount/out/";
		
		Job job;
		try {
			//删除hdfs目录
			WordCount2 wc2 = new WordCount2();
			wc2.removeDir(out);
			
			job = new Job(new Configuration(), "wordcount Job");
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			job.setMapperClass(mapperString.class);
//			job.setCombinerClass(reduceStatistics.class);
			job.setReducerClass(reduceStatistics.class);
			
			//定义附加的输出文件
			MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);
			MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);
			MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);
			
			FileInputFormat.addInputPath(job, new Path(in));
			FileOutputFormat.setOutputPath(job, new Path(out));
			job.waitForCompletion(true);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public void removeDir(String filePath) throws IOException, URISyntaxException{
		String url = "hdfs://localhost:9000";
		FileSystem fs  = FileSystem.get(new URI(url), new Configuration());
		fs.delete(new Path(filePath));
	}
}


/**
 * 重写maptask使用的map方法 
 * @author nange
 *
 */
class mapperString extends Mapper<LongWritable, Text, Text, IntWritable>{
	//设置正则表达式的编译表达形式
	public static Pattern PATTERN = Pattern.compile(" ");
	Text K = new Text();
	IntWritable V = new IntWritable(1);
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		String[] words = PATTERN.split(value.toString());
		System.out.println("********" + value.toString());
		for(String word : words){
			K.set(word);
			context.write(K, V);
		}
	}
}

/**
 * 对单词做统计
 * @author nange
 *
 */
class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{

	//将结果输出到多个文件或多个文件夹
	private MultipleOutputs<Text,IntWritable> mos;
	//创建MultipleOutputs对象
    protected void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
     }
    
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		IntWritable V = new IntWritable();
		int sum = 0;
		for(IntWritable value : values){
			sum = sum + value.get();
		}
		System.out.println("word:" + key.toString() + "     sum = " + sum);
		V.set(sum);

		//使用MultipleOutputs对象输出数据
		if(key.toString().equals("hello")){
			mos.write("hello", key, V);
		}else if(key.toString().equals("world")){
			mos.write("world", key, V);
		}else if(key.toString().equals("hadoop")){
			//输出到hadoop/hadoopfile-r-00000文件
			mos.write("hadoopfile", key, V, "hadoop/");
		}
		
	}
	
	//关闭MultipleOutputs对象
	protected void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
	}
}

 

分享到:
评论

相关推荐

    hadoop1.2.1修改WordCount并编译

    在 IntSumReducer 中,我们使用一个可变的整数值来累加每个单词的出现次数,并将最终结果输出到控制台上。在 reduce 函数中,我们还使用 System.out.print 语句来输出调试信息,以便在控制台上查看程序的执行过程。 ...

    hadoop1.2.1安装部署文档

    - 不支持多用户写入或并发写入同一个文件。 - **HDFS 系统架构**:HDFS 采用 Master/Slave 架构,Master 即 NameNode,负责管理文件系统的命名空间以及客户端对文件的访问;Slave 即 DataNode,负责存储实际的数据...

    hadoop1.2.1安装

    - 在namenode (test1)上解压缩Hadoop 1.2.1软件包,然后将其移动到/home/hadoop目录。 - 修改六个关键配置文件: - `hadoop-env.sh`: 设置JAVA_HOME路径。 - `core-site.xml`: 配置Hadoop临时目录和默认FS。 - ...

    hadoop-1.2.1-api

    `hadoop中文版API.chm`是一个帮助文件,包含了Hadoop 1.2.1 API的中文翻译,对于中国开发者来说非常实用。这个CHM文件通常包含详细的类和接口描述,方法、构造函数、枚举和常量等,便于开发者快速查找和理解Hadoop的...

    hadoop-1.2.1运行WordCount

    ### Hadoop-1.2.1 运行WordCount...总之,运行Hadoop-1.2.1下的WordCount示例涉及多个步骤,从环境搭建、数据准备到程序执行及结果验证都需要仔细操作。特别是在遇到问题时,应根据具体错误信息逐一排查并解决问题。

    hadoop-1.2.1源码(完整版)

    这个压缩包“hadoop-1.2.1源码(完整版)”提供了Hadoop 1.2.1版本的完整源代码,这对于开发者来说是极其宝贵的资源,能够深入理解Hadoop的工作原理,进行二次开发或自定义优化。 首先,我们来看“src”目录,这是...

    hadoop 1.2.1核心源码

    这个压缩包文件“hadoop 1.2.1核心源码”包含了Hadoop项目的核心组件,让我们深入探讨一下其中涉及的关键知识点。 1. **Hadoop架构**:Hadoop的核心由两个主要部分组成:HDFS(Hadoop Distributed File System)和...

    hadoop1.2.1环境

    在Linux上配置的hadoop1.2.1完全分布式环境

    hadoop1.2.1-API中文版

    Hadoop 1.2.1是Hadoop发展中的一个重要版本,它在早期版本的基础上进行了多方面的优化和改进,为开发者提供了更稳定、功能更全面的API接口。 Hadoop的核心组件主要包括HDFS(Hadoop Distributed File System)和...

    hadoop1.2.1-eclipse-indigo插件

    7. **版本兼容性**:Hadoop1.2.1 版本是一个相对早期的版本,其特点是稳定性和兼容性较好,但可能不包含一些较新的特性或性能优化。使用这个插件,开发者可以确保与该版本 Hadoop 的兼容性。 8. **安装与更新**:在...

    hadoop -1.2.1-jar(全)

    在给定的压缩包文件“hadoop-1.2.1-jar(全)”中,包含了Hadoop 1.2.1版本的多个重要组件的JAR包,这对于开发者和系统管理员在搭建、理解和使用Hadoop环境时非常关键。 1. **hadoop-core-1.2.1.jar**:这是Hadoop的...

    hadoop1.2.1配置过程

    创建或修改 `/app/hadoop-1.2.1/conf/core-site.xml` 文件,在 `&lt;configuration&gt;` 标签内添加如下内容: &lt;name&gt;fs.default.name &lt;value&gt;hdfs://HMY2:9000 &lt;name&gt;hadoop.tmp.dir &lt;value&gt;/app/data/tmp ...

    Hadoop1.2.1配置Ganlia监控

    Hadoop1.2.1配置Ganlia监控

    hadoop1-2-1源码

    Hadoop的核心思想是将大型数据集分布到集群上的多台计算机上进行处理,从而实现高效的数据存储和计算。HDFS是Hadoop的分布式文件系统,它设计的目标是高容错性、高吞吐量和低成本。HDFS通过数据复制来保证数据的可靠...

    hadoop1.2.1部署说明

    部署Hadoop 1.2.1集群是一个复杂但有序的过程,涉及到系统配置、软件安装、环境变量设置、SSH安全配置以及详细的Hadoop配置。遵循上述指南,您可以顺利地在三台虚拟机上建立一个功能完善的Hadoop集群,为大数据处理...

    好用hadoop-eclipse-plugin-1.2.1

    hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1

    Hadoop 1.2.1 伪分布式安装 Mac版

    在这个教程中,我们将详细介绍如何在Mac系统上安装Hadoop 1.2.1的伪分布式模式,这是一种单机模拟多节点环境的方式,适合学习和测试。 1. **下载Hadoop 1.2.1** 访问Apache Hadoop的下载页面...

    Hadoop MultipleOutputs输出到多个文件中的实现方法

    Hadoop MultipleOutputs是Hadoop MapReduce框架中的一种输出机制,可以将输出写入到多个文件中。下面将详细介绍Hadoop MultipleOutputs输出到多个文件中的实现方法。 1. 输出到多个文件或多个文件夹 在使用Hadoop ...

    hadoop-eclipse-plugin1.2.1 and hadoop-eclipse-plugin2.8.0

    本文将详细介绍这两个版本的Hadoop Eclipse Plugin——1.2.1和2.8.0。 首先,Hadoop-Eclipse-Plugin 1.2.1是针对较早期的Hadoop版本设计的。这个版本的插件支持Hadoop 1.x系列,适配的是Hadoop MapReduce的旧版API...

    hadoop1.2.1 Centos系统搭建

    hadoop1.2.1在Centos6.5系统中搭建的详细步骤

Global site tag (gtag.js) - Google Analytics