全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan fitur group commit untuk mengimpor data

更新时间:Jul 30, 2025

Group commit permintaan pada server adalah teknologi yang mengoptimalkan kinerja penulisan data. Fitur ini cocok untuk skenario dengan penulisan data konkurensi tinggi. Dengan melakukan operasi seperti INSERT, beberapa permintaan penulisan data diserahkan sekaligus, mengurangi overhead I/O dan meningkatkan throughput penulisan.

Ikhtisar

Fitur group commit bukan metode impor baru, melainkan teknologi yang mengoptimalkan kinerja penulisan data. Fitur ini menyerahkan beberapa pernyataan INSERT INTO tbl VALUES(...) atau Stream Load sekaligus untuk mengurangi overhead I/O dan meningkatkan throughput penulisan. Aplikasi Anda dapat langsung menggunakan Java Database Connectivity (JDBC) untuk menulis data ke instans ApsaraDB for SelectDB dengan frekuensi tinggi. Selain itu, Anda bisa mendapatkan performa lebih tinggi dengan menggunakan prepared statements. Dalam skenario logging, Anda dapat menulis data ke instans SelectDB menggunakan Stream Load atau HTTP Stream. Fitur group commit mendukung mode berikut:

  • off_mode

    Fitur group commit dinonaktifkan. Dalam hal ini, perilaku default dari INSERT INTO VALUES, Stream Load, dan HTTP Stream dipertahankan.

  • sync_mode

    SelectDB menyerahkan beberapa operasi impor dalam satu transaksi berdasarkan beban dan properti group_commit_interval tabel. Setelah transaksi diserahkan, hasil impor dikembalikan. Mode ini berlaku untuk skenario di mana penulisan data konkurensi tinggi dilakukan dan data harus terlihat segera setelah impor selesai.

  • async_mode

    Mode asinkron berlaku untuk skenario di mana data ditulis dengan frekuensi tinggi dan sistem sensitif terhadap latensi penulisan. SelectDB pertama-tama menulis data ke log pencatatan log terlebih dahulu (WAL). Kemudian, hasil impor segera dikembalikan. SelectDB secara asinkron menyerahkan data berdasarkan beban dan properti group_commit_interval tabel. Data akan terlihat setelah diserahkan. Mode ini secara otomatis beralih ke sync_mode jika sejumlah besar data diimpor sekaligus, mencegah log WAL memakan banyak ruang disk.

Batasan

  • Jika fitur group commit diaktifkan, sistem menentukan apakah pernyataan INSERT INTO VALUES yang Anda inisiasi memenuhi kondisi untuk group commit. Jika ya, pernyataan tersebut dieksekusi dalam mode group commit. Namun, pernyataan yang memenuhi kondisi berikut secara otomatis diturunkan ke mode non-group commit:

    • Data ditulis menggunakan transaksi. Dalam hal ini, data ditulis menggunakan pernyataan BEGIN; INSERT INTO VALUES; COMMIT.

    • Label ditentukan dalam pernyataan dalam format berikut: INSERT INTO dt WITH LABEL {label} VALUES.

    • Ekspresi termasuk dalam VALUES. Contoh: INSERT INTO dt VALUES (1 + 100).

    • Data ditulis menggunakan pembaruan kolom.

    • Tabel tempat data ditulis tidak mendukung perubahan skema ringan.

  • Jika fitur group commit diaktifkan, sistem menentukan apakah pekerjaan Stream Load atau HTTP Stream yang Anda inisiasi memenuhi kondisi untuk group commit. Jika ya, impor dieksekusi dalam mode group commit. Namun, pekerjaan yang memenuhi kondisi berikut secara otomatis diturunkan ke mode non-group commit:

    • Label ditentukan menggunakan -H "label:my_label".

    • Mode komit dua fase (2PC) digunakan.

    • Data ditulis menggunakan pembaruan kolom.

    • Tabel tempat data ditulis tidak mendukung perubahan skema ringan.

  • Fitur group commit tidak dapat menjamin urutan komit jika model kunci unik digunakan. Dalam hal ini, Anda dapat menggunakan fitur group commit bersama dengan kolom urutan untuk memastikan konsistensi data.

  • Dukungan untuk semantik max_filter_ratio:

    • Dalam mode impor default, parameter filter_ratio menentukan apakah akan menyerahkan data berdasarkan jumlah baris gagal dan total jumlah baris setelah impor selesai.

    • Dalam mode group commit, operasi impor yang diinisiasi oleh beberapa klien dieksekusi oleh satu impor internal. Meskipun nilai parameter filter_ratio untuk setiap operasi impor dapat dihitung, transaksi hanya dapat diserahkan sekali data masuk ke proses impor internal.

    • Fitur group commit sebagian mendukung semantik max_filter_ratio. Jika jumlah total baris yang diimpor tidak melebihi nilai item konfigurasi group_commit_memory_rows_for_max_filter_ratio, semantik max_filter_ratio berlaku. group_commit_memory_rows_for_max_filter_ratio adalah item konfigurasi backend (BE) dengan nilai default 10000.

  • Batasan pada WAL:

    • Jika mode group commit diatur ke async_mode, data ditulis ke log WAL. Jika impor internal berhasil, log WAL segera dihapus. Jika impor internal gagal, data dapat dipulihkan menggunakan log WAL.

    • Mode group commit secara otomatis beralih dari async_mode ke sync_mode dalam skenario berikut untuk memastikan ruang disk yang cukup:

      • Jumlah data yang diimpor memakan lebih dari 80% ruang direktori WAL tunggal.

      • Pekerjaan Stream Load berbasis chunk diinisiasi untuk mengimpor jumlah data yang tidak diketahui.

      • Jumlah data yang diimpor kecil, tetapi ruang disk yang tersedia tidak mencukupi.

    • Jika perubahan skema berat terjadi, sistem menolak group commit pada fase modifikasi metadata, yang merupakan fase terakhir untuk perubahan skema, untuk memastikan bahwa log WAL dapat disesuaikan dengan skema tabel. Dalam hal ini, klien menerima pengecualian insert table ${table_name} is blocked on schema change. Jika pengecualian ini diterima, ulangi impor data di klien. Perubahan skema yang menambah atau menghapus kolom, mengubah panjang VARCHAR, atau mengganti nama kolom adalah perubahan skema ringan. Semua perubahan skema lainnya adalah perubahan skema berat.

