すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:グループコミット機能を使用してデータをインポートする

最終更新日:May 01, 2025

サーバー上のリクエストのグループコミットは、データ書き込みパフォーマンスを最適化する技術です。グループコミット機能は、高並列データ書き込みが実行されるシナリオに適しています。この機能は、INSERT などの操作を実行することにより、複数のデータ書き込みリクエストを一度にコミットします。これにより、I/O オーバーヘッドが削減され、書き込みスループットが向上します。

概要

グループコミット機能は新しいインポート方法ではなく、データ書き込みパフォーマンスを最適化する技術です。複数の INSERT INTO tbl VALUES(...) または Stream Load 文を一度にコミットして、I/O オーバーヘッドを削減し、書き込みスループットを向上させます。アプリケーションは Java Database Connectivity(JDBC)を直接使用して、ApsaraDB for SelectDB インスタンスに高頻度でデータを書き込むことができます。さらに、プリペアドステートメントを使用することで、より高いパフォーマンスを得ることができます。ロギングシナリオでは、Stream Load または HTTP Stream を使用して、SelectDB インスタンスにデータを書き込むことができます。グループコミット機能は、次のモードをサポートしています。

  • off_mode

    グループコミット機能は無効になっています。この場合、INSERT INTO VALUES、Stream Load、および HTTP Stream のデフォルトの動作が保持されます。

  • sync_mode

    SelectDB は、負荷とテーブルの group_commit_interval プロパティに基づいて、1 つのトランザクションで複数のインポート操作をコミットします。トランザクションがコミットされると、インポート結果が返されます。このモードは、高並列データ書き込みが実行され、インポート完了後すぐにデータが表示される必要があるシナリオに適用されます。

  • async_mode

    非同期モードは、データが高頻度で書き込まれ、システムが書き込みレイテンシに敏感なシナリオに適用されます。SelectDB は最初にデータを先行書き込みログ(WAL)ログに書き込みます。その後、インポート結果がすぐに返されます。SelectDB は、負荷とテーブルの group_commit_interval プロパティに基づいて、データを非同期的にコミットします。データはコミット後に表示されます。一度に大量のデータがインポートされる場合、モードは自動的に sync_mode に切り替わります。これにより、WAL ログが大量のディスク容量を占有するのを防ぎます。

