ログパイプラインや IoT イベントストリームなど、高頻度のデータ書き込みでは、各書き込みを個別のインポートとして処理すると、以下の 2 つの課題が生じます。まず、各書き込みごとにトランザクションオーバーヘッド(SQL 解析、実行計画生成)が発生します。また、各書き込みで新しいテーブルバージョンが作成され、バックグラウンドでのコンパクション圧力が加速します。グループコミット機能は、複数の INSERT INTO VALUES、Stream Load、および HTTP Stream インポートをサーバー側で単一の内部トランザクションにマージすることで、これらの課題を同時に解決します。これにより I/O オーバーヘッドが低減され、クライアント側のバッチ処理ロジックを必要とせずに書き込みスループットが向上します。
仕組み
グループコミットは独立したインポート方式ではありません。代わりに、条件を満たす INSERT INTO VALUES、Stream Load、および HTTP Stream リクエストをインターセプトし、1 回の内部コミットにバッチ処理します。自動コミットは、以下のいずれかのしきい値に達した時点でトリガーされます。
コミット間隔が経過(デフォルト:10 秒)
累積データサイズが上限に達(デフォルト:64 MB)
この動作は、以下の 3 つのモードで制御されます。
| モード | 動作 | 使用するシナリオ |
|---|---|---|
off_mode | グループコミットが無効化されています。INSERT INTO VALUES、Stream Load、および HTTP Stream は通常通り動作します。 | グループコミットが不要な場合 |
sync_mode | 負荷および group_commit_interval テーブルプロパティに基づき、複数のインポートを 1 つのトランザクションでバッチ処理します。トランザクションのコミット完了後に応答を返します。データは即時に可視化されます。 | データの即時可視化が必要な高同時実行数書き込み |
async_mode | まずデータを先行書き込みログ(WAL)に書き込み、直ちに応答を返します。その後、負荷および group_commit_interval に基づき非同期でコミットします。データはコミット後に可視化されます。大量のデータが検出された場合、自動的に sync_mode に切り替わります。 | 書き込みレイテンシを最優先とする高頻度書き込み |
sync_mode と async_mode の選択方法:
データがインポート直後に即時に可視化される必要がある高同時実行数のシナリオでは、
sync_modeを使用してください。このモードではトランザクションのコミット完了までブロックされるため、応答の返却時点でデータが永続化され、即時にクエリ可能であることが保証されます。書き込みレイテンシを最優先とする場合は、
async_modeを使用してください。サーバーはデータを WAL に書き込んだ時点でインポートを承認し、内部コミットの完了を待たずに応答を返します。内部コミットが失敗した場合、WAL ログを用いてデータを回復します。ただし、インポート応答の返却時点ではデータはまだ可視化されません。
サンプルテーブルの作成
本トピックの例では、以下のテーブルを使用します。
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;JDBC を使用したデータインポート
ApsaraDB for SelectDB は、Java Database Connectivity(JDBC)経由での MySQL のプリペアドステートメント機能をサポートしています。プリペアドステートメントを使用すると、SQL ステートメントおよびそのインポート計画がセッションレベルのメモリにキャッシュされるため、繰り返しの INSERT 操作における CPU オーバーヘッドが低減されます。
プロジェクトに MySQL コネクタの依存関係を追加します。
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>サーバー側プリペアドステートメントを有効化した JDBC URL を構築します。
jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=truegroup_commitセッション変数を設定します。以下のいずれかの方法を使用できます。JDBC URL に付与する方法:
jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode接続時に SQL ステートメントを実行する方法:
java try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); }
プリペアドステートメントを用いて行を挿入します。
private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true"; private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com"; private static final int PORT = 9030; private static final String DB = "db"; private static final String TBL = "dt"; private static final String USER = "admin"; private static final String PASSWD = "***"; private static final int INSERT_BATCH_SIZE = 10; public static void main(String[] args) { groupCommitInsert(); //groupCommitInsertBatch } private static void groupCommitInsert() throws Exception { Class.forName(JDBC_DRIVER); try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { // セッション変数 'group_commit' を設定 try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); } String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); int result = stmt.executeUpdate(); System.out.println("rows: " + result); } } } catch (Exception e) { e.printStackTrace(); } } private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); // JDBC URL に rewriteBatchedStatements=true および cachePrepStmts=true を追加 // JDBC URL に sessionVariables=group_commit=async_mode を指定してセッション変数を設定 try (Connection conn = DriverManager.getConnection( String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) { String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int j = 0; j < 5; j++) { // 1 回の INSERT で 10 行 for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); stmt.addBatch(); } int[] result = stmt.executeBatch(); } } } catch (Exception e) { e.printStackTrace(); } }
INSERT INTO を使用したデータインポート
INSERT INTO ステートメントを実行する前に、group_commit セッション変数を設定することで、グループコミットを有効化します。
非同期モード — データはバックグラウンドでバッチ処理・コミットされます。
-- 非同期モードでグループコミットを有効化します。デフォルト値は off_mode です。
mysql> SET group_commit = async_mode;
-- 返された label が "group_commit" で始まることから、グループコミットが有効であることが確認できます。
mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
-- 同じ label および txnId を共有する連続した INSERT は、1 つのインポートジョブにバッチ処理されます。
mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
-- インポート応答の返却時点ではデータはまだ可視化されません。
mysql> SELECT * FROM dt;
Empty SET (0.01 sec)
-- 約 10 秒後(group_commit_interval で制御)にデータが可視化されます。
mysql> SELECT * FROM dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
+------+-------+-------+
3 rows in set (0.02 sec)同期モード — トランザクションのコミット完了後にのみ応答を返します。データは即時に可視化されます。
-- 同期モードでグループコミットを有効化します。
mysql> SET group_commit = sync_mode;
-- コミット間隔は group_commit_interval で制御されます。呼び出しはトランザクションのコミット完了までブロックされます。
mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
Query OK, 2 rows affected (10.06 sec)
{'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
-- データは即時に可視化されます。
mysql> SELECT * FROM dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
| 4 | Bob | 90 |
| 5 | Alice | 99 |
+------+-------+-------+
5 rows in set (0.03 sec)グループコミットの無効化:
mysql> SET group_commit = off_mode;Stream Load を使用したデータインポート
ログまたは HTTP ベースのパイプラインでは、Stream Load リクエストでグループコミットを有効化するために、group_commit ヘッダーを渡します。Stream Load の詳細については、「Stream Load」をご参照ください。
data.csvというファイルを作成します。6,Amy,60 7,Ross,98適切なモードのヘッダーを指定してインポートを実行します。非同期モード:
# group_commit:async_mode をリクエストヘッダーとして渡します。 curl --location-trusted -u {user}:{passwd} -T data.csv \ -H "group_commit:async_mode" \ -H "column_separator:," \ http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load期待される応答:
{ "TxnId": 7009, "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 35, "StreamLoadPutTimeMs": 5, "ReadDataTimeMs": 0, "WriteDataTimeMs": 26 }同期モード:
# group_commit:sync_mode をリクエストヘッダーとして渡します。 curl --location-trusted -u {user}:{passwd} -T data.csv \ -H "group_commit:sync_mode" \ -H "column_separator:," \ http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load期待される応答:
{ "TxnId": 3009, "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 10044, "StreamLoadPutTimeMs": 4, "ReadDataTimeMs": 0, "WriteDataTimeMs": 10038 }応答内の
"GroupCommit": trueは、グループコミットが有効であることを確認します。ラベルは常にgroup_commitで始まります。
自動コミットしきい値の設定
ALTER TABLE を使用して、テーブルごとにコミット間隔またはデータサイズのしきい値を調整できます。
コミット間隔
デフォルトのコミット間隔は 10 秒です。
-- コミット間隔を 2 秒に変更します。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");トレードオフ:
| 設定 | メリット | デメリット |
|---|---|---|
| 短い間隔(例:2 秒) | データ可視化までのレイテンシが低減 | コミット頻度が増加し、バージョン成長が速くなり、バックグラウンドのコンパクション圧力が高まる |
| 長い間隔(例:30 秒) | 1 回のコミットでより多くのデータが処理され、システムオーバーヘッドが低減 | データ可視化までのレイテンシが増加 |
書き込みからデータがクエリ可能になるまでのレイテンシをアプリケーションが許容できる範囲に基づいて間隔を設定してください。システムが高コンパクション圧力下にある場合は、間隔を延長してください。
データサイズしきい値
自動コミットのデフォルトデータサイズしきい値は 64 MB です。
-- データサイズしきい値を 128 MB に変更します。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");制限事項
INSERT INTO VALUES の機能低下
グループコミットが有効になると、以下の INSERT INTO VALUES 文は自動的に非グループコミットモードにスペックダウンされます。
明示的なトランザクション内に記述されたもの:
BEGIN;INSERT INTO VALUES;COMMITラベルが指定されたもの:
INSERT INTO dt WITH LABEL {label} VALUESVALUES 内に式が含まれるもの:例
INSERT INTO dt VALUES (1 + 100)カラム更新を使用して記述されたもの
対象テーブルが軽量スキーマ変更をサポートしない場合
Stream Load および HTTP Stream の機能低下
以下の Stream Load および HTTP Stream ジョブは、自動的に非グループコミットモードにスペックダウンされます。
-H "label:my_label"を使用してラベルが指定されたもの2 フェーズコミット(2PC)モードが使用されているもの
カラム更新を使用して記述されたもの
対象テーブルが軽量スキーマ変更をサポートしない場合
Unique Key モデル
グループコミットは、Unique Key モデルにおいてコミット順序を保証しません。データ整合性を確保するには、シーケンス列とともにグループコミットを使用してください。
max_filter_ratio のサポート
標準インポートモードでは、filter_ratio は、失敗した行数と合計行数の比率に基づきコミットを判断します。グループコミットモードでは、複数のクライアントからのインポートが 1 つの内部インポートにマージされ、単位として 1 回だけコミットされます。
グループコミットは max_filter_ratio のセマンティクスを部分的にサポートします。このセマンティクスは、インポートされた総行数がバックエンド(BE)設定項目 group_commit_memory_rows_for_max_filter_ratio の値を超えない場合にのみ有効になります。デフォルト値は 10000 です。
async_mode における WAL の動作
async_mode では、各インポートはまず WAL ログに書き込まれます。
内部コミットが成功した場合、WAL ログは直ちに削除されます。
内部コミットが失敗した場合、WAL ログを用いてデータを回復します。
ディスク領域を保護するために、以下の状況でシステムは async_mode から sync_mode に自動的に切り替わります。
インポートされたデータが単一の WAL ディレクトリの 80 % を超える場合。
合計データサイズが不明なチャンク化された Stream Load ジョブが送信された場合。
データ量は小さいものの、利用可能なディスク領域が不足している場合。
スキーマ変更
重いスキーマ変更が最終メタデータ変更フェーズに入っている場合、WAL ログがテーブルスキーマと互換性を保つため、システムは新しいグループコミットを拒否します。影響を受けるクライアントには以下の例外が返されます。
insert table ${table_name} is blocked on schema changeこの例外が発生した場合、クライアント側でインポートを再試行してください。
カラムの追加・削除、VARCHAR 長の変更、カラム名の変更などの軽量スキーマ変更は、グループコミットをブロックしません。それ以外のすべてのスキーマ変更は重いスキーマ変更です。