すべてのプロダクト
Search
ドキュメントセンター

PolarDB:分散テーブルのクエリを最適化する

最終更新日:Nov 09, 2025

PolarDB for PostgreSQL (分散版) クラスターでは、クエリのパフォーマンスは SQL 文の記述方法に大きく依存します。適切に記述された SQL 文は、計算をデータノード (DN) にプッシュダウンして並列実行することで、分散アーキテクチャの利点を最大限に活用できます。このメソッドは、不要なデータ転送と調整のオーバーヘッドを回避します。このトピックでは、クエリを最適化するためのいくつかの実用的なメソッドについて説明します。

最適化メソッド

クエリシナリオ

最適化メソッド

中心的な原則

単一テーブルでのポイントクエリまたは小範囲スキャン

WHERE 句で分布列を使用して、等価性または範囲フィルターを実行します。

シャードプルーニング: データを格納する単一のシャードまたは少数のシャードを特定します。これにより、すべてのシャードをスキャンすることを回避します。

複数の large テーブルの結合

  • 同じ分布列を使用し、テーブルをコロケーショングループに配置します。

  • small テーブルをレプリケートされたテーブルに変換します。

計算プッシュダウン: JOIN 操作を各データノードにプッシュダウンしてローカルで実行します。これにより、ノード間のネットワークオーバーヘッドを回避します。

グループ化と集約 (GROUP BY)

分布列を GROUP BY 句に含めます。

集約プッシュダウン: ほとんどの集約操作をデータノード上でローカルに完了させます。クライアントノードは最終的なまとめのみを実行します。

複雑なクエリまたはコロケーションされていない JOIN

共通テーブル式 (CTE) を使用してクエリを再書き込みします。

データ再配布の削減: CTE を使用して、まず large テーブルをフィルターし、small な結果セットを生成します。次に、このより small なセットに対して後続の分散操作を実行します。これにより、ネットワーク経由で転送されるデータ量が削減されます。

分布列を使用したシャードプルーニング (単一テーブルのクエリ最適化)

シャードプルーニングは、分散クエリにとって最も基本的で効率的な最適化メソッドです。クエリ条件に分布列の等価性チェックが含まれている場合、オプティマイザーはデータを含む一意のシャードを直接特定できます。これにより、パフォーマンスは単一ノードのクエリに匹敵します。

原則:

クエリを実行すると、分散オプティマイザーは WHERE 句をチェックします。t2.a = 1000 のような分布列の条件を見つけると、オプティマイザーは 1000 のハッシュ値を計算します。これにより、データを格納する正確な物理シャードを特定できます。次に、他のすべての無関係なシャードをプルーニングし、ターゲットシャードを含むデータノードにのみクエリタスクを送信します。このプロセスはシャードプルーニングと呼ばれます。

:

  1. t2 テーブルを作成し、列 a を分布列として使用します。

    CREATE TABLE t2 (a INT, b INT);
    SELECT create_distributed_table('t2', 'a');
  2. a = 1000 のデータをクエリします。ポイントクエリは単一のシャードに直接ルーティングされます。この例では t2_102134 です。

    EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;

    次の結果が返されます。

                            QUERY PLAN                        
    ----------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 1 -- タスク数は 1 です。これは、クエリが単一のシャードにルーティングされることを意味します。
       Tasks Shown: All
       ->  Task
             Node: host=10.188.92.147 port=3003 dbname=testdb
             ->  Seq Scan on t2_102134 t2
                   Filter: (a = 1000)
    (7 rows)

コロケーショングループとレプリケートされたテーブルを使用した JOIN のプッシュダウン (複数テーブルの結合最適化)

分散環境では、ノード間の JOIN を回避することがパフォーマンスを向上させる鍵です。次の 2 つの方法で JOIN 計算をプッシュダウンできます。

  • コロケーショングループを使用する: 2 つ以上の large テーブルが同じ分布列で結合される場合、それらを同じコロケーショングループに配置できます。これにより、関連データが同じノードに存在することが保証され、ローカル JOIN が可能になります。

  • レプリケートされたテーブルを使用する: JOIN に関連するテーブルが次の条件を満たす場合、レプリケートされたテーブルとして作成できます。レプリケートされたテーブルは各データノードにレプリカを持ち、これもローカル JOIN を可能にします。

    1. 結合キーを分布列として使用するのに適していません。たとえば、別の列がすでに分布列として選択されている場合があります。

    2. テーブルはめったに変更されず、他の複数のテーブルと結合する必要があります。

