全部產品
Search
文件中心

ApsaraDB for SelectDB:Group Commit

更新時間:Apr 26, 2025

Group Commit(服務端攢批) 是一種最佳化寫入效能的技術,主要用於在高並發寫入情境下,將多個獨立的寫入操作(如INSERT)合并為一個批次提交,從而減少I/O開銷、提升寫入輸送量。

概述

Group Commit不是一種新的匯入方式,而是一種最佳化寫入效能的技術。它可以將多個獨立的INSERT INTO tbl VALUES(...)Stream Load合并為一個批次提交,從而減少I/O開銷、提升寫入輸送量。您的應用程式可以直接使用JDBC將資料高頻寫入ApsaraDB for SelectDB,同時通過使用PreparedStatement可以獲得更高的效能。在日誌情境下,您也可以利用Stream Load或者Http Stream將資料高頻寫入ApsaraDB for SelectDB。Group Commit寫入有如下三種模式。

  • 關閉模式(off_mode)

    不開啟Group Commit,保持以上三種匯入方式的預設行為。

  • 同步模式(sync_mode)

    SelectDB會根據負載和表的group_commit_interval屬性將多個匯入在一個事務提交,事務提交後返回匯入結果。這適用於高並發寫入情境,且在匯入完成後要求資料立即可見。

  • 非同步模式(async_mode)

    非同步模式適用於寫入延遲敏感以及高頻寫入的情境。SelectDB首先將資料寫入WAL(Write Ahead Log),然後立即返回匯入結果。SelectDB會根據負載和表的group_commit_interval屬性非同步提交資料,提交之後資料可見。為了防止WAL佔用較大的磁碟空間,單次匯入資料量較大時,會自動切換為sync_mode

使用限制

  • 當開啟了Group Commit模式,系統會判斷您發起的INSERT INTO VALUES語句是否符合Group Commit的條件,如果符合,該語句的執行會進入到Group Commit寫入中。但符合如下條件的語句會自動退化為非Group Commit方式。

    • 事務寫入:即Begin;INSERT INTO VALUES;COMMIT方式

    • 指定Label:即INSERT INTO dt WITH LABEL {label} VALUES

    • VALUES中包含運算式:即INSERT INTO dt VALUES (1 + 100)

    • 列更新寫入。

    • 表不支援light schema change。

  • 當開啟了Group Commit模式,系統會判斷您發起的Stream LoadHttp Stream是否符合Group Commit的條件,如果符合,該匯入的執行會進入到Group Commit寫入中。但符合如下條件的會自動退化為非Group Commit方式。

    • 指定Label:即通過-H "label:my_label"設定。

    • 兩階段交易認可。

    • 列更新寫入。

    • 表不支援light schema change。

  • 對於Unique模型,由於Group Commit不能保證提交順序,您可以配合Sequence列使用來保證資料一致性。

  • max_filter_ratio語義的支援。

    • 在預設的匯入中,filter_ratio是匯入完成後,通過失敗的行數和總行數計算,決定是否提交本次寫入。

    • 在Group Commit模式下,由於多個用戶端發起的匯入都會由一個內部匯入執行,雖然可以計算出每個匯入的filter_ratio,但是資料一旦進入內部匯入,就只能commit transaction。

    • Group Commit模式支援了一定程度的max_filter_ratio語義,當匯入的總行數不高於group_commit_memory_rows_for_max_filter_ratio(BE配置參數,預設為10000行),max_filter_ratio正常工作。

  • WAL限制

    • 對於async_mode的Group Commit寫入,會把資料寫入WAL。如果內部匯入成功,則WAL被立刻刪除;如果內部匯入失敗,通過匯入WAL的方法來恢複資料。

    • 對於async_mode的Group Commit寫入,為了保護磁碟空間,當遇到如下情況時,會切換成sync_mode

      • 匯入資料量過大,即超過WAL單目錄的80%空間。

      • 資料量規模不確定的chunked stream load。

      • 匯入資料量不大,但磁碟可用空間不足。

    • 當發生重量級Schema Change(目前加減列、修改varchar長度和重新命名列是輕量級Schema Change,其它的是重量級Schema Change)時,為了保證WAL能夠適配表的Schema,在Schema Change最後的修改中繼資料階段,會拒絕Group Commit寫入,用戶端收到insert table ${table_name} is blocked on schema change 異常,請您在用戶端重試即可。

使用樣本

建立表dt,樣本如下。

CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;

使用JDBC方式

