このトピックでは、MaxCompute Java SDKを使用してデータをダウンロードする方法について説明します。
TableTunnelのDownloadSession操作を使用してデータをダウンロードする
テーブルデータをダウンロードするには、次の手順を実行します。
TableTunnelオブジェクトを作成します。
DownloadSessionオブジェクトを作成します。
レコードを読み取るRecordReaderオブジェクトを作成します。
例
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
/**
 * Sample that downloads every record of one table partition through
 * {@link TableTunnel} and prints each record as a tab-separated line on
 * standard output.
 */
public class DownloadSample {
    // The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. Using these credentials to perform operations is a high-risk operation. We recommend that you use a RAM user to call API operations or perform routine O&M. To create a RAM user, log on to the RAM console.
    // In this example, the AccessKey ID and AccessKey secret are configured as environment variables. You can also save your AccessKey pair in the configuration file based on your business requirements.
    // We recommend that you do not directly specify the AccessKey ID and AccessKey secret in code to prevent AccessKey pair leaks.
    private static final String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String odpsUrl = "http://service.odps.aliyun.com/api";
    // Set the tunnelUrl parameter. By default, data is transmitted on the Internet. If you need to transmit data on an internal network, you must set the tunnelUrl parameter based on your business needs.
    private static final String tunnelUrl = "https://dt.cn-shanghai-intranet.maxcompute.aliyun-inc.com";
    private static final String project = "<your project>";
    private static final String table = "<your table name>";
    private static final String partition = "<your partition spec>";

    /**
     * Creates a download session for the configured partition, reads all of
     * its records and prints them. Tunnel and I/O failures are reported via
     * their stack trace.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(tunnelUrl); // Set the tunnelUrl parameter.
        PartitionSpec partitionSpec = new PartitionSpec(partition);

        try {
            DownloadSession downloadSession =
                    tunnel.createDownloadSession(project, table, partitionSpec);
            System.out.println("Session Status is : "
                    + downloadSession.getStatus().toString());

            long count = downloadSession.getRecordCount();
            System.out.println("RecordCount is: " + count);

            // Fetch the schema once instead of once per record.
            TableSchema schema = downloadSession.getSchema();
            RecordReader recordReader = downloadSession.openRecordReader(0, count);
            try {
                Record record;
                while ((record = recordReader.read()) != null) {
                    consumeRecord(record, schema);
                }
            } finally {
                // Close the reader even when reading or printing a record fails.
                recordReader.close();
            }
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /**
     * Prints one record as a single line: column values separated by tabs,
     * with {@code null} values written as the text "null".
     *
     * @param record the record to print
     * @param schema the schema that describes the columns of {@code record}
     * @throws RuntimeException if a column has a type other than BIGINT,
     *         BOOLEAN, DATETIME, DOUBLE or STRING
     */
    private static void consumeRecord(Record record, TableSchema schema) {
        int columnCount = schema.getColumns().size();
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < columnCount; i++) {
            Column column = schema.getColumn(i);
            String colValue = null;
            switch (column.getType()) {
                case BIGINT: {
                    Long v = record.getBigint(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case BOOLEAN: {
                    Boolean v = record.getBoolean(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DATETIME: {
                    Date v = record.getDatetime(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DOUBLE: {
                    Double v = record.getDouble(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case STRING: {
                    colValue = record.getString(i);
                    break;
                }
                default:
                    throw new RuntimeException("Unknown column type: "
                            + column.getType());
            }
            // Put the separator between columns only, so the line has no trailing tab.
            if (i > 0) {
                line.append('\t');
            }
            line.append(colValue == null ? "null" : colValue);
        }
        System.out.println(line);
    }
}
説明
上記の例では、中国 (上海) リージョンのクラウド製品相互接続ネットワークのトンネルエンドポイントが使用されています。 エンドポイントとリージョンの詳細については、「エンドポイント」をご参照ください。
この例では、テストを容易にするために、System.out.println ステートメントを使用してデータを標準出力に出力しています。 実際の使用では、コードを変更してデータをテキストファイルに書き込むことができます。
InstanceTunnelのDownloadSession操作を使用してデータをダウンロードする
// Initialize a MaxCompute object.
Odps odps = OdpsUtils.newDefaultOdps();
// Run the query as a MaxCompute job and wait for the instance to succeed.
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
// Create an InstanceTunnel object.
InstanceTunnel tunnel = new InstanceTunnel(odps);
// Create a DownloadSession object based on the instance ID of the MaxCompute job.
InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
long count = session.getRecordCount();
// The number of records that are returned.
System.out.println(count);
// Resolve the column count once instead of once per column of every record.
int columnCount = session.getSchema().getColumns().size();
// The method that is used to obtain data is the same as that in TableTunnel.
TunnelRecordReader reader = session.openRecordReader(0, count);
Record record;
try {
    while ((record = reader.read()) != null) {
        for (int col = 0; col < columnCount; ++col) {
            // Print the data of the wc_in table. All fields of the wc_in table are strings.
            System.out.println(record.get(col));
        }
    }
} finally {
    // Close the reader even when reading a record fails.
    reader.close();
}
SQLTask.getResultSet() 静的メソッドを使用してデータをダウンロードする
// Initialize a MaxCompute object.
Odps odps = OdpsUtils.newDefaultOdps();
// Run the query as a MaxCompute job and wait for the instance to succeed.
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
// Obtain the record iterator based on the instance ID.
ResultSet rs = SQLTask.getResultSet(i);
// The number of records that are returned. Printed once, before iterating,
// rather than repeated for every record.
System.out.println(rs.getRecordCount());
// Resolve the column count once instead of once per column of every record.
int columnCount = rs.getTableSchema().getColumns().size();
for (Record r : rs) {
    for (int col = 0; col < columnCount; ++col) {
        // Print the data of the wc_in table. All fields of the wc_in table are strings.
        System.out.println(r.get(col));
    }
}