Queries can be optimized in the following steps: locate slow SQL queries, run the EXPLAIN statement to view their execution plans, and then use specific methods to optimize the slow SQL queries. The methods used to optimize slow SQL queries include pushing down more computations to MySQL at the storage layer, adding indexes, and optimizing execution plans.
Push down more computations
- Filter conditions, such as the WHERE and HAVING conditions.
- Aggregate operators, such as COUNT and GROUP BY. Each aggregation is performed in two phases.
- Sorting operators, such as ORDER BY.
- JOIN operations and subqueries. The left and right tables must use the same sharding method, or one of the tables must be a broadcast table.
> EXPLAIN select * from customer, nation where c_nationkey = n_nationkey and n_regionkey = 3;
Project(c_custkey="c_custkey", c_name="c_name", c_address="c_address", c_nationkey="c_nationkey", c_phone="c_phone", c_acctbal="c_acctbal", c_mktsegment="c_mktsegment", c_comment="c_comment", n_nationkey="n_nationkey", n_name="n_name", n_regionkey="n_regionkey", n_comment="n_comment")
BKAJoin(condition="c_nationkey = n_nationkey", type="inner")
Gather(concurrent=true)
LogicalView(tables="nation", shardCount=2, sql="SELECT * FROM `nation` AS `nation` WHERE (`n_regionkey` = ?)")
Gather(concurrent=true)
LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT * FROM `customer` AS `customer` WHERE (`c_nationkey` IN ('?'))")
A Batched Key Access (BKA) join is used in the execution plan. Each time the BKA join selects a group of rows from the left table, the join performs an IN query to select the correlated rows from the right table. Then, the BKA join combines the rows from both tables. A long time is required to process the query because the left table contains a large amount of data.
The join cannot be pushed down because the table named nation is sharded based on the primary key n_nationkey whereas the join key in this query is c_custkey.
--- After the modification ---
CREATE TABLE `nation` (
`n_nationkey` int(11) NOT NULL,
`n_name` varchar(25) NOT NULL,
`n_regionkey` int(11) NOT NULL,
`n_comment` varchar(152) DEFAULT NULL,
PRIMARY KEY (`n_nationkey`)
) BROADCAST; --- Claim the table as a broadcast table.
> EXPLAIN select * from customer, nation where c_nationkey = n_nationkey and n_regionkey = 3;
Gather(concurrent=true)
LogicalView(tables="customer_[0-7],nation", shardCount=8, sql="SELECT * FROM `customer` AS `customer` INNER JOIN `nation` AS `nation` ON ((`nation`.`n_regionkey` = ?) AND (`customer`.`c_nationkey` = `nation`.`n_nationkey`))")
For more information about how computations are pushed down and optimized, see Push down and rewrite queries.
Add indexes
PolarDB-X supports GSI.
> EXPLAIN select o_orderkey, c_custkey, c_name from orders, customer
where o_custkey = c_custkey and o_orderdate = '2019-11-11' and o_totalprice > 100;
Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
HashJoin(condition="o_custkey = c_custkey", type="inner")
Gather(concurrent=true)
LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer`")
Gather(concurrent=true)
LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` > ?))")
In the execution plan, the orders table is sharded based on o_orderkey but the customer table is sharded based on c_custkey. The join cannot be pushed down because the two tables use different shard keys. A large number of orders whose order amount is greater than USD 100 were placed on November 11, 2019. It is time-consuming to perform a cross-shard join. Therefore, a global secondary index must be created on the orders table to push down the join. The o_orderkey, o_custkey, o_orderdate, and o_totalprice columns of the orders table are included in the query. The o_orderkey column is the shard key of the base table. The o_custkey column is the shard key of the index table. The o_orderdate and o_totalprice columns are specified as covering columns of the index table to prevent the TABLE ACCESS BY INDEX ROWID issue.
> create global index i_o_custkey on orders(`o_custkey`) covering(`o_orderdate`, `o_totalprice`)
DBPARTITION BY HASH(`o_custkey`) TBPARTITION BY HASH(`o_custkey`) TBPARTITIONS 4;
> EXPLAIN select o_orderkey, c_custkey, c_name from orders force index(i_o_custkey), customer
where o_custkey = c_custkey and o_orderdate = '2019-11-11' and o_totalprice > 100;
Gather(concurrent=true)
IndexScan(tables="i_o_custkey_[0-7],customer_[0-7]", shardCount=8, sql="SELECT `i_o_custkey`.`o_orderkey`, `customer`.`c_custkey`, `customer`.`c_name` FROM `i_o_custkey` AS `i_o_custkey` INNER JOIN `customer` AS `customer` ON (((`i_o_custkey`.`o_orderdate` = ?) AND (`i_o_custkey`.`o_custkey` = `customer`.`c_custkey`)) AND (`i_o_custkey`.`o_totalprice` > ?))")
For more information about how to use global secondary indexes, see GSI.
Optimize execution plans
> EXPLAIN select o_orderkey, c_custkey, c_name from orders, customer
where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;
Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
HashJoin(condition="o_custkey = c_custkey", type="inner")
Gather(concurrent=true)
LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer`")
Gather(concurrent=true)
LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` < ?))")
A small number of orders whose order amount is less than USD 10 were placed on November 15, 2019. In this case, BKA joins are more efficient than hash joins. For more information about BKA joins and hash joins, see Optimize and execute JOIN operations.
> EXPLAIN /*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer
where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;
Project(o_orderkey="o_orderkey", c_custkey="c_custkey", c_name="c_name")
BKAJoin(condition="o_custkey = c_custkey", type="inner")
Gather(concurrent=true)
LogicalView(tables="orders_[0-7]", shardCount=8, sql="SELECT `o_orderkey`, `o_custkey` FROM `orders` AS `orders` WHERE ((`o_orderdate` = ?) AND (`o_totalprice` < ?))")
Gather(concurrent=true)
LogicalView(tables="customer_[0-7]", shardCount=8, sql="SELECT `c_custkey`, `c_name` FROM `customer` AS `customer` WHERE (`c_custkey` IN ('?'))")
You can perform a query that contains the following hint:/*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer where o_custkey = c_custkey and o_orderdate = '2019-11-15' and o_totalprice < 10;
The preceding operation accelerates the SQL query. To make the hint effective, include
the hint in the SQL statement in your application. You can also use the plan management
feature to bind the execution plan to the SQL statement. We recommend that you use
the plan management feature instead of modifying the SQL statement. The following
code block provides an example:BASELINE FIX SQL /*+TDDL:BKA_JOIN(orders, customer)*/ select o_orderkey, c_custkey, c_name from orders, customer where o_custkey = c_custkey and o_orderdate = '2019-11-15';
This way, PolarDB-X executes only the preceding execution plan for the specified SQL statement, even if the parameters of the SQL statement are modified. For more information about the plan management feature, see Manage execution plans.
Process a query in parallel
mysql> explain physical select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 or
der by cnt limit 5, 10;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorType: AP_LOCAL |
| The Query's MaxConcurrentParallelism: 2 |
| Fragment 1 |
| Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT] |
| Output partitioning: SINGLE [] Parallelism: 1 |
| TopN(sort="cnt ASC", offset=?2, fetch=?3) |
| Filter(condition="cnt > ?1") |
| HashAgg(group="k", cnt="COUNT()") |
| BKAJoin(condition="k = id", type="inner") |
| RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k)) |
| Gather(concurrent=true) |
| LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 0 |
| Shuffle Output layout: [BIGINT, BIGINT] Output layout: [BIGINT, BIGINT] |
| Output partitioning: SINGLE [] Parallelism: 1 Splits: 16 |
| LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)") |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
mysql> explain physical /*+TDDL:PARALLELISM=8*/select a.k, count(*) cnt from sbtest1 a, sbtest1 b where a.id = b.k and a.id > 1000 group by k having cnt > 1300 order by cnt limit 5, 10; |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ExecutorMode: AP_LOCAL |
| Fragment 0 dependency: [] parallelism: 8 |
| BKAJoin(condition="k = id", type="inner") |
| Gather(concurrent=true) |
| LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `id`, `k` FROM `sbtest1` AS `sbtest1` WHERE (`id` > ?)") |
| Gather(concurrent=true) |
| LogicalView(tables="[000000-000003].sbtest1_[00-15]", shardCount=16, sql="SELECT `k` FROM `sbtest1` AS `sbtest1` WHERE ((`k` > ?) AND (`k` IN (...)))") |
| Fragment 1 dependency: [] parallelism: 8 |
| LocalBuffer |
| RemoteSource(sourceFragmentIds=[0], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0)) |
| Fragment 2 dependency: [0, 1] parallelism: 8 |
| Filter(condition="cnt > ?1") |
| HashAgg(group="k", cnt="COUNT()") |
| RemoteSource(sourceFragmentIds=[1], type=RecordType(INTEGER_UNSIGNED id, INTEGER_UNSIGNED k, INTEGER_UNSIGNED k0)) |
| Fragment 3 dependency: [0, 1] parallelism: 1 |
| LocalBuffer |
| RemoteSource(sourceFragmentIds=[2], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt)) |
| Fragment 4 dependency: [2, 3] parallelism: 1 |
| TopN(sort="cnt ASC", offset=?2, fetch=?3) |
| RemoteSource(sourceFragmentIds=[3], type=RecordType(INTEGER_UNSIGNED k, BIGINT cnt)) |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+