
Hadoop Reading Notes (7): A Demo of the MapReduce 0.x API


Hadoop Reading Notes (1): Introduction to Hadoop http://blog.csdn.net/caicongyang/article/details/39898629

Hadoop Reading Notes (2): HDFS Shell Operations http://blog.csdn.net/caicongyang/article/details/41253927

Hadoop Reading Notes (3): Operating HDFS with the Java API http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop Reading Notes (4): HDFS Architecture http://blog.csdn.net/caicongyang/article/details/41322649

Hadoop Reading Notes (5): MapReduce Word Count Demo http://blog.csdn.net/caicongyang/article/details/41453579

Hadoop Reading Notes (6): MapReduce Custom Data Type Demo http://blog.csdn.net/caicongyang/article/details/41490379

1. Overview

Like the previous post, this demo computes per-phone traffic statistics, so you can compare the code here against the earlier new-API version.

The data format is also the same...
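
For reference, a sample input line in that format might look like the following (tab-separated; field 1 is the phone number and fields 6-9 are upPackNum, downPackNum, upPayLoad, and downPayLoad, matching the indices the mapper reads below; this particular line is made up for illustration, not taken from the real dataset):

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	iface.qiyi.com	video	24	27	2481	24681	200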

2. Code

KpiApp.java

 

package old;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/**
 *
 * <p>
 * Title: KpiApp.java
 * Package: old
 * </p>
 * <p>
 * Description: in Hadoop 1.x the MapReduce classes generally live in the mapreduce package;
 *              in Hadoop 0.x they generally live in the mapred package.
 * </p>
 * @author Tom.Cai
 * @created 2014-11-25 10:23:47 PM
 * @version V1.0
 *
 */
public class KpiApp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/wlan";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/wlan_out";
	/**
	 * Changes relative to the new API:
	 * 1. Use JobConf instead of Job.
	 * 2. Classes come from the mapred package instead of mapreduce.
	 * 3. Submit the job with JobClient.runJob(job) instead of job.waitForCompletion(true).
	 */
	public static void main(String[] args) throws Exception {
		// Delete any previous output directory; the job fails if it already exists.
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), new Configuration());
		Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		JobConf job = new JobConf(new Configuration(), KpiApp.class);
		// TextInputFormat feeds the mapper <byte offset, line> pairs.
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormat(TextInputFormat.class);

		job.setMapperClass(KpiMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWite.class);

		// HashPartitioner and a single reduce task are the defaults; set explicitly here for clarity.
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);

		job.setReducerClass(KpiReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWite.class);

		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormat(TextOutputFormat.class);

		JobClient.runJob(job);

	}
	/**
	 * New API: extends Mapper
	 * Old API: extends MapReduceBase implements Mapper
	 */
	static class KpiMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, KpiWite> {
		@Override
		public void map(LongWritable key, Text value, OutputCollector<Text, KpiWite> out, Reporter reporter) throws IOException {
			// Field 1 holds the phone number; fields 6-9 hold the four traffic counters.
			String[] fields = value.toString().split("\t");
			String phoneNumber = fields[1];
			KpiWite kpi = new KpiWite(fields[6], fields[7], fields[8], fields[9]);
			out.collect(new Text(phoneNumber), kpi);
		}
	}

	static class KpiReducer extends MapReduceBase implements Reducer<Text, KpiWite, Text, KpiWite> {
		@Override
		public void reduce(Text key, Iterator<KpiWite> values, OutputCollector<Text, KpiWite> out, Reporter reporter) throws IOException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			while (values.hasNext()) {
				// Fetch each value exactly once per iteration; calling next() several
				// times in one pass would skip records and corrupt the totals.
				KpiWite kpi = values.next();
				upPackNum += kpi.upPackNum;
				downPackNum += kpi.downPackNum;
				upPayLoad += kpi.upPayLoad;
				downPayLoad += kpi.downPayLoad;
			}
			out.collect(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)));
		}
	}

}

/**
 * Custom Writable holding the four traffic counters; write() and
 * readFields() must serialize the fields in the same order.
 */
class KpiWite implements Writable {
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;

	public KpiWite() {
	}

	public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	/**
	 * TextOutputFormat writes values via toString(); without this override the
	 * output file would contain default object references instead of the counters.
	 */
	@Override
	public String toString() {
		return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}

}
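
For comparison, here is a minimal sketch of the same driver written against the new API (the org.apache.hadoop.mapreduce package), illustrating the three differences listed in the comment above. It is only a sketch, and it assumes KpiMapper and KpiReducer have been rewritten to extend the new-API Mapper and Reducer base classes, as in the previous post:

// Sketch only: the new-API (mapreduce package) version of the driver above.
// Assumes KpiMapper/KpiReducer extend org.apache.hadoop.mapreduce.Mapper/Reducer.
Configuration conf = new Configuration();
Job job = new Job(conf, KpiApp.class.getSimpleName()); // Job instead of JobConf
job.setJarByClass(KpiApp.class);

// These FileInputFormat/FileOutputFormat classes come from
// org.apache.hadoop.mapreduce.lib.input / .lib.output, not from mapred.
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
job.setMapperClass(KpiMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWite.class);

job.setReducerClass(KpiReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(KpiWite.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

job.waitForCompletion(true); // instead of JobClient.runJob(job)

Either version is packaged into a jar and submitted with the hadoop command, for example: hadoop jar kpi.jar old.KpiApp (the jar name kpi.jar is hypothetical).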

 

 

Feel free to discuss and learn together!

Take whatever you find useful!

Record and share, so that we grow together! You are welcome to read my other posts; my blog is at http://blog.csdn.net/caicongyang

 

 
