本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1265944
欢迎加入Hadoop超级群: 180941958
刚接触Hadoop时,对SequenceFile和Writable还产生了一点联想,以为是什么神奇的东西.后来也明白,不过就是自己IO的一些协议,用于自己的输入输出.这里介绍下如何从sequence file中读出和写入Writable数据.
Writable类似传输的数据,相对于Java来说等同于对象,只是引用到Hadoop中需要一套协议去进行传输转换这个对象.于是有了里面的 public void write(DataOutput out) throws IOException 和public void readFields(DataInput in) throws IOException方法,一个怎么写入,一个怎么读取.如此这些对象才可以在整个Hadoop集群无障碍的通行.至于Hadoop为什么要另起炉灶自己搞一套序列化的东西,之前也看过一些介绍,但还没有心得,日后再慢慢领会.
所以这个例子就是自己构造一个Writable对象,然后写入到sequence file以及读出.最后将读出的数据进行对比,是否正确.具体看代码吧:
package com.guoyun.hadoop.io.study; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; public class SequenceFileStudy { public static class UserWritable implements Writable,Comparable{ private long userId; private String userName; private int userAge; public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public int getUserAge() { return userAge; } public void setUserAge(int userAge) { this.userAge = userAge; } public UserWritable(long userId, String userName, int userAge) { super(); this.userId = userId; this.userName = userName; this.userAge = userAge; } public UserWritable() { super(); } @Override public void write(DataOutput out) throws IOException { out.writeLong(this.userId); out.writeUTF(this.userName); out.writeInt(this.userAge); } @Override public void readFields(DataInput in) throws IOException { this.userId=in.readLong(); this.userName=in.readUTF(); this.userAge=in.readInt(); } @Override public String toString() { return this.userId+"\t"+this.userName+"\t"+this.userAge; } /** * 只对比userId */ @Override public boolean equals(Object obj) { if(obj instanceof UserWritable){ UserWritable u1=(UserWritable)obj; return u1.getUserId()==this.getUserId(); } return false; } /** * 只对比userId */ @Override public int compareTo(Object obj) { int result=-1; if(obj instanceof UserWritable){ UserWritable u1=(UserWritable)obj; if(this.userId>u1.userId){ result=1; }else if(this.userId==u1.userId){ result=1; } } return result; } @Override public int hashCode() { return (int)this.userId&Integer.MAX_VALUE; } } /** * 写入到sequence file * * @param filePath * @param conf * @param datas */ public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){ FileSystem fs=null; SequenceFile.Writer writer=null; Path path=null; LongWritable idKey=new LongWritable(0); try { fs=FileSystem.get(conf); path=new Path(filePath); writer=SequenceFile.createWriter(fs, conf, path, LongWritable.class, UserWritable.class); for(UserWritable user:datas){ idKey.set(user.getUserId()); // userID为Key writer.append(idKey, user); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ IOUtils.closeStream(writer); } } /** * 从sequence file文件中读取数据 * * @param sequeceFilePath * @param conf * @return */ public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){ List<UserWritable> result=null; FileSystem fs=null; SequenceFile.Reader reader=null; Path path=null; Writable key=null; UserWritable value=new UserWritable(); try { fs=FileSystem.get(conf); result=new ArrayList<UserWritable>(); path=new Path(sequeceFilePath); reader=new SequenceFile.Reader(fs, path, conf); key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf); // 获得Key,也就是之前写入的userId while(reader.next(key, value)){ result.add(value); value=new UserWritable(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }catch (Exception e){ e.printStackTrace(); }finally{ IOUtils.closeStream(reader); } return result; } private static Configuration getDefaultConf(){ Configuration conf=new Configuration(); conf.set("mapred.job.tracker", "local"); conf.set("fs.default.name", "file:///"); //conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec"); return conf; } /** * @param args */ public static void main(String[] args) { String filePath="data/user.sequence"; // 文件路径 Set<UserWritable> users=new HashSet<UserWritable>(); UserWritable user=null; // 生成数据 for(int i=1;i<=10;i++){ user=new UserWritable(i+(int)(Math.random()*100000),"name-"+(i+1),(int)(Math.random()*50)+10); users.add(user); } // 写入到sequence file write2SequenceFile(filePath,getDefaultConf(),users); //从sequence file中读取 List<UserWritable> readDatas=readSequenceFile(filePath,getDefaultConf()); // 对比数据是否正确并输出 for(UserWritable u:readDatas){ if(users.contains(u)){ System.out.println(u.toString()); }else{ System.err.println("Error data:"+u.toString()); } } } }
更多技术文章、感悟、分享、勾搭,请用微信扫描:
相关推荐
程序首先获取了配置信息和文件系统,然后遍历了输入路径下的所有小文件,对每个小文件进行读取和写入SequenceFile。SequenceFile的写入是通过SequenceFile.Writer来实现的,並将小文件的名称和内容写入SequenceFile...
10. **测试与监控**:通过写入和读取数据到HDFS,以及提交MapReduce作业,验证HA功能是否正常。同时,应定期检查日志和监控系统,确保NameNode和ResourceManager的健康状态。 在提供的压缩包文件中,"HDP HAģʽ...
Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,两者构成了大数据处理的基础架构。本笔记将从环境搭建...
在本篇"Hadoop学习笔记(三)"中,我们将探讨如何使用Hadoop的MapReduce框架来解决一个常见的问题——从大量数据中找出最大值。这个问题与SQL中的`SELECT MAX(NUMBER) FROM TABLE`查询相似,但在这里我们通过编程...
Hadoop学习笔记,自己总结的一些Hadoop学习笔记,比较简单。
Hadoop 学习笔记.md
新增的对Parquet格式的读取和写入支持意味着DataX现在可以更好地融入大数据工作流程,将数据高效地从一个Parquet文件系统迁移到另一个,或者从其他数据源导入到Parquet,为数据分析和处理提供便利。 其次,提到的...
Hadoop是一个开源框架,用于存储和处理大型数据集。由Apache软件基金会开发,Hadoop已经成为大数据处理事实上的标准。它特别适合于存储非结构化和半结构化数据,并且能够存储和运行在廉价硬件之上。Hadoop具有高可靠...
1. HDFS(Hadoop Distributed File System):Hadoop的分布式文件系统,它设计为跨多台机器存储大量数据,并提供高容错性和高吞吐量的数据访问。HDFS遵循主从架构,由NameNode(主节点)负责元数据管理,DataNode...
Hadoop 云计算 2.0 ...Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍为我们提供了 Hadoop 的生态系统特点、Hadoop 生态系统概况、HDFS 和 MapReduce 的架构和原理等方面的知识点,对于学习 Hadoop 和云计算非常有帮助。
云计算,hadoop,学习笔记, dd
本文将根据提供的Hadoop学习笔记,深入解析Hadoop的关键概念和实战技巧,帮助读者构建扎实的Hadoop知识体系。 一、Hadoop简介 Hadoop的核心思想源于Google的两篇论文——“GFS”(Google File System)和...
Hadoop 3.x 笔记 Hadoop 是一个基于分布式存储的大数据处理框架...在 DataNode 上,一个数据块以文件形式存储在磁盘上,包括两个文件:一个是数据本身,一个是元数据包括数据块的长度、块数据的校验和、时间戳等信息。
首先,Hadoop的分布式文件系统(HDFS)是其核心组件之一,它具有高吞吐量的数据访问能力,非常适合大规模数据集的存储和处理。HDFS的设计是基于这样的理念:硬件故障是常态,因此它通过数据复制机制来实现高可靠性。...
Java API提供了访问HDFS的接口,例如`org.apache.hadoop.fs.FileSystem`类,可以用于读取、写入和管理文件系统中的文件。 2. **Hadoop MapReduce**:MapReduce是Hadoop用于并行处理和分析大数据的编程模型。在GROUP...
Hadoop的配置文件包括core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml等,所有的配置文件目录在/xxx/hadoop-3.x.x/etc/hadoop中。 七、HDFS分布式文件系统 HDFS(Hadoop Distributed File System)...
在Hadoop生态系统中,SequenceFile和MapFile是两种常见的数据存储格式,它们为大数据处理提供了高效、可扩展的解决方案。这两个文件格式都是Hadoop原生支持的,用于存储大规模数据集,尤其适用于分布式环境。接下来...