全部產品
Search
文件中心

PolarDB:最佳化分布表查詢

更新時間:Sep 12, 2025

PolarDB PostgreSQL分布式版叢集中,查詢效能與SQL的寫法密切相關。編寫優良的SQL可以最大化地利用分布式架構的優勢,將計算下推到資料節點(DN)並存執行,從而避免不必要的資料轉送和協調開銷。本文為您提供一套實用的查詢最佳化方法。

最佳化方式

查詢情境

最佳化方法

核心原則

單表點查或小範圍掃描

WHERE子句中使用分布列進行等值或範圍過濾。

分區剪枝:精確定位到儲存資料的單個或少數分區,避免全部分區掃描。

多張大表關聯(JOIN)

  • 使用相同的分布列,並放入親和組。

  • 將小表改造為複製表。

計算下推:將JOIN操作下推到各資料節點本地執行,避免跨節點網路開銷。

分組彙總(GROUP BY)

GROUP BY子句中包含分布列。

彙總下推:將大部分彙總運算在資料節點本地完成,僅在協調節點做最終匯總。

複雜查詢或非親和JOIN

使用**CTE(Common Table Expressions)**改寫查詢。

減少資料重分布:通過CTE先過濾出小結果集,再進行後續的分布式操作,降低網路傳輸量。

利用分布列進行分區剪枝(單表查詢最佳化)

分區剪枝是分散式查詢中最基礎、最高效的最佳化手段。當查詢條件中包含分布列的等值判斷時,最佳化器能直接定位到資料所在的唯一分區,效能接近單機查詢。

原理:

當您發起查詢時,分布式最佳化器會檢查WHERE子句。如果它發現條件作用於分布列(如t2.a = 1000),最佳化器會根據1000這個值計算出其雜湊值,從而精確推斷出資料存放區在哪一個物理分區上。隨後,它會剪掉所有其他無關的分區,只將查詢任務發送給目標分區所在的資料節點。這個過程就是分區剪枝。

樣本

  1. 建立t2表,並以a列為分布列。

    CREATE TABLE t2 (a INT, b INT);
    SELECT create_distributed_table('t2', 'a');
  2. 查詢a = 1000的資料,點查會直接被定位到唯一的分區(在下面的例子中是t2_102134這個分區)。

    EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;

    返回結果如下:

                            QUERY PLAN                        
    ----------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 1 -- 任務數僅為1,表示查詢已精確定位到單個分區
       Tasks Shown: All
       ->  Task
             Node: host=10.188.92.147 port=3003 dbname=testdb
             ->  Seq Scan on t2_102134 t2
                   Filter: (a = 1000)
    (7 rows)

通過親和組與複製表下推JOIN(多表關聯最佳化)

在分布式環境中,避免跨節點JOIN是提升效能的關鍵。您可以通過以下兩種方式實現JOIN計算下推:

  • 使用親和組:如果兩張或多張大表使用相同的分布列進行JOIN,將它們設定為同一親和組,可以保證關聯資料位元於同一節點,從而實現本地JOIN。

  • 使用複製表:如果有一張參與JOIN的表符合以下條件時,可將表建立為複製表。複製表在每個資料節點上都存有副本,同樣能實現本地JOIN。

    1. 由於某種原因不適合使用串連鍵作為分布列來建立分布表(例如,已經選擇了其他的分布列來建立分布表)。

    2. 表的修改比較少,且需要被其他多張表JOIN。

原理

當最佳化器發現JOIN的兩張表是親和的(或其中一張是複製表),並且JOIN條件是分布列時,它就能確定關聯的資料行都位於同一個資料節點上。因此,最佳化器會將整個JOIN操作作為一個任務下推到每個資料節點。各個節點在本地完成JOIN運算,並進行並存執行。最後,協調節點(CN)只需收集並匯總各個DN返回的結果即可。這個過程避免了在節點間傳輸海量的未經處理資料來進行JOIN,從而實現了高效能的查詢。

樣本

