0 引子:
读取两个文件:
hello:
1,zhangsan
2,lisi
3,wangwu
hello1:
1,45
2,56
3,89
最后实现如下输出:
zhangsan,45
lisi,56
wangwu,89
0.1) 从两个文件中得到数据,在map端根据文件名做记录,后在reduce上实现输出, 因为数据在不同文件中,因此必须也只能在reduce端做join操作,在join之前需要依赖map端做的针对文件来源做标记
1 代入如下,主要看自定义map和reduce的写法
package join; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * */ public class MapJoinApp { static String FILE_ROOT = "hdfs://master:9000/"; static String FILE_INPUT = "hdfs://master:9000/files"; static String FILE_OUTPUT = "hdfs://master:9000/out"; public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf); Path outpath = new Path(FILE_OUTPUT); if(fileSystem.exists(outpath)){ fileSystem.delete(outpath, true); } // 0 定义干活的人 Job job = new Job(conf); // 1.1 告诉干活的人 输入流位置 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数 FileInputFormat.setInputPaths(job, FILE_INPUT); // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); //1.3 分区 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 目前按照默认方式执行 //1.5 TODO 规约 //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outpath); job.setOutputFormatClass(TextOutputFormat.class); // 让干活的人干活s job.waitForCompletion(true); } } /** * */ class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ String line = ""; @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit)context.getInputSplit(); String filename = split.getPath().getName(); // hello or hello1 文件名 //String pathStr = split.getPath().toString();// hdfs://master:9000/files/hello or hdfs://master:9000/files/hello1 System.out.println(filename); line = v1.toString();// 逐行执行 最后一行就是文件的最后一样内容 String[] v1s = v1.toString().split(","); String v2Str = ""; if("hello".equals(filename)){ // hello文件内容格式为: 1 zhangsan v2Str = "#" + v1s[1]; System.out.println("hello : " + v2Str); } if("hello1".equals(filename)){ // hello1文件内容格式为: 1 45 v2Str = "*" + v1s[1]; System.out.println("hello1 : " + v2Str); } //for(String word : v1s){ context.write(new LongWritable(Long.parseLong(v1s[0])), new Text(v2Str)); //} } } /** */ class MyReducer extends Reducer<LongWritable, Text, Text, Text>{ protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx) throws IOException, InterruptedException { System.out.println("reduce ..."); String k3Str = ""; String v3Str = ""; for(Text v2 : v2s){ //System.out.println("k2: " + k2.get() + " v2: " + l.toString()); if(v2.toString().startsWith("#")){ k3Str = v2.toString().substring(1, v2.toString().length()); } if(v2.toString().startsWith("*")){ v3Str = v2.toString().substring(1, v2.toString().length()); } } ctx.write(new Text(k3Str), new Text(v3Str)); } }
2 结果:
[root@master local]# hadoop fs -text /out/part-r-00000 Warning: $HADOOP_HOME is deprecated. zhangsan 45 lisi 56 wangwu 89
相关推荐
MapReduceJoinExample 一个reduce join实现的例子运行示例hadoop jar MapReduceJoinExample-1.0-SNAPSHOT-job.jar s3://dags-public/wikistats/s3://dags-public/dbpedia/ /intermediate- date +%Y-%m-%d-%H-%M-%S /...
- 在Reduce阶段,根据标记来判断数据来源并进行相应的Join操作。 例如,假设我们要执行以下SQL查询: ```sql SELECT u.name, o.orderid FROM orders o JOIN users u ON o.uid = u.uid; ``` 在这种情况下,MapReduce...
- 排序经常作为测试 Hadoop 性能的标准。 - 实质上是一种 I/O 速度测试。 - 强调了 Google 文件系统 (GFS) 的实用性。 #### 搜索(Searching) **输入:** - 文件集包含多行文本。 - 要查找的搜索模式。 - 映射器...
例如,在一个内连接中,Map阶段将不同表的数据分别标记,然后在Reduce阶段,依据标记判断并合并来自不同表的记录。这种方法确保了JOIN操作的正确性。 - **Group By的实现原理**:在Map阶段,Hive会将Group By字段...
基于改进粒子群算法的DG储能选址定容优化模型:解决电力系统时序性问题的可靠程序解决方案,基于改进粒子群算法的DG储能选址定容模型优化解决电力系统问题,DG储能选址定容模型matlab 程序采用改进粒子群算法,考虑时序性得到分布式和储能的选址定容模型,程序运行可靠 这段程序是一个改进的粒子群算法,主要用于解决电力系统中的优化问题。下面我将对程序进行详细分析。 首先,程序开始时加载了一些数据文件,包括gfjl、fljl、fhjl1、cjgs和fhbl。这些文件可能包含了电力系统的各种参数和数据。 接下来是一些参数的设置,包括三种蓄电池的参数矩阵、迭代次数、种群大小、速度更新参数、惯性权重、储能动作策略和限制条件等。 然后,程序进行了一些初始化操作,包括初始化种群、速度和适应度等。 接下来是主要的迭代过程。程序使用粒子群算法的思想,通过更新粒子的位置和速度来寻找最优解。在每次迭代中,程序计算了每个粒子的适应度,并更新个体最佳位置和全局最佳位置。 在每次迭代中,程序还进行了一些额外的计算,如潮流计算、储能约束等。这些计算可能涉及到电力系统的潮流计算、功率平衡等知识点。 最后,程序输
数学建模相关主题资源2
内容概要:本文详细介绍了一系列用于科学研究、工程项目和技术开发中至关重要的实验程序编写与文档报告撰写的资源和工具。从代码托管平台(GitHub/GitLab/Kaggle/CodeOcean)到云端计算环境(Colab),以及多种类型的编辑器(LaTeX/Microsoft Word/Overleaf/Typora),还有涵盖整个研究周期的各种辅助工具:如可视化工具(Tableau)、数据分析平台(R/Pandas)、项目管理工具(Trello/Jira)、数据管理和伦理审核支持(Figshare/IRB等),最后提供了典型报告的具体结构指导及其范本实例链接(arXiv/PubMed)。这为实验流程中的各个环节提供了系统的解决方案,极大地提高了工作的效率。 适合人群:高校学生、科研工作者、工程技术人员以及从事学术写作的人员,无论是新手入门还是有一定经验的人士都能从中受益。 使用场景及目标:帮助读者高效地准备并开展实验研究活动;促进团队间协作交流;规范研究报告的形式;提高对所收集资料的安全性和隐私保护意识;确保遵循国际公认的伦理准则进行实验。
四轮毂驱动电动汽车稳定性控制策略:基于滑模与模糊神经网络的转矩分配与仿真研究,四轮毂驱动电动汽车稳定性控制:基于滑模与模糊神经网络的转矩分配策略及联合仿真验证,四轮毂驱动电动汽车稳定性控制,分布式驱动转矩分配。 上层基于滑模,模糊神经网络控制器决策横摆力矩,下层基于动态载荷分配,最优分配,平均分配均可做。 simulink与carsim联合仿真。 ,四轮毂驱动;电动汽车稳定性控制;分布式驱动;转矩分配;滑模控制;模糊神经网络控制器;横摆力矩;动态载荷分配;最优分配;平均分配;Simulink仿真;Carsim仿真,四驱电动稳定性控制:滑模与模糊神经网络决策的转矩分配研究
本资源提供了一份详细的PyCharm安装教程,涵盖下载、安装、配置、激活及使用步骤,适合新手快速搭建Python开发环境。
毕业设计
原版宋体.ttf,原版宋体安装文件,安装方式,直接右键安装。
利用Xilinx FPGA内嵌的软核处理器MicroBlaze,加上自主编写的AXI_IIC控制器,实现对IMX327传感器IIC总线的控制,同时辅以UART调试串口,实现系统状态的实时监控与调试。
在 GEE(Google Earth Engine)中,XEE 包是一个用于处理和分析地理空间数据的工具。以下是对 GEE 中 XEE 包的具体介绍: 主要特性 地理数据处理:提供强大的函数和工具,用于处理遥感影像和其他地理空间数据。 高效计算:利用云计算能力,支持大规模数据集的快速处理。 可视化:内置可视化工具,方便用户查看和分析数据。 集成性:可以与其他 GEE API 和工具无缝集成,支持多种数据源。 适用场景 环境监测:用于监测森林砍伐、城市扩展、水体变化等环境问题。 农业分析:分析作物生长、土地利用变化等农业相关数据。 气候研究:研究气候变化对生态系统和人类活动的影响。
毕业设计
整个文件的代码
名字微控制器_STM32_DFU_引导加载程序_dapboo_1740989527.zip
详细介绍及样例数据:https://blog.csdn.net/T0620514/article/details/145991332
anaconda配置pytorch环境
立体仓库控制组态王6.55与三菱PLC联机仿真程序:视频教程与IO表接线图CAD详解,9仓位立体仓库控制系统优化方案:组态王6.55与三菱PLC联机仿真程序视频教程及IO表接线图CAD详解,9仓位立体仓库控制组态王6.55和三菱PLC联机仿真程序+视频+带io表接线图CAD ,关键词:立体仓库;控制组态王6.55;三菱PLC;联机仿真程序;视频;io表接线图;CAD,立体仓库控制组态王与三菱PLC联机仿真程序资源包