This topic describes data skew that occurs when a JOIN statement is executed and how to resolve it.

Background

When a JOIN statement in MaxCompute SQL is executed, all rows with the same JOIN key are sent to and processed on the same instance. If one key has a large amount of data, the instance that handles it takes much longer than the other instances. In the execution log, a few instances of the JOIN task remain in the executing state while the other instances are already in the completed state. This is called a long tail.

Long tails caused by data skew are very common and significantly extend task execution time. During promotion events such as Double 11, severe long tails may occur. For example, large sellers receive far more page views than small sellers. When page view log data is associated with the seller dimension table, data is distributed by seller ID, so some instances process far more data than others. The task cannot complete until these few long-tail instances finish.

You can address long tails in four ways:
  • If one table is large and the other is small, you can execute the MAP JOIN statement to cache the small table. For more information about the MAP JOIN statement, see SELECT syntax.
  • If both tables are large, perform data deduplication first.
  • If hot keys in the two tables produce a Cartesian product, find the cause and optimize these keys from the business perspective.
  • Executing the LEFT JOIN statement directly on a small table and a large table can take a long time. In this case, we recommend that you first execute the MAP JOIN statement on the small and large tables to generate an intermediate table that contains the intersection of the two tables. This intermediate table is not larger than the large table (the degree of key skew is proportional to how much the table expands). Then execute the LEFT JOIN statement on the small table and the intermediate table. The result is equivalent to executing the LEFT JOIN statement directly on the small and large tables.
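The last approach can be sketched as follows. This is a minimal illustration under assumptions, not a tuned implementation: small_t, large_t, tmp_large_matched, and the columns k, v, and v2 are hypothetical names, and small_t is assumed to be small enough for MAP JOIN.

```sql
-- Hypothetical tables: small_t(k, v) and large_t(k, v2).

-- Step 1: MAP JOIN the large table with the distinct keys of the small table.
-- Only large-table rows whose key exists in small_t survive, so the
-- intermediate table cannot be larger than large_t.
create table tmp_large_matched as
select   /*+mapjoin(s)*/
         l.k
        ,l.v2
from     large_t l
join
        (select   distinct k
         from     small_t
         ) s
on       l.k = s.k;

-- Step 2: LEFT JOIN the small table with the intermediate table.
-- Small-table rows without a match still appear, with v2 set to NULL.
select   a.k
        ,a.v
        ,b.v2
from     small_t a
left outer join
         tmp_large_matched b
on       a.k = b.k;
```

Joining against the distinct keys in Step 1 keeps rows from being multiplied when a key appears more than once in the small table.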

Check whether data skew occurs when the JOIN statement is executed

Perform the following steps to check data skew:
  1. Open the Logview log file generated when SQL statements are executed and check execution details of each Fuxi task. Long-Tails(115) indicates that there are 115 long tails.
  2. Click the View Long-Tails icon after a Fuxi instance to check the data read by the instance in stdout.

    For example, Read from 0 num:52743413 size:1389941257 indicates that 1,389,941,257 rows of data are read when the JOIN statement is executed. If an instance listed in Long-Tails reads far more data than the other instances, the long tail is caused by the large amount of data.
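Besides Logview, you can confirm a suspected skew by counting rows per JOIN key. The following is a minimal sketch that assumes the page view log table dwd_tb_log_pv_di and the JOIN key item_id used in the hot key example in this topic; the limit of 20 is an arbitrary choice.

```sql
-- Count rows per JOIN key. A handful of keys (or NULL) with counts far
-- above the rest points to skew on those keys.
select   item_id
        ,count(1) as cnt
from     dwd_tb_log_pv_di
where    ds = '${bizdate}'
group by item_id
order by cnt desc
limit    20;
```

If NULL appears at the top of the result, the long tail is probably caused by empty values. If a few real keys dominate, it is probably caused by hot keys.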

Common causes and solutions

  • MAP JOIN solution: If data skew occurs when the JOIN statement is executed and a small table is involved, you can execute the MAP JOIN statement to prevent a long tail.
    The MAP JOIN solution works as follows: the small table is loaded into memory and the join is performed on the Map side, so rows do not need to be redistributed by JOIN key. This prevents data skew caused by unbalanced key distribution. The MAP JOIN statement is subject to the following limits:
    • The MAP JOIN statement is applicable only when the secondary table is small. The secondary table is the right table in a LEFT OUTER JOIN statement or the left table in a RIGHT OUTER JOIN statement.
    • The size of the small table is limited. By default, the small table can occupy at most 512 MB after it is loaded into memory. You can execute the following statement to raise the limit to 2,048 MB:
      
      set odps.sql.mapjoin.memory.max=2048
    The MAP JOIN statement is easy to use. Add the /*+ mapjoin(b) */ hint immediately after the SELECT keyword, where b is the alias of the small table or subquery. Example:
    select   /*+mapjoin(b)*/
             a.c2
            ,b.c3
    from
            (select   c1
                     ,c2
             from     t1
             ) a
    left outer join
            (select   c1
                     ,c3
             from     t2
             ) b
    on       a.c1 = b.c1;
  • JOIN long tails caused by empty values
    If accumulated empty values cause a long tail and the MAP JOIN statement cannot be used because no small table is involved, replace the empty values with random values. Rows with empty keys cannot be associated with any row in the other table, yet they are all sent to the same instance. Random values are spread across instances, which prevents the rows from accumulating on one instance.
    select   ...
    from
            (select   *
             from     tbcdm.dim_tb_itm
             where    ds='${bizdate}'
             )son1
    left outer join
            (select  *
             from    tbods.s_standard_brand
             where   ds='${bizdate}'
             and     status=3
             )son2
    on       coalesce(son1.org_brand_id,rand()*9999)=son2.value_id;
    In the ON clause, coalesce(son1.org_brand_id,rand()*9999) replaces org_brand_id with a random value if it is empty. This prevents long tails caused by accumulated empty values.
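Because rand()*9999 yields numeric values, a substitute could in principle coincide with a real value_id. A variant, shown here only as a sketch, prefixes the random value with a string, the same idea as the concat("tbcdm",rand()) expression in the hot key example in this topic. It assumes that both columns can be compared as strings and that no real value_id starts with the arbitrary prefix null_. The selected columns are chosen for illustration.

```sql
select   son1.org_brand_id
        ,son2.value_id
from
        (select   *
         from     tbcdm.dim_tb_itm
         where    ds='${bizdate}'
         ) son1
left outer join
        (select   *
         from     tbods.s_standard_brand
         where    ds='${bizdate}'
         and      status=3
         ) son2
-- Empty keys become strings such as null_0.1234. They spread across
-- instances and, under the assumption above, match no real value_id.
on       coalesce(cast(son1.org_brand_id as string),concat('null_',cast(rand() as string)))
         = cast(son2.value_id as string);
```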
  • JOIN long tails caused by hot key values
    If hot key values cause a long tail and the MAP JOIN statement cannot be used because no small table is involved, extract hot keys. Hot key data in the primary table is separated from other data, processed independently, and then combined with other data. In the following example, the page views log table of the Taobao website is associated with the commodity dimension table.
    1. Extract hot keys: Write the IDs of the commodities that have at least 50,000 page views to a temporary table.
      insert   overwrite table topk_item partition (ds = '${bizdate}')
      select   item_id
      from
              (select   item_id
                       ,count(1) as cnt
               from     dwd_tb_log_pv_di
               where    ds = '${bizdate}'
               and      url_type = 'ipv'
               and      item_id is not null
               group by item_id
               ) a
      where    cnt >= 50000;
    2. Extract data of non-hot keys
      Execute the OUTER JOIN statement with a MAP JOIN hint to associate primary table dwd_tb_log_pv_di with hot key table topk_item, and apply condition b1.item_id is null to keep only the log data of non-hot commodities, which cannot be associated with any hot key. Then associate this non-hot log data with the commodity dimension table. No long tails occur because hot key data has been removed.
      select   ...
      from
              (select   *
               from     dim_tb_itm
               where    ds = '${bizdate}'
               ) a
      right outer join
              (select   /*+mapjoin(b1)*/
                        b2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        ) b1
               right outer join
                       (select   *
                        from     dwd_tb_log_pv_di
                        where    ds = '${bizdate}'
                        and      url_type = 'ipv'
                        ) b2
               on       b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand()))
               where    b1.item_id is null
               ) l
      on       a.item_id = coalesce(l.item_id,concat("tbcdm",rand()));
    3. Extract data of hot keys
      Execute the INNER JOIN statement with a MAP JOIN hint to associate primary table dwd_tb_log_pv_di with hot key table topk_item and obtain the log data of hot commodities. Execute the INNER JOIN statement to associate commodity dimension table dim_tb_itm with hot key table topk_item and obtain the dimension data of hot commodities. Then execute the OUTER JOIN statement with a MAP JOIN hint to associate the two parts.
      select   /*+mapjoin(a)*/
               ...
      from
              (select   /*+mapjoin(b1)*/
                        b2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        )b1
               join
                       (select   *
                        from     dwd_tb_log_pv_di
                        where    ds = '${bizdate}'
                        and      url_type = 'ipv'
                        and      item_id is not null
                        ) b2
               on       (b1.item_id = b2.item_id)
               ) l
      left outer join
              (select   /*+mapjoin(a1)*/
                        a2.*
               from
                       (select   item_id
                        from     topk_item
                        where    ds = '${bizdate}'
                        ) a1
               join
                       (select   *
                        from     dim_tb_itm
                        where    ds = '${bizdate}'
                        ) a2
               on       (a1.item_id = a2.item_id)
               ) a
      on       a.item_id = l.item_id;
    4. Execute the UNION ALL statement to combine the data obtained in Steps 2 and 3 to generate the entire log data, with commodity information associated.
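The final combination step can be sketched as follows, assuming that the non-hot and hot results from the preceding steps were written to two intermediate tables with identical columns. The table names tmp_pv_non_hot and tmp_pv_hot and the column list are hypothetical.

```sql
-- Hypothetical intermediate tables:
--   tmp_pv_non_hot: log data of non-hot keys joined with the dimension table
--   tmp_pv_hot:     log data of hot keys joined with the dimension table
select   t.item_id
        ,t.user_id
        ,t.item_title
from
        (select   item_id
                 ,user_id
                 ,item_title
         from     tmp_pv_non_hot
         union all
         select   item_id
                 ,user_id
                 ,item_title
         from     tmp_pv_hot
         ) t;
```

UNION ALL does not remove duplicates, which is the intended behavior here: the two parts are disjoint, so every log row appears exactly once in the combined result.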
  • Set odps.sql.skewjoin parameters to solve long tails

    This solution is simple to apply, but it has drawbacks. If the skewed key values change, you must modify the code and execute the statements again, and such changes cannot be predicted. If many skewed key values are involved, setting them in parameters is inconvenient. In that case, you can choose between splitting the code and setting parameters as needed. Perform the following steps to set odps.sql.skewjoin parameters:

    1. Enable the odps.sql.skewjoin flag.
      set odps.sql.skewjoin=true
    2. Set a skewed key and its value.
      set odps.sql.skewinfo=skewed_src:(skewed_key) [("skewed_value")]
      skewed_key indicates the skewed column and skewed_value indicates its value.
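For illustration, the two settings can be filled in as follows. The table name pv_log, the column item_id, and the value 1001 are hypothetical placeholders. The example follows the pattern shown above and assumes that skewed_src is the name of the table that contains the skewed key.

```sql
-- Enable skew join handling.
set odps.sql.skewjoin=true;
-- Hypothetical values: declare that item_id "1001" in table pv_log is skewed.
set odps.sql.skewinfo=pv_log:(item_id)[("1001")];
```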