- 浏览: 200059 次
- 性别:
- 来自: 广州
文章分类
最新评论
-
永立s:
这篇博客帮我解决了一个问题,十分感谢.
HBase表增加snappy压缩 -
BlackWing:
日志是job运行日志,看你怎么配置了,一般就在hadoop安装 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
heymaomao 写道有两个问题,想请教下楼主 第一是日志楼 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
heymaomao:
有两个问题,想请教下楼主 第一是日志楼主到底看的是哪个日志文件 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误 -
atomduan:
本地的Unix 进程创建失败,检查下服务器内存是否够用,是不是 ...
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
转载请标明出处:http://blackwing.iteye.com/blog/2191454
网上虽然有不少关于MultipleOutputs实现多文件输出的文章,但发现要不还是使用mapred.lib旧接口,要不就是说明不清楚。
Mapper
Reducer
main
其中注意的是:
1.reducer中调用时,要调用MultipleOutputs以下接口:
如果调用
则需要在job中,预先声明named output(如下),不然会报错:named output xxx not defined:
2.默认情况下,输出目录会生成part-r-00000或者part-m-00000的空文件,需要如下设置后,才不会生成:
就是去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
这里只是以Text的输入输出格式说明。
官方的文档:
https://hadoop.apache.org/docs/current2/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html#write(java.lang.String, K, V)
网上虽然有不少关于MultipleOutputs实现多文件输出的文章,但发现要不还是使用mapred.lib旧接口,要不就是说明不清楚。
Mapper
package com.yy.hiido.itemcf.hadoop.mapper; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class ReadFilesMapper extends Mapper<LongWritable, Text, Text, Text> { private final String SPLITTER="\\s+"; private Text outkey; private Text values; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); outkey = new Text(); values = new Text(); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = value.toString(); //System.out.println("line : "+line); String [] fields = line.split(this.SPLITTER); outkey.set(fields[0]); values.set(fields[1]); System.out.println("key : "+fields[0]+" | values : "+fields[1]); context.write(outkey,values); } }
Reducer
package com.yy.hiido.itemcf.hadoop.reducer; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.mahout.math.RandomAccessSparseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ReadFilesReducer extends Reducer<Text, Text, Text, Text> { private MultipleOutputs<Text,Text> mos; @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text,Text>(context); } @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { String temp=""; for(Text t : values) temp+=t.toString()+" | "; //mos.write(key.toString(), key, new Text(temp));//这样需要预定义named output mos.write(key, new Text(temp), key.toString());//这样不需要与定义named output } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close();; } }
main
package com.yy.hiido.itemcf.hadoop.job; import java.io.IOException; import java.net.URI; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.yy.hiido.itemcf.hadoop.mapper.ReadFilesMapper; import com.yy.hiido.itemcf.hadoop.reducer.ReadFilesReducer; /** * @author BlackWing 测试multioutput * */ public class TestMultiOutputJob extends Configured implements Tool { // 配置文件名 public final static String propertyFileName = "config.xml"; private static final Log LOG = LogFactory.getLog(TestMultiOutputJob.class); private static Configuration conf = HBaseConfiguration.create(); public int myjob() { // 读配置文件 conf.addResource(propertyFileName); String input = conf.get("file.to.read"); String outputDir = conf.get("test.output"); System.out.println("input : "+input); System.out.println("output : "+outputDir); // 若输出目录存在,则删除 try { FileSystem fs = FileSystem.get(URI.create(outputDir), new Configuration()); fs.delete(new Path(outputDir), true); fs.close(); } catch (Exception e) { e.printStackTrace(); } Job myjob = null; try { myjob = new Job(conf); } catch (IOException e) { e.printStackTrace(); } myjob.setJarByClass(ReadFilesMapper.class); try { FileInputFormat.setInputPaths(myjob, input); FileOutputFormat.setOutputPath(myjob, new Path(outputDir)); } catch (IOException e1) { e1.printStackTrace(); } myjob.setMapperClass(ReadFilesMapper.class); myjob.setInputFormatClass(TextInputFormat.class); // myjob.setOutputFormatClass(TextOutputFormat.class); LazyOutputFormat.setOutputFormatClass(myjob, TextOutputFormat.class); myjob.setReducerClass(ReadFilesReducer.class); myjob.setOutputKeyClass(Text.class); myjob.setOutputValueClass(Text.class); //MultipleOutputs.addNamedOutput(myjob, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class); //MultipleOutputs.addNamedOutput(myjob, "maoxiandao", TextOutputFormat.class, Text.class, Text.class); //MultipleOutputs.addNamedOutput(myjob, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class); boolean succeeded = false; try { succeeded = myjob.waitForCompletion(true); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } if (!succeeded) return -1; // status记录job运行状态 LOG.info("Job complete !"); return 1; } @Override public int run(String[] as) throws Exception { // 读配置文件 conf.addResource(propertyFileName); myjob(); return 0; } /** * @param args */ public static void main(String[] args) { long start = System.currentTimeMillis(); try { ToolRunner.run(new TestMultiOutputJob(), args); } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("run the program costs time:" + (end - start) / 60000 + "Minutes"); } }
其中注意的是:
1.reducer中调用时,要调用MultipleOutputs以下接口:
write public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) throws IOException, InterruptedException
如果调用
write public <K,V> void write(String namedOutput, K key, V value) throws IOException, InterruptedException
则需要在job中,预先声明named output(如下),不然会报错:named output xxx not defined:
MultipleOutputs.addNamedOutput(myjob, "moshouzhengba", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(myjob, "maoxiandao", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(myjob, "yingxionglianmen", TextOutputFormat.class, Text.class, Text.class);
2.默认情况下,输出目录会生成part-r-00000或者part-m-00000的空文件,需要如下设置后,才不会生成:
// myjob.setOutputFormatClass(TextOutputFormat.class); LazyOutputFormat.setOutputFormatClass(myjob, TextOutputFormat.class);
就是去掉job设置outputFormatClass,改为通过LazyOutputFormat设置
这里只是以Text的输入输出格式说明。
官方的文档:
https://hadoop.apache.org/docs/current2/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html#write(java.lang.String, K, V)
发表评论
-
解决直接读HFile时因表数据写入而导致文件目录变化问题
2015-03-02 18:22 1536转载请标明出处:http://blackwing.iteye. ... -
解决Exception from container-launch: ExitCodeException exitCode=1的另类错误
2014-08-21 18:18 23880转载请标明出处:http://blackwing.iteye. ... -
LoadIncrementalHFiles是copy而不是move的疑惑
2013-12-19 10:57 4151转载请标明出处:http://blackwing.iteye. ... -
Hadoop生成HFile直接入库HBase心得
2013-12-18 16:15 5270转载请标明出处:http://blackwing.iteye. ... -
NullPointerException SerializationFactory.getSerializer解决
2013-12-04 17:30 1646转载请标明出处:http://blackwing.iteye. ... -
Hadoop的Text类getBytes字节数据put到HBase后有多余字符串问题
2013-11-21 15:53 2142转载请标明出处:http://blackwing.iteye. ... -
hadoop 1.0.3增加snappy压缩
2013-09-11 17:27 1835转载请标明来源:http://blackwing.iteye. ... -
把hadoop的metrics加入ganglia监控
2013-09-04 17:02 1584hadoop的metrics加入ganglia其实是很简单的, ... -
MultithreadedMapper多线程读取数据
2013-04-27 15:51 0由于当前业务需求读取HBase表时,会存在数据倾斜,大部分数据 ... -
shuffle & sort解释
2013-04-16 17:31 1252转载请标明出处:http://blackwing.iteye. ... -
hadoop的 IncompatibleClassChangeError
2013-02-06 17:26 2089开发环境中,使用的是官方版的hadoop 1.0.1版,而集群 ... -
分拆TableSplit 让多个mapper同时读取
2013-01-06 18:13 2609默认情况下,一个region是一个tableSplit,对应一 ... -
hadoop的java.opts设置有误导致job setup失败
2012-10-17 11:14 1447由于各台机器配置不同,想单独设置每个节点的mapred.chi ... -
Hadoop使用lzo压缩输出
2012-08-02 17:48 2387Hadoop处理大量的数据, ... -
Hadoop的data.dir配置导致balancer无法平衡各节点
2012-07-26 17:25 2295hdfs的data.dir配置如下: <property ... -
导出导入HBase数据库
2012-06-01 16:44 1942系统上已经安装来Hadoop,并且hbase通过hadoop存 ... -
Hadoop的tasktracker启动失败,重编译native库
2012-05-19 12:34 2233hadoop的datanode启动失败,只能启动datanod ... -
Windows下访问VM中HBase
2012-04-27 17:38 3011资源所限,只能先在本机上模拟hadoop集群。(见文章:htt ... -
VM下Hadoop集群配置
2012-04-27 17:10 1195hadoop集群的教程网上很多,看起来不难,但自己动手时,却总 ... -
Windows下Hadoop安装
2012-04-18 12:47 1162在windows下安装hadoop,需要cygwin模拟lin ...
相关推荐
Hadoop MultipleOutputs输出到多个文件中的实现方法 Hadoop MultipleOutputs是Hadoop MapReduce框架中的一种输出机制,可以将输出写入到多个文件中。下面将详细介绍Hadoop MultipleOutputs输出到多个文件中的实现...
在Hadoop MapReduce框架中,Reducer的默认行为是将每个Reducer的输出合并成一个或多个以`part-r-00000`等格式命名的文件。然而,有时我们需要更精细地控制输出文件的命名,比如根据特定的键或值生成不同的文件,或者...
在使用MapReduce框架时,经常需要处理输出数据,这时可能会遇到需要将输出分散到多个文件中的需求,这就是Hadoop MapReduce多输出功能的用途。Hadoop MapReduce多输出的功能主要由MultipleOutputFormat类及其相关类...
- 比如设置Reduce任务的数量、文件输出路径等。 2. **性能考虑**: - 虽然使用`MultipleOutputs`可以灵活地生成多个输出文件,但在实际应用中还需考虑性能问题。 - 大量的小文件可能会导致NameNode压力增大,...
- **MultipleInputs/MultipleOutputs**:Hadoop API提供的工具类,用于一个Job处理多个输入源或产生多个输出结果。 3. **参数传递**: - **JobConf**:每个Job都有自己的JobConf对象,可以通过设置conf属性将参数...
2. **MultipleOutputs**:这是一个实用工具类,它提供了一种简单的方法来实现多文件输出。通过配置`MultipleOutputs`,可以在Mapper或Reducer中指定输出文件名的生成规则。 3. **Partitioner**:虽然分区器本身并...
- Hadoop允许自定义输入输出格式,以适应不同类型的数据源和目标。 - 如`TextInputFormat`和`TextOutputFormat`分别用于处理文本输入和输出。 7. **Hadoop配置参数**: - 通过修改`Configuration`对象,可以设置...
这可以通过Apache Hadoop的MultipleOutputs类实现,它允许数据流到多个输出路径,满足了这一需求。在Hadoop环境中,数据通常以分布式文件系统(如HDFS)的形式存储,并通过MapReduce进行处理。 项目开发中,Maven是...
2. **MultipleOutputs**:允许多个输出文件,便于处理不同类型的输出。 3. **Mapper/Reducer性能调优**:合理设置内存大小、槽位数量、并行度等参数。 通过以上介绍,你应该对Java MapReduce编程有了基本的认识。...