`
猫耳呀
  • 浏览: 168895 次
社区版块
存档分类
最新评论

MaxCompute Tunnel SDK数据上传利器——BufferedWriter使用指南

阅读更多
摘要: MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 fa.

本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps
MaxCompute 的数据上传接口(Tunnel)定义了数据 block 的概念:一个 block 对应一个 http request,多个 block 的上传可以并发而且是原子的,一次同步请求要么成功要么失败,不会污染其他的 block。这种设计对于服务端来讲十分简洁,但是也把记录状态做 failover 的工作交给了客户端。

用户在使用 Tunnel SDK 编程时,需要对 block 这一层的语义进行认知,并且驱动数据上传的整个过程[1],并且自己进行容错,毕竟『网络错误是正常而不是异常』。由于用户文档中并没有强调这一点的重要性,导致很多用户踩了坑,一种常见的出错场景是,当客户端写数据的速度过慢,两次 write 的间隔超时[2],导致整个 block 上传失败。

High Level API

MaxCompute Java SDK 在 0.21.3-public 之后新增了 BufferredWriter 这个更高层的 API,简化了数据上传的过程,并且提供了容错的功能。 BufferedWriter 对用户隐藏了 block 这个概念,从用户角度看,就是在 session 上打开一个 writer 然后往里面写记录即可:

RecordWriter writer = null;

try {
  int i = 0; 
  writer = uploadSession.openBufferedWriter();
  Record product = uploadSession.newRecord();

  for (String item : items) {
    product.setString("name", item);
    product.setBigint("id", i);
    writer.write(product);
    i += 1;
  }
} finally {
  if (writer != null) {
    writer.close();
  }
}
uploadSession.commit();

具体实现时 BufferedWriter 先将记录缓存在客户端的缓冲区中,并在缓冲区填满之后打开一个 http 连接进行上传。BufferedWriter 会尽最大可能容错,保证数据上传上去。
由于屏蔽了底层细节,这个接口可能并不适合数据预划分、断点续传、分批次上传等需要细粒度控制的场景。

多线程上传示例

多线程上传时,每个线程只需要打开一个 writer 往里面写数据就行了。

class UploadThread extends Thread {
  private UploadSession session;
  private static int RECORD_COUNT = 1200;

  public UploadThread(UploadSession session) {
    this.session = session;
  }

  @Override
  public void run() {
    RecordWriter writer = up.openBufferedWriter();
    Record r = up.newRecord();
    for (int i = 0; i < RECORD_COUNT; i++) {
      r.setBigint(0, i);
      writer.write(r);
    }
    writer.close();
  }
};

public class Example {
  public static void main(String args[]) {

   // 初始化 MaxCompute 和 tunnel 的代码

   TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(projectName, tableName);
   UploadThread t1 = new UploadThread(up);
   UploadThread t2 = new UploadThread(up);

   t1.start();
   t2.start();
   t1.join();
   t2.join();

   uploadSession.commit();
}

更多控制

重试策略

由于底层在上传出错时会回避一段固定的时间并进行重试,但如果你的程序不想花太多时间在重试上,或者你的程序位于一个极其恶劣的网络环境中,为此 TunnelBufferedWriter 允许用户配置重试策略。

用户可以选择三种重试回避策略:指数回避(EXPONENTIAL_BACKOFF)、线性时间回避(LINEAR_BACKOFF)、常数时间回避(CONSTANT_BACKOFF)。

例如下面这段代码可以将,write 的重试次数调整为 6,每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s(从 4 开始的指数递增的序列)。

RetryStrategy retry
  = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)

writer = (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);

缓冲区控制

如果你的程序对 JVM 的内存有严格的要求,可以通过下面这个接口修改缓冲区占内存的字节数(bytes):

writer.setBufferSize(1024*1024);

默认配置每一个 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上传一个 block 的数据[3]。

多个进程共享 Session

由于一个 Session 的上传状态是通过维护一个 block list 实现的,对于多线程程序来讲,通过锁很容易实现资源的分配。但对于两个进程空间里的程序想要复用一个 Session 时,必须通过一种机制对资源进行隔离。

具体地,在 getUploadSession 的时候,必须指定这个共享这个 Session 的进程数目,以及一个用来区分进程的 global id:

//程序1:这个 session 将被两个 writer 共享,我是其中第 0 个
TableTunnel.UploadSession up
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 0);
writer = session.openBufferedWriter();

//程序1:这个 session 将被两个 writer 共享,我是其中第 1 个
TableTunnel.UploadSession up
  = tunnel.getUploadSession(projectName, tableName, sid, 2, 1);
writer = session.openBufferedWriter();

Notes

[1] 一次完整的上传流程通常包括以下步骤:

先对数据进行划分
为每个数据块指定 block id,即调用 openRecordWriter(id)
然后用一个或多个线程分别将这些 block 上传上去
并在某个 block 上传失败以后,需要对整个 block 进行重传
在所有 block 都上传以后,向服务端提供上传成功的 blockid list 进行校验,即调用 session.commit([1,2,3,…])

[2] 因为使用长连接,服务端有计时器判断是否客户端是否 alive

[3] block 在服务端有 20000 个的数量上限,如果 BufferSize 设得太小会导致 20000 个 block 很快被用光

[4] Session的有效期为24小时,超过24小时会导致数据上传失败

原文链接:https://yq.aliyun.com/articles/65030?spm=a2c41.11181499.0.0

分享到:
评论

相关推荐

    阿里大数据计算服务MaxCompute-批量数据通道.pdf

    通过Tunnel SDK,用户可以高效地上传下载数据,MaxCompute Tunnel服务提供了多种优化机制来提高数据上传下载的速度和效率。同时,Tunnel SDK也提供了多种错误处理机制,用户可以根据需要选择合适的错误处理方式。 ...

    阿里大数据计算服务MaxCompute-批量数据通道D.docx

    `TableTunnel`是与MaxCompute Tunnel服务交互的起点,用户可以使用它来创建上传或下载数据的会话。值得注意的是,`TableTunnel`的生命周期从创建实例开始,直到程序结束。它提供了创建和获取`UploadSession`和`...

    MaxCompute数据开发实战—数据进入MaxCompute的N种方式.pdf

    ### MaxCompute数据开发实战——数据进入MaxCompute的多种方式 #### 概述 本文档旨在详细介绍如何通过不同的技术手段和工具实现数据从多种源头至MaxCompute的高效迁移,并结合具体的业务场景,展示整个数据处理...

    MaxCompute技术公开课第四季之MaxComputeTunnel上传典型问题场景实战.pdf

    MaxCompute Tunnel 是一种高效的数据传输工具,主要用于将本地文件上传至MaxCompute表中。该工具提供了灵活的数据上传选项,适用于各种复杂的数据结构和应用场景。 1. **上传支持**: - 支持单个文件或一级目录的...

    藏经阁-MaxCompute多租户数据安全体系介绍及实践.pdf

    MaxCompute是阿里云的一款大数据处理服务,其多租户数据安全体系是确保用户在共享计算资源时数据安全的重要机制。本实践文档详细介绍了MaxCompute如何通过多种方式来保护用户的数据安全。 首先,MaxCompute采用权限...

    阿里云 专有云企业版 V3.9.0 大数据计算服务(MaxCompute) 用户指南 20191017.pdf

    5. **数据导入与导出**:用户可以使用DataHub、ODPS Tunnel等工具进行数据导入导出,与其他阿里云服务如OSS(Object Storage Service)和RDS(Relational Database Service)集成,实现数据的无缝迁移。 6. **监控...

    阿里大数据计算服务MaxCompute-产品简介.pdf

    - **数据导入**:MaxCompute提供了数据通道服务(TUNNEL),支持高并发的离线数据上传和下载,用户可以通过Java API进行操作。 - **SQL支持**:MaxCompute支持SQL查询,允许用户以传统数据库的方式操作数据,尽管...

    阿里云 专有云企业版 V3.6.1 MaxCompute 开发指南 - 20181105.pdf

    - **开发工具**:提供了多种客户端工具,如DataHub、ODPS Studio、Tunnel命令行工具等,方便用户上传数据、运行SQL和管理项目。 - **SDK支持**:提供了Java、Python、.NET等多种语言的SDK,便于集成到企业现有的...

    阿里云 专有云企业版 V3.6.1 MaxCompute 运维指南 - 20190322.pdf

    - **运维工具**:可能包括DataHub、SDK、Tunnel服务等,这些工具方便用户与MaxCompute进行交互,实现数据的上传下载和实时流处理。 3. **法律声明**: - **使用权限**:用户只能通过阿里云官方渠道获取和使用文档...

    阿里云 专有云企业版 V3.7.1 MaxCompute 运维指南 20190318.pdf

    阿里云专有云企业版V3.7.1的MaxCompute运维指南主要涵盖了MaxCompute的运维操作和管理,这是阿里云大数据处理平台的核心组件,用于处理海量数据的计算任务。以下是对指南中涉及的主要知识点的详细说明: 1. **产品...

    基于MaxCompute的大数据BI分析.pptx

    * MaxCompute 提供了完整的批量(Tunnel)/实时(Datahub)数据接入能力,Dataworks 数据开发/应用生产运维/数据管理/数据质量。 * 该企业级管理能力使得 MaxCompute 能够快速构建企业级数据服务平台,提高了数据...

    阿里云 专有云企业版 V3.9.0 大数据计算服务(MaxCompute) 运维指南 20191017.pdf

    阿里云专有云企业版V3.9.0大数据计算服务(MaxCompute)运维指南主要聚焦于MaxCompute的运维操作和管理,旨在帮助用户更好地理解和使用这个大数据处理平台。MaxCompute是一个面向企业的大规模数据仓库服务,它提供...

    阿里巴巴大数据之路——数据技术篇.pdf

    在整体架构中,数据采集层使用DataX等工具进行数据同步,数据计算层依赖MaxCompute这样的离线计算平台,数据服务层通过RDS等数据库服务提供接口或视图数据,数据应用层涵盖流量分析等多个数据应用工具。 在数据采集...

    Max用户指南.pdf

    根据给定的信息,“Max用户指南.pdf”主要涉及的是阿里云MaxCompute的相关内容,这是一款针对大数据处理的服务平台。从文档的法律声明部分可以得知,此文档的使用范围、权限及限制,以及用户如何正确地使用...

    阿里大数据计算服务MaxCompute-产品简介D.docx

    1. **数据导入方案**:MaxCompute提供Tunnel服务,支持高并发的离线数据上传和下载,用户可以通过Java编程接口进行操作。 2. **SQL支持**:虽然MaxCompute的SQL不支持事务、索引和Update/Delete等操作,但它能处理...

    信息安全_数据安全_MaxCompute 2.0 overview.pdf

    2. **多功能性**:它支持离线和准实时计算,提供了SQL、MapReduce、机器学习(ML)等多种计算模式,并通过Tunnel和Datahub实现了数据的快速导入导出。 3. **高性能**:MaxCompute 2.0可以处理100PB级别的数据,其...

Global site tag (gtag.js) - Google Analytics