Queries can be optimized in the following steps: locate slow SQL queries, run the EXPLAIN statement to view their execution plans, and then optimize the slow queries with specific methods. These methods include pushing down more computations to MySQL at the storage layer, adding indexes, optimizing execution plans, and processing queries in parallel.

Push down more computations

PolarDB-X pushes down as many computations as possible to MySQL. This reduces data transmission, network overhead, and computation in the PolarDB-X compute layer, so SQL queries execute more efficiently. PolarDB-X can push down the following operators:
  • Filter conditions, such as the WHERE and HAVING conditions.
  • Aggregate operators, such as COUNT and GROUP BY. Aggregation is performed in two phases: partial aggregation on each shard in MySQL, followed by final aggregation in PolarDB-X.
  • Sorting operators, such as ORDER BY.
  • JOIN operations and subqueries. To be pushed down, the joined tables must use the same sharding method on the join key, or one of the tables must be a broadcast table.
The following example shows how pushing down more computations to MySQL improves execution efficiency:
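Two-phase aggregation can be pictured with a small simulation. This is a minimal sketch in plain Python, not PolarDB-X code: the shard contents are hypothetical, and `partial_count` and `final_count` are illustrative names. Phase 1 stands in for the GROUP BY pushed down to each MySQL shard; phase 2 stands in for the merge in the compute layer.

```python
from collections import Counter

# Hypothetical data: each inner list holds the GROUP BY key of every row stored on one shard.
SHARDS = [
    ["ASIA", "EUROPE", "ASIA"],
    ["ASIA", "AMERICA"],
    ["EUROPE", "EUROPE", "AMERICA"],
]


def partial_count(rows):
    """Phase 1, run on each shard: the pushed-down SELECT key, COUNT(*) ... GROUP BY key."""
    return Counter(rows)


def final_count(partials):
    """Phase 2, run in the compute layer: merge partial results by summing per-key counts."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts instead of replacing them
    return dict(total)


result = final_count(partial_count(rows) for rows in SHARDS)
print(result)  # {'ASIA': 3, 'EUROPE': 3, 'AMERICA': 2}
```

COUNT partials merge by summation. An aggregate such as AVG cannot be merged from per-shard averages and would need SUM and COUNT as its partial results.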
> EXPLAIN select * from customer, nation where c_nationkey = n_nationkey and n_regionkey = 3;

Project(c_custkey="c_custkey", c_name="c_name", c_address="c_address", c_nationkey="c_nationkey", c_phone="c_phone", c_acctbal="c_acctbal", c_mktsegment="c_mktsegment", c_comment="c_comment", n_nationkey="n_nationkey", n_name="n_name", n_regionkey="n_regionkey", n_comment="n_comment")
  BKAJoin(condition="c_nationkey = n_nationkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="nation", shardCount=2, sql="SELECT * FROM `nation` AS `nation` WHERE (`n_regionkey` = ?)")
    Gather(concurrent=true)
      LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT * FROM `customer` AS `customer` WHERE (`c_nationkey` IN ('?'))")

A Batched Key Access (BKA) join is used in the execution plan. Each time the BKA join fetches a batch of rows from the left table, it sends an IN query to fetch the matching rows from the right table, and then joins the rows from both tables. Because the left table contains a large amount of data, many batches are required and the query takes a long time.
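The batching behavior of a BKA join can be sketched as follows. This is an illustrative simulation, not the PolarDB-X executor: `lookup` is a hypothetical callable standing in for the `IN (...)` query sent to the right table, and the tables are made-up data. It shows that the number of IN queries grows with the number of left rows divided by the batch size.

```python
def bka_join(left_rows, left_key, right_key, lookup, batch_size):
    """Join left_rows with the right table in batches.

    lookup(keys) stands in for the IN query sent to the right table's shards and
    must return the right rows whose right_key is in keys.
    Returns the joined rows and the number of IN queries issued.
    """
    joined, in_queries = [], 0
    for start in range(0, len(left_rows), batch_size):
        batch = left_rows[start:start + batch_size]
        keys = sorted({row[left_key] for row in batch})
        in_queries += 1  # one round trip per batch of left rows
        by_key = {}
        for right in lookup(keys):
            by_key.setdefault(right[right_key], []).append(right)
        for left in batch:
            for right in by_key.get(left[left_key], []):
                joined.append({**left, **right})
    return joined, in_queries


# Hypothetical tables: 10 nations, 100 customers spread evenly across them.
nation = [{"n_nationkey": k, "n_regionkey": 3} for k in range(10)]
customer = [{"c_custkey": i, "c_nationkey": i % 10} for i in range(100)]


def lookup_customers(keys):
    wanted = set(keys)  # like WHERE c_nationkey IN (...)
    return [c for c in customer if c["c_nationkey"] in wanted]


rows, in_queries = bka_join(nation, "n_nationkey", "c_nationkey", lookup_customers, batch_size=4)
print(len(rows), in_queries)  # 100 3
```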

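The join cannot be pushed down because the nation table is sharded by its primary key n_nationkey, whereas the customer table is sharded by c_custkey rather than by the join key c_nationkey. As a result, matching customer and nation rows are not guaranteed to reside in the same shard.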
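Whether a join can run shard-locally depends on whether matching rows are co-located. The sketch below checks this for hypothetical data. `shard_of` is a simple modulo stand-in, not the actual PolarDB-X HASH function, and the shard count is assumed.

```python
NUM_SHARDS = 4  # hypothetical shard count


def shard_of(value):
    # Stand-in shard function; PolarDB-X's actual HASH partitioning differs.
    return value % NUM_SHARDS


# Hypothetical rows: 8 nations and 16 customers.
nation = [{"n_nationkey": k} for k in range(8)]
customer = [{"c_custkey": i, "c_nationkey": (i * 3) % 8} for i in range(16)]


def join_is_colocated(left, right, left_join, right_join, left_shard_key, right_shard_key):
    """True if every pair of matching rows lives on the same shard, so each shard can join locally."""
    for lrow in left:
        for rrow in right:
            if lrow[left_join] == rrow[right_join]:
                if shard_of(lrow[left_shard_key]) != shard_of(rrow[right_shard_key]):
                    return False
    return True


# Current layout: nation by n_nationkey, customer by c_custkey (not the join key).
print(join_is_colocated(nation, customer, "n_nationkey", "c_nationkey", "n_nationkey", "c_custkey"))  # False
# Hypothetical layout: customer sharded by the join key c_nationkey.
print(join_is_colocated(nation, customer, "n_nationkey", "c_nationkey", "n_nationkey", "c_nationkey"))  # True
```

When both sides are sharded by the join key with the same function, equal keys always land on the same shard, which is the condition the pushdown rule for joins relies on.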

The nation table contains a small amount of data and rarely changes. Therefore, you can recreate it as a broadcast table, which stores a full copy of the table on every database shard:
-- After the modification
CREATE TABLE `nation` (
  `n_nationkey` int(11) NOT NULL,
  `n_name` varchar(25) NOT NULL,
  `n_regionkey` int(11) NOT NULL,
  `n_comment` varchar(152) DEFAULT NULL,
  PRIMARY KEY (`n_nationkey`)
) BROADCAST;  -- Declare the table as a broadcast table.
After the nation table is modified, the execution plan no longer contains a join operator in PolarDB-X. The join and most other computations are pushed down into LogicalView and executed by MySQL, and PolarDB-X only needs to gather the results and return them to the user. This greatly improves execution efficiency.
> EXPLAIN select * from customer, nation where c_nationkey = n_nationkey and n_regionkey = 3;

Gather(concurrent=true)
  LogicalView(tables="customer_[0-7],nation", shardCount=8, sql="SELECT * FROM `customer` AS `customer` INNER JOIN `nation` AS `nation` ON ((`nation`.`n_regionkey` = ?) AND (`customer`.`c_nationkey` = `nation`.`n_nationkey`))")
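Why a broadcast table makes the join pushable can be checked with a small simulation. In this sketch (hypothetical data, modulo sharding as a stand-in), customer is split across shards while every shard holds a full copy of nation. Each shard joins locally, and simply concatenating the per-shard results, which is what Gather does, gives the same answer as joining all data at once.

```python
NUM_SHARDS = 4  # hypothetical shard count

# Hypothetical rows: 20 customers sharded by c_custkey, and a 5-row nation table.
customer = [{"c_custkey": i, "c_nationkey": i % 5} for i in range(20)]
nation = [{"n_nationkey": k, "n_regionkey": k % 2} for k in range(5)]

customer_shards = [[c for c in customer if c["c_custkey"] % NUM_SHARDS == s] for s in range(NUM_SHARDS)]
# Broadcast table: every shard stores a full copy of nation.
nation_copies = [list(nation) for _ in range(NUM_SHARDS)]


def local_join(customer_rows, nation_rows, regionkey):
    """What each shard runs: the pushed-down INNER JOIN plus the n_regionkey filter."""
    return [
        (c["c_custkey"], n["n_nationkey"])
        for c in customer_rows
        for n in nation_rows
        if c["c_nationkey"] == n["n_nationkey"] and n["n_regionkey"] == regionkey
    ]


# Gather: concatenate the per-shard results; no join is left for the compute layer.
gathered = [row for s in range(NUM_SHARDS) for row in local_join(customer_shards[s], nation_copies[s], 1)]
expected = local_join(customer, nation, 1)  # the join computed over all data at once
print(sorted(gathered) == sorted(expected))  # True
```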
For more information about how computations are pushed down and optimized, see Push down and rewrite queries.

Add indexes

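PolarDB-X supports global secondary indexes (GSIs). A GSI is an index table that is sharded independently of its base table, so data can be partitioned by a column other than the shard key of the base table.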

The following example uses the execution plan of a slow SQL query to show how a global secondary index allows more computations to be pushed down:
> EXPLAIN select o_orderkey, c_custkey, c_name from orders, customer
          where o_custkey = c_custkey and o_orderdate = '2019-11-11' and o_totalprice > 100;

Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
  HashJoin(condition="o_custkey = c_custkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer`")
    Gather(concurrent=true)
      LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` > ?))")

In the execution plan, the orders table is sharded by o_orderkey and the customer table is sharded by c_custkey. The join cannot be pushed down because the two tables use different shard keys. A large number of orders with an amount greater than USD 100 were placed on November 11, 2019, so the cross-shard join is time-consuming. To push the join down, you can create a global secondary index on the orders table.

The query uses the o_orderkey, o_custkey, o_orderdate, and o_totalprice columns of the orders table. The o_orderkey column is the shard key of the base table, and the o_custkey column is the shard key of the index table. The o_orderdate and o_totalprice columns are specified as covering columns of the index table so that the query does not need to look up the base table (the TABLE ACCESS BY INDEX ROWID issue).
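The effect of the index can be simulated. This sketch builds a hypothetical index table that is sharded by o_custkey with the same stand-in shard function as customer. It assumes, as the text describes, that the index stores o_orderkey plus the covering columns. Because both tables are now sharded by the customer key, each shard joins locally and the gathered result matches the unsharded join.

```python
NUM_SHARDS = 4  # hypothetical shard count


def shard_of(value):
    # Stand-in for HASH partitioning; both customer and the index use the same function.
    return value % NUM_SHARDS


# Hypothetical rows. o_comment stands for any column the index does not store.
orders = [
    {"o_orderkey": k, "o_custkey": (k * 7) % 10, "o_orderdate": "2019-11-11",
     "o_totalprice": 50 + k * 10, "o_comment": "..."}
    for k in range(12)
]
customer = [{"c_custkey": c, "c_name": f"Customer#{c}"} for c in range(10)]

# Index table i_o_custkey: sharded by o_custkey; stores the base shard key and covering columns.
INDEX_COLUMNS = ("o_custkey", "o_orderkey", "o_orderdate", "o_totalprice")
index_shards = [[] for _ in range(NUM_SHARDS)]
for o in orders:
    index_shards[shard_of(o["o_custkey"])].append({col: o[col] for col in INDEX_COLUMNS})

customer_shards = [[] for _ in range(NUM_SHARDS)]
for c in customer:
    customer_shards[shard_of(c["c_custkey"])].append(c)


def local_join(index_rows, customer_rows):
    """Per-shard join between index rows and customers; it reads only index columns."""
    return [
        (idx["o_orderkey"], c["c_custkey"], c["c_name"])
        for idx in index_rows
        for c in customer_rows
        if idx["o_custkey"] == c["c_custkey"]
        and idx["o_orderdate"] == "2019-11-11"
        and idx["o_totalprice"] > 100
    ]


gathered = [row for s in range(NUM_SHARDS) for row in local_join(index_shards[s], customer_shards[s])]
expected = local_join(orders, customer)  # reference result over the unsharded data
print(sorted(gathered) == sorted(expected))  # True
```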

> create global index i_o_custkey on orders(`o_custkey`) covering(`o_orderdate`, `o_totalprice`)
        DBPARTITION BY HASH(`o_custkey`) TBPARTITION BY HASH(`o_custkey`) TBPARTITIONS 4;
After the global secondary index is created and forced with FORCE INDEX(i_o_custkey), both the index table and the customer table are sharded by the customer key. The cross-shard join therefore becomes a local join that is pushed down into IndexScan and executed by MySQL. In addition, because the covering columns are stored in the index table, no base-table lookup (TABLE ACCESS BY INDEX ROWID) is needed. This improves query efficiency.
> EXPLAIN select o_orderkey, c_custkey, c_name from orders force index(i_o_custkey), customer
          where o_custkey = c_custkey and o_orderdate = '2019-11-11' and o_totalprice > 100;

Gather(concurrent=true)
  IndexScan(tables="i_o_custkey_[0-7],customer_[0-7]", shardCount=8, sql="SELECT `i_o_custkey`.`o_orderkey`, `customer`.`c_custkey`, `customer`.`c_name` FROM `i_o_custkey` AS `i_o_custkey` INNER JOIN `customer` AS `customer` ON (((`i_o_custkey`.`o_orderdate` = ?) AND (`i_o_custkey`.`o_custkey` = `customer`.`c_custkey`)) AND (`i_o_custkey`.`o_totalprice` > ?))")
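The benefit of covering columns can be illustrated by counting base-table lookups. In this hypothetical sketch, an index scan that lacks a column the query needs must go back to the base table (sharded by o_orderkey) once per matching row. With all needed columns covered, no lookup occurs. The data and `index_scan` helper are illustrative only.

```python
NUM_SHARDS = 4  # hypothetical shard count

# Hypothetical orders; the base table is sharded by o_orderkey.
orders = [{"o_orderkey": k, "o_custkey": k % 5, "o_orderdate": "2019-11-11", "o_totalprice": 90 + k}
          for k in range(20)]
base_shards = [{} for _ in range(NUM_SHARDS)]
for o in orders:
    base_shards[o["o_orderkey"] % NUM_SHARDS][o["o_orderkey"]] = o

NEEDED = ["o_orderkey", "o_custkey", "o_orderdate", "o_totalprice"]


def index_scan(index_columns, custkey):
    """Scan index entries for one o_custkey and fetch any missing column from the base table."""
    rows, lookups = [], 0
    for o in orders:  # stands in for the index entries with o_custkey = custkey
        if o["o_custkey"] != custkey:
            continue
        row = {col: o[col] for col in index_columns}
        missing = [col for col in NEEDED if col not in row]
        if missing:
            lookups += 1  # back to the base table by o_orderkey, possibly on another shard
            base_row = base_shards[o["o_orderkey"] % NUM_SHARDS][o["o_orderkey"]]
            row.update({col: base_row[col] for col in missing})
        rows.append(row)
    return rows, lookups


rows_plain, lookups_plain = index_scan(["o_custkey", "o_orderkey"], custkey=2)
rows_covering, lookups_covering = index_scan(NEEDED, custkey=2)
print(lookups_plain, lookups_covering)  # 4 0
```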
For more information about how to use global secondary indexes, see GSI.

Optimize execution plans

In most cases, the PolarDB-X query optimizer automatically generates the optimal execution plan. However, if statistics are missing or the optimizer's estimates deviate from the actual data, the generated plan may not be optimal. In these cases, you can use hints to optimize the execution plan. The following example shows how to optimize an execution plan:
> EXPLAIN select o_orderkey, c_custkey, c_name from orders, customer
          where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;

Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
  HashJoin(condition="o_custkey = c_custkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer`")
    Gather(concurrent=true)
      LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` < ?))")

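Only a small number of orders with an amount of less than USD 10 were placed on November 15, 2019. In this case, a BKA join is more efficient than a hash join: the hash join in the preceding plan reads the entire customer table, whereas a BKA join looks up only the customer rows that match the few filtered orders. For more information about BKA joins and hash joins, see Optimize and execute JOIN operations.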
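The trade-off can be expressed as a toy cost model. This is not the PolarDB-X cost model: `batch_size` and `cost_per_lookup` are hypothetical constants, and the sketch assumes each order matches at most one customer because c_custkey is unique. It shows why the better join depends on how many rows survive the filter, which is why missing or inaccurate statistics can lead to a poor choice.

```python
import math


def hash_join_cost(filtered_orders, customer_rows):
    """Rows read by the hash join plan: the filtered orders plus the entire customer table."""
    return filtered_orders + customer_rows


def bka_join_cost(filtered_orders, batch_size, cost_per_lookup):
    """Rows read by a BKA join plus a fixed overhead per IN lookup.

    Assumes each order matches at most one customer (c_custkey is unique).
    """
    lookups = math.ceil(filtered_orders / batch_size)
    return filtered_orders + filtered_orders + lookups * cost_per_lookup


def choose_join(filtered_orders, customer_rows, batch_size=1000, cost_per_lookup=500):
    hash_cost = hash_join_cost(filtered_orders, customer_rows)
    bka_cost = bka_join_cost(filtered_orders, batch_size, cost_per_lookup)
    return "BKAJoin" if bka_cost < hash_cost else "HashJoin"


print(choose_join(filtered_orders=50, customer_rows=150_000))         # BKAJoin
print(choose_join(filtered_orders=2_000_000, customer_rows=150_000))  # HashJoin
```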

Use the /*+TDDL:BKA_JOIN(orders, customer)*/ hint to force the optimizer to use a BKA join. A BKA join is also known as a lookup join.
> EXPLAIN /*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer
          where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;

Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
  BKAJoin(condition="o_custkey = c_custkey", type="inner")
    Gather(concurrent=true)
      LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` < ?))")
    Gather(concurrent=true)
      LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer` WHERE (`c_custkey` IN ('?'))")
To apply the hint, run the query with the hint included:
/*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;
This accelerates the SQL query. For the hint to take effect, the SQL statement in your application must include it. Alternatively, you can use the plan management feature to bind the execution plan to the SQL statement. We recommend that you use the plan management feature instead of modifying the SQL statement. The following code block provides an example:
BASELINE FIX SQL /*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;

This way, PolarDB-X uses only this execution plan for the SQL statement, even when the parameter values in the statement change. For more information about the plan management feature, see Manage execution plans.
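The idea of binding a plan to a statement regardless of its parameter values can be sketched as a lookup keyed by a parameterized SQL template. This is a simplified, hypothetical model: `parameterize`, `baseline_fix`, and `plan_for` are illustrative names, and the real plan management feature normalizes SQL in its own, more involved way.

```python
import re

# Hypothetical, simplified normalization; the real plan management feature is more involved.
_HINT = re.compile(r"/\*\+.*?\*/")
_LITERAL = re.compile(r"'(?:[^']|'')*'|\b\d+(?:\.\d+)?\b")

baselines = {}


def parameterize(sql):
    """Strip hints, collapse whitespace, and replace literals with '?'."""
    sql = _HINT.sub("", sql)
    sql = " ".join(sql.split())
    return _LITERAL.sub("?", sql)


def baseline_fix(sql, plan):
    baselines[parameterize(sql)] = plan


def plan_for(sql, optimizer_choice):
    """Use the bound plan if one exists for this SQL template; otherwise use the optimizer's plan."""
    return baselines.get(parameterize(sql), optimizer_choice)


QUERY = ("select o_orderkey, c_custkey, c_name from orders, customer "
         "where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10")
baseline_fix("/*+TDDL:BKA_JOIN(orders, customer)*/ " + QUERY, "BKAJoin")

other_params = QUERY.replace("2019-11-15", "2019-11-16").replace("< 10", "< 20")
print(plan_for(other_params, optimizer_choice="HashJoin"))      # BKAJoin
print(plan_for("select c_name from customer", "LogicalView"))  # LogicalView
```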

Process a query in parallel

You can use the /*+TDDL:PARALLELISM=4*/ hint to specify the degree of parallelism, which lets the system use multiple CPU cores to accelerate computations. The following example shows the physical plan generated with the default degree of parallelism:
mysql> explain physical select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorType: AP_LOCAL                                                                                                                                                 |
| The Query's MaxConcurrentParallelism: 2                                                                                                                           |
| Fragment 1                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1                                                                                                                 |
|     TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                     |
|   Filter(condition="cnt > ?1")                                                                                                                                    |
|     HashAgg(group="k", cnt="COUNT()")                                                                                                                             |
|       BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|         RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k))                                                             |
|         Gather(concurrent=true)                                                                                                                                   |
|           LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 0                                                                                                                                                        |
|     Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT]                                                                                       |
|     Output partitioning: SINGLE [] Parallelism: 1 Splits: 16                                                                                                      |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
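The default degree of parallelism is low: the preceding plan reports a MaxConcurrentParallelism of 2. You can specify a higher degree of parallelism to improve query efficiency in standalone or parallel computing mode. In the following plan, generated with PARALLELISM=8, the join and aggregation fragments run with a parallelism of 8: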
mysql> explain physical /*+TDDL:PARALLELISM=8*/ select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorMode: AP_LOCAL                                                                                                                                      |
| Fragment 0 dependency: [] parallelism: 8                                                                                                                    |
| BKAJoin(condition="k = id", type="inner")                                                                                                                   |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)")               |
|   Gather(concurrent=true)                                                                                                                                   |
|     LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 1 dependency: [] parallelism: 8                                                                                                                    |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                        |
| Fragment 2 dependency: [0, 1] parallelism: 8                                                                                                                |
| Filter(condition="cnt > ?1")                                                                                                                                |
|   HashAgg(group="k", cnt="COUNT()")                                                                                                                         |
|     RemoteSource(sourceFragmentIds=[1], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0))                                      |
| Fragment 3 dependency: [0, 1] parallelism: 1                                                                                                                |
| LocalBuffer                                                                                                                                                 |
|   RemoteSource(sourceFragmentIds=[2], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
| Fragment 4 dependency: [2, 3] parallelism: 1                                                                                                                |
| TopN(sort="cnt ASC", offset=?2, fetch=?3)                                                                                                                   |
|   RemoteSource(sourceFragmentIds=[3], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt))                                                                      |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
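The way a degree of parallelism divides work can be sketched with a thread pool. In this hypothetical simulation, 16 splits (mirroring the 16 physical tables in the plans above) are scanned and partially aggregated by at most `parallelism` concurrent workers, and the partial results are then merged. It only illustrates how work is distributed. In CPython, threads do not speed up CPU-bound pure-Python code because of the GIL, so this is not a performance model of PolarDB-X.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical data: 16 splits, one per physical table, each holding 100 values of column k.
SPLITS = [[(s * 7 + i) % 10 for i in range(100)] for s in range(16)]


def scan_and_aggregate(split):
    """Work done by one task: scan a split and build a partial hash aggregation on k."""
    return Counter(split)


def run(parallelism):
    """Process all splits with at most `parallelism` concurrent workers, then merge."""
    total = Counter()
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        for partial in pool.map(scan_and_aggregate, SPLITS):
            total.update(partial)
    return total


print(run(2) == run(8))  # True: parallelism changes concurrency, not the result
```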