话不多说 直接一点
package com.alibaba.odps.tunnel.samples;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import com.alibaba.odps.tunnel.Column;
import com.alibaba.odps.tunnel.Configuration;
import com.alibaba.odps.tunnel.DataTunnel;
import com.alibaba.odps.tunnel.RecordSchema;
import com.alibaba.odps.tunnel.Upload;
import com.alibaba.odps.tunnel.io.Record;
import com.alibaba.odps.tunnel.io.RecordWriter;
public class FileUploadSample {
private static String endpoint = "http://dt.odps.aliyun.com"; // 公网
// private static String endpoint = "http://dt-ext.odps.aliyun-inc.com";
private static String accessId = "######";
private static String accessKey = "#######";
private static String project = "######";
private static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
public static void UploadFile(String fileName, String tableName, String pid)
throws Exception {
Configuration cfg = new Configuration(accessId, accessKey, endpoint);
DataTunnel tunnel = new DataTunnel(cfg);
BufferedReader br = null;
try {
String line = "";
Long blockid = (long) 0;
ArrayList<Long> blocks = new ArrayList<Long>();
blocks.add(blockid);
br = new BufferedReader(new InputStreamReader(new FileInputStream(
fileName), "gbk"));
Upload up = tunnel.createUpload(project, tableName, pid);
RecordSchema schema = up.getSchema();
Record r = new Record(schema.getColumnCount());
RecordWriter writer = up.openRecordWriter(blockid);
final long MAX_BYTE_WRITER = 50 * 1024 * 1024 * 1024L;
long max_byte_curr = MAX_BYTE_WRITER;
while ((line = br.readLine()) != null) {
if (line.length() == 0) {
continue;
}
String[] as = line.split(",", -1);
try {
for (int i = 0; i < schema.getColumnCount(); i++) {
Column.Type t = schema.getColumnType(i);
String a = as[i];
if ((a.length() == 0) && (t != Column.Type.ODPS_STRING)) {
switch (t) {
case ODPS_BIGINT:
r.setBigint(i, 0L);
break;
case ODPS_DOUBLE:
r.setDouble(i, 0.0);
break;
case ODPS_DATETIME:
r.setDatetime(i, new Date());
break;
case ODPS_BOOLEAN:
r.setBoolean(i, false);
break;
default:
break;
}
} else
switch (t) {
case ODPS_STRING:
r.setString(i, a);
break;
case ODPS_BIGINT:
r.setBigint(i, Long.parseLong(a));
break;
case ODPS_DOUBLE:
r.setDouble(i, Double.parseDouble(a));
break;
case ODPS_DATETIME:
r.setDatetime(i, (Date) sdf.parse(a));
break;
case ODPS_BOOLEAN:
r.setBoolean(i, Boolean.parseBoolean(a));
break;
default:
break;
}
}
} catch (ParseException e) {
// 解析错误,忽略本行
} catch (ArrayIndexOutOfBoundsException e) {
// 通常原因是少列,忽略本行
} catch (NumberFormatException e) {
// 数据类型不匹配,忽略本行
} catch (Exception e) {
// 本次上传失败,直接退出
throw e;
}
// 如果网络不稳定,可能抛出异常。如果发生,则进行重试处理
for (int t = 0; t >= 0; t++)
try {
writer.write(r);
if (t > 0) {
System.out.println("write retry suc.retry=" + t);
}
if (writer.getTotalBytes() > max_byte_curr) {
max_byte_curr = writer.getTotalBytes()
+ MAX_BYTE_WRITER;
writer.close();
writer = up.openRecordWriter(++blockid);
blocks.add(blockid);
}
break;
} catch (IOException e) {
if (t < 10) {
Thread.sleep(2000);
} else {
throw e;
}
}
}
writer.close();
// 只有complete成功,数据才会被真正写到ODPS中
up.complete(blocks.toArray(new Long[blocks.size()]));
} finally {
if (br != null) {
br.close();
}
}
}
public static void main(String args[]) {
try {
UploadFile("C:/hoh.txt", "sale_detail", "pid=201412");
System.out.println("fileupload sucessful");
} catch (Exception e) {
// 本次上传失败,所有已经write 成功的数据都会被丢弃。
e.printStackTrace();
}
}
}
祝你好运!!
分享到:
相关推荐
Java连接ODPS(MaxCompute)是一项关键的技能,特别是在大数据处理和分析的场景下。ODPS,即阿里云的大数据处理服务MaxCompute,是企业级的海量数据处理平台,提供了SQL、API等多种方式进行数据操作。Java SDK是ODPS...
ODPS(MaxCompute)是阿里巴巴集团推出的一种大数据处理平台,主要设计用于海量数据的离线分析。本权威详尽的帮助手册旨在深入解析ODPS的核心功能、底层优化原理以及实际操作中的各种细节,帮助用户充分利用这一工具...
阿里云ODPS(Open Data Processing Service)是一款大数据处理服务,主要面向海量数据的批处理、交互式查询和分析。ODPS提供了SQL接口,使得开发者能够用熟悉的SQL语法进行大数据操作,极大地降低了大数据处理的门槛...
阿里云ODPS文档 阿里云ODPS(Open Data Processing Service)是一种大规模数据处理服务,提供了基于SQL的数据处理能力。ODPS SQL是ODPS的一部分,提供了类似于SQL的语法,用于处理大规模数据。 ODPS SQL的特点 ...
《ODPS权威指南:阿里大数据平台应用开发实践》是一本深度解析阿里巴巴ODPS技术的专著,旨在为读者提供全面、深入的ODPS理解和应用经验。ODPS,全称为Open Data Processing Service,是阿里巴巴集团自主研发的大数据...
ODPS(Open Data Processing Service)是阿里巴巴开源的大数据处理服务,它主要面向大规模数据分析,提供SQL查询、数据仓库以及批处理能力。这个“ODPS参考手册”是学习和使用ODPS的重要资源,它包含了ODPS的各项...
ODPS,全称为Open Data Processing Service,是阿里云推出的一种大数据处理服务,旨在为企业提供海量数据的存储和计算能力。《ODPS权威指南》作为最新版的参考资料,深入讲解了ODPS的核心特性和应用场景,是学习和...
本资料包“datahub_test_001_odps_datahub_IDEAL_datahub和odps”主要关注两个关键组件:DataHub和ODPS,以及它们在IDEAL环境中的集成应用。 首先,ODPS(开放数据处理服务)是阿里云推出的一种大数据处理平台,它...
ODPS(Open Data Processing Service)是阿里云推出的一种大数据处理服务,它提供了强大的数据存储和计算能力,帮助企业处理海量数据。而"odps-eclipse-plugin-bundle-0.16.0.zip"则是一个针对ODPS开发的Eclipse集成...
ODPS 提供海量数据处理及分析服务,让用户远离大数据运算烦恼。 开放数据处理服务(Open Data Processing Service,ODPS)是基于飞天分布式系统构建的海 量数据处理和分析的服务平台,具有 PB 级别的数据处理能力, ...
【ODPS概述】 开放数据处理服务(ODPS)是由阿里巴巴集团研发的一种大数据处理与分析平台,基于飞天内核构建,旨在提供PB级别的数据处理能力。ODPS通过RESTful API接口对外提供服务,使得开发者能够方便地进行大...
"阿里开放数据处理服务ODPS介绍" 阿里开放数据处理服务ODPS是一款基于云计算的数据处理和分析平台,旨在帮助企业解决大数据处理和分析的问题。ODPS提供了一站式的数据处理和分析服务,使用户可以快速构建大数据应用...
阿里云odpsSql手册1摘要 大数据计算服务MaxCompute SQL概要介绍_MAXCompute SQL是一种面向海量数据(TB级别)的计算服务,适用于实时性要求不高的场合。 MaxCompute SQL的每个作业的准备、提交等阶段需要花费较长...
ODPS(Open Data Processing Service)是阿里云推出的一种大数据处理服务,主要面向大规模数据处理场景。这个“ODPS资料大全”压缩包很可能包含了关于ODPS的详细文档、教程、案例研究以及API参考等内容,旨在帮助...
### ODPS的使用说明 #### 一、ODPS-SQL基础用法 ##### 1. 使用CASE语句 ODPS中的CASE语句用于根据不同的条件返回不同的结果。例如,在创建新表`train_1`时,可以根据`tab`字段的不同值来决定`weight`字段的值: ...
《大数据技术:ODPS MapReduce对外开放实践》 在大数据领域,ODPS(Open Data Processing Service)是阿里巴巴集团推出的一种用于大规模数据处理的底层平台。ODPS的核心目标是为用户提供一个高效、稳定且易于使用的...
odps-jdbc-3.2.9-jar-with-dependencies.jar 是阿里云 MaxCompute(开放数据处理服务,ODPS)平台的 JDBC 驱动程序,专为大数据处理和分析而设计。此版本的驱动程序包含所有必要的依赖库,简化了开发人员在 Java ...
阿里云ODPS机器学习平台,简称PAI,是构建在阿里云ODPS计算服务之上的一个全面的机器学习解决方案。这个平台旨在简化大数据处理、建模、离线预测以及在线预测的过程,为算法开发者和使用者提供了一个高效且易用的...
徐东作为阿里巴巴数据平台事业部的ODPS技术专家,在2014年的中国大数据技术大会上分享了关于ODPS MapReduce对外开放实践的演讲。ODPS,全称为OpenDataProcessSystem,是一个大规模数据处理的底层平台,每天都能够...
某软的公文,使用msi后还用installshield打包,只能手工安装,现已解压并静默。