- 浏览: 294157 次
文章分类
最新评论
-
feargod:
...
ActivityGroup的子activity响应back事件的顺序问题 -
hoarhoar:
谢谢你,终于解决了,我真是受够了,总是45秒钟,真是疯了。
youku 的广告必须要屏蔽 -
lilai:
...
youku 的广告必须要屏蔽 -
aijuans2:
...
youku 的广告必须要屏蔽 -
weiwo1978:
说的非常好,mark
SELECT语句执行的顺序
package org.myorg; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /**摘{@link TextOutputFormat}碌ineRecordWriter隆拢 */ public class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } }
public static class myOutput extends MultipleOutputFormat<Text, Text> { @Override protected String generateFileNameForKeyValue(Text key, Text value, TaskAttemptContext taskID) { String mykey = key.toString(); String myValue = value.toString(); String tasknum = taskID.getTaskAttemptID().getTaskID().toString(); String fileNum = tasknum.substring(tasknum.length()-3); String newname = mykey.substring(0,3); return fileNum+newname; } }
目的:
根据输出数据的某些特征,分类输出到不同的文件夹下以便管理,而不是放在同一个文件夹下。
实现:
1、重写MultipleOutputFormat的某些方法,参考org.apache.hadoop.mapred.lib.MultipleOutputFormat,需要在程序中实现的子类方法是:
protected String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job),即通过key和value及conf配置信息决定文件名
(含扩展名)。其中,需要改写的方法是最后一个方法:private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job,
String baseName),baseName 即为 在程序中重写的子类 generateFileNameForKeyValue 的返回值。
代码:
package org.myorg; import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; public abstract class MultipleOutputFormat<K extends WritableComparable, V extends Writable> extends FileOutputFormat<K, V> { private MultiRecordWriter writer = null; public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { if (writer == null) { writer = new MultiRecordWriter(job, getTaskOutputPath(job)); } return writer; } private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter) committer).getWorkPath(); } else { Path outputPath = super.getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } workPath = outputPath; } return workPath; } protected abstract String generateFileNameForKeyValue(K key, V value, TaskAttemptContext job);//Configuration conf); public class MultiRecordWriter extends RecordWriter<K, V> { private HashMap<String, RecordWriter<K, V>> recordWriters = null; private TaskAttemptContext job = null; private Path workPath = null; public MultiRecordWriter(TaskAttemptContext job, Path workPath) { super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap<String, RecordWriter<K, V>>(); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator(); while (values.hasNext()) { values.next().close(context); } this.recordWriters.clear(); } @Override public void write(K key, V value) throws IOException, InterruptedException { String baseName = generateFileNameForKeyValue(key, value, job);//job.getConfiguration()); RecordWriter<K, V> rw = this.recordWriters.get(baseName); if (rw == null) { rw = getBaseRecordWriter(job, baseName); this.recordWriters.put(baseName, rw); } //LongWritable keys=(LongWritable)key; //long ret=keys.get()>>1; //keys.set(ret); rw.write(key, value);//change } private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = "\t"; //change String pathname=baseName.substring(12); //change RecordWriter<K, V> recordWriter = null; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); //String pathname=baseName.substring(12); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11) + codec.getDefaultExtension()); //change FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec .createOutputStream(fileOut)), keyValueSeparator); } else { Path file = new Path(workPath+"/"+pathname, baseName.substring(0,11)); //change FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } return recordWriter; } } }
2、把LineRecordWriter从TextOutputFormat抽取出来,作为一个独立的公共类使用。RecordWriter的一个实现,用于把<Key, Value>转化为一行文本。在Hadoop中,这个类作为TextOutputFormat的一个子类存在,protected访问权限,因此普通程序无法访问。
代码如下:
3、在主程序中加载generateFileNameForKeyValue方法:
在main函数中需添加 job.setOutputFormatClass(myOutput.class);
更多信息请查看 java进阶网 http://www.javady.com
发表评论
-
hadoop FSNamesystem中的recentInvalidateSets
2012-04-20 20:28 1009今天早就回来了,然后偷懒了2个小时,现在才开始分析代码, ... -
hadoop namenode后台jetty web
2012-04-20 20:28 1685现在开始分析namenode启动时开启的第2类线程, ... -
hadoop namenode format做了什么?
2012-04-18 20:58 1145一看到format就和磁盘格式化联想到一起,然后这个fo ... -
hadoop分布式配置(服务器系统为centos5,配置时使用的用户是root)
2012-04-14 21:19 1063目前我们使 ... -
Hadoop 安装问题和解决方案
2012-04-10 13:21 1256前几天在Window和Linux主机安装了Hadoop, ... -
运行Hadoop遇到的问题
2012-04-10 13:19 1608运行Hadoop遇到的问题 1, 伪分布式模式 ... -
运行Hadoop遇到的问题
2012-04-10 13:19 0运行Hadoop遇到的问题 1, 伪分布式模式 ... -
hadoop使用过程中的一些小技巧
2012-04-09 10:16 1166hadoop使用过程中的一些小技巧 ------------- ... -
运行hadoop时的一些技巧
2012-04-09 10:14 767//用来给key分区的,需要实现Partitioner接口 ... -
hive相关操作文档收集
2012-04-08 10:51 0How to load data into Hive ... -
hive sql doc
2012-04-08 10:51 0记录2个常用的hive sql语法查询地 官方 ht ... -
hive Required table missing : "`DBS`" in Catalog "" Schema "
2012-04-08 10:51 0最近需要提取一些数据,故开始使用hive,本机搭建了一个hiv ... -
HDFS数据兼容拷贝
2012-04-08 10:50 0系统中使用了hadoop 19.2 20.2 2个版本,为啥有 ... -
hdfs 简单的api 读写文件
2012-04-08 10:50 0Java代码 import ... -
hbase之htable线程安全性
2012-04-22 15:22 1186在单线程环境下使用hbase的htable是没有问题,但是突然 ... -
hbase之scan的rowkey问题
2012-04-22 15:22 1768最近使用到hbase做存储,发现使用scan的时候,返回的ro ... -
datanode启动开启了那些任务线程
2012-04-22 15:22 1086今天开始分析datanode,首先看看datanode开启了哪 ... -
namenode这个类的主要功能
2012-04-22 15:22 1507今天来总看下namenode这个类的主要功能 首先看下这个类 ... -
hadoop监控
2012-04-22 15:21 1597通过从hadoop的 hadoop-metrics文件中就可以 ... -
zookeeper集群配置注意项
2012-04-21 21:32 1144项目中需要使用hbase,故准备在本机搭建hbase,考虑到h ...
相关推荐
- **日志处理示例**: 介绍了如何使用Hadoop进行日志文件分析,这通常涉及到对日志文件进行过滤、排序和汇总统计。 #### 四、Hadoop杂志——《Hadoop开发者》 - **网址**: [百度文库中的《Hadoop开发者》系列]...
根据提供的《Hadoop 数据分析平台》课程毕业测试题的相关信息,我们可以提炼出一系列与Hadoop相关的知识点,这些知识点不仅能够帮助学生更好地理解Hadoop的工作原理和技术细节,还能够加深他们对大数据处理技术的...
- **Hadoop Shell**:作为操作Hadoop集群的工具,Hadoop Shell提供了一系列用于文件管理、数据传输等功能的强大命令,是日常维护和开发工作中不可或缺的一部分。 - **HDPData文件夹**:这是每个Hadoop集群节点上的...
sum = numbers.reduce(lambda a, b: a + b) print("Sum is: ", sum) # 释放缓存 numbers.unpersist() ``` **4. 灵活的数据处理API** - **数据处理API的灵活性:**Spark提供了丰富的数据处理API,使得...
通过以上介绍,我们可以清晰地了解到Spark作为一种现代的大数据处理框架,是如何在多个方面改进了Hadoop,并且展示了其在内存计算、数据处理API以及实时数据处理等方面的优势。这对于理解和应用Spark技术处理大规模...
同样,首先创建输入文件夹并编写输入文件,接着运行Hadoop的wordcount程序,指定输入和输出目录。运行完成后,查看输出结果,可以发现每个单词及其对应的计数值。 **2. 伪分布式运行模式** 伪分布式模式在单台机器...
你需要下载Hadoop发行版,配置环境变量,包括HADOOP_HOME和PATH,并修改Hadoop配置文件如`core-site.xml`和`hdfs-site.xml`来指定本地文件系统为HDFS的默认存储。启动Hadoop服务,包括NameNode、DataNode和YARN等...
6. **结果展示**:实验输出显示了每个单词及其对应的文档列表,例如单词"a"出现在文件3.txt的第7个位置,单词"file6.txt"出现在第6个位置,单词"file2.txt"出现在第9个位置等。 通过这个实验,我们不仅了解了Hadoop...
例如,任务可以被组织成一系列的阶段,如预处理、聚合、分区等,每个阶段可以由不同的任务(TaskA-1, TaskB-2等)组成。 2. **灵活的输入-处理器-输出运行时模型**:Tez允许动态构造物理运行时执行器,通过连接不同...
3. **Hadoop案例之单表关联输出祖孙关系.docx**:这个文件可能介绍了一个使用Hadoop处理多表关联,特别是如何找出数据之间的层次或祖孙关系的问题。在数据处理中,这样的操作常见于构建复杂的分析模型或者数据仓库。...
* Hadoop 集群执行完 MapReduce 程序后,会输出_SUCCESS 和 part-r-00000 结果文件(√) * 传统文件系统存储数据时,若文件太大,会导致上传和下载非常耗时(√) * 使用虚拟机软件,可以在同一台电脑上构建多个 ...
`Inverted-Index-Using-a-Hadoop-Cluster-master`这个压缩包文件很可能是项目源代码,包含了实现上述逻辑的所有Java类和其他资源。通过解压并编译运行这个项目,可以在实际的Hadoop集群上构建反向索引。 总之,利用...
- 输出:`("A", "c")`, `("A", "a")`, `("A", "t")`, `("A", "s")` #### 示例:筛选映射器 ```plaintext let map(k, v) = if(isPrime(v)) then emit(k, v) ``` - 输入:`("foo", 7)` - 输出:`("foo", 7)` ####...
根据给定的大数据核心技术A卷的部分内容,我们可以总结并扩展其中涉及的重要知识点,具体包括Hadoop生态系统中的关键组件及其功能、MapReduce编程模型的基本概念、以及数据存储与处理中的相关技术。 ### Hadoop生态...
当Hadoop接收到一个大文件时,它会根据默认的块大小(通常是128MB)将其分割成多个数据块,并将这些块复制到集群的不同节点上,以确保容错性和并行性。在这个阶段,文件内容并没有被解析或处理,只是物理上的切分。...