`

ChainMapper和ChainReducer处理数据流程示例

阅读更多

package com.oncedq.code;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;

import com.oncedq.code.util.DateUtil;

public class ProcessSample {
	public static class ExtractMappper extends MapReduceBase implements
			Mapper<LongWritable, Text, LongWritable, Conn1> {

		@Override
		public void map(LongWritable arg0, Text arg1,
				OutputCollector<LongWritable, Conn1> arg2, Reporter arg3)
				throws IOException {
			String line = arg1.toString();
			String[] strs = line.split(";");
			Conn1 conn1 = new Conn1();
			conn1.orderKey = Long.parseLong(strs[0]);
			conn1.customer = Long.parseLong(strs[1]);
			conn1.state = strs[2];
			conn1.price = Double.parseDouble(strs[3]);
			conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");
			LongWritable lw = new LongWritable(conn1.orderKey);
			arg2.collect(lw, conn1);
		}

	}

	private static class Conn1 implements WritableComparable<Conn1> {
		public long orderKey;
		public long customer;
		public String state;
		public double price;
		public java.util.Date orderDate;

		@Override
		public void readFields(DataInput in) throws IOException {
			orderKey = in.readLong();
			customer = in.readLong();
			state = Text.readString(in);
			price = in.readDouble();
			orderDate = DateUtil.getDateFromString(Text.readString(in),
					"yyyy-MM-dd");
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(orderKey);
			out.writeLong(customer);
			Text.writeString(out, state);
			out.writeDouble(price);
			Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
		}

		@Override
		public int compareTo(Conn1 arg0) {
			// TODO Auto-generated method stub
			return 0;
		}

	}

	public static class Filter1Mapper extends MapReduceBase implements
			Mapper<LongWritable, Conn1, LongWritable, Conn2> {

		@Override
		public void map(LongWritable inKey, Conn1 c2,
				OutputCollector<LongWritable, Conn2> collector, Reporter report)
				throws IOException {
			if (c2.state.equals("F")) {
				Conn2 inValue = new Conn2();
				inValue.customer = c2.customer;
				inValue.orderDate = c2.orderDate;
				inValue.orderKey = c2.orderKey;
				inValue.price = c2.price;
				inValue.state = c2.state;
				collector.collect(inKey, inValue);
			}
		}

	}

	private static class Conn2 implements WritableComparable<Conn1> {
		public long orderKey;
		public long customer;
		public String state;
		public double price;
		public java.util.Date orderDate;

		@Override
		public void readFields(DataInput in) throws IOException {
			orderKey = in.readLong();
			customer = in.readLong();
			state = Text.readString(in);
			price = in.readDouble();
			orderDate = DateUtil.getDateFromString(Text.readString(in),
					"yyyy-MM-dd");
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(orderKey);
			out.writeLong(customer);
			Text.writeString(out, state);
			out.writeDouble(price);
			Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
		}

		@Override
		public int compareTo(Conn1 arg0) {
			// TODO Auto-generated method stub
			return 0;
		}

	}

	public static class RegexMapper extends MapReduceBase implements
			Mapper<LongWritable, Conn2, LongWritable, Conn3> {

		@Override
		public void map(LongWritable inKey, Conn2 c3,
				OutputCollector<LongWritable, Conn3> collector, Reporter report)
				throws IOException {
			c3.state = c3.state.replaceAll("F", "Find");
			Conn3 c2 = new Conn3();
			c2.customer = c3.customer;
			c2.orderDate = c3.orderDate;
			c2.orderKey = c3.orderKey;
			c2.price = c3.price;
			c2.state = c3.state;
			collector.collect(inKey, c2);
		}
	}

	private static class Conn3 implements WritableComparable<Conn1> {
		public long orderKey;
		public long customer;
		public String state;
		public double price;
		public java.util.Date orderDate;

		@Override
		public void readFields(DataInput in) throws IOException {
			orderKey = in.readLong();
			customer = in.readLong();
			state = Text.readString(in);
			price = in.readDouble();
			orderDate = DateUtil.getDateFromString(Text.readString(in),
					"yyyy-MM-dd");
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(orderKey);
			out.writeLong(customer);
			Text.writeString(out, state);
			out.writeDouble(price);
			Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
		}

		@Override
		public int compareTo(Conn1 arg0) {
			// TODO Auto-generated method stub
			return 0;
		}

	}

	public static class LoadMapper extends MapReduceBase implements
			Mapper<LongWritable, Conn3, LongWritable, Conn3> {

		@Override
		public void map(LongWritable arg0, Conn3 arg1,
				OutputCollector<LongWritable, Conn3> arg2, Reporter arg3)
				throws IOException {
			arg2.collect(arg0, arg1);
		}

	}

	public static void main(String[] args) {
		JobConf job = new JobConf(ProcessSample.class);
		job.setJobName("ProcessSample");
		job.setNumReduceTasks(0);
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		JobConf mapper1 = new JobConf();
		JobConf mapper2 = new JobConf();
		JobConf mapper3 = new JobConf();
		JobConf mapper4 = new JobConf();
		ChainMapper cm = new ChainMapper();
		cm.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,
				LongWritable.class, Conn1.class, true, mapper1);
		cm.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
				LongWritable.class, Conn2.class, true, mapper2);
		cm.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
				LongWritable.class, Conn3.class, true, mapper3);
		cm.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
				LongWritable.class, Conn3.class, true, mapper4);
		FileInputFormat.setInputPaths(job, new Path("orderData"));
		FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));
		Job job1;
		try {
			job1 = new Job(job);
			JobControl jc = new JobControl("test");
			jc.addJob(job1);
			jc.run();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}
}
 如果想了解程序的具体意思,探讨ChainMapper和ChainReducer在数据处理流程中的应用,请加我QQ:405078363
分享到:
评论
1 楼 Genie13 2012-05-17  
很不错的例子啊
Conn3 c2 = new Conn3(); 
collector.collect(inKey, c2);
这个会产生很多的Conn3对象,一般用set方法

相关推荐

    数据流程图示例

    5. **错误示例**:在绘制数据流程图时,需要避免一些常见错误,如数据源没有输出、处理没有输入,或者数据流直接从外部实体流向数据存储而不经过处理等。每个处理应至少有一个输入数据流和一个输出数据流。 6. **...

    qt 绘制流程图示例程序——Diagram

    "Diagram"这个示例程序提供了一种实现方式,利用Qt的Graphics View Framework来创建和编辑流程图。以下是对该示例程序及其涉及的技术点的详细说明。 首先,Qt的Graphics View Framework是一个强大的图形渲染和交互...

    AdventureWorksDW2012 数据仓库示例库

    在数据库领域,AdventureWorksDW2012是一个广泛使用的示例数据仓库,为学习和测试数据仓库技术提供了丰富的资源。这个实例库由Microsoft开发,旨在帮助用户了解如何构建、管理和优化数据仓库解决方案。本文将深入...

    it各类流程图设计示例

    1. 数据业务流程图:这个示例通常涉及到数据的采集、处理、存储和分析等环节。可能包括数据输入、清洗、转换、存储(如数据库管理)、分析和报表生成等步骤。通过这样的流程图,我们可以清晰地理解数据在整个业务...

    BIM的ifc示例数据

    总结来说,这个IFC示例数据集是一个宝贵的资源,它可以帮助用户理解和掌握IFC数据格式在BIM项目中的实际应用,同时通过BIMServer的测试,进一步了解BIM协同工作的流程和技术优势。对于学习和研究BIM技术,以及提高...

    县霍乱疫情应急处理流程图示例.pdf

    县霍乱疫情应急处理流程图示例.pdf

    GMTSAR软件InSAR数据处理流程及使用详细说明(GMTSAR: An InSAR Processing System Based on Generic Mapping Tools (Second Edition))

    InSAR处理开源软件GMTSAR的使用详细说明,可以处理ENVISAT、ALOS、Sentinel等诸多卫星数据。(GMTSAR: An InSAR Processing System Based on Generic Mapping Tools (Second Edition))

    管理信息系统:8 数据流程图绘制示例.ppt

    管理信息系统:8 数据流程图绘制示例.ppt

    大数据处理利器:Spark+ZooKeeper+Kafka Scala源码示例

    - 示例丰富:提供了丰富的代码示例,便于学习和理解大数据处理流程。 - 环境自搭:项目不包含预配置环境,需要根据指南自行搭建,以适应不同的部署需求。 该代码集是大数据处理领域的实践宝库,适合对Spark、...

    osworkflow eclipse+mysql 请假审批流程 完整示例

    《基于osworkflow与Eclipse的请假审批流程完整示例解析》 在IT行业中,工作流管理系统(Workflow Management System,简称WMS)是企业信息化建设的重要组成部分,它能够规范业务流程,提高工作效率。osworkflow是一...

    调用WebService与后台数据交互示例

    5. 处理返回结果:接收来自服务的方法调用结果,通常是XML格式,然后解析和处理这些数据。 三、与后台数据库交互 1. 数据库连接:在WebService中,我们需要建立到后台数据库的连接,这通常通过JDBC(Java Database ...

    数据集代码示例.rar

    下面我将给出几个常见的数据集处理代码示例,分别使用Python语言和其流行的数据处理库Pandas。 示例 1: 读取CSV文件 假设你有一个名为data.csv的CSV文件,你想使用Pandas库来读取它。 python import pandas as pd ...

    《检查处理kettle数据流中的空行》示例附件代码

    本教程《检查处理kettle数据流中的空行》主要关注如何在Kettle的数据流中识别和处理包含空行的情况,这对于数据清洗和预处理来说至关重要。下面将详细解释这个过程以及相关的Kettle知识点。 首先,我们需要理解...

    一个UDP接收/发送数据的示例程序(VC++ 源码)

    一个UDP接收/发送数据的示例程序 说明:本示例程序仅为新手演示UDP程序的一般过程... 发送数据过程: 1.创建socket(socket) 2.向目的IP的指定端口发送数据(sendto) ... 监听本地指定端口UDP数据过程: 1.创建socket...

    哨兵1号数据处理手册大全

    总之,《哨兵1号数据处理手册大全》是一份详尽的手册,不仅包含了哨兵1号数据处理的基本步骤和技术细节,而且还提供了一系列实用的示例和指南,旨在帮助用户充分利用GAMMA软件处理哨兵1号数据,以满足各种科研和应用...

    Jquery dataTable后台获取数据示例

    总结,jQuery DataTables 结合Struts2、Spring和Ibatis可以创建功能丰富的数据管理界面,提供高效的后台数据获取和处理。通过熟练掌握这些技术,开发者能够构建出用户体验优良且易于维护的数据应用。

    android usb转串口数据通信示例

    android usb转串口数据通信示例。物联网开发中也会经常用到usb转串口,对android手机进行通信。一般都会用otc线进行转换。我在GitHub下来一份代码,亲测可用。并进行了修改封装。我的博客地址:...

    java.流程控制(处理方案示例).md

    java.流程控制(处理方案示例).md

    java.流程控制(处理方案示例).txt

    java.流程控制(处理方案示例)

    linux下中断处理程序示例,含驱动和应用程序

    在这个示例中,DAL可能包含了中断管理的接口,使得应用程序能更方便地触发和处理中断,而无需关心具体的硬件细节。 应用程序可以利用驱动提供的接口直接调用中断处理函数,这在某些实时性要求高的场景中非常有用。...

Global site tag (gtag.js) - Google Analytics