`
hadoopforcloud
  • 浏览: 6658 次
  • 性别: Icon_minigender_1
  • 来自: 北京
最近访客 更多访客>>
社区版块
存档分类
最新评论
阅读更多
4. Hadoop I/O
4.1. Data Integrity 数据完整性
一般使用checksum检查数据的完整性,但是他仅能检查完整性,而不提供任何的修复办法,checksum的值也有出错的可能。
Hadoop采取了不同策略的checksum来克服以上的不足
4.1.1. Data Integrity In HDFS HDFS中的数据完整性
1) HDFS透明的计算其内部数据的checksum,并在读取数据的时候验证checksum。
2) HDFS为每io.bytes.per.checksum个字节创建checksum,默认是每512bytes创建一次
3) Datanode负责在存储数据和数据的checksum之前验证数据。在管道中的datanodes,最后一个负责验证数据的checksum,若错误则抛出ChecksumException。
4) 当client读取的数据的时候也验证checksum,它将获取的数据的checksum与datanode的checksum进行比较。(每个datanode都保存checksum的验证记录,他客户端成功验证一个block之后,它将告诉给datanode,datanode更新验证记录。保存这样的记录对检测硬盘损坏非常有价值)
5) 除了在client验证block之外,每个datanode都在运行了一个后台线程——DataBlockScanner——来定期对数据进行校验
6) 因为HDFS存储了block的复制,所以HDFS可以通过拷贝没有被损坏的datanode上面的block来产生新的block复制。
7) 可以在通过Open()方法读取文件之前,通过给FIleSystem的setVerifyChecksum()方法传递false来禁止数据校验。也可以在shell中使用-ignoreCrc选项配合-get或者-copyToLocal命令来禁止数据校验
4.1.2. LocalFileSystem
LocalFileSystem进行client断的checksum计算。当你写filename文件的时候,文件系统会透明的创建一个隐藏的filename.crc文件。在与filename.crc文件的同一个目录中包含了filename的每个chunk的checksum。Chunk的大小由io.bytes.per.checksum控制(默认512),并作为metadata存储在.crc文件中。正因为这样的设计,所以在以后改变了chunk size的值以后也可以准确的读取出来数据。
Checksum的开销比较小,但也可以禁用。禁用办法:
全局:设置fs.file.impl为org.apache.hadoop.fs.RawLocalFileSystem.
也可以创建一个RawLocalFileSystem的实例,这样当你需要为某些读操作禁止checksum的时候比较有用:
Configuration conf=……
FIleSystem fs=new RawLocalFileSystem();
fs.initalize(null,conf);
4.1.3. ChecksumFileSystem
LocalFileSystem使用ChecksumFileSystem来完成它的工作。ChecksumFileSystem这个类使得给没有Checksum功能的FileSystem添加Checksum变得简单。一个典型的用法如下:
FileSystem rawFs=……
FileSystem ChecksumFs=new ChecksumFIleSystem(rawFs);
底层的文件系统成为raw filesystem,可以使用ChecksumFileSystem的getFileSystem方法获得。ChecksumFileSystem还有几个好用的方法,例如getChecksumFile()可以获得指定文件的checksum文件路径。
4.2. Compression 压缩
文件压缩的好处:节省空间、加快在网络上的传输速度
下面是几种压缩格式:

其中要注意的问题是压缩比和压缩速度的权衡。压缩速度快相应的压缩比就小,而压缩比大压缩的速度就会慢下来。
可分割的(splitable)压缩非常适合MapReduce程序(因为可以seek to any point in the stream)。
4.2.1. Codecs
Codecs为压缩,解压缩的算法实现。
在Hadoop中,codecs由CompressionCode的实现来表示。下面是一些实现:


4.2.1.1. 使用CompressionCodec压缩、解压缩流
使用CompressionCodec的CreateOutputStream(OutputStream out)方法创建CompressionOutputSream。可以把你的未压缩的数据使用压缩格式写入流
使用CompressionCodec的CreateInputStream(InputStream in)获得CompressionInputStream。可以把从流中解压缩之后的数据。
CompressionOutputSream和CompressionInputStream提供重设compressor和decompressor的能力。
下面的例子演示了从标准输入读取并压缩数据,然后写入到标准输出流中:
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}
使用ReflectionUtils创建一个codecs的实例。给createOutputStream传递一个标准输入进而获得一个封装了system.out的CompressionOutputStream,通过IOUtils的copyBytes方法把标准输入的数据拷贝到CompressionOutputStream中。
下面是运行的方式:
%echo “Text” | hadoop StreamCompressor org.apache.hadoop.io.compress.Gzipcodec | gunzip –
Text
4.2.1.2. 使用compressionCodecFactory推断CompressionCodecs
CompressionCodecFactory提供从扩展名到CompressionCodec的映射。下面的例子是一个演示:
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec
.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
下面是运行方式:
%hadoop FileDecompressor file.gz
4.2.1.3. native libraries
使用本地库可以提高效率,但是请注意并不是所有的实现都提供了本地库,但是有的实现却只提供了本地库(native实现,native implementation)。下面是几种实现以及本地库情况:


使用本地库默认不需要修改任何设置。可以通过设置hadoop.native.lib为false禁用本地库。
4.2.1.4. CodecPool codec池
顾名思义,使用CodecPool可以降低反复创建Codec实例的开销。下面的例子演示了如何使用:
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(
System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
} finally {
CodecPool.returnCompressor(compressor);
}
}
}
注意,使用完compressor之后要归还给pool。
4.3. Compression and InputSplit 压缩与输入分块
当考虑将怎样压缩将被MapReduce处理的数据的时候,考虑一下压缩格式是否支持split非常重要。对于gzip格式来说,因为gzip格式的stream不支持从任意点读取,因此不能用于MapReduce。这种情况下,MapReduce将不对文件进行切分(split),因为通过文件扩展名可以知道文件是gzip格式的,从而不切分.但是因为不能保证文件的每一个block都在同一个datanode上,所以,这样做效率不高,因为有可能需要通过网络从另外的datanode上面读取数据。
Bzip2支持作为split输入。
Zip死一个归档文件,里面存放多个文件。文件的位置保存在zip文件尾的中央目录中,所以理论上它支持,但事实上Hadoop现在还没有能支持zip格式的split输入。

怎么挑选压缩格式:
对于大的,没有边界的文件,像log文件,下面的策略可供参考:

1) 存储成非压缩的
2) 使用一个支持spliting的,像bzip2
3) 在程序中把文件分成chunks,对chunks进行单独压缩(使用任何压缩格式都行)。这种情况下,你需要选择chunk的大小,以期能与HDFS的block大小接近。
4) 使用SequenceFile,它支持压缩和切分(split)
4.4. 在MapReduce中使用压缩
在4.2.1.2使用CompressionCodecFactory推断CompressionCodec一节中提到了如果输入文件是压缩的,他们将会在被MapReduce读取的时候自动被解压(使用文件扩展名来确定要使用的codec)。
压缩MapReduce job输出结果的方法:
1) 在job configuration中设置mapred.output.compress属性为true
2) 设置mapred.output.compression.codec属性为想要使用的codec对应的类名
下面的例子是一个演示:
public class MaxTemperatureWithCompression {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperatureWithCompression <input path> "
+ "<output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
conf.setJobName("Max temperature with output compression");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class,
CompressionCodec.class);
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
}
}
运行方法:
%hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output
最终的输出结果是压缩的,这个例子的结果如下:
%gunzip –c output/part-00000.gz
1949 111
1950 22
如果输入是sequencefile,那么你可以设置mapred.output.compression.type的属性来控制压缩的类型。默认是RECORD(压缩单个的records),若换成BLOCK他将压缩成组的records,这是推荐使用的。
Compressing map output
可以为map出来的中间文件进行压缩,因为中间文件是要在网络上进行传输的,所以压缩可以节省开销。
下面是涉及的两个属性值及例子

下面两行添加到程序中
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
4.5. Serialization
4.6. File-Based Data Structures
分享到:
评论

相关推荐

    Hadoop实战-第2版-陆嘉恒.pdf

    Hadoop I/O操作8. 下一代MapReduce: Yarn9. HDFS简介10. HDFS文件结构11. Hive详解12. HBase详解13. Mahout简介14. Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在...

    Hadoop实战-第二版-陆嘉恒 (2012版)

    Hadoop I/O操作8. 下一代MapReduce: Yarn9. HDFS简介10. HDFS文件结构11. Hive详解12. HBase详解13. Mahout简介14. Pig详解15. ZooKeeper详解16. Avro详解17. Chukwa详解18. Hadoop的常用插件与开发19. Hadoop在...

    hadoop权威指南.示例代码

    ch04 - Hadoop I/O ch05 - Developing a MapReduce Application ch06 - How MapReduce Works ch07 - MapReduce Types and Formats ch08 - MapReduce Features ch09 - Setting Up a Hadoop Cluster ch10 - ...

    Hadoop权威指南第2版中文版 PDF

    4. Hadoop I/O Hadoop I/O指的是Hadoop框架中处理输入输出的相关机制和API。这部分内容涉及如何在Hadoop平台上高效地读写数据,包括对压缩文件的读写、数据序列化、数据类型API、文件系统操作等。Hadoop I/O支持...

    Hadoop教程.pdf

    4. Hadoop处理的数据通常都是巨大的(T级),网络I/O开销不可忽视,但分析程序通常不会很大,所以它传递的是计算方法(程序),而不是数据文件。 5. Hadoop可以降低I/O消耗,因为计算程序如果要经常使用的话也是...

    编译hadoop-2.5.0-cdh5.3.6 + snappy 源码的native包

    Hadoop的native库是一组C++编译的库,它们提供了与Java层交互的接口,以提高Hadoop性能,特别是对于I/O密集型操作,如压缩和解压缩。Snappy是一种高效的压缩算法,常被用作Hadoop的数据压缩格式,因为它的速度非常快...

    hadoop3.2.1配置文件亲测有效

    - **core-site.xml**:配置Hadoop的基本行为,如命名节点通信的默认FS、I/O设置等。 3. **HDFS配置文件**: - **hdfs-site.xml**:定义HDFS的参数,如副本数量、数据节点目录、名称节点地址等。 - **dfs.hosts**...

    hadoop_the_definitive_guide_3nd_edition

    4. Hadoop I/O . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 83 Data Integrity 83 Data Integrity in HDFS 83 LocalFileSystem 84 ...

    Hadoop技术详解.Hadoop Operation

    同时,监控系统性能,包括CPU利用率、内存使用、磁盘I/O等,对于保持系统稳定运行和及时发现潜在问题至关重要。 故障排查和安全策略也是Hadoop运维的重要部分。如何诊断并解决Hadoop集群中的各种错误,以及如何实施...

    HadoopAPI使用

    org.apache.hadoop.io 包定义了通用的 I/O API,用于读写各种数据对象,包括网络、数据库和文件等。org.apache.hadoop.ipc 包提供了网络服务端和客户端的工具,用于封装网络异步 I/O 操作。org.apache.hadoop.mapred...

    HADOOP监控GANGLIA安装文档.docx

    在Hadoop集群中,有效地监控系统资源至关重要,Ganglia是一个强大的开源监控系统,能够提供实时的性能数据,包括CPU、I/O、MapReduce带宽等关键指标。本安装文档将指导你如何在基于CentOS 5.6的环境中安装Ganglia来...

    hadoop api.doc

    4. **org.apache.hadoop.io**: 包含了各种基本数据类型(如`IntWritable`, `Text`, `BytesWritable`)和I/O操作的类。`Writable`接口定义了对象如何序列化和反序列化,而`WritableComparable`则添加了比较操作,常...

    winutils.exe_hadoop-3.0.0

    `hadoop.dll`是Hadoop在Windows上运行所需的动态链接库文件,它包含了Hadoop的一些核心功能,如网络通信、I/O操作等。这个文件同样需要被正确配置,以确保Hadoop在Windows上的正常运行。 在下载并解压`winutils.exe...

    hadoop集群内lzo的安装与配置.doc

    同时,监控系统的性能,如CPU、内存和磁盘I/O,根据实际需求进行优化。 总之,安装和配置LZO是提升Hadoop集群效率的关键步骤,它能有效减少数据存储和网络传输的成本,但需要注意的是,不同的Hadoop发行版可能需要...

    Hadoop源码的入门解析

    4. **org.apache.hadoop.io**:通用I/O接口,用于网络、数据库、文件等数据对象的读写。 5. **org.apache.hadoop.ipc**:网络服务的客户端和服务器工具,处理网络异步I/O。 6. **org.apache.hadoop.mapred**:...

    【IT十八掌徐培成】Hadoop第02天-06.hadoop本地目录修改-属性查看.zip

    例如,当磁盘空间不足或需要优化I/O性能时,我们可能需要调整`hadoop.tmp.dir`的路径,将其指向一个更大或者更快的硬盘。修改配置后,我们需要重启Hadoop服务,以使新的配置生效。 属性查看在Hadoop的管理和维护中...

    hadoop.dll 和 winutils.exe

    5. **改进的性能**:通过优化I/O路径、网络通信和内存管理,Hadoop 3.0实现了更快的数据处理速度。 6. **更好的安全性**:增加了Kerberos之外的安全选项,如Token-based认证,提升了整体安全性。 7. **新的APIs**...

    spark-2.2.0-bin-hadoop2.6.tgz

    这种设计极大地提高了数据处理速度,减少了磁盘I/O的开销。 Spark 2.2.0引入了许多新特性,例如增强SQL支持,改进了DataFrame和Dataset API,使得开发人员可以更加便捷地处理结构化和半结构化数据。DataFrame是...

Global site tag (gtag.js) - Google Analytics