摘要: 随着MaxCompute(ODPS)2.0的上线,新增的非结构化数据处理框架也推出一系列的介绍文章,包括 MaxCompute上如何访问OSS数据, 基本功能用法和整体介绍,侧重介绍读取OSS数据进行计算处理; 本文:MaxCompute(ODPS)上处理非结构化数据的Best Practice。
随着MaxCompute(ODPS)2.0的上线,新增的非结构化数据处理框架也推出一系列的介绍文章,包括
1、MaxCompute上如何访问OSS数据, 基本功能用法和整体介绍,侧重介绍读取OSS数据进行计算处理;
2、MaxCompute上处理非结构化数据的Best Practice。 基于非结构化框架实现原理,提供一些最佳实践总结;
3、MaxCompute访问TableStore(OTS) 数据, 着重介绍通过非结构化框架来访问计算KV(TableStore/OTS)数据;
4、MaxCompute到OSS的非结构化数据输出(及图像处理实例):介绍了非结构化输出功能,并通过图像处理等范例,说明怎样通过MaxCompute的计算能力,打通整个OSS -> MaxCompute -> OSS的数据处理闭环;
5、如何在MaxCompute上处理存储在OSS上的开源格式数据, 介绍对于存储在OSS上的常见开源数据(ORC, PARQUET, AVRO等)格式,如何通过非结构化框架进行处理。
本文是这系列中的第【2】篇。
前言
随着MaxCompute(原ODPS)非结构化数据处理框架的推出,在SQL线上打通了MaxCompute与OSS数据之间的计算数据连接生态,我们看到了视频,图像,音频以及基因,气象等各种各种各样数据在MaxCompute平台上实现了与传统结构化数据的无缝融合。之前我们提供了在MaxCompute非结构化框架处理OSS上数据的整体介绍,在基本功能实现后,我们收到用户许多关于优化和怎样最好的使用非结构化功能的问题。 这里通过分析非结构化框架底层的一些实现原理以及我们看到的一些使用场景,提供一些关于Best Practice的总结,方便大家更有效的在MaxCompute中处理各种数据。
1. 数据在OSS上的存储
1.1 OSS LOCATION 的选择
MaxCompute通过在EXTERNAL TABLE上的LOCATION cluase来指定需要处理的OSS数据地址【注:本文假设用户对于非结构化框架,包括EXTERNABLE TABLE, StorageHanlder等的定义等都有比较好的了解,相关细节这里不再具体说明。 有疑问可以先参考之前的基本功能介绍】。其中LOCATION将指向一个OSS的一个目录(或者更准确的说,是一个以‘/’结尾的地址),其中LOCATION为标准URI格式:
LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'
对于数据安全比较敏感的场景,比如在多用户场景或者公共云上,则推荐采用上述方式,不再LOCATION上使用AK,而是通过STS/RAM体系事先进行鉴权(参见基本功能介绍)。
LOCATION的选择有几点要注意:
- 不允许使用oss的root bucket作为LOCATION, 也就是说
${userPath}
不可以为空,这个要求源自OSS对root bucket下存放内容的一些限制。 - LOCATION不能指向一个单独文件,也就是说,类似
oss://oss-cn-hangzhou.aliyuncs.com/mybucket/directory/data.csv
这种LOCATION是无效的。 如果只有一个文件要处理,则应该提供该文件的父目录。
1.2 数据文件的存储和处理:小文件和大文件
在分布式计算系统中,文件的大小对于整个系统的运行效率,性能等都有比较大的相关性。 这里对MaxCompute对非结构化数据的相关处理机制做一个介绍,并分析几种有代表性的场景(e.g., 小文件和大文件),总结了几个针对MaxCompute计算场景中,比较好的OSS文件存储建议。
-
小文件:通常小文件往往伴随着超大的文件数目,这对于分布式计算系统来说,有两个问题:
- 大的文件数,会导致在进行文件分片时, 获取文件宏信息的overhead较大,导致planning和分片比较耗时,比如一个100万个文件的oss LOCATION, planning的耗时可能在分钟以上的量级。
- 打开每个OSS文件是有ovehead的,碎片化的小文件会带来额外的读取开销。 比如从OSS读取1000个10KB大小的文件,相比读取一个10MB的的文件,耗时可能在10倍以上。 对大量小文件的访问将带来整个分布式系统更多的网络开销,降低实际上有效的IO throughput。
所以总体上不推荐在一个OSS目录中存放过多的文件。 可以从另一个方面,考虑将Externable Table做partition,尽量在partition的子粒度上进行数据处理。 另外,在适用的场景下,可以考虑使用tar文件,比如把多个图像文件打在一个tar文件中再保存到OSS上面。 如果是文本文件,MaxCompute的built-in StorageHandler (比如
com.aliyun.odps.CsvStorageHandler
或者com.aliyun.odps.TsvStorageHandler
) 是能自动从tar文件中读取数据的。 如果用户自己定义的StorageHandler/Extractor,也可以在用户代码中使用Java中的tar处理类,比如直接使用Apache common 的TarArchiveInputStream
来访问。 -
大文件:与小文件相对的,是另外一个极端: 超大文件。 分布式系统的精髓是分而治之的思想:对数据进行分片,通过并发处理多个分片来加快海量数据的处理。 在极限情况下,如果海量数据存在一个无法被切割处理的单个文件中,那并发度就被降成为1,这样子的“分布式系统”就失去了意义。 即使没有那么极端,多个超大文件(比如每个几十GB),对分布式系统也是不友好的:大的文件处理可能需要单独占用大量系统资源,给资源调度带来困难,另外还容易造成长尾,失败重跑代价过高等问题。 所以从MaxCompute处理计算的角度,也不推荐在OSS上使用超大文件保存数据。
总结一下, 作为一个整体上的指导原则,MaxCompute非结构框架推荐如下比较理想的OSS数据存储方案:
-
数据文件根据应用特性,分文件夹存储,不推荐一个文件夹中存储10万以上个文件。 可以考虑使用tar打包多个文件来作为降低物理文件数目的方法。
-
比较适中的文件大小以及均匀分布的数据文件,能更合理的使用各种系统资源, 从而提高分布式处理效率。 对MaxCompute非结构化框架而言,单个文件大小在1MB-2GB是比较理想的情况。
1.3 MaxCompute访问OSS的网络连通以及速度
MaxComput和OSS作为独立的分布式计算和存储服务,在不同的部署集群上的网络连通性有可能影响MaxCompute访问OSS的数据的可达性。 网络的连通性整体服从七网隔离的原则,具体一点来说有几点:
-
MaxCompute的公共云集群上的计算应该访问OSS的外部集群,另外推荐需要访问的OSS集群与MaxCompute计算集群在物理上尽量靠近。关于OSS公共云上的访问域名以及对应数据中心可以参考OSS文档。
在MaxCompute并发访问OSS的情况下,一个需要特别注意的是OSS具有限流机制,默认情况下一个OSS账号的访问流量是限制在5Gb/s,也就是600MB/s左右。 在MaxComput的高并发度下(比如1000个以上的计算节点),OSS数据下载的速度可能将不再受限于单机网络速度,而取决与OSS的总体流量限速。 在这种情况下,完全可能出现单个计算节点的下载速度低于1MB/s。 当然OSS的限流是可以特别配置的,如果有超大量的数据计算需求,可以联系OSS团队调高对应账户的具体的限流上限。
2. 在用户自定义StorageHandler/Extractor中对输入数据的处理
除了提供几个内置的StorageHandler用来处理CSV, TSV以及Apache ORC文件以外,MaxCompute同时开发了非结构化Java SDK来方便用户对数据进行解析和处理。 通过这样的方法,扩展整个非结构化数据处理的生态,对接视频,图像,音频,基因,气象等数据处理的能力。 简单的来说, MaxCompute封装了分布式系统的细节,使用Java InputStream
的一个增强子类来将做输入数据与用户代码的对接。 这样的接口设计区别于Hive的SerDe
, RowFormatter
等多层封装,提供了更自然的完全非结构化数据入口, 用户能获得原始数据流,用类似单机程序相似的逻辑进行处理。 当然,基于分布式系统的处理原则,还是有一些Best Practice推荐用户遵守。
2.1 输入数据流的处理模式
对于输入数据流(InputStream),推荐在获取数据bytes后能直接在内存中直接处理。 最理想的情况是,能针对输入数据做流式的“边读边计算”的处理。 当然,对于某些数据格式,由于数据本身的特性,很难做到完全的流式处理:比如对于某些图片/音频数据格式,一张文件必须完全读入才能获得正确的编码信息以及其他特性,那这种情况下,在文件本身不是很大的情况下,可以把文件完全读入本地内存,再行处理。 效率比较低的一种方式是把数据文件下载到本地,然后再通过FileStream读取本地文件进行处理,这样的处理模式有两个问题:
- 作为分布式系统,为了实现资源隔离和保护计算节点的健康度,一般不推荐往本地磁盘写文件(尤其是大文件)。在MaxCompue计算系统上,用户的Java代码对本地文件近些读写操作需要另外申请权限,或者打开隔离选项(总体计算性能会下降)。
- 数据写入到本地落盘,再读取,性能上有额外的损耗。
- 对于比较大的数据(比如10GB或更大的文件),运算节点的磁盘空间无法做保证,存在磁盘被写爆的可能
2.2 三方库使用
在非结构化数据的处理线上,经常遇到的一个需求是把单机的数据处理机制,通过MaxCompute非结构化数据框架,迁移到分布式系统上执行。 比如希望同过ffmpeg来直接读取视频数据,或者希望通过Netcdf-Java来直接处理气象的netcdf/grib格式数据。 而这些三方库往往有一些共同的特性/局限性,比如
- 可能是基于C/C++,所以需要通过JNI来运行native代码
- 可能是面对单机实现,所以数据的入口经常是一个本地的文件地址
在这些情况下,非结构化框架均有对应的方式来支持。 比如在隔离打开的情况下允许JNI的使用,以及通过权限审批允许数据下载到本机临时文件等等。 从长期来讲,MaxCompute框架本身也认同使用native C/C++代码库,来处理各种特定的数据格式,将是无法避免的,所以会从框架本身安全等方面来解决这个问题,但是对于读取数据到本地再做处理,从本质上是一种比较大的额外消耗,还是推荐通过直接处理输入数据的方式来做,比如改动NETCDF-JAVA的实现,把输入接口通过FilePath->FileStream改成直接使用InputStream等。
3. 结语
MaxCompute非结构化框架是随着MaxCompute2.0推出的新功能,除了处理OSS上面的非结构化数据之外,最近也打通了与TableStore(OTS)的数据链路。 框架本身也还在不断的发展和完善,包括和MaxCompute优化器以及和整个UDF框架更紧密的结合和扩展等等。 在这里先从现有系统的实现和我们收到的一些反馈,总结提炼了一些处理非结构化数据的最佳实践,也希望得到更多的反馈,把框架功能做到更优。 后继我们也会结合具体的使用场景,比如城市大脑上的离线视频图像处理等,来提供一些更具体的使用范例。
相关推荐
ODPS(MaxCompute)是阿里巴巴集团推出的一种大数据处理平台,主要设计用于海量数据的离线分析。本权威详尽的帮助手册旨在深入解析ODPS的核心功能、底层优化原理以及实际操作中的各种细节,帮助用户充分利用这一工具...
MaxCompute SQL的每个作业的准备、提交等阶段需要花费较长时间,因此不适合需要每秒处理几千至数万笔事务的业务。 MaxCompute SQL采用的是类似于SQL的语法,可以看作是标准SQL的子集,但不能因此简单地把...
ODPS 提供海量数据处理及分析服务,让用户远离大数据运算烦恼。 开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, ...
Java连接ODPS(MaxCompute)是一项关键的技能,特别是在大数据处理和分析的场景下。ODPS,即阿里云的大数据处理服务MaxCompute,是企业级的海量数据处理平台,提供了SQL、API等多种方式进行数据操作。Java SDK是ODPS...
aliyun-odps-jdbc-3.2.29jar包 是阿里云为其大数据平台 MaxCompute(也称为 ODPS:开放数据处理服务)提供的 JDBC 驱动程序。通过该驱动,开发者可以在 Java 应用程序中使用标准的 JDBC API 与 MaxCompute 平台进行...
odps-jdbc-3.2.9-jar-with-dependencies.jar 是阿里云 MaxCompute(开放数据处理服务,ODPS)平台的 JDBC 驱动程序,专为大数据处理和分析而设计。此版本的驱动程序包含所有必要的依赖库,简化了开发人员在 Java ...
"阿里开放数据处理服务ODPS介绍" 阿里开放数据处理服务ODPS是一款基于云计算的数据处理和分析平台,旨在帮助企业解决大数据处理和分析的问题。ODPS提供了一站式的数据处理和分析服务,使用户可以快速构建大数据应用...
6. **ODPS SQL的扩展功能**:除了标准SQL,ODPS还提供了一些特有的扩展,如UDF(用户自定义函数),允许用户编写自己的函数来处理数据;UDTF(用户自定义表函数)则可以生成多行多列的结果;以及SQL的窗口函数,用于...
- **非结构化数据处理**:适应大数据时代80%以上非结构化数据的需求,MaxCompute 2.0增加了对文本、视频、音频等多种非结构化数据的处理能力。 - **异构系统连接能力**:支持与不同系统的连接,便于整合异构数据源...
ODPS,全称为Open Data Processing Service,是阿里云推出的一种大数据处理服务,旨在为企业提供海量数据的存储和计算能力。《ODPS权威指南》作为最新版的参考资料,深入讲解了ODPS的核心特性和应用场景,是学习和...
读者将学习如何使用ODPS SQL进行数据查询、聚合、窗口函数等复杂分析,以及如何定义和管理表结构。此外,ODPS还支持UDF(User Defined Function)和UDAF(User Defined Aggregate Function),这使得用户可以自定义...
随着数据应用场景的多样化,MaxCompute引入湖仓一体架构,实现了对非结构化数据的处理,进一步增强了数据的全面性和灵活性。 5. **赋能客户与未来展望** 阿里巴巴的数据平台不仅服务于集团内部,也致力于对外输出...
aliyun-kettle-odps-plugin-1.0.0包以及安装操作文件,适合在使用maxcompute时用kettle将数据导出,方便实用
1. **数据说明**:介绍数据的基本结构、格式和类型,帮助用户理解和处理ODPS中的数据。 2. **数据导入**:讲解如何将数据导入到ODPS Project中,可能涉及数据源的准备、上传方法、数据格式转换等内容。 3. **数据...
此外,它还支持文件接口、非结构化数据的shuffle等特性,使得开发者可以更灵活地处理各种数据类型。 4. **Pipeline与MRRR**:Pipeline允许MapReduce任务级联,减少中间结果的IO消耗和调度成本。MRRR(Map-Reduce-...
3. **实时计算**:通过MaxCompute Streaming或UDF(User Defined Function),ODPS也支持实时数据处理,满足实时分析的需求。 4. **数据仓库**:ODPS作为企业级数据仓库,可以构建复杂的数据模型,支持星型、雪花型...
MaxCompute是阿里巴巴集团推出的大数据计算平台,原名ODPS(Open Data Processing Service),适用于在线数据处理、大规模数据仓库场景,提供海量数据仓库解决方案。MaxCompute用户指南是针对MaxCompute服务的使用...