すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:マルチスレッドモードでのデータアップロード

最終更新日:Mar 27, 2026

Java の ExecutorServiceTableTunnel インターフェイスを使用して、MaxCompute にデータを並行してアップロードします。各スレッドは排他的なブロックと RecordWriter を所有します。ブロックはスレッド間で共有してはなりません。

前提条件

開始する前に、以下を確認してください。

  • 書き込み先のパーティションテーブルを持つ MaxCompute プロジェクト

  • プロジェクトの依存関係に追加された MaxCompute Java SDK

  • 環境変数として保存された AccessKey ID と AccessKey Secret:

    • ALIBABA_CLOUD_ACCESS_KEY_ID

    • ALIBABA_CLOUD_ACCESS_KEY_SECRET

  • (推奨) Alibaba Cloud ルートアカウントではなく、最小限必要な権限を持つ RAM ユーザー。ルートアカウントの認証情報は、AccessKey ペアがすべての API オペレーションに対する権限を持つため、リスクが高いです。RAM ユーザーは、Resource Access Management (RAM) コンソールで作成します。

仕組み

アップロードフローには、次の4つのステップがあります。

  1. ターゲットテーブルパーティション用に単一の UploadSession を作成します。

  2. 各スレッドに対して、uploadSession.openRecordWriter(blockId) を呼び出して専用の RecordWriter を開きます。ブロック ID はそのスレッドのデータセグメントを一意に識別し、2つのスレッドが同じブロックを共有することはありません。

  3. 各スレッドはレコードを埋め、自身の RecordWriter を通じて書き込み、その後ライターを閉じます。

  4. すべてのスレッドが完了した後、ブロック ID の完全なリストでアップロードセッションをコミットします。

複数スレッドでのデータアップロード

次の例では、10個のアップロードスレッドを作成し、それぞれが自身のブロックに10個のレコードを書き込みます。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

// 各スレッドは1つのブロック (RecordWriter) を所有し、そこに10個のレコードを書き込みます。
class UploadThread implements Callable<Boolean> {
    private long id;
    private RecordWriter recordWriter;
    private Record record;
    private TableSchema tableSchema;

    public UploadThread(long id, RecordWriter recordWriter, Record record,
                    TableSchema tableSchema) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
        this.tableSchema = tableSchema;
    }

    @Override
    public Boolean call() {
        // 各列をそのデータ型に基づいて設定します
        for (int i = 0; i < tableSchema.getColumns().size(); i++) {
            Column column = tableSchema.getColumn(i);
            switch (column.getType()) {
            case BIGINT:
                record.setBigint(i, 1L);
                break;
            case BOOLEAN:
                record.setBoolean(i, true);
                break;
            case DATETIME:
                record.setDatetime(i, new Date());
                break;
            case DOUBLE:
                record.setDouble(i, 0.0);
                break;
            case STRING:
                record.setString(i, "sample");
                break;
            default:
                throw new RuntimeException("Unknown column type: "
                                + column.getType());
            }
        }

        boolean success = true;
        try {
            for (int i = 0; i < 10; i++) {
                recordWriter.write(record);
            }
        } catch (IOException e) {
            success = false;
            e.printStackTrace();
        } finally {
            recordWriter.close();
        }
        return success;
    }
}

public class UploadThreadSample {
    // 環境変数から認証情報をロードします。ソースコードにハードコードしないでください。
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    private static String odpsUrl = "<http://service.odps.aliyun.com/api>";

    // tunnelUrl を特定の Tunnel エンドポイントに設定するか、空白のままにしてパブリックエンドポイントを使用します。
    // 以下の例では、中国 (上海) リージョンのクラシックネットワーク Tunnel エンドポイントを使用しています。
    private static String tunnelUrl = "<http://dt.cn-shanghai.maxcompute.aliyun-inc.com>";

    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    public static void main(String args[]) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);

        try {
            TableTunnel tunnel = new TableTunnel(odps);
            tunnel.setEndpoint(tunnelUrl);

            PartitionSpec partitionSpec = new PartitionSpec(partition);

            // ターゲットパーティション用に単一のアップロードセッションを作成します
            UploadSession uploadSession = tunnel.createUploadSession(project,
                            table, partitionSpec);
            System.out.println("Session Status is : " + uploadSession.getStatus().toString());

            // スレッドごとに1つの RecordWriter (ブロック) を割り当てます
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
            for (int i = 0; i < threadNum; i++) {
                RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                Record record = uploadSession.newRecord();
                callers.add(new UploadThread(i, recordWriter, record,
                                uploadSession.getSchema()));
            }

            // すべてのスレッドを実行し、完了するまで待機します
            pool.invokeAll(callers);
            pool.shutdown();

            // すべてのブロックをコミットします
            Long[] blockList = new Long[threadNum];
            for (int i = 0; i < threadNum; i++)
                blockList[i] = Long.valueOf(i);
            uploadSession.commit(blockList);
            System.out.println("upload success!");

        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

コードを実行する前に、次のプレースホルダーを置き換えてください。

プレースホルダー 説明
<your project> MaxCompute プロジェクト名 my_project
<your table name> ターゲットテーブル名 my_table
<your partition spec> パーティション仕様 ds=20240101

Tunnel エンドポイントの設定

ネットワークタイプに基づいて tunnelUrl を設定します。

シナリオ tunnelUrl
空白のままにする デフォルトで使用されるパブリックエンドポイント
内部ネットワーク (クラシックネットワーク、中国 (上海)) http://dt.cn-shanghai.maxcompute.aliyun-inc.com
その他のリージョンまたはネットワークタイプ エンドポイント

アップロードの検証

プログラムが upload success! と出力した後、次のコマンドを実行して MaxCompute のデータを検証します。

SELECT COUNT(*) FROM <your table name> WHERE <your partition spec>;

カウントは、すべてのスレッドで書き込まれたレコードの総数と等しくなるはずです。この例では、10 スレッド × 10 レコード = 100 レコードです。

セキュリティ

AccessKey の認証情報は、ソースコードではなく環境変数に保存してください。本番ワークロードでは、ルートアカウントの認証情報ではなく、最小限必要な権限を持つ RAM ユーザーを使用してください。RAM ユーザーを作成するには、Resource Access Management (RAM) コンソールにアクセスします。

次のステップ

  • エンドポイント — ご利用のリージョンおよびネットワークタイプの Tunnel エンドポイントを確認してください。