Generate a distributed plan

Last Updated: Jun 18, 2021

The optimizer of OceanBase Database generates a distributed execution plan through the following two stages:

1. Stage 1: The optimizer generates the optimal execution plan for the SQL query as if all tables were local, regardless of the physical distribution of data. It then checks whether the query accesses data in multiple partitions, or whether the user has used a hint to force a parallel query even though the query accesses only a local single-partition table.

2. Stage 2: The optimizer generates a distributed plan. It inserts EXCHANGE operators at the nodes that require data redistribution, which converts the local execution plan tree into a distributed execution plan.

Operators of a distributed execution plan

Generating a distributed plan is essentially a matter of finding the best positions in the original plan tree at which to insert EXCHANGE operators. As the optimizer traverses the plan tree top down, it decides whether an EXCHANGE operator is needed at each node based on how the operator processes its data and how the data of its input operators is partitioned.
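As a rough illustration of this traversal, the following Python sketch walks a toy plan tree top down and wraps a child in an EXCHANGE node whenever the distribution the child produces does not match what its parent requires. The `PlanNode` class, the `produces` and `requires` fields, and the matching rule are hypothetical simplifications for illustration. They are not OceanBase internals.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PlanNode:
    """Toy plan operator (hypothetical model, not an OceanBase structure)."""
    name: str
    produces: str                      # distribution of this operator's output
    requires: Optional[str] = None     # distribution it needs from its inputs
    children: List["PlanNode"] = field(default_factory=list)


def insert_exchanges(node: PlanNode) -> PlanNode:
    """Walk the tree top down and wrap each child whose output distribution
    does not satisfy the parent's requirement in an EXCHANGE node."""
    new_children = []
    for child in node.children:
        if node.requires is not None and child.produces != node.requires:
            # The EXCHANGE node redistributes the child's rows as required.
            child = PlanNode("EXCHANGE", produces=node.requires, children=[child])
        new_children.append(child)
    node.children = [insert_exchanges(c) for c in new_children]
    return node


def render(node: PlanNode, depth: int = 0) -> List[str]:
    """Return the tree as indented operator names, one per line."""
    lines = [" " * depth + node.name]
    for child in node.children:
        lines.extend(render(child, depth + 1))
    return lines


# Local plan: a coordinator that needs all rows on one node, fed by a
# scan whose rows are spread over hash partitions of v1.
scan = PlanNode("TABLE SCAN", produces="hash(v1)")
root = PlanNode("COORDINATOR", produces="local", requires="local", children=[scan])
plan_lines = render(insert_exchanges(root))
print("\n".join(plan_lines))
```

In this toy model the scan is the only operator whose output distribution conflicts with its parent, so a single EXCHANGE node ends up between the coordinator and the scan.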

The following example shows the simplest case, a single-table scan:

obclient>CREATE TABLE t1 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t1\G;
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |500000   |545109|
|1 | EXCHANGE OUT DISTR    |:EX10000|500000   |320292|
|2 |  PX PARTITION ITERATOR|        |500000   |320292|
|3 |   TABLE SCAN          |T1      |500000   |320292|
======================================================

Outputs & filters:
-------------------------------------
  0 - output([T1.V1], [T1.V2]), filter(nil)
  1 - output([T1.V1], [T1.V2]), filter(nil), dop=1
  2 - output([T1.V1], [T1.V2]), filter(nil)
  3 - output([T1.V1], [T1.V2]), filter(nil),
      access([T1.V1], [T1.V2]), partitions(p[0-4])

When t1 is a partitioned table, the optimizer can insert a paired EXCHANGE operator above TABLE SCAN, so that TABLE SCAN and EXCHANGE OUT are encapsulated into a job that can be executed in parallel.

Single-input pushdown operator

Single-input pushdown operators mainly include the AGGREGATION, SORT, GROUP BY, and LIMIT operators. Each of these operators except LIMIT has an operation key. If the operation key is consistent with the distribution of the input data, a one-stage aggregation, also known as a partition-wise aggregation, is performed. Otherwise, a two-stage aggregation is required, and the aggregation operator is pushed down so that part of the aggregation runs before the data is redistributed.
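The choice between the two forms of aggregation can be sketched as a small Python function. The check used here (every partitioning column is also an operation key) is an assumption made for illustration. The text only says that the data distribution must be consistent with the operation keys and does not give the exact rule.

```python
from typing import Sequence


def aggregation_stages(operation_keys: Sequence[str],
                       partition_keys: Sequence[str]) -> int:
    """Return 1 for one-stage (partition-wise) aggregation, 2 for two-stage.

    Assumed rule: if every partitioning column is also an operation key,
    all rows of a group live in one partition, so each partition can be
    aggregated independently. Otherwise a group may span partitions.
    """
    if partition_keys and set(partition_keys) <= set(operation_keys):
        return 1
    return 2


print(aggregation_stages(["v1"], ["v1"]))  # GROUP BY v1, table partitioned by v1
print(aggregation_stages(["v2"], ["v1"]))  # GROUP BY v2, table partitioned by v1
```

The two calls mirror the two examples in this section: grouping t2 by its partitioning key v1, and grouping it by v2.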

Example of one-stage (partition-wise) aggregation:

obclient>CREATE TABLE t2 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v1\G;
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |101      |357302|
|1 | EXCHANGE OUT DISTR    |:EX10000|101      |357297|
|2 |  PX PARTITION ITERATOR|        |101      |357297|
|3 |   MERGE GROUP BY      |        |101      |357297|
|4 |    TABLE SCAN         |t2      |400000   |247403|
======================================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  1 - output([T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  2 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  3 - output([T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v1]), agg_func([T_FUN_SUM(t2.v1)])
  4 - output([t2.v1]), filter(nil),
      access([t2.v1]), partitions(p[0-3])

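Example of two-stage aggregation. The plan below groups by v2, which is not the partitioning key of t2, as in a statement of the following form:

obclient>EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v2\G;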

============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |101      |561383|
|1 | EXCHANGE OUT DISTR          |:EX10001|101      |561374|
|2 |  HASH GROUP BY              |        |101      |561374|
|3 |   EXCHANGE IN DISTR         |        |101      |408805|
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|101      |408795|
|5 |     HASH GROUP BY           |        |101      |408795|
|6 |      PX PARTITION ITERATOR  |        |400000   |256226|
|7 |       TABLE SCAN            |t2      |400000   |256226|
============================================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil)
  1 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil), dop=1
  2 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.v1))])
  3 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil)
  4 - (#keys=1, [t2.v2]), output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  5 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(t2.v1)])
  6 - output([t2.v1], [t2.v2]), filter(nil)
  7 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
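
To see why the plan above contains two GROUP BY operators, the following Python sketch mimics the data flow on a few made-up rows. Each partition first computes partial sums per v2, the partial results are redistributed by v2, and a second aggregation sums the partial sums. The rows, the use of modulo as a stand-in for the hash function, and the worker count are all illustrative assumptions.

```python
from collections import defaultdict

# Made-up (v1, v2) rows; the table is hash-partitioned on v1 into 4 partitions.
rows = [(1, 10), (2, 10), (3, 21), (5, 10), (6, 21), (7, 30)]
NUM_PARTITIONS = 4
NUM_WORKERS = 2  # receivers of the redistributed partial results (assumed)

# Place rows into partitions by v1 (modulo stands in for the real hash).
partitions = defaultdict(list)
for v1, v2 in rows:
    partitions[v1 % NUM_PARTITIONS].append((v1, v2))

# Stage 1: each partition computes SUM(v1) per v2 on its own rows.
partials = []
for part_rows in partitions.values():
    local = defaultdict(int)
    for v1, v2 in part_rows:
        local[v2] += v1
    partials.extend(local.items())  # (v2, partial_sum) pairs

# Redistribute partial results by v2 so each group reaches a single worker.
workers = defaultdict(list)
for v2, partial_sum in partials:
    workers[v2 % NUM_WORKERS].append((v2, partial_sum))

# Stage 2: each worker sums the partial sums it received.
two_stage = {}
for received in workers.values():
    final = defaultdict(int)
    for v2, partial_sum in received:
        final[v2] += partial_sum
    two_stage.update(final)

# Reference result: aggregate all rows in one place.
direct = defaultdict(int)
for v1, v2 in rows:
    direct[v2] += v1

print(two_stage == dict(direct))
```

The final comparison checks that summing the partial sums gives the same result as aggregating all rows in one place, which is what the nested T_FUN_SUM(T_FUN_SUM(t2.v1)) expression in the plan output expresses.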

Binary input operator

For binary-input operators, the JOIN operator is the main case to consider. For a JOIN operator, the optimizer generates the distributed execution plan and selects the data redistribution method mainly based on rules. A JOIN operator supports the following three join modes:

  • Partition-wise join

    A partition-wise join is used when the left and right tables are partitioned in the same way, have the same physical distribution, and are joined on their partitioning keys. Example:

    obclient>CREATE TABLE t3 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
    Query OK, 0 rows affected (0.12 sec)
    
    obclient>EXPLAIN SELECT * FROM t2, t3 WHERE t2.v1 = t3.v1\G;
    *************************** 1. row ***************************
    Query Plan: 
    ===========================================================
    |ID|OPERATOR               |NAME    |EST. ROWS |COST      |
    -----------------------------------------------------------
    |0 |PX COORDINATOR         |        |1568160000|1227554264|
    |1 | EXCHANGE OUT DISTR    |:EX10000|1568160000|930670004 |
    |2 |  PX PARTITION ITERATOR|        |1568160000|930670004 |
    |3 |   MERGE JOIN          |        |1568160000|930670004 |
    |4 |    TABLE SCAN         |t2      |400000    |256226    |
    |5 |    TABLE SCAN         |t3      |400000    |256226    |
    ===========================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
      1 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), dop=1
      2 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
      3 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil),
          equal_conds([t2.v1 = t3.v1]), other_conds(nil)
      4 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
      5 - output([t3.v1], [t3.v2]), filter(nil),
          access([t3.v1], [t3.v2]), partitions(p[0-3])
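  • Partial partition-wise join

    A partial partition-wise join is used when only one of the two tables is partitioned, or when both are partitioned but the join key matches the partitioning key of only one of them. The optimizer then redistributes the data of the other table according to the partition distribution of the table whose partitioning key matches the join key. In the following example, t4 and t2 are both hash-partitioned on v1 but have different numbers of partitions (3 and 4), so they do not share the same physical distribution, and the rows of t4 are redistributed according to the partitions of t2 (EXCHANGE OUT DISTR (PKEY)). Example: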

    obclient>CREATE TABLE t4 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 3;
    Query OK, 0 rows affected (0.12 sec)
    
    obclient>EXPLAIN SELECT * FROM t4, t2 WHERE t2.v1 = t4.v1\G;
    *************************** 1. row ***************************
    Query Plan:
    ===========================================================
    |ID|OPERATOR                     |NAME    |EST. ROWS|COST |
    -----------------------------------------------------------
    |0 |PX COORDINATOR               |        |11880    |17658|
    |1 | EXCHANGE OUT DISTR          |:EX10001|11880    |15409|
    |2 |  NESTED-LOOP JOIN           |        |11880    |15409|
    |3 |   EXCHANGE IN DISTR         |        |3        |37   |
    |4 |    EXCHANGE OUT DISTR (PKEY)|:EX10000|3        |37   |
    |5 |     PX PARTITION ITERATOR   |        |3        |37   |
    |6 |      TABLE SCAN             |t4      |3        |37   |
    |7 |   PX PARTITION ITERATOR     |        |3960     |2561 |
    |8 |    TABLE SCAN               |t2      |3960     |2561 |
    ===========================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=1
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          conds(nil), nl_params_([t4.v1])
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - (#keys=1, [t4.v1]), output([t4.v1], [t4.v2]), filter(nil), dop=1
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
  • Data redistribution

    When the join key is unrelated to the partitioning keys of both tables, either BROADCAST or HASH HASH is chosen for data redistribution based on rules. A PQ_DISTRIBUTE hint can also request a specific method, as in the second statement below. Example:

    Notice

    The two data redistribution methods in the following example are available for selection only when the degree of parallelism (DOP) is greater than 1.

    obclient>EXPLAIN SELECT /*+ PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G;
    *************************** 1. row ***************************
    Query Plan:
    =================================================================
    |ID|OPERATOR                          |NAME    |EST. ROWS|COST  |
    -----------------------------------------------------------------
    |0 |PX COORDINATOR                    |        |11880    |396863|
    |1 | EXCHANGE OUT DISTR               |:EX10001|11880    |394614|
    |2 |  HASH JOIN                       |        |11880    |394614|
    |3 |   EXCHANGE IN DISTR              |        |3        |37    |
    |4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|3        |37    |
    |5 |     PX BLOCK ITERATOR            |        |3        |37    |
    |6 |      TABLE SCAN                  |t4      |3        |37    |
    |7 |   PX PARTITION ITERATOR          |        |400000   |256226|
    |8 |    TABLE SCAN                    |t2      |400000   |256226|
    =================================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          equal_conds([t2.v2 = t4.v2]), other_conds(nil)
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - output([t4.v1], [t4.v2]), filter(nil), dop=2
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
          
    
    obclient>EXPLAIN SELECT /*+ PQ_DISTRIBUTE(t2 HASH HASH) PARALLEL(2)*/* FROM t4, t2 
                  WHERE t2.v2 = t4.v2\G;
    *************************** 1. row ***************************
    Query Plan:
    ============================================================
    |ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
    ------------------------------------------------------------
    |0 |PX COORDINATOR               |        |11880    |434727|
    |1 | EXCHANGE OUT DISTR          |:EX10002|11880    |432478|
    |2 |  HASH JOIN                  |        |11880    |432478|
    |3 |   EXCHANGE IN DISTR         |        |3        |37    |
    |4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3        |37    |
    |5 |     PX BLOCK ITERATOR       |        |3        |37    |
    |6 |      TABLE SCAN             |t4      |3        |37    |
    |7 |   EXCHANGE IN DISTR         |        |400000   |294090|
    |8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000   |256226|
    |9 |     PX PARTITION ITERATOR   |        |400000   |256226|
    |10|      TABLE SCAN             |t2      |400000   |256226|
    ============================================================
    
    Outputs & filters:
    -------------------------------------
      0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
      1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
      2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
          equal_conds([t2.v2 = t4.v2]), other_conds(nil)
      3 - output([t4.v1], [t4.v2]), filter(nil)
      4 - (#keys=1, [t4.v2]), output([t4.v1], [t4.v2]), filter(nil), dop=2
      5 - output([t4.v1], [t4.v2]), filter(nil)
      6 - output([t4.v1], [t4.v2]), filter(nil),
          access([t4.v1], [t4.v2]), partitions(p[0-2])
      7 - output([t2.v1], [t2.v2]), filter(nil)
      8 - (#keys=1, [t2.v2]), output([t2.v1], [t2.v2]), filter(nil), dop=2
      9 - output([t2.v1], [t2.v2]), filter(nil)
      10 - output([t2.v1], [t2.v2]), filter(nil),
          access([t2.v1], [t2.v2]), partitions(p[0-3])
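
The three join modes above can be summarized in a small rule-based classifier. This is a sketch under stated assumptions: the `Table` class, the `same_distribution` test, and the function names are hypothetical, and the choice between BROADCAST and HASH HASH is left out because the rule for it is not spelled out here.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Table:
    """Hypothetical description of a table's partitioning."""
    name: str
    partition_keys: Tuple[str, ...]   # empty tuple means not partitioned
    num_partitions: int = 1


def same_distribution(left: Table, right: Table) -> bool:
    # Simplification: equal partition counts stand in for
    # "partitioned the same way with the same physical distribution".
    return (bool(left.partition_keys)
            and len(left.partition_keys) == len(right.partition_keys)
            and left.num_partitions == right.num_partitions)


def join_mode(left: Table, left_keys: Tuple[str, ...],
              right: Table, right_keys: Tuple[str, ...]) -> str:
    """Classify an equi-join into one of the three modes described above."""
    left_match = bool(left.partition_keys) and left.partition_keys == left_keys
    right_match = bool(right.partition_keys) and right.partition_keys == right_keys
    if left_match and right_match and same_distribution(left, right):
        return "partition-wise join"
    if left_match or right_match:
        # Redistribute the other side by the matching table's partitioning.
        return "partial partition-wise join"
    return "data redistribution (BROADCAST or HASH HASH)"


t2 = Table("t2", ("v1",), 4)
t3 = Table("t3", ("v1",), 4)
t4 = Table("t4", ("v1",), 3)

print(join_mode(t2, ("v1",), t3, ("v1",)))  # t2.v1 = t3.v1
print(join_mode(t4, ("v1",), t2, ("v1",)))  # t2.v1 = t4.v1
print(join_mode(t4, ("v2",), t2, ("v2",)))  # t2.v2 = t4.v2
```

The three calls correspond to the three example queries in this section, using the partition counts from their CREATE TABLE statements.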