原則:

オプティマイザーが JOIN 内の 2 つのテーブルがコロケーションされているか、またはそのうちの 1 つがレプリケートされたテーブルであり、JOIN 条件が分布列上にあることを見つけると、関連するデータ行がすべて同じデータノード上にあると判断します。したがって、オプティマイザーは JOIN 操作全体をタスクとして各データノードにプッシュダウンします。各ノードは JOIN 操作をローカルで並列に完了します。最後に、クライアントノード (CN) は各 DN からの結果を収集して要約するだけで済みます。このプロセスにより、JOIN のためにノード間で大量の生データを転送することが回避され、高いクエリパフォーマンスが実現します。

:

コロケーショングループを使用する

  1. colocation_t1 テーブルと colocation_t2 テーブルを作成します。両方の分布列として列 a を使用し、それらがコロケーションされていることを確認します。

    CREATE TABLE colocation_t1 (a INT, b INT);
    SELECT create_distributed_table('colocation_t1', 'a');
    
    CREATE TABLE colocation_t2 (a INT, b INT);
    SELECT create_distributed_table('colocation_t2', 'a');
  2. JOIN クエリを実行します。次の実行計画は、JOIN 操作が実行のために DN にプッシュダウンされることを示しています。

     EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;

    次の結果が返されます。

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4 -- タスクはすべてのシャードに分散されます。
       Tasks Shown: One of 4 
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join -- JOIN 操作はデータノード (DN) 内で実行されます。
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)
  3. サブクエリ: コロケーショングループの使用はサブクエリにも有効です。サブクエリの出力が分散テーブルと結合され、結合キーがそのサブクエリ内のテーブルの分布列でもある場合、サブクエリもプッシュダウンされます。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;

    次の結果が返されます。

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t1.a = colocation_t2.a)
                   ->  Sort
                         Sort Key: colocation_t1.a
                         ->  Seq Scan on colocation_t1_102137 colocation_t1
                   ->  Sort
                         Sort Key: colocation_t2.a
                         ->  Seq Scan on colocation_t2_102141 colocation_t2
    (13 rows)

レプリケートされたテーブルを使用する

  1. colocation_t3 テーブルと colocation_t4 テーブルを作成します。colocation_t3 テーブルの分布列として列 a を使用し、colocation_t4 テーブルをレプリケートされたテーブルにします。

    CREATE TABLE colocation_t3 (a INT, b INT);
    SELECT create_distributed_table('colocation_t3', 'a');
    
    CREATE TABLE colocation_t4 (a INT, b INT);
    SELECT create_reference_table('colocation_t4');
  2. JOIN クエリを実行します。次の実行計画は、JOIN 操作が実行のために DN にプッシュダウンされることを示しています。

    EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;

    次の結果が返されます。

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  Merge Join
                   Merge Cond: (colocation_t3.a = colocation_t4.b)
                   ->  Sort
                         Sort Key: colocation_t3.a
                         ->  Seq Scan on colocation_t3_102145 colocation_t3
                   ->  Sort
                         Sort Key: colocation_t4.b
                         ->  Seq Scan on colocation_t4_102149 colocation_t4
    (13 rows)

集約とソートのプッシュダウン

集約およびソート操作の場合、計算をデータノードにプッシュダウンすると、CN の負荷が大幅に軽減され、ネットワークトラフィックが減少します。

集約プッシュダウン

  • GROUP BY 句の KEY に分布列が含まれている場合、SUMCOUNT などの集計関数は完全にデータノードにプッシュダウンされます。

  • GROUP BY 句の KEY に分布列が含まれていない場合、オプティマイザーは 2 段階の戦略を使用します。最初にデータノードで事前集約を実行し、次にクライアントノードで最終的な集約を実行します。

    1. ステージ 1 (DN でのローカル集約): 各データノードは、まず独自のデータシャードで集約操作を実行して、ローカルの中間結果を生成します。

    2. ステージ 2 (CN での最終集約): クライアントノードは、すべての DN からローカル結果を収集します。この結果セットは通常 small です。次に、CN は 2 回目の集約を実行して、最終的なグローバル結果を計算します。このメソッドは、ネットワーク経由でクライアントノードに転送する必要があるデータ量を削減します。

