ApsaraDB for SelectDB: Use the group commit feature to import data

Last Updated: Apr 30, 2025

Group commit is a server-side technique that optimizes data write performance and is suitable for scenarios that involve high-concurrency data writes. Instead of committing each write request individually, the feature commits multiple write requests, such as INSERT operations, at a time. This reduces the I/O overhead and improves the write throughput.

Overview

The group commit feature is not a new import method but a technique that optimizes data write performance. It commits multiple INSERT INTO tbl VALUES(...) statements or Stream Load jobs at a time to reduce the I/O overhead and improve the write throughput. Your application can directly use Java Database Connectivity (JDBC) to write data to an ApsaraDB for SelectDB instance at a high frequency, and you can obtain higher performance by using prepared statements. In logging scenarios, you can write data to a SelectDB instance by using Stream Load or HTTP Stream. The group commit feature supports the following modes:

  • off_mode

    The group commit feature is disabled. In this case, the default behaviors of INSERT INTO VALUES, Stream Load, and HTTP Stream are retained.

  • sync_mode

    SelectDB commits multiple import operations in one transaction based on the load and the group_commit_interval_ms property of tables. After the transaction is committed, the import result is returned. This mode applies to scenarios in which high-concurrency data writes are performed and the data must be visible immediately after the import is complete.

  • async_mode

    The asynchronous mode applies to scenarios in which data is written at a high frequency and the system is sensitive to write latency. SelectDB first writes the data to write-ahead logs (WAL logs) and then immediately returns the import result. SelectDB asynchronously commits the data based on the load and the group_commit_interval_ms property of tables. The data becomes visible after it is committed. If a large amount of data is imported at a time, the mode is automatically switched to sync_mode to prevent WAL logs from occupying a large amount of disk space.
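
In all three modes, the behavior is controlled by the group_commit session variable, as the later examples show. The following minimal Java sketch, which assumes the standard MySQL JDBC driver and uses placeholder connection settings, switches the current session to a mode and reads the variable back with SHOW VARIABLES to confirm the mode that is in effect.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class GroupCommitModeDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; replace them with your instance information.
        String url = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db";
        try (Connection conn = DriverManager.getConnection(url, "admin", "***");
             Statement stmt = conn.createStatement()) {
            // Switch the current session to async_mode. Use sync_mode or off_mode as needed.
            stmt.execute("SET group_commit = async_mode;");
            // Read the session variable back to confirm the mode that is in effect.
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'group_commit'")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
        }
    }
}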

Limits

  • If the group commit feature is enabled, the system determines whether the INSERT INTO VALUES statement that you initiate meets the conditions for group commit. If so, the statement is executed in group commit mode. However, statements that meet any of the following conditions are automatically degraded to non-group commit mode:

    • Data is written within an explicit transaction, that is, by using BEGIN; INSERT INTO VALUES; COMMIT; statements.

    • A label is specified in a statement in the following format: INSERT INTO dt WITH LABEL {label} VALUES.

    • An expression is included in VALUES. Example: INSERT INTO dt VALUES (1 + 100).

    • Data is written by using column update.

    • The table to which data is written does not support light schema changes.

  • If the group commit feature is enabled, the system determines whether the Stream Load or HTTP Stream job that you initiate meets the conditions for group commit. If so, the import is executed in group commit mode. However, jobs that meet any of the following conditions are automatically degraded to non-group commit mode:

    • A label is specified by using -H "label:my_label".

    • The two-phase commit (2PC) mode is used.

    • Data is written by using column update.

    • The table to which data is written does not support light schema changes.

  • The group commit feature cannot ensure the commit sequence if the unique key model is used. In this case, you can use the group commit feature together with a sequence column to ensure data consistency.

  • Support for the max_filter_ratio semantics:

    • In the default import mode, the max_filter_ratio parameter determines whether to commit the data based on the number of failed rows and the total number of rows after the import is complete.

    • In group commit mode, the import operations that are initiated by multiple clients are executed as one internal import. Although the max_filter_ratio value of each import operation can be calculated, data that has entered the internal import process can only be committed.

    • Therefore, the group commit feature only partially supports the max_filter_ratio semantics: the semantics takes effect only if the total number of imported rows does not exceed the value of group_commit_memory_rows_for_max_filter_ratio, which is a backend (BE) configuration item with a default value of 10000.

  • Limits on WAL:

    • If the group commit mode is set to async_mode, data is written to WAL logs. If the internal import is successful, the WAL logs are immediately deleted. If the internal import fails, data can be restored by using the WAL logs.

    • The group commit mode is automatically switched from async_mode to sync_mode in the following scenarios to ensure sufficient disk space:

      • The imported data occupies more than 80% of the disk space of a single WAL directory.

      • A chunked Stream Load job is initiated to import an unknown amount of data.

      • The amount of imported data is small, but the available disk space is insufficient.

    • If a heavy schema change occurs, the system rejects group commits in the metadata modification phase, which is the last phase of the schema change, to ensure that the WAL logs can be adapted to the new table schema. In this case, the client receives the exception insert table ${table_name} is blocked on schema change. If this exception is received, retry the data import on the client, for example, as shown in the retry sketch after this list. Schema changes that add or delete columns, change the VARCHAR length, or rename columns are light schema changes. All other schema changes are heavy schema changes.
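
The documentation only states that the client should retry the import when this exception is received. The following Java sketch shows one possible retry loop for a JDBC insert; the retry count, wait interval, and the insertWithRetry helper are illustrative assumptions rather than part of the SelectDB API.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BlockedOnSchemaChangeRetry {
    // Illustrative retry settings; tune them for your workload.
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_INTERVAL_MS = 1000;

    // Executes an INSERT and retries if the import is rejected because a heavy
    // schema change is in its metadata modification phase.
    static void insertWithRetry(Connection conn, int id, String name, int score) throws Exception {
        String sql = "INSERT INTO dt VALUES(?, ?, ?)";
        for (int attempt = 1; ; attempt++) {
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setInt(1, id);
                stmt.setString(2, name);
                stmt.setInt(3, score);
                stmt.executeUpdate();
                return;
            } catch (SQLException e) {
                String message = e.getMessage();
                boolean blocked = message != null && message.contains("is blocked on schema change");
                if (!blocked || attempt >= MAX_RETRIES) {
                    throw e;
                }
                // Wait briefly and retry; the block is lifted when the schema change finishes.
                Thread.sleep(RETRY_INTERVAL_MS);
            }
        }
    }
}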

Example

Create a table named dt.

CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;

Use JDBC

If you execute the INSERT INTO VALUES statement to import data by using JDBC, ApsaraDB for SelectDB supports the prepared statement feature of MySQL to reduce the overhead of SQL parsing and plan generation. If you use prepared statements, the SQL statements and their import plans are cached in the session-level memory cache, and the cached objects are reused for subsequent imports. This reduces the CPU utilization of your cluster. In the following example, a prepared statement and JDBC are used to import data.

  1. Add the dependency.

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
  2. Specify the JDBC URL and enable the prepared statement feature on the server.

    url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
  3. Configure the session variable group_commit by using either of the following methods:

    • Add sessionVariables=group_commit=async_mode to the JDBC URL.

      url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
    • Execute the following SQL statement:

      try (Statement statement = conn.createStatement()) {
          statement.execute("SET group_commit = async_mode;");
      }
  4. Use a prepared statement.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class GroupCommitJdbcExample {
        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true";
        private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com";
        private static final int PORT = 9030;
        private static final String DB = "db";
        private static final String TBL = "dt";
        private static final String USER = "admin";
        private static final String PASSWD = "***";
        private static final int INSERT_BATCH_SIZE = 10;

        public static void main(String[] args) throws Exception {
            groupCommitInsert();
            // groupCommitInsertBatch();
        }

        // Inserts rows one by one. The group_commit session variable is set on the connection.
        private static void groupCommitInsert() throws Exception {
            Class.forName(JDBC_DRIVER);
            try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
                // Set the session variable 'group_commit'.
                try (Statement statement = conn.createStatement()) {
                    statement.execute("SET group_commit = async_mode;");
                }

                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "name" + i);
                        stmt.setInt(3, i + 10);
                        int result = stmt.executeUpdate();
                        System.out.println("rows: " + result);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Inserts rows in batches by using JDBC batching.
        private static void groupCommitInsertBatch() throws Exception {
            Class.forName(JDBC_DRIVER);
            // Add rewriteBatchedStatements=true and cachePrepStmts=true to the JDBC URL.
            // Set the session variable by adding sessionVariables=group_commit=async_mode to the JDBC URL.
            try (Connection conn = DriverManager.getConnection(
                    String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) {

                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int j = 0; j < 5; j++) {
                        // 10 rows per batch.
                        for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                            stmt.setInt(1, i);
                            stmt.setString(2, "name" + i);
                            stmt.setInt(3, i + 10);
                            stmt.addBatch();
                        }
                        int[] result = stmt.executeBatch();
                        System.out.println("rows: " + result.length);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Use INSERT INTO

This section describes how to insert data by executing the INSERT INTO statement in asynchronous or synchronous mode:

  • Asynchronous mode

    -- Configure the session variable group_commit to enable the group commit feature in asynchronous mode. The default value of the variable is off_mode. 
    mysql> SET group_commit = async_mode;
    
    -- The label that is returned starts with group_commit. This indicates the group commit feature is enabled. 
    mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
    Query OK, 2 rows affected (0.05 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- The label and txnId for the following statement are the same as those for the preceding statement. This indicates that the two statements are executed in the same import job. 
    mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
    Query OK, 1 row affected (0.01 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- The imported data cannot be queried immediately after the import is complete. 
    mysql> SELECT * FROM dt;
    Empty set (0.01 sec)
    
    -- Execute a query after 10 seconds. The imported data can be queried. You can use the table property group_commit_interval_ms to control the latency of data visibility. 
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    +------+-------+-------+
    3 rows in set (0.02 sec)
  • Synchronous mode

    -- Configure the session variable group_commit to enable the group commit feature in synchronous mode. The default value of the variable is off_mode. 
    mysql> SET group_commit = sync_mode;
    
    -- The commit interval is specified by the table property group_commit_interval_ms. The label that is returned starts with group_commit. This indicates that the group commit feature is enabled. 
    mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
    Query OK, 2 rows affected (10.06 sec)
    {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
    
    -- The imported data can be queried immediately after the import is complete. 
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    |    4 | Bob   |    90 |
    |    5 | Alice |    99 |
    +------+-------+-------+
    5 rows in set (0.03 sec)
  • Disable the group commit feature

    mysql> SET group_commit = off_mode;

Use Stream Load

For more information about the Stream Load feature, see Stream Load.

  1. Create a file named data.csv and add the following content to the file:

    6,Amy,60
    7,Ross,98
  2. Use Stream Load to import data in asynchronous or synchronous mode (a Java sketch that sends the same request over HTTP follows these examples):

    • Asynchronous mode

      # Add the group_commit:async_mode header to the import request. 
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 7009,
          "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 35,
          "StreamLoadPutTimeMs": 5,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 26
      }
      
      # The value of the GroupCommit parameter that is returned is true. This indicates that the group commit takes effect. 
      # The label that is associated with this import starts with group_commit.

    • Synchronous mode

      # Add the group_commit:sync_mode header to the import request. 
      
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 3009,
          "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 10044,
          "StreamLoadPutTimeMs": 4,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 10038
      }
      
      # The value of the GroupCommit parameter that is returned is true. This indicates that the group commit takes effect. 
      # The label that is associated with this import starts with group_commit.
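
If you initiate Stream Load from a Java application instead of curl, you can attach the same group_commit header to the HTTP PUT request. The following sketch is only a hedged example: it assumes that Apache HttpClient 4.x is on the classpath, uses a redirect strategy that forwards the request from the frontend to the backend in the same way as curl --location-trusted, and uses placeholder values for the endpoint, credentials, and file path.

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class GroupCommitStreamLoad {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; replace them with your instance information.
        String loadUrl = "http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load";
        String auth = Base64.getEncoder()
                .encodeToString("{user}:{passwd}".getBytes(StandardCharsets.UTF_8));

        // Follow the redirect from the frontend to the backend, similar to curl --location-trusted.
        try (CloseableHttpClient client = HttpClients.custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                })
                .build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + auth);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            // Enable group commit for this import. Use sync_mode for synchronous behavior.
            put.setHeader("group_commit", "async_mode");
            put.setHeader("column_separator", ",");
            put.setEntity(new FileEntity(new File("data.csv"), ContentType.TEXT_PLAIN));

            try (CloseableHttpResponse response = client.execute(put)) {
                // The JSON result contains "GroupCommit": true if group commit took effect.
                System.out.println(EntityUtils.toString(response.getEntity()));
            }
        }
    }
}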

Conditions for auto commit

Data is automatically committed when the commit interval elapses or when the accumulated data reaches the specified size. The default commit interval is 10 seconds, and the default data size for auto commit is 64 MB.

Commit interval

The default commit interval is 10 seconds. You can execute the following statement to change the commit interval for a table:

-- Change the commit interval to 2 seconds. 
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

Data size for auto commit

The default data size for auto commit is 64 MB. You can execute the following statement to change the data size for auto commit for a table:

-- Change the data size for auto commit to 128 MB. 
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");
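
Both table properties can also be changed from an application over JDBC. The following minimal sketch, which reuses the placeholder endpoint and credentials from the earlier examples, applies a 2-second commit interval and a 128 MB auto-commit size to the dt table; the specific values are only illustrations.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GroupCommitTableProperties {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; replace them with your instance information.
        String url = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db";
        long intervalMs = 2000;              // 2-second commit interval
        long dataBytes = 128L * 1024 * 1024; // 128 MB = 134217728 bytes
        try (Connection conn = DriverManager.getConnection(url, "admin", "***");
             Statement stmt = conn.createStatement()) {
            stmt.execute("ALTER TABLE dt SET (\"group_commit_interval_ms\" = \"" + intervalMs + "\")");
            stmt.execute("ALTER TABLE dt SET (\"group_commit_data_bytes\" = \"" + dataBytes + "\")");
        }
    }
}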