getMapOutput是ReduceTask.MapOutputCopier.copyOutput下面的方法。(MapOutputCopier是Thread)
其中copyOutput只做了数据基本校验,真正的逻辑在getMapOutput方法中。
getMapOutput里,注释部分已经说得很清楚,后面部分做了判断,是shuffle到mem还是disk。
再分别调用shuffleInMemory、shuffleToDisk,这里shuffleInMemory只是保存了数据,应该还没有sort。
用到URLConnection的inputStream来传数据,cp是走的HTTP吗?
/** * Get the map output into a local file (either in the inmemory fs or on the * local fs) from the remote server. * We use the file system so that we generate checksum files on the data. * @param mapOutputLoc map-output to be fetched * @param filename the filename to write the data into * @param connectionTimeout number of milliseconds for connection timeout * @param readTimeout number of milliseconds for read timeout * @return the path of the file that got created * @throws IOException when something goes wrong */ private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, Path filename, int reduce) throws IOException, InterruptedException { // Connect URL url = mapOutputLoc.getOutputLocation(); URLConnection connection = url.openConnection(); InputStream input = setupSecureConnection(mapOutputLoc, connection); // Validate header from map output TaskAttemptID mapId = null; try { mapId = TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK)); } catch (IllegalArgumentException ia) { LOG.warn("Invalid map id ", ia); return null; } TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId(); if (!mapId.equals(expectedMapId)) { LOG.warn("data from wrong map:" + mapId + " arrived to reduce task " + reduce + ", where as expected map output should be from " + expectedMapId); return null; } long decompressedLength = Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH)); long compressedLength = Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH)); if (compressedLength < 0 || decompressedLength < 0) { LOG.warn(getName() + " invalid lengths in map output header: id: " + mapId + " compressed len: " + compressedLength + ", decompressed len: " + decompressedLength); return null; } int forReduce = (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK)); if (forReduce != reduce) { LOG.warn("data for the wrong reduce: " + forReduce + " with compressed len: " + compressedLength + ", decompressed len: " + decompressedLength + " arrived to reduce task " + reduce); return null; } if (LOG.isDebugEnabled()) { LOG.debug("header: " + mapId + ", compressed len: " + compressedLength + ", decompressed len: " + decompressedLength); } //We will put a file in memory if it meets certain criteria: //1. The size of the (decompressed) file should be less than 25% of // the total inmem fs //2. There is space available in the inmem fs // Check if this map-output can be saved in-memory boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); // Shuffle MapOutput mapOutput = null; if (shuffleInMemory) { if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into RAM from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength, (int)compressedLength); } else { if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into Local-FS from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength); } return mapOutput; }
MapOutput对象其实就是装了map输出数据的对象,因为肯定能装到内存(不然就直接扔disk了),所以一次拉数据就搞定了。
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc, URLConnection connection, InputStream input, int mapOutputLength, int compressedLength) throws IOException, InterruptedException { // Reserve ram for the map-output boolean createdNow = ramManager.reserve(mapOutputLength, input); // Reconnect if we need to if (!createdNow) { // Reconnect try { connection = mapOutputLoc.getOutputLocation().openConnection(); input = setupSecureConnection(mapOutputLoc, connection); } catch (IOException ioe) { LOG.info("Failed reopen connection to fetch map-output from " + mapOutputLoc.getHost()); // Inform the ram-manager ramManager.closeInMemoryFile(mapOutputLength); ramManager.unreserve(mapOutputLength); throw ioe; } } IFileInputStream checksumIn = new IFileInputStream(input,compressedLength); input = checksumIn; // Are map-outputs compressed? if (codec != null) { decompressor.reset(); input = codec.createInputStream(input, decompressor); } // Copy map-output into an in-memory buffer byte[] shuffleData = new byte[mapOutputLength]; MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength); int bytesRead = 0; try { int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0, shuffleData.length); while (n > 0) { bytesRead += n; shuffleClientMetrics.inputBytes(n); // indicate we're making progress reporter.progress(); n = IOUtils.wrappedReadForCompressedData(input, shuffleData, bytesRead, shuffleData.length - bytesRead); } if (LOG.isDebugEnabled()) { LOG.debug("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()); } input.close(); } catch (IOException ioe) { LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), ioe); // Inform the ram-manager ramManager.closeInMemoryFile(mapOutputLength); ramManager.unreserve(mapOutputLength); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; // Close the streams IOUtils.cleanup(LOG, input); // Re-throw readError = true; throw ioe; } // Close the in-memory file ramManager.closeInMemoryFile(mapOutputLength); // Sanity check if (bytesRead != mapOutputLength) { // Inform the ram-manager ramManager.unreserve(mapOutputLength); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { // IGNORED because we are cleaning up LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")" ); } // TODO: Remove this after a 'fix' for HADOOP-3647 if (LOG.isDebugEnabled()) { if (mapOutputLength > 0) { DataInputBuffer dib = new DataInputBuffer(); dib.reset(shuffleData, 0, shuffleData.length); LOG.debug("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + WritableUtils.readVInt(dib) + ", " + WritableUtils.readVInt(dib) + ") from " + mapOutputLoc.getHost()); } } return mapOutput; }
private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc, InputStream input, Path filename, long mapOutputLength) throws IOException { // Find out a suitable location for the output on local-filesystem Path localFilename = lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), mapOutputLength, conf); MapOutput mapOutput = new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), conf, localFileSys.makeQualified(localFilename), mapOutputLength); // Copy data to local-disk OutputStream output = null; long bytesRead = 0; try { output = rfs.create(localFilename); byte[] buf = new byte[64 * 1024]; int n = -1; try { n = input.read(buf, 0, buf.length); } catch (IOException ioe) { readError = true; throw ioe; } while (n > 0) { bytesRead += n; shuffleClientMetrics.inputBytes(n); output.write(buf, 0, n); // indicate we're making progress reporter.progress(); try { n = input.read(buf, 0, buf.length); } catch (IOException ioe) { readError = true; throw ioe; } } LOG.info("Read " + bytesRead + " bytes from map-output for " + mapOutputLoc.getTaskAttemptId()); output.close(); input.close(); } catch (IOException ioe) { LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), ioe); // Discard the map-output try { mapOutput.discard(); } catch (IOException ignored) { LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ignored); } mapOutput = null; // Close the streams IOUtils.cleanup(LOG, input, output); // Re-throw throw ioe; } // Sanity check if (bytesRead != mapOutputLength) { try { mapOutput.discard(); } catch (Exception ioe) { // IGNORED because we are cleaning up LOG.info("Failed to discard map-output from " + mapOutputLoc.getTaskAttemptId(), ioe); } catch (Throwable t) { String msg = getTaskID() + " : Failed in shuffle to disk :" + StringUtils.stringifyException(t); reportFatalError(getTaskID(), t, msg); } mapOutput = null; throw new IOException("Incomplete map output received for " + mapOutputLoc.getTaskAttemptId() + " from " + mapOutputLoc.getOutputLocation() + " (" + bytesRead + " instead of " + mapOutputLength + ")" ); } return mapOutput; }
相关推荐
文学翻译的文化传递、语言风格保持与读者接受度提升
,,Phase_Shift_T:基于MATLAB Simulink的移相变压器仿真模型,可实现-25°、-15°……25°的移相。 变压器副边实现36脉波不控整流,变压器网侧电压、阈侧电压以及移相角度可直接设置。 仿真条件:MATLAB Simulink R2015b ,核心关键词: 1. 移相变压器仿真模型 2. MATLAB Simulink 3. 移相 4. 36脉波不控整流 5. 网侧电压 6. 阈侧电压 7. 设置 8. MATLAB Simulink R2015b,MATLAB Simulink中实现宽范围移相与多脉波整流的变压器仿真模型
管理员主要负责填充图书和其类别信息,并对已填充的数据进行维护,包括修改与删除,管理员也需要审核老师注册信息,发布公告信息,管理自助租房信息等。用户信息管理页面,此页面提供给管理员的功能有:用户信息的查询管理,可以删除用户信息、修改用户信息、新增用户信息。商品分类管理页面,此页面提供给管理员的功能有:查看已发布的商品分类数据,修改商品分类,商品分类作废,即可删除。商品信息管理页面,此页面提供给管理员的功能有:根据商品名称进行条件查询,还可以对商品数据进行新增、修改、查询操作等等。商品资讯管理页面,此页面提供给管理员的功能有:查看已发布的商品资讯数据,修改商品资讯,商品资讯作废,即可删除。2 相关技术 3 2.1 SSM框架介绍 3 2.2 B/S结构介绍 3 2.3 Mysql数据库介绍 4 3 系统分析 6 3.1 系统可行性分析 6 3.1.1 技术可行性分析 6 3.1.2 经济可行性分析 6 3.1.3 运行可行性分析 6 3.2 系统性能分析 7 3.2.1 易用性指标 7 3.2.2 可扩展性指标 7 3.2.3 健壮性指标 7 3.2.4 安全性指标 8 3.3 系统流程分
当前资源包含初中高级闯关习题
,,欧姆龙PLC螺丝机程序(含触摸屏程序) 此程序已经实际设备上批量应用,程序成熟可靠,借鉴价值高,程序有注释、非常适合用来欧姆龙plc新手学习,包括欧姆龙plc程序和威纶触摸屏程序。 是新手入门级欧姆龙PLC电气爱好从业人员借鉴和参考经典案列。 ,欧姆龙PLC; 螺丝机程序; 触摸屏程序; 程序成熟可靠; 注释详尽; 新手学习; 经典案例。,欧姆龙PLC螺丝机程序详解:成熟可靠的新手学习经典案例
,,c# mqtt高性能服务器端源代码。 你还在使用第三方服务软件吗?不如试试这个开发框架,助你一臂之力,无限制,无全开源,无版权约束,全是自主开发。 开源框架包括服务器和客户端,支持mqtt3.0及5.0。 可嵌入到自己的服务系统及软件客户端中,不受第三方约束。 你要问我稳定性如何?我能回答的是已经运行了三年有余无任何问题。 如果你要问能接入多少终端,我可以明确回答,不敢往多的说,单节点支持100万并发量无压力。 这是一个关于C# MQTT高性能服务器端源代码的描述。如果我重新表述一下,可以这样说:你是否还在使用第三方服务软件?为什么不尝试一下这个开发框架呢?它可以为你提供强大的支持,没有任何限制,完全开源,没有版权约束,全部都是自主开发的。 这个开源框架包括服务器和客户端,支持MQTT 3.0和5.0协议。你可以将它嵌入到自己的服务系统和软件客户端中,不受第三方的限制。 你可能会问它的稳定性如何。我可以很自信地告诉你,它已经运行了三年多,没有出现任何问题。 如果你想知道它可以接入多少终端,我可以明确地回答,单节点支持100万并发连接,毫不费力。 从这段话中,我们可以提取出以
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
,,matlab程序,复现,基于改进粒子群算法的混合储能系统容量优化,关键词,超级电容,混合储能,粒子群算法。 拿之前问清楚 可以运行看结果,出不 不 ,MATLAB程序; 复现; 改进粒子群算法; 混合储能系统容量优化; 超级电容; 粒子群算法。,MATLAB复现:混合储能系统优化策略研究,基于改进粒子群算法的超级电容容量配置
,,k—medoids 聚类方法的MATLAB源代码,导入数据部分和画图部分已经用中文给出了注释。 这儿选取一个对象叫做mediod来代替上面的中心 的作用,这样的一个medoid就标识了这个类。 ,k-medoids聚类方法; MATLAB源代码; 导入数据部分; 画图部分; 对象; medoid; 类标识。,基于K-Medoids聚类方法的MATLAB源代码及中文注释解析
自驾游中如何保护自然环境
qt 一个基于Qt Creator(qt,C++)实现中国象棋人机对战.
项目资源包含:可运行源码+sql文件+文档; python3.7+django+mysql5.7+vue 适用人群:学习不同技术领域的小白或进阶学习者;可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 项目具有较高的学习借鉴价值,也可拿来修改、二次开发。 有任何使用上的问题,欢迎随时与博主沟通,博主看到后会第一时间及时解答。 系统是一个很好的项目,结合了后端服务(django)和前端用户界面(Vue.js)技术,实现了前后端分离。 当人们打开系统的网址后,首先看到的就是首页界面。在这里,人们能够看到系统的导航条,通过导航条导航进入各功能展示页面进行操作。在个人中心页面可以输入个人信息进行更新信息操作,还可以对我的订单、我的地址和我的收藏进行详细操作。 管理员进入主页面,主要功能包括对首页、个人中心、用户管理、商品类别管理、热卖商品管理、投诉建议、系统管理、订单管理等进行操作。用户进入系统后台,主要功能包括对个人中心和我的收藏管理进行操作。
漫画作品与节日庆典结合
自驾游中的手机APP推荐
,,该套程序是电芯上料机,空料仓和满料仓都是步进电机,搬运模组是松下A6脉冲控制伺服电机,该套程序是已经在量产程序,图四是上料机设备全局图,标准程序框架,其他设备也可套用。 ,电芯上料机; 步进电机; 满料仓; 松下A6脉冲控制伺服电机; 标准程序框架; 设备全局图; 其他设备通用,电芯上料机程序:松下A6伺服电机驱动的步进式料仓管理系统
1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。
,,Matlab代码-基于拉格朗日电力系统参数辨识 对电力系统中存在多个不良参数进行辨识,以IEEE14节点系统进行仿真验证。 ,基于拉格朗日; 电力系统参数辨识; 不良参数辨识; IEEE14节点系统仿真验证。,基于拉格朗日算法的电力系统多参数辨识及IEEE14节点仿真验证
它利用图像预处理、车牌字符分割、字符识别等相关处理技术,对小型车辆的蓝底白字车牌进行车牌号识别。车牌定位是利用HSV色彩定位法,这种方法可大大缩短后续处理时间,字符分割采用阈值限定分割,字符识别则采用模板匹配法。在拍摄角度不过度倾斜情况下识别精度较高,白天照片预处理、分割效果较晚间好。
该文档围绕 DeepSeek 和 AIGC 展开,介绍了 DeepSeek-R1 模型的技术特性、AIGC 的概念、应用及相关工具的选择。 适用人群:对人工智能、AIGC 技术感兴趣的初学者,想要了解行业动态的专业人士,以及寻求提升工作效率、探索创新应用的企业和个人。包括但不限于 AI 从业者、内容创作者、教育工作者、科研人员、电商从业者、传媒工作者、游戏开发者等。 使用场景:在科研领域,DeepSeek-R1 可用于复杂问题求解、实验设计和论文撰写;教育场景中,能辅助知识理解、解答问题,还可作为教学工具锻炼学生逻辑思维。内容创作方面,AIGC 工具能生成文本、图像、音频和视频,满足不同创作需求。电商行业利用 AIGC 制作商品 3D 模型、虚拟主播,提升购物体验和直播带货效率。新闻传媒行业借助 AIGC 提高采编效率、打造 AI 主播。影视和游戏行业,AIGC 可用于剧本创作、角色生成、场景拓展等。
,,MATLAB用yalmip+cplex解决电动汽车有序充放电问题,目标函数为总负荷峰谷差最小,代码可运行且有注释。 ,关键词: 1. MATLAB 2. yalmip 3. cplex 4. 电动汽车有序充放电 5. 目标函数 6. 总负荷峰谷差最小 7. 代码可运行 8. 注释,MATLAB中YALMIP+CPLEX实现电动汽车有序充放电优化,降低负荷峰谷差