Partioner是通过启动多个map 与Reduce来将文件中的数据进行分组, 在Mapper向Reducer输出之前
对输出进行分组并根据此次分组指定每组数据在那台机器上执行,将结果输出到不同文件。
以下为实现代码:
package com.itbuilder.mr; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.itbuilder.mr.bean.DataBean; /** * 手机流量计算 * @author mrh * */ public class GRSDataCount { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(GRSDataCount.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataBean.class); job.setMapperClass(DCMapper.class); FileInputFormat.setInputPaths(job, new Path(args[0])); job.setNumReduceTasks(Integer.parseInt(args[2])); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataBean.class); job.setReducerClass(DCRuducer.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setPartitionerClass(DCPartioner.class); job.waitForCompletion(true); } /** * * @author mrh * */ public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, DataBean>.Context context) throws IOException, InterruptedException { String datas[] = value.toString().split("\t"); DataBean dataBean = new DataBean(datas[1], Long.parseLong(datas[8]), Long.parseLong(datas[9])); context.write(new Text(dataBean.getTelNo()), dataBean); } } /** * Partitioner * @author mrh * */ public static class DCPartioner extends Partitioner<Text, DataBean> { private static Map<String, Integer> providerMap = new HashMap<String, Integer>(); static { providerMap.put("135", 1); providerMap.put("136", 1); providerMap.put("137", 1); providerMap.put("138", 1); providerMap.put("139", 1); providerMap.put("150", 2); providerMap.put("159", 2); providerMap.put("180", 3); providerMap.put("182", 3); } @Override public int getPartition(Text key, DataBean value, int numPartitions) { String code = key.toString(); Integer partion = providerMap.get(code.substring(0, 3)); if (partion == null) { return 0; } return partion.intValue(); } } /** * * @author mrh * */ public static class DCRuducer extends Reducer<Text, DataBean, Text, DataBean> { @Override protected void reduce(Text key, Iterable<DataBean> beans, Reducer<Text, DataBean, Text, DataBean>.Context context) throws IOException, InterruptedException { long upPayLoad = 0; long downPayLoad = 0; for (DataBean bean : beans) { upPayLoad += bean.getUpload(); downPayLoad += bean.getDownload(); } DataBean outBean = new DataBean(key.toString(), upPayLoad, downPayLoad); context.write(key, outBean); } } }
相关推荐
在这个特定的兼容包中,我们可以看到两个文件:flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(实际的兼容库)和._flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar(可能是Mac OS的元数据文件,通常...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop ...
标题中的"apache-hadoop-3.1.0-winutils-master.zip"是一个针对Windows用户的Hadoop工具包,它包含了运行Hadoop所需的特定于Windows的工具和配置。`winutils.exe`是这个工具包的关键组件,它是Hadoop在Windows上的一...
Eclipse集成Hadoop2.10.0的插件,使用`ant`对hadoop的jar包进行打包并适应Eclipse加载,所以参数里有hadoop和eclipse的目录. 必须注意对于不同的hadoop版本,` HADDOP_INSTALL_PATH/share/hadoop/common/lib`下的jar包...
Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar
hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
Hadoop-Eclipse-Plugin 3.1.1是该插件的一个特定版本,可能包含了一些针对Hadoop 3.x版本的优化和修复,以确保与Hadoop集群的兼容性和稳定性。 6. **使用场景**: 这个插件主要适用于大数据开发人员,特别是那些...
Hadoop权威指南----读书笔记
`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...
为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中对Hadoop集群进行操作,如创建、编辑和运行MapReduce任务,极大...
Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....
Hadoop 2.7.3 Windows64位 编译bin(包含winutils.exe, hadoop.dll),自己用的,把压缩包里的winutils.exe, hadoop.dll 放在你的bin 目录 在重启eclipse 就好了
hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。
hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1
在eclipse中搭建hadoop环境,需要安装hadoop-eclipse-pulgin的插件,根据hadoop的版本对应jar包的版本,此为hadoop3.1.2版本的插件。
Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber
hadoop-mapreduce-examples-2.7.1.jar
# 解压命令 tar -zxvf flink-shaded-hadoop-2-uber-3.0.0-cdh6.2.0-7.0.jar.tar.gz # 介绍 用于CDH部署 Flink所依赖的jar包