As a business grows, the performance and capacity of a single table can become a bottleneck. The traditional solution is vertical scaling (Scale Up), which involves upgrading the hardware of a single server. However, this approach quickly reaches its limits in terms of cost-effectiveness and physical capacity. Horizontal scaling (Scale Out) offers a more scalable solution by distributing data across multiple servers. A PolarDB for PostgreSQL (Distributed) cluster uses horizontal scaling and introduces two special table types to manage data distribution: distributed tables for large datasets, and replicated tables for small datasets that are frequently joined. This topic guides you through creating and managing these two types of tables.
Create a distributed table
Creating a distributed table is a core step in horizontal data splitting. It is suitable for business tables that store large amounts of data, such as user tables and order detail tables. This is a two-step process. First, you create a standard table. Then, you convert it to a distributed table using the create_distributed_table function.
1. Select a distribution column
The distribution column is the key to determining how data is distributed across different data nodes (DNs). The system uses the hash value of the specified column to route data rows.
Selection principle: Choose the table's primary key or a unique identifier as the distribution column. This ensures that data is distributed evenly.
Important restriction: If the table has a primary key or a UNIQUE constraint, the distribution column must be one of the columns that form the constraint.
Example: Convert the standard table t to a distributed table and use the id column as the distribution column.
Create a standard table named
t.CREATE TABLE t (id int primary key, data text);Convert it to a distributed table and use the
idcolumn as the distribution column.SELECT create_distributed_table('t', 'id');The following result is returned:
create_distributed_table -------------------------- (1 row)
2. (Optional) Specify the number of shards
A shard is the physical storage unit of a distributed table. By default, each distributed table is created with 32 shards. You can explicitly specify the number of shards when you create the table, or you can set it globally using the polar_cluster.shard_count parameter.
Example: Create a distributed table with four shards.
Create a standard table named
t1.CREATE TABLE t1 (id int primary key, data text);Explicitly specify the number of shards.
Using the
shard_countparameterSpecify the number of shards during conversion using the
shard_countparameter.SELECT create_distributed_table('t1', 'id', shard_count := 4);The following result is returned:
create_distributed_table -------------------------- (1 row)Query the number of shards.
SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;The following result is returned:
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- t1 | 102072 | t | -2147483648 | -1073741825 t1 | 102073 | t | -1073741824 | -1 t1 | 102074 | t | 0 | 1073741823 t1 | 102075 | t | 1073741824 | 2147483647 (4 rows)
Using the
polar_cluster.shard_countparameterSet the number of shards globally using the
polar_cluster.shard_countparameter.SET polar_cluster.shard_count TO 4;Convert the table to a distributed table.
SELECT create_distributed_table('t1', 'id');The following result is returned:
create_distributed_table -------------------------- (1 row)Query the number of shards.
SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;The following result is returned:
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------+---------+--------------+---------------+--------------- t1 | 102072 | t | -2147483648 | -1073741825 t1 | 102073 | t | -1073741824 | -1 t1 | 102074 | t | 0 | 1073741823 t1 | 102075 | t | 1073741824 | 2147483647 (4 rows)
3. (Optional) Use colocation groups to optimize JOIN performance
In many business applications, information about a single entity is often stored across multiple tables. A JOIN query is required to retrieve all the related information. For example, the user_info table stores all user data, and the user_order table stores all user orders. These two tables need to be joined on user_id.
In a distributed database, if the data in these tables is distributed across different nodes, a join query triggers cross-node data transfer. This creates high overhead. To address this issue, PolarDB for PostgreSQL (Distributed) introduces the concept of a colocation group.
Function: Ensures that rows with the same distribution key value from multiple tables are physically stored on the same data node. For example, all related records where
user_idis 1001 are kept together. This allowsJOINoperations on the distribution key to be performed efficiently on a single node, providing performance comparable to a local query.How to use: PolarDB for PostgreSQL (Distributed) offers two ways to manage colocation groups: implicit (the default behavior) and explicit (the recommended method).
Default colocation (implicit behavior): When you create a distributed table without specifying the
colocate_withparameter, the system automatically places it into a default colocation group. This grouping is based on two properties: thedistribution column typeand thenumber of shards. This means tables with the same distribution column type and shard count are considered collocated by default.NoteDefault colocation (implicit behavior): Even if two distributed tables have the same distribution column data type and shard count, their data is not necessarily related.
Explicit control (recommended method): The default behavior can cause unrelated business tables to be incorrectly grouped together. To control table colocation precisely, you can declare the relationship explicitly:
When creating the first table in a colocation group, set
colocate_with := 'none'in thecreate_distributed_tablefunction. This creates a new, independent colocation group for the table.When creating subsequent tables that need to be collocated, set
colocate_with := 'first_table_name'to add them to the existing colocation group.
Example: Place the user and order tables in one colocation group, and the animal-related tables in another.
Create user and order related tables and place them in the same colocation group.
-- Create a new colocation group CREATE TABLE user_info (user_id int, user_data text); SELECT create_distributed_table('user_info', 'user_id', colocate_with := 'none'); -- Add to the existing colocation group CREATE TABLE user_order (user_id int, order_id int, order_data text); SELECT create_distributed_table('user_order', 'user_id', colocate_with := 'user_info');Create animal-related tables and place them in another colocation group.
-- Create a new colocation group CREATE TABLE animal (animal_id int, animal_data text); SELECT create_distributed_table('animal', 'animal_id', colocate_with := 'none'); -- Add to the existing colocation group CREATE TABLE animal_class (animal_id int, class_id int, class_data text); SELECT create_distributed_table('animal_class', 'animal_id', colocate_with := 'animal');Verify that the two groups of distributed tables have different colocation IDs. This confirms that they have been added to separate colocation groups.
SELECT table_name, colocation_id, polar_cluster_table_type, distribution_column, shard_count FROM polar_cluster_tables WHERE table_name IN ( 'user_info'::regclass, 'user_order'::regclass, 'animal'::regclass, 'animal_class'::regclass) ORDER BY colocation_id;The following result is returned:
table_name | colocation_id | polar_cluster_table_type | distribution_column | shard_count --------------+---------------+--------------------------+---------------------+------------- user_info | 3 | distributed | user_id | 4 user_order | 3 | distributed | user_id | 4 animal | 4 | distributed | animal_id | 4 animal_class | 4 | distributed | animal_id | 4 (4 rows)
Create a replicated table
A replicated table, also known as a reference table, stores a full copy of its data on every data node. It is suitable for storing small amounts of public data or dimension tables that are frequently joined with distributed tables, such as country codes and product categories.
Advantage: Avoids cross-node queries and speeds up join operations.
Cost: Write operations are synchronized across all nodes, which results in high overhead. For this reason, replicated tables are not suitable for data that changes frequently.
Example: Create a replicated table.
Create a standard table named
t_reference.CREATE TABLE t_reference (id int primary key, data text);Convert it to a replicated table. You only need to specify the table name for this operation.
SELECT create_reference_table('t_reference');The following result is returned:
create_reference_table ------------------------ (1 row)Query the information for the replicated table. The output shows that the replicated table has a shard with the same name on every node:
SELECT table_name, polar_cluster_table_type, distribution_column, shard_count FROM polar_cluster_tables WHERE table_name = 't_reference'::regclass;The following result is returned:
table_name | polar_cluster_table_type | distribution_column | shard_count -------------+--------------------------+---------------------+------------- t_reference | reference | <none> | 1 (1 row)SELECT table_name, shardid, nodename, nodeport FROM polar_cluster_shards WHERE table_name = 't_reference'::regclass;The following result is returned:
table_name | shardid | nodename | nodeport -------------+---------+----------------+---------- t_reference | 102096 | 10.xxx.xxx.xxx | 3007 t_reference | 102096 | 10.xxx.xxx.xxx | 3020 t_reference | 102096 | 10.xxx.xxx.xxx | 3006 t_reference | 102096 | 10.xxx.xxx.xxx | 3003 (4 rows)
Manage distributed tables
Convert a distributed table back to a standard table
If you no longer need the distributed features, you can use the undistribute_table function to convert a distributed or replicated table back to a standard table. Data is automatically collected from all shards and moved to the primary coordinator node (CN).
Example: Convert the distributed table t back to a standard table.
SELECT undistribute_table('t');The following result is returned:
NOTICE: creating a new table for public.t
NOTICE: moving the data of public.t
NOTICE: dropping the old public.t
NOTICE: renaming the new table to public.t
undistribute_table
--------------------
(1 row)Other DDL operations
For an existing distributed table, you can perform other Data Definition Language (DDL) operations just as you would on a standard PostgreSQL table. These operations are automatically propagated to all physical shards to keep the logical and physical table structures consistent.
Delete a table:
DROP TABLE table_name;Create an index:
CREATE INDEX index_name ON table_name (column_name);