`
guoyunsky
  • 浏览: 854282 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:206243
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

Hadoop Core 学习笔记(一) SequenceFile文件写入和读取Writable数据

 
阅读更多

 

本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1265944

欢迎加入Hadoop超级群: 180941958   

 

     刚接触Hadoop时,对SequenceFile和Writable还产生了一点联想,以为是什么神奇的东西.后来也明白,不过就是自己IO的一些协议,用于自己的输入输出.这里介绍下如何从sequence file中读出和写入Writable数据.

     Writable类似传输的数据,相对于Java来说等同于对象,只是引用到Hadoop中需要一套协议去进行传输转换这个对象.于是有了里面的 public void write(DataOutput out) throws IOException 和public void readFields(DataInput in) throws IOException方法,一个怎么写入,一个怎么读取.如此这些对象才可以在整个Hadoop集群无障碍的通行.至于Hadoop为什么要另起炉灶自己搞一套序列化的东西,之前也看过一些介绍,但还没有心得,日后再慢慢领会.

      所以这个例子就是自己构造一个Writable对象,然后写入到sequence file以及读出.最后将读出的数据进行对比,是否正确.具体看代码吧:

 

package com.guoyun.hadoop.io.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileStudy {
  
  public static class UserWritable implements Writable,Comparable{
    private long userId;
    private String userName;
    private int userAge;
    
    
    public long getUserId() {
      return userId;
    }

    public void setUserId(long userId) {
      this.userId = userId;
    }

    public String getUserName() {
      return userName;
    }

    public void setUserName(String userName) {
      this.userName = userName;
    }

    public int getUserAge() {
      return userAge;
    }

    public void setUserAge(int userAge) {
      this.userAge = userAge;
    }

    public UserWritable(long userId, String userName, int userAge) {
      super();
      this.userId = userId;
      this.userName = userName;
      this.userAge = userAge;
    }

    public UserWritable() {
      super();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeLong(this.userId);
      out.writeUTF(this.userName);
      out.writeInt(this.userAge);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.userId=in.readLong();
      this.userName=in.readUTF();
      this.userAge=in.readInt();
    }

    @Override
    public String toString() {
     return this.userId+"\t"+this.userName+"\t"+this.userAge;
    }

    /**
     * 只对比userId
     */
    @Override
    public boolean equals(Object obj) {
      if(obj instanceof UserWritable){
        UserWritable u1=(UserWritable)obj;
        return u1.getUserId()==this.getUserId();
      }
      return false;
    }
    
    /**
     * 只对比userId
     */
    @Override
    public int compareTo(Object obj) {
      int result=-1;
      if(obj instanceof UserWritable){
       UserWritable u1=(UserWritable)obj;
       if(this.userId>u1.userId){
         result=1;
       }else if(this.userId==u1.userId){
         result=1;
       }
      }
      return result; 
    }
    
    @Override
    public int hashCode() {
      return (int)this.userId&Integer.MAX_VALUE;
    }
    
  }
  
  /**
   * 写入到sequence file
   * 
   * @param filePath
   * @param conf
   * @param datas
   */
  public static void write2SequenceFile(String filePath,Configuration conf,Collection<UserWritable> datas){
    FileSystem fs=null;
    SequenceFile.Writer writer=null;
    Path path=null;
    LongWritable idKey=new LongWritable(0);
    
    try {
      fs=FileSystem.get(conf);
      path=new Path(filePath);
      writer=SequenceFile.createWriter(fs, conf, path, LongWritable.class, UserWritable.class);
      
      for(UserWritable user:datas){
        idKey.set(user.getUserId());  // userID为Key
        writer.append(idKey, user);
      }
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }finally{
      IOUtils.closeStream(writer);
    }
  }
  
  /**
   * 从sequence file文件中读取数据
   * 
   * @param sequeceFilePath
   * @param conf
   * @return
   */
  public static List<UserWritable> readSequenceFile(String sequeceFilePath,Configuration conf){
    List<UserWritable> result=null;
    FileSystem fs=null;
    SequenceFile.Reader reader=null;
    Path path=null;
    Writable key=null;
    UserWritable value=new UserWritable();
    
    try {
      fs=FileSystem.get(conf);
      result=new ArrayList<UserWritable>();
      path=new Path(sequeceFilePath);
      reader=new SequenceFile.Reader(fs, path, conf); 
      key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf); // 获得Key,也就是之前写入的userId
      while(reader.next(key, value)){
        result.add(value);
        value=new UserWritable();
      }
      
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }catch (Exception e){
      e.printStackTrace();
    }finally{
        IOUtils.closeStream(reader);
    }
    return result;
  }
  
  private  static Configuration getDefaultConf(){
    Configuration conf=new Configuration();
    conf.set("mapred.job.tracker", "local");
    conf.set("fs.default.name", "file:///");
    //conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzoCodec");
    return conf;
  }
  
  /**
   * @param args
   */
  public static void main(String[] args) {
    String filePath="data/user.sequence"; // 文件路径
    Set<UserWritable> users=new HashSet<UserWritable>();
    UserWritable user=null;
    // 生成数据
    for(int i=1;i<=10;i++){
      user=new UserWritable(i+(int)(Math.random()*100000),"name-"+(i+1),(int)(Math.random()*50)+10);
      users.add(user);
    }
    // 写入到sequence file
    write2SequenceFile(filePath,getDefaultConf(),users);
    //从sequence file中读取
    List<UserWritable> readDatas=readSequenceFile(filePath,getDefaultConf());
    
    // 对比数据是否正确并输出
    for(UserWritable u:readDatas){
      if(users.contains(u)){
        System.out.println(u.toString());
      }else{
        System.err.println("Error data:"+u.toString());
      }
    }
    
  }

}

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

3
1
分享到:
评论

相关推荐

    sequenceFile打包多个小文件

    程序首先获取了配置信息和文件系统,然后遍历了输入路径下的所有小文件,对每个小文件进行读取和写入SequenceFile。SequenceFile的写入是通过SequenceFile.Writer来实现的,並将小文件的名称和内容写入SequenceFile...

    Hadoop HA搭建笔记和配置文件

    10. **测试与监控**:通过写入和读取数据到HDFS,以及提交MapReduce作业,验证HA功能是否正常。同时,应定期检查日志和监控系统,确保NameNode和ResourceManager的健康状态。 在提供的压缩包文件中,"HDP HAģʽ...

    最新Hadoop学习笔记

    Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,两者构成了大数据处理的基础架构。本笔记将从环境搭建...

    hadoop学习笔记(三)

    在本篇"Hadoop学习笔记(三)"中,我们将探讨如何使用Hadoop的MapReduce框架来解决一个常见的问题——从大量数据中找出最大值。这个问题与SQL中的`SELECT MAX(NUMBER) FROM TABLE`查询相似,但在这里我们通过编程...

    Hadoop学习笔记

    Hadoop学习笔记,自己总结的一些Hadoop学习笔记,比较简单。

    Hadoop 学习笔记.md

    Hadoop 学习笔记.md

    datax支持读取parquet格式文件,支持写入parquet格式文件,修复读取orc读取数据丢失问题

    新增的对Parquet格式的读取和写入支持意味着DataX现在可以更好地融入大数据工作流程,将数据高效地从一个Parquet文件系统迁移到另一个,或者从其他数据源导入到Parquet,为数据分析和处理提供便利。 其次,提到的...

    3.Hadoop学习笔记.pdf

    Hadoop是一个开源框架,用于存储和处理大型数据集。由Apache软件基金会开发,Hadoop已经成为大数据处理事实上的标准。它特别适合于存储非结构化和半结构化数据,并且能够存储和运行在廉价硬件之上。Hadoop具有高可靠...

    HADOOP学习笔记

    1. HDFS(Hadoop Distributed File System):Hadoop的分布式文件系统,它设计为跨多台机器存储大量数据,并提供高容错性和高吞吐量的数据访问。HDFS遵循主从架构,由NameNode(主节点)负责元数据管理,DataNode...

    Hadoop云计算2.0笔记第一课Hadoop介绍

    Hadoop 云计算 2.0 ...Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍为我们提供了 Hadoop 的生态系统特点、Hadoop 生态系统概况、HDFS 和 MapReduce 的架构和原理等方面的知识点,对于学习 Hadoop 和云计算非常有帮助。

    云计算hadoop学习笔记

    云计算,hadoop,学习笔记, dd

    hadoop学习笔记.rar

    本文将根据提供的Hadoop学习笔记,深入解析Hadoop的关键概念和实战技巧,帮助读者构建扎实的Hadoop知识体系。 一、Hadoop简介 Hadoop的核心思想源于Google的两篇论文——“GFS”(Google File System)和...

    hadoop3.x笔记.docx

    Hadoop 3.x 笔记 Hadoop 是一个基于分布式存储的大数据处理框架...在 DataNode 上,一个数据块以文件形式存储在磁盘上,包括两个文件:一个是数据本身,一个是元数据包括数据块的长度、块数据的校验和、时间戳等信息。

    Hadoop学习笔记.pdf

    首先,Hadoop的分布式文件系统(HDFS)是其核心组件之一,它具有高吞吐量的数据访问能力,非常适合大规模数据集的存储和处理。HDFS的设计是基于这样的理念:硬件故障是常态,因此它通过数据复制机制来实现高可靠性。...

    java 从hadoop hdfs读取文件 进行groupby并显示为条形图

    Java API提供了访问HDFS的接口,例如`org.apache.hadoop.fs.FileSystem`类,可以用于读取、写入和管理文件系统中的文件。 2. **Hadoop MapReduce**:MapReduce是Hadoop用于并行处理和分析大数据的编程模型。在GROUP...

    Hadoop学习笔记整理

    Hadoop的配置文件包括core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml等,所有的配置文件目录在/xxx/hadoop-3.x.x/etc/hadoop中。 七、HDFS分布式文件系统 HDFS(Hadoop Distributed File System)...

    sequencefile&mapfile代码

    在Hadoop生态系统中,SequenceFile和MapFile是两种常见的数据存储格式,它们为大数据处理提供了高效、可扩展的解决方案。这两个文件格式都是Hadoop原生支持的,用于存储大规模数据集,尤其适用于分布式环境。接下来...

Global site tag (gtag.js) - Google Analytics