全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:简单上传示例

更新时间:Aug 09, 2023

Tunnel SDK是MaxCompute提供的离线批量数据通道服务,主要提供大批量离线数据上传和下载。

典型的表数据上传流程

  1. 创建表,如果是分区表先创建分区。

  2. 创建TableTunnel。

  3. 创建UploadSession。

  4. 创建RecordWriter,写入Record。

  5. 提交上传。

示例

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
public class UploadSample {
    // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
		// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
		// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
		private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
		private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
    //默认情况下,使用公网进行传输。如果需要通过内网进行数据传输,必须设置tunnelUrl变量。
    //此处取值为华东2经典网络Tunnel Endpoint。
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    public static void main(String args[]) {
        // 该部分为准备工作,仅需执行一次。
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        try {
            TableTunnel tunnel = new TableTunnel(odps);
            // 设置tunnelUrl。
        tunnel.setEndpoint(tunnelUrl);
            // 确定写入分区。
        PartitionSpec partitionSpec = new PartitionSpec(partition);
            // 在服务端的本表本分区上创建一个有效期为24小时的Session。
        // 24小时内,该Session共计可以上传20000个分区(Block)数据。
        // 创建Session的耗时在秒级,且需要在服务端使用部分资源、创建临时目录等,需要消耗较多的资源。
        // 建议同一个分区的数据尽可能通过复用Session的方式上传。
            UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());
            TableSchema schema = uploadSession.getSchema();
        // 数据准备完成后,打开Writer开始写入数据,将数据写入一个Block。
        // 每个Block仅能成功上传一次,不可重复上传。CloseWriter执行成功即代表该Block上传成功,如果失败可重新上传该Block。
        // 同一个Session最多可以包含20000个BlockId(即0-19999)。如果超出20000个需要执行Commit Session并重新创新一个新的Session。
        // 单个Block内如果写入的数据过少将产生大量小文件,严重影响计算性能。强烈建议每次写入64 MB以上的数据(同一Block支持写入100 GB以内的数据)。
        // 如果创建一个Session后仅上传少量数据,不仅会造成小文件、空目录等问题,还会严重影响上传性能(创建Session耗时数秒,而真正数据上传可能仅花费十几毫秒)。
          // Writer创建后任意一段时间内,如果任意连续两分钟内没有写入4 KB以上的数据,将会超时并自动断开连接。
           // 建议在创建Writer前,在内存中准备好可以直接写入的数据。
            RecordWriter recordWriter = uploadSession.openRecordWriter(0);
            Record record = uploadSession.newRecord();
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                    case BIGINT:
                        record.setBigint(i, 1L);
                        break;
                    case BOOLEAN:
                        record.setBoolean(i, true);
                        break;
                    case DATETIME:
                        record.setDatetime(i, new Date());
                        break;
                    case DOUBLE:
                        record.setDouble(i, 0.0);
                        break;
                    case STRING:
                        record.setString(i, "sample");
                        break;
                    default:
                        throw new RuntimeException("Unknown column type: " + column.getType());
                }
            }
            for (int i = 0; i < 10; i++) {
        // 数据写入服务端时,每写入8 KB的数据将进行一次网络传输。
        // 如果120秒内无网络传输,服务端将自动关闭连接,该Writer将不可用,需要重新创建Writer执行数据写入。
                recordWriter.write(record);
            }
            recordWriter.close();
            uploadSession.commit(new Long[]{0L});
            System.out.println("upload success!");
        } catch (TunnelException e) {
            // 建议重试一定次数。
            e.printStackTrace();
        } catch (IOException e) {
            // 建议重试一定次数。
            e.printStackTrace();
        }
    }
}

构造器举例说明:

PartitionSpec(String spec):通过字符串构造此类对象。

参数说明:

spec:分区定义字符串,例如pt=’1’,ds=’2’。

因此程序中应该配置如下:

private static String partition = “pt=’XXX’,ds=’XXX’”;
说明

文中使用的是华东2经典网络Tunnel Endpoint,其他Region的Tunnel Endpoint设置可以参考文档Endpoint