`

hadoop解决中文输出乱码

阅读更多
    hadoop涉及输出文本的默认输出编码统一用没有BOM的UTF-8的形式,但是对于中文的输出window系统默认的是GBK,有些格式文件例如CSV格式的文件用excel打开输出编码为没有BOM的UTF-8文件时,输出的结果为乱码,只能由UE或者记事本打开才能正常显示。因此将hadoop默认输出编码更改为GBK成为非常常见的需求。
      默认的情况下MR主程序中,设定输出编码的设置语句为:
job.setOutputFormatClass(TextOutputFormat.class);

TextOutputFormat.class
的代码如下:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;

/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";  // 将UTF-8转换成GBK 
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;   // 将此行代码注释掉
        out.write(to.getBytes(), 0, to.getLength());  // 将此行代码注释掉
      } else { // 将此行代码注释掉      
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized 
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}



从上述代码的第48行可以看出hadoop已经限定此输出格式统一为UTF-8,因此为了改变hadoop的输出代码的文本编码只需定义一个和TextOutputFormat相同的类GbkOutputFormat同样继承FileOutputFormat(注意是org.apache.hadoop.mapreduce.lib.output.FileOutputFormat)即可,如下代码:
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.*;

@InterfaceAudience.Public
@InterfaceStability.Stable
public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "GBK";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
//        Text to = (Text) o;
//        out.write(to.getBytes(), 0, to.getLength());
//      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized 
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}


最后将输出编码类型设置成GbkOutputFormat.class,如:
job.setOutputFormatClass(GbkOutputFormat.class);
分享到:
评论

相关推荐

    hadoop中文乱码问题

    本篇将深入探讨Hadoop中文乱码问题的原因及解决方案。 首先,我们需要理解Hadoop系统中的编码设置。Hadoop默认使用的是UTF-8编码,但并非所有系统或文件都遵循这一标准。当输入的中文数据使用了不同的编码格式,...

    hadoop数据输出压缩

    在探讨Hadoop数据输出压缩这一主题时,我们深入解析了Hadoop如何通过不同的压缩格式、工具及算法来优化数据处理效率。以下是对标题、描述、标签以及部分内容中提及的关键知识点的详细阐述: ### Hadoop数据输出压缩...

    hadoop出错解决方法

    Hadoop 故障解决方法 Hadoop 是一种大数据处理技术,它可以对大量数据进行处理和分析。但是在使用 Hadoop 过程中,我们经常会遇到一些错误和问题,本文将为您提供一些常见的 Hadoop 故障解决方法。 一、Shuffle ...

    hadoop2.7中文文档

    hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文文档hadoop2.7中文...

    Hadoop MapReduce多输出详细介绍

    在使用MapReduce框架时,经常需要处理输出数据,这时可能会遇到需要将输出分散到多个文件中的需求,这就是Hadoop MapReduce多输出功能的用途。Hadoop MapReduce多输出的功能主要由MultipleOutputFormat类及其相关类...

    EMC Hadoop解决方案 PPT

    大数据与Hadoop解决方案是当前信息技术领域的重要议题,它预示着数据管理和分析方法的重大变革。EMC作为信息存储和管理的领先企业,其Hadoop解决方案是针对大数据分析需求而设计的一套系统。 大数据商机在于数据的...

    Hadoop官方中文文档

    根据提供的文件信息,我们可以提炼出以下关于Hadoop官方中文文档的知识点。 首先,文档标题为“Hadoop官方中文文档”,这意味着文档是Apache Hadoop项目的官方指南,且已经被翻译成中文,以便中文读者更容易理解和...

    hadoop常见问题及解决方法

    hadoop常见问题及解决方法 Hadoop是大数据处理的重要工具,但是在安装和使用Hadoop时,可能会出现一些常见的问题,这些问题可能会导致Hadoop无法正常工作,或者无法达到预期的性能。下面是Hadoop常见的问题及解决...

    大数据Hadoop解决方案.rar

    大数据Hadoop解决方案是一种高效、可扩展的处理海量数据的开源框架。Hadoop是Apache软件基金会下的一个项目,其设计目标是使大型数据集的处理变得简单、快速且可靠。本解决方案主要围绕Hadoop的核心组件HDFS(Hadoop...

    Hadoop 2.7.1 中文文档

    Hadoop 2.7.1 是一个开源框架,主要...通过阅读这份Hadoop 2.7.1的中文文档,无论是初学者还是经验丰富的开发者,都能深入了解Hadoop的工作原理,掌握其核心功能,并学会如何在实际环境中应用Hadoop解决大数据问题。

    hadoop 实战 中文版

    在《Hadoop实战》一书中,读者将了解到如何设置和管理Hadoop集群,包括安装配置Hadoop环境、优化集群性能以及解决常见问题。此外,书中还会详细介绍如何使用Hadoop进行数据导入与导出,如使用Hadoop的工具如sqoop与...

    Hadoop2.7.1中文文档

    这些项目与Hadoop结合使用,可以构建出更强大的大数据解决方案。 总的来说,Hadoop2.7.1中文文档是一个宝贵的资源,涵盖了Hadoop的基本概念、架构、配置、使用和最佳实践。无论你是初学者还是经验丰富的开发者,都...

    Hadoop高级编程- 构建与实现大数据解决方案

    作为一个开源框架,Hadoop为海量数据的存储、处理和分析提供了高效且可扩展的解决方案。本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据...

    Hadoop 官方文档(中文版)

    这个“Hadoop 官方文档(中文版)”提供了全面的指导,涵盖了从初学者到高级用户的各个层面。下面将详细阐述文档中可能涉及的主要知识点。 1. **Hadoop 快速入门**: - Hadoop 的核心组件:包括HDFS(Hadoop 分布式...

    Hadoop权威指南中文版(第二版)+Hadoop in Action

    《Hadoop权威指南中文版(第二版)》与《Hadoop in Action》及《Pro Hadoop》这三本书是深入理解和掌握Hadoop生态系统的关键资源。Hadoop作为一个分布式计算框架,其核心是解决大规模数据处理的问题,它允许在廉价...

    hadoop中文版API.zip

    Hadoop中文版API是针对Apache Hadoop开源框架的中文文档,它为开发者提供了全面的、易于理解的API指南,帮助中国开发者更好地理解和使用Hadoop。Hadoop是一个分布式计算框架,广泛应用于大数据处理和分析,其核心...

    Hadoop In Action 中文版

    《Hadoop In Action》中文版是一本专门为对大数据处理感兴趣的读者设计的专业书籍,它深入浅出地介绍了Hadoop生态系统的核心概念和技术。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算的能力,...

    hadoop中文版API

    hadoop中文版API.chm文件,查找hadoop的类中方法、方法的用法等,方便、好用

    Hadoop In Action (Hadoop实战)中文版

    ### Hadoop In Action (Hadoop实战)中文版 #### Hadoop概述 Hadoop是一个能够对大量数据进行分布式处理的软件框架。它通过提供一个高可靠性、高性能、可扩展的平台来处理海量数据集,使组织能够高效地存储、处理和...

Global site tag (gtag.js) - Google Analytics