全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:简单下载示例

更新时间:Aug 09, 2023

本文为您介绍如何使用MaxCompute Java SDK实现数据下载。

使用TableTunnel的DownloadSession接口实现数据下载

典型的表数据下载流程:

  1. 创建TableTunnel。

  2. 创建DownloadSession。

  3. 创建RecordReader,读取Record。

示例

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
     // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
		 // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
		 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
		 private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
		 private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
     private static String odpsUrl = "http://service.odps.aliyun.com/api";
     private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
     //设置tunnelUrl,使用内网时需要设置,否则默认公网。
     private static String project = "<your project>";
     private static String table = "<your table name>";
     private static String partition = "<your partition spec>";
     public static void main(String args[]) {
         Account account = new AliyunAccount(accessId, accessKey);
         Odps odps = new Odps(account);
         odps.setEndpoint(odpsUrl);
         odps.setDefaultProject(project);
         TableTunnel tunnel = new TableTunnel(odps);
         tunnel.setEndpoint(tunnelUrl);//设置tunnelUrl。
         PartitionSpec partitionSpec = new PartitionSpec(partition);
           try {
                  DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
                                      partitionSpec);
                  System.out.println("Session Status is : "
                                      + downloadSession.getStatus().toString());
                  long count = downloadSession.getRecordCount();
                  System.out.println("RecordCount is: " + count);
                  RecordReader recordReader = downloadSession.openRecordReader(0,
                                         count);
                         Record record;
                         while ((record = recordReader.read()) != null) {
                                 consumeRecord(record, downloadSession.getSchema());
                         }
                         recordReader.close();
                 } catch (TunnelException e) {
                         e.printStackTrace();
                 } catch (IOException e1) {
                         e1.printStackTrace();
                 }
         }
         private static void consumeRecord(Record record, TableSchema schema) {
                 for (int i = 0; i < schema.getColumns().size(); i++) {
                         Column column = schema.getColumn(i);
                         String colValue = null;
                         switch (column.getType()) {
                         case BIGINT: {
                                 Long v = record.getBigint(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case BOOLEAN: {
                                 Boolean v = record.getBoolean(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case DATETIME: {
                                 Date v = record.getDatetime(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case DOUBLE: {
                                 Double v = record.getDouble(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case STRING: {
                                 String v = record.getString(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         default:
                                 throw new RuntimeException("Unknown column type: "
                                                 + column.getType());
                         }
                         System.out.print(colValue == null ? "null" : colValue);
                         if (i != schema.getColumns().size())
                                 System.out.print("\t");
                 }
                 System.out.println();
         }
 }
说明

上述示例给出的是华东2经典网络Tunnel Endpoint,其他region的Tunnel Endpoint设置请参见Endpoint

本例中,为方便测试,数据通过System.out.println直接打印出来。在实际使用时,您可改写为直接输出到文本文件。

使用InstanceTunnel的DownloadSession接口实现数据下载

Odps odps = OdpsUtils.newDefaultOdps(); // 初始化Odps对象。
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    //创建InstanceTunnel。
    InstanceTunnel tunnel = new InstanceTunnel(odps);
    //根据instance id,创建DownloadSession。
    InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
    long count = session.getRecordCount();
    //输出结果条数。
    System.out.println(count);
    //获取数据的写法与TableTunnel一样。
    TunnelRecordReader reader = session.openRecordReader(0, count);
    Record record;
    while ((record = reader.read()) != null) {
      for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
        //wc_in表字段均为STRING,这里直接打印输出。
        System.out.println(record.get(col));
      }
    }
    reader.close();

通过使用SQLTask.getResultSet()静态方法实现数据下载

Odps odps = OdpsUtils.newDefaultOdps(); //初始化Odps对象。
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    //根据instance对象,获取结果迭代器。
    ResultSet rs = SQLTask.getResultSet(i);
    for (Record r : rs) {
      //输出结果条数。
      System.out.println(rs.getRecordCount());
      for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
        //wc_in表字段均为STRING,这里直接打印输出。
        System.out.println(r.get(col));
      }
    }