
MaxCompute: Data skew tuning

Last Updated: Jan 06, 2026

This topic describes common data skew scenarios in MaxCompute and provides corresponding solutions.

MapReduce

To understand data skew, you must first understand MapReduce. MapReduce is a distributed computing framework that uses a divide-and-conquer method. This method splits large or complex problems into smaller, more manageable subproblems, solves them, and then merges the results into a final solution. MapReduce offers higher fault tolerance, ease of use, and better extensibility than traditional parallel programming frameworks. When you use MapReduce for parallel programming, you do not need to manage non-programming issues in distributed clusters, such as data storage, internode communication, and transmission mechanisms. This feature greatly simplifies distributed programming.
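The divide-and-conquer flow described above can be sketched in a few lines of Python. This is a toy word count, not MaxCompute code; the function names are illustrative:

```python
from collections import defaultdict

def map_phase(records):
    # Map: split each input record and emit (key, 1) pairs.
    for record in records:
        for word in record.split():
            yield word, 1

def shuffle(pairs):
    # Shuffle: group values by key, as the framework does between stages.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: merge each key's grouped values into the final result.
    return {key: sum(values) for key, values in groups.items()}

counts = reduce_phase(shuffle(map_phase(["a b a", "b c"])))
print(counts)  # {'a': 2, 'b': 2, 'c': 1}
```

The framework handles shuffle, fault tolerance, and data movement; the programmer supplies only the map and reduce logic.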

The following figure shows the workflow of MapReduce.

Data skew

Data skew often occurs during the reduce stage. Mappers typically split data uniformly based on input files. Data skew occurs when data in a table is unevenly distributed among different workers. This uneven distribution causes some workers to finish their computations quickly while others take much longer. In production environments, most data is skewed, often following the 80/20 rule. For example, 20% of active users on a forum might contribute 80% of the posts, or 20% of users might generate 80% of a website's traffic. In the era of big data, with its explosive growth in data volume, data skew can severely impact the performance of distributed programs. A common symptom is a job's execution progress getting stuck at 99%.
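The effect can be simulated with a toy Python model: when records are routed to reduce workers by key, a single hot key drags one worker far past the average load (the numbers are illustrative):

```python
from collections import Counter

# Toy dataset following the 80/20 pattern: one hot key owns 80 of 100 records.
records = ["hot_user"] * 80 + [f"user_{i}" for i in range(20)]

# The shuffle stage routes every record with the same key to the same worker.
NUM_WORKERS = 4
load = Counter(hash(key) % NUM_WORKERS for key in records)

# The worker that owns "hot_user" must process at least 80 records,
# while the average load is only 100 / 4 = 25 -- a classic long tail.
hot_worker = hash("hot_user") % NUM_WORKERS
```

The job cannot finish until the hot worker finishes, which is why overall progress appears stuck near 99%.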

How to identify data skew

In MaxCompute, you can use Logview to identify data skew. The following steps describe the process:

  1. In Fuxi Jobs, sort the jobs by latency in descending order and select the job stage with the longest runtime.

  2. In the Fuxi Instance list for the Fuxi Stage, sort the tasks by latency in descending order. Select a task with a runtime that is significantly longer than the average. Typically, you can focus on the first task in the list. View its StdOut log.

  3. Use the information from StdOut to view the corresponding job execution graph.

  4. Use the key information from the job execution graph to locate the SQL code snippet that is causing the data skew.

The following example shows how to use this method.

  1. Find the Logview log from the task's operational logs. For more information, see Logview entry points.

  2. To quickly identify the problem, go to the Logview interface, sort the Fuxi tasks by Latency in descending order, and select the task with the longest runtime.

  3. The task R31_26_27 has the longest runtime. Click R31_26_27 to open the instance details page, as shown in the following figure. The value Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} indicates that, across all instances of this task, the minimum runtime is 6 s, the average is 13 s, and the maximum is 26 minutes and 40 seconds. Sort by Latency (instance runtime) in descending order to see the four instances with the longest runtimes. MaxCompute considers a Fuxi instance long-tailed if its runtime is more than twice the average, so any instance that runs longer than 26 s is long-tailed; in this case, 21 instances qualify. Long-tailed instances alone do not prove that a task is skewed. Also compare the avg and max runtimes: if max is much larger than avg, the data skew is severe and the task needs tuning.

  4. Click the Output log icon in the StdOut column to view the output log, as shown in the following figure.

  5. After you identify the problem, go to the Job Details tab. Right-click R31_26_27 and select Expand All to expand the task. For more information, see Use Logview 2.0 to view job information. Examine the step before StreamLineRead22, which is StreamLineWriter21. This step reveals the keys that cause the data skew: new_uri_path_structure, cookie_x5check_userid, and cookie_userid. Use this information to find the SQL snippet that is causing the data skew.
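The rule of thumb in step 3 can be expressed as a quick check. The latency values here are hypothetical; real numbers come from the Latency column in Logview:

```python
# Hypothetical instance latencies in seconds for one Fuxi task:
# most instances finish quickly, one runs for 1600 s (26m40s).
latencies = [13] * 20 + [1600]

avg = sum(latencies) / len(latencies)

# MaxCompute's rule of thumb: an instance is long-tailed if its
# runtime is more than twice the average.
long_tailed = [t for t in latencies if t > 2 * avg]

# A max far above avg signals severe skew worth investigating.
severe_skew = max(latencies) > 2 * avg
```

If `severe_skew` is true for a stage, inspect its keys in the job execution graph as described above.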

Data skew troubleshooting and solutions

The most common causes of data skew are as follows:

  • Join

  • GroupBy

  • Count (Distinct)

  • ROW_NUMBER (TopN)

  • Dynamic partition

The frequency of occurrence, from most to least common, is: JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > Dynamic partition.

Join

Data skew from a join operation can occur in various situations, such as joining a large table with a small table, joining a large table with a medium table, or when hot key values cause long tails.

  • Joining a large table with a small table.

    • Data skew example

      In the following example, t1 is a large table, and t2 and t3 are small tables.

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • Solution

      Use the MAPJOIN HINT syntax, as shown in the following example.

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • Notes

        • When you reference a small table or subquery, you must use its alias.

        • MapJoin supports using a subquery as a small table.

        • In a MapJoin, you can use non-equi joins or connect multiple conditions with OR. You can also compute a Cartesian product by using a constant true condition, such as ON 1 = 1, in place of a join key, for example, select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;. However, this operation can cause data bloat.

        • In a MapJoin, separate multiple small tables with commas (,), such as /*+ mapjoin(a,b,c)*/.

        • During the map stage, MapJoin loads all data from the specified tables into memory. Therefore, the specified tables must be small. The total memory occupied by the tables after being loaded must not exceed 512 MB. Because MaxCompute uses compressed storage, the data size of a small table expands significantly after it is loaded into memory. The 512 MB limit applies to the size after the data is loaded. You can increase this limit to a maximum of 8192 MB by setting the following parameter.

          SET odps.sql.mapjoin.memory.max=2048;
      • Limits on MapJoin operations

        • In a LEFT OUTER JOIN, the left table must be the large table.

        • In a RIGHT OUTER JOIN, the right table must be the large table.

        • FULL OUTER JOIN is not supported.

        • In an INNER JOIN, either the left or the right table can be the large table.

        • MapJoin supports a maximum of 128 small tables. A syntax error occurs if you specify more.

  • Joining a large table with a medium table.

    • Data skew example

      In the following example, t0 is a large table and t1 is a medium table.

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • Solution

      Use the DISTRIBUTED MAPJOIN hint to resolve data skew, as shown in the following example.

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • Long tails caused by hot key values in a join.

    • Data skew example

      In the following example, the eleme_uid field contains many hot key values, which can cause data skew.

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      ON t1.eleme_uid = t2.eleme_uid;
    • Solution

      You can use one of the following four methods to resolve this issue.

      | No. | Solution | Description |
      | --- | --- | --- |
      | Solution 1 | Manually split hot key values | Analyze and identify the hot key values. Filter the records with hot key values from the primary table and perform a MapJoin on them. Then, perform a MergeJoin on the remaining records with non-hot key values. Finally, merge the results of the two joins. |
      | Solution 2 | Set the SkewJoin parameter | set odps.sql.skewjoin=true; |
      | Solution 3 | SkewJoin hint | Use a hint: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])])*/. The hint adds an extra step to find the skewed keys, which increases the query runtime. If you already know the skewed keys, set the SkewJoin parameter instead to save time. |
      | Solution 4 | Perform a modulo equi-join with a multiplier table | Use a multiplier table to spread hot key values across multiple instances. |

      • Manually split hot key values.

        This method involves analyzing and identifying the hot key values. First, filter the records that contain hot key values from the primary table and perform a MapJoin on them. Then, perform a MergeJoin on the remaining records that contain non-hot key values. Finally, merge the results of the two joins. The following code provides an example:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        ON t3.eleme_uid = t4.eleme_uid
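A quick Python model confirms the split-and-union logic: joining the hot-key and non-hot-key subsets separately and unioning the results reproduces the plain left join. Tables, keys, and values here are illustrative:

```python
# Toy model of the split-and-union plan; tables and keys are illustrative.
t_view = [("hot", 1), ("hot", 2), ("u1", 3), ("u2", 4)]   # primary table
t_cust = [("hot", "vip"), ("u1", "normal")]               # joined table

def left_join(left, right):
    # Minimal left outer join on the first tuple element.
    index = {}
    for k, v in right:
        index.setdefault(k, []).append(v)
    return [(k, v, rv) for k, v in left for rv in index.get(k, [None])]

skewed = "hot"
# MapJoin branch: only the records with the hot key value.
hot = left_join([r for r in t_view if r[0] == skewed],
                [r for r in t_cust if r[0] == skewed])
# MergeJoin branch: everything else.
cold = left_join([r for r in t_view if r[0] != skewed],
                 [r for r in t_cust if r[0] != skewed])
merged = hot + cold  # UNION ALL
```

Because the branches partition the rows by key, no join result is lost or duplicated by the split.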
      • Set the SkewJoin parameter.

        This is a common solution. MaxCompute provides the set odps.sql.skewjoin=true; parameter to enable the SkewJoin feature. However, simply enabling SkewJoin does not affect the task's execution. You must also set the odps.sql.skewinfo parameter for the feature to take effect. The odps.sql.skewinfo parameter specifies the details for join optimization. The following is an example of the command syntax.

        SET odps.sql.skewjoin=true;
        SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  -- skewed_src is the skewed table, skewed_key is the skewed column, and skewed_value is the hot key value.

        The following examples show how to use the command:

        --For a single skewed value in a single field
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        
        --For multiple skewed values in a single field
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • SkewJoin Hint.

        To enable SkewJoin in a SELECT statement, use the following hint: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])])*/. In this hint, table_name is the skewed table, column_name is the skewed column, and value is the skewed key value. The following examples show how to use this hint.

        --Method 1: Hint the table name. Note that you hint the table's alias.
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;
        
        --Method 2: Hint the table name and the columns that you think might be skewed. For example, columns c0 and c1 in table a have data skew.
        SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        
        --Method 3: Hint the table name and columns, and provide the skewed key values. If the type is STRING, enclose the values in quotation marks. For example, the values for (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") both have data skew.
        SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        Note

        The SkewJoin hint method that directly specifies values is more efficient than manually splitting hot key values or setting the SkewJoin parameter without specifying values.

        Join types supported by SkewJoin Hint:

        • Inner Join: You can hint either side of the join.

        • Left Join, Semi Join, and Anti Join: You can only hint the left table.

        • Right Join: You can only hint the right table.

        • Full Join: Skew Join Hint is not supported.

        Add hints only to joins that are certain to have data skew because the hint runs an aggregate operation, which incurs a cost.

        For a hinted join, the data type of the left join key must match the data type of the right join key. Otherwise, the SkewJoin hint does not work. For example, in the previous example, the type of a.c0 must match the type of b.c0, and the type of a.c1 must match the type of b.c1. You can use CAST in a subquery to ensure the join key types are consistent. The following examples show how to do this:

        CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
        CREATE TABLE T1(c0 string, c1 int, c2 int);
        
        --Method 1:
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
        
        --Method 2:
        SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;

        After you add a SkewJoin hint, the optimizer runs an aggregate operation to retrieve the top 20 hot key values. The value 20 is the default. You can change this value by running set odps.optimizer.skew.join.topk.num = xx;.

        • The SkewJoin hint supports hinting only one side of a join.

        • A hinted join must have a left key = right key condition and does not support Cartesian product joins.

        • You cannot add a SkewJoin hint to a join that already has a MapJoin hint.

      • Perform a modulo-equi-join with a multiplier table.

        The logic of this solution is different from the previous three. It is not a divide-and-conquer approach. Instead, it uses a multiplier table that has a single column of the INT data type. The values can range from 1 to N, where N is determined by the degree of data skew. This multiplier table is used to expand the user behavior table by N times. Then, when you perform the join, you can use both the user ID and the number as association keys. The original data skew, which was caused by distributing data only by user ID, is reduced to 1/N of its original level because of the added number condition. However, this method also causes the data to bloat by N times.

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        AND mod(t1.<value_col>,10)+1 = t2.number;

        To address the data bloat issue, you can limit the expansion to only the records that have hot key values in both tables. Records with non-hot key values remain unchanged. First, find the records with hot key values. Then, process the traffic table and the user behavior table separately by adding a new column named eleme_uid_join. If a user ID is a hot key value, use CONCAT to append a randomly assigned positive integer (for example, from 0 to 1,000). If it is not a hot key value, keep the original user ID. When you join the two tables, use the eleme_uid_join column. This method increases the multiplier for hot key values to reduce skew and avoids unnecessary data bloat for non-hot key values. However, this logic completely changes the original business logic SQL and is therefore not recommended.
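The bucket-spreading effect of the modulo condition can be checked in Python: with a multiplier of N = 10, the rows for one hot uid fan out evenly across 10 join buckets instead of one. The data below is illustrative:

```python
from collections import Counter

# Toy rows for one hot key; the second element stands in for <value_col>.
N = 10
hot_rows = [("hot_uid", val) for val in range(100)]

# With the multiplier table, the join key becomes (uid, mod(value, N) + 1),
# so the hot key's rows are spread over N reduce buckets instead of one.
buckets = Counter((uid, val % N + 1) for uid, val in hot_rows)
```

Each bucket receives roughly 1/N of the hot key's rows, which is exactly how the skew is reduced to 1/N, at the cost of expanding the other side of the join N times.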

GroupBy

The following example shows pseudocode with a GroupBy clause.

SELECT  shop_id
        ,sum(is_open) AS business_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;

If data skew occurs, you can resolve it using one of the following three solutions:

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Set the anti-skew parameter for Group By | set odps.sql.groupby.skewindata=true; |
| Solution 2 | Add a random number | Split the key that is causing the long tail. |
| Solution 3 | Create a rolling table | Reduce costs and improve efficiency. |

  • Solution 1: Set the anti-skew parameter for `GROUP BY`.

    SET odps.sql.groupby.skewindata=true;
  • Solution 2: Add a random number.

    Unlike Solution 1, this solution requires you to rewrite the SQL statement. Adding a random number to split the key that causes a long tail is an effective way to resolve `GROUP BY` long tails.

    For the SQL statement SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;, if a combiner is not used, the map node shuffles data to the reduce node. The reduce node then performs the `COUNT` operation. The corresponding execution plan is M->R.

    If you have found the key that causes the long tail, you can redistribute the work for that key as follows:

    -- Assume that the key that causes a long tail is KEY001.
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    After the change, the execution plan becomes M->R->R. Although the number of execution steps increases, the long-tail key is processed in two steps, which can reduce the overall runtime. The resource consumption and time efficiency are similar to Solution 1. However, in real-world scenarios, a long tail is often caused by more than one key. Considering the cost of finding long-tail keys and rewriting SQL statements, Solution 1 is more cost-effective.
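The two-stage plan can be verified with a small Python model: salting only the hot key in stage one and re-aggregating in stage two yields the same counts as a direct GROUP BY. The data is toy data, and the salt range of 50 mirrors the SQL above:

```python
import random
from collections import Counter

random.seed(0)

# Toy rows where KEY001 causes the long tail.
rows = ["KEY001"] * 1000 + ["KEY002"] * 5

# Stage 1: group by (key, salt); only the hot key receives a random
# salt in [0, 50), mirroring the CASE WHEN in the SQL above.
stage1 = Counter(
    (key, random.randrange(50) if key == "KEY001" else 0) for key in rows
)

# Stage 2: group by key alone and sum the partial counts.
stage2 = Counter()
for (key, _salt), cnt in stage1.items():
    stage2[key] += cnt
```

Stage one spreads the hot key's rows over up to 50 reducers; stage two only has to merge at most 50 small partial counts per key.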

  • Solution 3: Create a rolling table.

    The core of this solution is to reduce costs and improve efficiency. The main requirement is to retrieve merchant data from the past year. For online tasks, reading all partitions from T-1 to T-365 each time wastes significant resources. You can create a rolling table to reduce the number of partitions that are read without affecting the retrieval of data from the past year. The following example shows how to do this.

    First, initialize 365 days of merchant business data that is aggregated using Group By. Mark the data update date and store the data in a table named a. For subsequent online tasks, you can join the T-2 day table a with the table_xxx_di table, and then use Group By. This way, the number of partitions read each day is reduced from 365 to 2. The duplication of the primary key shopid is greatly reduced, which also reduces resource consumption.

    -- Create a rolling table.
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING COMMENT 'Shop ID',
      last_update_ds STRING COMMENT 'Date when the record was last updated',
      365d_open_days BIGINT COMMENT 'Business days in the past 365 days'
    )
    PARTITIONED BY
    (
      ds STRING COMMENT 'Date partition'
    )LIFECYCLE 7;
    -- Assume that 365d is from May 1, 2021 to May 1, 2022. Initialize the table first.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    -- Then, the online task to be executed is as follows.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      SELECT aa.shop_id, 
             aa.last_update_ds, 
             365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Eliminate the infinite rolling of business days.
      FROM (
        SELECT shop_id, 
               max(last_update_ds) AS last_update_ds, 
               sum(365d_open_days) AS 365d_open_days
        FROM (
          SELECT shop_id,
                 ds AS last_update_ds,
                 sum(is_open) AS 365d_open_days
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id
          UNION ALL
          SELECT shop_id,
                 last_update_ds,
                 365d_open_days
          FROM m_xxx_365_df
          WHERE ds = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}'
        )
        GROUP BY shop_id
      ) AS aa
      LEFT JOIN (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) AS bb
      ON aa.shop_id = bb.shop_id;
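The arithmetic behind the rolling update can be sanity-checked in Python: adding the newest day and subtracting the day that falls out of the window equals a full recompute over the shifted window. The data is a toy example with one shop open every day:

```python
# is_open per day for one shop, days 1..367, open every day.
daily_is_open = {day: 1 for day in range(1, 368)}

# Previous rolling total covers days 1..365.
prev_365d = sum(daily_is_open[d] for d in range(1, 366))

# Rolling update: add the new day (366), subtract the dropped day (1).
rolled = prev_365d + daily_is_open[366] - daily_is_open[1]

# Full recompute over the shifted window, days 2..366.
full = sum(daily_is_open[d] for d in range(2, 367))
```

This is why the SQL only needs today's partition plus yesterday's rolling table, instead of scanning all 365 partitions.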
                                

Count (Distinct)

Suppose a table has the following data distribution.

| ds (partition) | cnt (number of records) |
| --- | --- |
| 20220416 | 73025514 |
| 20220415 | 2292806 |
| 20220417 | 2319160 |

Using the following statement can cause data skew:

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

The following solutions are available:

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Parameter setting optimization | SET odps.sql.groupby.skewindata=true; |
| Solution 2 | General two-stage aggregation | Concatenate a random number to the partition field value. |
| Solution 3 | Similar to two-stage aggregation | First group by the two fields (ds and shop_id), and then use count(distinct). |

  • Solution 1: Optimize by setting a parameter.

    Set the following parameter.

    SET odps.sql.groupby.skewindata=true;
  • Solution 2: Use general two-stage aggregation.

    If the data in the shop_id field is not uniform, you cannot use Solution 1 for optimization. A more general method is to concatenate a random number to the value of the partition field.

    -- Method 1: Concatenate a random number. CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
    SELECT  SPLIT_PART(rand_ds, '_',2) ds
            ,COUNT(*) id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM    demo_data0
            GROUP BY rand_ds,shop_id
            )
    GROUP BY SPLIT_PART(rand_ds, '_',2);
    
    -- Method 2: Add a random number field. ROUND(RAND(),1)*10 AS randint10
    SELECT  ds
            ,COUNT(*) id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM  demo_data0
            GROUP BY ds,randint10,shop_id
            )
    GROUP BY ds;
  • Solution 3: Use a method similar to two-stage aggregation.

    If the data in both the GroupBy and Distinct fields is uniform, you can optimize the query by first grouping by two fields (`ds` and `shop_id`) and then using the count(distinct) command.

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;
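A Python model shows why this works: deduplicating on (ds, shop_id) first and then counting rows per ds gives exactly COUNT(DISTINCT shop_id) per ds. The data is illustrative:

```python
from collections import Counter

# Toy rows of (ds, shop_id); one partition has many rows but few shops.
rows = [("20220416", s % 7) for s in range(100)] + \
       [("20220415", s % 3) for s in range(10)]

# Stage 1: GROUP BY ds, shop_id -- a plain deduplication.
dedup = set(rows)

# Stage 2: COUNT(*) ... GROUP BY ds over the deduplicated rows.
two_stage = Counter(ds for ds, _ in dedup)

# Direct COUNT(DISTINCT shop_id) per ds, for comparison.
direct = {ds: len({s for d, s in rows if d == ds}) for ds, _ in rows}
```

The first stage can be distributed over many reducers because (ds, shop_id) is a much finer grouping key than ds alone.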

ROW_NUMBER (TopN)

The following example retrieves the top 10 rows for each main_id.

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

If data skew occurs, you can resolve it using one of the following methods:

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Two-stage aggregation using SQL | Add a random column or concatenate a random number and use it as a partition key. |
| Solution 2 | Two-stage aggregation using a user-defined aggregate function (UDAF) | Optimize using a UDAF with a min-heap priority queue. |

  • Solution 1: Use two-stage aggregation with SQL.

    To make the data in each partition group as uniform as possible during the map stage, you can add a random column and use it as a parameter in the partition.

    -- 1. Add a random column.
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- 2. Customize the random number.
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
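The correctness of the two-stage TopN can be checked in Python: any value in the global top 10 is necessarily in the top 10 of whatever random bucket it lands in, so taking per-bucket top 10 first and a global top 10 second matches the direct answer. The data is toy data:

```python
import random
from collections import defaultdict

random.seed(1)

# Toy rows for one main_id; 'type' values are random integers.
rows = [("m1", random.randrange(10000)) for _ in range(500)]
N, BUCKETS = 10, 11

# Stage 1: top N inside each (main_id, random bucket) group.
groups = defaultdict(list)
for main_id, typ in rows:
    groups[(main_id, random.randrange(BUCKETS))].append(typ)
stage1 = [t for g in groups.values() for t in sorted(g, reverse=True)[:N]]

# Stage 2: global top N over the much smaller stage-1 output.
two_stage = sorted(stage1, reverse=True)[:N]
direct = sorted((t for _, t in rows), reverse=True)[:N]
```

Stage one shrinks each heavy partition group to at most BUCKETS * N rows, so stage two no longer suffers from the skew.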
  • Solution 2: Use two-stage aggregation with a UDAF.

    The SQL method can result in a large amount of code that can be difficult to maintain. In this case, you can use a user-defined aggregate function (UDAF) with a min-heap priority queue for optimization. This means that in the iterate stage, only the top N items are processed. In the merge stage, only N elements are merged. The process is as follows.

    • iterate: Pushes the first K elements. For subsequent elements, continuously compares them with the minimum top element and swaps elements in the heap.

    • merge: Merges two heaps and returns the top K elements in place.

    • terminate: Returns the heap as an array.

    • In the SQL statement, split the array into rows.

    import heapq

    from odps.udf import BaseUDAF, annotate

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # Keep only the largest k values seen so far in a min-heap.
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    
    SET odps.sql.python.version=cp37;
    SELECT main_id, type_val FROM (
      SELECT  main_id, get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    ) t
    LATERAL VIEW EXPLODE(type_array) type_ar AS type_val;
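The heap bookkeeping in iterate can be exercised outside the MaxCompute runtime with plain Python:

```python
import heapq

# Same top-K bookkeeping as the UDAF's iterate: a min-heap of size k
# whose smallest element is evicted when a larger value arrives.
def iterate(buffer, value, k):
    if buffer[1] is None:
        buffer[1] = k
    if len(buffer[0]) < k:
        heapq.heappush(buffer[0], value)
    else:
        heapq.heappushpop(buffer[0], value)

buf = [[], None]
for v in [5, 1, 9, 3, 7, 8, 2]:
    iterate(buf, v, 3)
top3 = sorted(buf[0], reverse=True)
print(top3)  # [9, 8, 7]
```

Because the buffer never holds more than k elements, both the per-instance state and the merge cost stay bounded regardless of how many rows a skewed group contains.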

Dynamic partition

Dynamic partitioning is a feature that lets you specify a partition key column when you insert data into a partitioned table, but without providing a specific value. Instead, the corresponding column in the Select clause provides the partition value. Therefore, you do not know which partitions will be created before the SQL statement runs. You can determine which partitions are created only after the SQL statement is run, based on the values of the partition column. For more information, see Insert or overwrite data into dynamic partitions (DYNAMIC PARTITION). The following is an SQL example.

CREATE TABLE total_revenues (revenue bigint) PARTITIONED BY (region string);

INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;

In many scenarios, creating tables with dynamic partitions can lead to data skew. If data skew occurs, you can use the following solutions to resolve it.

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Parameter configuration optimization | Optimize through parameter configuration. |
| Solution 2 | Partition pruning optimization | Find partitions with many records, prune them, and insert them separately. |

  • Solution 1: Optimize by configuring parameters.

    Dynamic partitioning lets you put data that meets different conditions into different partitions. This feature eliminates the need for multiple Insert Overwrite operations to write to the table, especially when there are many partitions, and can greatly simplify your code. However, dynamic partitioning can also create too many small files.

    • Data skew example

      Take the following simple SQL as an example.

      INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM  part_test;

      Assume there are K Map instances and N target partitions.

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      In the most extreme case, K*N small files can be generated. Too many small files place an enormous management strain on the file system. To avoid this, MaxCompute handles dynamic partitions by introducing an extra reduce stage that assigns each target partition to a single reduce instance, or to a few instances, for writing. This reduce stage is always the last one in the job. The feature is enabled by default in MaxCompute, which means the following parameter is set to `true`.

      SET odps.sql.reshuffle.dynamicpt=true;

      Enabling this feature by default resolves the issue of too many small files and prevents tasks from failing because a single instance generates too many files. However, it also introduces new problems, such as data skew and the consumption of computing resources by the extra reduce operation. Therefore, you must carefully balance these factors.

    • Solution

      The purpose of `SET odps.sql.reshuffle.dynamicpt=true;` and its extra reduce stage is to resolve the issue of too many small files. However, if the number of target partitions is small and there is no risk of creating too many small files, keeping this feature enabled wastes computing resources and reduces performance. In this situation, you can disable the feature by running `SET odps.sql.reshuffle.dynamicpt=false;` to significantly improve performance. The following is an example.

      INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
      SELECT /*+ mapjoin(t2) */
          '20150503' AS ds,
          t1.lv AS lv,
          t1.type AS tp
      FROM
          (SELECT  ...
          FROM tbbi.ads_tb_cornucopia_user_d
          WHERE ds = '20150503'
          AND lv IN ('flat', '3rd')
          AND tp = 'T'
          AND pref_cat2_id > 0
          ) t1
      JOIN
          (SELECT ...
          FROM tbbi.ads_tb_cornucopia_auct_d
          WHERE ds = '20150503'
          AND tp = 'T'
          AND is_all = 'N'
          AND cat2_id > 0
          ) t2
      ON t1.pref_cat2_id = t2.cat2_id;

      If you use the default parameters for the preceding code, the entire task takes about 1 hour and 30 minutes to run. The last reduce task takes about 1 hour and 20 minutes, which is approximately 90% of the total runtime. Introducing an extra reduce task makes the data distribution of each reduce instance very uneven, which leads to a long tail.

    For the preceding example, an analysis of the number of dynamic partitions created historically shows that only about two dynamic partitions are created each day. Therefore, you can safely run `SET odps.sql.reshuffle.dynamicpt=false;`. The task then completes in only 9 minutes instead of about 1 hour and 30 minutes. In this case, setting the parameter to `false` significantly improves performance and saves computing time and resources, a large return on a one-line change.

    This optimization is not limited to large tasks that run for a long time and consume many resources. For small, everyday tasks that run quickly and consume few resources, you can also set `odps.sql.reshuffle.dynamicpt` to `false`, as long as they use dynamic partitions and the number of dynamic partitions is small. This setting saves resources and improves performance in these cases as well.

    Nodes that meet all three of the following conditions can be optimized, regardless of task duration.

    • Uses dynamic partitions

    • Number of dynamic partitions <= 50

    • Does not have `set odps.sql.reshuffle.dynamicpt=false;`
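
    To check the partition-count condition before disabling the reshuffle, you can count the distinct target partitions that the source data would produce. A minimal sketch, using the `part_test` example from earlier and assuming `ds` is the only dynamic partition column:

    ```sql
    -- Count the distinct target partitions the dynamic INSERT would create.
    -- If the result is at most 50, consider SET odps.sql.reshuffle.dynamicpt=false;
    SELECT COUNT(DISTINCT ds) AS dynamic_partition_cnt
    FROM part_test;
    ```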

    The urgency of applying this setting to a node is graded by the execution time of the node's last Fuxi instance (Last_Fuxi_Inst_Time) and recorded in the diag_level field, based on the following rules:

    • Last_Fuxi_Inst_Time is greater than 30 minutes: Diag_Level=4 ('Critical').

    • Last_Fuxi_Inst_Time is between 20 and 30 minutes: Diag_Level=3 ('High').

    • Last_Fuxi_Inst_Time is between 10 and 20 minutes: Diag_Level=2 ('Medium').

    • Last_Fuxi_Inst_Time is less than 10 minutes: Diag_Level=1 ('Low').
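
    The grading rules above can be sketched as a CASE expression. The table and column names below (`task_runtime_stats`, `last_fuxi_inst_time`, in minutes) are hypothetical placeholders for whatever monitoring data you collect, not actual MaxCompute system objects:

    ```sql
    -- Hypothetical: derive diag_level from the last Fuxi instance runtime (minutes).
    SELECT  task_name,
            CASE
                WHEN last_fuxi_inst_time > 30 THEN 4  -- Critical
                WHEN last_fuxi_inst_time >= 20 THEN 3 -- High
                WHEN last_fuxi_inst_time >= 10 THEN 2 -- Medium
                ELSE 1                                -- Low
            END AS diag_level
    FROM    task_runtime_stats;
    ```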

  • Solution 2: Partition pruning optimization.

    To resolve the data skew that occurs in the map stage when you insert data into dynamic partitions, you can find the partitions with many records, prune them, and then insert them separately. Based on the actual scenario, you can modify the parameter settings for the map stage as shown in the following example:

    SET odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    The results show that a full table scan was performed. To further optimize performance, you can disable the system-generated Reduce job as follows:

    SET odps.sql.reshuffle.dynamicpt=false;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    To identify the partitions with many records, isolate them, and insert them separately, follow these steps.

    1. Use the following command to query for partitions that contain many records.

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      Some of the partitions are as follows:

      ds        hh  cnt
      20200928  17  1052800
      20191017  17  1041234
      20210928  17  1034332
      20190328  17  1000321
      20210504  1   19
      20191003  20  18
      20200522  1   18
      20220504  1   18

    2. Use the following statements to filter and insert the data: exclude the partitions identified above in one INSERT operation, and insert those large partitions in a separate operation.

      SET odps.sql.reshuffle.dynamicpt=false;
      INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      SET odps.sql.reshuffle.dynamicpt=false;
      INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      -- Verify the distribution in the target table.
      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    data_demo3
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;