當您需要對PolarDB MySQL版中的業務資料進行全文檢索索引或複雜分析時,直接在資料庫上操作可能會影響核心業務的穩定性。PolarDB提供的AutoETL功能,能將資料從讀寫節點自動、持續地同步至叢集內的PolarSearch節點,為您提供一站式的資料服務。您無需額外部署和維護ETL工具,即可實現資料同步,並將搜尋分析負載與線上交易處理負載隔離。
當前功能目前正處於灰階階段。如您有相關需求,請提交工單與我們聯絡,以便為您開啟該功能。
功能簡介
AutoETL是PolarDB MySQL版內建的資料同步能力,它允許資料在叢集內不同類型的節點間自動流轉。目前的版本僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點,以用於高效能的搜尋和分析。
您可以通過資料庫內建的DBMS_ETL工具包,直接使用SQL命令來建立和管理資料同步鏈路。AutoETL提供三種靈活的資料同步方式:
單表同步(
dbms_etl.sync_by_table):將單個源表完整地同步到目標索引。多表匯聚(
dbms_etl.sync_by_map):將多個源表通過JOIN操作匯聚後,同步到目標索引。自訂SQL(
dbms_etl.sync_by_sql):使用相容Flink SQL的文法進行複雜的資料清洗、轉換和彙總。
適用範圍
使用AutoETL功能前,需確保環境滿足以下條件:
叢集版本:
MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。
MySQL 8.0.2,且修訂版本需為8.0.2.2.33或以上。
同步方向:僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點。
DDL限制:對已建立同步鏈路的源表進行DDL操作時,需遵循特定的規則與實踐以避免同步中斷。部分不相容的變更需要重建鏈路。詳情請參見DDL變更規則與實踐。
資料類型:暫不支援
BIT類型以及GEOMETRY、POINT、LINESTRING、POLYGON、MULTIPOINT、MULTILINESTRING、MULTIPOLYGON、GEOMETRYCOLLECTION等空間資料類型的同步。
建立同步鏈路
單表同步
資料準備
在PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。
CREATE DATABASE IF NOT EXISTS db1; USE db1; CREATE TABLE IF NOT EXISTS t1 ( id INT PRIMARY KEY, c1 VARCHAR(100), c2 VARCHAR(100) ); INSERT INTO t1(id, c1, c2) VALUES (1, 'apple', 'red'), (2, 'banana', 'yellow'), (3, 'grape', 'purple');建立同步鏈路
使用
dbms_etl.sync_by_table預存程序,建立從db1.t1表到PolarSearch節點的索引dest的同步任務。文法
call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");參數說明
參數
說明
search同步目標,當前固定為
search,表示PolarSearch節點。<source_table>源表名,格式為
資料庫名.表名。<sink_table>PolarSearch節點中的目標索引名。
<column_list>需要同步的列名列表,用英文逗號
,分隔。如果為空白字串"",則同步源表所有列。使用限制
源表需包含主鍵或唯一鍵。
在不同的同步鏈路中,不能使用相同的源表或目標表。
建立鏈路後,源表新增的列預設不會被自動同步。如需同步新增列,請重建鏈路。
如果您希望使用自己定義的目標索引配置,您可以先在PolarSearch節點中手動建立索引並定義其配置,然後再建立同步鏈路。如果鏈路建立時目標索引不存在,系統將自動建立。
樣本
將
db1.t1全表同步到PolarSearch的dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");將
db1.t1表的c1和c2列同步到dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
多表匯聚
資料準備
在PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。
CREATE DATABASE IF NOT EXISTS db1; CREATE DATABASE IF NOT EXISTS db2; CREATE DATABASE IF NOT EXISTS db3; CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT); CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT); CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10)); INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33); INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444); INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');建立同步鏈路
使用
dbms_etl.sync_by_map預存程序,可將多個表的資料連線(JOIN)後,匯聚到一個PolarSearch節點的索引中。文法
call dbms_etl.sync_by_map( "search", "<columns_map>", -- 目標索引欄位與源表欄位的映射關係 "<join_fields>", -- 表之間的串連鍵 "<join_types>", -- 連線類型 (inner, left) "<filter>" -- 資料過濾條件 );參數說明
參數
格式樣本
說明
columns_mapdest.c1(db1.t1.c1),dest.c2(db2.t2.c2)目標索引欄位與源表欄位的映射關係。
樣本表示:目標索引
dest的c1欄位來自db1.t1.c1,c2欄位來自db2.t2.c2。join_fieldsdest.id=db1.t1.id,db2.t2.id表之間的串連鍵。
樣本表示:目標索引的文檔ID(
dest.id)由db1.t1.id和db2.t2.id構成,同時db1.t1.id和db2.t2.id也是串連條件。join_typesinner,left表之間的連線類型,串連順序與
join_fields中表的出現順序一致。樣本表示:t1 INNER JOIN t2,然後結果再LEFT JOIN t3。filterdb1.t1.c1 > 10 AND db2.t2.c2 < 100一個標準的SQL
WHERE子句,用於在同步前過濾源表資料。使用限制
所有參與同步的源表必須包含主鍵。
該功能使用流式計算,同步過程中僅保證最終一致性。
對於目標索引的更新模式為先刪除後插入。如果您不希望在查詢時訪問到被刪除資料的中間狀態,可以在執行命令前設定會話變數
set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch節點資料刪除的選項。
樣本
兩張表
INNER JOIN:將db1.t1和db2.t2通過id欄位進行INNER JOIN,並將t1.c1和t2.c2同步到dest索引的c1和c2欄位。call dbms_etl.sync_by_map( "search", "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)", "dest.id=db1.t1.id,db2.t2.id", "inner", "" );多張表混合
JOIN並過濾:db1.t1、db2.t2和db3.t3三張表串連,其中t1與t2為INNER JOIN,t1與t3為LEFT JOIN,並篩選t1.c1 > 10且t2.c2 < 100的資料。call dbms_etl.sync_by_map( "search", "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2),dest.c3(db3.t3.c3)", "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", "inner,left", "db1.t1.c1 > 10 and db2.t2.c2 < 100" );
驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
自訂SQL
建立同步鏈路
對於需要複雜轉換、彙總或計算的情境,
dbms_etl.sync_by_sql預存程序支援使用Flink SQL文法定義資料同步邏輯。重要安全警告:嚴禁在SQL語句中寫入程式碼密碼以下樣本僅為示範文法結構,其
WITH子句中包含純文字密碼,存在極大的安全風險。在生產環境中,必須使用更安全的方式管理憑證。文法
call dbms_etl.sync_by_sql("search", "<sync_sql>");樣本
系統將自動替換SQL中的預留位置
{mysql_host}、{mysql_port}、{mysql_user}、{mysql_password}、{search_host}、{search_port}、{search_user}和{search_password}。您僅需按固定預留位置編寫SQL語句即可。CALL dbms_etl.sync_by_sql("search", " -- 步驟1:定義 PolarDB 源表 CREATE TEMPORARY TABLE `db1`.`sbtest1` ( `id` BIGINT, `k` BIGINT, `c` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '{mysql_host}', 'port' = '{mysql_port}', 'username' = '{mysql_user}', -- 生產環境嚴禁使用明文 'password' = '{mysql_password}', -- 生產環境嚴禁使用明文 'database-name' = 'db1', 'table-name' = 'sbtest1' ); -- 步驟2:定義 PolarSearch 目標表 CREATE TEMPORARY TABLE `dest` ( `k` BIGINT, `max_c` STRING, PRIMARY KEY (`k`) NOT ENFORCED ) WITH ( 'connector' = 'opensearch', 'hosts' = '{search_host}:{search_port}', 'index' = 'dest', 'username' = '{search_user}', -- 生產環境嚴禁使用明文 'password' = '{search_password}' -- 生產環境嚴禁使用明文 ); -- 步驟3:定義計算和插入邏輯 INSERT INTO `dest` SELECT `t1`.`k`, MAX(`t1`.`c`) FROM `db1`.`sbtest1` AS `t1` GROUP BY `t1`.`k`; ");驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
管理同步鏈路
您可以使用以下命令查看和刪除已建立的同步鏈路。
查看鏈路
查看所有鏈路:
call dbms_etl.show_sync_link();根據ID查看指定鏈路:將
<sync_id>替換為步驟二返回的ID。call dbms_etl.show_sync_link_by_id('<sync_id>')\G返回結果說明:
*************************** 1. row *************************** SYNC_ID: crb5rmv8rttsg NAME: crb5rmv8rttsg SYSTEM: search SYNC_DEFINITION: db1.t1 -> dest SOURCE_TABLES: db1.t1 SINK_TABLES: dest STATUS: active -- 鏈路狀態,active表示正常運行 MESSAGE: -- 如果出錯,此處會顯示錯誤資訊 CREATED_AT: 2024-05-20 11:55:06 UPDATED_AT: 2024-05-20 17:28:04 OPTIONS: ...
刪除鏈路
刪除同步鏈路是高危操作。預設情況下,該操作會同時刪除PolarSearch中的目標索引及其所有資料。執行前請務必確認。
此操作用於停止資料同步並清理相關資源。
call dbms_etl.drop_sync_link('<sync_id>');對不同狀態的鏈路執行drop_sync_link刪除時,系統的處理邏輯存在差異:
active狀態的鏈路:首先會變為dropping,待系統完成鏈路資源和目標索引資料的清理後,狀態才會變為dropped。dropped狀態的鏈路:系統將徹底清除該鏈路的資訊。其他狀態的鏈路:系統不支援刪除操作。
DDL變更規則與實踐
對已建立同步鏈路的源表進行DDL操作時,需要根據建立鏈路的方式和具體操作,遵循不同的處理流程,以確保資料同步的穩定性。不當的DDL操作可能導致同步鏈路中斷。
單表同步(sync_by_table)鏈路
通過sync_by_table建立的鏈路不支援僅同步指定欄位。
增加欄位:為保證同步鏈路不中斷,您需要先在PolarSearch目標索引中增加欄位,然後再對來源資料表執行
ADD COLUMN操作。在PolarSearch中為目標索引增加欄位對應。例如,為
demo索引增加age欄位:PUT demo/_mapping { "properties": { "age": { "type": "integer" } } }在來源資料表中增加對應的列:
ALTER TABLE demo ADD COLUMN age INT;
刪除欄位:在來源資料表刪除欄位後,增量寫入的資料可以正常同步,但PolarSearch存量資料中仍會保留被刪除欄位的值。
修改欄位類型:
類型相容:如果修改後的類型與原類型相容(例如,從
INT修改為TINYINT),可直接在源表修改,增量資料可以正常同步。ALTER TABLE demo MODIFY COLUMN score TINYINT;類型不相容:如果類型不相容,會導致同步鏈路不可用,需重建鏈路。
多表匯聚(sync_by_map)與自訂SQL(sync_by_sql)鏈路
對非同步欄位進行DDL操作:在源表中增加、刪除或修改未被同步的欄位,不會影響同步鏈路,增量資料可以正常同步。
對同步欄位進行DDL操作:
增加欄位:需要重建鏈路。
刪除欄位:刪除同步欄位後,增量資料可以正常同步,但增量資料中該欄位的值將為
null。修改欄位類型:
類型相容:如果修改後的類型與原類型相容,可直接在源表修改,增量資料可以正常同步。
類型不相容:如果類型不相容,會導致同步鏈路不可用,需重建鏈路。
重建鏈路最佳實務
為最小化對業務的影響,建議採用“新索引 + 新鏈路”的方式進行重建。待新鏈路資料同步完成並驗證無誤後,再將業務查詢流量切換至新的索引。
樣本:為shop.user表新增欄位並重建鏈路。假設原鏈路將shop.user表(含id, name, phone, gmt_create欄位)同步至user_v1索引。現需新增membership_level欄位,並確保線上查詢不受影響。
建立新索引:建立一個名為
user_v2的新索引,並在其映射關係中包含新欄位membership_level。PUT user_v2 { "mappings": { "properties": { "id": { "type": "keyword" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "phone": { "type": "keyword" }, "gmt_create": { "type": "date" }, "membership_level": { "type": "integer" } } } }修改源表:在源端MySQL的
user表中增加新列。ALTER TABLE user ADD COLUMN membership_level TINYINT NOT NULL DEFAULT 0 COMMENT '會員等級';建立同步鏈路:建立一條新的同步鏈路,將
shop.user表的資料同步到user_v2索引。驗證與切換:待資料同步完成後,驗證新索引中的資料是否正確。確認無誤後,將業務查詢流量切換到新的
user_v2索引。清理舊資源:觀察新鏈路運行穩定後,可刪除舊的同步鏈路和
user_v1索引。