package com.oncedq.code;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;
import com.oncedq.code.util.DateUtil;
public class ProcessSample {
public static class ExtractMappper extends MapReduceBase implements
Mapper<LongWritable, Text, LongWritable, Conn1> {
@Override
public void map(LongWritable arg0, Text arg1,
OutputCollector<LongWritable, Conn1> arg2, Reporter arg3)
throws IOException {
String line = arg1.toString();
String[] strs = line.split(";");
Conn1 conn1 = new Conn1();
conn1.orderKey = Long.parseLong(strs[0]);
conn1.customer = Long.parseLong(strs[1]);
conn1.state = strs[2];
conn1.price = Double.parseDouble(strs[3]);
conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");
LongWritable lw = new LongWritable(conn1.orderKey);
arg2.collect(lw, conn1);
}
}
private static class Conn1 implements WritableComparable<Conn1> {
public long orderKey;
public long customer;
public String state;
public double price;
public java.util.Date orderDate;
@Override
public void readFields(DataInput in) throws IOException {
orderKey = in.readLong();
customer = in.readLong();
state = Text.readString(in);
price = in.readDouble();
orderDate = DateUtil.getDateFromString(Text.readString(in),
"yyyy-MM-dd");
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(orderKey);
out.writeLong(customer);
Text.writeString(out, state);
out.writeDouble(price);
Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
}
@Override
public int compareTo(Conn1 arg0) {
// TODO Auto-generated method stub
return 0;
}
}
public static class Filter1Mapper extends MapReduceBase implements
Mapper<LongWritable, Conn1, LongWritable, Conn2> {
@Override
public void map(LongWritable inKey, Conn1 c2,
OutputCollector<LongWritable, Conn2> collector, Reporter report)
throws IOException {
if (c2.state.equals("F")) {
Conn2 inValue = new Conn2();
inValue.customer = c2.customer;
inValue.orderDate = c2.orderDate;
inValue.orderKey = c2.orderKey;
inValue.price = c2.price;
inValue.state = c2.state;
collector.collect(inKey, inValue);
}
}
}
private static class Conn2 implements WritableComparable<Conn1> {
public long orderKey;
public long customer;
public String state;
public double price;
public java.util.Date orderDate;
@Override
public void readFields(DataInput in) throws IOException {
orderKey = in.readLong();
customer = in.readLong();
state = Text.readString(in);
price = in.readDouble();
orderDate = DateUtil.getDateFromString(Text.readString(in),
"yyyy-MM-dd");
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(orderKey);
out.writeLong(customer);
Text.writeString(out, state);
out.writeDouble(price);
Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
}
@Override
public int compareTo(Conn1 arg0) {
// TODO Auto-generated method stub
return 0;
}
}
public static class RegexMapper extends MapReduceBase implements
Mapper<LongWritable, Conn2, LongWritable, Conn3> {
@Override
public void map(LongWritable inKey, Conn2 c3,
OutputCollector<LongWritable, Conn3> collector, Reporter report)
throws IOException {
c3.state = c3.state.replaceAll("F", "Find");
Conn3 c2 = new Conn3();
c2.customer = c3.customer;
c2.orderDate = c3.orderDate;
c2.orderKey = c3.orderKey;
c2.price = c3.price;
c2.state = c3.state;
collector.collect(inKey, c2);
}
}
private static class Conn3 implements WritableComparable<Conn1> {
public long orderKey;
public long customer;
public String state;
public double price;
public java.util.Date orderDate;
@Override
public void readFields(DataInput in) throws IOException {
orderKey = in.readLong();
customer = in.readLong();
state = Text.readString(in);
price = in.readDouble();
orderDate = DateUtil.getDateFromString(Text.readString(in),
"yyyy-MM-dd");
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(orderKey);
out.writeLong(customer);
Text.writeString(out, state);
out.writeDouble(price);
Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
}
@Override
public int compareTo(Conn1 arg0) {
// TODO Auto-generated method stub
return 0;
}
}
public static class LoadMapper extends MapReduceBase implements
Mapper<LongWritable, Conn3, LongWritable, Conn3> {
@Override
public void map(LongWritable arg0, Conn3 arg1,
OutputCollector<LongWritable, Conn3> arg2, Reporter arg3)
throws IOException {
arg2.collect(arg0, arg1);
}
}
public static void main(String[] args) {
JobConf job = new JobConf(ProcessSample.class);
job.setJobName("ProcessSample");
job.setNumReduceTasks(0);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
JobConf mapper1 = new JobConf();
JobConf mapper2 = new JobConf();
JobConf mapper3 = new JobConf();
JobConf mapper4 = new JobConf();
ChainMapper cm = new ChainMapper();
cm.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,
LongWritable.class, Conn1.class, true, mapper1);
cm.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
LongWritable.class, Conn2.class, true, mapper2);
cm.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
LongWritable.class, Conn3.class, true, mapper3);
cm.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
LongWritable.class, Conn3.class, true, mapper4);
FileInputFormat.setInputPaths(job, new Path("orderData"));
FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));
Job job1;
try {
job1 = new Job(job);
JobControl jc = new JobControl("test");
jc.addJob(job1);
jc.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如果想了解程序的具体意思,探讨ChainMapper和ChainReducer在数据处理流程中的应用,请加我QQ:405078363
分享到:
相关推荐
数据流程图示例 软件工程 数据流程图示例 软件工程
在软件工程领域,系统流程图、数据流图和数据字典是进行软件开发过程中不可或缺的工具,它们在软件的需求分析和设计阶段起着至关重要的作用。以下是对这些概念的详细解释: 1. **系统流程图**(System Flowchart)...
在本资料包中,包含了一个使用VISIO绘制的软件流程图示例,该示例已转换为PDF格式,方便查看和打印。 1. **软件流程图的基本概念** 软件流程图(Software Flowchart)是一种图形化表示程序控制流的方式,起源于...
总之,Apache NiFi是一个强大且灵活的大数据处理平台,通过理解并熟练掌握FlowFile的生成和处理,以及模板的使用,用户可以构建出满足各种需求的ETL流程,从而在大数据世界中实现高效的数据同步和转换。
python实现ARIMA时间序列预测模型,附有示例数据以及完整流程的结果 python实现ARIMA时间序列预测模型,附有示例数据以及完整流程的结果 python实现ARIMA时间序列预测模型,附有示例数据以及完整流程的结果 python...
在本示例中,Quartz 被用来定时启动Spring Batch的Job,实现定期从MySQL读取和处理数据的功能。 **MySQL** MySQL 是一种广泛使用的开源关系型数据库管理系统。在本示例中,MySQL 作为数据源和数据目标,Spring ...
"Diagram"这个示例程序提供了一种实现方式,利用Qt的Graphics View Framework来创建和编辑流程图。以下是对该示例程序及其涉及的技术点的详细说明。 首先,Qt的Graphics View Framework是一个强大的图形渲染和交互...
1. 数据业务流程图:这个示例通常涉及到数据的采集、处理、存储和分析等环节。可能包括数据输入、清洗、转换、存储(如数据库管理)、分析和报表生成等步骤。通过这样的流程图,我们可以清晰地理解数据在整个业务...
基于Halcon图像处理的深度示例:二十余种工具集成应用,流程图编写任务执行无忧,完美源码附详细说明文档,基于halcon的图像处理示例,包含二十多个工具,采用流程图方式编写任务,源码可以完美执行,内含说明文档。...
总结来说,这个IFC示例数据集是一个宝贵的资源,它可以帮助用户理解和掌握IFC数据格式在BIM项目中的实际应用,同时通过BIMServer的测试,进一步了解BIM协同工作的流程和技术优势。对于学习和研究BIM技术,以及提高...
InSAR处理开源软件GMTSAR的使用详细说明,可以处理ENVISAT、ALOS、Sentinel等诸多卫星数据。(GMTSAR: An InSAR Processing System Based on Generic Mapping Tools (Second Edition))
9. **数据序列化与反序列化**:OpenDDS 负责数据的序列化和反序列化,将 C++ 对象转换为可以在网络上传输的二进制格式,然后在接收端再还原。 学习和理解这些核心概念是使用 OpenDDS 开发数据收发程序的基础。通过...
这些示例通常包括C语言或.NET环境下的代码片段,展示了如何调用FOCAS API来获取和处理CNC数据,如轴位置、速度、切削参数、报警信息等。通过研究和理解这些示例,开发者可以快速掌握数据采集的基本方法,并根据具体...
5. 处理返回结果:接收来自服务的方法调用结果,通常是XML格式,然后解析和处理这些数据。 三、与后台数据库交互 1. 数据库连接:在WebService中,我们需要建立到后台数据库的连接,这通常通过JDBC(Java Database ...
通过学习和实践这个示例,开发者不仅可以掌握Activiti的基本使用,还能了解到如何将流程引擎与实际业务场景相结合,提升企业的业务流程自动化水平。同时,Activiti的灵活性和扩展性也使其成为企业级应用开发的热门...
介绍一个使用Spark Streaming和Kafka进行实时数据处理的示例。通过该示例,您将了解到如何使用Spark ...这个示例涵盖了从数据源到数据处理和存储的完整流程,可以帮助你理解和应用实时数据处理的基本概念和技术。
泛微流程创建接口,可调用WorkflowServiceImpl.doCreateWorkflowRequest创建明细表及主表数据,供自己参考
总结,jQuery DataTables 结合Struts2、Spring和Ibatis可以创建功能丰富的数据管理界面,提供高效的后台数据获取和处理。通过熟练掌握这些技术,开发者能够构建出用户体验优良且易于维护的数据应用。
android usb转串口数据通信示例。物联网开发中也会经常用到usb转串口,对android手机进行通信。一般都会用otc线进行转换。我在GitHub下来一份代码,亲测可用。并进行了修改封装。我的博客地址:...
数据流程图是什么?做什么?怎么绘制?详细解释示例