全部產品
Search
文件中心

MaxCompute:簡單下載樣本

更新時間:Oct 18, 2024

本文為您介紹如何使用MaxCompute Java SDK實現資料下載。

使用TableTunnel的DownloadSession介面實現資料下載

典型的表資料下載流程:

  1. 建立TableTunnel。

  2. 建立DownloadSession。

  3. 建立RecordReader,讀取Record。

樣本

import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
     // 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者
		 // 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡
		 // 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險
		 private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
		 private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
     private static String odpsUrl = "http://service.odps.aliyun.com/api";
     private static String tunnelUrl = "https://dt.cn-shanghai-intranet.maxcompute.aliyun-inc.com";
     //設定tunnelUrl,使用內網時需要設定,否則預設公網。
     private static String project = "<your project>";
     private static String table = "<your table name>";
     private static String partition = "<your partition spec>";
     public static void main(String args[]) {
         Account account = new AliyunAccount(accessId, accessKey);
         Odps odps = new Odps(account);
         odps.setEndpoint(odpsUrl);
         odps.setDefaultProject(project);
         TableTunnel tunnel = new TableTunnel(odps);
         tunnel.setEndpoint(tunnelUrl);//設定tunnelUrl。
         PartitionSpec partitionSpec = new PartitionSpec(partition);
           try {
                  DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
                                      partitionSpec);
                  System.out.println("Session Status is : "
                                      + downloadSession.getStatus().toString());
                  long count = downloadSession.getRecordCount();
                  System.out.println("RecordCount is: " + count);
                  RecordReader recordReader = downloadSession.openRecordReader(0,
                                         count);
                         Record record;
                         while ((record = recordReader.read()) != null) {
                                 consumeRecord(record, downloadSession.getSchema());
                         }
                         recordReader.close();
                 } catch (TunnelException e) {
                         e.printStackTrace();
                 } catch (IOException e1) {
                         e1.printStackTrace();
                 }
         }
         private static void consumeRecord(Record record, TableSchema schema) {
                 for (int i = 0; i < schema.getColumns().size(); i++) {
                         Column column = schema.getColumn(i);
                         String colValue = null;
                         switch (column.getType()) {
                         case BIGINT: {
                                 Long v = record.getBigint(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case BOOLEAN: {
                                 Boolean v = record.getBoolean(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case DATETIME: {
                                 Date v = record.getDatetime(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case DOUBLE: {
                                 Double v = record.getDouble(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         case STRING: {
                                 String v = record.getString(i);
                                 colValue = v == null ? null : v.toString();
                                 break;
                         }
                         default:
                                 throw new RuntimeException("Unknown column type: "
                                                 + column.getType());
                         }
                         System.out.print(colValue == null ? "null" : colValue);
                         if (i != schema.getColumns().size())
                                 System.out.print("\t");
                 }
                 System.out.println();
         }
 }
說明

上述樣本給出的是華東2(上海)地區的雲產品互聯Tunnel Endpoint,其他Region的Tunnel Endpoint設定請參見Endpoint

本例中,為方便測試,資料通過System.out.println直接列印出來。在實際使用時,您可改寫為直接輸出到文字檔。

使用InstanceTunnel的DownloadSession介面實現資料下載

Odps odps = OdpsUtils.newDefaultOdps(); // 初始化Odps對象。
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    //建立InstanceTunnel。
    InstanceTunnel tunnel = new InstanceTunnel(odps);
    //根據instance id,建立DownloadSession。
    InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
    long count = session.getRecordCount();
    //輸出結果條數。
    System.out.println(count);
    //擷取資料的寫法與TableTunnel一樣。
    TunnelRecordReader reader = session.openRecordReader(0, count);
    Record record;
    while ((record = reader.read()) != null) {
      for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
        //wc_in表欄位均為STRING,這裡直接列印輸出。
        System.out.println(record.get(col));
      }
    }
    reader.close();

通過使用SQLTask.getResultSet()靜態方法實現資料下載

Odps odps = OdpsUtils.newDefaultOdps(); //初始化Odps對象。
    Instance i = SQLTask.run(odps, "select * from wc_in;");
    i.waitForSuccess();
    //根據instance對象,擷取結果迭代器。
    ResultSet rs = SQLTask.getResultSet(i);
    for (Record r : rs) {
      //輸出結果條數。
      System.out.println(rs.getRecordCount());
      for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
        //wc_in表欄位均為STRING,這裡直接列印輸出。
        System.out.println(r.get(col));
      }
    }