全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:复杂数据类型上传下载示例

更新时间:Aug 04, 2023

本文为您介绍如何通过Tunnel SDK上传或下载复杂数据类型。

MaxCompute支持ARRAY、MAP和STRUCT这3种复杂数据类型,具体用法请参见数据类型版本说明

上传复杂类型数据

代码示例

RecordWriter recordWriter = uploadSession.openRecordWriter(0);
ArrayRecord record = (ArrayRecord) uploadSession.newRecord();
//准备数据。
List arrayData = Arrays.asList(1, 2, 3);
Map<String, Long> mapData = new HashMap<String, Long>();
mapData.put("a", 1L);
mapData.put("c", 2L);
List<Object> structData = new ArrayList<Object>();
structData.add("Lily");
structData.add(18);
//将数据传入record。
record.setArray(0, arrayData);
record.setMap(1, mapData);
record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(), structData));
//写入record。
recordWriter.write(record);

下载复杂类型数据

RecordReader recordReader = downloadSession.openRecordReader(0, 1);
//读取record。
ArrayRecord record1 = (ArrayRecord)recordReader.read();
//获取array数据。
List field0 = record1.getArray(0);
List<Long> longField0 = record1.getArray(Long.class, 0);
//获取map数据。
Map field1 = record1.getMap(1);
Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);
//获取struct数据。
Struct field2 = record1.getStruct(2);

上传下载运行实例

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Struct;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.StructTypeInfo;
public class TunnelComplexTypeSample {
  // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
	// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
	// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
	private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
	private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
  private static String odpsUrl = "<your odps endpoint>";
  private static String project = "<your project>";
  private static String table = "<your table name>";
  //一个分区表里的分区,例如:"pt=\'1\',ds=\'2\'"。
  //如果该表不是一个分区表,则不需要以下语句。
  private static String partition = "<your partition spec>";
  public static void main(String args[]) {
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    try {
      TableTunnel tunnel = new TableTunnel(odps);
      PartitionSpec partitionSpec = new PartitionSpec(partition);
      //---------- 上传数据 ---------------
      //为表创建上传会话。
      //表schema为{"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING,age:BIGINT>}。
      UploadSession uploadSession = tunnel.createUploadSession(project, table, partitionSpec);
      //获取表schema。
      TableSchema schema = uploadSession.getSchema();
      //开启record writer。
      RecordWriter recordWriter = uploadSession.openRecordWriter(0);
      ArrayRecord record = (ArrayRecord) uploadSession.newRecord();
      //准备数据。
      List arrayData = Arrays.asList(1, 2, 3);
      Map<String, Long> mapData = new HashMap<String, Long>();
      mapData.put("a", 1L);
      mapData.put("c", 2L);
      List<Object> structData = new ArrayList<Object>();
      structData.add("Lily");
      structData.add(18);
      //将数据传入record。
      record.setArray(0, arrayData);
      record.setMap(1, mapData);
      record.setStruct(2, new SimpleStruct((StructTypeInfo) schema.getColumn(2).getTypeInfo(), structData));
      //写入record。
      recordWriter.write(record);
      //关闭writer。
      recordWriter.close();
      //对uploadSession进行commit操作,上传结束。
      uploadSession.commit(new Long[]{0L});
      System.out.println("upload success!");
      //---------- 下载数据 ---------------
      //为表创建下载会话。
      //表schema为{"col0": ARRAY<BIGINT>, "col1": MAP<STRING, BIGINT>, "col2": STRUCT<name:STRING, age:BIGINT>}。
      DownloadSession downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
      schema = downloadSession.getSchema();
      //开启record reader, 示例为读取一个record。
      RecordReader recordReader = downloadSession.openRecordReader(0, 1);
      //读取record。
      ArrayRecord record1 = (ArrayRecord)recordReader.read();
      //获取array数据。
      List field0 = record1.getArray(0);
      List<Long> longField0 = record1.getArray(Long.class, 0);
      //获取map数据。
      Map field1 = record1.getMap(1);
      Map<String, Long> typedField1 = record1.getMap(String.class, Long.class, 1);
      //获取struct数据。
      Struct field2 = record1.getStruct(2);
      System.out.println("download success!");
    } catch (TunnelException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}