This topic describes data skew that may occur when JOIN statements are executed in MaxCompute SQL and provides related solutions.

Background information

When a JOIN statement in MaxCompute SQL is executed, rows with the same join key are sent to and processed on the same instance. If a key has a large amount of data, the instance that processes it takes much longer than the other instances. If the execution log shows that a few instances of the JOIN task are still running while all other instances have completed, the task has a long tail.

Long tails caused by data skew are common and can significantly prolong task execution. During promotions such as Double 11, severe long tails may occur. For example, large sellers receive far more page views than small sellers. If page view log data is joined with the seller dimension table, the data is distributed by seller ID, so the few instances that process large sellers receive far more data than the others. As a result, the whole task cannot complete until these few long-tail instances finish.
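The following Python sketch simulates the shuffle described above to show why a single large key produces a long tail. It is not MaxCompute's partitioning logic. The instance count, the use of `zlib.crc32` as the hash function, the hypothetical `rows_per_instance` helper, and the seller data are all assumptions chosen for illustration.

```python
import zlib
from collections import Counter

def assign_instance(key: str, num_instances: int) -> int:
    """Map a join key to an instance index with a deterministic hash (hypothetical scheme)."""
    return zlib.crc32(key.encode("utf-8")) % num_instances

def rows_per_instance(keys, num_instances):
    """Count how many rows each instance receives when rows are shuffled by join key."""
    load = Counter(assign_instance(k, num_instances) for k in keys)
    return [load.get(i, 0) for i in range(num_instances)]

# Hypothetical page-view log: one large seller dominates the join key distribution.
keys = ["seller_big"] * 10_000 + [f"seller_{i}" for i in range(1_000)]
load = rows_per_instance(keys, num_instances=8)
print(load)
print("max / mean:", max(load) / (sum(load) / len(load)))
```

Raising the instance count does not help. All 10,000 `seller_big` rows still hash to a single instance, so the largest load stays at 10,000 rows or more.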

You can resolve long tails from four perspectives:
  • If you want to join one large table and one small table, you can execute the MAP JOIN statement to cache the small table. For more information about the MAP JOIN statement, see SELECT syntax.
  • To join two large tables, deduplicate data first.
  • If both tables contain many rows for the same key, the join produces a Cartesian product for that key. Find out why these large keys exist and optimize them from the business perspective.
  • A LEFT JOIN whose left table is small and whose right table is large can take a long time, and MAP JOIN cannot be applied directly because the secondary (right) table is large. In this case, we recommend that you first use MAP JOIN to join the small table with the large table. This generates an intermediate table that contains only the rows of the large table whose join keys also exist in the small table. The intermediate table is no larger than the large table because MAP JOIN filters out the unneeded rows. Then, execute LEFT JOIN on the small table and the intermediate table. The result is equivalent to executing LEFT JOIN on the small table and the large table.
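The equivalence claimed in the last item can be checked with a small in-memory sketch. The hypothetical `left_join` and `semi_join_filter` helpers below stand in for the SQL statements. The sketch builds the intermediate table by keeping the large-table rows whose key appears in the small table. This matters if the small table's join key is not unique: an inner join with such a table would duplicate large-table rows. In that case, deduplicate the small table's keys when you build the intermediate table.

```python
from collections import defaultdict

def left_join(left, right, key):
    """Left outer join of two lists of dicts on `key`; unmatched left rows pair with None."""
    index = defaultdict(list)
    for r in right:
        index[r[key]].append(r)
    out = []
    for l in left:
        matches = index.get(l[key])
        if matches:
            out.extend((l, r) for r in matches)
        else:
            out.append((l, None))
    return out

def semi_join_filter(small, large, key):
    """Intermediate table: rows of `large` whose key also appears in `small`."""
    small_keys = {row[key] for row in small}  # small enough to cache in memory
    return [row for row in large if row[key] in small_keys]

small = [{"id": 1}, {"id": 2}, {"id": 500}]           # id 500 has no match in `large`
large = [{"id": i % 100, "v": i} for i in range(1_000)]

direct = left_join(small, large, "id")
intermediate = semi_join_filter(small, large, "id")
two_step = left_join(small, intermediate, "id")

print(len(large), len(intermediate))  # 1000 20
print(direct == two_step)             # True
```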

Check data skew

Perform the following steps to check data skew:
  1. Open the Logview of the SQL job and check the execution details of each Fuxi task. In the following figure, Long-Tails(115) indicates that 115 long-tail instances exist.
  2. Find your Fuxi instance and click the View Long-Tails icon in the StdOut column to view the size of data read by the instance.

    For example, Read from 0 num:52743413 size:1389941257 indicates that the instance has read 52,743,413 records (1,389,941,257 bytes) while the JOIN statement is executed. If an instance listed in Long-Tails reads far more data than other instances, the long tail is caused by the larger volume of data that the instance processes.
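If you download StdOut for many instances, comparing the read counts by hand is tedious. The sketch below parses lines in the `Read from 0 num:... size:...` form shown above and flags instances that read far more records than the median. The regular expression assumes only the format of this one sample line. The 5x threshold, the helper names, and all sample lines other than the quoted one are hypothetical.

```python
import re
from statistics import median

# Assumes the "num:<records> size:<bytes>" layout of the sample StdOut line.
READ_PATTERN = re.compile(r"num:(\d+)\s+size:(\d+)")

def parse_read_line(line: str):
    """Extract (records, bytes) from a line such as 'Read from 0 num:... size:...'."""
    match = READ_PATTERN.search(line)
    if match is None:
        raise ValueError(f"unrecognized line: {line!r}")
    return int(match.group(1)), int(match.group(2))

def find_long_tails(reads: dict, factor: float = 5.0):
    """Return instance IDs whose record count exceeds `factor` times the median (hypothetical rule)."""
    counts = {inst: parse_read_line(line)[0] for inst, line in reads.items()}
    typical = median(counts.values())
    return sorted(inst for inst, n in counts.items() if n > factor * typical)

# Hypothetical StdOut lines; only the second one comes from the example above.
reads = {
    "J5_3_4#214_0": "Read from 0 num:1052744 size:27743112",
    "J5_3_4#215_0": "Read from 0 num:52743413 size:1389941257",
    "J5_3_4#216_0": "Read from 0 num:998310 size:26311027",
}
print(parse_read_line(reads["J5_3_4#215_0"]))  # (52743413, 1389941257)
print(find_long_tails(reads))                  # ['J5_3_4#215_0']
```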

Common causes and solutions

  • MAP JOIN statement: If data skew occurs when the JOIN statement is executed on a large table and a small table, you can execute the MAP JOIN statement to prevent a long tail.
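    When you use the MAP JOIN statement, the JOIN operation is performed on the map side. The small table is loaded into the memory of each map instance, so rows are not redistributed by join key. This prevents data skew caused by uneven key distribution. The MAP JOIN statement is subject to the following limits: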
    • The MAP JOIN statement is applicable only when the secondary table is small. A secondary table refers to the right table in the LEFT OUTER JOIN statement or the left table in the RIGHT OUTER JOIN statement.
    • The size of the small table is also limited when the MAP JOIN statement is used. By default, the maximum size is 512 MB after the small table is loaded into the memory. You can execute the following statement to increase the maximum size to 2,048 MB:
      set odps.sql.mapjoin.memory.max=2048;
    The MAP JOIN statement is easy to use. Add the /*+ mapjoin(b) */ hint after the SELECT keyword, where b is the alias of the small table or subquery. Example:
    select   /*+ mapjoin(b) */
               a.c2       
              ,b.c3
    from        
             (select   c1                 
                      ,c2         
               from     t1         ) a
    left outer join        
             (select   c1                 
                      ,c3         
              from     t2         ) b
    on       a.c1 = b.c1;
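To see why MAP JOIN is insensitive to key skew, the sketch below models a broadcast (map-side) hash join in Python, mirroring the `select a.c2, b.c3 ... left outer join ... on a.c1 = b.c1` example above. It is a conceptual model, not MaxCompute's implementation. The chunking of the big table and the helper name are assumptions. Each simulated map instance gets the whole small table as a dictionary and processes an equal-sized chunk of the big table, so even a table in which every row has the same key is split evenly.

```python
def map_join_left_outer(big_chunks, small_table, key):
    """Sketch of a map-side (broadcast) LEFT OUTER JOIN.

    Every map instance holds the whole small table as an in-memory hash table and
    joins it with its own chunk of the big table; rows are never redistributed by key.
    """
    # Build the hash table once; conceptually it is copied to every map instance.
    lookup = {}
    for row in small_table:
        lookup.setdefault(row[key], []).append(row)

    outputs = []  # one output list per simulated map instance
    for chunk in big_chunks:
        out = []
        for a in chunk:
            # No match yields a single (a.c2, NULL) row, as in LEFT OUTER JOIN.
            for b in lookup.get(a[key], [None]):
                out.append((a["c2"], b["c3"] if b is not None else None))
        outputs.append(out)
    return outputs

# Hypothetical data: every row of t1 has the same hot key, yet the work stays evenly split.
t1_chunks = [[{"c1": "hot", "c2": i} for i in range(j * 100, (j + 1) * 100)] for j in range(4)]
t2 = [{"c1": "hot", "c3": "x"}, {"c1": "cold", "c3": "y"}]
outputs = map_join_left_outer(t1_chunks, t2, "c1")
print([len(o) for o in outputs])  # [100, 100, 100, 100]
```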
  • JOIN long tails caused by empty values
    If a join key contains a large number of empty (null) values, all of these rows are distributed to the same instance and cause a long tail. If both tables are large, MAP JOIN cannot be used. In this case, replace the null values with random values. Null values cannot be matched and are all distributed to the same instance, whereas random values are distributed across different instances. Because null keys never match in the join, replacing them with random values that match no key in the other table does not change the join result.
    select   ...
    from
            (select   *
             from     tbcdm.dim_tb_itm
             where    ds='${bizdate}'
             )son1
    left outer join
            (select  *
             from    tbods.s_standard_brand
             where   ds='${bizdate}'
             and     status=3
             )son2
    on       coalesce(son1.org_brand_id,rand()*9999)=son2.value_id;
    If org_brand_id is null, coalesce(son1.org_brand_id,rand()*9999) in the ON clause replaces it with a random value. This prevents the long tail that occurs when a large number of null values are distributed to the same instance.
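The sketch below models the effect of this rewrite in Python. It uses a string prefix for the random value, in the style of `concat("tbcdm",rand())` in the hot key example, so that a salted key can never equal a real ID. The example above uses `rand()*9999` instead. The hash-based instance assignment, the data, and the helper names are hypothetical.

```python
import random
import zlib
from collections import Counter

def instance_of(key, n):
    """Hypothetical shuffle: hash the key's string form into n buckets."""
    return zlib.crc32(str(key).encode("utf-8")) % n

def salt_null(key, rng):
    """Like coalesce(key, concat('tbcdm', rand())): replace a null key with a random string."""
    return key if key is not None else f"tbcdm{rng.random()}"

def left_join_keys(left_keys, right_keys):
    """For each left key, report whether it finds a match; null never matches, as in SQL."""
    right = {k for k in right_keys if k is not None}
    return [k is not None and k in right for k in left_keys]

rng = random.Random(42)
left = [None] * 9_000 + list(range(1_000))  # hypothetical: 90% of org_brand_id values are null
right = list(range(0, 1_000, 2))            # hypothetical brand IDs

before = Counter(instance_of(k, 8) for k in left)
salted = [salt_null(k, rng) for k in left]
after = Counter(instance_of(k, 8) for k in salted)

print(max(before.values()), max(after.values()))
print(left_join_keys(left, right) == left_join_keys(salted, right))  # True
```

Before salting, every null row lands on one instance. After salting, the rows spread across all eight instances, and the match results are unchanged.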
  • JOIN long tails caused by hot key values
    If hot key values cause a long tail and MAP JOIN cannot be used because no small table is involved, separate the hot key values. Rows with hot key values in the primary table are separated from rows with non-hot key values, the two parts are processed separately, and the results are then merged. In the following example, the page view log table of the Taobao website is joined with the commodity dimension table.
    1. Extract hot key values: Extract the IDs of the commodities whose page views are greater than or equal to 50,000 to a temporary table.
      insert   overwrite table topk_item PARTITION (ds = '${bizdate}')
      select   item_id
      from
              (select   item_id
                       ,count(1) as cnt
               from     dwd_tb_log_pv_di
               where    ds = '${bizdate}'
               and      url_type = 'ipv'
               and      item_id is not null
               group by item_id
               ) a
      where    cnt >= 50000;
    2. Process non-hot key values.
      Use RIGHT OUTER JOIN with a MAP JOIN hint on topk_item to join the topk_item hot key table with the dwd_tb_log_pv_di primary table. Then, apply the condition b1.item_id is null to keep only the log data of non-hot commodities, which cannot be matched to any hot key. Null item_id values are replaced with random values so that they are not all distributed to one instance. Then, join the non-hot log data with the commodity dimension table. No long tails occur because the hot key values have been removed.
      select   ...
      from
              (select   *
               from     dim_tb_itm
               where    ds = '${bizdate}'
               ) a
      right outer join
               (select   /*+ mapjoin(b1) */
                        b2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        ) b1
               right outer join
                       (select   *
                        from     dwd_tb_log_pv_di
                        where    ds = '${bizdate}'
                        and      url_type = 'ipv'
                        ) b2
               on       b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand()))
               where    b1.item_id is null
               ) l
      on       a.item_id = coalesce(l.item_id,concat("tbcdm",rand()));
    3. Process hot key values.
      Use INNER JOIN with a MAP JOIN hint to join the topk_item hot key table with the dwd_tb_log_pv_di primary table and extract the log data of hot commodities. Use INNER JOIN with a MAP JOIN hint to join topk_item with the dim_tb_itm commodity dimension table and extract the dimension data of hot commodities. Then, use LEFT OUTER JOIN to join the hot log data with the hot dimension data. Because the hot dimension data is small, MAP JOIN can be used for this join to prevent long tails.
      select   /*+ mapjoin(a) */
               ...
      from
               (select   /*+ mapjoin(b1) */
                        b2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        )b1
               join
                       (select   *
                        from     dwd_tb_log_pv_di
                        where    ds = '${bizdate}'
                        and      url_type = 'ipv'
                        and      item_id is not null
                        ) b2
               on       (b1.item_id = b2.item_id)
               ) l
      left outer join
               (select   /*+ mapjoin(a1) */
                        a2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        ) a1
               join
                       (select   *
                        from     dim_tb_itm
                        where    ds = '${bizdate}'
                        ) a2
               on       (a1.item_id = a2.item_id)
               ) a
      on       a.item_id = l.item_id;
    4. Execute the UNION ALL statement to merge the data obtained in steps 2 and 3. This produces the complete log data, joined with commodity information.
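The four steps can be checked end to end with an in-memory Python sketch. It counts page views per item to find hot keys (step 1) and joins non-hot and hot log rows separately (steps 2 and 3). It then concatenates the two results (step 4) and confirms that the output equals a direct LEFT OUTER JOIN of the log data with the dimension table. The helper names, the columns other than `item_id` and `url_type`, the data, and the lowered threshold are hypothetical. Results are compared as multisets because UNION ALL does not guarantee row order.

```python
from collections import Counter

def extract_hot_keys(pv_log, threshold):
    """Step 1: non-null item IDs with at least `threshold` 'ipv' page views."""
    counts = Counter(
        row["item_id"]
        for row in pv_log
        if row["url_type"] == "ipv" and row["item_id"] is not None
    )
    return {item for item, cnt in counts.items() if cnt >= threshold}

def left_join(log, dim):
    """log LEFT OUTER JOIN dim on item_id; a null item_id never matches, as in SQL."""
    index = {}
    for d in dim:
        index.setdefault(d["item_id"], []).append(d["name"])
    out = []
    for row in log:
        names = index.get(row["item_id"]) if row["item_id"] is not None else None
        for name in names or [None]:
            out.append((row["pv_id"], row["item_id"], name))
    return out

def split_hot_join(log, dim, hot):
    """Steps 2-4: join non-hot and hot rows separately, then concatenate (UNION ALL)."""
    non_hot_log = [r for r in log if r["item_id"] not in hot]    # step 2 (null IDs land here)
    hot_log = [r for r in log if r["item_id"] in hot]            # step 3, log side
    hot_dim = [d for d in dim if d["item_id"] in hot]            # step 3, small dimension side
    return left_join(non_hot_log, dim) + left_join(hot_log, hot_dim)  # step 4

# Hypothetical data: item 1 is hot; a threshold of 30 stands in for 50,000.
log = ([{"pv_id": i, "item_id": 1, "url_type": "ipv"} for i in range(50)]
       + [{"pv_id": 100 + i, "item_id": i % 5 + 2, "url_type": "ipv"} for i in range(20)]
       + [{"pv_id": 200, "item_id": None, "url_type": "ipv"},
          {"pv_id": 201, "item_id": 99, "url_type": "ipv"}])
dim = [{"item_id": i, "name": f"item{i}"} for i in range(1, 7)]

hot = extract_hot_keys(log, threshold=30)
print(hot)  # {1}
print(Counter(left_join(log, dim)) == Counter(split_hot_join(log, dim, hot)))  # True
```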
  • Set the odps.sql.skewjoin parameter to resolve long tails.

    This solution is simple. However, the skewed key values are specified in parameters, so you must modify the code and execute the statements again whenever the skewed key values change, and such changes cannot be predicted. If many skewed key values exist, configuring them in parameters is inconvenient. In this case, split the code as described in the preceding solution or specify the parameters as required. Perform the following steps to set the odps.sql.skewjoin parameter:

    1. Set the odps.sql.skewjoin parameter to true.
      set odps.sql.skewjoin=true;
    2. Set a skewed key and its value.
      set odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];
      skewed_src indicates the table that contains the skewed data, skewed_key indicates the skewed column, and skewed_value indicates the skewed value of this column.
  • Use SKEWJOIN HINT to handle skewed hot key values. For more information, see SKEWJOIN HINT.

    Procedure

    -- Method 1: Include the alias of the table in SKEWJOIN HINT.
    select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
    -- Method 2: Include the table alias and the columns that may be skewed in SKEWJOIN HINT. In the following statement, the c0 and c1 columns of table a are skewed columns.
    select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
    -- Method 3: Include the table alias, the columns, and the skewed hot key values in SKEWJOIN HINT. If skewed key values are of the STRING type, enclose each value in double quotation marks. In the following statement, (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") are skewed hot key values.
    select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
    Note: Method 3 is more efficient than Method 1 and Method 2.
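When hot key values have already been identified, for example with a query similar to the topk_item query in the hot key solution, a Method 3 hint can be generated instead of typed by hand. The hypothetical `skewjoin_hint` helper below reproduces only the multi-column form shown in Method 3, with numbers written as-is and STRING values in double quotation marks. It does not cover other hint forms, and it rejects values that contain double quotation marks rather than guessing an escaping rule.

```python
def format_value(value):
    """Write numbers as-is and enclose STRING values in double quotation marks."""
    if isinstance(value, str):
        if '"' in value:
            raise ValueError("values containing double quotation marks are not handled by this sketch")
        return f'"{value}"'
    return str(value)

def skewjoin_hint(alias, columns, hot_values):
    """Build a Method 3 style hint: /*+ skewjoin(alias(col, ...)((v, ...), ...)) */."""
    cols = ", ".join(columns)
    rows = ", ".join("(" + ", ".join(format_value(v) for v in row) + ")" for row in hot_values)
    return f"/*+ skewjoin({alias}({cols})({rows})) */"

hint = skewjoin_hint("a", ["c0", "c1"], [(1, "2"), (3, "4")])
print(hint)  # /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */
```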

    Identify the JOIN statement that causes data skew

    In the following snapshot captured on Logview, J5_3_4 is the Fuxi task that took the longest time to execute.
    Click the J5_3_4 task and view its instances on the tab that appears. The J5_3_4#215_0 instance took the longest time to execute, and its I/O records and I/O bytes are much larger than those of the other instances.
    This shows that data skew occurs on the J5_3_4#215_0 instance. Next, determine which JOIN statement causes the skew. Click the icon in the StdOut column of the skewed instance and of a non-skewed instance. The StdOut column cannot display all content, so click Download to view the complete information.
    As shown in the following figures, the record count of StreamLineRead7 on the skewed instance is much greater than the record count on the non-skewed instance. Therefore, data skew occurs when data is shuffled from StreamLineWrite7 to StreamLineRead7.
    On the DAG page, right-click the skewed instance and select expand all to find StreamLineWrite7 and StreamLineRead7.
    The expanded DAG shows that data skew occurs on StreamLineRead7 in MergeJoin2. MergeJoin2 is generated by joining the dim_hm_item and dim_tb_itm_brand tables and then joining the result with the dim_tb_brand table.
    Use these table names to find the skewed table in the SQL statement. The result shows that data skew occurs when the LEFT OUTER JOIN statement is executed and the t1 table is skewed. You can add /*+ skewjoin(t1) */ to the SQL statement to resolve the data skew issue.