PolarDB: DML operations on distributed tables

Last Updated: Mar 28, 2026

PolarDB for PostgreSQL (Distributed Edition) accepts standard PostgreSQL Data Manipulation Language (DML) commands — INSERT, UPDATE, DELETE, and COPY — but the execution behavior and performance depend on how data is distributed across nodes. This page covers the constraints, syntax, and execution modes you need to understand before running DML on distributed tables.

Key constraints

Two rules are specific to distributed tables and will cause errors if you ignore them.

Always include the distribution column in INSERT. The cluster uses the distribution column value to calculate a hash and route each row to the correct physical shard. If you omit it, the operation fails (the error message refers to the distribution column as the partition column):

-- Assume id is the distribution column
INSERT INTO t (data) VALUES ('TEST');
ERROR:  cannot perform an INSERT without a partition column value

Never update the distribution column. A row's physical location is fixed by its distribution column value at insert time. Changing that value would require moving the row between nodes, which is not supported:

-- Insert the original row
INSERT INTO t (id, data) VALUES (1, 'TEST');
-- Attempt to change the distribution column value
UPDATE t SET id = 10 WHERE id = 1;
ERROR:  modifying the partition value of rows is not allowed

The same restriction applies to INSERT ... ON CONFLICT DO UPDATE: you cannot set the distribution column in the DO UPDATE clause.
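
Because the distribution column cannot be changed in place, one way to give a row a new key is to delete it and re-insert it inside a single transaction. The following is a minimal sketch rather than a documented procedure: the table t_demo and its primary key are assumptions made for illustration. The last statement shows an ON CONFLICT DO UPDATE clause that leaves the distribution column untouched.

```sql
-- Hypothetical demo table; id is the distribution column.
CREATE TABLE t_demo (id int PRIMARY KEY, data text);
SELECT create_distributed_table('t_demo', 'id');

INSERT INTO t_demo (id, data) VALUES (1, 'TEST');

-- Give the row a new key without updating the distribution column:
-- delete the old row and insert the new one in the same transaction.
BEGIN;
DELETE FROM t_demo WHERE id = 1;
INSERT INTO t_demo (id, data) VALUES (10, 'TEST');
COMMIT;

-- Upsert whose DO UPDATE clause sets only a non-distribution column.
INSERT INTO t_demo (id, data) VALUES (10, 'NEW_DATA')
ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data;
```

The transaction touches two distribution column values, so it may span two shards. Its atomicity then relies on the two-phase commit described under Cross-node transaction consistency.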

DML operations

INSERT

Single-row, multi-row, and conflict-handling inserts all follow standard PostgreSQL syntax.

Single-row insert

-- Implicit column order
INSERT INTO t VALUES (1, 'TEST');
-- Explicit column list
INSERT INTO t (id, data) VALUES (1, 'TEST');

Batch insert

INSERT INTO t VALUES (1, 'TEST'), (2, 'TEST'), (3, 'TEST');

INSERT...SELECT: import data between tables

  1. Create the source and target distributed tables:

    -- Source table
    CREATE TABLE source (id int, data text);
    SELECT create_distributed_table('source', 'id');
    -- Target table
    CREATE TABLE target (id int, data text);
    SELECT create_distributed_table('target', 'id');

  2. Import the data:

    -- When schemas match
    INSERT INTO target SELECT * FROM source;
    -- With explicit column mapping
    INSERT INTO target(id, data) SELECT id, data FROM source;

ON CONFLICT: handle duplicate key errors

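If the target table has a PRIMARY KEY or UNIQUE constraint, a conflicting insert returns the following error (in standard PostgreSQL, an EXCLUDE constraint raises a similar "conflicting key value violates exclusion constraint" error):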

ERROR:  duplicate key value violates unique constraint "xxx"

Use ON CONFLICT to handle conflicts without failing:

  • ON CONFLICT DO NOTHING: discard the incoming row:

    INSERT INTO target VALUES (1, 'NEW_DATA') ON CONFLICT(id) DO NOTHING;

  • ON CONFLICT DO UPDATE: update specified columns. Use the EXCLUDED pseudo-table to reference the rejected incoming values:

    -- Update to a static value
    INSERT INTO target VALUES (1, 'NEW_DATA') ON CONFLICT(id) DO UPDATE SET data = 'ERROR';
    -- Update to the incoming value that was rejected
    INSERT INTO target VALUES (1, 'NEW_DATA') ON CONFLICT(id) DO UPDATE SET data = EXCLUDED.data;
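
In standard PostgreSQL, ON CONFLICT (id) is accepted only when a unique index or constraint exists on id. The target table created earlier has none, so the sketch below uses a hypothetical table target_pk with a primary key on the distribution column. It also shows a multi-row upsert that combines the stored value with the incoming one.

```sql
-- Hypothetical table: the primary key gives ON CONFLICT (id) a constraint to match.
CREATE TABLE target_pk (id int PRIMARY KEY, data text);
SELECT create_distributed_table('target_pk', 'id');

INSERT INTO target_pk VALUES (1, 'OLD_DATA');

-- Multi-row upsert: id = 1 conflicts and is updated, id = 2 is inserted.
-- target_pk.data is the stored value; EXCLUDED.data is the incoming value.
INSERT INTO target_pk (id, data)
VALUES (1, 'NEW_DATA'), (2, 'NEW_DATA')
ON CONFLICT (id) DO UPDATE
SET data = target_pk.data || ' -> ' || EXCLUDED.data;

-- Inspect the result.
SELECT id, data FROM target_pk ORDER BY id;
```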

UPDATE and DELETE

The syntax is identical to standard PostgreSQL. Use a WHERE clause to select the target rows; without one, the statement affects every row in the table:

UPDATE t SET data = 'CHANGED' WHERE id = 1;
DELETE FROM t WHERE id = 1;
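
As an illustration of how the WHERE clause shapes these statements, the sketch below contrasts a filter on the distribution column with a filter on another column. The comments about shard routing are an assumption derived from the hash routing described under Key constraints, not behavior confirmed by this page; EXPLAIN shows the plan the cluster actually picks.

```sql
-- Assume id is the distribution column of t.

-- Filter on the distribution column: the target shards can be derived from the values.
UPDATE t SET data = 'CHANGED' WHERE id IN (1, 2, 3);

-- Filter on a non-distribution column: any shard may hold matching rows.
DELETE FROM t WHERE data = 'TEST';

-- Check how a statement will run before executing it.
EXPLAIN UPDATE t SET data = 'CHANGED' WHERE id = 1;
```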

COPY: bulk load from a file

COPY is more efficient than repeated INSERT statements for large data loads. There are two variants with different data sources:

| Command | Data source | Example |
| --- | --- | --- |
| COPY ... FROM 'file_path' | File system of a database node: Coordinator Node (CN) or data node (DN) | COPY target FROM '/path/to/data.csv' WITH CSV HEADER; |
| \copy ... FROM 'file_path' | Client machine where psql runs | \copy target FROM '/path/on/client/data.csv' WITH CSV HEADER; |
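
Both variants also accept an explicit column list and the parenthesized option syntax of standard PostgreSQL. This helps when the file omits some table columns or lists them in a different order. The sketch below uses placeholder file paths. Note that psql expects the meta-command in lowercase (\copy) and on a single line.

```sql
-- Server-side load: the path is resolved on the database node.
COPY target (id, data) FROM '/path/to/data.csv' WITH (FORMAT csv, HEADER true);

-- Client-side load from psql: the path is resolved on the client machine.
\copy target (id, data) FROM '/path/on/client/data.csv' WITH (FORMAT csv, HEADER true)
```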

Performance optimization

Complex DML — especially INSERT...SELECT and multi-table UPDATE or DELETE — can run in three different execution modes with significant performance differences. Use EXPLAIN to check which mode applies to your query.

Execution modes

| Mode | Performance | When it applies |
| --- | --- | --- |
| Pushed-down parallel execution | ★★★★★ | Single distributed table; or multiple distributed tables or replicated tables in the same colocation group, joined on their distribution columns |
| Repartitioned execution | ★★ | INSERT...SELECT where the source and target tables have different distribution keys |
| Pulled-up and redirected execution | Slowest | DML contains clauses that require a coordinator merge step (see below) |

Pushed-down parallel execution

This is the fastest mode. The cluster distributes each sub-task to the relevant shard and runs them in parallel with no data movement between nodes.

Single-table example

INSERT INTO t VALUES (1, 'TEST'), (2, 'TEST');
UPDATE t SET data = 'CHANGE' WHERE id = 1;
DELETE FROM t WHERE id = 1;

Multi-table example

For INSERT...SELECT, UPDATE ... FROM, and DELETE ... USING across multiple distributed tables, pushed-down execution requires:

  • All distributed tables are in the same colocation group

  • The distributed tables or replicated tables involved in UPDATE or DELETE are joined on their distribution columns

To check colocation group membership:

SELECT logicalrelid, colocationid FROM pg_dist_partition;

Example output:

 logicalrelid | colocationid
--------------+--------------
 target       |            1
 source       |            1
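
On a cluster with many tables, it can be easier to restrict the check to the tables used in one statement. The queries below are a sketch that assumes logicalrelid can be compared with a regclass value, which the table names in the output above suggest but this page does not state.

```sql
-- Show only the tables involved in the statement being checked.
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('source'::regclass, 'target'::regclass);

-- Count the distinct colocation groups; 1 means the tables are colocated.
SELECT count(DISTINCT colocationid) AS colocation_groups
FROM pg_dist_partition
WHERE logicalrelid IN ('source'::regclass, 'target'::regclass);
```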

When both conditions are met, these statements are pushed down:

-- Insert from a colocated source table
INSERT INTO target(id, data) SELECT id, data FROM source;
-- Update joined on distribution column
UPDATE target SET data = source.data FROM source WHERE target.id = source.id;
-- Delete joined on distribution column
DELETE FROM target USING source WHERE target.id = source.id;

The EXPLAIN output for a pushed-down INSERT...SELECT shows independent per-shard sub-tasks:

EXPLAIN INSERT INTO target(id, data) SELECT id, data FROM source;
QUERY PLAN
--------------------------------------------------------------------------------------------------
 Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 4 -- distributed across 4 shards
   Tasks Shown: One of 4 -- each shard runs INSERT...SELECT locally
   ->  Task
         Node: host=10.xxx.xxx.xxx port=3006 dbname=testdb
         ->  Insert on target_102105 polar_cluster_table_alias  (cost=0.00..22.70 rows=0 width=0)
               ->  Seq Scan on source_102101 source  (cost=0.00..22.70 rows=1264 width=36)
                     Filter: (id IS NOT NULL)
(8 rows)

Repartitioned execution

When an INSERT...SELECT fills the target table's distribution column from a source column that is not the source table's distribution column, rows cannot be inserted locally on the shard where they are read. Instead, each source node packages its rows and forwards them to the correct destination shard.

Example: target2 is distributed on id, but the SELECT populates id from t_id in source2:

-- source2 is distributed on id; target2 is distributed on id
-- The SELECT maps t_id -> id, so the distribution keys don't align
EXPLAIN INSERT INTO target2(id, data) SELECT t_id, data FROM source2;
QUERY PLAN
-------------------------------------------------------------------------------------
 Custom Scan (PolarCluster INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
   INSERT/SELECT method: repartition
   ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=36)
         Task Count: 4
         Tasks Shown: One of 4
         ->  Task
               Node: host=10.188.91.26 port=3006 dbname=testdb
               ->  Seq Scan on source2_102117 source2  (cost=0.00..22.00 rows=1200 width=36)
(8 rows)
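
If the data model allows the target's distribution column to be filled from the source's distribution column, the same load may qualify for pushed-down execution instead. The sketch below assumes that source2 has the columns (id, t_id, data) and that source2 and target2 are in the same colocation group; the example above states neither.

```sql
-- Assumed definition: source2 (id int, t_id int, data text), distributed on id.

-- Repartitioned: target2.id is filled from source2.t_id.
EXPLAIN INSERT INTO target2(id, data) SELECT t_id, data FROM source2;

-- Candidate for push-down: target2.id is filled from source2.id.
EXPLAIN INSERT INTO target2(id, data) SELECT id, data FROM source2;
```

The second statement inserts different key values, so it is an option only when id is the intended key for target2.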

Pulled-up and redirected execution

This is the slowest mode. The cluster pulls intermediate results to the Coordinator Node (CN) for merging, then sends the final rows back down to the target shards. This creates a bottleneck at the CN and generates significant network traffic.

This mode is triggered when the SELECT part of an INSERT...SELECT contains any of the following clauses:

  • ORDER BY

  • LIMIT

  • OFFSET

  • GROUP BY where the grouping key does not include the distribution column

Example: adding LIMIT 1 forces pull-up:

EXPLAIN INSERT INTO target3(id, data) SELECT t_id, data FROM source3 LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Custom Scan (PolarCluster INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
   INSERT/SELECT method: pull to coordinator
   ->  Limit  (cost=0.00..0.00 rows=1 width=36)
         ->  Custom Scan (PolarCluster Adaptive)  (cost=0.00..0.00 rows=100000 width=36)
               Task Count: 4
               Tasks Shown: One of 4
               ->  Task
                     Node: host=10.188.91.26 port=3006 dbname=testdb
                     ->  Limit  (cost=0.00..0.02 rows=1 width=36)
                           ->  Seq Scan on source3_102125 source3  (cost=0.00..22.00 rows=1200 width=36)
(10 rows)

Remove the offending clause or restructure the query to avoid pull-up when possible.
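
Two possible restructurings are sketched below, using the colocated source and target tables from the earlier examples. The plan labels in the comments follow the rules listed above. The two GROUP BY statements do not return the same rows, so the second is a substitute only when grouping by id fits the data model.

```sql
-- Pulled to the coordinator: ORDER BY in the SELECT part.
EXPLAIN INSERT INTO target(id, data) SELECT id, data FROM source ORDER BY id;

-- Row order does not change which rows are stored, so the ORDER BY can be dropped.
EXPLAIN INSERT INTO target(id, data) SELECT id, data FROM source;

-- Pulled to the coordinator: the grouping key omits the distribution column.
EXPLAIN INSERT INTO target(id, data) SELECT max(id), data FROM source GROUP BY data;

-- The grouping key includes the distribution column (id).
EXPLAIN INSERT INTO target(id, data) SELECT id, max(data) FROM source GROUP BY id;
```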

Cross-node transaction consistency

PolarDB for PostgreSQL (Distributed Edition) uses two-phase commit (2PC) to ensure atomicity of distributed transactions. However, under the default Read Committed isolation level, cross-node Snapshot Isolation (SI) is not supported.

For a transaction spanning multiple data nodes (DNs), commit times on each node may vary slightly. A concurrent SELECT running during this window might read new data from one node and stale data from another — an inconsistent intermediate state. If your application requires strict cross-node consistency, handle this at the application layer, for example by using locks or by avoiding reads of the most recently written data.
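
One application-layer approach is to serialize multi-node writers and their readers with PostgreSQL advisory locks. This is a sketch under two assumptions that this page does not confirm: all sessions connect through the same node, so they share one advisory lock space, and COMMIT returns only after every node has committed. The key 1001 is an arbitrary value chosen by the application.

```sql
-- Writer session: hold an exclusive advisory lock around the whole transaction.
SELECT pg_advisory_lock(1001);
BEGIN;
UPDATE t SET data = 'V2' WHERE id = 1;
UPDATE t SET data = 'V2' WHERE id = 2;
COMMIT;
-- Release the lock only after COMMIT has returned.
SELECT pg_advisory_unlock(1001);

-- Reader session: take the shared variant so that readers do not block each other.
SELECT pg_advisory_lock_shared(1001);
SELECT id, data FROM t WHERE id IN (1, 2);
SELECT pg_advisory_unlock_shared(1001);
```

Session-level locks are used here instead of transaction-level ones so that the lock is released explicitly after the commit completes. The cost is that every reader of these rows must take the lock as well.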