hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式
原文:http://blog.csdn.net/fansy1990/article/details/26267637
版本:
CDH5.0.0 (hdfs:2.3,mapreduce:2.3,yarn:2.3)
hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式。
比如现在有如下的需求:
现有两份数据:
phone:
- 123,good number
- 124,common number
- 125,bad number
user:
- zhangsan,123
- lisi,124
- wangwu,125
现在需要把user和phone按照phone number连接起来,得到下面的结果:
- zhangsan,123,good number
- lisi,123,common number
- wangwu,125,bad number
那么就可以使用MultipleInputs来操作,这里把user和phone上传到hdfs目录中,分别是/multiple/user/user , /multiple/phone/phone。
设计的MultipleDriver如下:
- package multiple.input;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- //import org.slf4j.Logger;
- //import org.slf4j.LoggerFactory;
- /**
- * input1(/multiple/user/user):
- * username,user_phone
- *
- * input2(/multiple/phone/phone):
- * user_phone,description
- *
- * output: username,user_phone,description
- *
- * @author fansy
- *
- */
- public class MultipleDriver extends Configured implements Tool{
- // private Logger log = LoggerFactory.getLogger(MultipleDriver.class);
- private String input1=null;
- private String input2=null;
- private String output=null;
- private String delimiter=null;
- public static void main(String[] args) throws Exception {
- Configuration conf=new Configuration();
- // conf.set("fs.defaultFS", "hdfs://node33:8020");
- // conf.set("mapreduce.framework.name", "yarn");
- // conf.set("yarn.resourcemanager.address", "node33:8032");
- ToolRunner.run(conf, new MultipleDriver(), args);
- }
- @Override
- public int run(String[] arg0) throws Exception {
- configureArgs(arg0);
- checkArgs();
- Configuration conf= getConf();
- conf.set("delimiter", delimiter);
- @SuppressWarnings("deprecation")
- Job job = new Job(conf, "merge user and phone information ");
- job.setJarByClass(MultipleDriver.class);
- job.setReducerClass(MultipleReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlagStringDataType.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
- job.setNumReduceTasks(1);
- MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, Multiple1Mapper.class);
- MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, Multiple2Mapper.class);
- FileOutputFormat.setOutputPath(job, new Path(output));
- int res = job.waitForCompletion(true) ? 0 : 1;
- return res;
- }
- /**
- * check the args
- */
- private void checkArgs() {
- if(input1==null||"".equals(input1)){
- System.out.println("no user input...");
- printUsage();
- System.exit(-1);
- }
- if(input2==null||"".equals(input2)){
- System.out.println("no phone input...");
- printUsage();
- System.exit(-1);
- }
- if(output==null||"".equals(output)){
- System.out.println("no output...");
- printUsage();
- System.exit(-1);
- }
- if(delimiter==null||"".equals(delimiter)){
- System.out.println("no delimiter...");
- printUsage();
- System.exit(-1);
- }
- }
- /**
- * configuration the args
- * @param args
- */
- private void configureArgs(String[] args) {
- for(int i=0;i<args.length;i++){
- if("-i1".equals(args[i])){
- input1=args[++i];
- }
- if("-i2".equals(args[i])){
- input2=args[++i];
- }
- if("-o".equals(args[i])){
- output=args[++i];
- }
- if("-delimiter".equals(args[i])){
- delimiter=args[++i];
- }
- }
- }
- public static void printUsage(){
- System.err.println("Usage:");
- System.err.println("-i1 input \t user data path.");
- System.err.println("-i2 input \t phone data path.");
- System.err.println("-o output \t output data path.");
- System.err.println("-delimiter data delimiter , default is comma .");
- }
- }
这里指定两个mapper和一个reducer,两个mapper分别对应处理user和phone的数据,分别如下:
mapper1(处理user数据):
- package multiple.input;
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * input :
- * username,phone
- *
- * output:
- * <key,value> --> <[phone],[0,username]>
- * @author fansy
- *
- */
- public class Multiple1Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
- private Logger log = LoggerFactory.getLogger(Multiple1Mapper.class);
- private String delimiter=null; // default is comma
- @Override
- public void setup(Context cxt){
- delimiter= cxt.getConfiguration().get("delimiter", ",");
- log.info("This is the begin of Multiple1Mapper");
- }
- @Override
- public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
- String info= new String(value.getBytes(),"UTF-8");
- String[] values = info.split(delimiter);
- if(values.length!=2){
- return;
- }
- log.info("key-->"+values[1]+"=========value-->"+"[0,"+values[0]+"]");
- cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
- }
- }
mapper2(处理phone数据):
- package multiple.input;
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * input :
- * phone,description
- *
- * output:
- * <key,value> --> <[phone],[1,description]>
- * @author fansy
- *
- */
- public class Multiple2Mapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
- private Logger log = LoggerFactory.getLogger(Multiple2Mapper.class);
- private String delimiter=null; // default is comma
- @Override
- public void setup(Context cxt){
- delimiter= cxt.getConfiguration().get("delimiter", ",");
- log.info("This is the begin of Multiple2Mapper");
- }
- @Override
- public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
- String[] values= value.toString().split(delimiter);
- if(values.length!=2){
- return;
- }
- log.info("key-->"+values[0]+"=========value-->"+"[1,"+values[1]+"]");
- cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
- }
- }
这里的FlagStringDataType是自定义的:
- package multiple.input;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.WritableComparable;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.google.common.primitives.Ints;
- public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
- private Logger log = LoggerFactory.getLogger(FlagStringDataType.class);
- private String value;
- private int flag;
- public FlagStringDataType() {
- }
- public FlagStringDataType(int flag,String value) {
- this.value = value;
- this.flag=flag;
- }
- public String get() {
- return value;
- }
- public void set(String value) {
- this.value = value;
- }
- @Override
- public boolean equals(Object other) {
- return other != null && getClass().equals(other.getClass())
- && ((FlagStringDataType) other).get() == value
- &&((FlagStringDataType) other).getFlag()==flag;
- }
- @Override
- public int hashCode() {
- return Ints.hashCode(flag)+value.hashCode();
- }
- @Override
- public int compareTo(FlagStringDataType other) {
- if (flag >= other.flag) {
- if (flag > other.flag) {
- return 1;
- }
- } else {
- return -1;
- }
- return value.compareTo(other.value);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- log.info("in write()::"+"flag:"+flag+",vlaue:"+value);
- out.writeInt(flag);
- out.writeUTF(value);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
- flag=in.readInt();
- value = in.readUTF();
- log.info("in read()::"+"flag:"+flag+",vlaue:"+value);
- }
- public int getFlag() {
- return flag;
- }
- public void setFlag(int flag) {
- this.flag = flag;
- }
- public String toString(){
- return flag+":"+value;
- }
- }
这个自定义类,使用一个flag来指定是哪个数据,而value则对应是其值。这样做的好处是在reduce端可以根据flag的值来判断其输出位置,这种设计方式可以对多种输入的整合有很大帮助,在mahout中也可以看到这样的设计。
reducer(汇总输出数据):
- package multiple.input;
- import java.io.IOException;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class MultipleReducer extends Reducer<Text,FlagStringDataType,Text,NullWritable>{
- private Logger log = LoggerFactory.getLogger(MultipleReducer.class);
- private String delimiter=null; // default is comma
- @Override
- public void setup(Context cxt){
- delimiter= cxt.getConfiguration().get("delimiter", ",");
- }
- @Override
- public void reduce(Text key, Iterable<FlagStringDataType> values,Context cxt) throws IOException,InterruptedException{
- log.info("================");
- log.info(" =======");
- log.info(" ==");
- String[] value= new String[3];
- value[2]=key.toString();
- for(FlagStringDataType v:values){
- int index= v.getFlag();
- log.info("index:"+index+"-->value:"+v.get());
- value[index]= v.get();
- }
- log.info(" ==");
- log.info(" =======");
- log.info("================");
- cxt.write(new Text(value[2]+delimiter+value[0]+delimiter+value[1]),NullWritable.get());
- }
- }
这样设计的好处是,可以针对不同的输入数据采取不同的逻辑处理,而且不同的输入数据可以是序列文件的格式。
下面介绍一种方式和上面的比,略有不足,但是可以借鉴。
首先是Driver:
- package multiple.input;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- //import org.slf4j.Logger;
- //import org.slf4j.LoggerFactory;
- /**
- * input1(/multiple/user/user):
- * username,user_phone
- *
- * input2(/multiple/phone/phone):
- * user_phone,description
- *
- * output: username,user_phone,description
- *
- * @author fansy
- *
- */
- public class MultipleDriver2 extends Configured implements Tool{
- // private Logger log = LoggerFactory.getLogger(MultipleDriver.class);
- private String input1=null;
- private String input2=null;
- private String output=null;
- private String delimiter=null;
- public static void main(String[] args) throws Exception {
- Configuration conf=new Configuration();
- // conf.set("fs.defaultFS", "hdfs://node33:8020");
- // conf.set("mapreduce.framework.name", "yarn");
- // conf.set("yarn.resourcemanager.address", "node33:8032");
- ToolRunner.run(conf, new MultipleDriver2(), args);
- }
- @Override
- public int run(String[] arg0) throws Exception {
- configureArgs(arg0);
- checkArgs();
- Configuration conf= getConf();
- conf.set("delimiter", delimiter);
- @SuppressWarnings("deprecation")
- Job job = new Job(conf, "merge user and phone information ");
- job.setJarByClass(MultipleDriver2.class);
- job.setMapperClass(MultipleMapper.class);
- job.setReducerClass(MultipleReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlagStringDataType.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
- job.setNumReduceTasks(1);
- FileInputFormat.addInputPath(job, new Path(input1));
- FileInputFormat.addInputPath(job, new Path(input2));
- FileOutputFormat.setOutputPath(job, new Path(output));
- int res = job.waitForCompletion(true) ? 0 : 1;
- return res;
- }
- /**
- * check the args
- */
- private void checkArgs() {
- if(input1==null||"".equals(input1)){
- System.out.println("no user input...");
- printUsage();
- System.exit(-1);
- }
- if(input2==null||"".equals(input2)){
- System.out.println("no phone input...");
- printUsage();
- System.exit(-1);
- }
- if(output==null||"".equals(output)){
- System.out.println("no output...");
- printUsage();
- System.exit(-1);
- }
- if(delimiter==null||"".equals(delimiter)){
- System.out.println("no delimiter...");
- printUsage();
- System.exit(-1);
- }
- }
- /**
- * configuration the args
- * @param args
- */
- private void configureArgs(String[] args) {
- for(int i=0;i<args.length;i++){
- if("-i1".equals(args[i])){
- input1=args[++i];
- }
- if("-i2".equals(args[i])){
- input2=args[++i];
- }
- if("-o".equals(args[i])){
- output=args[++i];
- }
- if("-delimiter".equals(args[i])){
- delimiter=args[++i];
- }
- }
- }
- public static void printUsage(){
- System.err.println("Usage:");
- System.err.println("-i1 input \t user data path.");
- System.err.println("-i2 input \t phone data path.");
- System.err.println("-o output \t output data path.");
- System.err.println("-delimiter data delimiter , default is comma .");
- }
- }
这里添加路径直接使用FileInputFormat添加输入路径,这样的话,针对不同的输入数据的不同业务逻辑可以在mapper中先判断目前正在处理的是那个数据,然后根据其路径来进行相应的业务逻辑处理:
- package multiple.input;
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- /**
- * input1 :
- * username,phone
- *
- * input2
- * phone,description
- *
- * output:
- * <key,value> --> <[phone],[0,username]>
- * <key,value> --> <[phone],[1,description]>
- * @author fansy
- *
- */
- public class MultipleMapper extends Mapper<LongWritable,Text,Text,FlagStringDataType>{
- private String delimiter=null; // default is comma
- private boolean flag=false;
- @Override
- public void setup(Context cxt){
- delimiter= cxt.getConfiguration().get("delimiter", ",");
- InputSplit input=cxt.getInputSplit();
- String filename=((FileSplit) input).getPath().getParent().getName();
- if("user".equals(filename)){
- flag=true;
- }
- }
- @Override
- public void map(LongWritable key,Text value,Context cxt) throws IOException,InterruptedException{
- String[] values= value.toString().split(delimiter);
- if(values.length!=2){
- return;
- }
- if(flag){
- cxt.write(new Text(values[1]), new FlagStringDataType(0,values[0]));
- }else{
- cxt.write(new Text(values[0]), new FlagStringDataType(1,values[1]));
- }
- }
- }
总体来说,这种处理方式其实是不如第一种的,在每个map函数中都需要进行判断,比第一种多了很多操作;同时,针对不同的序列文件,这种方式处理不了(Key、value的类型不一样的情况下)。所以针对多文件格式的输入,最好还是使用第一种方式。
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
相关推荐
【使用Hadoop分布式文件系统检索云存储中的文件】 在当今的云计算时代,云存储系统已成为企业和个人存储大量数据的关键解决方案。这些系统由一系列独立的存储服务器组成,通过网络为用户提供服务,允许经过身份验证...
压缩包内附带的源码允许用户直接复制粘贴修改配置,避免手动输入时可能出现的拼写错误或格式问题,提高了配置的准确性和效率。 总之,Hadoop的配置文件是管理和运行大数据处理作业的关键。理解并正确配置这些文件...
在搭建Hadoop集群时,我们需要在该文件中输入所有奴隶节点的主机名,这样Hadoop的启动脚本会知道哪些机器应该运行DataNode和NodeManager。 3. `mapred-site.xml.template`:这是一个模板文件,用于配置MapReduce...
- 分割输入文件:首先,HDFS会根据预设的块大小(通常是128MB或256MB)将输入文件分割为多个数据块。 - 数据分布:这些数据块被复制到集群的不同节点上,确保数据的冗余和可用性。 - Map阶段:在Map任务中,每个...
【标题】:“Hadoop文件”通常指的是在Hadoop生态系统中处理和存储的大数据文件。Hadoop是Apache软件基金会开发的一个开源框架,专为分布式存储和处理大规模数据集而设计。它允许用户在廉价硬件上构建大规模可扩展的...
【标签】"hadoop上传文件共5页.pd" 这个标签可能是输入错误,因为".pd"不是一个标准的文件扩展名。不过,我们可以假设它应该是".pdf"的误写,代表这个文件与PDF格式相关,且可能涉及到Hadoop的文件操作。 【压缩...
`hadoop.exp` 包含了Hadoop库的导出符号信息,而`hadoop.lib` 是静态链接库,提供了链接到Hadoop功能的链接器输入。 `libwinutils.lib` 类似地,可能是`winutils.exe` 的静态链接库文件,提供编译时链接到`winutils...
本篇将深入探讨如何利用Hadoop的Java接口进行文件操作,实现对Hadoop服务的增、删、改、查等功能。 首先,我们要了解Hadoop的核心组件:HDFS(Hadoop Distributed File System)和MapReduce。HDFS是Hadoop的数据...
本文将深入探讨Hadoop的核心文件,特别是针对Nutch项目中使用的Hadoop核心JAR包。 Nutch是一个开源的Web搜索引擎项目,它依赖于Hadoop进行大规模的数据抓取、索引和搜索。在Nutch中,使用了经过重新编译的Hadoop...
HDFS是Hadoop的数据存储系统,它将大文件分布在多台廉价服务器上,提供高可用性和容错性。MapReduce则是处理这些数据的计算框架,它将大型任务拆分为小任务并在集群中并行执行。 为了在Eclipse中开发和调试Hadoop...
在Hadoop生态系统中,`winutils.exe` 和 `hadoop.dll` 是两个关键的组件,主要用于...不过,由于Hadoop主要面向Linux,Windows用户可能会遇到更多问题,因此建议熟悉Linux环境,以便更好地利用Hadoop的全部潜力。
为了运行这个程序,你需要将《唐诗三百首》的文本文件上传到HDFS,并在Hadoop的配置中指定输入文件路径。在Eclipse中,通过右键点击项目,选择“Run As” > “Hadoop Job”,设置好Job配置后提交作业。程序执行完成...
### Hadoop分布式文件系统(HDFS)运行测试知识点详解 #### 实验背景与目的 本次实验旨在通过对Hadoop分布式文件系统(HDFS)进行运行测试,深入理解并掌握Hadoop及其核心组件之一——HDFS的基本原理与操作方法。随着...
例如,使用Hadoop命令行执行`hadoop fs -cat`查看文件内容时,如果文件不是UTF-8编码,可以通过`iconv`工具先转换编码再查看。 6. **源码分析**: 对于开发者来说,深入理解Hadoop源码有助于找出乱码的根源。可以...
本资源"hadop3.2.1配置文件亲测有效"提供了一组适用于Hadoop 3.2.1版本的配置文件,这些文件通常为`.txt`格式,方便用户直接复制并根据自己的环境进行调整。以下是关于Hadoop 3.0配置的一些关键知识点: 1. **...
标题中的“Hadoop fs包文件系统抽象”指的是Hadoop生态系统中的一个核心组件——FileSystem API,它是Hadoop处理分布式存储的基础。FileSystem API为不同类型的文件系统(如本地文件系统、HDFS、S3等)提供了一个...