PolarDB for PostgreSQL (Distributed) scales horizontally by sharding data across multiple data nodes (DNs). To support this, it introduces two specialized table types: distributed tables for large datasets, and replicated tables (also called reference tables) for small, frequently joined lookup data. This topic walks you through creating and managing both types.
Choose a table type
| | Distributed table | Replicated table |
|---|---|---|
| Best for | Large business tables (orders, users, events) | Small lookup tables (country codes, product categories, tax rates) |
| Data placement | Each row lives on one DN, determined by the distribution column | A full copy of the table lives on every DN |
| Read performance | Queries scatter across DNs; JOIN efficiency depends on co-location | JOINs with distributed tables are always local — no cross-node queries |
| Write overhead | Low — each write goes to one DN | High — every write is synchronized across all DNs |
| Size guideline | No upper limit | Keep small; avoid tables that change frequently |
Create a distributed table
Creating a distributed table is a two-step process: create a standard table, then convert it using create_distributed_table.
Step 1: Choose a distribution column
The distribution column determines how rows are routed to DNs. The system hashes the column value and assigns each row to a shard based on that hash.
Choose the column that best meets all three criteria:

- High cardinality: avoid columns with a small number of distinct values. For example, a `status` column with only three values (`new`, `paid`, `shipped`) distributes data unevenly and concentrates rows in a few shards, creating hotspots.
- Even distribution: skewed columns cause data to accumulate on specific DNs.
- Primary key or unique identifier: such a column typically satisfies both criteria above and ensures even distribution.
If the table has a primary key or UNIQUE constraint, the distribution column must be one of the columns that form that constraint.
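To make the hotspot risk concrete, the toy model below hashes 10,000 rows two ways: by a three-value `status` column and by a unique `id` column, then counts rows per shard. The hash function is a stand-in (PolarDB's internal distribution hash differs), but the skew effect it illustrates is the same.

```python
import hashlib

def shard_of(value, shard_count):
    # Toy router: hash the value and map it to a shard index.
    # (Illustrative only; PolarDB uses its own internal hash function.)
    h = int.from_bytes(hashlib.sha256(str(value).encode()).digest()[:4], "big")
    return h % shard_count

SHARDS = 8

# Low-cardinality column: only 3 distinct values, so at most 3 shards are ever used.
status_counts = [0] * SHARDS
for i in range(10_000):
    status = ("new", "paid", "shipped")[i % 3]
    status_counts[shard_of(status, SHARDS)] += 1

# High-cardinality column: unique ids spread across all shards.
id_counts = [0] * SHARDS
for i in range(10_000):
    id_counts[shard_of(i, SHARDS)] += 1

print("status column per-shard rows:", status_counts)
print("id column per-shard rows:    ", id_counts)
print("shards used by status column:", sum(1 for c in status_counts if c > 0))  # at most 3
```

However many shards the table has, a three-value column can never use more than three of them, while unique ids spread the load evenly.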
Step 2: Convert the table
Create the table, then call create_distributed_table with the table name and the chosen distribution column.
Example: Distribute table t on the id column.
```sql
-- Create a standard table
CREATE TABLE t (id int primary key, data text);
-- Convert to a distributed table
SELECT create_distributed_table('t', 'id');
```

Expected output:

```
 create_distributed_table
--------------------------

(1 row)
```

Step 3: (Optional) Set the shard count
Each distributed table defaults to 32 shards. Set a different count either per table or globally.
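The shard count determines how the signed 32-bit hash space is partitioned: each shard owns one contiguous hash range. The sketch below assumes equal-width ranges, which matches the `shardminvalue`/`shardmaxvalue` pairs in the `pg_dist_shard` verification output shown below for a 4-shard table.

```python
# Split the signed 32-bit hash space [-2^31, 2^31 - 1] into shard_count
# equal ranges (assumes shard_count divides 2^32 evenly, as powers of two do).
def shard_ranges(shard_count):
    size = 2**32 // shard_count   # width of each hash range
    lo = -(2**31)
    ranges = []
    for _ in range(shard_count):
        ranges.append((lo, lo + size - 1))
        lo += size
    return ranges

for lo, hi in shard_ranges(4):
    print(lo, hi)
# Prints:
# -2147483648 -1073741825
# -1073741824 -1
# 0 1073741823
# 1073741824 2147483647
```

A row is routed to whichever shard's range contains the hash of its distribution column value.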
Using the shard_count parameter
Option A: Per table — pass shard_count when converting:
```sql
CREATE TABLE t1 (id int primary key, data text);
SELECT create_distributed_table('t1', 'id', shard_count := 4);
```

Using the polar_cluster.shard_count parameter
Option B: Global default — set polar_cluster.shard_count before converting:
```sql
SET polar_cluster.shard_count TO 4;
SELECT create_distributed_table('t1', 'id');
```

Verify the result:

```sql
SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;
```

Expected output (4 shards covering the full hash range):

```
 logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
 t1           |  102072 | t            | -2147483648   | -1073741825
 t1           |  102073 | t            | -1073741824   | -1
 t1           |  102074 | t            | 0             | 1073741823
 t1           |  102075 | t            | 1073741824    | 2147483647
(4 rows)
```

Step 4: (Optional) Assign a co-location group
When you JOIN two distributed tables on their distribution columns, PolarDB executes the join on a single DN — but only if the two tables share a co-location group. Without co-location, the query requires cross-node data transfer, which adds significant overhead.
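The reason co-located joins stay local can be shown with a toy model: when two tables share the same hash function and shard count, equal join keys always land on the same shard index, so joining shard-by-shard produces exactly the global join result. The hash function here is a stand-in for PolarDB's internal one; the locality argument only requires it to be deterministic and shared by both tables.

```python
import hashlib
from collections import defaultdict

def shard_of(key, shard_count):
    # Deterministic stand-in for the distribution hash.
    h = int.from_bytes(hashlib.sha256(str(key).encode()).digest()[:4], "big")
    return h % shard_count

SHARD_COUNT = 4  # both tables in the co-location group share this

users = [(uid, f"user-{uid}") for uid in range(100)]          # (user_id, name)
orders = [(oid % 100, oid) for oid in range(300)]             # (user_id, order_id)

# Place each row on its shard by hashing the distribution column (user_id).
user_shards, order_shards = defaultdict(list), defaultdict(list)
for uid, name in users:
    user_shards[shard_of(uid, SHARD_COUNT)].append((uid, name))
for uid, oid in orders:
    order_shards[shard_of(uid, SHARD_COUNT)].append((uid, oid))

# Join shard-by-shard, the way a co-located join runs on each DN...
local = set()
for s in range(SHARD_COUNT):
    names = {uid: name for uid, name in user_shards[s]}
    for uid, oid in order_shards[s]:
        if uid in names:
            local.add((uid, names[uid], oid))

# ...and compare with the global join: identical, so no row ever crosses a DN.
names = dict(users)
globl = {(uid, names[uid], oid) for uid, oid in orders if uid in names}
print("local per-shard join == global join:", local == globl)
```

If the two tables had different shard counts or hash functions, matching keys could land on different shards, forcing cross-node data transfer.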
How co-location groups work
PolarDB assigns co-location groups in two ways:
| Method | Behavior | Risk |
|---|---|---|
| Implicit (default) | Tables with the same distribution column type and shard count are grouped together automatically | Unrelated tables may end up in the same group |
| Explicit (recommended) | Declare relationships using colocate_with | Full control; unrelated tables stay in separate groups |
The default implicit grouping does not check whether tables are logically related. Two unrelated tables that happen to share the same column type and shard count will be co-located. If a table is not related to others, set colocate_with := 'none' to create an independent group. Always use explicit co-location for production tables.
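A minimal model of the implicit rule, as described above (grouping purely by distribution column type and shard count), shows how unrelated tables collide; the grouping key and function here are simplifications for illustration, not PolarDB internals.

```python
# Simplified model of implicit co-location: the group is keyed only by
# (distribution column type, shard count), with no notion of whether the
# tables are logically related.
groups = {}

def implicit_group(table, column_type, shard_count):
    key = (column_type, shard_count)
    # The first table seen with this key names the group; later tables join it.
    return groups.setdefault(key, table)

implicit_group("user_info", "int", 32)
print(implicit_group("animal", "int", 32))  # unrelated, but same key -> user_info's group
print(implicit_group("events", "int", 4))   # different shard count -> its own group
```

This is why explicit `colocate_with` is recommended: it replaces the accidental key match with a declared relationship.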
How to use explicit co-location

- For the first table in a group, set `colocate_with := 'none'` to create a new, independent group.
- For each subsequent table in the same group, set `colocate_with := 'first_table_name'`.
Example: Create two co-location groups — one for user data and one for animal data.
```sql
-- Group 1: user tables
CREATE TABLE user_info (user_id int, user_data text);
SELECT create_distributed_table('user_info', 'user_id', colocate_with := 'none');
CREATE TABLE user_order (user_id int, order_id int, order_data text);
SELECT create_distributed_table('user_order', 'user_id', colocate_with := 'user_info');

-- Group 2: animal tables (separate group)
CREATE TABLE animal (animal_id int, animal_data text);
SELECT create_distributed_table('animal', 'animal_id', colocate_with := 'none');
CREATE TABLE animal_class (animal_id int, class_id int, class_data text);
SELECT create_distributed_table('animal_class', 'animal_id', colocate_with := 'animal');
```

Verify that the two groups have different co-location IDs:

```sql
SELECT table_name, colocation_id, polar_cluster_table_type, distribution_column, shard_count
FROM polar_cluster_tables
WHERE table_name IN (
    'user_info'::regclass, 'user_order'::regclass,
    'animal'::regclass, 'animal_class'::regclass)
ORDER BY colocation_id;
```

Expected output: user_info and user_order share one colocation_id, while animal and animal_class share another:

```
  table_name  | colocation_id | polar_cluster_table_type | distribution_column | shard_count
--------------+---------------+--------------------------+---------------------+-------------
 user_info    |             3 | distributed              | user_id             |           4
 user_order   |             3 | distributed              | user_id             |           4
 animal       |             4 | distributed              | animal_id           |           4
 animal_class |             4 | distributed              | animal_id           |           4
(4 rows)
```

Create a replicated table
A replicated table stores a full copy of its data on every DN. JOINs between a replicated table and any distributed table are always local, with no cross-node queries.
Common candidates for replicated tables include:
- Small lookup tables frequently joined with large distributed tables (e.g., country codes, product categories, tax rates)
- Tables that need unique constraints across multiple columns and are small enough to replicate
- Tables without a natural distribution column that would work well for sharding
Every write to a replicated table is synchronized across all DNs. Keep replicated tables small and avoid using them for data that changes frequently.
Example: Create a replicated table.
```sql
-- Create a standard table
CREATE TABLE t_reference (id int primary key, data text);
-- Convert to a replicated table (no distribution column needed)
SELECT create_reference_table('t_reference');
```

Expected output:

```
 create_reference_table
------------------------

(1 row)
```

Verify the table metadata:

```sql
SELECT table_name, polar_cluster_table_type, distribution_column, shard_count
FROM polar_cluster_tables
WHERE table_name = 't_reference'::regclass;
```

```
  table_name  | polar_cluster_table_type | distribution_column | shard_count
--------------+--------------------------+---------------------+-------------
 t_reference  | reference                | <none>              |           1
(1 row)
```

Verify shard placement: the same shard appears on every DN:

```sql
SELECT table_name, shardid, nodename, nodeport
FROM polar_cluster_shards
WHERE table_name = 't_reference'::regclass;
```

```
  table_name  | shardid |    nodename    | nodeport
--------------+---------+----------------+----------
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3007
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3020
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3006
 t_reference  |  102096 | 10.xxx.xxx.xxx |     3003
(4 rows)
```

Manage distributed tables
Convert a distributed table back to a standard table
Use undistribute_table to convert a distributed or replicated table back to a standard table. Data is automatically collected from all shards and moved to the primary coordinator node (CN).
```sql
SELECT undistribute_table('t');
```

Expected output:

```
NOTICE:  creating a new table for public.t
NOTICE:  moving the data of public.t
NOTICE:  dropping the old public.t
NOTICE:  renaming the new table to public.t
 undistribute_table
--------------------

(1 row)
```

DDL operations on distributed tables
DDL operations on distributed tables use the same syntax as standard PostgreSQL tables. PolarDB automatically propagates each operation to all physical shards, keeping the logical and physical table structures in sync.
Supported operations include:
Drop a table:

```sql
DROP TABLE table_name;
```

Create an index:

```sql
CREATE INDEX index_name ON table_name (column_name);
```
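Conceptually, propagation means the coordinator executes one logical DDL statement once per physical shard. The sketch below models this as string rewriting; the `table_shardid` shard-naming pattern is an assumption for illustration (consistent with the numeric shard IDs shown in `pg_dist_shard` above), not a documented interface.

```python
# Illustrative sketch of DDL propagation: one logical statement is rewritten
# and applied to every physical shard. The "<table>_<shardid>" naming is an
# assumption for illustration only.
def propagate_ddl(template, table, shard_ids):
    return [template.format(table=f"{table}_{sid}") for sid in shard_ids]

stmts = propagate_ddl(
    "CREATE INDEX idx_{table}_data ON {table} (data);",
    "t1",
    [102072, 102073, 102074, 102075],
)
for s in stmts:
    print(s)
# Prints one CREATE INDEX statement per shard, e.g.:
# CREATE INDEX idx_t1_102072_data ON t1_102072 (data);
```

Because the rewrite is mechanical, the logical table and all of its shards always end up with matching structures, which is why you never address shards directly.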