`
zhangbaoming815
  • 浏览: 150411 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop实现自定义的数据类型

阅读更多

关于自定义数据类型,http://book.douban.com/annotation/17067489/ 一文中给出了一个比较清晰的说明和解释。

以wordCount为例子

定义自己的数据类型Http类

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Http implements WritableComparable<Http>
{
    public Http(){ }
    
    private String value;
    
    public Http(String value)
    {
        setValue(value);
    }

    public String getValue()
    {
        return value;
    }

    public void setValue(String value)
    {
        this.value = value;
    }

    public void readFields(DataInput in) throws IOException
    {
        value = in.readUTF();
    }

    public void write(DataOutput out) throws IOException
    {
        out.writeUTF(value);
    }

    public int compareTo(Http http)
    {
        return (value.compareTo(http.value));
    }

    @Override
    public int hashCode()
    {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((value == null) ? 0 : value.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj)
    {
        if (!(obj instanceof Http))
            return false;
        Http other = (Http)obj;
        return this.value.equals(other.value);
    }

    @Override
    public String toString()
    {
        return value;
    }
}

 编写wordcount程序

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountEntry
{
    public static class TokenizerMapper extends
            Mapper<LongWritable, Http, Http, IntWritable>
    {

        private final static IntWritable one = new IntWritable(1);

        private Http word = new Http();

        public void map(LongWritable key, Http value, Context context)
                throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.setValue(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Http, IntWritable, Http, IntWritable>
    {
        private IntWritable result = new IntWritable();

        public void reduce(Http key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) 
            throws IOException, InterruptedException, ClassNotFoundException 
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountEntry.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Http.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 编写mrUnit测试用例进行mapreduce程序测试

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.geo.dmp.WordCountEntry.IntSumReducer;
import com.geo.dmp.WordCountEntry.TokenizerMapper;

public class WordCountEntryTest
{

    private MapDriver<LongWritable, Http, Http, IntWritable> mapDriver;
    private ReduceDriver<Http, IntWritable, Http, IntWritable> reduceDriver;
    
    @Before
    public void setUpBeforeClass() throws Exception
    {
        TokenizerMapper tm = new TokenizerMapper();
        mapDriver = MapDriver.newMapDriver(tm);
        
        IntSumReducer isr = new IntSumReducer();
        reduceDriver = ReduceDriver.newReduceDriver(isr);
    }

    @Test
    public void TokenizerMapperTest()
    {
        mapDriver.withInput(new LongWritable(), new Http("01a55\tablsd"));
        
        mapDriver.withOutput(new Http("01a55"), new IntWritable(1));
        mapDriver.withOutput(new Http("ablsd"), new IntWritable(1));
        
        mapDriver.runTest();
    }
    
    @Test
    public void IntSumReducerTest()
    {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(1));
        
        reduceDriver.withInput(new Http("01a55"), values);
        
        reduceDriver.withOutput(new Http("01a55"), new IntWritable(2));
        
        reduceDriver.runTest();
    }
}

 

分享到:
评论

相关推荐

    MapReduce模型--自定义数据类型

    自定义数据类型必须实现WritableComparable接口,这个接口是Hadoop中用于序列化和比较数据的接口。实现了WritableComparable接口的类,不仅可以将对象写入到Hadoop的数据流中,还能在MapReduce框架中比较这些对象,...

    hadoop自定义类型编程

    本教程将深入探讨如何在MapReduce作业中创建和使用自定义数据类型,以更好地适应特定业务场景。 一、自定义类型的重要性 在大数据处理中,原始数据往往包含了丰富的结构和信息,这些信息可能无法直接用Hadoop的基础...

    Hadoop 自定义 Partitioner 源代码

    在标题“Hadoop 自定义 Partitioner 源代码”中,我们可以理解为讨论的是如何创建和理解 Partitioner 的源代码,以便于开发者可以更好地控制 MapReduce job 中的数据分片过程。自定义 Partitioner 可能涉及到以下...

    基于hadoop的数据分析系统.zip

    4. 数据处理:利用Pig、Hive等SQL-like工具进行数据查询和分析,或者编写MapReduce程序实现自定义处理逻辑。 5. 安全管理:设置Hadoop的安全策略,包括权限控制、数据加密和审计日志等,保障数据安全。 6. 性能...

    mapreduce在hadoop实现词统计和列式统计

    此外,Hadoop支持自定义InputFormat和OutputFormat,以适应不同格式的数据源和结果输出需求。 总结,MapReduce通过分布式计算能力,使得在Hadoop平台上处理大规模数据变得更加高效和便捷。无论是简单的词统计还是...

    基于 Hadoop 的游戏数据分析系统.zip

    综上所述,基于Hadoop的游戏数据分析系统通过Java编程实现,能够有效地处理和分析海量游戏数据,帮助游戏公司深入了解用户行为,驱动产品的改进和运营策略的制定。对于开发者而言,深入理解和熟练掌握Hadoop与Java是...

    大数据第二次作业1

    自定义数据类型 如果需要使用自定义的数据类型,例如自定义的结构体或对象,那么需要继承 Writable 接口,并实现其方法。例如: ```java public class Person implements Writable { private String name; ...

    Hadoop大数据实训,求最高温度最低温度实验报告

    通过这个实验,学生将深入理解Hadoop MapReduce的工作原理,掌握如何处理自定义数据类型,使用Combiner优化性能,以及如何通过Eclipse提交和管理MapReduce任务。这些都是大数据处理和分布式计算中的核心技能,对于...

    Hadoop实训求最高温度和最低温度的数据集

    (1)统计全球每年的最高气温和最低气温。 (2)MapReduce输出结果包含年份、最高气温、...(4)结合Combiner和自定义数据类型完成全球每年最高气温和最低气温的统计。 (5)应用ToolRunner的使用和Eclipse提交MapReduce任务。

    2022 毕业设计,基于 Hadoop 的游戏数据分析系统.zip

    2. **游戏数据类型**:分析游戏日志,理解常见的游戏数据,如用户行为、游戏事件、付费数据等,以及它们的结构和格式。 3. **数据预处理**:使用Hadoop的工具或自定义MapReduce程序对原始数据进行清洗、转换和整合...

    陌陌聊天数据实现FineBI数据分析报表

    本文将深入探讨如何使用FineBI工具,结合Hadoop和Hive,实现对陌陌聊天数据的深度挖掘与可视化的报表构建。 首先,让我们了解核心组件的作用: 1. **Hadoop**:这是一个开源的分布式计算框架,它允许在大规模集群...

    hadoop-streaming-2.8.0_jar_2.8.0_hadoop_streaming_

    2. **DumpTypedBytes**: 这可能是用于处理二进制数据的工具类,它可能在读取或写入特定格式的数据时被调用,尤其是在处理自定义数据类型时。 3. **Environment.class**: 这可能涉及到 Hadoop Streaming 中的任务...

    Hadoop数据仓库工具hive介绍.pdf

    ### Hadoop数据仓库工具Hive介绍 #### 一、简介 **1.1 Hive是什么** Hive是一款构建在Hadoop之上的数据仓库工具,它利用HDFS(Hadoop Distributed File System)进行数据存储,并通过Hadoop MapReduce来执行数据...

    广东工业大学Hadoop高级应用实验报告

    4. 复杂数据类型和算法:例如,使用SequenceFile存储自定义数据类型,或应用更复杂的排序和聚合算法。 5. 集群监控与管理:通过Ambari等工具监控Hadoop集群状态,进行故障排查和性能调优。 6. Spark与Hadoop集成:...

    springboot对hadoop增删改查源码及hadoop图片访问

    - 对于“增删改查”操作,我们可以定义自定义的Repository接口,扩展Spring Data Hadoop提供的方法,比如`save()`, `delete()`, `findById()`, `findAll()`等。 - 而具体的数据操作逻辑,如MapReduce任务,可以...

    【Hadoop篇09】Hadoop序列化1

    ong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow);...在统计手机号码的流量数据案例中,`FlowBean`就是这样一个关键的自定义数据类型,使得我们可以高效地处理和分析用户流量信息。

    Hadoop数据仓库--hive介绍

    Hive 是一个基于 Hadoop 的数据仓库工具,它允许用户使用类似于 SQL 的查询语言(称为 HiveQL 或 HQL)对大规模数据集进行分析和处理。Hive 的设计初衷是为了简化大数据处理,使得非编程背景的用户也能方便地进行...

    hadoop入门学习文档

    此外,鉴于数据类型和数量的爆炸性增长,大数据领域的人才相对紧缺,特别是 Hadoop 分布式集群搭建、HDFS 和 MapReduce 使用等方面的专业人才尤为抢手。 #### Hadoop 在大数据、云计算中的位置和关系 - **云计算...

    大数据之路选择Hadoop还是MaxCompute?Hadoop开源与MaxCompute对比材料

    - **2.x版本系列**:引入了YARN (Yet Another Resource Negotiator),这是一种新的资源管理和任务调度系统,使得Hadoop能够更好地支持多种类型的数据处理应用程序。 此外,市场上还有几家知名的Hadoop发行商,如...

Global site tag (gtag.js) - Google Analytics