`

基于ODPS的MapReduce例子

阅读更多
公司提了一个要求,要基于阿里云的ODPS实现一个简单的数据ETL Demo。
基本需求如下:多条身份证,姓名,来源部门信息遵循两条规则, 有权威部门则采用权威部门数据,无权威部门则采用出现次数多权重数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求

具体表结构及代码

project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING

522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政


project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五



package com.aliyun.odps.examples.mr;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class EtlTools {

  public static class TokenizerMapper extends MapperBase {

    Record idCard;
    Record name_dept;
    Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
      idCard = context.createMapOutputKeyRecord();
      name_dept = context.createMapOutputValueRecord();
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {         
    idCard.set(new Object[]{record.get("idcard").toString()});
    name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
    context.write(idCard, name_dept);    
    }
  }

  /**
   * A combiner class that combines map output by sum them.
   */
  public static class SumCombiner extends ReducerBase {
    private Record count;

    @Override
    public void setup(TaskContext context) throws IOException {
      count = context.createMapOutputValueRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
   
      System.out.println("reduce key:" + key.get(0).toString());
      Map<String, Integer>  name2Dept = new HashMap<String, Integer>();
      String finalName ="";
      Integer countValue = 0;
      while (values.hasNext()) {
        Record val = values.next();
        System.out.println("value: "+val.get(0));
        String[] vals = (val.get(0).toString()).split("_");
        String name = vals[0];
        String dept = vals[1];
        if("公安".equals(dept)) {//权威部门,规则一
        finalName = name;
        break;
        }
      
        if(name2Dept.containsKey(name)) {
        Integer count = name2Dept.get(name) + 1;
        name2Dept.put(name, count);
       
        } else {
        name2Dept.put(name, 1);
        }               
    
      }
   
      //规则二,权重,次数多
      for(String name : name2Dept.keySet()) {
    Integer val =  name2Dept.get(name);
    if (val.intValue() > countValue.intValue()) {
    countValue = val;
    finalName = name;
    }    
     
      }
     
      System.out.println("key: " + key.toString());
      System.out.println("finalName: " + finalName);
      count.set(new Object[] {finalName});
      context.write(key, count);
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class SumReducer extends ReducerBase {
    private Record result;
 

    @Override
    public void setup(TaskContext context) throws IOException {
      result = context.createOutputRecord();      
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {

      Object obj = null;
      while (values.hasNext()) {
        Record val = values.next();
        obj = val.get(0);   
      }
      result.set(0, key.get(0));
      result.set(1, obj);
      context.write(result);
    }
  }

public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);

RunningJob  job = JobClient.runJob(conf);
job.waitForCompletion();
}

}



分享到:
评论

相关推荐

    大数据技术 ODPS MapReduce对外开放实践 共20页.pptx

    《大数据技术:ODPS MapReduce对外开放实践》 在大数据领域,ODPS(Open Data Processing Service)是阿里巴巴集团推出的一种用于大规模数据处理的底层平台。ODPS的核心目标是为用户提供一个高效、稳定且易于使用的...

    少杰 (徐东):ODPS MapReduce对外开放实践

    徐东作为阿里巴巴数据平台事业部的ODPS技术专家,在2014年的中国大数据技术大会上分享了关于ODPS MapReduce对外开放实践的演讲。ODPS,全称为OpenDataProcessSystem,是一个大规模数据处理的底层平台,每天都能够...

    ODPS MapReduce 实现和开放实践.zip

    ODPS(Open Data Processing Service)是阿里云推出的一种大规模数据处理服务,它提供了一种基于Hadoop MapReduce的计算框架,使得用户可以在云端进行大规模的数据分析。本实践主要围绕ODPS MapReduce的实现原理和...

    [内部文档]阿里巴巴ODPS的map-reduce服务

    包含5个pdf文档(都是内部文档截止到2014-4-28未开放的): Map-Reduce SDK简介 — ODPS mapreduce快速入门 — ODPS MapReduce — ODPS 如何运行MapReduce — ODPS 应用限制 — ODPS

    aliyun-odps-eclipse-plugin:Eclipse插件,用于开发ODPS UDF和MR作业

    ODPS Eclipse插件 ...右键单击类WordCount(或类Resource)-&gt;运行方式-&gt; ODPS Mapreduce-&gt;输入参数-&gt;完成 单击菜单栏中的运行-&gt;运行配置-&gt;右键单击ODPS Mapreduce-&gt;新建-&gt;输入参数-&gt;运行 本地Debug UDF

    Java连接ODPS文档和代码

    以上就是Java连接ODPS的基本知识点,涵盖了ODPS的实例创建、表操作、SQL执行、MapReduce编程以及分桶和分区表的创建。实际开发中,还需要根据具体需求进行更复杂的数据处理和分析。提供的"ODPS_JAVA"压缩包文件可能...

    odps权威指南最新版

    4. **计算模型**:ODPS采用MapReduce计算模型,将复杂的任务分解为一系列Map和Reduce阶段,以并行的方式在大量服务器上执行,大大提升了计算效率。 5. **资源调度**:ODPS使用了基于YARN的资源调度系统,能够自动...

    ODPS参考手册

    ODPS基于Hadoop生态系统,但相比Hadoop MapReduce,ODPS提供了更高效、易用的SQL接口,使得用户可以使用SQL语句进行大数据分析,无需编写复杂的MapReduce程序。 在“ODPS参考手册”中,你可能会学到以下关键知识点...

    odps-eclipse-plugin-bundle-0.16.0.zip

    一旦插件安装成功,开发者可以在Eclipse中创建ODPS项目,编写MapReduce、SQL等任务,并进行调试和运行。ODPS Eclipse插件提供了代码自动补全、语法高亮、错误检查等功能,极大地提高了开发效率。此外,它还支持项目...

    odps_book:我关于 odps 的书的源代码

    mapreduce - ch07, ch08 xlab - ch09,使用机器学习算法 use_sdk - ch10,使用ODPS SDK访问ODPS服务 as_dba - ch11,账户/资源/数据管理 数据 - 用于演示书中示例的数据 图像 - 书中的一些(彩色)图像 接触 任何...

    阿里云 odps 文档

    1. **ODPS概述**:ODPS是阿里云推出的一种分布式数据处理平台,它基于大规模并行处理(MPP)架构,可以处理PB级别的数据。ODPS支持离线批处理任务,如ETL(提取、转换、加载)以及复杂的数据分析。 2. **ODPS SQL**...

    odps操作手册

    【ODPS概述】 开放数据处理服务(ODPS)是由阿里巴巴集团研发的一种大数据处理...在后续的学习和实践中,还可以深入研究ODPS的高级特性,如MapReduce编程、多表JOIN操作、复杂SQL查询优化等,提升数据分析效率和质量。

    odps(MaxCompute) 权威详尽说明帮助手册

    ODPS(MaxCompute)是阿里巴巴集团推出的一种大数据处理平台,主要设计用于海量数据的离线分析。本权威详尽的帮助手册旨在深入解析ODPS的核心功能、底层优化原理以及实际操作中的各种细节,帮助用户充分利用这一工具...

    阿里云 odps 文档.pdf

    阿里云ODPS(Open Data Processing Service)是一种大规模数据处理服务,提供了基于SQL的数据处理能力。ODPS SQL是ODPS的一部分,提供了类似于SQL的语法,用于处理大规模数据。 ODPS SQL的特点 ODPS SQL采用的是...

    云计算环境下基于MapReduce的并行化排列熵算法.pdf

    MaxCompute表能够存储海量的监测数据,并基于MapReduce模型进行高效的数据处理。MapReduce是一种编程模型,用于大规模数据集的并行运算,其核心思想是将自动并行化,以及将计算和存储分离。 研究者们基于MaxCompute...

    datahub_test_001_odps_datahub_IDEAL_datahub和odps_

    首先,ODPS(开放数据处理服务)是阿里云推出的一种大数据处理平台,它提供了海量数据的存储和计算能力,支持SQL查询以及MapReduce等计算框架。ODPS主要适用于离线数据分析,如日志分析、报表生成等场景,具备高扩展...

    阿里巴巴开放数据处理服务odps

    开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, 主要适用于实时性要求不高 的海量数据处理,如数据分析、海量数据...

    ODPS权威指南 阿里大数据平台应用开发实践

    《ODPS权威指南:阿里大数据平台应用开发实践》是一本深度解析阿里巴巴ODPS技术的专著,旨在为读者提供全面、深入的ODPS理解和应用经验。ODPS,全称为Open Data Processing Service,是阿里巴巴集团自主研发的大数据...

    阿里云java源码-aliyun-odps-jdbc:用于ODPS的JDBC驱动程序

    ODPS JDBC 驱动程序有两种方法。 1.第一个是使用独立库: 从 . 结帐。 2.二是依靠maven为你解决依赖: &lt; dependency &gt; &lt; groupId &gt;com.aliyun.odps&lt;/ groupId &gt; &lt; artifactId &gt;odps-jdbc&lt;/ ...

    阿里开放数据处理服务ODPS介绍.pptx

    阿里开放数据处理服务ODPS是一款基于云计算的数据处理和分析平台,旨在帮助企业解决大数据处理和分析的问题。ODPS提供了一站式的数据处理和分析服务,使用户可以快速构建大数据应用程序。 背景与概况: 随着大数据...

Global site tag (gtag.js) - Google Analytics