Hadoop 0.20.2's new API offers no MultipleOutputFormat for writing a job's results to multiple files. The old class org.apache.hadoop.mapred.lib.MultipleOutputFormat from 0.19.2 can still be used in 0.20.2, but everything under org.apache.hadoop.mapred is marked deprecated and may be removed in a future Hadoop release. Hadoop 0.20.2 recommends Configuration in place of JobConf, yet the old MultipleOutputFormat is still built around JobConf; in other words, there is no new-API replacement for it yet.
Moreover, Hadoop 0.20.2 is only an intermediate release: not every API has been ported to the new interfaces, so anything that is missing has to be written by hand.
Re-implementing MultipleOutputFormat on the new API takes two classes:
LineRecordWriter
MultipleOutputFormat
PartitionByFilenameOutputFormat is the custom subclass the experiment needs, so that each file's records are written out separately.
LineRecordWriter:
package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Line-oriented RecordWriter, modelled on the writer inside TextOutputFormat:
 * it emits "key<separator>value" followed by a line terminator.
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t"); // default separator is a tab (the original "/t" was a typo)
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            // Write Text directly from its backing byte array, no extra copy.
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }

    public synchronized void write(K key, V value) throws IOException {
        // Null or NullWritable keys/values are simply skipped.
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write("\r\n".getBytes());
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
MultipleOutputFormat:
package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Output format for the new (org.apache.hadoop.mapreduce) API that routes each record
 * to a file whose name is derived from the record itself. Subclasses pick the file
 * name by implementing generateFileNameForKeyValue().
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            // Write into the task's temporary work directory; the committer
            // promotes it to the final output directory when the task succeeds.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Maps a (key, value) pair to the base name of the file it should be written to. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        // One underlying LineRecordWriter per distinct base file name.
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
PartitionByFilenameOutputFormat:
package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * Concrete subclass used in the experiment: the output file name is the part of the
 * value before the first tab, so records sharing that prefix land in the same file.
 */
public class PartitionByFilenameOutputFormat extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        return value.toString().substring(0, value.toString().indexOf("\t"));
    }
}
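To show how the three classes fit together, here is a minimal driver sketch that is not part of the original post. The class MultiOutputDriver, the pass-through mapper, and the assumption that every input line looks like "<filename>\t<payload>" are all illustrative; the only essential step is wiring the custom format in with job.setOutputFormatClass().

package cn.xmu.dm;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiOutputDriver {

    // Assumes each input line looks like "<filename>\t<payload>"; the field before the
    // tab is what PartitionByFilenameOutputFormat uses to choose the output file.
    public static class PassThroughMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String name = line.toString().split("\t", 2)[0];
            context.write(new Text(name), line);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "multi-file output demo"); // Job.getInstance(conf, ...) on newer releases
        job.setJarByClass(MultiOutputDriver.class);
        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0);                          // map-only keeps the example small
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // The only change compared with an ordinary job: plug in the custom output format.
        job.setOutputFormatClass(PartitionByFilenameOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that getBaseRecordWriter() creates each LineRecordWriter with "," as the key/value separator, so every output line in this sketch comes out as "<key>,<value>"; emit a null key from the job if only the value should appear.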