:

  1. group_t テーブルを作成し、列 a を分布列として使用します。

    CREATE TABLE group_t (a INT, b INT);
    SELECT create_distributed_table('group_t', 'a');
  2. 分布列が含まれている場合、集約は完全にプッシュダウンされます。集計関数は DN 上で完全に実行されます。実行計画は次のとおりです。

    EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;

    次の結果が返されます。

                           QUERY PLAN                        
    ---------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)
       Task Count: 4
       Tasks Shown: One of 4
       ->  Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             ->  HashAggregate
                   Group Key: a
                   ->  Seq Scan on group_t_102150 group_t
    (8 rows)
  3. 分布列が含まれていない場合、2 段階の戦略が使用されます。集計関数は、DN と CN で 1 回ずつ、合計 2 回実行されます。

    EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;

    次の結果が返されます。

                              QUERY PLAN                           
    ---------------------------------------------------------------
     Aggregate
       ->  Custom Scan (PolarCluster Adaptive)
             Task Count: 4
             Tasks Shown: One of 4
             ->  Task
                   Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                   ->  Aggregate
                         ->  Seq Scan on group_t_102150 group_t
    (8 rows)

ソートのプッシュダウン

ORDER BY ... LIMIT N 句も、多くのシナリオでデータノードにプッシュダウンできます。各ノードは、まず独自の 上位 N 件の結果を返します。次に、クライアントノードはこれらの中間結果に対して最終的なソートを実行します。このメソッドにより、ソートする必要があるデータ量が大幅に削減されます。

たとえば、GROUP BY 句がない場合、または GROUP BY 句の KEY に分布列が含まれている場合、ORDER BY ... LIMIT N 操作はシャードにプッシュダウンされます。CN は、各シャードから返された結果に対して ORDER BY ... LIMIT N 操作を再度実行します。

:

  1. order_t テーブルを作成し、列 a を分布列として使用します。CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');

    CREATE TABLE order_t (a INT, b INT);
    SELECT create_distributed_table('order_t', 'a');
  2. ORDER BY ... LIMIT N 操作がプッシュダウンされます。実行計画は次のとおりです。

    EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;

    次の結果が返されます。

                                   QUERY PLAN                               
    ------------------------------------------------------------------------
     Limit
       ->  Sort
             Sort Key: remote_scan.a
             ->  Custom Scan (PolarCluster Adaptive)
                   Task Count: 4
                   Tasks Shown: One of 4
                   ->  Task
                         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                         ->  Limit
                               ->  Sort
                                     Sort Key: a
                                     ->  Seq Scan on order_t_102154 order_t
    (12 rows)

CTE を使用した複雑なクエリの再書き込み

コロケーショングループで最適化できない複雑な JOIN (JOIN 条件が分布列にない場合など) の場合、オプティマイザーはデフォルトで非効率な実行計画を選択する可能性があります。たとえば、あるテーブルからすべてのデータをクライアントノードにプルし、それをデータノードに送り返すことがあります。この場合、CTE (WITH 句) を使用してクエリを手動で最適化できます。

原則:

まず、CTE を使用して、選択性の高いフィルターを large テーブルに適用できます。このステップでは、small な中間結果セットが生成されます。次に、この small な結果セットを別のテーブルと結合できます。このメソッドにより、ノード間で転送する必要があるデータ量を大幅に削減できます。

