
PolarDB:Optimize queries on distributed tables

Last Updated: Mar 28, 2026

In a PolarDB for PostgreSQL (Distributed Edition) cluster, every optimization technique shares the same goal: keep data movement between nodes to a minimum. The less data your query shuffles across the network, the faster it runs. This topic covers four practical techniques — shard pruning, JOIN pushdown, aggregation and sorting pushdown, and CTE rewriting — each targeting a specific query pattern.

Optimization methods at a glance

Query pattern | Technique | Mechanism
Point queries or small-range scans on a single table | Include the distribution column in WHERE with an equality or range condition | Shard pruning: the optimizer routes the query directly to the one shard that holds the data
Joins on multiple large tables | Place tables in the same colocation group, or convert small lookup tables to replicated tables | JOIN pushdown: the JOIN runs locally on each data node (DN), with no cross-node data transfer
GROUP BY aggregations | Include the distribution column in GROUP BY | Aggregation pushdown: each DN aggregates its own data; the coordinator node (CN) only performs the final merge
Complex queries or non-colocated JOINs | Rewrite with a Common Table Expression (CTE) | Reduced data redistribution: filter the large table first to produce a small intermediate result, then join on that smaller set

Shard pruning for single-table queries

Shard pruning is the most direct optimization available for single-table queries. When a WHERE clause includes an equality condition on the distribution column, the optimizer hashes the filter value, identifies the exact shard that holds the matching rows, and skips all other shards entirely.

How it works

Given WHERE t2.a = 1000, the optimizer hashes 1000, maps the result to a physical shard, and sends the query only to the DN that owns that shard. The execution plan shows Task Count: 1, confirming that only one shard was accessed.
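The routing step can be sketched with a toy hash function. This is purely illustrative: `route_to_shard` and the four-shard layout are assumptions for the sketch, not PolarDB's internal shard-mapping function.

```python
# Illustrative sketch of hash-based shard routing. The hash function and
# four-shard layout are assumptions; PolarDB uses its own internal mapping.

NUM_SHARDS = 4

def route_to_shard(value: int, num_shards: int = NUM_SHARDS) -> int:
    """Map a distribution-column value to the index of the shard that owns it."""
    return hash(value) % num_shards

# A point query such as WHERE a = 1000 hashes the literal once...
shard = route_to_shard(1000)

# ...and only the DN owning that one shard runs the scan (Task Count: 1).
assert 0 <= shard < NUM_SHARDS
```

Because equal values always hash to the same shard, a point query never needs to touch the other shards; a query without a distribution-column filter would have to fan out to all of them.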

Example

Create the t2 table, distributed on column a:

CREATE TABLE t2 (a INT, b INT);
SELECT create_distributed_table('t2', 'a');

Run a point query:

EXPLAIN (COSTS OFF) SELECT * FROM t2 WHERE t2.a = 1000;

Expected output:

QUERY PLAN
----------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 1        -- routed to a single shard
   Tasks Shown: All
   ->  Task
         Node: host=10.188.92.147 port=3003 dbname=testdb
         ->  Seq Scan on t2_102134 t2
               Filter: (a = 1000)
(7 rows)

Task Count: 1 confirms that the query reached exactly one shard, making this equivalent in performance to a single-node lookup.

JOIN pushdown for multi-table queries

In a distributed cluster, the most expensive part of a JOIN is often not the computation itself — it is moving data between nodes. The goal is to push the entire JOIN down to the DNs so each node joins its own local data in parallel, with no cross-node transfers.

Two approaches enable this:

  • Colocation group: distribute two or more large tables on the same column so related rows always land on the same DN.

  • Replicated table: copy a small, infrequently updated table to every DN so it can be joined locally regardless of the join key.

Use a colocation group

Colocation works when the JOIN condition is on the distribution column. Rows with the same distribution column value are guaranteed to be on the same DN, so the JOIN never crosses a node boundary.

Create two tables distributed on the same column a:

CREATE TABLE colocation_t1 (a INT, b INT);
SELECT create_distributed_table('colocation_t1', 'a');

CREATE TABLE colocation_t2 (a INT, b INT);
SELECT create_distributed_table('colocation_t2', 'a');

Run a JOIN query:

EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t1 JOIN colocation_t2 ON colocation_t1.a = colocation_t2.a;

Expected output:

QUERY PLAN
------------------------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4        -- distributed to all shards in parallel
   Tasks Shown: One of 4
   ->  Task
         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
         ->  Merge Join   -- JOIN executes entirely within this DN
               Merge Cond: (colocation_t1.a = colocation_t2.a)
               ->  Sort
                     Sort Key: colocation_t1.a
                     ->  Seq Scan on colocation_t1_102137 colocation_t1
               ->  Sort
                     Sort Key: colocation_t2.a
                     ->  Seq Scan on colocation_t2_102141 colocation_t2
(13 rows)

Colocation also applies to subqueries. If a subquery's output is joined on the distribution column of a colocated table, the entire subquery is pushed down as well:

EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t1
JOIN (SELECT a FROM colocation_t2) sub ON colocation_t1.a = sub.a;

Use a replicated table

A replicated table stores a full copy of the data on every DN. This makes it joinable with any distributed table on any column, because the data is always local.

Use a replicated table when:

  • The join key is not the distribution column of the other table.

  • The table is infrequently modified and needs to be joined with multiple other tables.

Create a distributed table and a replicated table:

CREATE TABLE colocation_t3 (a INT, b INT);
SELECT create_distributed_table('colocation_t3', 'a');

CREATE TABLE colocation_t4 (a INT, b INT);
SELECT create_reference_table('colocation_t4');   -- replicated to every DN

Run a JOIN on a non-distribution column (colocation_t4.b):

EXPLAIN (COSTS OFF)
SELECT * FROM colocation_t3 JOIN colocation_t4 ON colocation_t3.a = colocation_t4.b;

Expected output:

QUERY PLAN
------------------------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   ->  Task
         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
         ->  Merge Join   -- JOIN executes locally; no cross-node transfer
               Merge Cond: (colocation_t3.a = colocation_t4.b)
               ->  Sort
                     Sort Key: colocation_t3.a
                     ->  Seq Scan on colocation_t3_102145 colocation_t3
               ->  Sort
                     Sort Key: colocation_t4.b
                     ->  Seq Scan on colocation_t4_102149 colocation_t4
(13 rows)

Aggregation and sorting pushdown

Aggregation pushdown

The behavior of aggregation pushdown depends on whether the GROUP BY key includes the distribution column:

  • Distribution column included: aggregate functions such as SUM and COUNT run entirely on the DNs. The CN receives pre-aggregated results and does no additional computation.

  • Distribution column not included: the optimizer uses a two-stage strategy. Each DN first performs a local aggregation on its own shards (stage 1), then the CN collects these intermediate results and runs a final aggregation (stage 2). Because the intermediate results are small, far less data travels over the network than a full scan would require.
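The two-stage strategy amounts to summing partial results. A minimal Python sketch, using made-up shard contents, shows how little data crosses the network:

```python
# Two-stage aggregation sketch for SELECT SUM(a) FROM group_t.
# Shard contents are made-up sample data.

shards = [
    [1, 5, 9],     # values of column a on DN 1
    [2, 6],        # DN 2
    [3, 7, 11],    # DN 3
    [4, 8],        # DN 4
]

# Stage 1: each DN aggregates its own shard locally.
partial_sums = [sum(rows) for rows in shards]   # [15, 8, 21, 12]

# Stage 2: the CN merges one small partial result per DN.
total = sum(partial_sums)                       # 56
```

Only four integers reach the CN instead of ten rows; with millions of rows per shard, this saving dominates the query cost.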

Example

Create the group_t table, distributed on column a:

CREATE TABLE group_t (a INT, b INT);
SELECT create_distributed_table('group_t', 'a');

Full pushdown — distribution column a is in GROUP BY:

EXPLAIN (COSTS OFF) SELECT a, SUM(b) FROM group_t GROUP BY a;
QUERY PLAN
---------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   ->  Task
         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
         ->  HashAggregate   -- aggregation runs entirely on DNs
               Group Key: a
               ->  Seq Scan on group_t_102150 group_t
(8 rows)

Two-stage strategy — distribution column not in GROUP BY:

EXPLAIN (COSTS OFF) SELECT SUM(a) FROM group_t;
QUERY PLAN
---------------------------------------------------------------
 Aggregate                       -- stage 2: final merge on the CN
   ->  Custom Scan (PolarCluster Adaptive)
         Task Count: 4
         Tasks Shown: One of 4
         ->  Task
               Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
               ->  Aggregate    -- stage 1: local aggregation on each DN
                     ->  Seq Scan on group_t_102150 group_t
(8 rows)

Sorting pushdown

ORDER BY ... LIMIT N is pushed down to the DNs when there is no GROUP BY, or when the GROUP BY key includes the distribution column. Each DN returns its own top-N rows; the CN then runs a final ORDER BY ... LIMIT N over the collected results. This keeps the data volume at the CN small regardless of total table size.
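The merge logic behind sorting pushdown can be sketched in Python; the shard contents below are made-up sample data:

```python
import heapq

# Sketch of ORDER BY a LIMIT N pushdown. Each DN returns only its local
# top-N; the CN merges the small per-DN results and applies the final LIMIT.
# Shard contents are made-up sample data.

N = 1
shards = [
    [42, 7, 19],   # values of column a on DN 1
    [3, 88],       # DN 2
    [15, 4, 61],   # DN 3
    [29, 10],      # DN 4
]

# Each DN: local ORDER BY a LIMIT N.
local_top_n = [sorted(rows)[:N] for rows in shards]   # [[7], [3], [4], [10]]

# CN: final ORDER BY a LIMIT N over at most len(shards) * N rows.
final = heapq.nsmallest(N, (v for top in local_top_n for v in top))

assert final == [3]   # global minimum found without shipping every row
```

The CN never sees more than `len(shards) * N` rows, which is why the cost at the CN stays flat as the table grows.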

Example

CREATE TABLE order_t (a INT, b INT);
SELECT create_distributed_table('order_t', 'a');
EXPLAIN (COSTS OFF) SELECT * FROM order_t ORDER BY a LIMIT 1;
QUERY PLAN
------------------------------------------------------------------------
 Limit                           -- final LIMIT applied on the CN
   ->  Sort
         Sort Key: remote_scan.a
         ->  Custom Scan (PolarCluster Adaptive)
               Task Count: 4
               Tasks Shown: One of 4
               ->  Task
                     Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
                     ->  Limit         -- each DN returns its top-1
                           ->  Sort
                                 Sort Key: a
                                 ->  Seq Scan on order_t_102154 order_t
(12 rows)

Rewrite complex queries with CTEs

When a JOIN cannot use colocation — for example, the join key is not the distribution column — the optimizer may default to an inefficient plan: it scans the full table on the DNs, ships all the rows to the CN, and then pushes them back down for the JOIN. Use a CTE (WITH clause) to apply a selective filter first, shrinking the data set before the JOIN runs.

How it works

Wrap the large, filterable table in a CTE. The CTE result, a small set of rows, is used as one side of the JOIN. The optimizer ships only this small result across the network instead of the full table.

Example

First, enable data redistribution plans for non-colocated JOINs and prepare the test tables:

-- Allow redistribution plans for non-colocated JOINs
SET polar_cluster.enable_recursively_plan_non_colocated_relations TO ON;

-- Create and distribute the tables
CREATE TABLE cte_t1 (a INT, b INT);
CREATE TABLE cte_t2 (a INT, b INT);
SELECT create_distributed_table('cte_t1', 'a');
SELECT create_distributed_table('cte_t2', 'a');

-- Create a local index on the filter column
CREATE INDEX local_idx_t1_b ON cte_t1(b);

-- Load test data (1,000,000 rows each)
INSERT INTO cte_t1(a, b) SELECT i, i / 10 FROM generate_series(1, 1000000) i;
INSERT INTO cte_t2(a, b) SELECT i, i / 10 FROM generate_series(1, 1000000) i;
ANALYZE cte_t1, cte_t2;

Before optimization — the non-colocated JOIN triggers a full scan of cte_t2:

EXPLAIN ANALYZE
SELECT * FROM cte_t1, cte_t2
WHERE cte_t1.a = cte_t2.b AND cte_t1.b = 1;
QUERY PLAN
...
 Custom Scan (PolarCluster Adaptive)  (actual time=334.511..334.519 rows=100)
   ->  Distributed Subplan 46_1
         Subplan Duration: 1488.57 ms         -- high value: subplan is slow
         Intermediate Data Size: 17 MB        -- full table scan of cte_t2 shipped to CN
         Result destination: Send to 2 nodes
         ->  Custom Scan (PolarCluster Adaptive)  (rows=1000000)
               ...
               ->  Seq Scan on cte_t2_102165 cte_t2  (rows=249651)
   ...
   Execution Time: 1823.139 ms
(34 rows)

The two signals to watch: Intermediate Data Size: 17 MB (the full cte_t2 table is transferred over the network) and Subplan Duration: 1488.57 ms (the subplan that moves this data takes most of the total execution time).

After optimization — a CTE filters cte_t1 first:

EXPLAIN ANALYZE
WITH s AS (SELECT * FROM cte_t1 WHERE cte_t1.b = 1)
SELECT * FROM s, cte_t2 WHERE s.a = cte_t2.b;
QUERY PLAN
...
 Custom Scan (PolarCluster Adaptive)  (actual time=38.689..38.693 rows=100)
   ->  Distributed Subplan 49_1
         Subplan Duration: 0.94 ms            -- subplan now nearly instant
         Intermediate Data Size: 180 bytes    -- only 10 filtered rows transferred
         Result destination: Send to 2 nodes
         ->  Custom Scan (PolarCluster Adaptive)  (rows=10)
               ...
               ->  Index Scan using local_idx_t1_b_102159 on cte_t1_102159
                     Index Cond: (b = 1)      -- uses the local index on cte_t1
   ...
   Execution Time: 39.672 ms
(37 rows)

The intermediate data drops from 17 MB to 180 bytes, and execution time falls from 1,823 ms to under 40 ms, roughly a 45-fold speedup, because the rewrite avoids large-scale data redistribution.

How to read distributed query plans

When diagnosing slow distributed queries, focus on these three fields in EXPLAIN ANALYZE output:

Field | What it means | Warning sign
Intermediate Data Size | Volume of data transferred between nodes for a subplan | Values in MB or GB indicate excessive data movement
Subplan Duration | Time spent executing a distributed subplan | High values relative to total execution time point to redistribution overhead
Tuple data received from nodes | Raw data volume received by the CN from DNs | Large values suggest the CN is doing work that could be pushed to DNs

High values in any of these fields indicate data redistribution that a CTE rewrite or schema change (such as adding a colocation group) could eliminate.
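The size check can also be automated by scanning the plan text. The helper below is an illustrative sketch, not a PolarDB tool: the field name matches the sample plans in this topic, and the 1 MB threshold is an arbitrary assumption.

```python
import re

# Illustrative helper: flag large "Intermediate Data Size" values in
# EXPLAIN ANALYZE text. The 1 MB threshold is an arbitrary assumption.

def find_redistribution_warnings(plan_text: str, size_mb_threshold: float = 1.0):
    warnings = []
    pattern = r"Intermediate Data Size: (\d+(?:\.\d+)?) (bytes|kB|MB|GB)"
    for match in re.finditer(pattern, plan_text):
        value, unit = float(match.group(1)), match.group(2)
        mb = value * {"bytes": 2**-20, "kB": 2**-10, "MB": 1.0, "GB": 1024.0}[unit]
        if mb >= size_mb_threshold:
            warnings.append(f"large intermediate result: {match.group(0)}")
    return warnings

plan = """
Distributed Subplan 46_1
  Subplan Duration: 1488.57 ms
  Intermediate Data Size: 17 MB
"""
print(find_redistribution_warnings(plan))
# → ['large intermediate result: Intermediate Data Size: 17 MB']
```

The same pattern extends naturally to Subplan Duration if you also want to flag slow subplans relative to total execution time.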

Limitations

SELECT ... FOR UPDATE

SELECT ... FOR UPDATE does not support cross-shard execution. Because coordinating row locks across shards is complex, any SELECT ... FOR UPDATE query that cannot be confined to a single shard returns an error:

SELECT * FROM t1 WHERE a > 1 FOR UPDATE;
ERROR:  FOR UPDATE is not supported for query of more than one shard

To avoid this error, include an equality condition on the distribution column in the WHERE clause (for example, WHERE a = 1) so the query is routed to a single shard.