制限事項

  • グループコミット機能が有効になっている場合、システムは開始した INSERT INTO VALUES 文がグループコミットの条件を満たしているかどうかを判断します。満たしている場合、文はグループコミットモードで実行されます。ただし、次の条件を満たす文は、自動的に非グループコミットモードにダウングレードされます。

    • トランザクションを使用してデータが書き込まれる。この場合、データは BEGIN; INSERT INTO VALUES; COMMIT 文を使用して書き込まれます。

    • ラベルが次の形式で文に指定されている:INSERT INTO dt WITH LABEL {label} VALUES

    • VALUES に式が含まれている。例:INSERT INTO dt VALUES (1 + 100)

    • 列の更新を使用してデータが書き込まれる。

    • データが書き込まれるテーブルがライトスキーマ変更をサポートしていない。

  • グループコミット機能が有効になっている場合、システムは開始した Stream Load または HTTP Stream ジョブがグループコミットの条件を満たしているかどうかを判断します。満たしている場合、インポートはグループコミットモードで実行されます。ただし、次の条件を満たすジョブは、自動的に非グループコミットモードにダウングレードされます。

    • -H "label:my_label" を使用してラベルが指定されている。

    • 2 フェーズコミット(2PC)モードが使用されている。

    • 列の更新を使用してデータが書き込まれる。

    • データが書き込まれるテーブルがライトスキーマ変更をサポートしていない。

  • 一意キーモデルが使用されている場合、グループコミット機能はコミットシーケンスを保証できません。この場合、グループコミット機能をシーケンス列と共に使用して、データの整合性を確保できます。

  • max_filter_ratio セマンティクスのサポート:

    • デフォルトのインポートモードでは、filter_ratio パラメーターは、インポート完了後の失敗した行数と合計行数に基づいてデータをコミットするかどうかを決定します。

    • グループコミットモードでは、複数のクライアントによって開始されたインポート操作は、1 つの内部インポートによって実行されます。各インポート操作の filter_ratio パラメーターの値を計算できますが、トランザクションはデータが内部インポートプロセスに入った後にのみコミットできます。

    • グループコミット機能は、max_filter_ratio セマンティクスを部分的にサポートしています。インポートされた行の合計数が group_commit_memory_rows_for_max_filter_ratio 設定項目の値を超えない場合、max_filter_ratio セマンティクスが有効になります。 group_commit_memory_rows_for_max_filter_ratio は、デフォルト値が 10000 のバックエンド(BE)設定項目です。

  • WAL の制限:

    • グループコミットモードが async_mode に設定されている場合、データは WAL ログに書き込まれます。内部インポートが成功すると、WAL ログはすぐに削除されます。内部インポートが失敗した場合、WAL ログを使用してデータを復元できます。

    • 十分なディスク容量を確保するために、次のシナリオでは、グループコミットモードが async_mode から sync_mode に自動的に切り替わります。

      • インポートされたデータの量が、単一の WAL ディレクトリの容量の 80% 以上を占めている。

      • チャンク化された Stream Load ジョブが開始され、不明な量のデータがインポートされる。

      • インポートされたデータの量は少ないが、使用可能なディスク容量が不足している。

    • 重大なスキーマ変更が発生した場合、システムはスキーマ変更の最終フェーズであるメタデータ変更フェーズでグループコミットを拒否して、WAL ログをテーブルスキーマに適合できるようにします。この場合、クライアントは例外 insert table ${table_name} is blocked on schema change を受信します。この例外を受信した場合は、クライアントでデータインポートを再試行してください。列を追加または削除するスキーマ変更、VARCHAR の長さを変更するスキーマ変更、または列の名前を変更するスキーマ変更は、ライトスキーマ変更です。その他すべてのスキーマ変更は、重大なスキーマ変更です。

dt という名前のテーブルを作成します。

CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;

JDBC を使用する

