ここでは、複数の方法で MaxCompute SQL の計算結果をダウンロードする例を紹介します。

ここでは、Java SDK を例として使用します。

次の方法の中からいずれか 1 つを使用して、SQL 文の実行結果をエクスポートします。

  • データ量が少ない場合は、SQL タスクを使用してすべてのクエリ結果をリストします。

  • 特定のテーブルまたはパーティションの結果をエクスポートする場合は、Tunnel を使用します。

  • SQL 文が複雑な場合は、Tunnel と SQL タスクを組み合わせて使用します。

  • DataWorks を使用すると、SQL 文の実行とデータの同期を簡単に行えます。 定期的なスケジューリングと、タスクの依存関係の設定がサポートされています。

  • オープンソースツール DataX を使用すると、maxcompute からターゲットデータソースへ簡単にデータをエクスポートできます。

SQL タスクを使用したデータのエクスポート

SQL タスクは、SDK から maxcompute SQL を直接呼び出すインターフェイスで、簡単に SQL を実行し、結果を取得できます。

SQLTask.getResult(i);は、周期的に反復可能なリストを戻して、完全な SQL 計算結果を取得します。 ただし、この方法には欠点があります。 詳細については、「その他のアクション」に記載されている SetProject READ_TABLE_MAX_ROW maid 機能をご参照ください。

現在、SELECT 文がクライアントへ返すデータレコードの最大値を 10,000 まで調節できます。 クライアント上または SQL タスクを使用して SELECT 文を実行する場合は、クエリの結果には Limit N が追加されます。Limit N は CREATE TABLE XX AS SELECT 文、または結果が INSERT INTO/OVERWRITE TABLE によって特定のテーブルに結合される場合は、適用されません。

Tunnel を使用したデータのエクスポート

テーブル (または特定のパーティション) の内容全体を返すクエリをエクスポートする必要がある場合は、tunnel によってエクスポートできます。詳細については、「コマンドライン」 ツールと、SDK に基づいて書かれた「tunnel SDK」をご参照ください。

Tunnel コマンドラインを使用してデータをエクスポートする方法の例を紹介します。 いくつかのコマンドラインを使用してデータをエクスポートできない場合のみ、Tunnel SDK をコンパイルします。 詳細については、「バッチデータ tunnel 概要」をご参照ください。

tunnel d wc_out c:\wc_out.dat;
2016-12-16 19:32:08 - new session: 201612161932082d3c9b0a012f68e7 total lines: 3
2016-12-16 19:32:08 - file [0]: [0, 3), c:\wc_out.dat
downloading 3 records into 1 file
2016-12-16 19:32:08 - file [0] start
2016-12-16 19:32:08 - file [0] OK. total: 21 bytes
download OK

SQL タスクと Tunnel を使用したデータのエクスポート

SQL タスクでは 10,000 件より多いレコードはエクスポートできませんが、Tunnel ではエクスポートできます。 データをエクスポートするために、SQL タスクと Tunnel を組み合わせて使用できます。

サンプルコードは次のとおりです。

private static final String accessId = "userAccessId";
private static final String accessKey = "userAccessKey";
private static final String endPoint = "http://service.odps.aliyun.com/api";
private static final String project = "userProject";
private static final String sql = "userSQL";
private static final String table = "Tmp_" + UUID.randomUUID().toString().replace("-", "_");//The name of the temporary table is a random string.
private static final Odps odps = getOdps();
public static void main(String[] args) {
System.out.println(table);
runSql();
tunnel();
}
/*
* Download the results returned by SQL Task.
* */
private static void tunnel() {
TableTunnel tunnel = new TableTunnel(odps);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(
project, table);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) ! = null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
/*
* Save the data record.
* If the data volume is small, you can print and copy the data directly.  In reality, you can use Java.io to write the data to a local file or a remote data storage.
* */
private static void consumeRecord(Record record, TableSchema schema) {
System.out.println(record.getString("username")+","+record.getBigint("cnt"));
}
/*
* Run an SQL statement to save the query results to a temporary table. The saved results can be downloaded using Tunnel.
* The lifecycle of the saved data is 1 day. The data does not occupy much storage space even when an error occurs while you delete the data.
* */
private static void runSql() {
Instance i;
StringBuilder sb = new StringBuilder("Create Table ").append(table)
.append(" lifecycle 1 as ").append(sql);
try {
System.out.println(sb.toString());
i = SQLTask.run(getOdps(), sb.toString());
i.waitForSuccess();
} catch (OdpsException e) {
e.printStackTrace();
}
}
/*
* Initialize the connection information of the MaxCompute (formerly ODPS) instance.
* */
private static Odps getOdps() {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
return odps;
}

DataWorks の同期を使用したデータのエクスポート

前述の方法を使用して、ダウンロードしたデータを保存することができます。 データを作成し、データの作成と保存との間のスケジューリング依存関係を実装するには、他の方法が必要です。

DataWorks では、データ同期タスクを設定し、定期的な実行複数のタスク間の依存関係を設定することで、データの作成から保存までのプロセスを完了します。

Data IDE を使用して SQL 文を実行し、データを作成およびエクスポートするためのデータ同期タスクを設定する例を紹介します。

手順
  1. SQL ノードとデータ同期ノードでワークフローを作成します。 2 つのノードを作成し、データ生成ノードとしての SQL ノードと、データエクスポートノードとしてのデータ同期ノードとのノード間依存関係を設定します。
  2. SQL ノードを設定します。
    同期を設定する前に、テーブルを作成するための SQL 文を実行します。 テーブルが存在しない場合は、同期タスクを設定できません。
  3. データ同期タスクを設定するには、次の手順を実行します。
    1. ソースを選択します。
    2. ターゲットを選択します。
    3. フィールドをマッピングします。
    4. tunnel を操作します。
    5. プレビューして保存します。
  4. ワークフロースケジューリングを設定したら、ワークフローを保存して、送信します。 Test Run をクリックします。 ワークフロースケジューリングを設定しない場合は、デフォルトのスケジューリング設定を直接使用できます。次の図のように、データ同期の実行中ログを表示します。
    2016-12-17 23:43:46.394 [job-15598025] INFO JobContainer - 
    Task start time : 2016-12-17 23:43:34
    Task end time : 2016-12-17 23:43:46
    Total task time : 11s
    Average data per task : 31.36 KB/s-
    Write speed : 1,668 rec/s
    Read records : 16,689
    Failed read-write attempts : 0
  5. データ同期の結果を表示するには、SQL 文を実行します。