
PolarDB: Create and manage distributed and replicated tables

Last Updated: Mar 28, 2026

PolarDB for PostgreSQL (Distributed) scales horizontally by sharding data across multiple data nodes (DNs). To support this, it introduces two specialized table types: distributed tables for large datasets, and replicated tables (also called reference tables) for small, frequently joined lookup data. This topic walks you through creating and managing both types.

Choose a table type

                  | Distributed table                                                  | Replicated table
------------------+--------------------------------------------------------------------+--------------------------------------------------------------------
 Best for         | Large business tables (orders, users, events)                      | Small lookup tables (country codes, product categories, tax rates)
 Data placement   | Each row lives on one DN, determined by the distribution column    | A full copy of the table lives on every DN
 Read performance | Queries scatter across DNs; JOIN efficiency depends on co-location | JOINs with distributed tables are always local, with no cross-node queries
 Write overhead   | Low: each write goes to one DN                                     | High: every write is synchronized across all DNs
 Size guideline   | No upper limit                                                     | Keep small; avoid tables that change frequently

Create a distributed table

Creating a distributed table is a two-step process: create a standard table, then convert it using create_distributed_table.

Step 1: Choose a distribution column

The distribution column determines how rows are routed to DNs. The system hashes the column value and assigns each row to a shard based on that hash.

Choose the column that best meets all three criteria:

  • High cardinality — avoid columns with a small number of distinct values. For example, a status column with only three values (new, paid, shipped) distributes data unevenly and concentrates rows in a few shards, creating hotspots.

  • Even distribution — skewed columns cause data to accumulate on specific DNs.

  • Primary key or unique identifier — a primary key or unique ID typically has high cardinality and an even distribution, satisfying both criteria above.

Important

If the table has a primary key or UNIQUE constraint, the distribution column must be one of the columns that form that constraint.
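
For example, with a composite primary key, the distribution column must be one of the key columns. The orders table below is illustrative, not part of this topic's running example:

-- The primary key spans (tenant_id, order_id),
-- so either column is a valid distribution column
CREATE TABLE orders (
  tenant_id int,
  order_id  int,
  amount    numeric,
  PRIMARY KEY (tenant_id, order_id)
);
SELECT create_distributed_table('orders', 'tenant_id');

Distributing orders on a column outside the primary key, such as amount, would violate this rule and is expected to fail.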

Step 2: Convert the table

Create the table, then call create_distributed_table with the table name and the chosen distribution column.

Example: Distribute table t on the id column.

-- Create a standard table
CREATE TABLE t (id int primary key, data text);

-- Convert to a distributed table
SELECT create_distributed_table('t', 'id');

Expected output:

 create_distributed_table
--------------------------

(1 row)
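
To confirm the conversion, you can query the polar_cluster_tables metadata view shown later in this topic (a sketch; the available columns may vary by version):

SELECT table_name, polar_cluster_table_type, distribution_column
FROM polar_cluster_tables
WHERE table_name = 't'::regclass;

The polar_cluster_table_type column should report distributed for the converted table.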

Step 3: (Optional) Set the shard count

Each distributed table defaults to 32 shards. Set a different count either per table or globally.

Using the shard_count parameter

Option A: Per table — pass shard_count when converting:

CREATE TABLE t1 (id int primary key, data text);
SELECT create_distributed_table('t1', 'id', shard_count := 4);

Using the polar_cluster.shard_count parameter

Option B: Global default — set polar_cluster.shard_count before converting:

SET polar_cluster.shard_count TO 4;
SELECT create_distributed_table('t1', 'id');

Verify the result:

SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;

Expected output (4 shards covering the full hash range):

 logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
 t1           |  102072 | t            | -2147483648   | -1073741825
 t1           |  102073 | t            | -1073741824   | -1
 t1           |  102074 | t            | 0             | 1073741823
 t1           |  102075 | t            | 1073741824    | 2147483647
(4 rows)

Step 4: (Optional) Assign a co-location group

When you JOIN two distributed tables on their distribution columns, PolarDB executes the join on a single DN — but only if the two tables share a co-location group. Without co-location, the query requires cross-node data transfer, which adds significant overhead.

How co-location groups work

PolarDB assigns co-location groups in two ways:

 Method                 | Behavior                                                                                         | Risk
------------------------+--------------------------------------------------------------------------------------------------+--------------------------------------------------------
 Implicit (default)     | Tables with the same distribution column type and shard count are grouped together automatically | Unrelated tables may end up in the same group
 Explicit (recommended) | Declare relationships using colocate_with                                                        | Full control; unrelated tables stay in separate groups

Warning

The default implicit grouping does not check whether tables are logically related. Two unrelated tables that happen to share the same column type and shard count will be co-located. If a table is not related to others, set colocate_with := 'none' to create an independent group. Always use explicit co-location for production tables.

How to use explicit co-location

  1. For the first table in a group, set colocate_with := 'none' to create a new, independent group.

  2. For each subsequent table in the same group, set colocate_with := 'first_table_name'.

Example: Create two co-location groups — one for user data and one for animal data.

-- Group 1: user tables
CREATE TABLE user_info (user_id int, user_data text);
SELECT create_distributed_table('user_info', 'user_id', colocate_with := 'none');

CREATE TABLE user_order (user_id int, order_id int, order_data text);
SELECT create_distributed_table('user_order', 'user_id', colocate_with := 'user_info');

-- Group 2: animal tables (separate group)
CREATE TABLE animal (animal_id int, animal_data text);
SELECT create_distributed_table('animal', 'animal_id', colocate_with := 'none');

CREATE TABLE animal_class (animal_id int, class_id int, class_data text);
SELECT create_distributed_table('animal_class', 'animal_id', colocate_with := 'animal');

Verify that the two groups have different co-location IDs:

SELECT table_name, colocation_id, polar_cluster_table_type, distribution_column, shard_count
FROM polar_cluster_tables
WHERE table_name IN (
  'user_info'::regclass, 'user_order'::regclass,
  'animal'::regclass, 'animal_class'::regclass)
ORDER BY colocation_id;

Expected output — user_info and user_order share one colocation_id; animal and animal_class share another:

  table_name  | colocation_id | polar_cluster_table_type | distribution_column | shard_count
--------------+---------------+--------------------------+---------------------+-------------
 user_info    |             3 | distributed              | user_id             |           4
 user_order   |             3 | distributed              | user_id             |           4
 animal       |             4 | distributed              | animal_id           |           4
 animal_class |             4 | distributed              | animal_id           |           4
(4 rows)
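
With the groups in place, a join between user_info and user_order on their shared distribution column runs shard by shard on each DN. A minimal sketch:

-- Both tables are distributed on user_id and share a co-location group,
-- so matching rows reside on the same DN and no cross-node transfer is needed
SELECT u.user_id, u.user_data, o.order_id
FROM user_info u
JOIN user_order o ON u.user_id = o.user_id;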

Create a replicated table

A replicated table stores a full copy of its data on every DN. JOINs between a replicated table and any distributed table are always local, with no cross-node queries.

Common candidates for replicated tables include:

  • Small lookup tables frequently joined with large distributed tables (e.g., country codes, product categories, tax rates)

  • Tables that need unique constraints across multiple columns and are small enough to replicate

  • Tables without a natural distribution column that would work well for sharding

Important

Every write to a replicated table is synchronized across all DNs. Keep replicated tables small and avoid using them for data that changes frequently.

Example: Create a replicated table.

-- Create a standard table
CREATE TABLE t_reference (id int primary key, data text);

-- Convert to a replicated table (no distribution column needed)
SELECT create_reference_table('t_reference');

Expected output:

 create_reference_table
------------------------

(1 row)

Verify the table metadata:

SELECT table_name, polar_cluster_table_type, distribution_column, shard_count
FROM polar_cluster_tables
WHERE table_name = 't_reference'::regclass;

Expected output:

  table_name  | polar_cluster_table_type | distribution_column | shard_count
--------------+--------------------------+---------------------+-------------
 t_reference  | reference                | <none>              |           1
(1 row)

Verify shard placement — the same shard appears on every DN:

SELECT table_name, shardid, nodename, nodeport
FROM polar_cluster_shards
WHERE table_name = 't_reference'::regclass;

Expected output:

  table_name  | shardid |    nodename    | nodeport
--------------+---------+----------------+----------
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3007
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3020
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3006
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3003
(4 rows)
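
Because every DN holds a full copy of t_reference, joining it with a distributed table such as t from earlier stays local. A minimal sketch:

-- t is distributed, t_reference is replicated on every DN,
-- so each DN can join its own shards against its local copy
SELECT t.id, t.data, r.data AS reference_data
FROM t
JOIN t_reference r ON t.id = r.id;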

Manage distributed tables

Convert a distributed table back to a standard table

Use undistribute_table to convert a distributed or replicated table back to a standard table. Data is automatically collected from all shards and moved to the primary coordinator node (CN).

SELECT undistribute_table('t');

Expected output:

NOTICE:  creating a new table for public.t
NOTICE:  moving the data of public.t
NOTICE:  dropping the old public.t
NOTICE:  renaming the new table to public.t
 undistribute_table
--------------------

(1 row)
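
To confirm the conversion, you can check that t no longer appears in the distributed-table metadata (a sketch):

SELECT table_name, polar_cluster_table_type
FROM polar_cluster_tables
WHERE table_name = 't'::regclass;

The query should return no rows once t is a standard table again.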

DDL operations on distributed tables

DDL operations on distributed tables use the same syntax as standard PostgreSQL tables. PolarDB automatically propagates each operation to all physical shards, keeping the logical and physical table structures in sync.

Supported operations include:

  • Drop a table: DROP TABLE table_name;

  • Create an index: CREATE INDEX index_name ON table_name (column_name);
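
As an illustration, a standard ALTER TABLE against the distributed table t created earlier is expected to propagate to every shard automatically (a sketch; the column name is illustrative):

-- Add a column; PolarDB applies the change to all physical shards
ALTER TABLE t ADD COLUMN created_at timestamptz DEFAULT now();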