`
小网客
  • 浏览: 1243713 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop之MultipleOutputs

 
阅读更多

背景:

根据业务输出有规则的业务数据,比如都在/abc/a/下他们根据业务不同,其文件名称也不同

/abc/a/good-001

/abc/a/bad-001

那么下个job可以基于文件名做相应的业务操作

hadoop版本信息:

[ ~]$ hadoop version
Hadoop 0.20.2-cdh3u4
Subversion git://ubuntu-slave01/var/lib/jenkins/workspace/CDH3u4-Full-RC/build/cdh3/hadoop20/0.20.2-cdh3u4/source -r 214dd731e3bdb687cb55988d3f47dd9e248c5690
Compiled by jenkins on Mon May  7 13:01:39 PDT 2012
From source with checksum a60c9795e41a3248b212344fb131c12c

实现方式:

1.基于MultipleOutputs

 

实现代码:

mapper:访问hbase某个表然后利用MultipleOutputs写

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import com.alibaba.fastjson.JSONObject;
public class CommentMapper extends TableMapper<NullWritable, Text> {

	private static final Log LOGGER = LogFactory.getLog(CommentMapper.class);
	private static Set<String> set = new HashSet<String>();
	private org.apache.hadoop.mapreduce.lib.output.MultipleOutputs<NullWritable, Text> mos;

	@Override
	public void setup(Context context) {
		mos = new MultipleOutputs<NullWritable, Text>(context);
	}

	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		mos.close();
		super.cleanup(context);
	}

	@Override
	protected final void map(ImmutableBytesWritable key, Result value, Context context)
	        throws IOException, InterruptedException {
		try {
			 
			List<KeyValue> list = value.list();
			Iterator<KeyValue> iterator = list.iterator();
			Map<String, Object> map = new HashMap<String, Object>();
			while (iterator.hasNext()) {

				KeyValue keyValue = iterator.next();
				byte[] bytes = value.getValue(keyValue.getFamily(), keyValue.getQualifier());
				String keyId = StringUtils.lowerCase(Bytes.toString(keyValue.getFamily()))
				        + StringUtils.capitalize(StringUtils.lowerCase(Bytes.toString(keyValue
				                .getQualifier())));
				if (set.contains(keyId)) {
					continue;
				}
				if ("eS".equals(keyId)) {
					map.put(keyId, Float.toString(Bytes.toFloat(bytes)));
				} else {
					map.put(keyId, Bytes.toString(bytes));
				}
			}

			JSONObject json = new JSONObject(map);

			mos.write(RandomUtils.nextBoolean() + "", NullWritable.get(),
			        new Text(json.toJSONString()));
			LOGGER.info("working dir:" + context.getWorkingDirectory().getName());
			LOGGER.info("getInputSplit:" + Arrays.toString(context.getInputSplit().getLocations()));
		} catch (Throwable e) {
			LOGGER.error("Error occurs when running CommentMapper", e);
			throw new RunTimeException("Error occurs when running CommentMapper", e);
		}
	}

}

Job执行:

private static void runJob() {

	String inputTableName = "RECMD_JD_COMMENT";
	Configuration conf = HBaseConfiguration.create();
	conf.set("hbase.master", XXX);
	conf.set("hbase.zookeeper.quorum", XXX);
	conf.set("hbase.cluster.distributed", "true");
	conf.set("mapreduce.job.counters.limit", "100000");
	conf.set("mapreduce.job.counters.max", "100000");
	String outPathStr = "/user/search/test/CommentText";
	conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
	conf.set("mapreduce.output.basename", "val");
	try {
		HadoopUtil.delete(conf, new Path(outPathStr));
		Scan scan = new Scan();
		scan.setCacheBlocks(false);
		scan.setCaching(200);
		Job job = new Job(conf, "CommentDDTask");

		job.setJarByClass(DDTask.class);
		TableMapReduceUtil.initTableMapperJob(inputTableName, scan, CommentMapper.class,
			NullWritable.class, Text.class, job);

		TextOutputFormat.setOutputPath(job, new Path(outPathStr));
		MultipleOutputs.addNamedOutput(job, "true", TextOutputFormat.class, NullWritable.class,
			Text.class);
		MultipleOutputs.addNamedOutput(job, "false", TextOutputFormat.class,
			NullWritable.class, Text.class);
		job.setNumReduceTasks(0);
		job.waitForCompletion(true);

	} catch (Throwable e) {
		throw new RuntimeException("Run DDTask error! ", e);
	} finally {
		HConnectionManager.deleteConnection(conf, true);
	}

}

 

小技巧:

可以通过mapreduce.output.basename来控制写文件生成的名称

 

0
0
分享到:
评论

相关推荐

    Hadoop MultipleOutputs输出到多个文件中的实现方法

    Hadoop MultipleOutputs输出到多个文件中的实现方法 Hadoop MultipleOutputs是Hadoop MapReduce框架中的一种输出机制,可以将输出写入到多个文件中。下面将详细介绍Hadoop MultipleOutputs输出到多个文件中的实现...

    Hadoop权威指南第四版

    此外,还会探讨高级话题,如Combiner、MultipleOutputs和新版本的MapReduce API(YARN和Mesos上的运行机制)。 除了Hadoop核心,书中还涵盖了Hadoop生态中的其他重要组件,如HBase(一个分布式的、支持列族的NoSQL...

    Hadoop控制输出文件命名.docx

    `MultipleOutputs`是Hadoop提供的一个工具类,它允许我们在Reducer中创建多个输出流,并分别指定它们的名称和路径。这使得我们可以在处理数据时,根据业务需求灵活地组织输出结果。 使用`MultipleOutputs`的基本...

    hadoop开发者第三期

    ### Hadoop中的数据库访问 #### DBInputFormat简介 DBInputFormat是Hadoop自0.19.0版本开始引入的一种特殊输入格式,它允许Hadoop应用程序通过标准的JDBC接口直接与现有数据库系统进行交互。这为那些需要处理结构...

    离线计算项目案例--版本轨迹统计1

    这可以通过Apache Hadoop的MultipleOutputs类实现,它允许数据流到多个输出路径,满足了这一需求。在Hadoop环境中,数据通常以分布式文件系统(如HDFS)的形式存储,并通过MapReduce进行处理。 项目开发中,Maven是...

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    该套源码是个人学习Hadoop HDFS和MapReduce技术的实践案例集合,采用Java语言编写,包含45个文件,涵盖34个Java源文件、4个XML配置文件、3个偏好设置文件以及1个Git忽略文件等。内容涵盖HDFS的JAVA API操作,如文件...

    离线计算项目案例1

    项目中有一个特殊需求,即清洗后的数据需要按照设备类型(iOS、Android和其他)分别输出到不同的文件夹,这可以通过Hadoop的MultipleOutputs功能实现。 Maven作为项目管理工具,在这个项目中也扮演了重要角色。它...

    Hadoop MapReduce多输出详细介绍

    Hadoop MapReduce是大数据处理领域中的一个核心框架,它允许开发者通过编写Map和Reduce函数来处理大规模数据集。在使用MapReduce框架时,经常需要处理输出数据,这时可能会遇到需要将输出分散到多个文件中的需求,这...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - **MultipleInputs/MultipleOutputs**:Hadoop API提供的工具类,用于一个Job处理多个输入源或产生多个输出结果。 3. **参数传递**: - **JobConf**:每个Job都有自己的JobConf对象,可以通过设置conf属性将参数...

    hadoop_program_java

    - 拓展MapReduce:使用MultipleOutputs、ChainMapper/Reducer等高级特性。 总结来说,“hadoop_program_java”主要涵盖了使用Java编程语言开发Hadoop MapReduce程序的相关知识,包括Hadoop的基本概念、MapReduce...

    让一个reducer产生多个输出文件.docx

    总之,通过使用Hadoop提供的`MultipleOutputs`工具类,可以在一个Reducer中实现生成多个输出文件的功能,这对于提高数据处理灵活性和效率具有重要意义。开发者需要根据具体的业务需求合理设计数据处理逻辑,同时也要...

    Java编写Mapreduce程序过程浅析

    2. **MultipleOutputs**:允许多个输出文件,便于处理不同类型的输出。 3. **Mapper/Reducer性能调优**:合理设置内存大小、槽位数量、并行度等参数。 通过以上介绍,你应该对Java MapReduce编程有了基本的认识。...

    MapReduce2.0源码分析与实战编程

    本章介绍MapReduce的高级特性,如MultipleOutputs、New API、以及MapReduce与HBase、Hive等其他Hadoop组件的集成。同时,可能也会涵盖MapReduce在YARN上的新特性,如动态资源调度和Container重用。 综上所述,...

Global site tag (gtag.js) - Google Analytics