JDBC を使用して INSERT INTO VALUES 文を実行してデータをインポートする場合、ApsaraDB for SelectDB は MySQL の prepared statement 機能をサポートしており、SQL 解析とプラン生成のオーバーヘッドを削減します。prepared statements を使用すると、SQL 文とそのインポートプランはセッションレベルのメモリキャッシュにキャッシュされます。キャッシュされたオブジェクトは、後続のインポートに使用できます。これにより、クラスタの CPU 使用率が削減されます。次の例では、prepared statement と JDBC を使用してデータをインポートします。

  1. 依存関係を追加します。

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
  2. JDBC URL を指定し、サーバーで prepared statement 機能を有効にします。

    url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
  3. 次のいずれかの方法でセッション変数 group_commit を設定します。

    • sessionVariables=group_commit=async_mode を JDBC URL に追加します。

      url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
    • 次の SQL 文を実行します。

      try (Statement statement = conn.createStatement()) {
          statement.execute("SET group_commit = async_mode;");
      }
  4. prepared statement を使用します。

        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true";
        private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com";
        private static final int PORT = 9030;
        private static final String DB = "db";
        private static final String TBL = "dt";
        private static final String USER = "admin";
        private static final String PASSWD = "***";
        private static final int INSERT_BATCH_SIZE = 10;
        
        public static void main(String[] args) {
            groupCommitInsert();
            //groupCommitInsertBatch // グループコミット挿入バッチ
        }
        
        private static void groupCommitInsert() throws Exception {
            Class.forName(JDBC_DRIVER);
            try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
                // セッション変数 'group_commit' を設定する
                try (Statement statement = conn.createStatement()) {
                    statement.execute("SET group_commit = async_mode;");
                }
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "name" + i);
                        stmt.setInt(3, i + 10);
                        int result = stmt.executeUpdate();
                        System.out.println("rows: " + result); // 行:
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void groupCommitInsertBatch() throws Exception { // グループコミット挿入バッチ
            Class.forName(JDBC_DRIVER);
            // JDBC url に rewriteBatchedStatements=true と cachePrepStmts=true を追加する
            // JDBC url の sessionVariables=group_commit=async_mode でセッション変数を設定する
            try (Connection conn = DriverManager.getConnection(
                    String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) {
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int j = 0; j < 5; j++) {
                        // 挿入ごとに 10 行
                        for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                            stmt.setInt(1, i);
                            stmt.setString(2, "name" + i);
                            stmt.setInt(3, i + 10);
                            stmt.addBatch();
                        }
                        int[] result = stmt.executeBatch();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

INSERT INTO を使用する

このセクションでは、非同期モードと同期モードで INSERT INTO 文を実行してデータを挿入する方法について説明します。

  • 非同期モード

    -- セッション変数 group_commit を設定して、非同期モードでグループコミット機能を有効にします。変数のデフォルト値は off_mode です。
    mysql> SET group_commit = async_mode;
    
    -- 返されるラベルは group_commit で始まります。これは、グループコミット機能が有効になっていることを示します。
    mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
    Query OK, 2 rows affected (0.05 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- 次の文のラベルと txnId は、前の文と同じです。これは、2 つの文が同じインポートジョブで実行されていることを示します。
    mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
    Query OK, 1 row affected (0.01 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- インポートされたデータは、インポート完了直後にクエリできません。
    mysql> SELECT * FROM dt;
    Empty SET (0.01 sec)
    
    -- 10 秒後にクエリを実行します。インポートされたデータをクエリできます。テーブルプロパティ group_commit_interval を使用して、データの可視性のレイテンシを制御できます。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    +------+-------+-------+
    3 rows in set (0.02 sec)
  • 同期モード

    -- セッション変数 group_commit を設定して、同期モードでグループコミット機能を有効にします。変数のデフォルト値は off_mode です。
    mysql> SET group_commit = sync_mode;
    
    -- コミット間隔は、テーブルプロパティ group_commit_interval によって指定されます。返されるラベルは group_commit で始まります。これは、グループコミット機能が有効になっていることを示します。
    mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
    Query OK, 2 rows affected (10.06 sec)
    {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
    
    -- インポートされたデータは、インポート完了直後にクエリできます。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    |    4 | Bob   |    90 |
    |    5 | Alice |    99 |
    +------+-------+-------+
    5 rows in set (0.03 sec)
  • グループコミット機能を無効にする

    mysql> SET group_commit = off_mode;

Stream Load を使用する

Stream Load 機能の詳細については、「Stream Load」をご参照ください。

  1. data.csv という名前のファイルを作成し、次の内容をファイルに追加します。

    6,Amy,60
    7,Ross,98
  2. Stream Load を使用して、非同期モードまたは同期モードでデータをインポートします。

    • 非同期モード

      # インポートの URL に group_commit:async_mode ヘッダーを追加します。
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 7009,
          "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",  // 成功
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 35,
          "StreamLoadPutTimeMs": 5,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 26
      }
      
      # 返される GroupCommit パラメーターの値は true です。これは、グループコミットが有効になっていることを示します。
      # このインポートに関連付けられているラベルは、group_commit で始まります。

    • 同期モード

      # インポートの URL に group_commit:sync_mode ヘッダーを追加します。
      
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 3009,
          "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success", // 成功
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 10044,
          "StreamLoadPutTimeMs": 4,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 10038
      }
      
      # 返される GroupCommit パラメーターの値は true です。これは、グループコミットが有効になっていることを示します。
      # このインポートに関連付けられているラベルは、group_commit で始まります。

自動コミットの条件

データは、コミット間隔に基づいて、またはデータサイズが指定された値に達したときに自動的にコミットされます。デフォルトのコミット間隔は 10 秒で、自動コミットのデフォルトのデータサイズは 64 MB です。

コミット間隔

デフォルトのコミット間隔は 10 秒です。次の文を実行して、テーブルのコミット間隔を変更できます。

-- コミット間隔を 2 秒に変更します。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

自動コミットのデータサイズ

自動コミットのデフォルトのデータサイズは 64 MB です。次の文を実行して、テーブルの自動コミットのデータサイズを変更できます。

-- 自動コミットのデータサイズを 128 MB に変更します。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");