Contoh

Buat tabel bernama dt.

CREATE TABLE `dt` (
    `id` int(11) NOT NULL,
    `name` varchar(50) NULL,
    `score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;

Gunakan JDBC

Jika Anda mengeksekusi pernyataan INSERT INTO VALUES untuk mengimpor data menggunakan JDBC, ApsaraDB for SelectDB mendukung fitur prepared statement MySQL untuk mengurangi overhead penguraian SQL dan pembuatan rencana. Jika Anda menggunakan prepared statements, pernyataan SQL dan rencana impornya disimpan dalam cache memori tingkat sesi. Objek yang di-cache dapat digunakan untuk impor selanjutnya, mengurangi penggunaan CPU kluster Anda. Dalam contoh berikut, prepared statement dan JDBC digunakan untuk mengimpor data.

  1. Tambahkan dependensi.

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
  2. Tentukan URL JDBC dan aktifkan fitur prepared statement di server.

    url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true
  3. Konfigurasikan variabel sesi group_commit menggunakan salah satu dari metode berikut:

    • Tambahkan sessionVariables=group_commit=async_mode ke URL JDBC.

      url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode
    • Eksekusi pernyataan SQL berikut:

      try (Statement statement = conn.createStatement()) {
          statement.execute("SET group_commit = async_mode;");
      }
  4. Gunakan prepared statement.

        private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
        private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true";
        private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com";
        private static final int PORT = 9030;
        private static final String DB = "db";
        private static final String TBL = "dt";
        private static final String USER = "admin";
        private static final String PASSWD = "***";
        private static final int INSERT_BATCH_SIZE = 10;
        
        public static void main(String[] args) {
            groupCommitInsert();
            //groupCommitInsertBatch
        }
        
        private static void groupCommitInsert() throws Exception {
            Class.forName(JDBC_DRIVER);
            try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {
                // set session variable 'group_commit'
                try (Statement statement = conn.createStatement()) {
                    statement.execute("SET group_commit = async_mode;");
                }
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "name" + i);
                        stmt.setInt(3, i + 10);
                        int result = stmt.executeUpdate();
                        System.out.println("rows: " + result);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private static void groupCommitInsertBatch() throws Exception {
            Class.forName(JDBC_DRIVER);
            // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url
            // set session variables by sessionVariables=group_commit=async_mode in JDBC url
            try (Connection conn = DriverManager.getConnection(
                    String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) {
    
                String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)";
                try (PreparedStatement stmt = conn.prepareStatement(query)) {
                    for (int j = 0; j < 5; j++) {
                        // 10 rows per insert
                        for (int i = 0; i < INSERT_BATCH_SIZE; i++) {
                            stmt.setInt(1, i);
                            stmt.setString(2, "name" + i);
                            stmt.setInt(3, i + 10);
                            stmt.addBatch();
                        }
                        int[] result = stmt.executeBatch();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

Gunakan INSERT INTO

Bagian ini menjelaskan cara menyisipkan data dengan mengeksekusi pernyataan INSERT INTO dalam mode asinkron dan sinkron:

  • Mode asinkron

    -- Konfigurasikan variabel sesi group_commit untuk mengaktifkan fitur group commit dalam mode asinkron. Nilai default variabel ini adalah off_mode. 
    mysql> SET group_commit = async_mode;
    
    -- Label yang dikembalikan dimulai dengan group_commit. Ini menunjukkan bahwa fitur group commit diaktifkan. 
    mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99);
    Query OK, 2 rows affected (0.05 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- Label dan txnId untuk pernyataan berikutnya sama dengan yang untuk pernyataan sebelumnya. Ini menunjukkan bahwa kedua pernyataan dieksekusi dalam pekerjaan impor yang sama. 
    mysql> INSERT INTO dt(id, name) VALUES(3, 'John');
    Query OK, 1 row affected (0.01 sec)
    {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}
    
    -- Data yang diimpor tidak dapat diquery segera setelah impor selesai. 
    mysql> SELECT * FROM dt;
    Empty SET (0.01 sec)
    
    -- Eksekusi query setelah 10 detik. Data yang diimpor dapat diquery. Anda dapat menggunakan properti tabel group_commit_interval untuk mengontrol latensi visibilitas data. 
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    +------+-------+-------+
    3 rows in set (0.02 sec)
  • Mode sinkron

    -- Konfigurasikan variabel sesi group_commit untuk mengaktifkan fitur group commit dalam mode sinkron. Nilai default variabel ini adalah off_mode. 
    mysql> SET group_commit = sync_mode;
    
    -- Interval komit ditentukan oleh properti tabel group_commit_interval. Label yang dikembalikan dimulai dengan group_commit. Ini menunjukkan bahwa fitur group commit diaktifkan. 
    mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99);
    Query OK, 2 rows affected (10.06 sec)
    {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'}
    
    -- Data yang diimpor dapat diquery segera setelah impor selesai. 
    mysql> SELECT * FROM dt;
    +------+-------+-------+
    | id   | name  | score |
    +------+-------+-------+
    |    1 | Bob   |    90 |
    |    2 | Alice |    99 |
    |    3 | John  |  NULL |
    |    4 | Bob   |    90 |
    |    5 | Alice |    99 |
    +------+-------+-------+
    5 rows in set (0.03 sec)
  • Nonaktifkan fitur group commit

    mysql> SET group_commit = off_mode;

Gunakan Stream Load

Untuk informasi lebih lanjut tentang fitur Stream Load, lihat Stream Load.

  1. Buat file bernama data.csv dan tambahkan konten berikut ke file:

    6,Amy,60
    7,Ross,98
  2. Gunakan Stream Load untuk mengimpor data dalam mode asinkron atau sinkron:

    • Mode asinkron

      # Tambahkan header group_commit:async_mode ke URL untuk impor. 
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 7009,
          "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 35,
          "StreamLoadPutTimeMs": 5,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 26
      }
      
      # Nilai parameter GroupCommit yang dikembalikan adalah true. Ini menunjukkan bahwa group commit berlaku. 
      # Label yang terkait dengan impor ini dimulai dengan group_commit.

    • Mode sinkron

      # Tambahkan header group_commit:sync_mode ke URL untuk impor. 
      
      curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode"  -H "column_separator:,"  http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load
      {
          "TxnId": 3009,
          "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293",
          "Comment": "",
          "GroupCommit": true,
          "Status": "Success",
          "Message": "OK",
          "NumberTotalRows": 2,
          "NumberLoadedRows": 2,
          "NumberFilteredRows": 0,
          "NumberUnselectedRows": 0,
          "LoadBytes": 19,
          "LoadTimeMs": 10044,
          "StreamLoadPutTimeMs": 4,
          "ReadDataTimeMs": 0,
          "WriteDataTimeMs": 10038
      }
      
      # Nilai parameter GroupCommit yang dikembalikan adalah true. Ini menunjukkan bahwa group commit berlaku. 
      # Label yang terkait dengan impor ini dimulai dengan group_commit.

Kondisi untuk auto commit

Data secara otomatis diserahkan berdasarkan interval komit atau ketika ukuran data mencapai nilai yang ditentukan. Interval komit default adalah 10 detik, dan ukuran data default untuk auto commit adalah 64 MB.

Interval komit

Interval komit default adalah 10 detik. Anda dapat mengeksekusi pernyataan berikut untuk mengubah interval komit untuk sebuah tabel:

-- Ubah interval komit menjadi 2 detik. 
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");

Ukuran data untuk auto commit

Ukuran data default untuk auto commit adalah 64 MB. Anda dapat mengeksekusi pernyataan berikut untuk mengubah ukuran data untuk auto commit untuk sebuah tabel:

-- Ubah ukuran data untuk auto commit menjadi 128 MB. 
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");