
ApsaraDB for SelectDB: Colocation join

Last Updated: May 31, 2024

This topic describes how the colocation join feature is implemented in ApsaraDB for SelectDB and how to use the feature to help you select a join method for query optimization.

Overview

The colocation join feature provides local optimization for some join queries to reduce data transmission time between nodes and accelerate queries. For more information about the initial design, implementation, and performance, see Issue 245. The colocation join feature has undergone a revision, and its design and use are slightly different from the initial design.

Important

The colocation property of a table cannot be synchronized by cross-cluster replication (CCR). If the is_being_synced property of a table is set to true, which indicates that the table is a replica created by CCR, the colocation property is removed from the table.

Terms

  • Colocation Group (CG): contains one or more tables. Tables in the same CG have the same Colocation Group Schema (CGS) and the same tablet distribution.

  • CGS: describes the general schema information that the tables in a CG share for colocation purposes, including the type of the bucket columns, the number of buckets, and the number of replicas.

How it works

The colocation join feature forms a CG that consists of tables with the same CGS and ensures that the tablets of these tables fall on the same backend (BE) node. This way, when the tables in the CG are joined based on bucket columns, a local data join can be performed to reduce the data transmission time between nodes.

A row of a table is assigned to a bucket by hashing the values of the bucket columns and taking the result modulo the number of buckets. For example, if a table has eight buckets, the bucket sequence is [0, 1, 2, 3, 4, 5, 6, 7], and a row whose bucket columns hash to 13 falls into bucket 13 mod 8 = 5. Each bucket contains one or more tablets. If a table is a single-partition table, each bucket contains only one tablet. If a table is a multi-partition table, each bucket contains multiple tablets, one per partition.

To ensure the same data distribution, tables in a CG must use the same bucket columns and the same number of buckets. The bucket columns are the columns specified in DISTRIBUTED BY HASH(col1, col2, ...) in the CREATE TABLE statement; their values determine into which tablets the rows of a table are hashed. For all tables in a CG, the types and the number of the bucket columns must be exactly the same, and the number of buckets must be the same. This way, the tablets of multiple tables can be distributed in the same way.

In contrast, the number of partitions, the partition ranges, and the types of the partition columns are not required to be the same for tables in a CG.

After the bucket columns and the number of buckets are determined, tables in a CG have the same bucket sequence. For example, the bucket sequence is [0, 1, 2, 3, 4, 5, 6, 7], and the BE nodes are [A, B, C, D]. The following diagram shows a possible data distribution:

+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+

The data of all tables in the CG is distributed based on the preceding rules. This ensures that the data with the same bucket column values falls on the same BE node for local data joins.

Basic usage

Create a table

When you create a table, you can specify "colocate_with" = "group_name" in PROPERTIES, which indicates that the table is a colocation table and belongs to the specified CG.

Example:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);

If the specified CG does not exist, ApsaraDB for SelectDB automatically creates a CG that contains only the current table. If the CG already exists, ApsaraDB for SelectDB checks whether the current table meets the CGS requirements. If it does, the table is created and added to the CG, and tablets and replicas are created for the table based on the existing data distribution rules of the CG. A CG belongs to a database, and the name of a CG is unique within the database. Internally, a CG is stored with its full name in the dbId_groupName format, but only groupName is visible to users.
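
For example, if group1 was created by the preceding eight-bucket table, a table whose number of buckets differs fails the CGS check. The following statement is a hedged sketch with a hypothetical table name; it is expected to be rejected, and the exact error message may vary:

CREATE TABLE tbl_mismatch (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 16
PROPERTIES(
    "colocate_with" = "group1"
);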

ApsaraDB for SelectDB supports cross-database CGs. When you create a table, you can add the __global__ prefix to the CG name to specify a global CG. Example:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "__global__group1"
);

A CG whose name starts with the __global__ prefix does not belong to any database, and its name is globally unique. You can implement cross-database colocation joins by creating a global CG.
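
The following is a minimal sketch of a cross-database colocation join. It assumes a hypothetical second database named db2 and reuses the __global__group1 CG from the preceding example; the join is on the bucket column k1:

CREATE TABLE db2.tbl_global (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "__global__group1"
);

SELECT * FROM tbl t1 INNER JOIN db2.tbl_global t2 ON (t1.k1 = t2.k1);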

Delete a table

After the last table in a CG is permanently deleted, the CG is automatically deleted. A permanent deletion means that the table is removed from the recycle bin. In most cases, after the DROP TABLE statement is executed to delete a table, the table is moved to the recycle bin and stays there for one day before it is permanently deleted.
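
A minimal sketch of the two deletion paths, assuming the kernel supports the FORCE keyword of Apache Doris for skipping the recycle bin:

-- Move the table to the recycle bin. By default, it is permanently deleted after about one day.
DROP TABLE tbl;

-- Skip the recycle bin and permanently delete the table immediately.
DROP TABLE tbl FORCE;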

Query tables

You can query colocation tables in the same way as you query regular tables. The colocation property of a table is imperceivable to users. The system automatically generates query plans that use colocation joins. The following example shows how colocation tables are queried:

  1. Create tables.

    Create the tbl1 table.

    CREATE TABLE `tbl1` (
        `k1` date NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` int(11) SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    PARTITION BY RANGE(`k1`)
    (
        PARTITION p1 VALUES LESS THAN ('2019-05-31'),
        PARTITION p2 VALUES LESS THAN ('2019-06-30')
    )
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );

    Create the tbl2 table.

    CREATE TABLE `tbl2` (
        `k1` datetime NOT NULL COMMENT "",
        `k2` int(11) NOT NULL COMMENT "",
        `v1` double SUM NOT NULL COMMENT ""
    ) ENGINE=OLAP
    AGGREGATE KEY(`k1`, `k2`)
    DISTRIBUTED BY HASH(`k2`) BUCKETS 8
    PROPERTIES (
        "colocate_with" = "group1"
    );
  2. View the query plan of a query in which the tables are joined.

    DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN                           |
    |   |  hash predicates:                              |
    |   |  colocate: true                                |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----1:OlapScanNode                              |
    |   |       TABLE: tbl2                              |
    |   |       PREAGGREGATION: OFF. Reason: null        |
    |   |       partitions=0/1                           |
    |   |       rollup: null                             |
    |   |       buckets=0/0                              |
    |   |       cardinality=-1                           |
    |   |       avgRowSize=0.0                           |
    |   |       numNodes=0                               |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    +----------------------------------------------------+

    If the colocation join takes effect, the HASH JOIN node contains colocate: true.

    The following example shows a query plan in which the colocation join does not take effect:

    +----------------------------------------------------+
    | Explain String                                     |
    +----------------------------------------------------+
    | PLAN FRAGMENT 0                                    |
    |  OUTPUT EXPRS:`tbl1`.`k1` |                        |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   RESULT SINK                                      |
    |                                                    |
    |   2:HASH JOIN                                      |
    |   |  join op: INNER JOIN (BROADCAST)               |
    |   |  hash predicates:                              |
    |   |  colocate: false, reason: group is not stable  |
    |   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
    |   |  tuple ids: 0 1                                |
    |   |                                                |
    |   |----3:EXCHANGE                                  |
    |   |       tuple ids: 1                             |
    |   |                                                |
    |   0:OlapScanNode                                   |
    |      TABLE: tbl1                                   |
    |      PREAGGREGATION: OFF. Reason: No AggregateInfo |
    |      partitions=0/2                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 0                                  |
    |                                                    |
    | PLAN FRAGMENT 1                                    |
    |  OUTPUT EXPRS:                                     |
    |   PARTITION: RANDOM                                |
    |                                                    |
    |   STREAM DATA SINK                                 |
    |     EXCHANGE ID: 03                                |
    |     UNPARTITIONED                                  |
    |                                                    |
    |   1:OlapScanNode                                   |
    |      TABLE: tbl2                                   |
    |      PREAGGREGATION: OFF. Reason: null             |
    |      partitions=0/1                                |
    |      rollup: null                                  |
    |      buckets=0/0                                   |
    |      cardinality=-1                                |
    |      avgRowSize=0.0                                |
    |      numNodes=0                                    |
    |      tuple ids: 1                                  |
    +----------------------------------------------------+

    The HASH JOIN node contains the reason why the colocation join does not take effect: colocate: false, reason: group is not stable. An EXCHANGE node is also generated.

View CG information

You can view the information about CGs that exist in a cluster. Example:

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId     | GroupName    | TableIds     | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10         | 3              | int(11)  | true     |
+-------------+--------------+--------------+------------+----------------+----------+----------+

The following list describes the parameters:

  • GroupId: the cluster-wide unique identifier of the CG. The first half is the database ID, and the second half is the CG ID.

  • GroupName: the full name of the CG.

  • TableIds: the IDs of the tables in the CG.

  • BucketsNum: the number of buckets.

  • ReplicationNum: the number of replicas.

  • DistCols: the type of the bucket columns.

  • IsStable: indicates whether the CG is stable. For more information about the definition of stability, see the "Colocation replica balancing and repair" section of this topic.

View the data distribution of a CG. Example:

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds          |
+-------------+---------------------+
| 0           | 10004               |
| 1           | 10003               |
| 2           | 10002               |
| 3           | 10003               |
| 4           | 10002               |
| 5           | 10003               |
| 6           | 10003               |
| 7           | 10003               |
+-------------+---------------------+

The following list describes the parameters:

  • BucketIndex: the index of the bucket.

  • BackendIds: the IDs of the BE nodes on which the tablets in the bucket reside.

Note

The permissions of the admin role are required to run the preceding commands. Regular users cannot run these commands.

Modify the colocation property of a table

You can modify the colocation property of an existing table. Example:

ALTER TABLE tbl SET ("colocate_with" = "group2");
  • If the table has not been added to a CG, the system checks the schema and adds the table to the specified CG. If the CG does not exist, the system automatically creates the CG.

  • If the table has been added to a CG, the system removes the table from the original CG and adds the table to the specified CG. If the CG does not exist, the system automatically creates the CG.
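
To confirm the change, you can check both the table property and the group membership. A minimal sketch that uses SHOW CREATE TABLE together with the SHOW PROC command described earlier:

-- The PROPERTIES in the output should contain "colocate_with" = "group2".
SHOW CREATE TABLE tbl;

-- The ID of the table should now be listed under group2.
SHOW PROC '/colocation_group';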

You can also execute the following statement to delete the colocation property of a table:

ALTER TABLE tbl SET ("colocate_with" = "");

Other operations

When you add partitions to a colocation table by executing the ADD PARTITION statement or modify the number of replicas, ApsaraDB for SelectDB checks whether the change violates the CGS. If so, ApsaraDB for SelectDB rejects the change.
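
For example, a new partition inherits the bucketing defined by the CGS and is therefore accepted. The following hedged sketch adds a partition to the tbl1 table created earlier; the partition name and range are hypothetical:

ALTER TABLE tbl1 ADD PARTITION p3 VALUES LESS THAN ('2019-07-31');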

Advanced usage

FE configuration items

  • disable_colocate_relocate

    Specifies whether to disable automatic colocation replica repair in ApsaraDB for SelectDB. The default value is false, which indicates that this feature is enabled. This parameter affects the replica repair only for colocation tables, not for regular tables.

  • disable_colocate_balance

    Specifies whether to disable automatic colocation replica balancing in ApsaraDB for SelectDB. The default value is false, which indicates that this feature is enabled. This configuration item affects the replica balancing only for colocation tables, not for regular tables.

You can dynamically modify the preceding configuration items; see the example after the following list. For more information, execute the HELP ADMIN SHOW CONFIG; and HELP ADMIN SET CONFIG; statements.

  • disable_colocate_join

    Specifies whether to disable the colocation join feature. The default value is false, which indicates that this feature is enabled.

  • use_new_tablet_scheduler

    Specifies whether to enable the new replica scheduling logic. The default value is true, which indicates that this logic is enabled.
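
The following sketch shows how the configuration items can be viewed and modified dynamically, assuming the ADMIN SHOW FRONTEND CONFIG and ADMIN SET FRONTEND CONFIG syntax of the Apache Doris kernel:

-- View the current values of the colocation-related configuration items.
ADMIN SHOW FRONTEND CONFIG LIKE '%colocate%';

-- Dynamically disable automatic colocation replica balancing.
ADMIN SET FRONTEND CONFIG ("disable_colocate_balance" = "true");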

HTTP RESTful API

ApsaraDB for SelectDB provides several colocation join-related HTTP RESTful API operations that you can use to view and modify CGs.

These API operations are implemented on FE nodes and can be called by sending requests to fe_host:fe_http_port. The permissions of the admin role are required.

  • View all the colocation information of a cluster. Example:

    GET /api/colocate
    
    Returns the internal colocation information in the JSON format. 
    
    {
        "msg": "success",
        "code": 0,
        "data": {
            "infos": [
                ["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
            ],
            "unstableGroupIds": [],
            "allGroupIds": [{
                "dbId": 10003,
                "grpId": 12002
            }]
        },
        "count": 0
    }
  • Mark a CG as Stable or Unstable. Example:

    • Mark a CG as Stable.

      DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
      
      Return value: 200
    • Mark a CG as Unstable.

      POST /api/colocate/group_stable?db_id=10005&group_id=10008
      
      Return value: 200
  • Configure the data distribution of a CG.

    You can call this API operation to forcibly configure the data distribution of a CG. The request body contains a bucket sequence that is represented by a nested array, in which each element lists the IDs of the BE nodes on which the tablets of the corresponding bucket reside.

    POST /api/colocate/bucketseq?db_id=10005&group_id=10008
    
    Body:
    [[10004],[10003],[10002],[10003],[10002],[10003],[10003],[10003],[10003],[10002]]
    
    Return value: 200
    Note

    To use this command, you must set the FE configuration items disable_colocate_relocate and disable_colocate_balance to true. This way, the automatic colocation replica repair and balancing are disabled in the system. Otherwise, colocation replicas are automatically repaired and balanced by the system after manual modification.

Colocation replica balancing and repair

The replica distribution of a colocation table must follow the distribution rules specified in the CGS. Therefore, balancing and repair for colocation replicas are different from those for regular tablets.

A CG has a Stable property. If the value of the property is true, no tablets of the tables in the CG are being migrated or repaired, and the colocation join feature can be used. If the value is false, the tablets of some tables in the CG are being repaired or migrated. In this case, colocation joins of the affected tables are degraded to regular joins.

Replica repair

The replicas of a colocation table can be stored only on the BE nodes specified in the bucket sequence. If a BE node becomes unavailable, for example, because it is down or decommissioned, it must be replaced with another BE node. ApsaraDB for SelectDB selects the BE node with the lowest load as the replacement. After the replacement, all tablets that resided on the original BE node must be repaired. During the migration, the CG is marked as Unstable.

Replica balancing

ApsaraDB for SelectDB evenly distributes the tablets of colocation tables across all BE nodes. Replicas of regular tables are balanced at the replica level: a low-load BE node is found for each replica individually. In contrast, replicas of colocation tables are balanced at the bucket level: all replicas in a bucket are migrated together.

ApsaraDB for SelectDB uses a simple balancing algorithm. The algorithm distributes the bucket sequence evenly across all BE nodes based on the number of replicas rather than the actual size of the replicas.

Note
  • The current colocation replica balancing and repair algorithms may not work well for ApsaraDB for SelectDB instances that use heterogeneous deployment, in which BE nodes differ in disk capacity, disk quantity, or disk type (SSD or HDD). In such deployments, small-capacity BE nodes and large-capacity BE nodes may store the same number of replicas.

  • If a CG is in the Unstable state, a join of the tables in the CG is degraded to a regular join. In this case, the query performance of the cluster may significantly deteriorate. If you do not want the system to automatically balance replicas, you can set the FE configuration item disable_colocate_balance to true to disable automatic balancing. If automatic balancing is required later, you can manually set the configuration item to false.