:

  1. データの準備: cte_t1 テーブルと cte_t2 テーブルを作成し、両方の分布列として列 a を使用します。cte_t1 テーブルの列 b にローカルインデックスを作成します。

    -- クエリオプティマイザーが、コロケーションされていないテーブルまたは列間の JOIN に対してデータ再配布実行計画を生成できるようにします。
    SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;
    -- 標準テーブルの作成
    CREATE TABLE cte_t1(a INT, b INT);
    CREATE TABLE cte_t2(a INT, b INT);
    -- 分散テーブルへの変換
    SELECT create_distributed_table('cte_t1', 'a');
    SELECT create_distributed_table('cte_t2', 'a');
    -- ローカルインデックスの作成
    CREATE INDEX local_idx_t1_b ON cte_t1(b);
    -- テストデータの挿入
    INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i;
    -- これら 2 つのテーブルの統計を収集して更新します。
    ANALYZE cte_t1, cte_t2;
  2. 最適化前: コロケーションされていない JOIN クエリを直接実行すると、クエリは次の実行計画を生成します。まず cte_t2 で全表スキャンを実行し、結果を CN に返し、それを JOIN のために DN に送り返します。

    EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;

    次の結果が返されます。

                                                                                       QUERY PLAN                                               
                                        
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1)
       ->  Distributed Subplan 46_1
             Subplan Duration: 1488.57 ms
             Intermediate Data Size: 17 MB
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 7813 kB
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 1950 kB
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Seq Scan on cte_t2_102165 cte_t2  (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l
    oops=1)
                             Planning Time: 0.013 ms
                             Execution Time: 48.372 ms
             Planning Time: 0.000 ms
             Execution Time: 531.747 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 480 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Hash Join  (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1)
                   Hash Cond: (intermediate_result.b = cte_t1.a)
                   ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179
    .565..278.108 rows=1000000 loops=1)
                   ->  Hash  (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1)
                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                         ->  Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.021
    ..0.021 rows=3 loops=1)
                               Index Cond: (b = 1)
                 Planning Time: 0.365 ms
                 Execution Time: 332.955 ms
     Planning Time: 0.625 ms
     Execution Time: 1823.139 ms
    (34 rows)
  3. 最適化後: CTE を使用して、まず cte_t1 テーブルをフィルターできます。これにより、cte_t1 でインデックススキャンが強制され、small な結果セットが CN に返されます。このメソッドは、スキャン時間とネットワーク経由で転送されるデータ量の両方を削減します。

    EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;

    最適化されたクエリは、大規模なデータ再配布を回避するため、数十倍高速になります。

                                                                                    QUERY PLAN                                                  
                                   
    --------------------------------------------------------------------------------------------------------------------------------------------
     Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1)
       ->  Distributed Subplan 49_1
             Subplan Duration: 0.94 ms
             Intermediate Data Size: 180 bytes
             Result destination: Send to 2 nodes
             ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1)
                   Task Count: 4
                   Tuple data received from nodes: 80 bytes
                   Tasks Shown: One of 4
                   ->  Task
                         Tuple data received from node: 32 bytes
                         Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
                         ->  Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1  (cost=0.42..2.68 rows=3 width=8) (actual time=0.005
    ..0.005 rows=4 loops=1)
                               Index Cond: (b = 1)
                             Planning Time: 0.020 ms
                             Execution Time: 0.014 ms
             Planning Time: 0.000 ms
             Execution Time: 0.779 ms
       Task Count: 4
       Tuple data received from nodes: 1600 bytes
       Tasks Shown: One of 4
       ->  Task
             Tuple data received from node: 496 bytes
             Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb
             ->  Gather  (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1)
                   Workers Planned: 1
                   Workers Launched: 1
                   ->  Hash Join  (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2)
                         Hash Cond: (cte_t2.b = intermediate_result.a)
                         ->  Parallel Seq Scan on cte_t2_102163 cte_t2  (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows=
    124936 loops=2)
                         ->  Hash  (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2)
                               Buckets: 1024  Batches: 1  Memory Usage: 9kB
                               ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..0.07 rows=10 width=8) (actual time
    =1.444..1.448 rows=10 loops=2)
                 Planning Time: 0.068 ms
                 Execution Time: 19.729 ms
     Planning Time: 0.514 ms
     Execution Time: 39.672 ms
    (37 rows)

SELECT ... FOR UPDATE の制限事項

グローバルロック調整の複雑さのため、SELECT ... FOR UPDATE 文はシャード間の実行をサポートしていません。クエリ条件が範囲を単一のシャードに限定できない場合、文はエラーを返します。

SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR:  FOR UPDATE is not supported for query of more than one shard