In a PolarDB for PostgreSQL (Distributed Edition) cluster, every optimization technique shares the same goal: keep data movement between nodes to a minimum. The less data your query shuffles across the network, the faster it runs. This topic covers four practical techniques — shard pruning, JOIN pushdown, aggregation and sorting pushdown, and CTE rewriting — each targeting a specific query pattern.
## Optimization methods at a glance
| Query pattern | Technique | Mechanism |
|---|---|---|
| Point queries or small-range scans on a single table | Include the distribution column in WHERE with an equality or range condition | Shard pruning: the optimizer routes the query directly to the one shard that holds the data |
| Joins on multiple large tables | Place tables in the same colocation group, or convert small lookup tables to replicated tables | JOIN pushdown: the JOIN runs locally on each data node (DN), with no cross-node data transfer |
| GROUP BY aggregations | Include the distribution column in GROUP BY | Aggregation pushdown: each DN aggregates its own data; the client node (CN) only performs the final merge |
| Complex queries or non-colocated JOINs | Rewrite with a Common Table Expression (CTE) | Reduced data redistribution: filter the large table first to produce a small intermediate result, then join on that smaller set |
## Shard pruning for single-table queries
Shard pruning is the most direct optimization available for single-table queries. When a WHERE clause includes an equality condition on the distribution column, the optimizer calculates the hash value of the filter, identifies the exact shard that holds the matching rows, and skips all other shards entirely.
### How it works
Given WHERE t2.a = 1000, the optimizer hashes 1000, maps the result to a physical shard, and sends the query only to the DN that owns that shard. The execution plan shows Task Count: 1, confirming that only one shard was accessed.
### Example
Create the t2 table, distributed on column a:
```sql
CREATE TABLE t2 (a INT, b INT);
SELECT create_distributed_table('t2', 'a');
```

Run a point query:

```sql
EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;
```

Expected output:
```
                        QUERY PLAN
----------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 1                  -- routed to a single shard
   Tasks Shown: All
   -> Task
        Node: host=10.188.92.147 port=3003 dbname=testdb
        -> Seq Scan on t2_102134 t2
              Filter: (a = 1000)
(7 rows)
```

Task Count: 1 confirms that the query reached exactly one shard, making this equivalent in performance to a single-node lookup.
## JOIN pushdown for multi-table queries
In a distributed cluster, the most expensive part of a JOIN is often not the computation itself — it is moving data between nodes. The goal is to push the entire JOIN down to the DNs so each node joins its own local data in parallel, with no cross-node transfers.
Two approaches enable this:
- Colocation group: distribute two or more large tables on the same column so related rows always land on the same DN.
- Replicated table: copy a small, infrequently updated table to every DN so it can be joined locally regardless of the join key.
### Use a colocation group
Colocation works when the JOIN condition is on the distribution column. Rows with the same distribution column value are guaranteed to be on the same DN, so the JOIN never crosses a node boundary.
Create two tables distributed on the same column a:
```sql
CREATE TABLE colocation_t1 (a INT, b INT);
SELECT create_distributed_table('colocation_t1', 'a');

CREATE TABLE colocation_t2 (a INT, b INT);
SELECT create_distributed_table('colocation_t2', 'a');
```

Run a JOIN query:
```sql
EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;
```

Expected output:
```
                              QUERY PLAN
------------------------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4                -- distributed to all shards in parallel
   Tasks Shown: One of 4
   -> Task
        Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
        -> Merge Join           -- JOIN executes entirely within this DN
              Merge Cond: (colocation_t1.a = colocation_t2.a)
              -> Sort
                    Sort Key: colocation_t1.a
                    -> Seq Scan on colocation_t1_102137 colocation_t1
              -> Sort
                    Sort Key: colocation_t2.a
                    -> Seq Scan on colocation_t2_102141 colocation_t2
(13 rows)
```

Colocation also applies to subqueries. If a subquery's output is joined on the distribution column of a colocated table, the entire subquery is pushed down as well:
```sql
EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t1
JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;
```

### Use a replicated table
A replicated table stores a full copy of the data on every DN. This makes it joinable with any distributed table on any column, because the data is always local.
Use a replicated table when:
- The join key is not the distribution column of the other table.
- The table is infrequently modified and needs to be joined with multiple other tables.
Create a distributed table and a replicated table:
```sql
CREATE TABLE colocation_t3 (a INT, b INT);
SELECT create_distributed_table('colocation_t3', 'a');

CREATE TABLE colocation_t4 (a INT, b INT);
SELECT create_reference_table('colocation_t4'); -- replicated to every DN
```

Run a JOIN on a non-distribution column (colocation_t4.b):
```sql
EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;
```

Expected output:
```
                              QUERY PLAN
------------------------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   -> Task
        Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
        -> Merge Join      -- JOIN executes locally; no cross-node transfer
              Merge Cond: (colocation_t3.a = colocation_t4.b)
              -> Sort
                    Sort Key: colocation_t3.a
                    -> Seq Scan on colocation_t3_102145 colocation_t3
              -> Sort
                    Sort Key: colocation_t4.b
                    -> Seq Scan on colocation_t4_102149 colocation_t4
(13 rows)
```

## Aggregation and sorting pushdown
### Aggregation pushdown
The behavior of aggregation pushdown depends on whether the GROUP BY key includes the distribution column:
- Distribution column included: aggregate functions such as SUM and COUNT run entirely on the DNs. The CN receives pre-aggregated results and does no additional computation.
- Distribution column not included: the optimizer uses a two-stage strategy. Each DN first performs a local aggregation on its own shards (stage 1), then the CN collects these intermediate results and runs a final aggregation (stage 2). Because the intermediate results are small, far less data travels over the network than a full scan would require.
### Example
Create the group_t table, distributed on column a:
```sql
CREATE TABLE group_t (a INT, b INT);
SELECT create_distributed_table('group_t', 'a');
```

Full pushdown — distribution column a is in GROUP BY:
```sql
EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;
```

```
                       QUERY PLAN
---------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   -> Task
        Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
        -> HashAggregate      -- aggregation runs entirely on DNs
              Group Key: a
              -> Seq Scan on group_t_102150 group_t
(8 rows)
```

Two-stage strategy — distribution column not in GROUP BY:
```sql
EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;
```

```
                          QUERY PLAN
---------------------------------------------------------------
 Aggregate                     -- stage 2: final merge on the CN
   -> Custom Scan (PolarCluster Adaptive)
        Task Count: 4
        Tasks Shown: One of 4
        -> Task
             Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
             -> Aggregate      -- stage 1: local aggregation on each DN
                   -> Seq Scan on group_t_102150 group_t
(8 rows)
```

### Sorting pushdown
ORDER BY ... LIMIT N is pushed down to the DNs when there is no GROUP BY, or when the GROUP BY key includes the distribution column. Each DN returns its own top-N rows; the CN then runs a final ORDER BY ... LIMIT N over the collected results. This keeps the data volume at the CN small regardless of total table size.
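For the GROUP BY case, a minimal sketch (reusing the group_t table created above, which is distributed on column a) would look like the following. Because the GROUP BY key includes the distribution column, each DN can aggregate, sort, and apply the LIMIT locally; this query shape is an illustration of the stated rule, not a verified plan:

```sql
-- GROUP BY includes the distribution column a, so aggregation, ORDER BY,
-- and LIMIT can all run on the DNs; the CN merges the per-DN top-10 sets.
SELECT a, SUM(b) FROM group_t GROUP BY a ORDER BY a LIMIT 10;
```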
### Example
```sql
CREATE TABLE order_t (a INT, b INT);
SELECT create_distributed_table('order_t', 'a');

EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;
```

```
                              QUERY PLAN
------------------------------------------------------------------------
 Limit                          -- final LIMIT applied on the CN
   -> Sort
        Sort Key: remote_scan.a
        -> Custom Scan (PolarCluster Adaptive)
             Task Count: 4
             Tasks Shown: One of 4
             -> Task
                  Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                  -> Limit      -- each DN returns its top-1
                       -> Sort
                            Sort Key: a
                            -> Seq Scan on order_t_102154 order_t
(12 rows)
```

## Rewrite complex queries with CTEs
When a JOIN cannot use colocation — for example, the join key is not the distribution column — the optimizer may default to an inefficient plan: it scans the full table on the DNs, ships all the rows to the CN, and then pushes them back down for the JOIN. Use a CTE (WITH clause) to apply a selective filter first, shrinking the data set before the JOIN runs.
### How it works
Wrap the large, filterable table in a CTE. The CTE result, a small set of rows, is used as one side of the JOIN. The optimizer ships only this small result across the network instead of the full table.
### Example
First, enable data redistribution plans for non-colocated JOINs and prepare the test tables:
```sql
-- Allow redistribution plans for non-colocated JOINs
SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;

-- Create and distribute the tables
CREATE TABLE cte_t1 (a INT, b INT);
CREATE TABLE cte_t2 (a INT, b INT);
SELECT create_distributed_table('cte_t1', 'a');
SELECT create_distributed_table('cte_t2', 'a');

-- Create a local index on the filter column
CREATE INDEX local_idx_t1_b ON cte_t1(b);

-- Load test data (1,000,000 rows each)
INSERT INTO cte_t1(a, b) SELECT i, i / 10 FROM generate_series(1, 1000000) i;
INSERT INTO cte_t2(a, b) SELECT i, i / 10 FROM generate_series(1, 1000000) i;
ANALYZE cte_t1, cte_t2;
```

Before optimization — the non-colocated JOIN triggers a full scan of cte_t2:
```sql
EXPLAIN ANALYZE
SELECT * FROM cte_t1, cte_t2
WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;
```

```
                              QUERY PLAN
...
 Custom Scan (PolarCluster Adaptive) (actual time=334.511..334.519 rows=100)
   -> Distributed Subplan 46_1
        Subplan Duration: 1488.57 ms    -- high value: subplan is slow
        Intermediate Data Size: 17 MB   -- full table scan of cte_t2 shipped to CN
        Result destination: Send to 2 nodes
        -> Custom Scan (PolarCluster Adaptive) (rows=1000000)
             ...
             -> Seq Scan on cte_t2_102165 cte_t2 (rows=249651)
...
 Execution Time: 1823.139 ms
(34 rows)
```

The two signals to watch: Intermediate Data Size: 17 MB (the full cte_t2 table is transferred over the network) and Subplan Duration: 1488.57 ms (the subplan that moves this data takes most of the total execution time).
After optimization — a CTE filters cte_t1 first:
```sql
EXPLAIN ANALYZE
WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1)
SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;
```

```
                              QUERY PLAN
...
 Custom Scan (PolarCluster Adaptive) (actual time=38.689..38.693 rows=100)
   -> Distributed Subplan 49_1
        Subplan Duration: 0.94 ms          -- subplan now nearly instant
        Intermediate Data Size: 180 bytes  -- only 10 filtered rows transferred
        Result destination: Send to 2 nodes
        -> Custom Scan (PolarCluster Adaptive) (rows=10)
             ...
             -> Index Scan using local_idx_t1_b_102159 on cte_t1_102159
                  Index Cond: (b = 1)      -- uses the local index on cte_t1
...
 Execution Time: 39.672 ms
(37 rows)
```

The intermediate data drops from 17 MB to 180 bytes, and execution time falls from 1,823 ms to 39 ms, roughly a 46x speedup, because the rewrite avoids large-scale data redistribution.
## How to read distributed query plans
When diagnosing slow distributed queries, focus on these three fields in EXPLAIN ANALYZE output:
| Field | What it means | Warning sign |
|---|---|---|
| Intermediate Data Size | Volume of data transferred between nodes for a subplan | Values in MB or GB indicate excessive data movement |
| Subplan Duration | Time spent executing a distributed subplan | High values relative to total execution time point to redistribution overhead |
| Tuple data received from nodes | Raw data volume received by the CN from DNs | Large values suggest the CN is doing work that could be pushed to DNs |
High values in any of these fields indicate data redistribution that a CTE rewrite or schema change (such as adding a colocation group) could eliminate.
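As one possible schema change, a new table can be created directly into the colocation group of an existing table. The colocate_with parameter below follows the Citus-style API that create_distributed_table suggests this product uses; the table and column names are hypothetical, and the exact signature should be verified against your PolarDB version:

```sql
-- Hypothetical schema: distribute orders on customer_id, colocated with an
-- existing customers table distributed on the same column, so JOINs on
-- customer_id stay local to each DN.
CREATE TABLE orders (order_id BIGINT, customer_id INT, amount NUMERIC);
SELECT create_distributed_table('orders', 'customer_id',
                                colocate_with => 'customers');
```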
## Limitations
### SELECT ... FOR UPDATE
SELECT ... FOR UPDATE does not support cross-shard execution. Because coordinating locks globally across shards is complex, any SELECT ... FOR UPDATE query that cannot be confined to a single shard returns an error:
```
ERROR: FOR UPDATE is not supported for query of more than one shard
```

To avoid this error, ensure the WHERE clause includes an equality condition on the distribution column so the query can be confined to a single shard. A range condition on the distribution column, for example, still spans multiple shards and fails:

```sql
SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR: FOR UPDATE is not supported for query of more than one shard
```
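By contrast, an equality predicate on the distribution column lets shard pruning route the lock to a single shard. This sketch assumes, as the error example implies, that t1 is distributed on column a:

```sql
-- Assumes t1 is distributed on column a: the equality condition prunes to
-- one shard, so the row lock is taken on a single DN and the query succeeds.
SELECT * FROM t1 WHERE a = 1 FOR UPDATE;
```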