Java の ExecutorService と TableTunnel インターフェイスを使用して、MaxCompute にデータを並行してアップロードします。各スレッドは排他的なブロックと RecordWriter を所有します。ブロックはスレッド間で共有してはなりません。
前提条件
開始する前に、以下を確認してください。
-
書き込み先のパーティションテーブルを持つ MaxCompute プロジェクト
-
プロジェクトの依存関係に追加された MaxCompute Java SDK
-
環境変数として保存された AccessKey ID と AccessKey Secret:
-
ALIBABA_CLOUD_ACCESS_KEY_ID -
ALIBABA_CLOUD_ACCESS_KEY_SECRET
-
-
(推奨) Alibaba Cloud ルートアカウントではなく、最小限必要な権限を持つ RAM ユーザー。ルートアカウントの認証情報は、AccessKey ペアがすべての API オペレーションに対する権限を持つため、リスクが高いです。RAM ユーザーは、Resource Access Management (RAM) コンソールで作成します。
仕組み
アップロードフローには、次の4つのステップがあります。
-
ターゲットテーブルパーティション用に単一の
UploadSessionを作成します。 -
各スレッドに対して、
uploadSession.openRecordWriter(blockId)を呼び出して専用のRecordWriterを開きます。ブロック ID はそのスレッドのデータセグメントを一意に識別し、2つのスレッドが同じブロックを共有することはありません。 -
各スレッドはレコードを埋め、自身の
RecordWriterを通じて書き込み、その後ライターを閉じます。 -
すべてのスレッドが完了した後、ブロック ID の完全なリストでアップロードセッションをコミットします。
複数スレッドでのデータアップロード
次の例では、10個のアップロードスレッドを作成し、それぞれが自身のブロックに10個のレコードを書き込みます。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
// 各スレッドは1つのブロック (RecordWriter) を所有し、そこに10個のレコードを書き込みます。
class UploadThread implements Callable<Boolean> {
private long id;
private RecordWriter recordWriter;
private Record record;
private TableSchema tableSchema;
public UploadThread(long id, RecordWriter recordWriter, Record record,
TableSchema tableSchema) {
this.id = id;
this.recordWriter = recordWriter;
this.record = record;
this.tableSchema = tableSchema;
}
@Override
public Boolean call() {
// 各列をそのデータ型に基づいて設定します
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
Column column = tableSchema.getColumn(i);
switch (column.getType()) {
case BIGINT:
record.setBigint(i, 1L);
break;
case BOOLEAN:
record.setBoolean(i, true);
break;
case DATETIME:
record.setDatetime(i, new Date());
break;
case DOUBLE:
record.setDouble(i, 0.0);
break;
case STRING:
record.setString(i, "sample");
break;
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
}
boolean success = true;
try {
for (int i = 0; i < 10; i++) {
recordWriter.write(record);
}
} catch (IOException e) {
success = false;
e.printStackTrace();
} finally {
recordWriter.close();
}
return success;
}
}
public class UploadThreadSample {
// 環境変数から認証情報をロードします。ソースコードにハードコードしないでください。
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static String odpsUrl = "<http://service.odps.aliyun.com/api>";
// tunnelUrl を特定の Tunnel エンドポイントに設定するか、空白のままにしてパブリックエンドポイントを使用します。
// 以下の例では、中国 (上海) リージョンのクラシックネットワーク Tunnel エンドポイントを使用しています。
private static String tunnelUrl = "<http://dt.cn-shanghai.maxcompute.aliyun-inc.com>";
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
private static int threadNum = 10;
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
try {
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);
PartitionSpec partitionSpec = new PartitionSpec(partition);
// ターゲットパーティション用に単一のアップロードセッションを作成します
UploadSession uploadSession = tunnel.createUploadSession(project,
table, partitionSpec);
System.out.println("Session Status is : " + uploadSession.getStatus().toString());
// スレッドごとに1つの RecordWriter (ブロック) を割り当てます
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < threadNum; i++) {
RecordWriter recordWriter = uploadSession.openRecordWriter(i);
Record record = uploadSession.newRecord();
callers.add(new UploadThread(i, recordWriter, record,
uploadSession.getSchema()));
}
// すべてのスレッドを実行し、完了するまで待機します
pool.invokeAll(callers);
pool.shutdown();
// すべてのブロックをコミットします
Long[] blockList = new Long[threadNum];
for (int i = 0; i < threadNum; i++)
blockList[i] = Long.valueOf(i);
uploadSession.commit(blockList);
System.out.println("upload success!");
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
コードを実行する前に、次のプレースホルダーを置き換えてください。
| プレースホルダー | 説明 | 例 |
|---|---|---|
<your project> |
MaxCompute プロジェクト名 | my_project |
<your table name> |
ターゲットテーブル名 | my_table |
<your partition spec> |
パーティション仕様 | ds=20240101 |
Tunnel エンドポイントの設定
ネットワークタイプに基づいて tunnelUrl を設定します。
| シナリオ | tunnelUrl |
|---|---|
| 空白のままにする | デフォルトで使用されるパブリックエンドポイント |
| 内部ネットワーク (クラシックネットワーク、中国 (上海)) | http://dt.cn-shanghai.maxcompute.aliyun-inc.com |
| その他のリージョンまたはネットワークタイプ | エンドポイント |
アップロードの検証
プログラムが upload success! と出力した後、次のコマンドを実行して MaxCompute のデータを検証します。
SELECT COUNT(*) FROM <your table name> WHERE <your partition spec>;
カウントは、すべてのスレッドで書き込まれたレコードの総数と等しくなるはずです。この例では、10 スレッド × 10 レコード = 100 レコードです。
セキュリティ
AccessKey の認証情報は、ソースコードではなく環境変数に保存してください。本番ワークロードでは、ルートアカウントの認証情報ではなく、最小限必要な権限を持つ RAM ユーザーを使用してください。RAM ユーザーを作成するには、Resource Access Management (RAM) コンソールにアクセスします。
次のステップ
-
エンドポイント — ご利用のリージョンおよびネットワークタイプの Tunnel エンドポイントを確認してください。