All Products
Search
Document Center

EXCHANGE

Last Updated: Jun 18, 2021

The EXCHANGE operator exchanges data between threads.

The EXCHANGE operator is useful in distributed scenarios. It usually appears in pairs, with an EXCHANGE OUT operator at the source side, and an EXCHANGE IN operator at the destination side.

EXCH-IN/OUT

Short for EXCHANGE IN/ EXCHANGE OUT, the EXCH-IN/OUT operators aggregate data from multiple partitions and send them to the leader node involved in the query.

In the following example, the query accesses 5 partitions: p0, p1, p2, p3, and p4. The No. 1 operator receives the output of the No. 2 operator, and sends the data out. The No. 0 operator receives the output of the No. 1 operator from multiple partitions, aggregates them, and generates the result.

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t\G;
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST  |
----------------------------------------------
|0 |EXCHANGE IN DISTR  |    |500000   |545109|
|1 | EXCHANGE OUT DISTR|    |500000   |320292|
|2 |  TABLE SCAN       |T   |500000   |320292|
==============================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2]), filter(nil)
  1 - output([T.C1], [T.C2]), filter(nil)
  2 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-4])

In the preceding example, the Outputs & filters section in the demo shows in detail the output information of the EXCH-IN/OUT operators.

Field

Description

output

The output expression of the operator.

filter

The filter conditions of the operator.

In this example, the filter is set to nil because no filter condition is configured for the EXCH-IN/OUT operator.

EXCH-IN/OUT (REMOTE)

The EXCH-IN/OUT (REMOTE) operators pull remote data in a single partition back to the local server.

As shown in the following example, a non-partitioned table is created on server A, and a query is executed on server B to read data from that table. In this case, the No. 0 and No. 1 operators are assigned to the execution plan to fetch remote data. The No. 1 operator is executed on server A to read the data from table t and send the data out. The No. 0 operator is executed on server B to receive the output of the No. 1 operator.

obclient>CREATE TABLE t (c1 INT, c2 INT);
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM t\G;
*************************** 1. row ***************************
Query Plan:
===============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST  |
-----------------------------------------------
|0 |EXCHANGE IN REMOTE  |    |100000   |109029|
|1 | EXCHANGE OUT REMOTE|    |100000   |64066 |
|2 |  TABLE SCAN        |T   |100000   |64066 |
===============================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2]), filter(nil)
  1 - output([T.C1], [T.C2]), filter(nil)
  2 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p0)

In the preceding example, the Outputs & filters section in the execution plan display shows in detail the output information of the EXCH-IN/OUT (REMOTE) operator. Fields of the operator have the same meaning as that of the EXCH-IN/OUT operator.

EXCH-IN/OUT (PKEY)

The EXCH-IN/OUT (PKEY) operators repartition data. They are generally used with a binary operator pair to repartition the data of one subnode by following the partitioning method of the other subnode.

In the following example, the query is to join two partitioned tables. The execution plan repartitions the data of table s by following the partitioning method of table t. The input of the No. 4 operator is the result of scanning table s, and for every row of table s, the operator determines its target node based on the partitioning of table t and the join condition of the query.

In addition, it can be seen that the No. 3 operator is an EXCHANGE IN MERGE SORT DISTR, a special EXCHANGE IN operator that performs MERGE and SORT operations when aggregating data from multiple partitions. In this execution plan, the data received by the No. 3 operator from all partitions is sorted by c1, so the operator merges and sorts the received data to ensure that the output is also sorted by c1.

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT PRIMARY KEY, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT * FROM s, t WHERE s.c1 = t.c1\G;
*************************** 1. row ***************************
Query Plan:
 ===============================================================
|ID|OPERATOR                       |NAME|EST. ROWS |COST      |
---------------------------------------------------------------
|0 |EXCHANGE IN DISTR              |    |1960200000|3090308367|
|1 | EXCHANGE OUT DISTR            |    |1960200000|1327558071|
|2 |  MERGE JOIN                   |    |1960200000|1327558071|
|3 |   EXCHANGE IN MERGE SORT DISTR|    |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (PKEY)  |    |400000    |256226    |
|5 |     TABLE SCAN                |S   |400000    |256226    |
|6 |   TABLE SCAN                  |T   |500000    |320292    |
===============================================================

Outputs & filters:
-------------------------------------
  0 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  1 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  2 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil),
      equal_conds([S.C1 = T.C1]), other_conds(nil)
  3 - output([S.C1], [S.C2]), filter(nil), sort_keys([S.C1, ASC])
  4 - (#keys=1, [S.C1]), output([S.C1], [S.C2]), filter(nil)
  5 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-4])

In the preceding example, the Outputs & filters section in the demo shows in detail the output information of the EXCH-IN/OUT (PKEY) operators.

Field

Description

output

The output expression of the operator.

filter

The filter conditions of the operator.

In this example, the filter is set to nil because no filter condition is configured for the EXCH-IN/OUT(PKEY) operator.

pkey

The column based on which the repartitioning is performed.

For example, #keys=1, [s.c1] indicates that the repartitioning is performed based on column c1.

EXCH-IN/OUT (HASH)

The EXCH-IN/OUT (HASH) operators repartition data by using a set of HASH functions.

