`
dasheng
  • 浏览: 148598 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RandomWriter代码注释

 
阅读更多
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 程序是hadoop的 map/reducer例子程序,主要功能是生成随机数的二进制文件
代码中自定义了inputformat,作为虚拟的mapper文件输入。代码中还用counter统计了一些状态。
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task write a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <xmp>
 * <?xml version="1.0"?>
 * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 * <configuration>
 *   <property>
 *     <name>test.randomwrite.min_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.min_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.total_bytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration></xmp>
 * 
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {
  
  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  
  /**自定义的文件输入格式作为虚拟的mapper文件输入,需要实现接口InputFormat两个方法。一个是getSplits,另一个是getRecordReader
   * A custom input format that creates virtual inputs of a single string
   * for each map.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /** 返回inputsplit数组,filesplit是inputsplit的一个实现。实例化有四个参数   第一个是文件名,第二个是filesplit开始字节位置,第三个是filesplit字节长度,第4个是filesplit位置信息,host数组的列表
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     */
    public InputSplit[] getSplits(JobConf job, 
                                  int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for(int i=0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
                                  (String[])null);
      }
      return result;
    }

    /**嵌套静态类,自定义的recordreader。用于读取分片split
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      Path name;
      public RandomRecordReader(Path p) {
        name = p;
      }
      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }
      public Text createKey() {
        return new Text();
      }
      public Text createValue() {
        return new Text();
      }
      public long getPos() {
        return 0;
      }
      public void close() {}
      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                        JobConf job, 
                                        Reporter reporter) throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }
/* mapper类*/
  static class Map extends MapReduceBase
    implements Mapper<WritableComparable, Writable,
                      BytesWritable, BytesWritable> {
    
    private long numBytesToWrite; //生成的字节长度总数
    private int minKeySize;//最小key大小
    private int keySizeRange;//key的大小范围
    private int minValueSize;//最小value大小
    private int valueSizeRange;//value的大小范围
    private Random random = new Random(); //随机数
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();
    
   /* 为每个字节生成一个随机数*/
    private void randomizeBytes(byte[] data, int offset, int length) {
      for(int i=offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }
    
    /**map方法
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key, 
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output, 
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize + 
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);//输出随机的key和随机的value
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);//状态统计
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);//状态统计
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " + 
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }
    
    /**初始化参数
     * Save the values out of the configuaration that we need to write
     * the data.
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                    1*1024*1024*1024);
      minKeySize = job.getInt("test.randomwrite.min_key", 10);
      keySizeRange = 
        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
      minValueSize = job.getInt("test.randomwrite.min_value", 0);
      valueSizeRange = 
        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
    }
    
  }
  
  /**driver方法
   * This is the main routine for launching a distributed random write job.
   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
   * The reduce doesn't do anything.
   * 
   * @throws IOException 
   */
  public int run(String[] args) throws Exception {    
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }
    
    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());
    
    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);
    
    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    
    job.setInputFormat(RandomInputFormat.class);//设置输入文件格式类
    job.setMapperClass(Map.class);        
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);//设置输出文件格式
    
    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }
    
    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");
    
    // reducer NONE
    job.setNumReduceTasks(0); //设置reducer的数目为0
    
    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " + 
                       (endTime.getTime() - startTime.getTime()) /1000 + 
                       " seconds.");
    
    return 0;
  }
  
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }

}

 

分享到:
评论

相关推荐

    Stanford WordLadder and Randomwriter

    在给定的压缩包中,"WordLadder"和"RandomWriter"可能分别是这两个工具的源代码文件或者可执行文件。如果你是新手,可以通过阅读源代码学习它们的实现方式,理解如何在Linux环境下调用Stanford的库,以及如何处理...

    PPT模板 -龙湖新员工转正答辩模板.pptx

    PPT模板 -龙湖新员工转正答辩模板.pptx

    PPT模板 -生产计划管理.pptx

    PPT模板 -生产计划管理.pptx

    生产单元数字化改造23年国赛

    生产单元数字化改造23年国赛

    ECharts柱状图-极坐标系下的堆叠柱状图2.rar

    图表效果及代码实现讲解链接:https://blog.csdn.net/zhangjiujiu/article/details/143997013

    机器人算法的 Python 示例代码 .zip

    Pythonbot高斯网格图射线投射网格图激光雷达至网格地图k-均值对象聚类矩形接头大满贯迭代最近点 (ICP) 匹配FastSLAM 1.0路径规划动态窗口方法基于网格的搜索Dijkstra 算法A* 算法D*算法D* Lite 算法位场算法基于网格的覆盖路径规划国家网格规划偏极采样车道采样概率路线图(PRM)规划快速探索随机树(RRT)回程时间*RRT* 和 reeds-shepp 路径LQR-RRT*五次多项式规划Reeds Shepp 规划基于LQR的路径规划Frenet 框架中的最佳轨迹路径追踪移动到姿势控制斯坦利控制后轮反馈控制线性二次调节器 (LQR) 速度和转向控制模型预测速度和转向控制采用 C-GMRES 的非线性模型预测控制手臂导航N关节臂对点控制带避障功能的手臂导航航空导航无人机三维轨迹跟踪火箭动力着陆双足动物倒立摆双

    sql综合学习基础知识及练习题考试题实测题.zip

    SQL,全称为结构化查询语言(Structured Query Language),是用于管理和操作关系型数据库的标准化语言。它广泛应用于数据插入、查询、更新和删除等操作,并且拥有超过40年的历史,证明了其在数据处理领域的核心地位。以下是对SQL综合学习基础知识及练习题考试题实测题的介绍

    java面向对象 - 类与对象.doc

    java面向对象 - 类与对象 在Java编程语言中,面向对象编程(OOP)是一个核心概念。它强调以对象作为程序的基本单位,并将相关的数据和功能封装在对象中。类和对象是Java OOP的两个关键组成部分。 ### 类(Class) 类是一个模板或蓝图,它定义了对象的属性和行为。我们可以将类视为对象的类型或种类。通过类,我们可以创建(实例化)具有特定属性和行为的对象。 类的组成部分通常包括: 1. **成员变量**(属性):用于存储对象的状态或数据。 2. **方法**(行为):定义了对象可以执行的操作或功能。 3. **构造方法**:一种特殊类型的方法,用于在创建对象时初始化其状态。 4. **块**(如静态块、实例初始化块):用于执行类级别的初始化代码。 5. **嵌套类**:一个类可以包含其他类,这被称为嵌套或内部类。 ### 对象(Object) 对象是类的实例。它是根据类模板创建的具体实体,具有自己的状态和行为。每个对象都是其类的一个唯一实例,可以访问其类中定义的属性和方法。 创建对象的过程通常涉及以下几个步骤: 1. **声明**:指定对象的类型(即其所属的类

    原生JS实现鼠标感应图片左右滚动代码.zip

    原生JS实现鼠标感应图片左右滚动代码.zip

    随机密码生成器,支持字符、数字、字母大小写组合

    随机密码生成器,支持字符、数字、字母大小写组合

    自动化部署管道创建的代码库(含 Concourse 和 Jenkins 相关).zip

    1、资源项目源码均已通过严格测试验证,保证能够正常运行; 2、项目问题、技术讨论,可以给博主私信或留言,博主看到后会第一时间与您进行沟通; 3、本项目比较适合计算机领域相关的毕业设计课题、课程作业等使用,尤其对于人工智能、计算机科学与技术等相关专业,更为适合; 4、下载使用后,可先查看README.md文件(如有),本项目仅用作交流学习参考,请切勿用于商业用途。

    高等工程数学试题详解:矩阵分析与最优化方法

    内容概要:本文档为一份高级数学复习试题,内容涵盖线性代数、数值分析及最优化理论等领域,主要包括矩阵范数的计算、遗传算法中的变异操作、内点法解非线性优化问题、证明矩阵有互异特征值、求解矩阵的标准形以及应用单纯形法和FR共轭梯度法解决具体的数学问题等方面。 适合人群:正在备考研究生入学考试或者准备参加各类数学竞赛的学生、对高等数学感兴趣的学习者及从事相关领域科研工作的专业人士。 使用场景及目标:用于巩固和检验个人关于矩阵论、优化方法及概率统计的知识掌握情况,帮助应试者系统地复习相关考点,提高解题技巧。 阅读建议:建议结合具体题目深入理解每一个概念及其应用方式,遇到复杂的计算或证明步骤不妨动手尝试推导一次,这样有助于加深记忆并培养灵活运用知识的能力。同时,在理解算法原理的基础上,还可以参考一些实际案例来进行练习。

    使用了脉冲码调制(PCM).计算了所需的比特率和信号量化误差Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    Google 表格 Python API.zip

    Google Spreadsheet Python API v4Google Sheets 配合使用的简单界面。特征通过标题、关键字或URL打开电子表格。读取、写入和格式化单元格区域。共享和访问控制。批量更新。安装pip install gspread要求Python 3.8+。基本用法在 Google API 控制台中创建凭据开始使用 gspreadimport gspreadgc = gspread.service_account()# Open a sheet from a spreadsheet in one gowks = gc.open("Where is the money Lebowski?").sheet1# Update a range of cells using the top left corner addresswks.update([[1, 2], [3, 4]], "A1")# Or update a single cellwks.update_acell("B42", "it's

    AICon 2024全球人工智能开发与应用大会(脱敏)PPT合集(30份).zip

    AICon 2024全球人工智能开发与应用大会(脱敏)PPT合集,共30份。 AI辅助编程测评与企业实践 SmartEV和AI 蔚来的思考与实践 下一代 RAG 引擎的技术挑战与实现 书生万象大模型的技术演进与应用探索 人工智能行业数据集构建及模型训练方法实践周华 全方位评测神经网络模型的基础能力 千亿参数 LLM 的训练效率优化 向量化与文档解析技术加速大模型RAG应用落地 基于大模型的缺陷静态检查 多环境下的 LLM Agent 应用与增强 大模型在华为推荐场景中的探索和应用 大模型在推荐系统中的落地实践 大模型的异构计算和加速 大模型辅助需求代码开发 大语言模型在法律领域的应用探索 大语言模型在计算机视觉领域的应用 大语言模型的幻觉检测 小米大模型端侧部署落地探索 快手可图大模型的技术演进与应用探索 提升大模型知识密度,做高效的终端智能 电商大模型及搜索应用实践 百度大模型 原生安全构建之路 硅基流动高性能低成本的大模型推理云实践 语言模型驱动的软件工具思考:可解释与可溯源 长文本大模型推理实践:以 KVCache 为中心的分离式推理架构 阿里云 AI 搜索 RAG 大模型优

    子弹打穿金属后留下弹痕flash动画.zip

    子弹打穿金属后留下弹痕flash动画.zip

    雷达目标一维距离像仿真实验,以及多目标成像 matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    原生js竖直动画手风琴下拉菜单代码.zip

    原生js竖直动画手风琴下拉菜单代码.zip

    受循环荷载作用的土壤或路面层分析Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。 替换数据可以直接使用,注释清楚,适合新手

    Centos6.x通过RPM包升级OpenSSH9.7最新版 升级有风险,前务必做好快照,以免升级后出现异常影响业务

    Centos6.x通过RPM包升级OpenSSH9.7最新版 升级有风险,前务必做好快照,以免升级后出现异常影响业务

Global site tag (gtag.js) - Google Analytics