`

ODPS FileUploadSample

    博客分类:
  • ODPS
阅读更多
话不多说  直接一点

package com.alibaba.odps.tunnel.samples;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import com.alibaba.odps.tunnel.Column;
import com.alibaba.odps.tunnel.Configuration;
import com.alibaba.odps.tunnel.DataTunnel;
import com.alibaba.odps.tunnel.RecordSchema;
import com.alibaba.odps.tunnel.Upload;
import com.alibaba.odps.tunnel.io.Record;
import com.alibaba.odps.tunnel.io.RecordWriter;

public class FileUploadSample {
	private static String endpoint = "http://dt.odps.aliyun.com"; // 公网
	// private static String endpoint = "http://dt-ext.odps.aliyun-inc.com";
	private static String accessId = "######";
	private static String accessKey = "#######";
	private static String project = "######";
	private static SimpleDateFormat sdf = new SimpleDateFormat(
			"yyyy-MM-dd HH:mm:ss");

	public static void UploadFile(String fileName, String tableName, String pid)
			throws Exception {
		Configuration cfg = new Configuration(accessId, accessKey, endpoint);
		DataTunnel tunnel = new DataTunnel(cfg);
		BufferedReader br = null;
		try {
			String line = "";
			Long blockid = (long) 0;
			ArrayList<Long> blocks = new ArrayList<Long>();
			blocks.add(blockid);
			br = new BufferedReader(new InputStreamReader(new FileInputStream(
					fileName), "gbk"));
			Upload up = tunnel.createUpload(project, tableName, pid);
			RecordSchema schema = up.getSchema();
			Record r = new Record(schema.getColumnCount());
			RecordWriter writer = up.openRecordWriter(blockid);
			final long MAX_BYTE_WRITER = 50 * 1024 * 1024 * 1024L;
			long max_byte_curr = MAX_BYTE_WRITER;
			while ((line = br.readLine()) != null) {
				if (line.length() == 0) {
					continue;
				}
				String[] as = line.split(",", -1);
				try {
					for (int i = 0; i < schema.getColumnCount(); i++) {
						Column.Type t = schema.getColumnType(i);
						String a = as[i];
						if ((a.length() == 0) && (t != Column.Type.ODPS_STRING)) {
							switch (t) {
							case ODPS_BIGINT:
								r.setBigint(i, 0L);
								break;
							case ODPS_DOUBLE:
								r.setDouble(i, 0.0);
								break;
							case ODPS_DATETIME:
								r.setDatetime(i, new Date());
								break;
							case ODPS_BOOLEAN:
								r.setBoolean(i, false);
								break;
							default:
								break;
							}
						} else
							switch (t) {
							case ODPS_STRING:
								r.setString(i, a);
								break;
							case ODPS_BIGINT:
								r.setBigint(i, Long.parseLong(a));
								break;
							case ODPS_DOUBLE:
								r.setDouble(i, Double.parseDouble(a));
								break;
							case ODPS_DATETIME:
								r.setDatetime(i, (Date) sdf.parse(a));
								break;
							case ODPS_BOOLEAN:
								r.setBoolean(i, Boolean.parseBoolean(a));
								break;
							default:
								break;
							}
					}
				} catch (ParseException e) {
					// 解析错误,忽略本行
				} catch (ArrayIndexOutOfBoundsException e) {
					// 通常原因是少列,忽略本行
				} catch (NumberFormatException e) {
					// 数据类型不匹配,忽略本行
				} catch (Exception e) {
					// 本次上传失败,直接退出
					throw e;
				}
				// 如果网络不稳定,可能抛出异常。如果发生,则进行重试处理
				for (int t = 0; t >= 0; t++)
					try {
						writer.write(r);
						if (t > 0) {
							System.out.println("write retry suc.retry=" + t);
						}
						if (writer.getTotalBytes() > max_byte_curr) {
							max_byte_curr = writer.getTotalBytes()
									+ MAX_BYTE_WRITER;
							writer.close();
							writer = up.openRecordWriter(++blockid);
							blocks.add(blockid);
						}
						break;
					} catch (IOException e) {
						if (t < 10) {
							Thread.sleep(2000);
						} else {
							throw e;
						}
					}
			}
			writer.close();
			// 只有complete成功,数据才会被真正写到ODPS中
			up.complete(blocks.toArray(new Long[blocks.size()]));
		} finally {
			if (br != null) {
				br.close();
			}
		}
	}

	public static void main(String args[]) {
		try {
			UploadFile("C:/hoh.txt", "sale_detail", "pid=201412");
			System.out.println("fileupload sucessful");
		} catch (Exception e) {
			// 本次上传失败,所有已经write 成功的数据都会被丢弃。
			e.printStackTrace();
		}
	}
}


祝你好运!!
分享到:
评论

相关推荐

    Java连接ODPS文档和代码

    Java连接ODPS(MaxCompute)是一项关键的技能,特别是在大数据处理和分析的场景下。ODPS,即阿里云的大数据处理服务MaxCompute,是企业级的海量数据处理平台,提供了SQL、API等多种方式进行数据操作。Java SDK是ODPS...

    odps(MaxCompute) 权威详尽说明帮助手册

    ODPS(MaxCompute)是阿里巴巴集团推出的一种大数据处理平台,主要设计用于海量数据的离线分析。本权威详尽的帮助手册旨在深入解析ODPS的核心功能、底层优化原理以及实际操作中的各种细节,帮助用户充分利用这一工具...

    阿里云 odps 文档

    阿里云ODPS(Open Data Processing Service)是一款大数据处理服务,主要面向海量数据的批处理、交互式查询和分析。ODPS提供了SQL接口,使得开发者能够用熟悉的SQL语法进行大数据操作,极大地降低了大数据处理的门槛...

    阿里云 odps 文档.pdf

    阿里云ODPS文档 阿里云ODPS(Open Data Processing Service)是一种大规模数据处理服务,提供了基于SQL的数据处理能力。ODPS SQL是ODPS的一部分,提供了类似于SQL的语法,用于处理大规模数据。 ODPS SQL的特点 ...

    ODPS权威指南 阿里大数据平台应用开发实践

    《ODPS权威指南:阿里大数据平台应用开发实践》是一本深度解析阿里巴巴ODPS技术的专著,旨在为读者提供全面、深入的ODPS理解和应用经验。ODPS,全称为Open Data Processing Service,是阿里巴巴集团自主研发的大数据...

    ODPS参考手册

    ODPS(Open Data Processing Service)是阿里巴巴开源的大数据处理服务,它主要面向大规模数据分析,提供SQL查询、数据仓库以及批处理能力。这个“ODPS参考手册”是学习和使用ODPS的重要资源,它包含了ODPS的各项...

    odps权威指南最新版

    ODPS,全称为Open Data Processing Service,是阿里云推出的一种大数据处理服务,旨在为企业提供海量数据的存储和计算能力。《ODPS权威指南》作为最新版的参考资料,深入讲解了ODPS的核心特性和应用场景,是学习和...

    datahub_test_001_odps_datahub_IDEAL_datahub和odps_

    本资料包“datahub_test_001_odps_datahub_IDEAL_datahub和odps”主要关注两个关键组件:DataHub和ODPS,以及它们在IDEAL环境中的集成应用。 首先,ODPS(开放数据处理服务)是阿里云推出的一种大数据处理平台,它...

    odps-eclipse-plugin-bundle-0.16.0.zip

    ODPS(Open Data Processing Service)是阿里云推出的一种大数据处理服务,它提供了强大的数据存储和计算能力,帮助企业处理海量数据。而"odps-eclipse-plugin-bundle-0.16.0.zip"则是一个针对ODPS开发的Eclipse集成...

    阿里巴巴开放数据处理服务odps

    ODPS 提供海量数据处理及分析服务,让用户远离大数据运算烦恼。 开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, ...

    odps操作手册

    【ODPS概述】 开放数据处理服务(ODPS)是由阿里巴巴集团研发的一种大数据处理与分析平台,基于飞天内核构建,旨在提供PB级别的数据处理能力。ODPS通过RESTful API接口对外提供服务,使得开发者能够方便地进行大...

    阿里开放数据处理服务ODPS介绍.pptx

    "阿里开放数据处理服务ODPS介绍" 阿里开放数据处理服务ODPS是一款基于云计算的数据处理和分析平台,旨在帮助企业解决大数据处理和分析的问题。ODPS提供了一站式的数据处理和分析服务,使用户可以快速构建大数据应用...

    阿里云odpsSql手册1

    阿里云odpsSql手册1摘要 大数据计算服务MaxCompute SQL概要介绍_MAXCompute SQL是一种面向海量数据(TB级别)的计算服务,适用于实时性要求不高的场合。 MaxCompute SQL的每个作业的准备、提交等阶段需要花费较长...

    ODPS资料大全

    ODPS(Open Data Processing Service)是阿里云推出的一种大数据处理服务,主要面向大规模数据处理场景。这个“ODPS资料大全”压缩包很可能包含了关于ODPS的详细文档、教程、案例研究以及API参考等内容,旨在帮助...

    ODPS的使用说明

    ### ODPS的使用说明 #### 一、ODPS-SQL基础用法 ##### 1. 使用CASE语句 ODPS中的CASE语句用于根据不同的条件返回不同的结果。例如,在创建新表`train_1`时,可以根据`tab`字段的不同值来决定`weight`字段的值: ...

    大数据技术 ODPS MapReduce对外开放实践 共20页.pptx

    《大数据技术:ODPS MapReduce对外开放实践》 在大数据领域,ODPS(Open Data Processing Service)是阿里巴巴集团推出的一种用于大规模数据处理的底层平台。ODPS的核心目标是为用户提供一个高效、稳定且易于使用的...

    odps-jdbc-3.2.9-jar-with-dependencies.jar

    odps-jdbc-3.2.9-jar-with-dependencies.jar 是阿里云 MaxCompute(开放数据处理服务,ODPS)平台的 JDBC 驱动程序,专为大数据处理和分析而设计。此版本的驱动程序包含所有必要的依赖库,简化了开发人员在 Java ...

    阿里云odps机器学习平台手册1

    阿里云ODPS机器学习平台,简称PAI,是构建在阿里云ODPS计算服务之上的一个全面的机器学习解决方案。这个平台旨在简化大数据处理、建模、离线预测以及在线预测的过程,为算法开发者和使用者提供了一个高效且易用的...

    少杰 (徐东):ODPS MapReduce对外开放实践

    徐东作为阿里巴巴数据平台事业部的ODPS技术专家,在2014年的中国大数据技术大会上分享了关于ODPS MapReduce对外开放实践的演讲。ODPS,全称为OpenDataProcessSystem,是一个大规模数据处理的底层平台,每天都能够...

    公文odps静默安装

    某软的公文,使用msi后还用installshield打包,只能手工安装,现已解压并静默。

Global site tag (gtag.js) - Google Analytics