本文為您介紹如何使用MaxCompute Java SDK實現資料下載。
使用TableTunnel的DownloadSession介面實現資料下載
典型的表資料下載流程:
建立TableTunnel。
建立DownloadSession。
建立RecordReader,讀取Record。
樣本
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
// 阿里雲帳號AccessKey擁有所有API的存取權限,風險很高。強烈建議您建立並使用RAM使用者進行API訪問或日常營運,請登入RAM控制台建立RAM使用者
// 此處以把AccessKey 和 AccessKeySecret 儲存在環境變數為例說明。您也可以根據業務需要,儲存到設定檔裡
// 強烈建議不要把 AccessKey 和 AccessKeySecret 儲存到代碼裡,會存在密鑰泄漏風險
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String tunnelUrl = "https://dt.cn-shanghai-intranet.maxcompute.aliyun-inc.com";
//設定tunnelUrl,使用內網時需要設定,否則預設公網。
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);//設定tunnelUrl。
PartitionSpec partitionSpec = new PartitionSpec(partition);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
partitionSpec);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != schema.getColumns().size())
System.out.print("\t");
}
System.out.println();
}
}說明
上述樣本給出的是華東2(上海)地區的雲產品互聯Tunnel Endpoint,其他Region的Tunnel Endpoint設定請參見Endpoint。
本例中,為方便測試,資料通過System.out.println直接列印出來。在實際使用時,您可改寫為直接輸出到文字檔。
使用InstanceTunnel的DownloadSession介面實現資料下載
Odps odps = OdpsUtils.newDefaultOdps(); // 初始化Odps對象。
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
//建立InstanceTunnel。
InstanceTunnel tunnel = new InstanceTunnel(odps);
//根據instance id,建立DownloadSession。
InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
long count = session.getRecordCount();
//輸出結果條數。
System.out.println(count);
//擷取資料的寫法與TableTunnel一樣。
TunnelRecordReader reader = session.openRecordReader(0, count);
Record record;
while ((record = reader.read()) != null) {
for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
//wc_in表欄位均為STRING,這裡直接列印輸出。
System.out.println(record.get(col));
}
}
reader.close();通過使用SQLTask.getResultSet()靜態方法實現資料下載
Odps odps = OdpsUtils.newDefaultOdps(); //初始化Odps對象。
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
//根據instance對象,擷取結果迭代器。
ResultSet rs = SQLTask.getResultSet(i);
for (Record r : rs) {
//輸出結果條數。
System.out.println(rs.getRecordCount());
for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
//wc_in表欄位均為STRING,這裡直接列印輸出。
System.out.println(r.get(col));
}
}