當您需要對PolarDB MySQL版中的業務資料進行全文檢索索引或複雜分析時,直接在資料庫上操作可能會影響核心業務的穩定性。PolarDB提供的AutoETL功能,能將資料從讀寫節點自動、持續地同步至叢集內的PolarSearch節點,為您提供一站式的資料服務。您無需額外部署和維護ETL工具,即可實現資料同步,並將搜尋分析負載與線上交易處理負載隔離。
當前功能目前正處於灰階階段。如您有相關需求,請提交工單與我們聯絡,以便為您開啟該功能。
功能簡介
AutoETL是PolarDB MySQL版內建的資料同步能力,它允許資料在叢集內不同類型的節點間自動流轉。目前的版本僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點。,以用於高效能的搜尋和分析。
您可以通過資料庫內建的DBMS_ETL工具包,直接使用SQL命令來建立和管理資料同步鏈路。AutoETL提供三種靈活的資料同步方式:
單表同步(
dbms_etl.sync_by_table):將單個源表完整地同步到目標索引。多表匯聚(
dbms_etl.sync_by_map):將多個源表通過JOIN操作匯聚後,同步到目標索引。自訂SQL(
dbms_etl.sync_by_sql):使用相容Flink SQL的文法進行複雜的資料清洗、轉換和彙總。
適用範圍
使用AutoETL功能前,需確保環境滿足以下條件:
叢集版本:MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。
同步方向:僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點。
DDL限制:不允許對已建立同步鏈路的源表進行DDL變更。如需修改,必須重建ETL鏈路。
資料類型:暫不支援
BIT類型以及GEOMETRY、POINT、LINESTRING、POLYGON、MULTIPOINT、MULTILINESTRING、MULTIPOLYGON、GEOMETRYCOLLECTION等空間資料類型的同步。
建立同步鏈路
單表同步
資料準備
在PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。
CREATE DATABASE IF NOT EXISTS db1; USE db1; CREATE TABLE IF NOT EXISTS t1 ( id INT PRIMARY KEY, c1 VARCHAR(100), c2 VARCHAR(100) ); INSERT INTO t1(id, c1, c2) VALUES (1, 'apple', 'red'), (2, 'banana', 'yellow'), (3, 'grape', 'purple');建立同步鏈路
使用
dbms_etl.sync_by_table預存程序,建立從db1.t1表到PolarSearch節點的索引dest的同步任務。文法
call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");參數說明
參數
說明
search同步目標,當前固定為
search,表示PolarSearch節點。<source_table>源表名,格式為
資料庫名.表名。<sink_table>PolarSearch節點中的目標索引名。
<column_list>需要同步的列名列表,用英文逗號
,分隔。如果為空白字串"",則同步源表所有列。使用限制
源表需包含主鍵或唯一鍵。
在不同的同步鏈路中,不能使用相同的源表或目標表。
建立鏈路後,源表新增的列預設不會被自動同步。如需同步新增列,請重建鏈路。
如果您希望使用自己定義的目標索引配置,您可以先在PolarSearch節點中手動建立索引並定義其配置,然後再建立同步鏈路。如果鏈路建立時目標索引不存在,系統將自動建立。
樣本
將
db1.t1全表同步到PolarSearch的dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");將
db1.t1表的c1和c2列同步到dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
多表匯聚
資料準備
在PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。
CREATE DATABASE IF NOT EXISTS db1; CREATE DATABASE IF NOT EXISTS db2; CREATE DATABASE IF NOT EXISTS db3; CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT); CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT); CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10)); INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33); INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444); INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');建立同步鏈路
使用
dbms_etl.sync_by_map預存程序,可將多個表的資料連線(JOIN)後,匯聚到一個PolarSearch節點的索引中。文法
call dbms_etl.sync_by_map( "search", "<columns_map>", -- 目標索引欄位與源表欄位的映射關係 "<join_fields>", -- 表之間的串連鍵 "<join_types>", -- 連線類型 (inner, left) "<filter>" -- 資料過濾條件 );參數說明
參數
格式樣本
說明
columns_mapdest.c1(db1.t1.c1),dest.c2(db2.t2.c2)目標索引欄位與源表欄位的映射關係。
樣本表示:目標索引
dest的c1欄位來自db1.t1.c1,c2欄位來自db2.t2.c2。join_fieldsdest.id=db1.t1.id,db2.t2.id表之間的串連鍵。
樣本表示:目標索引的文檔ID(
dest.id)由db1.t1.id和db2.t2.id構成,同時db1.t1.id和db2.t2.id也是串連條件。join_typesinner,left表之間的連線類型,串連順序與
join_fields中表的出現順序一致。樣本表示:t1 INNER JOIN t2,然後結果再LEFT JOIN t3。filterdb1.t1.c1 > 10 AND db2.t2.c2 < 100一個標準的SQL
WHERE子句,用於在同步前過濾源表資料。使用限制
所有參與同步的源表必須包含主鍵。
該功能使用流式計算,同步過程中僅保證最終一致性。
對於目標索引的更新模式為先刪除後插入。如果您不希望在查詢時訪問到被刪除資料的中間狀態,可以在執行命令前設定會話變數
set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch節點資料刪除的選項。
樣本
兩張表
INNER JOIN:將db1.t1和db2.t2通過id欄位進行INNER JOIN,並將t1.c1和t2.c2同步到dest索引的c1和c2欄位。call dbms_etl.sync_by_map( "search", "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)", "dest.id=db1.t1.id,db2.t2.id", "inner", "" );多張表混合
JOIN並過濾:db1.t1、db2.t2和db3.t3三張表串連,其中t1與t2為INNER JOIN,t1與t3為LEFT JOIN,並篩選t1.c1 > 10且t2.c2 < 100的資料。call dbms_etl.sync_by_map( "search", "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)", "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", "inner,left", "db1.t1.c1 > 10 and db2.t2.c2 < 100" );
驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
自訂SQL
建立同步鏈路
對於需要複雜轉換、彙總或計算的情境,
dbms_etl.sync_by_sql預存程序支援使用Flink SQL文法定義資料同步邏輯。重要安全警告:嚴禁在SQL語句中寫入程式碼密碼以下樣本僅為示範文法結構,其
WITH子句中包含純文字密碼,存在極大的安全風險。在生產環境中,必須使用更安全的方式管理憑證。文法
call dbms_etl.sync_by_sql("search", "<sync_sql>");樣本
call dbms_etl.sync_by_sql("search", " -- 步驟1:定義 PolarDB 源表 CREATE TEMPORARY TABLE `db1`.`sbtest1` ( `id` BIGINT, `k` BIGINT, `c` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'xxxxxxx', -- 填寫 PolarDB 叢集地址 'port' = '3306', 'username' = 'xxx', -- 生產環境嚴禁使用明文 'password' = 'xxx', -- 生產環境嚴禁使用明文 'database-name' = 'db1', 'table-name' = 'sbtest1' ); -- 步驟2:定義 PolarSearch 目標表 CREATE TEMPORARY TABLE `dest` ( `k` BIGINT, `max_c` STRING, PRIMARY KEY (`k`) NOT ENFORCED ) WITH ( 'connector' = 'opensearch', 'hosts' = 'xxxxxx:xxxx', -- 填寫 PolarSearch 串連地址 'index' = 'dest', 'username' = 'xxx', -- 生產環境嚴禁使用明文 'password' = 'xxx' -- 生產環境嚴禁使用明文 ); -- 步驟3:定義計算和插入邏輯 INSERT INTO `dest` SELECT `t1`.`k`, MAX(`t1`.`c`) FROM `db1`.`sbtest1` AS `t1` GROUP BY `t1`.`k`; ");驗證資料
串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。
# 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
管理同步鏈路
您可以使用以下命令查看和刪除已建立的同步鏈路。
查看鏈路
查看所有鏈路:
call dbms_etl.show_sync_link();根據ID查看指定鏈路:將
<sync_id>替換為步驟二返回的ID。call dbms_etl.show_sync_link_by_id('<sync_id>')\G返回結果說明:
*************************** 1. row *************************** SYNC_ID: crb5rmv8rttsg NAME: crb5rmv8rttsg SYSTEM: search SYNC_DEFINITION: db1.t1 -> dest SOURCE_TABLES: db1.t1 SINK_TABLES: dest STATUS: active -- 鏈路狀態,active表示正常運行 MESSAGE: -- 如果出錯,此處會顯示錯誤資訊 CREATED_AT: 2024-05-20 11:55:06 UPDATED_AT: 2024-05-20 17:28:04 OPTIONS: ...
刪除鏈路
刪除同步鏈路是高危操作。預設情況下,該操作會同時刪除PolarSearch中的目標索引及其所有資料。執行前請務必確認。
此操作用於停止資料同步並清理相關資源。
call dbms_etl.drop_sync_link('<sync_id>');對不同狀態的鏈路執行drop_sync_link刪除時,系統的處理邏輯存在差異:
active狀態的鏈路:首先會變為dropping,待系統完成鏈路資源和目標索引資料的清理後,狀態才會變為dropped。dropped狀態的鏈路:系統將徹底清除該鏈路的資訊。其他狀態的鏈路:系統不支援刪除操作。