使用親和組

  1. 建立colocation_t1表和colocation_t2表,並均以a列為分布列,且它們是親和的。

    CREATE TABLE colocation_t1 (a INT, b INT);
    SELECT create_distributed_table('colocation_t1', 'a');
    
    CREATE TABLE colocation_t2 (a INT, b INT);
    SELECT create_distributed_table('colocation_t2', 'a');
  2. 進行JOIN關聯查詢。從下述執行計畫中可以看到JOIN操作被下推至資料節點(DN)中執行。

     EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;

    返回結果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4 -- 任務分發到所有分區
       Tasks Shown: One of 4 
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join -- JOIN操作在資料節點(DN)內部執行
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)
  3. 子查詢:使用親和組的方式對子查詢同樣有有效。如果子查詢的輸出和分布表進行串連,且串連鍵是子查詢內部的涉及到的分布表的分布列,那麼也會使得子查詢被下推。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;

    返回結果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)

使用複製表

  1. 建立colocation_t3表和colocation_t4表。colocation_t3表以a列為分布列,colocation_t4表為複製表。

    CREATE TABLE colocation_t3 (a INT, b INT);
    SELECT create_distributed_table('colocation_t3', 'a');
    
    CREATE TABLE colocation_t4 (a INT, b INT);
    SELECT create_reference_table('colocation_t4');
  2. 進行JOIN關聯查詢。從下述執行計畫中可以看到JOIN操作被下推至資料節點(DN)中執行。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;

    返回結果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t3.a = colocation_t4.b)
                   ->  Sort
                         Sort Key: colocation_t3.a
                         ->  Seq Scan on colocation_t3_102145 colocation_t3
                   ->  Sort
                         Sort Key: colocation_t4.b
                         ->  Seq Scan on colocation_t4_102149 colocation_t4
    (13 rows)

下推彙總與排序

對於彙總和排序操作,將計算下推到資料節點能顯著降低協調節點(CN)的壓力和網路流量。

彙總下推

  • GROUP BYKEY包含分布列時,彙總函式(如SUMCOUNT)會完全下推到資料節點。

  • GROUP BYKEY不包含分布列時,最佳化器會採用兩階段策略,先在資料節點進行預彙總,然後在協調節點做最終彙總。

    1. 第一階段(DN本地彙總):每個資料節點先對自己持有的資料分區進行一次彙總運算,得出一個局部的中間結果。

    2. 第二階段(CN最終彙總):協調節點收集所有DN發來的局部結果(這個結果集通常很小),然後執行第二次彙總,計算出最終的全域結果。這種方式,減少了需要通過網路傳輸到協調節點的資料量。

樣本

  1. 建立group_t表,並以a列為分布列。

    CREATE TABLE group_t (a INT, b INT);
    SELECT create_distributed_table('group_t', 'a');
  2. 包含分布列時,完全下推。彙總函式將完全在 DN 上執行,它的執行計畫如下所示:

    EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;

    返回結果如下:

                           QUERY PLAN                        
    ---------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  HashAggregate
                   Group Key: a
                   ->  Seq Scan on group_t_102150 group_t
    (8 rows)
  3. 不包含分布列時,採用兩階段策略。會執行兩次彙總函式,分別是在DN和CN上執行:

    EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;

    返回結果如下:

                              QUERY PLAN                           
    ---------------------------------------------------------------
     Aggregate
       ->  Custom Scan (PolarCluster Adaptive)
             Task Count: 4
             Tasks Shown: One of 4
             ->  Task
                   Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                   ->  Aggregate
                         ->  Seq Scan on group_t_102150 group_t
    (8 rows)

排序下推

ORDER BY ... LIMIT N子句在很多情境下也能被下推到資料節點,每個節點先返回自己的Top N,協調節點再做最終排序,極大減少了需要排序的資料量。

例如:如果不存在GROUP BY子句,或者GROUP BY子句的KEY包含了分布列,那麼ORDER BY ... LIMIT N會下推到分區上。對於每一個分區返回的結果,CN會再次執行ORDER BY ... LIMIT N的操作。

樣本

  1. 建立order_t表,並以a列為分布列。

    CREATE TABLE order_t (a INT, b INT);
    SELECT create_distributed_table('order_t', 'a');
  2. ORDER BY ... LIMIT N被下推,它的執行計畫如下所示:

    EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;

    返回結果如下:

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: remote_scan.a
             ->  Custom Scan (PolarCluster Adaptive)
                   Task Count: 4
                   Tasks Shown: One of 4
                   ->  Task
                         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                         ->  Limit
                               ->  Sort
                                     Sort Key: a
                                     ->  Seq Scan on order_t_102154 order_t
    (12 rows)

