`
backkom1982
  • 浏览: 28386 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

自行实现Hadoop的多属性WritableComparable

    博客分类:
  • java
 
阅读更多

在使用hadoop做map/reduce时,有很多场景需要自行实现有多个属性的WritableComparable。以下示例希望对广大开发有所启示。

 

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class StatWritable implements WritableComparable<StatWritable> {
    private long timestamp;
    private int systemId;
    private String group;
    private String item;

    public String getGroup() {
        return group;
    }

    public void setGroup(final String group) {
        this.group = group;
    }

    public String getItem() {
        return item;
    }

    public void setItem(final String item) {
        this.item = item;
    }

    public int getSystemId() {
        return systemId;
    }

    public void setSystemId(final int systemId) {
        this.systemId = systemId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(final long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(final StatWritable o) {
        int cmp = new Long(timestamp).compareTo(o.getTimestamp());
        if (cmp != 0) {
            return cmp;
        }
        cmp = systemId - o.getSystemId();
        if (cmp != 0) {
            return cmp;
        }
        cmp = group.compareTo(o.getGroup());
        if (cmp != 0) {
            return cmp;
        }

        return item.compareTo(o.getItem());
    }

    /**
     * 此方法中写出数据的顺序必须与{@link StatWritable#readFields(java.io.DataInput)}的读取数据一致。
     * 根据写入的属性类型调用{@link java.io.DataOutput}中对应的write方法。当写入属性不定长时,必须先写出此字符串的长度后,再写出真实数据
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(final DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeInt(systemId);

        final byte[] groupBytes = group.getBytes();
        WritableUtils.writeVInt(out, groupBytes.length);
        out.write(groupBytes, 0, groupBytes.length);

        final byte[] itemBytes = item.getBytes();
        WritableUtils.writeVInt(out, itemBytes.length);
        out.write(itemBytes, 0, itemBytes.length);
    }

    /**
     * 此方法中读取数据的顺序必须与{@link StatWritable#write(java.io.DataOutput)}的写入数据一致
     * 根据读取的属性类型调用{@link java.io.DataInput}中对应的read方法。当读取属性不定长时,必须先读取此字符串的长度后,再读取真实数据
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(final DataInput in) throws IOException {
        timestamp = in.readLong();
        systemId = in.readInt();

        final int groupLength = WritableUtils.readVInt(in);
        byte[] groupBytes = new byte[groupLength];
        in.readFully(groupBytes, 0, groupLength);
        group = new String(groupBytes);

        int itemLength = WritableUtils.readVInt(in);
        byte[] itemBytes = new byte[itemLength];
        in.readFully(itemBytes, 0, itemLength);
        item = new String(itemBytes);
    }

    /**
     * 覆盖toString方法,以便记录到map输出文件或reduce输出文件文件
     *
     * @return
     */
    @Override
    public String toString() {
        return systemId + " " + timestamp + " " + group + " " + item;
    }

    /**
     * 此类为了hadoop快速进行数据比较而设。覆盖{@link com.unionpay.stat.hadoop.domain.StatWritable.Comparator#compare(byte[], int, int, byte[], int, int)}方法时,
     * 比较属性的顺序必须与{@link org.apache.hadoop.io.Writable#readFields(java.io.DataInput)}和{@link org.apache.hadoop.io.Writable#write(java.io.DataOutput)}中对属性进行读写操作的顺序一致
     */
    public static class Comparator extends WritableComparator {

        protected Comparator() {
            super(StatWritable.class);
        }

        @Override
        public int compare(final byte[] b1, final int s1, final int l1, final byte[] b2, final int s2, final int l2) {
            try {
                final long timestampL1 = readLong(b1, s1);
                final long timestampL2 = readLong(b2, s2);
                final int cmp1 = timestampL1 < timestampL2 ? -1 : (timestampL1 == timestampL2 ? 0 : 1);
                if (cmp1 != 0) {
                    return cmp1;
                }

                final int startIndex1_1 = s1 + 8;
                final int startIndex1_2 = s2 + 8;
                final int systemId1 = readInt(b1, startIndex1_1);
                final int systemId2 = readInt(b2, startIndex1_2);
                final int cmp2 = systemId1 < systemId2 ? -1 : (systemId1 == systemId2 ? 0 : 1);
                if (cmp2 != 0) {
                    return cmp2;
                }

                final int startIndex2_1 = startIndex1_1 + 4;
                final int startIndex2_2 = startIndex1_2 + 4;
                final int groupLength1 = WritableUtils.decodeVIntSize(b1[startIndex2_1]) + readVInt(b1, startIndex2_1);
                final int groupLength2 = WritableUtils.decodeVIntSize(b2[startIndex2_2]) + readVInt(b2, startIndex2_2);

                final int cmp3 = compareBytes(b1, startIndex2_1, groupLength1, b2, startIndex2_2, groupLength2);
                if (cmp3 != 0) {
                    return cmp3;
                }

                final int startIndex3_1 = startIndex2_1 + groupLength1;
                final int startIndex3_2 = startIndex2_2 + groupLength2;
                final int itemLength1 = WritableUtils.decodeVIntSize(b1[startIndex3_1]) + readVInt(b1, startIndex3_1);
                final int itemLength2 = WritableUtils.decodeVIntSize(b2[startIndex3_2]) + readVInt(b2, startIndex3_2);
                return compareBytes(b1, startIndex3_1, itemLength1, b2, startIndex3_2, itemLength2);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * 注册到hadoop,以便其能识别到
     */
    static {
        WritableComparator.define(StatWritable.class, new Comparator());
    }
}
 
分享到:
评论

相关推荐

    hadoop配置属性

    Hadoop配置属性是指在搭建Hadoop集群或运行Hadoop分布式处理任务时,需要进行的参数设置。Hadoop配置属性主要通过三个XML文件进行设置:core-site.xml、hdfs-site.xml和mapred-site.xml。每个文件都有对应的默认配置...

    java WriteHDFS实现,hadoop应用

    java WriteHDFS实现,hadoop应用java WriteHDFS实现,hadoop应用java WriteHDFS实现,hadoop应用java WriteHDFS实现,hadoop应用java WriteHDFS实现,hadoop应用java WriteHDFS实现,hadoop应用java WriteHDFS实现,...

    异地多机房Hadoop架构实践.pdf

    1. 三步实现Hadoop多机房架构:美团点评使用三步实现Hadoop多机房架构,包括多机房资源管理、跨机房计算调度和数据Cache处理。 2. 多机房存储资源管理:美团点评使用NameNode来管理多机房存储资源,以确保数据的一致...

    【IT十八掌徐培成】Hadoop第02天-06.hadoop本地目录修改-属性查看.zip

    首先,我们要明白Hadoop的运行环境通常涉及到多个节点,这些节点上的数据需要通过本地文件系统进行交互。本地目录在Hadoop中的角色是作为数据的临时存储或输入输出的桥梁。例如,当我们需要将数据上传到HDFS或者从...

    Hadoop数据迁移--从Hadoop向Oracle

    综上所述,Hadoop到Oracle的数据迁移是一个涉及多个步骤的复杂过程,包括配置数据库连接、开发MapReduce作业、处理数据类型转换、执行数据库操作以及数据验证。正确实施这些步骤将有助于高效、准确地完成数据迁移...

    Flink实现 Hadoop distcp

    Flink实现 Hadoop distcp

    Hadoop 2.9.0 已废除属性集

    这些属性的变更涉及到Hadoop的很多核心组件,包括HDFS、MapReduce和YARN。HDFS是Hadoop的分布式文件系统,它被设计用来存储大量数据,并在廉价的硬件上提供高吞吐量。MapReduce是Hadoop的编程模型,用于处理大规模...

    Java实现Hadoop下词配对Wordcount计数代码实现

    在这个场景中,我们将探讨如何使用Java编程语言在Hadoop环境下实现一个基础但重要的任务——Wordcount。这个任务涉及到对文本数据的预处理、分词、以及对单词的计数。 首先,我们要理解Hadoop的MapReduce编程模型。...

    Hadoop实现大矩阵乘法

    本主题聚焦于如何使用Hadoop实现大矩阵乘法,这是一个在计算机科学和数据分析中常见的运算,特别是在机器学习和数值计算中。在Hadoop上实现大矩阵乘法,可以充分利用其并行计算的优势,提高计算效率。 大矩阵乘法的...

    Hadoop MapReduce实现tfidf源码

    本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    使用hadoop实现WordCount实验报告.docx

    **使用Hadoop实现WordCount实验报告** 实验报告的目的是详细记录使用Hadoop在Windows环境下实现WordCount应用的过程,包括环境配置、WordCount程序的实现以及实验结果分析。本实验旨在理解Hadoop分布式计算的基本...

    基于hadoop的简易云盘实现.zip

    【标题】"基于hadoop的简易云盘实现.zip"揭示了这个项目是关于利用Hadoop框架构建一个简单的云存储服务。Hadoop是一个开源的分布式计算框架,它允许处理和存储大量数据,尤其适合大数据处理场景。这个简易云盘的实现...

    两台 Ubuntu 机器实现 Hadoop 集群

    标题中的“两台 Ubuntu 机器实现 Hadoop 集群”指的是在两台运行 Ubuntu 操作系统的计算机上搭建一个 Hadoop 分布式计算环境。Hadoop 是一个由 Apache 基金会开发的开源框架,主要用于处理和存储大量数据。它基于 ...

    用shell脚本实现hadoop多用户配置

    这个shell脚本是在业余时间写的,从一开始不懂shell,到写出这个程序还是经过了一段时间的,收取小小1分希望得到大家的鼓励 :) 。程序的解释和hadoop多用户配置的步骤也都可以在博客中找到:)

    机器学习算法 hadoop相关实现hadoop

    【作品名称】:机器学习算法 hadoop相关实现【hadoop】 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】:机器学习算法 ...

    hadoop搭建与eclipse开发环境设置

    目的很简单,为进行研究与学习,部署一个hadoop运行环境,并搭建一个hadoop开发与测试环境。 具体目标是: 1.在ubuntu系统上部署hadoop 2.在windows 上能够使用eclipse连接ubuntu系统上部署的hadoop进行开发与测试 3...

    Spark和Hadoop的集成

    Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。Storm是一个分布式的、容错的实时计算系统。两者整合,优势互补。

    基于Hadoop的电影推荐系统的设计与实现源码(毕业设计)java+Hadoop

    基于Hadoop的电影推荐系统的设计与实现源码(毕业设计)java+Hadoop,使用Hadoop2.7,jdk1.8;分为前台,后台和推荐三个子系统,可直接运行。sql私信

    Hadoop高级编程- 构建与实现大数据解决方案

    本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据项目。 首先,我们要理解Hadoop的基础架构。Hadoop由两个主要组件构成:Hadoop ...

Global site tag (gtag.js) - Google Analytics