PolarDB: Create and manage distributed and replicated tables

Last Updated: Aug 28, 2025

As a business grows, the performance and capacity of a single table can become a bottleneck. The traditional solution is vertical scaling (Scale Up), which involves upgrading the hardware of a single server. However, this approach quickly reaches its limits in terms of cost-effectiveness and physical capacity. Horizontal scaling (Scale Out) offers a more scalable solution by distributing data across multiple servers. A PolarDB for PostgreSQL (Distributed) cluster uses horizontal scaling and introduces two special table types to manage data distribution: distributed tables for large datasets, and replicated tables for small datasets that are frequently joined. This topic guides you through creating and managing these two types of tables.

Create a distributed table

Creating a distributed table is the core step in horizontally partitioning data. It is suitable for business tables that store large amounts of data, such as user tables and order detail tables. The process takes two steps: first create a standard table, then convert it to a distributed table by calling the create_distributed_table function.

1. Select a distribution column

The distribution column is the key to determining how data is distributed across different data nodes (DNs). The system uses the hash value of the specified column to route data rows.

  • Selection principle: Choose a column with many distinct values, such as the table's primary key or another unique identifier, as the distribution column. This helps ensure that data is distributed evenly across nodes.

  • Important restriction: If the table has a primary key or a UNIQUE constraint, the distribution column must be one of the columns that form the constraint.

Example: Convert the standard table t to a distributed table and use the id column as the distribution column.

  1. Create a standard table named t.

    CREATE TABLE t (id int primary key, data text);
  2. Convert it to a distributed table and use the id column as the distribution column.

    SELECT create_distributed_table('t', 'id');

    The following result is returned:

     create_distributed_table 
    --------------------------
     
    (1 row)
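To see the restriction on the distribution column in practice, consider a hypothetical table with a composite primary key. The table and column names below are illustrative: because the primary key spans tenant_id and order_id, only one of those two columns can serve as the distribution column.

```sql
-- Hypothetical table with a composite primary key
CREATE TABLE orders (
    tenant_id int,
    order_id  bigint,
    payload   text,
    PRIMARY KEY (tenant_id, order_id)
);

-- Valid: tenant_id is part of the primary key
SELECT create_distributed_table('orders', 'tenant_id');

-- Invalid: payload is not part of the primary key,
-- so this call is expected to fail with a constraint error
-- SELECT create_distributed_table('orders', 'payload');
```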

2. (Optional) Specify the number of shards

A shard is the physical storage unit of a distributed table. By default, each distributed table is created with 32 shards. You can explicitly specify the number of shards when you create the table, or you can set it globally using the polar_cluster.shard_count parameter.

Example: Create a distributed table with four shards.

  1. Create a standard table named t1.

    CREATE TABLE t1 (id int primary key, data text);
  2. Explicitly specify the number of shards.

    Using the shard_count parameter

    1. Specify the number of shards during conversion using the shard_count parameter.

      SELECT create_distributed_table('t1', 'id', shard_count := 4);

      The following result is returned:

       create_distributed_table 
      --------------------------
       
      (1 row)
    2. Query the number of shards.

      SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;

      The following result is returned:

       logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue 
      --------------+---------+--------------+---------------+---------------
       t1           |  102072 | t            | -2147483648   | -1073741825
       t1           |  102073 | t            | -1073741824   | -1
       t1           |  102074 | t            | 0             | 1073741823
       t1           |  102075 | t            | 1073741824    | 2147483647
      (4 rows)

    Using the polar_cluster.shard_count parameter

    1. Set the number of shards globally using the polar_cluster.shard_count parameter.

       SET polar_cluster.shard_count TO 4;
    2. Convert the table to a distributed table.

      SELECT create_distributed_table('t1', 'id');

      The following result is returned:

       create_distributed_table 
      --------------------------
       
      (1 row)
    3. Query the number of shards.

      SELECT * FROM pg_dist_shard WHERE logicalrelid = 't1'::regclass;

      The following result is returned:

       logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue 
      --------------+---------+--------------+---------------+---------------
       t1           |  102072 | t            | -2147483648   | -1073741825
       t1           |  102073 | t            | -1073741824   | -1
       t1           |  102074 | t            | 0             | 1073741823
       t1           |  102075 | t            | 1073741824    | 2147483647
      (4 rows)
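Note that SET changes polar_cluster.shard_count only for the current session. Standard PostgreSQL commands let you inspect the current value and restore the default:

```sql
-- Check the current session value
SHOW polar_cluster.shard_count;

-- Restore the default shard count for subsequent tables in this session
RESET polar_cluster.shard_count;
```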

3. (Optional) Use colocation groups to optimize JOIN performance

In many business applications, information about a single entity is often stored across multiple tables. A JOIN query is required to retrieve all the related information. For example, the user_info table stores all user data, and the user_order table stores all user orders. These two tables need to be joined on user_id.

In a distributed database, if the data in these tables is distributed across different nodes, a join query triggers cross-node data transfer. This creates high overhead. To address this issue, PolarDB for PostgreSQL (Distributed) introduces the concept of a colocation group.

  • Function: Ensures that rows with the same distribution key value from multiple tables are physically stored on the same data node. For example, all related records where user_id is 1001 are kept together. This allows JOIN operations on the distribution key to be performed efficiently on a single node, providing performance comparable to a local query.

  • How to use: PolarDB for PostgreSQL (Distributed) offers two ways to manage colocation groups: implicit (the default behavior) and explicit (the recommended method).

    • Default colocation (implicit behavior): When you create a distributed table without specifying the colocate_with parameter, the system automatically places it into a default colocation group. This grouping is based on two properties: the distribution column type and the number of shards. This means tables with the same distribution column type and shard count are considered colocated by default.

      Note

      Even if two distributed tables have the same distribution column data type and shard count, their data is not necessarily related. Default colocation does not imply a business relationship between the tables.

    • Explicit control (recommended method): The default behavior can cause unrelated business tables to be incorrectly grouped together. To control table colocation precisely, you can declare the relationship explicitly:

      1. When creating the first table in a colocation group, set colocate_with := 'none' in the create_distributed_table function. This creates a new, independent colocation group for the table.

      2. When creating subsequent tables that need to be colocated, set colocate_with := 'first_table_name' to add them to the existing colocation group.

Example: Place the user and order tables in one colocation group, and the animal-related tables in another.

  1. Create user and order related tables and place them in the same colocation group.

    -- Create a new colocation group
    CREATE TABLE user_info (user_id int, user_data text);
    SELECT create_distributed_table('user_info', 'user_id', colocate_with := 'none');
    -- Add to the existing colocation group
    CREATE TABLE user_order (user_id int, order_id int, order_data text);
    SELECT create_distributed_table('user_order', 'user_id', colocate_with := 'user_info');
  2. Create animal-related tables and place them in another colocation group.

    -- Create a new colocation group
    CREATE TABLE animal (animal_id int, animal_data text);
    SELECT create_distributed_table('animal', 'animal_id', colocate_with := 'none');
    -- Add to the existing colocation group
    CREATE TABLE animal_class (animal_id int, class_id int, class_data text);
    SELECT create_distributed_table('animal_class', 'animal_id', colocate_with := 'animal');
  3. Verify that the two groups of distributed tables have different colocation IDs. This confirms that they have been added to separate colocation groups.

    SELECT table_name, colocation_id, polar_cluster_table_type, distribution_column, shard_count
    FROM polar_cluster_tables
    WHERE table_name IN (
      'user_info'::regclass, 'user_order'::regclass,
      'animal'::regclass, 'animal_class'::regclass)
    ORDER BY colocation_id;

    The following result is returned:

      table_name  | colocation_id | polar_cluster_table_type | distribution_column | shard_count 
    --------------+---------------+--------------------------+---------------------+-------------
     user_info    |             3 | distributed              | user_id             |           4
     user_order   |             3 | distributed              | user_id             |           4
     animal       |             4 | distributed              | animal_id           |           4
     animal_class |             4 | distributed              | animal_id           |           4
    (4 rows)
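With user_info and user_order in the same colocation group, a join on the distribution key can run shard-by-shard on each data node without moving rows between nodes. The following query is a sketch of the access pattern that benefits from colocation:

```sql
-- Rows with the same user_id live on the same data node,
-- so this join is executed locally on each shard pair
SELECT u.user_id, u.user_data, o.order_id, o.order_data
FROM user_info u
JOIN user_order o ON u.user_id = o.user_id
WHERE u.user_id = 1001;
```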

Create a replicated table

A replicated table, also known as a reference table, stores a full copy of its data on every data node. It is suitable for storing small amounts of public data or dimension tables that are frequently joined with distributed tables, such as country codes and product categories.

  • Advantage: Avoids cross-node queries and speeds up join operations.

  • Cost: Write operations are synchronized across all nodes, which results in high overhead. For this reason, replicated tables are not suitable for data that changes frequently.

Example: Create a replicated table.

  1. Create a standard table named t_reference.

    CREATE TABLE t_reference (id int primary key, data text);
  2. Convert it to a replicated table. You only need to specify the table name for this operation.

    SELECT create_reference_table('t_reference');

    The following result is returned:

     create_reference_table 
    ------------------------
     
    (1 row)
  3. Query the information for the replicated table. The output shows that the replicated table has a shard with the same shard ID on every node:

    SELECT table_name, polar_cluster_table_type, distribution_column, shard_count
    FROM polar_cluster_tables
    WHERE table_name = 't_reference'::regclass;

    The following result is returned:

     table_name  | polar_cluster_table_type | distribution_column | shard_count 
    -------------+--------------------------+---------------------+-------------
     t_reference | reference                | <none>              |           1
    (1 row)

    SELECT table_name, shardid, nodename, nodeport
    FROM polar_cluster_shards
    WHERE table_name = 't_reference'::regclass;

    The following result is returned:

     table_name  | shardid |   nodename     | nodeport 
    -------------+---------+----------------+----------
     t_reference |  102096 | 10.xxx.xxx.xxx |     3007
     t_reference |  102096 | 10.xxx.xxx.xxx |     3020
     t_reference |  102096 | 10.xxx.xxx.xxx |     3006
     t_reference |  102096 | 10.xxx.xxx.xxx |     3003
    (4 rows)
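Because every data node holds a full copy of t_reference, a distributed table can join it without any cross-node data transfer. The following sketch reuses the distributed table t1 from the earlier examples:

```sql
-- t_reference is present on every node, so each shard of t1
-- joins against its local copy of the replicated table
SELECT a.id, a.data, r.data AS ref_data
FROM t1 a
JOIN t_reference r ON a.id = r.id;
```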

Manage distributed tables

Convert a distributed table back to a standard table

If you no longer need the distributed features, you can use the undistribute_table function to convert a distributed or replicated table back to a standard table. Data is automatically collected from all shards and moved to the primary coordinator node (CN).

Example: Convert the distributed table t back to a standard table.

SELECT undistribute_table('t');

The following result is returned:

NOTICE:  creating a new table for public.t
NOTICE:  moving the data of public.t
NOTICE:  dropping the old public.t
NOTICE:  renaming the new table to public.t
 undistribute_table 
--------------------
 
(1 row)
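To confirm the conversion, you can check that t no longer appears in the cluster metadata. The following query assumes the polar_cluster_tables view shown earlier; an empty result indicates that t is now a standard local table:

```sql
-- Expect zero rows: t is no longer a distributed table
SELECT table_name
FROM polar_cluster_tables
WHERE table_name = 't'::regclass;
```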

Other DDL operations

For an existing distributed table, you can perform other Data Definition Language (DDL) operations just as you would on a standard PostgreSQL table. These operations are automatically propagated to all physical shards to keep the logical and physical table structures consistent.

  • Delete a table: DROP TABLE table_name;

  • Create an index: CREATE INDEX index_name ON table_name (column_name);
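For example, the following sketch adds a column and an index to the distributed table t1 from the earlier examples. Both statements are issued on the coordinator and propagated to every physical shard:

```sql
-- Schema changes run once on the coordinator and are
-- applied to all shards automatically
ALTER TABLE t1 ADD COLUMN created_at timestamptz DEFAULT now();
CREATE INDEX t1_data_idx ON t1 (data);
```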