- 浏览: 26696 次
- 性别:
- 来自: 深圳
文章分类
最新评论
hadoop涉及输出文本的默认输出编码统一用没有BOM的UTF-8的形式,但是对于中文的输出window系统默认的是GBK,有些格式文件例如CSV格式的文件用excel打开输出编码为没有BOM的UTF-8文件时,输出的结果为乱码,只能由UE或者记事本打开才能正常显示。因此将hadoop默认输出编码更改为GBK成为非常常见的需求。
默认的情况下MR主程序中,设定输出编码的设置语句为:
从上述代码的第48行可以看出hadoop已经限定此输出格式统一为UTF-8,因此为了改变hadoop的输出代码的文本编码只需定义一个和TextOutputFormat相同的类GbkOutputFormat同样继承FileOutputFormat(注意是org.apache.hadoop.mapreduce.lib.output.FileOutputFormat)即可,如下代码:
最后将输出编码类型设置成GbkOutputFormat.class,如:
默认的情况下MR主程序中,设定输出编码的设置语句为:
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.class的代码如下:
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.mapreduce.lib.output; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.*; /** An {@link OutputFormat} that writes plain text files. */ @InterfaceAudience.Public @InterfaceStability.Stable public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "UTF-8"; // 将UTF-8转换成GBK private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; // 将此行代码注释掉 out.write(to.getBytes(), 0, to.getLength()); // 将此行代码注释掉 } else { // 将此行代码注释掉 out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get(SEPERATOR, "\t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } } }
从上述代码的第48行可以看出hadoop已经限定此输出格式统一为UTF-8,因此为了改变hadoop的输出代码的文本编码只需定义一个和TextOutputFormat相同的类GbkOutputFormat同样继承FileOutputFormat(注意是org.apache.hadoop.mapreduce.lib.output.FileOutputFormat)即可,如下代码:
import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.*; @InterfaceAudience.Public @InterfaceStability.Stable public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> { public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> { private static final String utf8 = "GBK"; private static final byte[] newline; static { try { newline = "\n".getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } protected DataOutputStream out; private final byte[] keyValueSeparator; public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { this.out = out; try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8); } catch (UnsupportedEncodingException uee) { throw new IllegalArgumentException("can't find " + utf8 + " encoding"); } } public LineRecordWriter(DataOutputStream out) { this(out, "\t"); } /** * Write the object to the byte stream, handling Text as a special * case. * @param o the object to print * @throws IOException if the write throws, we pass it on */ private void writeObject(Object o) throws IOException { if (o instanceof Text) { // Text to = (Text) o; // out.write(to.getBytes(), 0, to.getLength()); // } else { out.write(o.toString().getBytes(utf8)); } } public synchronized void write(K key, V value) throws IOException { boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; if (nullKey && nullValue) { return; } if (!nullKey) { writeObject(key); } if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } if (!nullValue) { writeObject(value); } out.write(newline); } public synchronized void close(TaskAttemptContext context) throws IOException { out.close(); } } public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator= conf.get(SEPERATOR, "\t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { FSDataOutputStream fileOut = fs.create(file, false); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); } } }
最后将输出编码类型设置成GbkOutputFormat.class,如:
job.setOutputFormatClass(GbkOutputFormat.class);
相关推荐
本篇将深入探讨Hadoop中文乱码问题的原因及解决方案。 首先,我们需要理解Hadoop系统中的编码设置。Hadoop默认使用的是UTF-8编码,但并非所有系统或文件都遵循这一标准。当输入的中文数据使用了不同的编码格式,...
"Hadoop 使用常见问题以及解决方法" Hadoop 作为一个大数据处理的开源框架,广泛应用于数据存储、处理和分析等领域。但是在使用 Hadoop 时,经常会遇到一些常见的问题,本文将对这些问题进行总结和解决。 Shuffle ...
在探讨Hadoop数据输出压缩这一主题时,我们深入解析了Hadoop如何通过不同的压缩格式、工具及算法来优化数据处理效率。以下是对标题、描述、标签以及部分内容中提及的关键知识点的详细阐述: ### Hadoop数据输出压缩...
Hadoop 故障解决方法 Hadoop 是一种大数据处理技术,它可以对大量数据进行处理和分析。但是在使用 Hadoop 过程中,我们经常会遇到一些错误和问题,本文将为您提供一些常见的 Hadoop 故障解决方法。 一、Shuffle ...
在使用MapReduce框架时,经常需要处理输出数据,这时可能会遇到需要将输出分散到多个文件中的需求,这就是Hadoop MapReduce多输出功能的用途。Hadoop MapReduce多输出的功能主要由MultipleOutputFormat类及其相关类...
hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文...
大数据与Hadoop解决方案是当前信息技术领域的重要议题,它预示着数据管理和分析方法的重大变革。EMC作为信息存储和管理的领先企业,其Hadoop解决方案是针对大数据分析需求而设计的一套系统。 大数据商机在于数据的...
根据提供的文件信息,我们可以提炼出以下关于Hadoop官方中文文档的知识点。 首先,文档标题为“Hadoop官方中文文档”,这意味着文档是Apache Hadoop项目的官方指南,且已经被翻译成中文,以便中文读者更容易理解和...
hadoop常见问题及解决方法 Hadoop是大数据处理的重要工具,但是在安装和使用Hadoop时,可能会出现一些常见的问题,这些问题可能会导致Hadoop无法正常工作,或者无法达到预期的性能。下面是Hadoop常见的问题及解决...
大数据Hadoop解决方案是一种高效、可扩展的处理海量数据的开源框架。Hadoop是Apache软件基金会下的一个项目,其设计目标是使大型数据集的处理变得简单、快速且可靠。本解决方案主要围绕Hadoop的核心组件HDFS(Hadoop...
解决办法:该问题是由于reduce预处理阶段shuffle时获取已完成的map输出失败次数超过上限造成的,上限默认为5。解决办法是修改/etc/security/limits.conf文件,增加nofile参数的值。具体来说,需要添加两行:`* soft ...
Hadoop 2.7.1 是一个开源框架,主要...通过阅读这份Hadoop 2.7.1的中文文档,无论是初学者还是经验丰富的开发者,都能深入了解Hadoop的工作原理,掌握其核心功能,并学会如何在实际环境中应用Hadoop解决大数据问题。
在《Hadoop实战》一书中,读者将了解到如何设置和管理Hadoop集群,包括安装配置Hadoop环境、优化集群性能以及解决常见问题。此外,书中还会详细介绍如何使用Hadoop进行数据导入与导出,如使用Hadoop的工具如sqoop与...
这些项目与Hadoop结合使用,可以构建出更强大的大数据解决方案。 总的来说,Hadoop2.7.1中文文档是一个宝贵的资源,涵盖了Hadoop的基本概念、架构、配置、使用和最佳实践。无论你是初学者还是经验丰富的开发者,都...
作为一个开源框架,Hadoop为海量数据的存储、处理和分析提供了高效且可扩展的解决方案。本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据...
这个“Hadoop 官方文档(中文版)”提供了全面的指导,涵盖了从初学者到高级用户的各个层面。下面将详细阐述文档中可能涉及的主要知识点。 1. **Hadoop 快速入门**: - Hadoop 的核心组件:包括HDFS(Hadoop 分布式...
《Hadoop权威指南中文版(第二版)》与《Hadoop in Action》及《Pro Hadoop》这三本书是深入理解和掌握Hadoop生态系统的关键资源。Hadoop作为一个分布式计算框架,其核心是解决大规模数据处理的问题,它允许在廉价...
Hadoop中文版API是针对Apache Hadoop开源框架的中文文档,它为开发者提供了全面的、易于理解的API指南,帮助中国开发者更好地理解和使用Hadoop。Hadoop是一个分布式计算框架,广泛应用于大数据处理和分析,其核心...
《Hadoop In Action》中文版是一本专门为对大数据处理感兴趣的读者设计的专业书籍,它深入浅出地介绍了Hadoop生态系统的核心概念和技术。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算的能力,...