Group Commit(服務端攢批) 是一種最佳化寫入效能的技術,主要用於在高並發寫入情境下,將多個獨立的寫入操作(如INSERT)合并為一個批次提交,從而減少I/O開銷、提升寫入輸送量。
概述
Group Commit不是一種新的匯入方式,而是一種最佳化寫入效能的技術。它可以將多個獨立的INSERT INTO tbl VALUES(...)或Stream Load合并為一個批次提交,從而減少I/O開銷、提升寫入輸送量。您的應用程式可以直接使用JDBC將資料高頻寫入ApsaraDB for SelectDB,同時通過使用PreparedStatement可以獲得更高的效能。在日誌情境下,您也可以利用Stream Load或者Http Stream將資料高頻寫入ApsaraDB for SelectDB。Group Commit寫入有如下三種模式。
關閉模式(off_mode)
不開啟Group Commit,保持以上三種匯入方式的預設行為。
同步模式(sync_mode)
SelectDB會根據負載和表的
group_commit_interval屬性將多個匯入在一個事務提交,事務提交後返回匯入結果。這適用於高並發寫入情境,且在匯入完成後要求資料立即可見。非同步模式(async_mode)
非同步模式適用於寫入延遲敏感以及高頻寫入的情境。SelectDB首先將資料寫入WAL(Write Ahead Log),然後立即返回匯入結果。SelectDB會根據負載和表的
group_commit_interval屬性非同步提交資料,提交之後資料可見。為了防止WAL佔用較大的磁碟空間,單次匯入資料量較大時,會自動切換為sync_mode。
使用限制
當開啟了Group Commit模式,系統會判斷您發起的
INSERT INTO VALUES語句是否符合Group Commit的條件,如果符合,該語句的執行會進入到Group Commit寫入中。但符合如下條件的語句會自動退化為非Group Commit方式。事務寫入:即
Begin;INSERT INTO VALUES;COMMIT方式指定Label:即
INSERT INTO dt WITH LABEL {label} VALUESVALUES中包含運算式:即
INSERT INTO dt VALUES (1 + 100)列更新寫入。
表不支援light schema change。
當開啟了Group Commit模式,系統會判斷您發起的
Stream Load和Http Stream是否符合Group Commit的條件,如果符合,該匯入的執行會進入到Group Commit寫入中。但符合如下條件的會自動退化為非Group Commit方式。指定Label:即通過
-H "label:my_label"設定。兩階段交易認可。
列更新寫入。
表不支援light schema change。
對於Unique模型,由於Group Commit不能保證提交順序,您可以配合Sequence列使用來保證資料一致性。
對
max_filter_ratio語義的支援。在預設的匯入中,
filter_ratio是匯入完成後,通過失敗的行數和總行數計算,決定是否提交本次寫入。在Group Commit模式下,由於多個用戶端發起的匯入都會由一個內部匯入執行,雖然可以計算出每個匯入的
filter_ratio,但是資料一旦進入內部匯入,就只能commit transaction。Group Commit模式支援了一定程度的
max_filter_ratio語義,當匯入的總行數不高於group_commit_memory_rows_for_max_filter_ratio(BE配置參數,預設為10000行),max_filter_ratio正常工作。
WAL限制
對於
async_mode的Group Commit寫入,會把資料寫入WAL。如果內部匯入成功,則WAL被立刻刪除;如果內部匯入失敗,通過匯入WAL的方法來恢複資料。對於
async_mode的Group Commit寫入,為了保護磁碟空間,當遇到如下情況時,會切換成sync_mode。匯入資料量過大,即超過WAL單目錄的80%空間。
資料量規模不確定的chunked stream load。
匯入資料量不大,但磁碟可用空間不足。
當發生重量級Schema Change(目前加減列、修改varchar長度和重新命名列是輕量級Schema Change,其它的是重量級Schema Change)時,為了保證WAL能夠適配表的Schema,在Schema Change最後的修改中繼資料階段,會拒絕Group Commit寫入,用戶端收到
insert table ${table_name} is blocked on schema change異常,請您在用戶端重試即可。
使用樣本
建立表dt,樣本如下。
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1;使用JDBC方式
當您使用JDBCINSERT INTO VALUES方式寫入時,為了減少SQL解析和產生規劃的開銷,ApsaraDB for SelectDB支援了MySQL協議的PreparedStatement特性。當您使用PreparedStatement時,SQL和其匯入規劃將被緩衝到Session層級的記憶體緩衝中,後續的匯入直接使用緩衝對象,降低了叢集的CPU壓力。下面是在JDBC中使用PreparedStatement的使用樣本。
添加依賴。
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency>設定JDBC URL並在Server端開啟PreparedStatement。
url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true配置
group_commitsession變數,有如下兩種方式。通過JDBC URL設定,增加參數
sessionVariables=group_commit=async_mode。url = jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/db?useServerPrepStmts=true&sessionVariables=group_commit=async_mode通過執行SQL設定
try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); }
使用
PreparedStatement。private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; private static final String URL_PATTERN = "jdbc:mysql://%s:%d/%s?useServerPrepStmts=true"; private static final String HOST = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com"; private static final int PORT = 9030; private static final String DB = "db"; private static final String TBL = "dt"; private static final String USER = "admin"; private static final String PASSWD = "***"; private static final int INSERT_BATCH_SIZE = 10; public static void main(String[] args) { groupCommitInsert(); //groupCommitInsertBatch } private static void groupCommitInsert() throws Exception { Class.forName(JDBC_DRIVER); try (Connection conn = DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) { // set session variable 'group_commit' try (Statement statement = conn.createStatement()) { statement.execute("SET group_commit = async_mode;"); } String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); int result = stmt.executeUpdate(); System.out.println("rows: " + result); } } } catch (Exception e) { e.printStackTrace(); } } private static void groupCommitInsertBatch() throws Exception { Class.forName(JDBC_DRIVER); // add rewriteBatchedStatements=true and cachePrepStmts=true in JDBC url // set session variables by sessionVariables=group_commit=async_mode in JDBC url try (Connection conn = DriverManager.getConnection( String.format(URL_PATTERN + "&rewriteBatchedStatements=true&cachePrepStmts=true&sessionVariables=group_commit=async_mode", HOST, PORT, DB), USER, PASSWD)) { String query = "INSERT INTO " + TBL + " VALUES(?, ?, ?)"; try (PreparedStatement stmt = conn.prepareStatement(query)) { for (int j = 0; j < 5; j++) { // 10 rows per insert for (int i = 0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setString(2, "name" + i); stmt.setInt(3, i + 10); stmt.addBatch(); } int[] result = stmt.executeBatch(); } } } catch (Exception e) { e.printStackTrace(); } }
使用INSERT INTO方式
以下分別介紹使用INSERT INTO插入資料的非同步和同步模式。
非同步模式
-- 配置 session 變數開啟 group commit (預設為 off_mode),開啟非同步模式。 mysql> SET group_commit = async_mode; -- 這裡返回的 label 是 group_commit 開頭的,用於區分是否使用了 group commit模式。 mysql> INSERT INTO dt VALUES(1, 'Bob', 90), (2, 'Alice', 99); Query OK, 2 rows affected (0.05 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -- 如下 label 和 txn_id 和上述插入的相同,說明是攢到了同一個匯入任務中。 mysql> INSERT INTO dt(id, name) VALUES(3, 'John'); Query OK, 1 row affected (0.01 sec) {'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'} -- 不可以立刻查詢到匯入結果。 mysql> SELECT * FROM dt; Empty SET (0.01 sec) -- 過10 秒後執行查詢,可以查詢到,可以通過表屬性 group_commit_interval 控制資料可見的延遲。 mysql> SELECT * FROM dt; +------+-------+-------+ | id | name | score | +------+-------+-------+ | 1 | Bob | 90 | | 2 | Alice | 99 | | 3 | John | NULL | +------+-------+-------+ 3 rows in set (0.02 sec)同步模式
-- 配置 session 變數開啟 group commit (預設為 off_mode),開啟同步模式。 mysql> SET group_commit = sync_mode; -- 匯入耗時為表屬性group_commit_interval。返回的 label 是 group_commit 開頭的,可以區分出是否用了 group commit 模式。 mysql> INSERT INTO dt VALUES(4, 'Bob', 90), (5, 'Alice', 99); Query OK, 2 rows affected (10.06 sec) {'label':'group_commit_d84ab96c09b60587_ec455a33cb0e9e87', 'status':'PREPARE', 'txnId':'3007', 'query_id':'fc6b94085d704a94-a69bfc9a202e66e2'} -- 可以立刻查詢到匯入結果。 mysql> SELECT * FROM dt; +------+-------+-------+ | id | name | score | +------+-------+-------+ | 1 | Bob | 90 | | 2 | Alice | 99 | | 3 | John | NULL | | 4 | Bob | 90 | | 5 | Alice | 99 | +------+-------+-------+ 5 rows in set (0.03 sec)關閉group commit模式
mysql> SET group_commit = off_mode;
Stream Load
Stream Load詳情請參見Stream Load。
建立data.csv檔案,內容如下。
6,Amy,60 7,Ross,98以下分別介紹如何使用Stream Load開啟非同步模式或同步模式匯入資料。
非同步模式
# 匯入時在 header 中增加配置"group_commit:async_mode"。 curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:async_mode" -H "column_separator:," http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load { "TxnId": 7009, "Label": "group_commit_c84d2099208436ab_96e33fda01eddba8", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 35, "StreamLoadPutTimeMs": 5, "ReadDataTimeMs": 0, "WriteDataTimeMs": 26 } # 返回的參數 GroupCommit 為 true,說明進入了 group commit 的流程。 # 返回的 Label 是 group_commit 開頭,是本次匯入關聯的 label。同步模式
# 匯入時在 header 中增加配置"group_commit:sync_mode"。 curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:sync_mode" -H "column_separator:," http://{selectdbHost}:{selectdbHttpPort}/api/db/dt/_stream_load { "TxnId": 3009, "Label": "group_commit_d941bf17f6efcc80_ccf4afdde9881293", "Comment": "", "GroupCommit": true, "Status": "Success", "Message": "OK", "NumberTotalRows": 2, "NumberLoadedRows": 2, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 19, "LoadTimeMs": 10044, "StreamLoadPutTimeMs": 4, "ReadDataTimeMs": 0, "WriteDataTimeMs": 10038 } # 返回的參數 GroupCommit 為 true,說明進入了 group commit 的流程。 # 返回的 Label 是 group_commit 開頭的,是本次匯入關聯的 label。
自動認可條件
當滿足時間間隔(預設為10秒) 或資料量(預設為64 MB) 其中一個條件時,會自動認可資料。
提交間隔
Group Commit的預設提交時間間隔為10秒,您可以通過如下語句對某個表的提交間隔時間進行修改,樣本如下。
-- 修改提交間隔為 2 秒。
ALTER TABLE dt SET ("group_commit_interval_ms" = "2000");提交資料量
Group Commit的預設提交資料量為64 MB,您可以通過如下語句對某個表的提交資料量進行修改,樣本如下。
-- 修改提交資料量為 128MB。
ALTER TABLE dt SET ("group_commit_data_bytes" = "134217728");