在PolarDB PostgreSQL分布式版叢集中,查詢效能與SQL的寫法密切相關。編寫優良的SQL可以最大化地利用分布式架構的優勢,將計算下推到資料節點(DN)並存執行,從而避免不必要的資料轉送和協調開銷。本文為您提供一套實用的查詢最佳化方法。
最佳化方式
查詢情境 | 最佳化方法 | 核心原則 |
單表點查或小範圍掃描 | 在 | 分區剪枝:精確定位到儲存資料的單個或少數分區,避免全部分區掃描。 |
多張大表關聯(JOIN) |
| 計算下推:將JOIN操作下推到各資料節點本地執行,避免跨節點網路開銷。 |
分組彙總(GROUP BY) | 在 | 彙總下推:將大部分彙總運算在資料節點本地完成,僅在協調節點做最終匯總。 |
複雜查詢或非親和JOIN | 使用**CTE(Common Table Expressions)**改寫查詢。 | 減少資料重分布:通過CTE先過濾出小結果集,再進行後續的分布式操作,降低網路傳輸量。 |
利用分布列進行分區剪枝(單表查詢最佳化)
分區剪枝是分散式查詢中最基礎、最高效的最佳化手段。當查詢條件中包含分布列的等值判斷時,最佳化器能直接定位到資料所在的唯一分區,效能接近單機查詢。
原理:
當您發起查詢時,分布式最佳化器會檢查WHERE子句。如果它發現條件作用於分布列(如t2.a = 1000),最佳化器會根據1000這個值計算出其雜湊值,從而精確推斷出資料存放區在哪一個物理分區上。隨後,它會剪掉所有其他無關的分區,只將查詢任務發送給目標分區所在的資料節點。這個過程就是分區剪枝。
樣本:
建立
t2表,並以a列為分布列。CREATE TABLE t2 (a INT, b INT); SELECT create_distributed_table('t2', 'a');查詢
a = 1000的資料,點查會直接被定位到唯一的分區(在下面的例子中是t2_102134這個分區)。EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;返回結果如下:
QUERY PLAN ---------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 1 -- 任務數僅為1,表示查詢已精確定位到單個分區 Tasks Shown: All -> Task Node: host=10.188.92.147 port=3003 dbname=testdb -> Seq Scan on t2_102134 t2 Filter: (a = 1000) (7 rows)
通過親和組與複製表下推JOIN(多表關聯最佳化)
在分布式環境中,避免跨節點JOIN是提升效能的關鍵。您可以通過以下兩種方式實現JOIN計算下推:
使用親和組:如果兩張或多張大表使用相同的分布列進行JOIN,將它們設定為同一親和組,可以保證關聯資料位元於同一節點,從而實現本地JOIN。
使用複製表:如果有一張參與JOIN的表符合以下條件時,可將表建立為複製表。複製表在每個資料節點上都存有副本,同樣能實現本地JOIN。
由於某種原因不適合使用串連鍵作為分布列來建立分布表(例如,已經選擇了其他的分布列來建立分布表)。
表的修改比較少,且需要被其他多張表JOIN。
原理:
當最佳化器發現JOIN的兩張表是親和的(或其中一張是複製表),並且JOIN條件是分布列時,它就能確定關聯的資料行都位於同一個資料節點上。因此,最佳化器會將整個JOIN操作作為一個任務下推到每個資料節點。各個節點在本地完成JOIN運算,並進行並存執行。最後,協調節點(CN)只需收集並匯總各個DN返回的結果即可。這個過程避免了在節點間傳輸海量的未經處理資料來進行JOIN,從而實現了高效能的查詢。
樣本:
使用親和組
建立
colocation_t1表和colocation_t2表,並均以a列為分布列,且它們是親和的。CREATE TABLE colocation_t1 (a INT, b INT); SELECT create_distributed_table('colocation_t1', 'a'); CREATE TABLE colocation_t2 (a INT, b INT); SELECT create_distributed_table('colocation_t2', 'a');進行JOIN關聯查詢。從下述執行計畫中可以看到JOIN操作被下推至資料節點(DN)中執行。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;返回結果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 -- 任務分發到所有分區 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join -- JOIN操作在資料節點(DN)內部執行 Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)子查詢:使用親和組的方式對子查詢同樣有有效。如果子查詢的輸出和分布表進行串連,且串連鍵是子查詢內部的涉及到的分布表的分布列,那麼也會使得子查詢被下推。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;返回結果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)
使用複製表
建立
colocation_t3表和colocation_t4表。colocation_t3表以a列為分布列,colocation_t4表為複製表。CREATE TABLE colocation_t3 (a INT, b INT); SELECT create_distributed_table('colocation_t3', 'a'); CREATE TABLE colocation_t4 (a INT, b INT); SELECT create_reference_table('colocation_t4');進行JOIN關聯查詢。從下述執行計畫中可以看到JOIN操作被下推至資料節點(DN)中執行。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;返回結果如下:
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t3.a = colocation_t4.b) -> Sort Sort Key: colocation_t3.a -> Seq Scan on colocation_t3_102145 colocation_t3 -> Sort Sort Key: colocation_t4.b -> Seq Scan on colocation_t4_102149 colocation_t4 (13 rows)
下推彙總與排序
對於彙總和排序操作,將計算下推到資料節點能顯著降低協調節點(CN)的壓力和網路流量。
彙總下推
當
GROUP BY的KEY包含分布列時,彙總函式(如SUM,COUNT)會完全下推到資料節點。當
GROUP BY的KEY不包含分布列時,最佳化器會採用兩階段策略,先在資料節點進行預彙總,然後在協調節點做最終彙總。第一階段(DN本地彙總):每個資料節點先對自己持有的資料分區進行一次彙總運算,得出一個局部的中間結果。
第二階段(CN最終彙總):協調節點收集所有DN發來的局部結果(這個結果集通常很小),然後執行第二次彙總,計算出最終的全域結果。這種方式,減少了需要通過網路傳輸到協調節點的資料量。
樣本:
建立
group_t表,並以a列為分布列。CREATE TABLE group_t (a INT, b INT); SELECT create_distributed_table('group_t', 'a');包含分布列時,完全下推。彙總函式將完全在 DN 上執行,它的執行計畫如下所示:
EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;返回結果如下:
QUERY PLAN --------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> HashAggregate Group Key: a -> Seq Scan on group_t_102150 group_t (8 rows)不包含分布列時,採用兩階段策略。會執行兩次彙總函式,分別是在DN和CN上執行:
EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;返回結果如下:
QUERY PLAN --------------------------------------------------------------- Aggregate -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Aggregate -> Seq Scan on group_t_102150 group_t (8 rows)
排序下推
ORDER BY ... LIMIT N子句在很多情境下也能被下推到資料節點,每個節點先返回自己的Top N,協調節點再做最終排序,極大減少了需要排序的資料量。
例如:如果不存在GROUP BY子句,或者GROUP BY子句的KEY包含了分布列,那麼ORDER BY ... LIMIT N會下推到分區上。對於每一個分區返回的結果,CN會再次執行ORDER BY ... LIMIT N的操作。
樣本:
建立
order_t表,並以a列為分布列。CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');ORDER BY ... LIMIT N被下推,它的執行計畫如下所示:EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;返回結果如下:
QUERY PLAN ------------------------------------------------------------------------ Limit -> Sort Sort Key: remote_scan.a -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Limit -> Sort Sort Key: a -> Seq Scan on order_t_102154 order_t (12 rows)
使用CTE改寫複雜查詢
對於無法通過親和組最佳化的複雜JOIN(例如JOIN條件不是分布列),最佳化器預設可能選擇一種低效的執行計畫(如全量拉取一張表的資料到協調節點再下發)。此時,可以使用CTE(WITH子句)來手動最佳化。
原理:
先通過CTE對大表進行高選擇性的過濾,產生一個小的中間結果集,然後再用這個小結果集去關聯另一張表。這樣可以大幅減少節點間需要傳輸的資料量。
樣本:
資料準備:建立
cte_t1表和cte_t2表,並以a列為分布列。在cte_t1表的b列上建立本地索引。-- 在不親和的表或列之間進行JOIN時,允許查詢最佳化工具產生資料重分區的執行計畫。 SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON; -- 建立普通表 CREATE TABLE cte_t1(a INT, b INT); CREATE TABLE cte_t2(a INT, b INT); -- 轉換分布表 SELECT create_distributed_table('cte_t1', 'a'); SELECT create_distributed_table('cte_t2', 'a'); -- 建立本地索引 CREATE INDEX local_idx_t1_b ON cte_t1(b); -- 插入測試資料 INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; -- 收集並更新這兩張表的統計資訊。 ANALYZE cte_t1, cte_t2;最佳化前:如果直接執行非親和JOIN查詢,會產生下面的執行計畫,它會首先在
cte_t2上進行全表掃描,將結果返回到CN,然後再下發到DN上,進行JSON:EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;返回結果如下:
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1) -> Distributed Subplan 46_1 Subplan Duration: 1488.57 ms Intermediate Data Size: 17 MB Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1) Task Count: 4 Tuple data received from nodes: 7813 kB Tasks Shown: One of 4 -> Task Tuple data received from node: 1950 kB Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Seq Scan on cte_t2_102165 cte_t2 (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l oops=1) Planning Time: 0.013 ms Execution Time: 48.372 ms Planning Time: 0.000 ms Execution Time: 531.747 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 480 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Hash Join (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1) Hash Cond: (intermediate_result.b = cte_t1.a) -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179 .565..278.108 rows=1000000 loops=1) -> Hash (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.021 ..0.021 rows=3 loops=1) Index Cond: (b = 1) Planning Time: 0.365 ms Execution Time: 332.955 ms Planning Time: 0.625 ms Execution Time: 1823.139 ms (34 rows)最佳化後:通過CTE先過濾
t1表(強制對cte_t1進行了索引掃描),並將結果返回到CN,既減少了掃描時間,也減少了網路傳輸:EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;返回結果如下:最佳化後的查詢速度會得到數十倍的提升,因為它避免了大規模的資料重分布。
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1) -> Distributed Subplan 49_1 Subplan Duration: 0.94 ms Intermediate Data Size: 180 bytes Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1) Task Count: 4 Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 32 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.005 ..0.005 rows=4 loops=1) Index Cond: (b = 1) Planning Time: 0.020 ms Execution Time: 0.014 ms Planning Time: 0.000 ms Execution Time: 0.779 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 496 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Gather (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1) Workers Planned: 1 Workers Launched: 1 -> Hash Join (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2) Hash Cond: (cte_t2.b = intermediate_result.a) -> Parallel Seq Scan on cte_t2_102163 cte_t2 (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows= 124936 loops=2) -> Hash (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..0.07 rows=10 width=8) (actual time =1.444..1.448 rows=10 loops=2) Planning Time: 0.068 ms Execution Time: 19.729 ms Planning Time: 0.514 ms Execution Time: 39.672 ms (37 rows)
SELECT ... FOR UPDATE限制
出於全域鎖協調的複雜性,SELECT ... FOR UPDATE語句不支援跨分區執行。如果查詢條件無法將範圍限定在單個分區內,該語句將會報錯。
SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR: FOR UPDATE is not supported for query of more than one share