Explain and Explain Analyze

Last Updated: Jan 19, 2024

If SQL query performance is poor or query results do not meet expectations, you can execute the Explain and Explain Analyze statements in Hologres to obtain execution plans of query statements. This way, you can optimize query statements or database structures. This topic describes how to execute the Explain and Explain Analyze statements in Hologres to query execution plans. This topic also describes the meaning of each operator in the outputs of the Explain and Explain Analyze statements.

Introduction to execution plans

In Hologres, the query optimizer (QO) generates an execution plan for each SQL statement. The query engine (QE) generates a final execution plan based on the execution plan generated by the QO. The QE then executes the SQL statement and returns a result. An execution plan contains information such as SQL statistics, operators, and operator execution durations. A good execution plan helps you execute an SQL statement with fewer resources and obtain results faster. Execution plans are vital to routine data development. They help you identify issues in SQL statements and optimize SQL statements.

Hologres is compatible with PostgreSQL. You can query execution plans of SQL statements by executing the Explain and Explain Analyze statements.

  • Explain: This statement returns the SQL execution plan that is estimated by the QO based on characteristics of SQL statements, rather than the actual execution plan. This estimated execution plan provides reference for SQL statement execution.

  • Explain Analyze: This statement returns the actual SQL execution plan. Compared with the estimated execution plan returned by the Explain statement, the actual execution plan returned by the Explain Analyze statement contains actual execution information such as the actually executed operators and the accurate execution duration of each operator. Based on the execution duration of each operator, you can perform SQL statement optimization.

Note

In Hologres V1.3.4x and later, results returned by the Explain and Explain Analyze statements are optimized. If you want to query execution plans with higher readability, we recommend that you upgrade your Hologres instance to V1.3.4x or later.

Explain

  • Syntax

    You can execute the following Explain statement to query the execution plan that is estimated by the QO:

    explain <sql>;
  • Example

    In this example, a TPC-H query statement is used.

    Note

    The return value in this example cannot be used as the result published by TPC-H.

    explain select
            l_returnflag,
            l_linestatus,
            sum(l_quantity) as sum_qty,
            sum(l_extendedprice) as sum_base_price,
            sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
            sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
            avg(l_quantity) as avg_qty,
            avg(l_extendedprice) as avg_price,
            avg(l_discount) as avg_disc,
            count(*) as count_order
    from
            lineitem
    where
            l_shipdate <= date '1998-12-01' - interval '120' day
    group by
            l_returnflag,
            l_linestatus
    order by
            l_returnflag,
            l_linestatus;
  • Returned result

    QUERY PLAN
    Sort  (cost=0.00..7795.30 rows=3 width=80)
      Sort Key: l_returnflag, l_linestatus
      ->  Gather  (cost=0.00..7795.27 rows=3 width=80)
            ->  Project  (cost=0.00..7795.27 rows=3 width=80)
                  ->  Project  (cost=0.00..7794.27 rows=3 width=104)
                        ->  Final HashAggregate  (cost=0.00..7793.27 rows=3 width=76)
                              Group Key: l_returnflag, l_linestatus
                              ->  Redistribution  (cost=0.00..7792.95 rows=1881 width=76)
                                    Hash Key: l_returnflag, l_linestatus
                                    ->  Partial HashAggregate  (cost=0.00..7792.89 rows=1881 width=76)
                                          Group Key: l_returnflag, l_linestatus
                                          ->  Local Gather  (cost=0.00..7791.81 rows=44412 width=76)
                                                ->  Decode  (cost=0.00..7791.80 rows=44412 width=76)
                                                      ->  Partial HashAggregate  (cost=0.00..7791.70 rows=44412 width=76)
                                                            Group Key: l_returnflag, l_linestatus
                                                            ->  Project  (cost=0.00..3550.73 rows=584421302 width=33)
                                                                  ->  Project  (cost=0.00..2585.43 rows=584421302 width=33)
                                                                        ->  Index Scan using Clustering_index on lineitem  (cost=0.00..261.36 rows=584421302 width=25)
                                                                              Segment Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
                                                                              Cluster Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
    
  • Result description

    You need to view the execution plan from bottom to top. Each arrow (->) indicates a node. Each node contains information such as the operator and the estimated number of returned rows. The following table describes the parameters of an operator.

    Parameter

    Description

    cost

    The estimated execution duration of an operator. The cost value of a parent node includes the cost values of its child nodes. The cost parameter contains the estimated start cost and estimated total cost that are separated by two periods (..).

    • Estimated start cost: the cost before the output phase starts.

    • Estimated total cost: the estimated total cost of the operator execution.

    In the preceding returned result, the estimated start cost of the Final HashAggregate node is 0.00, and the estimated total cost of the node is 7793.27.

    rows

    The number of rows that are returned by an operator, which is estimated based on table statistics.

    The value of the rows parameter for the Index Scan node is 1000 by default.

    Note

    In most cases, if rows=1000 is returned, the statistics of the table are invalid, and no estimation is performed based on the table statistics. You can execute the analyze <tablename> statement to update the statistics of the table.

    width

    The estimated average length of columns returned by an operator. A large value indicates a long column length. Unit: bytes.
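The reading rules above (one node per arrow, read from bottom to top, each node carrying cost, rows, and width) can be sketched in a few lines of Python. This is only an illustration that works on the text of a plan. The names `parse_plan`, `execution_order`, and `suspicious_row_estimates` are hypothetical helpers, not part of Hologres, and the sample plan is an abbreviated copy of the returned result shown earlier. Reversing the node list matches the bottom-to-top order only for a plan without branches, such as this one.

```python
import re

# Matches the estimate block printed for each node, for example
# "->  Gather  (cost=0.00..7795.27 rows=3 width=80)".
NODE_RE = re.compile(
    r"^\s*(?:->\s*)?(?P<op>.+?)\s+"
    r"\(cost=(?P<start>\d+(?:\.\d+)?)\.\.(?P<total>\d+(?:\.\d+)?) "
    r"rows=(?P<rows>\d+) width=(?P<width>\d+)\)"
)

def parse_plan(plan_text):
    """Return one dict per plan node, in printed (top-to-bottom) order."""
    nodes = []
    for line in plan_text.splitlines():
        m = NODE_RE.match(line)
        if m:  # lines such as "Sort Key: ..." carry no estimates and are skipped
            nodes.append({
                "operator": m.group("op"),
                "start_cost": float(m.group("start")),
                "total_cost": float(m.group("total")),
                "rows": int(m.group("rows")),
                "width": int(m.group("width")),
            })
    return nodes

def execution_order(nodes):
    """Bottom-to-top reading order (valid for a plan without branches)."""
    return list(reversed(nodes))

def suspicious_row_estimates(nodes):
    """Operators whose estimate is exactly rows=1000, the default value."""
    return [n["operator"] for n in nodes if n["rows"] == 1000]

# Abbreviated copy of the example plan above.
PLAN = """\
Sort  (cost=0.00..7795.30 rows=3 width=80)
  Sort Key: l_returnflag, l_linestatus
  ->  Gather  (cost=0.00..7795.27 rows=3 width=80)
        ->  Final HashAggregate  (cost=0.00..7793.27 rows=3 width=76)
              Group Key: l_returnflag, l_linestatus
              ->  Index Scan using Clustering_index on lineitem  (cost=0.00..261.36 rows=584421302 width=25)
"""

nodes = parse_plan(PLAN)
for node in execution_order(nodes):
    print(node["operator"], node["total_cost"], node["rows"])
print("rows=1000 nodes:", suspicious_row_estimates(nodes))
```

If `suspicious_row_estimates` returns any operator, the statistics of the scanned table are a likely candidate for an `analyze <tablename>` run.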

Explain Analyze

  • Syntax

    You can execute the following Explain Analyze statement to query the actual execution plan of an SQL statement and the execution duration of each operator. This helps you diagnose SQL performance issues.

    explain analyze <sql>;
  • Example

    In this example, a TPC-H query statement is used.

    explain analyze select
            l_returnflag,
            l_linestatus,
            sum(l_quantity) as sum_qty,
            sum(l_extendedprice) as sum_base_price,
            sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
            sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
            avg(l_quantity) as avg_qty,
            avg(l_extendedprice) as avg_price,
            avg(l_discount) as avg_disc,
            count(*) as count_order
    from
            lineitem
    where
            l_shipdate <= date '1998-12-01' - interval '120' day
    group by
            l_returnflag,
            l_linestatus
    order by
            l_returnflag,
            l_linestatus;
  • Returned result

    QUERY PLAN
    Sort  (cost=0.00..7795.30 rows=3 width=80)
      Sort Key: l_returnflag, l_linestatus
    [id=21 dop=1 time=2427/2427/2427ms rows=4(4/4/4) mem=3/3/3KB open=2427/2427/2427ms get_next=0/0/0ms]
      ->  Gather  (cost=0.00..7795.27 rows=3 width=80)
          [20:1 id=100003 dop=1 time=2426/2426/2426ms rows=4(4/4/4) mem=1/1/1KB open=0/0/0ms get_next=2426/2426/2426ms]
            ->  Project  (cost=0.00..7795.27 rows=3 width=80)
                [id=19 dop=20 time=2427/2426/2425ms rows=4(1/0/0) mem=87/87/87KB open=2427/2425/2425ms get_next=1/0/0ms]
                  ->  Project  (cost=0.00..7794.27 rows=0 width=104)
                        ->  Final HashAggregate  (cost=0.00..7793.27 rows=3 width=76)
                              Group Key: l_returnflag, l_linestatus
                            [id=16 dop=20 time=2427/2425/2424ms rows=4(1/0/0) mem=574/570/569KB open=2427/2425/2424ms get_next=1/0/0ms]
                              ->  Redistribution  (cost=0.00..7792.95 rows=1881 width=76)
                                    Hash Key: l_returnflag, l_linestatus
                                  [20:20 id=100002 dop=20 time=2427/2424/2423ms rows=80(20/4/0) mem=3528/1172/584B open=1/0/0ms get_next=2426/2424/2423ms]
                                    ->  Partial HashAggregate  (cost=0.00..7792.89 rows=1881 width=76)
                                          Group Key: l_returnflag, l_linestatus
                                        [id=12 dop=20 time=2428/2357/2256ms rows=80(4/4/4) mem=574/574/574KB open=2428/2357/2256ms get_next=1/0/0ms]
                                          ->  Local Gather  (cost=0.00..7791.81 rows=44412 width=76)
                                              [id=11 dop=20 time=2427/2356/2255ms rows=936(52/46/44) mem=7/6/6KB open=0/0/0ms get_next=2427/2356/2255ms pull_dop=9/9/9]
                                                ->  Decode  (cost=0.00..7791.80 rows=44412 width=76)
                                                    [id=8 dop=234 time=2435/1484/5ms rows=936(4/4/4) mem=0/0/0B open=2435/1484/5ms get_next=4/0/0ms]
                                                      ->  Partial HashAggregate  (cost=0.00..7791.70 rows=44412 width=76)
                                                            Group Key: l_returnflag, l_linestatus
                                                          [id=5 dop=234 time=2435/1484/3ms rows=936(4/4/4) mem=313/312/168KB open=2435/1484/3ms get_next=0/0/0ms]
                                                            ->  Project  (cost=0.00..3550.73 rows=584421302 width=33)
                                                                [id=4 dop=234 time=2145/1281/2ms rows=585075720(4222846/2500323/3500) mem=142/141/69KB open=10/1/0ms get_next=2145/1280/2ms]
                                                                  ->  Project  (cost=0.00..2585.43 rows=584421302 width=33)
                                                                      [id=3 dop=234 time=582/322/2ms rows=585075720(4222846/2500323/3500) mem=142/142/69KB open=10/1/0ms get_next=582/320/2ms]
                                                                        ->  Index Scan using Clustering_index on lineitem  (cost=0.00..261.36 rows=584421302 width=25)
                                                                              Segment Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
                                                                              Cluster Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
                                                                            [id=2 dop=234 time=259/125/1ms rows=585075720(4222846/2500323/3500) mem=1418/886/81KB open=10/1/0ms get_next=253/124/0ms]
    
    ADVICE: 
    [node id : 1000xxx] distribution key miss match! table lineitem defined distribution keys : l_orderkey; request distribution columns : l_returnflag, l_linestatus; 
    shuffle data skew in different shards! max rows is 20, min rows is 0
    
    Query id:[300200511xxxx]
    ======================cost======================
    Total cost:[2505] ms
    Optimizer cost:[47] ms
    Init gangs cost:[4] ms
    Build gang desc table cost:[2] ms
    Start query cost:[18] ms
    - Wait schema cost:[0] ms
    - Lock query cost:[0] ms
    - Create dataset reader cost:[0] ms
    - Create split reader cost:[0] ms
    Get the first block cost:[2434] ms
    Get result cost:[2434] ms
    ====================resource====================
    Memory: 921(244/230/217) MB,  straggler worker id: 72969760xxx
    CPU time: 149772(38159/37443/36736) ms, straggler worker id: 72969760xxx
    Physical read bytes: 3345(839/836/834) MB, straggler worker id: 72969760xxx
    Read bytes: 41787(10451/10446/10444) MB, straggler worker id: 72969760xxx
    DAG instance count: 41(11/10/10), straggler worker id: 72969760xxx
    Fragment instance count: 275(70/68/67), straggler worker id: 72969760xxx
  • Result description

    The output of the Explain Analyze statement provides the actual execution path of an SQL statement. The output is displayed in a tree structure and contains detailed execution information of each operator at each phase. The output of the Explain Analyze statement contains the following parts: query plan, advice, cost, and resource.

Query Plan

The query plan part provides detailed execution information of each operator. Similar to the Explain statement, you also need to view the query plan from bottom to top. Each arrow (->) indicates a node.

Example

Description

(cost=0.00..2585.43 rows=584421302 width=33)

The parameters indicate values estimated by the QO and have the same meanings as those in the output of the Explain statement.

  • cost: the estimated execution duration of an operator.

  • rows: the estimated number of rows returned by an operator. If the value of the rows parameter is significantly different from the actual number of returned rows, the table statistics may not be updated. In this case, we recommend that you execute the analyze <tablename> statement to update the table statistics.

  • width: the estimated average length of columns returned by an operator. A large value indicates a long column length. Unit: bytes.

[20:20 id=100002 dop=20 time=2427/2424/2423ms rows=80(20/4/0) mem=3528/1172/584B open=1/0/0ms get_next=2426/2424/2423ms]

The parameters indicate actual values for the execution of each operator.

  • 20:20: the ratio of the input DAG degree of parallelism (DOP) to the output DAG DOP, which indicates the parallelism mapping between the input and output of the operator. In most cases, this parameter is used with the output of an operator. For example:

    • 21:1 with the Gather operator indicates that 21 inputs are gathered to provide 1 output.

    • 21:21 with the Redistribution operator indicates that the shuffling operation is performed with the parallelism of 21.

    • 1:21 with the Broadcast operator indicates that the input parallelism is 1 and the output parallelism is 21.

  • id: the unique ID of an operator.

    In this example, id=100002 is returned.

  • dop: the actual DOP of an operator during the runtime. The value of this parameter is the same as the number of shards for the instance. In this example, dop=20 is returned. However, the dop parameter for the Local Gather node indicates the number of scanned files.

  • time: the actual execution durations in the open and get_next phases. Unit: milliseconds. In Hologres, each operator has an open API and a get_next API. The value of this parameter is in the format of maximum duration/average duration/minimum duration ms. Example: time=2427/2424/2423ms.

    • open: This API operation is called to initialize an operator. For some operators, this API depends on data of downstream operators. For example, for a hash operator, the open API creates a hash table and pulls the data of all downstream operators. The value of this parameter indicates the execution duration in the open phase and is in the format of maximum duration/average duration/minimum duration ms. Example: open=1/0/0ms.

    • get_next: This API operation is called to obtain a record batch from the downstream operators for computing. The implementation of this API operation differs based on the type of operator. This API operation is called multiple times until the data of all downstream operators is pulled. The value of this parameter indicates the execution duration in the get_next phase and is in the format of maximum duration/average duration/minimum duration ms. Example: get_next=2426/2424/2423ms.

  • rows: the number of rows returned by an operator. You can compare the maximum value, minimum value, and average value. If the values differ significantly, data is unevenly distributed.

    The value of this parameter is in the format of total rows(maximum number of rows/average number of rows/minimum number of rows). Example: rows=80(20/4/0).

  • mem: the maximum, average, and minimum memory resources that are consumed by an operator during the runtime.

    The value of this parameter is in the format of maximum memory resource consumption/average memory resource consumption/minimum memory resource consumption. Example: mem=3528/1172/584B.
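As a rough illustration of the format described above, the following Python sketch splits one runtime statistics line into its fields and computes a simple skew ratio for the returned rows. The helper names `parse_stats` and `row_skew` are hypothetical, and the regular expression only assumes the layout of the sample lines in this topic. The max/avg ratio is one possible way to compare the values and is not a metric that Hologres reports.

```python
import re

# Layout assumed from the sample output in this topic. The leading "20:20"
# part is optional because only some operators print it.
STATS_RE = re.compile(
    r"\[(?:(?P<in_dop>\d+):(?P<out_dop>\d+) )?"
    r"id=(?P<id>\d+) dop=(?P<dop>\d+) "
    r"time=(?P<time>\d+/\d+/\d+)ms "
    r"rows=(?P<rows_total>\d+)\((?P<rows>\d+/\d+/\d+)\) "
    r"mem=(?P<mem>\d+/\d+/\d+)(?P<mem_unit>[A-Z]+) "
    r"open=(?P<open>\d+/\d+/\d+)ms "
    r"get_next=(?P<get_next>\d+/\d+/\d+)ms"
)

def _triple(text):
    """Turn "max/avg/min" into a dict of integers."""
    mx, avg, mn = (int(part) for part in text.split("/"))
    return {"max": mx, "avg": avg, "min": mn}

def parse_stats(line):
    """Parse one "[... id=... dop=... time=...]" line into a dict."""
    m = STATS_RE.search(line)
    if m is None:
        raise ValueError("not a runtime statistics line")
    return {
        "id": int(m["id"]),
        "dop": int(m["dop"]),
        "time_ms": _triple(m["time"]),
        "rows_total": int(m["rows_total"]),
        "rows": _triple(m["rows"]),
        "mem": _triple(m["mem"]),
        "mem_unit": m["mem_unit"],
        "open_ms": _triple(m["open"]),
        "get_next_ms": _triple(m["get_next"]),
    }

def row_skew(stats):
    """Ratio of maximum to average rows; None when the average is 0."""
    avg = stats["rows"]["avg"]
    return None if avg == 0 else stats["rows"]["max"] / avg

LINE = ("[20:20 id=100002 dop=20 time=2427/2424/2423ms rows=80(20/4/0) "
        "mem=3528/1172/584B open=1/0/0ms get_next=2426/2424/2423ms]")

stats = parse_stats(LINE)
print(stats["id"], stats["time_ms"], row_skew(stats))
```

For the Redistribution line in the example, the maximum is 20 rows and the average is 4 rows, so the ratio is well above 1, which is consistent with the skew that the advice part reports.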

An SQL statement may involve multiple operators. For more information about operators, see Operators.

Note

When you use the time, rows, and mem parameters, take note of the following items:

  • The value of the time parameter is cumulative: it includes the time of the downstream operators. Therefore, to obtain the execution duration of an operator itself, subtract the time value of its downstream operator from the time value of this operator.

  • Values of the rows and mem parameters are not cumulative.
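The subtraction described in this note can be worked through with the three lowest operators of the example plan. The sketch below is a hypothetical helper, not a Hologres feature. It assumes that the maximum value of the time parameter is the one to compare (this topic does not say which of the three values to use) and that the operators form a single chain.

```python
def self_times(chain):
    """chain: (operator, cumulative time in ms) pairs, parent first.

    Returns (operator, own time in ms) pairs. The last entry has no
    downstream operator, so its cumulative time is also its own time.
    """
    result = []
    for i, (name, cumulative) in enumerate(chain):
        downstream = chain[i + 1][1] if i + 1 < len(chain) else 0
        # Clamp at 0 because the reported values are measured per operator
        # and a parent can occasionally show a slightly smaller value.
        result.append((name, max(cumulative - downstream, 0)))
    return result

# Maximum time values taken from the example output (ids 4, 3, and 2).
CHAIN = [
    ("Project id=4", 2145),
    ("Project id=3", 582),
    ("Index Scan id=2", 259),
]

for name, ms in self_times(CHAIN):
    print(name, ms)
```

With these inputs, the upper Project node accounts for 2145 - 582 = 1563 ms, the lower Project node for 582 - 259 = 323 ms, and the Index Scan node for 259 ms.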

Advice

The advice part provides optimization suggestions that are automatically generated by the system based on the output of the Explain Analyze statement. The following suggestions are available:

  • Table xxx misses bitmap index: We recommend that you configure a distribution key, a clustering key, or a bitmap index for the table.

  • Table xxx Miss Stats! please run 'analyze xxx';: No statistics are available for the table.

  • shuffle data xxx in different shards! max rows is 20, min rows is 0: Data skew may exist.

Note

The advice part provides only suggestions for an SQL statement and may not be applicable. You need to perform optimization measures based on business scenarios.

Cost

The cost part provides the total execution duration of an SQL statement and the execution duration of each phase. You can locate performance bottlenecks based on the execution duration of each phase.

Total cost: the total execution duration of an SQL statement in the unit of milliseconds. It consists of the following parts:

  • Optimizer cost: the duration that is taken by the QO to generate an execution plan. Unit: milliseconds.

  • Build gang desc table cost: the duration that is taken to convert the execution plan that is generated by the QO into the data that is in the format required by the QE. Unit: milliseconds.

  • Init gangs cost: the duration that is taken to preprocess the execution plan that is generated by the QO and send the query request to the QE to trigger the start query phase. Unit: milliseconds.

  • Start query cost: the duration of the initialization phase, which starts when the Init gangs step completes and ends when the query operation starts. The initialization phase involves operations such as lock acquisition and schema version alignment and consists of the following parts:

    • Wait schema cost: the duration that is taken to align the storage engine (SE) version and FE node version with the schema version. If the table schema changes, both the FE node version and SE version need to be updated. If the FE node version and SE version are inconsistent with the schema version, schema latency may occur. If a large number of DDL statements are executed on the partitioned parent table, the SE processes them slowly and the latency is high. As a result, data reads and writes are slow. In this case, you can reduce the frequency at which DDL statements are executed.

    • Lock query cost: the duration that is taken for a query to acquire a lock. If the duration is long, the query is waiting for a lock.

    • Create dataset reader cost: the duration that is taken to create an index data reader. If the duration is long, the cache may not be hit.

    • Create split reader cost: the duration that is taken to open a file. If the duration is long, the metadata of the file does not hit the cache. If this occurs, the I/O overhead is high.

  • Get result cost: the duration from the time when the start query phase ends to the time when all results are returned. The Get result cost contains the Get the first block cost. Unit: milliseconds.

    • Get the first block cost: the duration from the time when the start query phase ends to the time when the first record batch is returned. In some scenarios, the Get the first block cost is very close to or the same as the Get result cost. For example, if a hash aggregate operator is used in the first step of a query plan, the full data of downstream operators is required to create a hash table for the aggregate operation, so no record batch can be returned earlier. By contrast, for common queries with filter conditions, data is calculated and returned in real time. In that scenario, the Get the first block cost greatly differs from the Get result cost. The difference depends on the data amount.
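To locate the slowest phase, the cost part can be read programmatically. The following Python sketch is an illustration only: `parse_costs` and `dominant_phase` are hypothetical helpers, and the regular expression assumes the line layout of the sample output in this topic. Get the first block cost is left out of the comparison because it is contained in Get result cost.

```python
import re

# One "<name> cost:[<n>] ms" entry per line; sub-items start with "- ".
COST_RE = re.compile(
    r"^[ \t]*(?P<sub>- )?(?P<name>[A-Za-z ]+?) cost:\[(?P<ms>\d+)\] ms",
    re.M,
)

def parse_costs(text):
    """Return (top-level entries, sub-items of Start query cost) in ms."""
    top, sub = {}, {}
    for m in COST_RE.finditer(text):
        target = sub if m["sub"] else top
        target[m["name"]] = int(m["ms"])
    return top, sub

def dominant_phase(top):
    """Return (slowest phase, its duration, sum of all phases)."""
    skipped = ("Total", "Get the first block")
    phases = {k: v for k, v in top.items() if k not in skipped}
    name = max(phases, key=phases.get)
    return name, phases[name], sum(phases.values())

# Cost part copied from the example output above.
COST_TEXT = """\
Total cost:[2505] ms
Optimizer cost:[47] ms
Init gangs cost:[4] ms
Build gang desc table cost:[2] ms
Start query cost:[18] ms
- Wait schema cost:[0] ms
- Lock query cost:[0] ms
- Create dataset reader cost:[0] ms
- Create split reader cost:[0] ms
Get the first block cost:[2434] ms
Get result cost:[2434] ms
"""

top, sub = parse_costs(COST_TEXT)
print(dominant_phase(top))
```

In the example, the five phases add up to the Total cost of 2505 ms (47 + 4 + 2 + 18 + 2434), and almost all of the time is spent in the Get result phase.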

Resource

The resource part provides the amounts of resources that are consumed during the query execution, in the format of total amount of consumed resources(maximum amount of consumed resources/average amount of consumed resources/minimum amount of consumed resources).

Hologres is a distributed engine. A Hologres instance has multiple worker nodes. The computing results of worker nodes are merged and the merging result is returned to the client. Therefore, the resource consumption information is displayed in the format of total(max worker/avg worker/min worker).

  • total: the total amount of resources that are consumed by a SQL query statement.

  • max: the maximum amount of resources that are consumed by a worker node.

  • avg: the average amount of resources consumed per worker node, which is calculated by using the following formula: Total amount of consumed resources/Number of worker nodes.

  • min: the minimum amount of resources that are consumed by a worker node.
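The total(max/avg/min) format can be unpacked as shown in the following Python sketch. The helpers `parse_resource` and `estimated_workers` are hypothetical. The worker estimate simply inverts the formula for avg given above, so it is an approximation that depends on rounding in the printed values.

```python
import re

# "<metric>: <total>(<max>/<avg>/<min>)" followed by an optional unit.
RES_RE = re.compile(
    r"^(?P<name>[A-Za-z ]+): (?P<total>\d+)"
    r"\((?P<max>\d+)/(?P<avg>\d+)/(?P<min>\d+)\)"
    r"(?: (?P<unit>[A-Za-z]+))?"
)

def parse_resource(line):
    """Parse one line of the resource part into a dict."""
    m = RES_RE.match(line.strip())
    if m is None:
        raise ValueError("not a resource line")
    return {
        "name": m["name"],
        "total": int(m["total"]),
        "max": int(m["max"]),
        "avg": int(m["avg"]),
        "min": int(m["min"]),
        "unit": m["unit"],  # None when the metric is a plain count
    }

def estimated_workers(resource):
    """avg = total / number of workers, so workers is about total / avg."""
    return round(resource["total"] / resource["avg"])

# Lines copied from the example output above.
memory = parse_resource("Memory: 921(244/230/217) MB,  straggler worker id: 72969760xxx")
cpu = parse_resource("CPU time: 149772(38159/37443/36736) ms, straggler worker id: 72969760xxx")
dag = parse_resource("DAG instance count: 41(11/10/10), straggler worker id: 72969760xxx")

print(memory, estimated_workers(memory))
print(cpu, estimated_workers(cpu))
```

For the example output, both the Memory and the CPU time lines point to about four worker nodes.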

The following table describes the metrics in the resource part.

Metric

Description

Memory

The memory consumption information of an SQL query statement, including the total memory resources consumed by all worker nodes, the maximum memory resources consumed by a worker node, the average memory resources consumed per worker node, and the minimum memory resources consumed by a worker node.

CPU time

The total CPU time that is consumed by an SQL query statement. The value is approximate. Unit: milliseconds.

This metric indicates the total CPU time that is consumed by all computing tasks, which is the sum of the time for multiple CPU cores. The value of this metric basically reflects the complexity of computing tasks.

Physical read bytes

The amount of data that is read from the disk. Unit: bytes. If the query does not hit the cache, the data is read from the disk.

Read bytes

The total number of bytes that are read by the SQL query statement. The total number of bytes includes the bytes of the physical read data and the bytes of the data that is read from the cache. The value of this metric reflects the amount of the data that is obtained from the output of the SQL query statement.

Affected rows

The number of rows that are affected by a data manipulation language (DML) statement. This metric is displayed only when a DML statement is executed.

DAG instance count

The number of DAG instances in the execution plan. A large value indicates a complex query with a high DOP.

Fragment instance count

The number of fragment instances in the execution plan. A large value indicates a large number of plans and a large number of files.

straggler_worker_id

The ID of the worker node that consumes the most resources.

Operators

SCAN

  • Seq Scan

    Seq Scan is used to read data from a table in sequence. A full table scan is performed. The on keyword after Seq Scan is followed by the name of the scanned table.

    Example: Execute the following statement to check the execution plan of a query on a common internal table. Seq Scan is returned in the execution plan of the query statement.

    EXPLAIN SELECT * FROM public.holo_lineitem_100g;

    The following figure shows the returned result.

    image

    • Query data from a partitioned table

      If you query data from a partitioned table, the Seq Scan on Partitioned Table keywords are displayed in the execution plan of the query. You can obtain the number of partitions that are scanned in the query statement from the Partitions selected keywords.

      Example: Execute the following statement to check the execution plan of a query on a partitioned parent table with one partition scanned.

      EXPLAIN SELECT * FROM public.hologres_parent;

      The following figure shows the returned result.

      image

    • Query data by using a foreign table

      If you query data by using a foreign table, the Foreign Table Type keywords are displayed in the execution plan of the query to specify the source of the foreign table. The value of Foreign Table Type can be MaxCompute, OSS, or Hologres.

      Example: Execute the following statement to check the execution plan of a query by using a MaxCompute foreign table.

      EXPLAIN SELECT * FROM public.odps_lineitem_100;

      The following figure shows the returned result.

      image

  • Index Scan and Index Seek

    You can use indexes to accelerate queries on a table. Hologres uses different indexes at the underlying layer based on the storage mode of the table. Based on the column-oriented storage mode and row-oriented storage mode, indexes are classified into clustering_index and Index Seek (also named pk_index).

    • Clustering_index: the type of index that is applicable to column-oriented tables, such as the segment key or clustering key. If a query on a column-oriented table hits indexes, this type of index is used. Index Scan using Clustering_index is usually used with Filter. Filter is a child node and lists the hit indexes. Filter can be a clustering filter, segment filter, or bitmap filter. For more information, see Column-oriented storage.

      • Example 1: A query hits indexes.

        begin;
        create table column_test (
         "id" bigint not null ,
         "name" text not null ,
         "age" bigint not null 
        );
        call set_table_property('column_test', 'orientation', 'column');
        call set_table_property('column_test', 'distribution_key', 'id');
        call set_table_property('column_test', 'clustering_key', 'id');
        commit;
        
        insert into column_test values(1,'tom',10),(2,'tony',11),(3,'tony',12);
        
        explain select * from column_test where id>2;

        The following figure shows the returned result.

        image

      • Example 2: A query does not hit any index, and indexes of the clustering_index type are not used.

        explain select * from column_test where age>10;

        The following figure shows the returned result.

        image

    • Index Seek (also named pk_index): the type of index that is applicable to row-oriented tables, such as the primary key. In most cases, fixed plans are used for point queries on row-oriented tables that are configured with primary keys. If queries on row-oriented tables that are configured with primary keys do not use fixed plans, indexes of the Index Seek type are used. For more information, see Row-oriented storage.

      Example: Query data from a row-oriented table.

      begin;
      create table row_test_1 (
          id bigint not null,
          name text not null,
          class text ,
      PRIMARY KEY (id)
      );
      call set_table_property('row_test_1', 'orientation', 'row');
      call set_table_property('row_test_1', 'clustering_key', 'name');
      commit;
      insert into row_test_1 values ('1','qqq','3'),('2','aaa','4'),('3','zzz','5');
      
      begin;
      create table row_test_2 (
          id bigint not null,
          name text not null,
          class text ,
      PRIMARY KEY (id)
      );
      call set_table_property('row_test_2', 'orientation', 'row');
      call set_table_property('row_test_2', 'clustering_key', 'name');
      commit;
      insert into row_test_2 values ('1','qqq','3'),('2','aaa','4'),('3','zzz','5');
      
      --pk_index
      explain select * from (select id from row_test_1 where id = 1) t1 join row_test_2 t2 on t1.id = t2.id;
      

      The following figure shows the returned result.

      image

Filter

Filter is used to filter data based on SQL conditions. In most cases, Filter is a child node of Seq Scan and is executed with the Seq Scan node. Filter specifies whether data is filtered and whether a filter condition hits indexes. This section describes various types of Filters.

  • Filter

    If an execution plan contains only the Filter keyword, the filter condition does not hit any index. In this case, you need to check the table indexes and configure indexes again to accelerate the SQL query.

    Note

    If an execution plan contains One-Time Filter: false, the output is empty.

    Example:

    begin;
    create table clustering_index_test (
     "id" bigint not null ,
     "name" text not null ,
     "age" bigint not null 
    );
    call set_table_property('clustering_index_test', 'orientation', 'column');
    call set_table_property('clustering_index_test', 'distribution_key', 'id');
    call set_table_property('clustering_index_test', 'clustering_key', 'age');
    commit;
    
    insert into clustering_index_test values(1,'tom',10),(2,'tony',11),(3,'tony',12);
    
    explain select * from clustering_index_test where id>2;

    The following figure shows the returned result.

    image

  • Segment Filter

    Segment Filter indicates that a query hits the segment key. Segment Filter is used with Index Scan. For more information, see Event time column (segment key).

  • Cluster Filter

    Cluster Filter indicates that a query hits the clustering key. For more information, see Clustering key.

  • Bitmap Filter

    Bitmap Filter indicates that a query hits the bitmap index. For more information, see Bitmap index.

  • Join Filter

    Join Filter indicates that data needs to be filtered after the join operation.
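A quick way to apply the distinctions above is to list the filter keywords that appear in a plan. The following Python sketch is a hypothetical helper that works on plan text. It assumes that every filter is printed as a keyword followed by a colon, as in the Segment Filter and Cluster Filter lines of the TPC-H example. The plain Filter line in the second sample is an assumed illustration of that layout, not output copied from Hologres.

```python
import re

# The more specific keywords are listed first; plain "Filter" is matched last.
FILTER_RE = re.compile(
    r"^[ \t]*(?P<kind>Segment Filter|Cluster Filter|Bitmap Filter|"
    r"Join Filter|One-Time Filter|Filter):",
    re.M,
)

def filter_kinds(plan_text):
    """Return the filter keywords in the order they appear in the plan."""
    return [m["kind"] for m in FILTER_RE.finditer(plan_text)]

def has_index_miss(plan_text):
    """True if the plan contains a plain Filter, which hits no index."""
    return "Filter" in filter_kinds(plan_text)

# Filter lines copied from the TPC-H example in this topic.
HIT = """\
->  Index Scan using Clustering_index on lineitem  (cost=0.00..261.36 rows=584421302 width=25)
      Segment Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
      Cluster Filter: (l_shipdate <= '1998-08-03 00:00:00+08'::timestamp with time zone)
"""

# Assumed layout for a query whose condition hits no index.
MISS = """\
->  Seq Scan on clustering_index_test
      Filter: (id > 2)
"""

print(filter_kinds(HIT), has_index_miss(HIT))
print(filter_kinds(MISS), has_index_miss(MISS))
```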

Decode

Decode is used to encode or decode data to accelerate the computing of text data.

Local Gather and Gather

In Hologres, data is stored as files in shards. Local Gather is used to merge data in multiple files into one shard. Gather is used to summarize data in multiple shards and return the result.

Example:

explain select * from public.lineitem;

The following figure shows the returned execution plan. Data is scanned, merged into one shard by using the Local Gather operator, and then summarized by using the Gather operator.

image

Redistribution

Redistribution is used to hash data or randomly distribute data to one or more shards. The redistribution operator is commonly used in the JOIN, COUNT DISTINCT, and GROUP BY clauses.

If an execution plan contains the Redistribution operator, either no distribution key is configured for the table or the distribution key does not fit the query, for example because it differs from the columns used in the join condition. As a result, data is shuffled among multiple shards during the query. If redistribution occurs when you join multiple tables, the local join feature is not used. As a result, the query performance is poor.

Example: Join two tables.

begin;
create table tbl1(
a int not null,
b text not null
);
call set_table_property('tbl1', 'distribution_key', 'a');
create table tbl2(
c int not null,
d text not null
);
call set_table_property('tbl2', 'distribution_key', 'd');
commit;

explain select * from tbl1  join tbl2 on tbl1.a=tbl2.c;

The following figure shows the returned execution plan. The plan contains the redistribution operator, which indicates that the distribution key is invalid for this query. The join condition in the SQL statement is tbl1.a=tbl2.c, but the distribution key of the tbl1 table is a and that of the tbl2 table is d. Because the distribution key of tbl2 is not the join column c, data is shuffled during the join operation.

image

Suggestion: If an SQL statement contains the redistribution operator, we recommend that you check the setting of the distribution key. For more information about redistribution scenarios and setting of the distribution key, see Distribution key.
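
As a minimal sketch of this check, the following statements create a copy of the second table from the preceding example with the join column c as the distribution key. The table name tbl2_by_c is hypothetical and used only for illustration. With both join columns configured as distribution keys, the plan would be expected to no longer contain the redistribution operator, but verify this on your own instance.

```sql
begin;
-- Hypothetical copy of tbl2 that is distributed by the join column.
create table tbl2_by_c(
c int not null,
d text not null
);
-- Use the join column c as the distribution key so that it matches tbl1.a.
call set_table_property('tbl2_by_c', 'distribution_key', 'c');
commit;

-- Compare this plan with the plan of the join against tbl2.
explain select * from tbl1 join tbl2_by_c on tbl1.a=tbl2_by_c.c;
```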

Join

The definition of the Join operator in the execution plan is the same as that in a standard database. The join operation can be classified into hash join, nested loop join, and merge join.

  • Hash Join

    Hash join is a way of joining two or more tables. During a hash join, a hash table is built in memory from one of the tables to be joined, usually the small table. The values of the join columns are hashed and inserted into the hash table. The other tables are then read row by row, and the join column values of each row are hashed and matched against the hash table. Matched data is returned. The following table describes the categories of hash join.

    Category

    Description

    Hash Left Join

    When multiple tables are joined, all rows in the left table are returned and matched against the right table based on the join conditions. If a row in the left table has no match in the right table, null is returned for the columns of the right table.

    Hash Right Join

    When multiple tables are joined, all rows in the right table and the rows in the left table that meet the join conditions are returned. If the rows in the right table do not match data in the left table, null is returned for the left table.

    Hash Inner Join

    When multiple tables are joined, only the rows that meet the join conditions are returned.

    Hash Full Join

    When multiple tables are joined, all rows in the left table and the right table are returned. If the data in one table does not match data in the other table, null is returned for the table whose data cannot be matched.

    Hash Anti Join

    Only unmatched data is returned. This type of join is mostly used for queries with the NOT EXISTS clause.

    Hash Semi Join

    A row is returned if at least one matching data record exists. The row is returned only once even if multiple records match, so the returned rows do not contain duplicate data. This type of join is usually used for queries with the EXISTS clause.
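
    The following statements are a small sketch of query shapes that usually correspond to some of these categories. They assume the tbl1 and tbl2 tables from the redistribution example. The operator that the QO actually selects depends on the table statistics and the instance version, so the categories named in the comments are expectations rather than guarantees. For example, the QO may execute a left join as Hash Right Join if it swaps the two inputs.

    ```sql
    -- NOT EXISTS: rows of tbl1 that have no match in tbl2 (candidate for Hash Anti Join).
    explain select * from tbl1 where not exists (select 1 from tbl2 where tbl2.c = tbl1.a);

    -- EXISTS: rows of tbl1 that have at least one match in tbl2 (candidate for Hash Semi Join).
    explain select * from tbl1 where exists (select 1 from tbl2 where tbl2.c = tbl1.a);

    -- LEFT JOIN: every row of tbl1 is kept, and d is null where no match exists.
    explain select tbl1.a, tbl2.d from tbl1 left join tbl2 on tbl1.a = tbl2.c;
    ```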

    When you view execution plans of hash join operations, you also need to focus on the child nodes.

    • hash cond: the join condition. Example: hash cond(tmp.a=tmp1.b).

    • hash key: the key that is used for hash calculation in multiple shards. In most cases, the key indicates the key of GROUP BY.

    During a hash join, you need to check whether the table with a small amount of data is used to create a hash table. You can use one of the following methods to check whether the small table is used to create a hash table:

    • In the execution plan, the table with the hash keyword is the table that is used to create a hash table.

    • In the execution plan, the bottom table is the table that is used to create a hash table.

    Optimization suggestion:

    • Update statistics

      The core idea is to use the small table to create a hash table. If a large table is used to create a hash table in the memory, more resources are consumed. In most cases, this issue occurs because statistics of the table are not updated and the QO uses a large table to create a hash table.

      In this example, the hash_join_test_2 table (aliased as tbl2) contains about 1,000,000 rows of data, and the hash_join_test_1 table (aliased as tbl1) contains 10,000 rows of data. However, the table statistics are not updated and indicate that the tbl2 table contains only 1,000 rows of data. As a result, the QO treats the tbl2 table as the small table and uses it to create the hash table, which makes the query inefficient.

      BEGIN ;
      CREATE TABLE public.hash_join_test_1 (
          a integer not null,
          b text not null
      );
      CALL set_table_property('public.hash_join_test_1', 'distribution_key', 'a');
      CREATE TABLE public.hash_join_test_2 (
          c integer not null,
          d text not null
      );
      CALL set_table_property('public.hash_join_test_2', 'distribution_key', 'c');
      COMMIT ;
      
      insert into hash_join_test_1 select i, i+1 from generate_series(1, 10000) as s(i);
      insert into hash_join_test_2 select i, i::text from generate_series(10, 1000000) as s(i);
      
      explain select * from hash_join_test_1 tbl1  join hash_join_test_2 tbl2 on tbl1.a=tbl2.c;

      The following figure shows the execution plan. The large table hash_join_test_2 is used to create a hash table.

      image

      If the table statistics are not updated, you can manually execute the analyze <tablename> statement to update the statistics. Sample statements:

      analyze hash_join_test_1;
      analyze hash_join_test_2;

      The following figure shows the execution plan after the table statistics are updated. The small table hash_join_test_1 is used to create a hash table, and the number of rows estimated by the QO is correct.

      image

    • Adjust the join order

      In most cases, you can resolve join issues by updating table statistics. By default, however, if an SQL statement is complex and joins five or more tables, the Hologres QO searches for the optimal execution plan, and this search is time-consuming. You can execute the following statement to configure the Grand Unified Configuration (GUC) parameter that controls the join order and reduce the time that the QO spends on plan selection:

      set optimizer_join_order = '<value>'; 

      The following table describes valid values of the GUC parameter.

      Valid value

      Description

      exhaustive (default value)

      The join order is determined by using an algorithm, and an optimal execution plan is generated. This may increase the QO overhead during multi-table join.

      query

      The execution plan follows the join order written in the SQL statement. The QO does not change the order. This value helps decrease the QO overhead and is suitable only if the tables to be joined contain no more than hundreds of millions of rows of data. We recommend that you do not configure this GUC parameter at the database level. Otherwise, the performance of other join operations deteriorates.

      greedy

      The join order is generated by using the greedy algorithm. In this mode, the QO overhead is moderate.
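
      As a short sketch, the following statements set the parameter for the current session only, view the plan, and then switch back to the default value. The query reuses the hash_join_test_1 and hash_join_test_2 tables from the preceding example and is only an illustration. A two-table join does not need this setting in practice.

      ```sql
      -- Follow the join order written in the statement, for this session only.
      set optimizer_join_order = 'query';

      explain select * from hash_join_test_1 tbl1 join hash_join_test_2 tbl2 on tbl1.a=tbl2.c;

      -- Switch back to the default value afterwards.
      set optimizer_join_order = 'exhaustive';
      ```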

  • Nested loop join and Materialize

    In a nested loop join of multiple tables, one table serves as the outer table and another table serves as the inner table. For each data record of the outer table, the inner table is traversed to find matching rows. The inner and outer tables are therefore joined in nested loops. This process is equivalent to the process of calculating the Cartesian product. In the execution plan, the first inner table usually has a Materialize operator.

    Optimization suggestion:

    • The principle of nested loop is that the inner table is driven by the outer table. Each row returned by the outer table must be compared against the rows in the inner table. Therefore, the result set returned by the outer table cannot be too large. Otherwise, large amounts of resources are consumed. We recommend that you use the table that returns a small result as the outer table.

    • Non-equi joins usually generate nested loop joins. We recommend that you avoid non-equi joins in SQL statements.

    • The following code provides an example of a nested loop join.

      begin;
      create table public.nestedloop_test_1 (
          a integer not null,
          b integer not null
      );
      call set_table_property('public.nestedloop_test_1', 'distribution_key', 'a');
      create table public.nestedloop_test_2 (
          c integer not null,
          d text not null
      );
      call set_table_property('public.nestedloop_test_2', 'distribution_key', 'c');
      commit;
      
      insert into nestedloop_test_1 select i, i+1 from generate_series(1, 10000) as s(i);
      insert into nestedloop_test_2 select i, i::text from generate_series(10, 1000000) as s(i);
      
      explain select * from nestedloop_test_1 tbl1,nestedloop_test_2 tbl2 where tbl1.a>tbl2.c;

      The following figure shows the execution plan. The Materialize and Nested Loop operators are displayed. This indicates that the SQL statement uses the nested loop join.

      image
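
      For comparison, the following sketch joins the same two tables with an equality condition. It assumes the nestedloop_test_1 and nestedloop_test_2 tables created in this example. For an equi-join like this, the QO would typically be expected to select a hash join instead of a nested loop join, but the actual plan depends on your instance and the table statistics.

      ```sql
      -- Equality join condition on the same tables. Compare this plan with the nested loop plan.
      explain select * from nestedloop_test_1 tbl1 join nestedloop_test_2 tbl2 on tbl1.a = tbl2.c;
      ```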

Broadcast

Broadcast is used to distribute data to each shard. In most cases, broadcast join is used to join small tables with large tables. When an SQL statement is submitted, the QO compares the costs of redistribution and broadcast, and then generates the execution plan based on the comparison.

Optimization suggestion:

  • If the table that you want to query is small and the instance contains a small number of shards, such as 5 shards, broadcast is recommended.

    In this example, two tables are joined. The data amount of the broadcast_test_1 table and that of the broadcast_test_2 table are significantly different.

    begin;
    create table broadcast_test_1 (
        f1 int, 
        f2 int);
    call set_table_property('broadcast_test_1','distribution_key','f2');
    create table broadcast_test_2 (
        f1 int,
        f2 int);
    commit;
    
    insert into broadcast_test_1 select i as f1, i as f2 from generate_series(1, 30)i;
    insert into broadcast_test_2 select i as f1, i as f2 from generate_series(1, 30000)i;
    
    analyze broadcast_test_1;
    analyze broadcast_test_2;
    
    explain select * from broadcast_test_1 t1, broadcast_test_2 t2 where t1.f1=t2.f1;

    The following figure shows the returned result.

    image

  • If the tables to be joined are not small tables but the broadcast operator is used, the table statistics may not be updated as expected. For example, the statistics indicate that the table contains 1,000 rows of data, but actually the table contains 1,000,000 rows of data. In this case, execute the analyze <tablename> statement to update the table statistics.

ExecuteExternalSQL

As described in Hologres architecture, the query engines of Hologres are classified into Hologres Query Engine (HQE), PostgreSQL Query Engine (PQE), and Seahawks Query Engine (SQE). PQE is the native PostgreSQL engine. Some operators and functions that are not supported by HQE are executed by PQE. PQE is less efficient than HQE. If an execution plan contains the ExecuteExternalSQL operator, PQE is used.

  • Example 1: An SQL statement uses PQE.

    create table pqe_test(a text);
    insert into pqe_test values ('2023-01-28 16:25:19.082698+08');
    explain select a::timestamp from pqe_test;

    In the following execution plan, ExecuteExternalSQL is displayed, and PQE is used to process the ::timestamp operator.

    image

  • Example 2: The ::timestamp operator is changed to to_timestamp. HQE is used.

    explain select to_timestamp(a,'YYYY-MM-DD HH24:MI:SS') from pqe_test;

    In the following execution plan, ExecuteExternalSQL is not displayed, and PQE is not used.

    image

Optimization suggestion: Use the execution plan to find the function or operator that is processed by PQE, and rewrite the SQL statement to enable HQE to process the function or operator. This helps improve the query efficiency. For more information about how to rewrite operators, see Optimize query performance on Hologres internal tables.

Note

The support for PQE is continuously optimized in each Hologres version to push down more PQE functions to HQE. Some functions can be automatically supported by HQE after Hologres instances are upgraded. For more information, see Function release notes.

Aggregate

Aggregate is used to aggregate data. It can be an aggregate function or a combination of multiple aggregate functions. In SQL statements, aggregate operators are classified into the following types:

  • GroupAggregate: Data has been pre-sorted based on the GROUP BY clause.

  • HashAggregate: Data is hash-calculated, distributed to different shards based on hash values, and then aggregated by using the Gather operator. This type of aggregate is most commonly used.

    EXPLAIN SELECT l_orderkey,count(l_linenumber) FROM public.holo_lineitem_100g group by l_orderkey;

  • Multi-stage HashAggregate: Data is stored as files in shards, and files have different levels. If the data volume is large, the aggregation is also divided into multiple stages. This type of aggregate includes the following operators:

    • Partial HashAggregate: Data is aggregated at the file level or shard level.

    • Final HashAggregate: Data in multiple shards is aggregated.

    In this example, the TPC-H Query 6 statement uses the multi-stage HashAggregate.

    explain select
            sum(l_extendedprice * l_discount) as revenue
    from
            lineitem
    where
            l_shipdate >= date '1996-01-01'
            and l_shipdate < date '1996-01-01' + interval '1' year
            and l_discount between 0.02 - 0.01 and 0.02 + 0.01
            and l_quantity < 24;

    The following figure shows the returned result.

    image

    Optimization suggestion: In most cases, the QO determines whether to use a single-stage HashAggregate or a multi-stage HashAggregate based on the data volume. If the execution plan returned by the Explain Analyze statement shows that the aggregate operator is time-consuming, the data volume is large, and the QO performs only shard-level aggregation without file-level aggregation, you can set the following GUC parameter to on to force multi-stage HashAggregate. If the SQL statement already uses a multi-stage aggregate, no additional adjustments are required.

    set optimizer_force_multistage_agg = on;
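
    As a sketch of how this setting can be checked, the following statements enable the parameter for the current session and then run Explain Analyze on a GROUP BY query. The query assumes the TPC-H lineitem table used elsewhere in this topic. In the output, you can check whether both the Partial HashAggregate and Final HashAggregate operators appear and compare the operator durations with those of the original plan.

    ```sql
    -- Force multi-stage aggregation for the current session.
    set optimizer_force_multistage_agg = on;

    -- Explain Analyze executes the statement and reports the actual operators and durations.
    explain analyze select l_orderkey, count(l_linenumber) from public.lineitem group by l_orderkey;
    ```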

Sort

Sort is used to sort data in ascending order (ASC) or descending order (DESC). This operator usually corresponds to the ORDER BY clause.

In this example, data in the l_shipdate column of the TPC-H lineitem table is sorted.

EXPLAIN SELECT l_shipdate FROM public.lineitem order by l_shipdate;

The following figure shows the returned result.

image

Optimization suggestion: If the ORDER BY clause involves a large amount of data, many resources are consumed. We recommend that you avoid sorting a large amount of data.
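
One way to reduce the amount of data that is sorted is to filter rows before the sort. The following statement is a sketch that assumes the same TPC-H lineitem table. The date range is an arbitrary illustration, and whether the filter reduces the sort cost noticeably depends on your data.

```sql
-- Only rows in the selected date range reach the Sort operator.
explain select l_shipdate from public.lineitem
where l_shipdate >= date '1996-01-01' and l_shipdate < date '1996-02-01'
order by l_shipdate;
```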

Limit

Limit is used to control the number of rows that an SQL statement can return. The limit operator controls only the number of rows that can be returned in the final result, and does not control the number of rows that can be scanned in the calculation. The limit operator can control the number of rows that are scanned only if the limit operator is pushed down to the Seq Scan node.

In this example, limit 1 is pushed down to the Seq Scan node, and only one row of data needs to be scanned.

explain select * from public.lineitem limit 1;

The following figure shows the returned result.

image

Optimization suggestion:

  • Not all limit operators can be pushed down. We recommend that you configure filter conditions in SQL queries to avoid full table scans.

  • We recommend that you do not set the limit to a very large value, such as 100,000 or even 1,000,000. If you set the limit to a large value, a large amount of data is scanned even if the limit is pushed down. As a result, the scanning process is time-consuming.
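
The two suggestions above can be combined as in the following sketch, which assumes the TPC-H lineitem table. The filter condition and the limit value are arbitrary illustrations. In the returned plan, you can check whether the limit appears at the Seq Scan node.

```sql
-- A filter condition narrows the scan, and the limit is kept small.
explain select * from public.lineitem
where l_shipdate >= date '1996-01-01' and l_shipdate < date '1996-02-01'
limit 100;
```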

Append

Append is used to merge the results of subqueries. This operator usually appears in UNION ALL operations.
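
As a small illustration, the following statement combines two subqueries with UNION ALL. It assumes the tbl1 and tbl2 tables from the redistribution example, whose columns have matching types. The returned plan would be expected to contain the Append operator, but the exact plan depends on your instance.

```sql
-- The results of the two subqueries are merged without removing duplicates.
explain select a, b from tbl1
union all
select c, d from tbl2;
```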

Exchange

Exchange is used to exchange data among shards. You do not need to pay much attention to this operator.

Forward

Forward is used to transmit data of operators between HQE and PQE or between HQE and SQE.

Project

Project indicates the mapping between a subquery and an outer query. You do not need to pay much attention to this operator.

References

You can view execution plans in a visualized manner in HoloWeb. For more information, see View execution plans.