當您使用JDBCINSERT INTO VALUES方式寫入時,為了減少SQL解析和產生規劃的開銷,ApsaraDB for SelectDB支援了MySQL協議的PreparedStatement特性。當您使用PreparedStatement時,SQL和其匯入規劃將被緩衝到Session層級的記憶體緩衝中,後續的匯入直接使用緩衝對象,降低了叢集的CPU壓力。下面是在JDBC中使用PreparedStatement的使用樣本。

  1. 添加依賴。

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
  2. 設定JDBC URL並在Server端開啟PreparedStatement。

    url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
  3. 配置group_commitsession變數,有如下兩種方式。

    • 通過JDBC URL設定,增加參數sessionVariables=group_commit=async_mode

      url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
    • 通過執行SQL設定

      try (Statement statement = conn.createStatement()) {
          statement.execute("SET group_commit = async_mode;");
      }
  4. 使用PreparedStatement

        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true";
        private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com";
        private static final int PORT = 9030;
        private static final String DB = "db";
        private static final String TBL = "dt";
        private static final String USER = "admin";
        private static final String PASSWD = "***";
        private static final int INSERT_BATCH_SIZE = 10;
        
        public static void main(String[] args) {
            groupCommitInsert();
            //groupCommitInsertBatch
        }
        
        private static void groupCommitInsert() throws Exception {
            Class.forName(JDBC_DRIVER);
            try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
                // set session variable 'group_commit'
                try (Statement statement = conn.createStatement()) {
                    statement.execute("SET group_commit = async_mode;");
                }
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "name" + i);
                        stmt.setInt(3, i + 10);
                        int result = stmt.executeUpdate();
                        System.out.println("rows: " + result);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void groupCommitInsertBatch() throws Exception {
            Class.forName(JDBC_DRIVER);
            // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url
            // set session variables by sessionVariables=group_commit=async_mode in JDBC url
            try (Connection conn = DriverManager.getConnection(
                    String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) {
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int j = 0; j < 5; j++) {
                        // 10 rows per insert
                        for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                            stmt.setInt(1, i);
                            stmt.setString(2, "name" + i);
                            stmt.setInt(3, i + 10);
                            stmt.addBatch();
                        }
                        int[] result = stmt.executeBatch();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

使用INSERT INTO方式

以下分別介紹使用INSERT INTO插入資料的非同步和同步模式。

  • 非同步模式

    -- 配置 session 變數開啟 group commit (預設為 off_mode),開啟非同步模式。
    mysql> SET group_commit = async_mode;
    
    -- 這裡返回的 label 是 group_commit 開頭的,用於區分是否使用了 group commit模式。
    mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
    Query OK, 2 rows affected (0.05 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- 如下 label 和 txn_id 和上述插入的相同,說明是攢到了同一個匯入任務中。
    mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
    Query OK, 1 row affected (0.01 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- 不可以立刻查詢到匯入結果。
    mysql> SELECT * FROM dt;
    Empty SET (0.01 sec)
    
    -- 過10 秒後執行查詢,可以查詢到,可以通過表屬性 group_commit_interval 控制資料可見的延遲。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    +------+-------+-------+
    3 rows in set (0.02 sec)
  • 同步模式

    -- 配置 session 變數開啟 group commit (預設為 off_mode),開啟同步模式。
    mysql> SET group_commit = sync_mode;
    
    -- 匯入耗時為表屬性group_commit_interval。返回的 label 是 group_commit 開頭的,可以區分出是否用了 group commit 模式。
    mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
    Query OK, 2 rows affected (10.06 sec)
    {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
    
    -- 可以立刻查詢到匯入結果。
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    |    4 | Bob   |    90 |
    |    5 | Alice |    99 |
    +------+-------+-------+
    5 rows in set (0.03 sec)
  • 關閉group commit模式

    mysql> SET group_commit = off_mode;

Stream Load

Stream Load詳情請參見Stream Load

  1. 建立data.csv檔案,內容如下。

    6,Amy,60
    7,Ross,98
  2. 以下分別介紹如何使用Stream Load開啟非同步模式或同步模式匯入資料。

    • 非同步模式

      # 匯入時在 header 中增加配置"group_commit:async_mode"。
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 7009,
          "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 35,
          "StreamLoadPutTimeMs": 5,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 26
      }
      
      # 返回的參數 GroupCommit 為 true,說明進入了 group commit 的流程。
      # 返回的 Label 是 group_commit 開頭,是本次匯入關聯的 label。
    • 同步模式

      # 匯入時在 header 中增加配置"group_commit:sync_mode"。
      
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 3009,
          "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 10044,
          "StreamLoadPutTimeMs": 4,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 10038
      }
      
      # 返回的參數 GroupCommit 為 true,說明進入了 group commit 的流程。
      # 返回的 Label 是 group_commit 開頭的,是本次匯入關聯的 label。

自動認可條件

當滿足時間間隔(預設為10秒) 或資料量(預設為64 MB) 其中一個條件時,會自動認可資料。

提交間隔

Group Commit的預設提交時間間隔為10秒,您可以通過如下語句對某個表的提交間隔時間進行修改,樣本如下。

-- 修改提交間隔為 2 秒。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

提交資料量

Group Commit的預設提交資料量為64 MB,您可以通過如下語句對某個表的提交資料量進行修改,樣本如下。

-- 修改提交資料量為 128MB。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");