PolarDB: Practice and optimization of HTAP

Last Updated: Jul 24, 2023

Background information

This topic provides the best practices for hybrid transaction/analytical processing (HTAP). HTAP is an architecture that integrates online transaction processing (OLTP) with online analytical processing (OLAP) after databases are migrated to the cloud. HTAP is applicable to the following scenarios:

  1. Scenarios in which HTAP databases are involved, such as transforming Oracle databases and migrating them to the cloud, and selecting cloud database solutions

  2. Scenarios in which queries on the OLTP system are slow and analytical processing requirements cause performance bottlenecks

  3. Scenarios in which read/write splitting is required

For more information about the PolarDB-X HTAP architecture, see HTAP.

HTAP clusters

The PolarDB-X primary instance that you purchase is commonly used in online general-purpose business scenarios. If your business scenario requires analysis, export of offline data, and batch processing on the same set of data, you can purchase multiple read-only instances for the PolarDB-X primary instance. For more information, see Read-only instances.

If online HTAP requests exist or read/write splitting is required in your business scenario, we recommend that you use a cluster endpoint. PolarDB-X forwards some requests to read-only instances based on intelligent routing or read and write weights. If your business scenario requires only offline data analysis, we recommend that you use a read-only routing endpoint. The read-only routing endpoint can be used to directly access the read-only instance. The requests that are routed by using the read-only routing endpoint are accelerated by using the multi-node parallel processing (MPP) mode. For more information about endpoints, see Configure read/write splitting.

Routing

Intelligent routing

The PolarDB-X optimizer uses a cost model to estimate the number of scanned rows and the consumption of core resources such as CPU, memory, I/O, and network. Based on these estimates, the optimizer classifies each request as a transaction processing (TP) load or an analytical processing (AP) load. If you enable intelligent routing for the cluster endpoint, the workload type of each SQL query is automatically identified and used to route the query. For example, SQL queries that are identified as AP loads are routed to read-only instances. You can execute the EXPLAIN COST statement to view the workload type that is identified for an SQL query. The following query scans a small number of rows and consumes few computing resources (CPU and memory). In the returned result, rowcount indicates the number of scanned rows, memory indicates the consumed memory resources, and cpu indicates the consumed CPU resources. This query is identified as a TP load.

mysql> explain cost select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| TopN(sort="cnt ASC", offset=?2, fetch=?3): rowcount = 1.0, cumulative cost = value = 2.8765038E7, cpu = 37.0, memory = 64.0, io = 3.0, net = 5.75, id = 163602178                                                                                                                 |
|   Filter(condition="cnt > ?1"): rowcount = 1.0, cumulative cost = value = 2.8765026E7, cpu = 26.0, memory = 47.0, io = 3.0, net = 5.75, id = 163602177                                                                                                                            |
|     HashAgg(group="k", cnt="COUNT()"): rowcount = 1.0, cumulative cost = value = 2.8765025E7, cpu = 25.0, memory = 47.0, io = 3.0, net = 5.75, id = 163602171                                                                                                                     |
|       BKAJoin(condition="k = id", type="inner"): rowcount = 1.0, cumulative cost = value = 2.8765012E7, cpu = 12.0, memory = 18.0, io = 3.0, net = 5.75, id = 163602169                                                                                                           |
|         Gather(concurrent=true): rowcount = 1.0, cumulative cost = value = 2.3755003E7, cpu = 3.0, memory = 0.0, io = 1.0, net = 4.75, id = 163602164                                                                                                                             |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)"): rowcount = 1.0, cumulative cost = value = 2.3755002E7, cpu = 2.0, memory = 0.0, io = 1.0, net = 4.75, id = 163601451         |
|         Gather(concurrent=true): rowcount = 1.0, cumulative cost = value = 5003.0, cpu = 3.0, memory = 0.0, io = 1.0, net = 0.0, id = 163602167                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))"): rowcount = 1.0, cumulative cost = value = 5002.0, cpu = 2.0, memory = 0.0, io = 1.0, net = 0.0, id = 163601377 |                                                                                                                                                                                                                                                                    |
| WorkloadType: TP                                                                                                                                                                                                                                                                  |                                                                                                                                                                                                                                                            |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Correctly identifying the workload type of a query is important for HTAP request routing. You can specify a workload type for a query by using the WORKLOAD_TYPE hint. The following example reuses the preceding query and forces its workload type to AP.

mysql> explain cost /*+TDDL:WORKLOAD_TYPE=AP*/ select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| TopN(sort="cnt ASC", offset=?2, fetch=?3): rowcount = 1.0, cumulative cost = value = 2.8765038E7, cpu = 37.0, memory = 64.0, io = 3.0, net = 5.75, id = 163602178                                                                                                                 |
|   Filter(condition="cnt > ?1"): rowcount = 1.0, cumulative cost = value = 2.8765026E7, cpu = 26.0, memory = 47.0, io = 3.0, net = 5.75, id = 163602177                                                                                                                            |
|     HashAgg(group="k", cnt="COUNT()"): rowcount = 1.0, cumulative cost = value = 2.8765025E7, cpu = 25.0, memory = 47.0, io = 3.0, net = 5.75, id = 163602171                                                                                                                     |
|       BKAJoin(condition="k = id", type="inner"): rowcount = 1.0, cumulative cost = value = 2.8765012E7, cpu = 12.0, memory = 18.0, io = 3.0, net = 5.75, id = 163602169                                                                                                           |
|         Gather(concurrent=true): rowcount = 1.0, cumulative cost = value = 2.3755003E7, cpu = 3.0, memory = 0.0, io = 1.0, net = 4.75, id = 163602164                                                                                                                             |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)"): rowcount = 1.0, cumulative cost = value = 2.3755002E7, cpu = 2.0, memory = 0.0, io = 1.0, net = 4.75, id = 163601451         |
|         Gather(concurrent=true): rowcount = 1.0, cumulative cost = value = 5003.0, cpu = 3.0, memory = 0.0, io = 1.0, net = 0.0, id = 163602167                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))"): rowcount = 1.0, cumulative cost = value = 5002.0, cpu = 2.0, memory = 0.0, io = 1.0, net = 0.0, id = 163601377 |                                                                                                                                                                                                                                                                    |
| WorkloadType: AP                                                                                                                                                                                                                                                                  |                                                                                                                                                                                                                                                            |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Rule-based routing

In addition to cost-based intelligent routing, PolarDB-X supports rule-based routing that is performed based on read and write weights. To specify the weights for read/write splitting, set the MASTER_READ_WEIGHT parameter on the Parameter settings page in the console. Valid values are 0 to 100, and the default value is 100. If the value is set to 60, 60% of the requests are processed on the primary instance and the remaining 40% of the requests are routed to one or more read-only instances. If multiple read-only instances are available, the remaining 40% of the requests are automatically allocated among them.
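The weight arithmetic described above can be sketched in Python. This is only an illustration, not PolarDB-X code: the helper name split_reads is hypothetical, and the even split across read-only instances is an assumption, because the text states only that the remaining requests are allocated automatically.

```python
def split_reads(master_read_weight: int, total_reads: int, read_only_count: int) -> dict:
    """Estimate how many read requests each side receives.

    Hypothetical helper. Assumes the requests that leave the primary
    instance are spread evenly across the read-only instances.
    """
    if not 0 <= master_read_weight <= 100:
        raise ValueError("MASTER_READ_WEIGHT must be between 0 and 100")
    if read_only_count < 1:
        raise ValueError("at least one read-only instance is required")
    # Share of the read requests that stays on the primary instance.
    primary = total_reads * master_read_weight // 100
    # Everything else is routed to the read-only instances.
    remaining = total_reads - primary
    return {
        "primary": primary,
        "read_only_total": remaining,
        "per_read_only_instance": remaining / read_only_count,
    }


# MASTER_READ_WEIGHT = 60, 1000 read requests, two read-only instances.
print(split_reads(60, 1000, 2))
```

With the default value of 100, read_only_total is 0, which matches the statement that all weighted requests stay on the primary instance.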

Intelligent routing is decoupled from rule-based routing. The following summary describes how the two settings combine.

Intelligent routing: Enabled

  Rule-based routing (MASTER_READ_WEIGHT): Cost-based read/write splitting takes precedence. We recommend that you retain the default value 100.

  Routing result:

  • All transactions and write requests are routed to the primary instance.

  • All queries that are identified as AP loads are routed to read-only instances.

  • Queries that are identified as TP loads are routed to read-only instances based on the value of the MASTER_READ_WEIGHT parameter. The default value of the MASTER_READ_WEIGHT parameter is 100.

Intelligent routing: Disabled

  Rule-based routing (MASTER_READ_WEIGHT): Read/write splitting based on read and write weights takes precedence. The value ranges from 0 to 100.

  Routing result:

  • All transactions and write requests are routed to the primary instance.

  • Queries that are identified as TP or AP loads are routed to read-only instances based on the value of the MASTER_READ_WEIGHT parameter. The default value of the MASTER_READ_WEIGHT parameter is 100.
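The combined routing rules above can be read as a small decision function. The following Python sketch is a simplified model under stated assumptions: the function name route_request and the string labels are hypothetical, at least one read-only instance is assumed to exist, and the weighted choice is modeled as a uniform random draw, which the source does not specify.

```python
import random


def route_request(is_write_or_transaction: bool,
                  workload: str,
                  intelligent_routing: bool,
                  master_read_weight: int = 100,
                  rng=random) -> str:
    """Return "primary" or "read-only" for one request (illustrative model)."""
    # Transactions and write requests always go to the primary instance.
    if is_write_or_transaction:
        return "primary"
    # With intelligent routing enabled, AP loads always go to read-only instances.
    if intelligent_routing and workload == "AP":
        return "read-only"
    # Otherwise the read is split by weight. rng.random() is in [0.0, 1.0),
    # so a weight of 100 keeps every such read on the primary instance.
    if rng.random() * 100 < master_read_weight:
        return "primary"
    return "read-only"


print(route_request(False, "AP", intelligent_routing=True))
print(route_request(False, "TP", intelligent_routing=True))
```

With intelligent routing disabled, the same AP query falls through to the weighted branch, so the default weight of 100 keeps it on the primary instance.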

Execution modes

PolarDB-X supports three execution modes:

  • Single-node single-thread mode (TP_LOCAL): A query is processed by using a single thread on a single node. This execution mode is used for queries that are identified as TP loads because these queries scan only a small number of rows. For example, this mode is used for point queries that are performed based on the primary key.

  • Single-node parallel mode (AP_LOCAL): A query is processed by using multiple CPU cores of a single node in parallel. If you do not purchase a read-only instance, this execution mode is used for queries that are identified as AP loads. This execution mode is also known as the parallel query mode.

  • MPP mode: A query is processed by using multiple CPU cores across different nodes of a read-only instance in parallel. If you purchase a read-only instance, this execution mode is used for queries that are identified as AP loads. This execution mode accelerates the queries in a distributed manner.
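The default pairing of workload types and execution modes listed above can be summarized in a few lines of Python. This is a sketch of the rules as the list states them, not the optimizer's actual logic; the function name pick_execution_mode is hypothetical.

```python
def pick_execution_mode(workload: str, has_read_only_instance: bool) -> str:
    """Map a workload type to the default execution mode described above."""
    if workload == "TP":
        # TP loads scan few rows: single node, single thread.
        return "TP_LOCAL"
    if workload == "AP":
        # AP loads use MPP when a read-only instance was purchased,
        # and fall back to single-node parallel execution otherwise.
        return "MPP" if has_read_only_instance else "AP_LOCAL"
    raise ValueError(f"unknown workload type: {workload!r}")


for workload, has_ro in [("TP", True), ("AP", False), ("AP", True)]:
    print(workload, has_ro, pick_execution_mode(workload, has_ro))
```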

The EXPLAIN PHYSICAL statement is provided as an extension to the original EXPLAIN statement so that you can retrieve the exact execution mode. For example, in the following query, the EXPLAIN PHYSICAL statement is executed. The result shows that the query uses the MPP execution mode. The degree of parallelism in each fragment of the execution plan is also returned.

mysql> explain physical select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorType: MPP                                                                                                                                                 |
| The Query's MaxConcurrentParallelism: 2                                                                                                                           |
| Fragment 1                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1                                                                                                                 |
|     TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                     |
|   Filter(condition="cnt > ?1")                                                                                                                                    |
|     HashAgg(group="k", cnt="COUNT()")                                                                                                                             |
|       BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|         RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k))                                                             |
|         Gather(concurrent=true)                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 0                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1 Splits: 16                                                                                                      |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Workloads and execution modes are coupled. The MPP execution mode is used for AP loads. You can specify an execution mode by using the EXECUTOR_MODE hint. If the primary instance has a large number of idle resources, you can forcibly specify the execution mode as the single-node or multi-node parallel processing mode to accelerate queries.

mysql> explain physical /*+TDDL:EXECUTOR_MODE=AP_LOCAL*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorMode: AP_LOCAL                                                                                                                                      |
| Fragment 0 dependency: [] parallelism: 4                                                                                                                    |
| BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")               |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 1 dependency: [] parallelism: 8                                                                                                                    |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                        |
| Fragment 2 dependency: [0, 1] parallelism: 8                                                                                                                |
| Filter(condition="cnt > ?1")                                                                                                                                |
|   HashAgg(group="k", cnt="COUNT()")                                                                                                                         |
|     RemoteSource(sourceFragmentIds=[1], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                      |
| Fragment 3 dependency: [0, 1] parallelism: 1                                                                                                                |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[2], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
| Fragment 4 dependency: [2, 3] parallelism: 1                                                                                                                |
| TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                   |
|   RemoteSource(sourceFragmentIds=[3], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+

For the MPP execution mode, the degree of parallelism is calculated based on the number of scanned rows, instance specifications, and the number of table shards that are involved in the computing process. The overall degree of parallelism is conservatively calculated because high concurrency scenarios need to be considered. You can execute the EXPLAIN PHYSICAL statement to view the degree of parallelism. You can forcibly specify a degree of parallelism by using the MPP_PARALLELISM hint.

/*+TDDL:EXECUTOR_MODE=MPP MPP_PARALLELISM=8*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;

Scheduling policy

If you purchase multiple read-only instances and associate them with the cluster endpoint, the SQL queries that are routed to read-only instances by using the cluster endpoint are evenly scheduled to multiple nodes on the read-only instances. The queries are scheduled based on the resource load of each node to ensure that loads are evenly distributed among the nodes. For example, PolarDB-X uses the latency of a read-only instance as a reference metric for scheduling. This prevents queries from being scheduled to read-only instances that have high latency.
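The scheduling idea above can be illustrated with a short Python sketch. Everything in it is an assumption made for illustration: the Node fields, the max_latency_ms cutoff, and the "lowest load wins" rule are hypothetical, because the text says only that scheduling considers the resource load of each node and uses latency as a reference metric.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    load: float        # hypothetical load score, lower means more idle
    latency_ms: float  # latency of the read-only instance that owns the node


def pick_node(nodes, max_latency_ms: float) -> Node:
    """Pick the least-loaded node whose instance latency is acceptable."""
    # Skip nodes on read-only instances that have high latency.
    candidates = [n for n in nodes if n.latency_ms <= max_latency_ms]
    if not candidates:
        raise RuntimeError("no read-only node is within the latency limit")
    # Among the remaining nodes, prefer the one with the lowest load.
    return min(candidates, key=lambda n: n.load)


nodes = [
    Node("ro1-node0", load=0.2, latency_ms=900.0),  # idle, but lagging
    Node("ro2-node0", load=0.7, latency_ms=20.0),
    Node("ro2-node1", load=0.4, latency_ms=20.0),
]
print(pick_node(nodes, max_latency_ms=100.0).name)
```

In this example the idle node is skipped because its instance latency exceeds the cutoff, and the query goes to the less loaded of the two remaining nodes.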

Feedback mechanism

An error may occur when the workloads are intelligently identified based on statistics. In the case of a workload identification error, PolarDB-X can change workload types based on the number of scanned rows and execution time of the query. This process is called adaptive feedback of workloads. In PolarDB-X, execution plans and their workloads are managed. For more information, see Execution plan management. You can execute the following statement to view the workload of each plan in Plan Management.

baseline [Select Statement]

If a simple query is identified as a TP load but its execution time and number of scanned rows exceed the threshold values, the query is changed to an AP load and is managed as an AP load in Plan Management. A similar rule applies to a query that is identified as an AP load. In addition to the adaptive feedback capability, you can execute the following statement to manually change workload types in Plan Management.

baseline fix sql /*+TDDL:WORKLOAD_TYPE=AP*/ [Select Statement]

After the workload type of an execution plan is corrected in Plan Management, subsequent queries that use that plan are marked with the corrected workload type.
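The adaptive feedback rule described in this section can be sketched as follows. The threshold parameters max_elapsed_ms and max_scanned_rows are hypothetical placeholders, because the source does not state the actual threshold values. The AP-to-TP direction is an assumed reading of "a similar rule": a query is downgraded only when both metrics are at or below the thresholds.

```python
def adjust_workload(current: str,
                    elapsed_ms: float,
                    scanned_rows: int,
                    max_elapsed_ms: float,
                    max_scanned_rows: int) -> str:
    """Return the workload type after one round of execution feedback."""
    too_slow = elapsed_ms > max_elapsed_ms
    too_many_rows = scanned_rows > max_scanned_rows
    # A TP query that is both slow and scans many rows is reclassified as AP.
    if current == "TP" and too_slow and too_many_rows:
        return "AP"
    # Assumed mirror rule: an AP query that is fast and scans few rows becomes TP.
    if current == "AP" and not too_slow and not too_many_rows:
        return "TP"
    return current


# Placeholder thresholds: 1000 ms and 100000 rows.
print(adjust_workload("TP", 5000.0, 2_000_000, 1000.0, 100_000))
print(adjust_workload("AP", 3.0, 10, 1000.0, 100_000))
```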

Read consistency

If business traffic is routed by using a read-only routing endpoint, you can configure global read consistency. For more information, see Configure read/write splitting. If business traffic is routed to read-only instances by using the cluster endpoint, global read consistency is enabled by default. The global read consistency mechanism ensures that data written to the primary instance can be read from read-only instances. This prevents the inconsistency between read data and written data that a replication delay causes in a conventional read/write splitting architecture.

If your service uses the cluster endpoint and does not require consistency between the read data and the written data, you can disable the ENABLE_CONSISTENT_REPLICA_READ parameter on the Parameter settings page, or use a hint to disable read consistency for a single query. For example, you can do so if the data inconsistency caused by the replication delay on the current read-only instance can be ignored.

/*+TDDL:ENABLE_CONSISTENT_REPLICA_READ=false*/ [Select Statement]

FAQ

  • Do I need to specify MASTER_READ_WEIGHT to enable rule-based routing after intelligent routing is enabled in the cluster endpoint?

    PolarDB-X intelligent routing is used to route complex SQL queries that are identified as AP loads to read-only instances to decrease the workloads on the primary instance. If simple queries that are identified as TP loads are highly concurrent and consume a large number of resources of the primary instance, you can also enable rule-based routing to allocate some TP loads to read-only instances.

  • Is the read/write splitting based on the cluster endpoint compatible with the traditional read/write splitting by proportion? What are the differences and advantages?

    PolarDB-X supports two modes: intelligent routing and rule-based routing. Rule-based routing is compatible with the traditional read/write splitting mode. The advantage of read/write splitting in PolarDB-X is that the read consistency mechanism is supported. This prevents inconsistency between the read data and the written data due to a replication delay on the read-only instance.

  • Can I query the workload type that is identified based on intelligent routing for an SQL query? What are the impacts if the identification is incorrect, and how do I change the workload type?

    You can use the Plan Management feature to view the SQL templates and execution plans that were used for execution, together with their workload types. For intelligent routing, the workload type of an SQL query can be automatically changed based on execution feedback. You can also use a hint to forcibly specify a workload type and change the workload type that is recorded in Plan Management.

  • What are the characteristics and advantages of the HTAP mode based on the cluster endpoint over a conventional solution that combines OLTP, Data Transmission Service (DTS), and OLAP?

    The HTAP mode of PolarDB-X uses the native multi-replica capability of the database. This simplifies O&M and avoids the cost of synchronizing data by exporting and importing it through DTS. In addition, the HTAP mode provides the read consistency mechanism and supports MPP. The HTAP mode can meet the requirements of your service for real-time and scalable online computing.