使用CTE改寫複雜查詢

對於無法通過親和組最佳化的複雜JOIN(例如JOIN條件不是分布列),最佳化器預設可能選擇一種低效的執行計畫(如全量拉取一張表的資料到協調節點再下發)。此時,可以使用CTE(WITH子句)來手動最佳化。

原理

先通過CTE對大表進行高選擇性的過濾,產生一個小的中間結果集,然後再用這個小結果集去關聯另一張表。這樣可以大幅減少節點間需要傳輸的資料量。

樣本

  1. 資料準備:建立cte_t1表和cte_t2表,並以a列為分布列。在cte_t1表的b列上建立本地索引。

    -- 在不親和的表或列之間進行JOIN時,允許查詢最佳化工具產生資料重分區的執行計畫。
    SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;
    -- 建立普通表
    CREATE TABLE cte_t1(a INT, b INT);
    CREATE TABLE cte_t2(a INT, b INT);
    -- 轉換分布表
    SELECT create_distributed_table('cte_t1', 'a');
    SELECT create_distributed_table('cte_t2', 'a');
    -- 建立本地索引
    CREATE INDEX local_idx_t1_b ON cte_t1(b);
    -- 插入測試資料
    INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    -- 收集並更新這兩張表的統計資訊。
    ANALYZE cte_t1, cte_t2;
  2. 最佳化前:如果直接執行非親和JOIN查詢,會產生下面的執行計畫,它會首先在cte_t2上進行全表掃描,將結果返回到CN,然後再下發到DN上,進行JSON:

    EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;

    返回結果如下:

                                                                                       QUERY PLAN                                               
                                        
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1)
       ->  Distributed Subplan 46_1
             Subplan Duration: 1488.57 ms
             Intermediate Data Size: 17 MB
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 7813 kB
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 1950 kB
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Seq Scan on cte_t2_102165 cte_t2  (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l
    oops=1)
                             Planning Time: 0.013 ms
                             Execution Time: 48.372 ms
             Planning Time: 0.000 ms
             Execution Time: 531.747 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 480 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Hash Join  (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1)
                   Hash Cond: (intermediate_result.b = cte_t1.a)
                   ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179
    .565..278.108 rows=1000000 loops=1)
                   ->  Hash  (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1)
                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                         ->  Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.021
    ..0.021 rows=3 loops=1)
                               Index Cond: (b = 1)
                 Planning Time: 0.365 ms
                 Execution Time: 332.955 ms
     Planning Time: 0.625 ms
     Execution Time: 1823.139 ms
    (34 rows)
  3. 最佳化後:通過CTE先過濾t1表(強制對cte_t1進行了索引掃描),並將結果返回到CN,既減少了掃描時間,也減少了網路傳輸:

    EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;

    返回結果如下:最佳化後的查詢速度會得到數十倍的提升,因為它避免了大規模的資料重分布。

                                                                                    QUERY PLAN                                                  
                                   
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1)
       ->  Distributed Subplan 49_1
             Subplan Duration: 0.94 ms
             Intermediate Data Size: 180 bytes
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 80 bytes
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 32 bytes
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.005
    ..0.005 rows=4 loops=1)
                               Index Cond: (b = 1)
                             Planning Time: 0.020 ms
                             Execution Time: 0.014 ms
             Planning Time: 0.000 ms
             Execution Time: 0.779 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 496 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Gather  (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1)
                   Workers Planned: 1
                   Workers Launched: 1
                   ->  Hash Join  (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2)
                         Hash Cond: (cte_t2.b = intermediate_result.a)
                         ->  Parallel Seq Scan on cte_t2_102163 cte_t2  (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows=
    124936 loops=2)
                         ->  Hash  (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2)
                               Buckets: 1024  Batches: 1  Memory Usage: 9kB
                               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..0.07 rows=10 width=8) (actual time
    =1.444..1.448 rows=10 loops=2)
                 Planning Time: 0.068 ms
                 Execution Time: 19.729 ms
     Planning Time: 0.514 ms
     Execution Time: 39.672 ms
    (37 rows)

SELECT ... FOR UPDATE限制

出於全域鎖協調的複雜性,SELECT ... FOR UPDATE語句不支援跨分區執行。如果查詢條件無法將範圍限定在單個分區內,該語句將會報錯。

SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR:  FOR UPDATE is not supported for query of more than one share