This topic describes the basic optimization principles and execution plans that PolarDB-X uses to process SQL statements.

Distributed databases differ from standalone databases in architecture, but many of the methods used to optimize SQL queries on standalone databases can be adapted to distributed databases. In PolarDB-X, you can identify the causes of slow SQL queries from statistics, execution plans, the degree of parallelism, and the reported execution time of each query, and then optimize the queries for your business scenario.

In a distributed database, slow queries have more possible causes than in a standalone database: physical SQL queries may run slowly, too many queries may run in parallel, or an inappropriate execution plan or index may be used. As a result, optimizing SQL queries in a distributed database usually takes more effort than optimizing them in a standalone database.

Basic principles

PolarDB-X is a distributed database service in which computing is decoupled from storage. The SELECT queries that you send to PolarDB-X are logical SQL queries. When a compute node receives a SELECT query, PolarDB-X splits the query into parts based on whether each part can be pushed down to data nodes. The parts that can be pushed down are sent to data nodes as physical SQL queries and run there. The parts that cannot be pushed down are run on compute nodes.

PolarDB-X optimizes SQL queries based on the following rules:
  • PolarDB-X pushes down as much of each logical SQL query as possible and runs it on data nodes. This reduces the network overhead between data nodes and compute nodes. The pushed-down queries can run on multiple database shards in parallel, which makes efficient use of the resources of each data node and accelerates the queries.
  • For operators that cannot be pushed down, the optimizer of PolarDB-X selects an optimal execution method. For example, the optimizer selects the physical operators and the degree of parallelism based on your business scenario, and determines whether to run the query in massively parallel processing (MPP) mode.
    Note Degree of parallelism: the maximum number of queries that PolarDB-X can process in parallel. On compute nodes, the degree of parallelism reflects the parallel processing capability of multi-core CPUs and determines the maximum number of threads that can run in parallel. On data nodes, the degree of parallelism is the maximum number of physical SQL queries that can be pushed down and run in parallel.
  • PolarDB-X supports local indexes and global indexes. A local index is created on a single data node and is also known as a MySQL index. A global index is a distributed index that spans multiple data nodes. An index that suits your business scenario can improve the query performance of your PolarDB-X instance.
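The effect of pushdown and of the degree of parallelism on data nodes can be illustrated with a small Python simulation. This is not PolarDB-X code: the in-memory shards and the `scan_shard` and `run_query` functions are hypothetical, and a thread pool whose `max_workers` is set to `dop` stands in for the degree of parallelism. The sketch only compares how many rows reach the compute node when a filter is pushed down versus when it is applied on the compute node.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: 16 in-memory "shards" of one logical table, 1,600 rows in total.
NUM_SHARDS = 16
SHARDS = [
    [{"id": i, "status": "O" if i % 3 == 0 else "F"} for i in range(s, 1600, NUM_SHARDS)]
    for s in range(NUM_SHARDS)
]


def scan_shard(shard, predicate=None):
    """Stands in for one physical SQL query on a data node.

    With a predicate (the filter is pushed down), only matching rows leave the
    shard. Without one, every row is shipped to the compute node.
    """
    if predicate is None:
        return list(shard)
    return [row for row in shard if predicate(row)]


def run_query(predicate, push_down, dop):
    """Queries all shards with at most `dop` concurrent workers.

    Returns the result rows and the number of rows sent to the compute node.
    """
    with ThreadPoolExecutor(max_workers=dop) as pool:
        per_shard = list(pool.map(
            lambda shard: scan_shard(shard, predicate if push_down else None),
            SHARDS,
        ))
    transferred = sum(len(chunk) for chunk in per_shard)
    rows = [row for chunk in per_shard for row in chunk]
    if not push_down:
        # The filter that was not pushed down runs on the compute node instead.
        rows = [row for row in rows if predicate(row)]
    return rows, transferred


def is_open(row):
    return row["status"] == "O"


pushed, pushed_transferred = run_query(is_open, push_down=True, dop=4)
pulled, pulled_transferred = run_query(is_open, push_down=False, dop=4)
print(pushed_transferred, pulled_transferred)  # 534 1600
```

Both runs return the same rows, but with pushdown only the 534 matching rows leave the shards instead of all 1,600.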

Execution plans

After an SQL statement is sent to PolarDB-X, PolarDB-X parses and optimizes the statement and then generates an execution plan. The execution plan is a tree of operators organized by their dependencies: in the EXPLAIN output, each operator is indented below the operator that consumes its output. By viewing the execution plan tree, you can see how the SQL statement is executed on the database. The following examples show several execution plans:

  • Example 1

    Execute the following SQL statement:

    EXPLAIN select count(*) from lineitem group by L_LINESTATUS;

    The following information about the execution plan is returned:

    HashAgg(group="L_LINESTATUS", count(*)="SUM(count(*))")
      Exchange(distribution=hash[0], collation=[])
        LogicalView(tables="[000000-000003].lineitem_[00-15]", shardCount=16, sql="SELECT `L_LINESTATUS`, COUNT(*) AS `count(*)` FROM `lineitem` AS `lineitem` GROUP BY `L_LINESTATUS`")

    The GROUP BY column L_LINESTATUS is not the partition key of the lineitem table, so rows with the same L_LINESTATUS value can reside on different shards, and the GROUP BY operation cannot be completed at the storage layer. PolarDB-X therefore splits the aggregation into two phases. First, the aggregation is pushed down to the storage layer, where each shard performs partial aggregation. The partial results are then returned to the compute layer, where they are redistributed and aggregated again. Finally, the aggregated data is returned to the client.

    • LogicalView: scans 16 shards (shardCount=16). Multiple physical SQL statements are generated and pushed down to the storage layer. Each physical SQL statement contains a GROUP BY clause, so partial aggregation is performed at the storage layer.
    • Exchange: collects the partial results returned by the LogicalView operator and redistributes them to downstream operators by the hash of the L_LINESTATUS column (distribution=hash[0]), so that all partial results for the same L_LINESTATUS value reach the same downstream operator.
    • HashAgg: receives the redistributed partial results, groups them by L_LINESTATUS, and sums the partial counts (count(*)="SUM(count(*))") to produce the final count for each group.
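The two-phase aggregation in Example 1 can be sketched in a few lines of Python. This is a simulation under stated assumptions, not PolarDB-X internals: the shard contents and the `partial_agg`, `exchange`, and `final_agg` functions are hypothetical, and Python's built-in `hash` stands in for whatever hash function the Exchange operator actually uses.

```python
from collections import Counter, defaultdict

# Hypothetical lineitem shards; each row is reduced to its L_LINESTATUS value.
shards = [
    ["F", "O", "F"],
    ["O", "O"],
    ["F", "F", "F", "O"],
    ["O", "F"],
]


def partial_agg(shard_rows):
    """Phase 1, pushed down to each shard (like the LogicalView SQL):
    SELECT L_LINESTATUS, COUNT(*) ... GROUP BY L_LINESTATUS."""
    return Counter(shard_rows)


def exchange(partials, num_workers):
    """Redistributes partial results by the hash of the group key, so every
    partial count for a given key reaches the same downstream worker."""
    buckets = [[] for _ in range(num_workers)]
    for partial in partials:
        for key, cnt in partial.items():
            buckets[hash(key) % num_workers].append((key, cnt))
    return buckets


def final_agg(bucket):
    """Phase 2 (HashAgg): SUM(count(*)) for each key in one bucket."""
    totals = defaultdict(int)
    for key, cnt in bucket:
        totals[key] += cnt
    return dict(totals)


partials = [partial_agg(rows) for rows in shards]
result = {}
for bucket in exchange(partials, num_workers=2):
    # Keys never span buckets, so merging the per-worker results is safe.
    result.update(final_agg(bucket))
print(sorted(result.items()))  # [('F', 6), ('O', 5)]
```

Only one small (key, count) row per group leaves each shard, which is why pushing the partial GROUP BY down pays off even though the final aggregation must still run on the compute layer.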
  • Example 2

    Execute the following SQL statement:

    EXPLAIN select * from lineitem, orders where L_ORDERKEY= O_ORDERKEY;

    The following information about the execution plan is returned:

    HashJoin(condition="O_ORDERKEY = L_ORDERKEY", type="inner")
      Exchange(distribution=hash[0], collation=[])
        LogicalView(tables="[000000-000003].lineitem_[00-15]", shardCount=16, sql="SELECT `L_ORDERKEY`, `L_PARTKEY`, `L_SUPPKEY`, `L_LINENUMBER`, `L_QUANTITY`, `L_EXTENDEDPRICE`, `L_DISCOUNT`, `L_TAX`, `L_RETURNFLAG`, `L_LINESTATUS`, `L_SHIPDATE`, `L_COMMITDATE`, `L_RECEIPTDATE`, `L_SHIPINSTRUCT`, `L_SHIPMODE`, `L_COMMENT` FROM `lineitem` AS `lineitem`")
      Exchange(distribution=hash[0], collation=[])
        LogicalView(tables="[000000-000003].orders_[00-15]", shardCount=16, sql="SELECT `O_ORDERKEY`, `O_CUSTKEY`, `O_ORDERSTATUS`, `O_TOTALPRICE`, `O_ORDERDATE`, `O_ORDERPRIORITY`, `O_CLERK`, `O_SHIPPRIORITY`, `O_COMMENT` FROM `orders` AS `orders`")

    In this example, the query joins two tables on L_ORDERKEY = O_ORDERKEY. The partition keys of the two tables are inconsistent with this join condition, so rows that match may be stored on different shards. In this case, the join operation cannot be pushed down to the storage layer. Both tables are scanned at the storage layer, and the join operation is performed at the compute layer.
    • LogicalView: scans the data of one table from its 16 shards.
    • Exchange: collects the data returned by the LogicalView operator and redistributes it to downstream operators by the hash of the join column (distribution=hash[0]), so that rows with the same join key from both tables reach the same HashJoin operator.
    • HashJoin: receives the data of both tables and joins it by building a hash table on one input and probing the hash table with the rows of the other input.
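The repartitioned hash join in Example 2 can be sketched in Python as follows. The rows, the shard layout, and the `exchange` and `hash_join` functions are hypothetical; the sketch only shows why hashing both inputs on the join column lets each worker join its own bucket pair independently.

```python
from collections import defaultdict

# Hypothetical rows: lineitem (L_ORDERKEY, L_LINENUMBER), orders (O_ORDERKEY, O_CUSTKEY).
lineitem_shards = [[(1, 1), (2, 1)], [(1, 2), (3, 1)], [(4, 1)]]
orders_shards = [[(3, 30), (1, 10)], [(2, 20), (5, 50)]]
NUM_WORKERS = 2


def exchange(shards, key_index, num_workers):
    """Redistributes rows by the hash of the join column (distribution=hash[0])."""
    buckets = [[] for _ in range(num_workers)]
    for shard in shards:
        for row in shard:
            buckets[hash(row[key_index]) % num_workers].append(row)
    return buckets


def hash_join(build_rows, build_key, probe_rows, probe_key):
    """Builds a hash table on one input, then probes it with the other input."""
    table = defaultdict(list)
    for row in build_rows:
        table[row[build_key]].append(row)
    return [(probe, build)
            for probe in probe_rows
            for build in table.get(probe[probe_key], [])]


l_buckets = exchange(lineitem_shards, 0, NUM_WORKERS)
o_buckets = exchange(orders_shards, 0, NUM_WORKERS)
# Each worker joins only its own bucket pair: equal keys always hash to the same bucket.
joined = [pair
          for l_bucket, o_bucket in zip(l_buckets, o_buckets)
          for pair in hash_join(o_bucket, 0, l_bucket, 0)]
print(sorted(joined))
```

Building the hash table on the smaller input (orders here) keeps the in-memory table small; which side a real optimizer builds on is a cost-based decision that this sketch does not model.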
  • Example 3

    Execute the following SQL statement:

    EXPLAIN select * from lineitem, orders where L_LINENUMBER= O_ORDERKEY;

    The following information about the execution plan is returned:

    Gather(concurrent=true)
      LogicalView(tables="[000000-000003].lineitem_[00-15],orders_[00-15]", shardCount=16, sql="SELECT `lineitem`.`L_ORDERKEY`, `lineitem`.`L_PARTKEY`, `lineitem`.`L_SUPPKEY`, `lineitem`.`L_LINENUMBER`, `lineitem`.`L_QUANTITY`, `lineitem`.`L_EXTENDEDPRICE`, `lineitem`.`L_DISCOUNT`, `lineitem`.`L_TAX`, `lineitem`.`L_RETURNFLAG`, `lineitem`.`L_LINESTATUS`, `lineitem`.`L_SHIPDATE`, `lineitem`.`L_COMMITDATE`, `lineitem`.`L_RECEIPTDATE`, `lineitem`.`L_SHIPINSTRUCT`, `lineitem`.`L_SHIPMODE`, `lineitem`.`L_COMMENT`, `orders`.`O_ORDERKEY`, `orders`.`O_CUSTKEY`, `orders`.`O_ORDERSTATUS`, `orders`.`O_TOTALPRICE`, `orders`.`O_ORDERDATE`, `orders`.`O_ORDERPRIORITY`, `orders`.`O_CLERK`, `orders`.`O_SHIPPRIORITY`, `orders`.`O_COMMENT` FROM `lineitem` AS `lineitem` INNER JOIN `orders` AS `orders` ON (`lineitem`.`L_LINENUMBER` = `orders`.`O_ORDERKEY`)")
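
    In this example, the query joins two tables, and the partition keys of the two tables are consistent with the join condition, so rows that can match are stored on the same shard. In this case, the join operation is pushed down to the shards at the storage layer, as the INNER JOIN in the LogicalView SQL shows. At the compute layer, the Gather operator collects and merges the results returned by the shards (concurrent=true indicates that the shards are queried concurrently); no further join work is needed.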
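Why the join in Example 3 can run entirely on the shards can be sketched in Python. The setup is hypothetical: both tables are assumed to be sharded by the same function (`shard_of`, a simple modulo chosen for illustration) applied to their join columns, and `place`, `local_join`, and `gather` are illustrative names rather than PolarDB-X APIs.

```python
# Hypothetical setup: both tables are sharded by the same function applied to
# their join columns, so rows that can match always live on the same shard.
NUM_SHARDS = 4


def shard_of(key):
    return key % NUM_SHARDS


lineitem = [(10, 1), (11, 2), (12, 3), (13, 1), (14, 5)]  # (L_ORDERKEY, L_LINENUMBER)
orders = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]          # (O_ORDERKEY, O_CLERK)


def place(rows, key_index):
    """Stores each row on the shard chosen by its partition column."""
    shards = [[] for _ in range(NUM_SHARDS)]
    for row in rows:
        shards[shard_of(row[key_index])].append(row)
    return shards


l_shards = place(lineitem, 1)  # hypothetically partitioned on L_LINENUMBER
o_shards = place(orders, 0)    # hypothetically partitioned on O_ORDERKEY


def local_join(l_rows, o_rows):
    """The pushed-down join on one shard: ON L_LINENUMBER = O_ORDERKEY."""
    return [(lrow, orow) for lrow in l_rows for orow in o_rows if lrow[1] == orow[0]]


def gather(per_shard_results):
    """Gather: concatenates the shard results; no join work is left to do."""
    return [row for chunk in per_shard_results for row in chunk]


result = gather(local_join(lrow, orow) for lrow, orow in zip(l_shards, o_shards))
print(len(result))  # 4
```

If the two tables were sharded by different functions or on columns other than the join columns, matching rows could land on different shards and the per-shard joins would miss them, which is the situation in Example 2.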
  • Example 4

    Execute the following SQL statement:

    EXPLAIN select * from gsi_dml_unique_multi_index_base where integer_test=1;

    The following information about the execution plan is returned:

    Project(pk="pk", integer_test="integer_test", varchar_test="varchar_test", char_test="char_test", blob_test="blob_test", tinyint_test="tinyint_test", tinyint_1bit_test="tinyint_1bit_test", smallint_test="smallint_test", mediumint_test="mediumint_test", bit_test="bit_test", bigint_test="bigint_test", float_test="float_test", double_test="double_test", decimal_test="decimal_test", date_test="date_test", time_test="time_test", datetime_test="datetime_test", timestamp_test="timestamp_test", year_test="year_test", mediumtext_test="mediumtext_test")
      BKAJoin(condition="pk = pk", type="inner")
        IndexScan(tables="DRDS_POLARX1_QATEST_APP_000000_GROUP.gsi_dml_unique_multi_index_index1_a0ol_01", sql="SELECT `pk`, `integer_test`, `varchar_test`, `char_test`, `bit_test`, `bigint_test`, `double_test`, `date_test` FROM `gsi_dml_unique_multi_index_index1` AS `gsi_dml_unique_multi_index_index1` WHERE (`integer_test` = ?)")
        Gather(concurrent=true)
          LogicalView(tables="[000000-000003].gsi_dml_unique_multi_index_base_[00-15]", shardCount=16, sql="SELECT `pk`, `blob_test`, `tinyint_test`, `tinyint_1bit_test`, `smallint_test`, `mediumint_test`, `float_test`, `decimal_test`, `time_test`, `datetime_test`, `timestamp_test`, `year_test`, `mediumtext_test` FROM `gsi_dml_unique_multi_index_base` AS `gsi_dml_unique_multi_index_base` WHERE ((`integer_test` = ?) AND (`pk` IN (...)))")
    HitCache:true

    In this example, the SQL statement is a simple query with a single predicate, yet the execution plan uses the BKAJoin operator to join two tables. This is because the gsi_dml_unique_multi_index_base table has a global secondary index on the integer_test column, and the query hits this index. Scanning the index table reduces the overhead of locating the queried data. However, the index is not a covering index: it does not contain all the columns that SELECT * returns, so the base table must also be scanned for the remaining columns.
    • IndexScan: scans the gsi_dml_unique_multi_index_index1_a0ol_01 index table based on the integer_test=1 condition and returns pk and the other columns that are stored in the index table.
    • BKAJoin: collects the pk values returned by the IndexScan operator in batches, looks them up in the gsi_dml_unique_multi_index_base base table by using a pk IN (...) condition (see the LogicalView under Gather), and joins the results on pk to obtain the columns that are not stored in the index table.
    Note In most cases, you can use the EXPLAIN statement to view the execution plan of an SQL statement and see how the statement is executed. For example, you can check whether a global secondary index is hit. For the parts of a statement that are pushed down to the storage layer, you can use the EXPLAIN EXECUTE statement to view how the physical SQL statements are executed at the storage layer. For example, you can check whether a local index of the specified table is hit.
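The BKAJoin step in Example 4 (a batched key access join) can be illustrated with a small Python simulation. The index rows, the base-table shards, the batch size, and the `index_scan`, `lookup_base`, and `bka_join` functions are all hypothetical; the sketch only shows the idea of sending pk values to the base table in batches instead of one lookup per row.

```python
# Hypothetical data. The index table stores only some columns (not covering);
# the base table, split into shards, stores the remaining columns by pk.
index_table = [  # (integer_test, pk, varchar_test)
    (1, 101, "a"),
    (2, 102, "b"),
    (1, 103, "c"),
    (1, 104, "d"),
]
base_shards = [
    {101: {"blob_test": b"x"}, 103: {"blob_test": b"y"}},
    {102: {"blob_test": b"z"}, 104: {"blob_test": b"w"}},
]
lookups = 0  # number of batched lookups sent to the base table


def index_scan(value):
    """IndexScan: WHERE integer_test = ? on the index table."""
    return [row for row in index_table if row[0] == value]


def lookup_base(pks):
    """One batched lookup, like WHERE pk IN (...), sent to every base shard."""
    global lookups
    lookups += 1
    wanted = set(pks)
    found = {}
    for shard in base_shards:
        for pk, cols in shard.items():
            if pk in wanted:
                found[pk] = cols
    return found


def bka_join(index_rows, batch_size):
    """BKAJoin: fetches base rows for a batch of pk values at a time, then
    joins them with the index rows on pk."""
    out = []
    for start in range(0, len(index_rows), batch_size):
        batch = index_rows[start:start + batch_size]
        base_rows = lookup_base([pk for _, pk, _ in batch])
        for integer_test, pk, varchar_test in batch:
            if pk in base_rows:
                out.append({"pk": pk, "integer_test": integer_test,
                            "varchar_test": varchar_test, **base_rows[pk]})
    return out


rows = bka_join(index_scan(1), batch_size=2)
print([row["pk"] for row in rows], lookups)  # [101, 103, 104] 2
```

With three matching index rows and a batch size of 2, the base table is queried twice rather than three times; larger batches trade fewer round trips for longer IN lists.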