PolarDB for PostgreSQL (分散版) クラスターでは、クエリのパフォーマンスは SQL 文の記述方法に大きく依存します。適切に記述された SQL 文は、計算をデータノード (DN) にプッシュダウンして並列実行することで、分散アーキテクチャの利点を最大限に活用できます。このメソッドは、不要なデータ転送と調整のオーバーヘッドを回避します。このトピックでは、クエリを最適化するためのいくつかの実用的なメソッドについて説明します。
最適化メソッド
クエリシナリオ | 最適化メソッド | 中心的な原則 |
単一テーブルでのポイントクエリまたは小範囲スキャン |
| シャードプルーニング: データを格納する単一のシャードまたは少数のシャードを特定します。これにより、すべてのシャードをスキャンすることを回避します。 |
複数の large テーブルの結合 |
| 計算プッシュダウン: JOIN 操作を各データノードにプッシュダウンしてローカルで実行します。これにより、ノード間のネットワークオーバーヘッドを回避します。 |
グループ化と集約 (GROUP BY) | 分布列を | 集約プッシュダウン: ほとんどの集約操作をデータノード上でローカルに完了させます。クライアントノードは最終的なまとめのみを実行します。 |
複雑なクエリまたはコロケーションされていない JOIN | 共通テーブル式 (CTE) を使用してクエリを再書き込みします。 | データ再配布の削減: CTE を使用して、まず large テーブルをフィルターし、small な結果セットを生成します。次に、このより small なセットに対して後続の分散操作を実行します。これにより、ネットワーク経由で転送されるデータ量が削減されます。 |
分布列を使用したシャードプルーニング (単一テーブルのクエリ最適化)
シャードプルーニングは、分散クエリにとって最も基本的で効率的な最適化メソッドです。クエリ条件に分布列の等価性チェックが含まれている場合、オプティマイザーはデータを含む一意のシャードを直接特定できます。これにより、パフォーマンスは単一ノードのクエリに匹敵します。
原則:
クエリを実行すると、分散オプティマイザーは WHERE 句をチェックします。t2.a = 1000 のような分布列の条件を見つけると、オプティマイザーは 1000 のハッシュ値を計算します。これにより、データを格納する正確な物理シャードを特定できます。次に、他のすべての無関係なシャードをプルーニングし、ターゲットシャードを含むデータノードにのみクエリタスクを送信します。このプロセスはシャードプルーニングと呼ばれます。
例:
t2テーブルを作成し、列aを分布列として使用します。CREATE TABLE t2 (a INT, b INT); SELECT create_distributed_table('t2', 'a');a = 1000のデータをクエリします。ポイントクエリは単一のシャードに直接ルーティングされます。この例ではt2_102134です。EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;次の結果が返されます。
QUERY PLAN ---------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 1 -- タスク数は 1 です。これは、クエリが単一のシャードにルーティングされることを意味します。 Tasks Shown: All -> Task Node: host=10.188.92.147 port=3003 dbname=testdb -> Seq Scan on t2_102134 t2 Filter: (a = 1000) (7 rows)
コロケーショングループとレプリケートされたテーブルを使用した JOIN のプッシュダウン (複数テーブルの結合最適化)
分散環境では、ノード間の JOIN を回避することがパフォーマンスを向上させる鍵です。次の 2 つの方法で JOIN 計算をプッシュダウンできます。
コロケーショングループを使用する: 2 つ以上の large テーブルが同じ分布列で結合される場合、それらを同じコロケーショングループに配置できます。これにより、関連データが同じノードに存在することが保証され、ローカル JOIN が可能になります。
レプリケートされたテーブルを使用する: JOIN に関連するテーブルが次の条件を満たす場合、レプリケートされたテーブルとして作成できます。レプリケートされたテーブルは各データノードにレプリカを持ち、これもローカル JOIN を可能にします。
結合キーを分布列として使用するのに適していません。たとえば、別の列がすでに分布列として選択されている場合があります。
テーブルはめったに変更されず、他の複数のテーブルと結合する必要があります。
原則:
オプティマイザーが JOIN 内の 2 つのテーブルがコロケーションされているか、またはそのうちの 1 つがレプリケートされたテーブルであり、JOIN 条件が分布列上にあることを見つけると、関連するデータ行がすべて同じデータノード上にあると判断します。したがって、オプティマイザーは JOIN 操作全体をタスクとして各データノードにプッシュダウンします。各ノードは JOIN 操作をローカルで並列に完了します。最後に、クライアントノード (CN) は各 DN からの結果を収集して要約するだけで済みます。このプロセスにより、JOIN のためにノード間で大量の生データを転送することが回避され、高いクエリパフォーマンスが実現します。
例:
コロケーショングループを使用する
colocation_t1テーブルとcolocation_t2テーブルを作成します。両方の分布列として列aを使用し、それらがコロケーションされていることを確認します。CREATE TABLE colocation_t1 (a INT, b INT); SELECT create_distributed_table('colocation_t1', 'a'); CREATE TABLE colocation_t2 (a INT, b INT); SELECT create_distributed_table('colocation_t2', 'a');JOIN クエリを実行します。次の実行計画は、JOIN 操作が実行のために DN にプッシュダウンされることを示しています。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;次の結果が返されます。
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 -- タスクはすべてのシャードに分散されます。 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join -- JOIN 操作はデータノード (DN) 内で実行されます。 Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)サブクエリ: コロケーショングループの使用はサブクエリにも有効です。サブクエリの出力が分散テーブルと結合され、結合キーがそのサブクエリ内のテーブルの分布列でもある場合、サブクエリもプッシュダウンされます。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t1 JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;次の結果が返されます。
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t1.a = colocation_t2.a) -> Sort Sort Key: colocation_t1.a -> Seq Scan on colocation_t1_102137 colocation_t1 -> Sort Sort Key: colocation_t2.a -> Seq Scan on colocation_t2_102141 colocation_t2 (13 rows)
レプリケートされたテーブルを使用する
colocation_t3テーブルとcolocation_t4テーブルを作成します。colocation_t3テーブルの分布列として列aを使用し、colocation_t4テーブルをレプリケートされたテーブルにします。CREATE TABLE colocation_t3 (a INT, b INT); SELECT create_distributed_table('colocation_t3', 'a'); CREATE TABLE colocation_t4 (a INT, b INT); SELECT create_reference_table('colocation_t4');JOIN クエリを実行します。次の実行計画は、JOIN 操作が実行のために DN にプッシュダウンされることを示しています。
EXPLAIN (COSTS OFF) SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;次の結果が返されます。
QUERY PLAN ------------------------------------------------------------------------ Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Merge Join Merge Cond: (colocation_t3.a = colocation_t4.b) -> Sort Sort Key: colocation_t3.a -> Seq Scan on colocation_t3_102145 colocation_t3 -> Sort Sort Key: colocation_t4.b -> Seq Scan on colocation_t4_102149 colocation_t4 (13 rows)
集約とソートのプッシュダウン
集約およびソート操作の場合、計算をデータノードにプッシュダウンすると、CN の負荷が大幅に軽減され、ネットワークトラフィックが減少します。
集約プッシュダウン
GROUP BY句のKEYに分布列が含まれている場合、SUMやCOUNTなどの集計関数は完全にデータノードにプッシュダウンされます。GROUP BY句のKEYに分布列が含まれていない場合、オプティマイザーは 2 段階の戦略を使用します。最初にデータノードで事前集約を実行し、次にクライアントノードで最終的な集約を実行します。ステージ 1 (DN でのローカル集約): 各データノードは、まず独自のデータシャードで集約操作を実行して、ローカルの中間結果を生成します。
ステージ 2 (CN での最終集約): クライアントノードは、すべての DN からローカル結果を収集します。この結果セットは通常 small です。次に、CN は 2 回目の集約を実行して、最終的なグローバル結果を計算します。このメソッドは、ネットワーク経由でクライアントノードに転送する必要があるデータ量を削減します。
例:
group_tテーブルを作成し、列aを分布列として使用します。CREATE TABLE group_t (a INT, b INT); SELECT create_distributed_table('group_t', 'a');分布列が含まれている場合、集約は完全にプッシュダウンされます。集計関数は DN 上で完全に実行されます。実行計画は次のとおりです。
EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;次の結果が返されます。
QUERY PLAN --------------------------------------------------------- Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> HashAggregate Group Key: a -> Seq Scan on group_t_102150 group_t (8 rows)分布列が含まれていない場合、2 段階の戦略が使用されます。集計関数は、DN と CN で 1 回ずつ、合計 2 回実行されます。
EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;次の結果が返されます。
QUERY PLAN --------------------------------------------------------------- Aggregate -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Aggregate -> Seq Scan on group_t_102150 group_t (8 rows)
ソートのプッシュダウン
ORDER BY ... LIMIT N 句も、多くのシナリオでデータノードにプッシュダウンできます。各ノードは、まず独自の 上位 N 件の結果を返します。次に、クライアントノードはこれらの中間結果に対して最終的なソートを実行します。このメソッドにより、ソートする必要があるデータ量が大幅に削減されます。
たとえば、GROUP BY 句がない場合、または GROUP BY 句の KEY に分布列が含まれている場合、ORDER BY ... LIMIT N 操作はシャードにプッシュダウンされます。CN は、各シャードから返された結果に対して ORDER BY ... LIMIT N 操作を再度実行します。
例:
order_tテーブルを作成し、列aを分布列として使用します。CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');CREATE TABLE order_t (a INT, b INT); SELECT create_distributed_table('order_t', 'a');ORDER BY ... LIMIT N操作がプッシュダウンされます。実行計画は次のとおりです。EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;次の結果が返されます。
QUERY PLAN ------------------------------------------------------------------------ Limit -> Sort Sort Key: remote_scan.a -> Custom Scan (PolarCluster Adaptive) Task Count: 4 Tasks Shown: One of 4 -> Task Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb -> Limit -> Sort Sort Key: a -> Seq Scan on order_t_102154 order_t (12 rows)
CTE を使用した複雑なクエリの再書き込み
コロケーショングループで最適化できない複雑な JOIN (JOIN 条件が分布列にない場合など) の場合、オプティマイザーはデフォルトで非効率な実行計画を選択する可能性があります。たとえば、あるテーブルからすべてのデータをクライアントノードにプルし、それをデータノードに送り返すことがあります。この場合、CTE (WITH 句) を使用してクエリを手動で最適化できます。
原則:
まず、CTE を使用して、選択性の高いフィルターを large テーブルに適用できます。このステップでは、small な中間結果セットが生成されます。次に、この small な結果セットを別のテーブルと結合できます。このメソッドにより、ノード間で転送する必要があるデータ量を大幅に削減できます。
例:
データの準備:
cte_t1テーブルとcte_t2テーブルを作成し、両方の分布列として列aを使用します。cte_t1テーブルの列bにローカルインデックスを作成します。-- クエリオプティマイザーが、コロケーションされていないテーブルまたは列間の JOIN に対してデータ再配布実行計画を生成できるようにします。 SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON; -- 標準テーブルの作成 CREATE TABLE cte_t1(a INT, b INT); CREATE TABLE cte_t2(a INT, b INT); -- 分散テーブルへの変換 SELECT create_distributed_table('cte_t1', 'a'); SELECT create_distributed_table('cte_t2', 'a'); -- ローカルインデックスの作成 CREATE INDEX local_idx_t1_b ON cte_t1(b); -- テストデータの挿入 INSERT INTO cte_t1(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; INSERT INTO cte_t2(a, b) SELECT i , i / 10 FROM generate_series(1, 1000000)i; -- これら 2 つのテーブルの統計を収集して更新します。 ANALYZE cte_t1, cte_t2;最適化前: コロケーションされていない JOIN クエリを直接実行すると、クエリは次の実行計画を生成します。まず
cte_t2で全表スキャンを実行し、結果を CN に返し、それを JOIN のために DN に送り返します。EXPLAIN ANALYZE SELECT * FROM cte_t1, cte_t2 WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;次の結果が返されます。
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=334.511..334.519 rows=100 loops=1) -> Distributed Subplan 46_1 Subplan Duration: 1488.57 ms Intermediate Data Size: 17 MB Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=419.049..505.639 rows=1000000 loops=1) Task Count: 4 Tuple data received from nodes: 7813 kB Tasks Shown: One of 4 -> Task Tuple data received from node: 1950 kB Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Seq Scan on cte_t2_102165 cte_t2 (cost=0.00..4032.51 rows=249651 width=8) (actual time=0.004..12.029 rows=249651 l oops=1) Planning Time: 0.013 ms Execution Time: 48.372 ms Planning Time: 0.000 ms Execution Time: 531.747 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 480 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Hash Join (cost=2.71..10950.10 rows=1 width=16) (actual time=179.605..329.055 rows=30 loops=1) Hash Cond: (intermediate_result.b = cte_t1.a) -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..7197.27 rows=1000000 width=8) (actual time=179 .565..278.108 rows=1000000 loops=1) -> Hash (cost=2.68..2.68 rows=3 width=8) (actual time=0.024..0.025 rows=3 loops=1) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Index Scan using local_idx_t1_b_102161 on cte_t1_102161 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.021 ..0.021 rows=3 loops=1) Index Cond: (b = 1) Planning Time: 0.365 ms Execution Time: 332.955 ms Planning Time: 0.625 ms Execution Time: 1823.139 ms (34 rows)最適化後: CTE を使用して、まず
cte_t1テーブルをフィルターできます。これにより、cte_t1でインデックススキャンが強制され、small な結果セットが CN に返されます。このメソッドは、スキャン時間とネットワーク経由で転送されるデータ量の両方を削減します。EXPLAIN ANALYZE WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1) SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;最適化されたクエリは、大規模なデータ再配布を回避するため、数十倍高速になります。
QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------- Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=16) (actual time=38.689..38.693 rows=100 loops=1) -> Distributed Subplan 49_1 Subplan Duration: 0.94 ms Intermediate Data Size: 180 bytes Result destination: Send to 2 nodes -> Custom Scan (PolarCluster Adaptive) (cost=0.00..0.00 rows=100000 width=8) (actual time=0.760..0.761 rows=10 loops=1) Task Count: 4 Tuple data received from nodes: 80 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 32 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Index Scan using local_idx_t1_b_102159 on cte_t1_102159 cte_t1 (cost=0.42..2.68 rows=3 width=8) (actual time=0.005 ..0.005 rows=4 loops=1) Index Cond: (b = 1) Planning Time: 0.020 ms Execution Time: 0.014 ms Planning Time: 0.000 ms Execution Time: 0.779 ms Task Count: 4 Tuple data received from nodes: 1600 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 496 bytes Node: host=10.xxx.xxx.xxx port=3003 dbname=testdb -> Gather (cost=1000.20..4872.21 rows=3148 width=16) (actual time=0.450..19.685 rows=31 loops=1) Workers Planned: 1 Workers Launched: 1 -> Hash Join (cost=0.20..3557.41 rows=1852 width=16) (actual time=5.368..13.807 rows=16 loops=2) Hash Cond: (cte_t2.b = intermediate_result.a) -> Parallel Seq Scan on cte_t2_102163 cte_t2 (cost=0.00..3005.84 rows=146984 width=8) (actual time=0.005..6.544 rows= 124936 loops=2) -> Hash (cost=0.07..0.07 rows=10 width=8) (actual time=1.451..1.452 rows=10 loops=2) Buckets: 1024 Batches: 1 Memory Usage: 9kB -> Function Scan on read_intermediate_result intermediate_result (cost=0.00..0.07 rows=10 width=8) (actual time =1.444..1.448 rows=10 loops=2) Planning Time: 0.068 ms Execution Time: 19.729 ms Planning Time: 0.514 ms Execution Time: 39.672 ms (37 rows)
SELECT ... FOR UPDATE の制限事項
グローバルロック調整の複雑さのため、SELECT ... FOR UPDATE 文はシャード間の実行をサポートしていません。クエリ条件が範囲を単一のシャードに限定できない場合、文はエラーを返します。
SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR: FOR UPDATE is not supported for query of more than one shard