- 浏览: 25912 次
- 性别:
- 来自: 深圳
文章分类
最新评论
公司提了一个要求,要基于阿里云的ODPS实现一个简单的数据ETL Demo。
基本需求如下:多条身份证,姓名,来源部门信息遵循两条规则, 有权威部门则采用权威部门数据,无权威部门则采用出现次数多权重数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求
具体表结构及代码
project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING
522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政
project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五
package com.aliyun.odps.examples.mr;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class EtlTools {
public static class TokenizerMapper extends MapperBase {
Record idCard;
Record name_dept;
Record result;
@Override
public void setup(TaskContext context) throws IOException {
idCard = context.createMapOutputKeyRecord();
name_dept = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
idCard.set(new Object[]{record.get("idcard").toString()});
name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
context.write(idCard, name_dept);
}
}
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
System.out.println("reduce key:" + key.get(0).toString());
Map<String, Integer> name2Dept = new HashMap<String, Integer>();
String finalName ="";
Integer countValue = 0;
while (values.hasNext()) {
Record val = values.next();
System.out.println("value: "+val.get(0));
String[] vals = (val.get(0).toString()).split("_");
String name = vals[0];
String dept = vals[1];
if("公安".equals(dept)) {//权威部门,规则一
finalName = name;
break;
}
if(name2Dept.containsKey(name)) {
Integer count = name2Dept.get(name) + 1;
name2Dept.put(name, count);
} else {
name2Dept.put(name, 1);
}
}
//规则二,权重,次数多
for(String name : name2Dept.keySet()) {
Integer val = name2Dept.get(name);
if (val.intValue() > countValue.intValue()) {
countValue = val;
finalName = name;
}
}
System.out.println("key: " + key.toString());
System.out.println("finalName: " + finalName);
count.set(new Object[] {finalName});
context.write(key, count);
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
private Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Object obj = null;
while (values.hasNext()) {
Record val = values.next();
obj = val.get(0);
}
result.set(0, key.get(0));
result.set(1, obj);
context.write(result);
}
}
public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);
RunningJob job = JobClient.runJob(conf);
job.waitForCompletion();
}
}
基本需求如下:多条身份证,姓名,来源部门信息遵循两条规则, 有权威部门则采用权威部门数据,无权威部门则采用出现次数多权重数据。
实现过程如下:
1.去阿里云申请accessID, accessKey
2.下载SDK开发工具
3.下载ODPS Eclipse插件并集成
4.仿造WordCount例子实现需求
具体表结构及代码
project=example_project
table=etl_in
columns=idcard:STRING,name:STRING,dept:STRING
522635****0114890X,张三,公安
522635****0114890X,张三,人社
522635****0114890X,张四,计生
522635****09122343,李四,计生
522635****09122343,张四,人社
522635****09122343,张四,综治
522635****09122343,张四,教育
622635****01148903,王五,公安
622635****01148903,王八,计生
622635****01148903,王八,民政
project=example_project
table=etl_out
columns=idcard:STRING,name:STRING
结果数据
522635****09122343,张四
522635****0114890X,张三
622635****01148903,王五
package com.aliyun.odps.examples.mr;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
public class EtlTools {
public static class TokenizerMapper extends MapperBase {
Record idCard;
Record name_dept;
Record result;
@Override
public void setup(TaskContext context) throws IOException {
idCard = context.createMapOutputKeyRecord();
name_dept = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context) throws IOException {
idCard.set(new Object[]{record.get("idcard").toString()});
name_dept.set(new Object[] {record.get("name").toString() + "_" + record.get("dept").toString()});
context.write(idCard, name_dept);
}
}
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
System.out.println("reduce key:" + key.get(0).toString());
Map<String, Integer> name2Dept = new HashMap<String, Integer>();
String finalName ="";
Integer countValue = 0;
while (values.hasNext()) {
Record val = values.next();
System.out.println("value: "+val.get(0));
String[] vals = (val.get(0).toString()).split("_");
String name = vals[0];
String dept = vals[1];
if("公安".equals(dept)) {//权威部门,规则一
finalName = name;
break;
}
if(name2Dept.containsKey(name)) {
Integer count = name2Dept.get(name) + 1;
name2Dept.put(name, count);
} else {
name2Dept.put(name, 1);
}
}
//规则二,权重,次数多
for(String name : name2Dept.keySet()) {
Integer val = name2Dept.get(name);
if (val.intValue() > countValue.intValue()) {
countValue = val;
finalName = name;
}
}
System.out.println("key: " + key.toString());
System.out.println("finalName: " + finalName);
count.set(new Object[] {finalName});
context.write(key, count);
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
private Record result;
@Override
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
Object obj = null;
while (values.hasNext()) {
Record val = values.next();
obj = val.get(0);
}
result.set(0, key.get(0));
result.set(1, obj);
context.write(result);
}
}
public static void main(String[] args) throws OdpsException {
JobConf conf = new JobConf();
conf.setMapperClass(TokenizerMapper.class);
conf.setCombinerClass(SumCombiner.class);
conf.setReducerClass(SumReducer.class);
conf.setMapOutputKeySchema(SchemaUtils.fromString("idcard:string"));
conf.setMapOutputValueSchema(SchemaUtils.fromString("name:string"));
InputUtils.addTable(TableInfo.builder().tableName("etl_in").cols(new String[]{ "idcard", "name", "dept"}).build(), conf);
OutputUtils.addTable(TableInfo.builder().tableName("etl_out").build(), conf);
RunningJob job = JobClient.runJob(conf);
job.waitForCompletion();
}
}
发表评论
-
Canal相关理解
2017-12-29 16:18 465转载:http://www.importnew.com/251 ... -
kettle部署
2017-12-26 16:04 7361.将jmbi sql先上生产环境, 参考附件jmbi.sql ... -
crontab定时运行MR不行,手动shell可以执行成功问题排查过程
2017-12-26 15:48 871设置了定时任务,但MR任务没有执行。 第一步:手动执行she ... -
Flume+kafka+Spark Steaming demo2
2017-11-22 13:15 465一,flume配置 # Name the components ... -
Flume+Kafka+Spark Steaming demo
2017-11-21 15:21 448一.准备flume配置 a1.sources = r1 a1. ... -
HBase表导出成HDFS
2017-10-19 19:40 906导出步骤:在old cluster上/opt/cloudera ... -
zepplin实战
2017-10-13 16:10 367一句话介绍Zeppelin 以笔记(Note)的形式展示的数据 ... -
Azkaban安装
2017-10-10 18:32 912一.下载 https://github.com/azkaban ... -
KYKIN安装
2017-09-30 17:35 121. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
KYKIN安装
2017-09-30 17:40 3701. Kylin的一些概念 No. 关键字 解释 1 Kyl ... -
Logstash安装部署配置
2017-04-28 10:24 1047为了实现各业务平台日志信息采集到大数据平台hdf ... -
HBASE API
2017-04-18 11:01 480package org.jumore.test; impor ... -
Ambari卸载shell
2017-03-28 17:28 475#!/bin/bash # Program: # uni ... -
linux ssh 相互密码登录
2017-02-22 13:40 4221.修改集群各机器名称 vim /etc/sysconfig/ ... -
Kettle Linux 安装部署
2017-02-15 17:20 1364一.安装JDK环境:根据自己的linux系统选择相应的版本,比 ... -
hadoop环境搭建
2017-01-23 17:31 361192.168.23.231 server1 192.168. ... -
环境安装
2017-01-17 16:26 398物理机部署分配 3台物理机上部署 Zookeeper 3个,F ... -
Storm demo
2016-12-19 15:50 446public class SentenceSpout exte ... -
运行Hadoop jar 第三方jar包依赖
2016-08-22 13:47 1041将自己编写的MapReduce程序打包成jar后,在运行 ha ... -
windows10下运行MR错误
2016-07-05 13:45 1684当在windows下运行MR程序时,会报各种错误。现把这次碰到 ...
相关推荐
《大数据技术:ODPS MapReduce对外开放实践》 在大数据领域,ODPS(Open Data Processing Service)是阿里巴巴集团推出的一种用于大规模数据处理的底层平台。ODPS的核心目标是为用户提供一个高效、稳定且易于使用的...
徐东作为阿里巴巴数据平台事业部的ODPS技术专家,在2014年的中国大数据技术大会上分享了关于ODPS MapReduce对外开放实践的演讲。ODPS,全称为OpenDataProcessSystem,是一个大规模数据处理的底层平台,每天都能够...
ODPS(Open Data Processing Service)是阿里云推出的一种大规模数据处理服务,它提供了一种基于Hadoop MapReduce的计算框架,使得用户可以在云端进行大规模的数据分析。本实践主要围绕ODPS MapReduce的实现原理和...
包含5个pdf文档(都是内部文档截止到2014-4-28未开放的): Map-Reduce SDK简介 — ODPS mapreduce快速入门 — ODPS MapReduce — ODPS 如何运行MapReduce — ODPS 应用限制 — ODPS
ODPS Eclipse插件 ...右键单击类WordCount(或类Resource)->运行方式-> ODPS Mapreduce->输入参数->完成 单击菜单栏中的运行->运行配置->右键单击ODPS Mapreduce->新建->输入参数->运行 本地Debug UDF
以上就是Java连接ODPS的基本知识点,涵盖了ODPS的实例创建、表操作、SQL执行、MapReduce编程以及分桶和分区表的创建。实际开发中,还需要根据具体需求进行更复杂的数据处理和分析。提供的"ODPS_JAVA"压缩包文件可能...
4. **计算模型**:ODPS采用MapReduce计算模型,将复杂的任务分解为一系列Map和Reduce阶段,以并行的方式在大量服务器上执行,大大提升了计算效率。 5. **资源调度**:ODPS使用了基于YARN的资源调度系统,能够自动...
ODPS基于Hadoop生态系统,但相比Hadoop MapReduce,ODPS提供了更高效、易用的SQL接口,使得用户可以使用SQL语句进行大数据分析,无需编写复杂的MapReduce程序。 在“ODPS参考手册”中,你可能会学到以下关键知识点...
一旦插件安装成功,开发者可以在Eclipse中创建ODPS项目,编写MapReduce、SQL等任务,并进行调试和运行。ODPS Eclipse插件提供了代码自动补全、语法高亮、错误检查等功能,极大地提高了开发效率。此外,它还支持项目...
mapreduce - ch07, ch08 xlab - ch09,使用机器学习算法 use_sdk - ch10,使用ODPS SDK访问ODPS服务 as_dba - ch11,账户/资源/数据管理 数据 - 用于演示书中示例的数据 图像 - 书中的一些(彩色)图像 接触 任何...
1. **ODPS概述**:ODPS是阿里云推出的一种分布式数据处理平台,它基于大规模并行处理(MPP)架构,可以处理PB级别的数据。ODPS支持离线批处理任务,如ETL(提取、转换、加载)以及复杂的数据分析。 2. **ODPS SQL**...
【ODPS概述】 开放数据处理服务(ODPS)是由阿里巴巴集团研发的一种大数据处理...在后续的学习和实践中,还可以深入研究ODPS的高级特性,如MapReduce编程、多表JOIN操作、复杂SQL查询优化等,提升数据分析效率和质量。
ODPS(MaxCompute)是阿里巴巴集团推出的一种大数据处理平台,主要设计用于海量数据的离线分析。本权威详尽的帮助手册旨在深入解析ODPS的核心功能、底层优化原理以及实际操作中的各种细节,帮助用户充分利用这一工具...
阿里云ODPS(Open Data Processing Service)是一种大规模数据处理服务,提供了基于SQL的数据处理能力。ODPS SQL是ODPS的一部分,提供了类似于SQL的语法,用于处理大规模数据。 ODPS SQL的特点 ODPS SQL采用的是...
MaxCompute表能够存储海量的监测数据,并基于MapReduce模型进行高效的数据处理。MapReduce是一种编程模型,用于大规模数据集的并行运算,其核心思想是将自动并行化,以及将计算和存储分离。 研究者们基于MaxCompute...
首先,ODPS(开放数据处理服务)是阿里云推出的一种大数据处理平台,它提供了海量数据的存储和计算能力,支持SQL查询以及MapReduce等计算框架。ODPS主要适用于离线数据分析,如日志分析、报表生成等场景,具备高扩展...
开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, 主要适用于实时性要求不高 的海量数据处理,如数据分析、海量数据...
《ODPS权威指南:阿里大数据平台应用开发实践》是一本深度解析阿里巴巴ODPS技术的专著,旨在为读者提供全面、深入的ODPS理解和应用经验。ODPS,全称为Open Data Processing Service,是阿里巴巴集团自主研发的大数据...
ODPS JDBC 驱动程序有两种方法。 1.第一个是使用独立库: 从 . 结帐。 2.二是依靠maven为你解决依赖: < dependency > < groupId >com.aliyun.odps</ groupId > < artifactId >odps-jdbc</ ...
阿里开放数据处理服务ODPS是一款基于云计算的数据处理和分析平台,旨在帮助企业解决大数据处理和分析的问题。ODPS提供了一站式的数据处理和分析服务,使用户可以快速构建大数据应用程序。 背景与概况: 随着大数据...