In the execution plan shown in the following example, operators 3 - 5 and 7 - 8 are two sets of EXCHANGE operators that use HASH function for repartitioning. These two groups of operators partition the data of tables t and s into multiple parts based on a new set of HASH functions. In the example, the columns t.c2 and s.c2 are taken for the hash operation, to ensure that rows with the same value in column c2 are grouped in the same part. Based on the repartitioned data, HASH JOIN, or the No. 2 operator, merges all parts based on the condition t.c2= s.c2.

Additionally, the plan shows dop = 2 because the query is executed with a degree of parallelism of 2.

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>EXPLAIN SELECT /*+PQ_DISTRIBUTE(@"SEL$1" ("TEST.S"@"SEL$1" ) HASH HASH), 
             PARALLEL(2)*/ * FROM t, s WHERE t.c2 = s.c2\G;
*************************** 1. row ***************************
Query Plan:
=================================================================
|ID|OPERATOR                     |NAME    |EST. ROWS |COST      |
-----------------------------------------------------------------
|0 |PX COORDINATOR               |        |1568160000|2473629500|
|1 | EXCHANGE OUT DISTR          |:EX10002|1568160000|1063429263|
|2 |  HASH JOIN                  |        |1568160000|1063429263|
|3 |   EXCHANGE IN DISTR         |        |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|400000    |256226    |
|5 |     PX PARTITION ITERATOR   |        |400000    |256226    |
|6 |      TABLE SCAN             |T       |400000    |256226    |
|7 |   EXCHANGE IN DISTR         |        |400000    |436080    |
|8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000    |256226    |
|9 |     PX PARTITION ITERATOR   |        |400000    |256226    |
|10|      TABLE SCAN             |S       |400000    |256226    |
=================================================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
      equal_conds([T.C2 = S.C2]), other_conds(nil)
  3 - output([T.C1], [T.C2]), filter(nil)
  4 - (#keys=1, [T.C2]), output([T.C1], [T.C2]), filter(nil), dop=2
  5 - output([T.C1], [T.C2]), filter(nil)
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-3])
  7 - output([S.C1], [S.C2]), filter(nil)
  8 - (#keys=1, [S.C2]), output([S.C1], [S.C2]), filter(nil), dop=2
  9 - output([S.C1], [S.C2]), filter(nil)
  10 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])

The PX PARTITION ITERATO operator iterates data at the partition level. For more information, see GI.

In the preceding example, the Outputs & filters section in the demo shows in detail the output information of the EXCH-IN/OUT (HASH) operators.

Field

Description

output

The output expression of the operator.

filter

The filter conditions of the operator.

In this example, the filter is set to nil because no filter condition is configured for the EXCH-IN/OUT(HASH) operator.

pkey

The column based on which the HASH repartitioning is performed.

For example, #keys=1, [s.c2] indicates that the HASH repartitioning is performed based on column c2.

EXCH-IN/OUT (BROADCAST)

The EXCH-IN/OUT (BROADCAST) operators repartition the input data by using the BROADCAST method and broadcasts the data to other threads.

In the execution plan shown in the following example, operators 3 and 4 are EXCHANGE operators that use BROADCAST repartitioning. They broadcast the data of Table t to every thread, and try to join each partition of table s with the broadcast data of table t .

obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected (0.12 sec)

obclient>INSERT INTO s VALUES (1, 1), (2, 2), (3, 3), (4, 4);
Query OK, 1 rows affected (0.12 sec)

obclient>EXPALIN SELECT  /*+PARALLEL(2) */ * FROM t, s WHERE t.c2 = s.c2\G;
*************************** 1. row ***************************
Query Plan:
======================================================================
|ID|OPERATOR                          |NAME    |EST. ROWS |COST      |
----------------------------------------------------------------------
|0 |PX COORDINATOR                    |        |1568160000|2473449646|
|1 | EXCHANGE OUT DISTR               |:EX10001|1568160000|1063249409|
|2 |  HASH JOIN                       |        |1568160000|1063249409|
|3 |   EXCHANGE IN DISTR              |        |400000    |436080    |
|4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|400000    |256226    |
|5 |     PX PARTITION ITERATOR        |        |400000    |256226    |
|6 |      TABLE SCAN                  |T       |400000    |256226    |
|7 |   PX PARTITION ITERATOR          |        |400000    |256226    |
|8 |    TABLE SCAN                    |S       |400000    |256226    |
======================================================================

Outputs & filters:
-------------------------------------
  0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
      equal_conds([T.C2 = S.C2]), other_conds(nil)
  3 - output([T.C1], [T.C2]), filter(nil)
  4 - output([T.C1], [T.C2]), filter(nil), dop=2
  5 - output([T.C1], [T.C2]), filter(nil)
  6 - output([T.C1], [T.C2]), filter(nil),
      access([T.C1], [T.C2]), partitions(p[0-3])
  7 - output([S.C1], [S.C2]), filter(nil)
  8 - output([S.C1], [S.C2]), filter(nil),
      access([S.C1], [S.C2]), partitions(p[0-3])

In the preceding example, the Outputs & filters section in the execution plan display shows in detail the output information of the EXCH-IN/OUT (BROADCAST) operator. Fields of the operator have the same meaning as that of the EXCH-IN/OUT operator.