
MaxCompute: Data skew tuning

Last Updated: Mar 26, 2026

Data skew causes some workers to process far more data than others, making distributed jobs slow or stuck. This topic explains how to identify data skew in MaxCompute and fix the five most common causes: JOIN, GROUP BY, COUNT(DISTINCT), ROW_NUMBER(), and dynamic partitions.

How data skew happens

MaxCompute uses MapReduce to run distributed SQL jobs. Mappers split input data roughly evenly, but the reduce stage aggregates data by key — and if certain keys appear far more often than others, the reducers handling those keys end up doing most of the work. The typical symptom is a job stuck at 99% while a small number of instances continue running long after the rest have finished.

Most production data is skewed. A common pattern is that 20% of users generate 80% of the traffic, or 20% of forum users write 80% of the posts.

How it works

The following figure shows the MapReduce workflow.

[Figure: MapReduce]

Identify data skew with Logview

Use Logview to find which stage is skewed and which SQL snippet is causing it.

[Figure: Identifying data skew]

When is an instance long-tailed? MaxCompute considers a Fuxi instance long-tailed if its runtime exceeds twice the average runtime for that stage. Severe skew is indicated when the max runtime is much larger than the avg runtime.

The four-step process:

  1. In Fuxi Jobs, sort jobs by Latency in descending order and select the stage with the longest runtime.

  2. In the Fuxi Instance list for that stage, sort instances by Latency in descending order and select the instance with the longest runtime. Open its StdOut log.

  3. Use the StdOut output to open the corresponding job execution graph.

  4. Examine the execution graph to locate the SQL snippet causing the skew.

Example walkthrough

  1. Find the Logview URL in the task's operational logs. For more information, see Logview entry points.

    [Figure: Logview]

  2. Sort the Fuxi tasks by Latency in descending order and select the task with the longest runtime.

    [Figure: Fuxi Task]

  3. Click task R31_26_27 — it has the longest runtime. The instance details show Latency: {min:00:00:06, avg:00:00:13, max:00:26:40}, meaning the minimum runtime is 6 s, the average is 13 s, and the maximum is 26 minutes 40 seconds. Any instance running longer than 26 s (twice the 13 s average) is long-tailed. In this example, 21 instances exceeded that threshold.

    [Figure: Task with the longest runtime]

  4. Click the Output log icon in the StdOut column to view the output log.

    [Figure: Example output]

  5. Go to the Job Details tab. Right-click R31_26_27 and select Expand All. For more information, see Use Logview 2.0 to view job information. The step before StreamLineRead22 is StreamLineWriter21, which reveals the skewed keys: new_uri_path_structure, cookie_x5check_userid, and cookie_userid. Use these keys to locate the SQL snippet causing the skew.

    [Figure: Expanded task]

    [Figure: KEY]
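
The long-tail rule used in the walkthrough (runtime greater than twice the stage average) can be checked with a few lines of Python. This is only a sketch for reading Logview latency summaries by hand; the helper names parse_hms, long_tail_threshold, and is_long_tailed are hypothetical and not part of any MaxCompute tool.

```python
def parse_hms(text):
    """Convert an 'HH:MM:SS' latency string into seconds."""
    hours, minutes, seconds = (int(part) for part in text.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def long_tail_threshold(avg_latency):
    """Twice the stage's average runtime, in seconds."""
    return 2 * parse_hms(avg_latency)


def is_long_tailed(instance_latency, avg_latency):
    """True if an instance ran longer than twice the stage average."""
    return parse_hms(instance_latency) > long_tail_threshold(avg_latency)


# Values from the walkthrough: min 00:00:06, avg 00:00:13, max 00:26:40
print(long_tail_threshold("00:00:13"))          # 26
print(parse_hms("00:26:40"))                    # 1600
print(is_long_tailed("00:26:40", "00:00:13"))   # True
print(is_long_tailed("00:00:06", "00:00:13"))   # False
```

The slowest instance ran for 1,600 s against a 26 s threshold, which is why this stage counts as severely skewed.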

Fix data skew

Data skew most often originates from these five operations, in order of frequency:

JOIN > GROUP BY > COUNT(DISTINCT) > ROW_NUMBER() > dynamic partition

JOIN skew

Large table + small table

When one side of a join is small enough to fit in memory, a broadcast join eliminates the shuffle entirely. Use the MAPJOIN hint to broadcast the small tables.

Skewed query:

In the following example, t1 is a large table, and t2 and t3 are small tables.

SELECT  t1.ip
        ,t1.is_anon
        ,t1.user_id
        ,t1.user_agent
        ,t1.referer
        ,t2.ssl_ciphers
        ,t3.shop_province_name
        ,t3.shop_city_name
FROM    <viewtable> t1
LEFT OUTER JOIN <other_viewtable> t2
ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (  SELECT  shop_id
                            ,city_name AS shop_city_name
                            ,province_name AS shop_province_name
                    FROM    <tenanttable>
                    WHERE   ds = MAX_PT('<tenanttable>')
                    AND     is_valid = 1
                ) t3
ON t1.shopid = t3.shop_id

Fixed query: Add the /*+ mapjoin(t2,t3)*/ hint.

SELECT  /*+ mapjoin(t2,t3)*/
        t1.ip
        ,t1.is_anon
        ,t1.user_id
        ,t1.user_agent
        ,t1.referer
        ,t2.ssl_ciphers
        ,t3.shop_province_name
        ,t3.shop_city_name
FROM    <viewtable> t1
LEFT OUTER JOIN (<other_viewtable>) t2
ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (  SELECT  shop_id
                            ,city_name AS shop_city_name
                            ,province_name AS shop_province_name
                    FROM    <tenanttable>
                    WHERE   ds = MAX_PT('<tenanttable>')
                    AND     is_valid = 1
                ) t3
ON t1.shopid = t3.shop_id

Usage notes:

  • Always reference a small table or subquery by its alias in the hint.

  • MapJoin supports subqueries as small tables.

  • Separate multiple small tables with commas: /*+ mapjoin(a,b,c)*/.

  • Non-equi joins and multiple conditions joined by OR are supported. A join condition that is always true, such as ON 1=1, computes a Cartesian product: select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1; Use this with caution because it causes data bloat.

  • MapJoin loads the specified tables into memory during the map stage. The total size after loading must not exceed 512 MB. Raise the limit up to 8,192 MB if needed:

    SET odps.sql.mapjoin.memory.max=2048;

Limits:

Join type | Constraint
LEFT OUTER JOIN | The left table must be the large table
RIGHT OUTER JOIN | The right table must be the large table
INNER JOIN | Either side can be the large table
FULL OUTER JOIN | Not supported
Maximum small tables | 128 (a syntax error occurs if you exceed this limit)
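
The idea behind a broadcast join can be pictured with a small Python sketch: the small table becomes an in-memory lookup, and each row of the large table is matched locally, so no rows need to be shuffled by key. This illustrates the concept only and is not how MaxCompute implements MAPJOIN; the function name and the sample rows are hypothetical.

```python
def broadcast_left_join(large_rows, small_rows, key):
    """Left join large_rows to small_rows using an in-memory lookup table."""
    # Build the lookup once; in a real job every mapper would hold a copy
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[key], []).append(row)

    joined = []
    for left in large_rows:
        # Unmatched left rows are kept, as in a LEFT OUTER JOIN
        for right in lookup.get(left[key], [None]):
            merged = dict(left)
            if right is not None:
                merged.update({k: v for k, v in right.items() if k != key})
            joined.append(merged)
    return joined


traffic = [
    {"shop_id": 1, "ip": "a"},
    {"shop_id": 1, "ip": "b"},
    {"shop_id": 2, "ip": "c"},
]
shops = [{"shop_id": 1, "city": "Hangzhou"}]

joined = broadcast_left_join(traffic, shops, "shop_id")
for row in joined:
    print(row)
```

Because the hot shop_id 1 is resolved wherever its rows already live, its frequency no longer concentrates work on a single reducer. In this sketch, unmatched rows simply lack the right-side columns instead of carrying NULL values.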

Large table + medium table

When the small table is too large for MapJoin but still causes skew, use the DISTRIBUTED MAPJOIN hint instead.

Skewed query:

In the following example, t0 is a large table and t1 is a medium table.

SELECT  request_datetime
        ,host
        ,URI
        ,eagleeye_traceid
FROM <viewtable>
    t0
LEFT JOIN (
    SELECT
    traceid,
    eleme_uid,
    isLogin_is
    FROM <servicetable>
    WHERE ds = '${today}'
    AND     hh = '${hour}'
) t1 ON t0.eagleeye_traceid = t1.traceid
WHERE   ds = '${today}'
AND     hh = '${hour}'

Fixed query: Add the /*+distmapjoin(t1)*/ hint.

SELECT  /*+distmapjoin(t1)*/
        request_datetime
        ,host
        ,URI
        ,eagleeye_traceid
FROM <viewtable>
    t0
LEFT JOIN (
    SELECT
    traceid,
    eleme_uid,
    isLogin_is
    FROM <servicetable>
    WHERE ds = '${today}'
    AND     hh = '${hour}'
) t1 ON t0.eagleeye_traceid = t1.traceid
WHERE   ds = '${today}'
AND     hh = '${hour}'

Hot keys causing long tails in a join

When certain key values appear far more often than others (hot keys), those reducers take much longer. In the following example, the eleme_uid field has hot key values.

SELECT
eleme_uid,
...
FROM (
    SELECT
    eleme_uid,
    ...
    FROM <viewtable>
)t1
LEFT JOIN(
    SELECT
    eleme_uid,
    ...
    FROM <customertable>
)  t2
ON t1.eleme_uid = t2.eleme_uid;

Four solutions are available. Choose based on how much you know about the skewed keys:

Solution | When to use | How it works
SkewJoin hint with values (recommended) | You know the skewed key values | Hint the table, columns, and values; no auto-detection overhead
SkewJoin hint without values | You know the skewed columns but not the values | MaxCompute auto-detects the top 20 hot key values (one extra step)
Set SkewJoin parameter | You want a simpler per-query configuration | Set odps.sql.skewjoin=true and odps.sql.skewinfo
Manual hot key splitting | Other methods are not applicable | Filter hot key records, MapJoin them, MergeJoin the rest, then merge

SkewJoin hint (recommended)

Add a SkewJoin hint to the SELECT statement. The hint syntax supports three levels of specificity:

Level 1 — Hint the table only. MaxCompute detects the skewed columns and values automatically.

-- Hint the table's alias
SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;

Level 2 — Hint the table and columns. Narrows detection to the specified columns.

-- Hint table alias and the columns that might be skewed (c0 and c1 in table a)
SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;

Level 3 — Hint the table, columns, and values. Most efficient: skips auto-detection entirely.

-- Provide the exact skewed key values; enclose STRING values in quotation marks
SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;

Level 3 (with values) is faster than Level 1 or Level 2 because it skips the aggregate step that finds hot key values. Use Level 1 or Level 2 only when the skewed values are unknown.

SkewJoin hint requirements:

  • Join key data types must match on both sides. If they differ, use CAST either in the join condition or in a subquery:

    CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
    CREATE TABLE T1(c0 string, c1 int, c2 int);
    
    -- Option A: cast in the join condition
    SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
    
    -- Option B: cast in a subquery
    SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;

  • The join must have a left key = right key condition. Cartesian product joins are not supported.

  • A join cannot have both a MapJoin hint and a SkewJoin hint.

  • To change the default number of hot key values detected (20), run: set odps.optimizer.skew.join.topk.num = <value>;
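
The auto-detection step that Level 1 and Level 2 hints rely on can be thought of as a top-k frequency count over the join key. The following Python sketch shows that idea on a toy list. It is an illustration under that assumption, not MaxCompute's actual detection logic, and top_hot_keys is a hypothetical helper.

```python
from collections import Counter


def top_hot_keys(join_keys, k=20):
    """Return the k most frequent join key values with their row counts."""
    return Counter(join_keys).most_common(k)


sample = ["u1"] * 5 + ["u2"] * 2 + ["u3"]
print(top_hot_keys(sample, k=2))  # [('u1', 5), ('u2', 2)]
```

Running a count like this on a sample of the real table is one way to find the values to pass in a Level 3 hint, which avoids the extra detection step at query time.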

Supported join types:

Join type | Which side to hint
Inner join | Either side
Left join, semi join, anti join | Left table only
Right join | Right table only
Full join | Not supported

Set the SkewJoin parameter

Set both parameters — odps.sql.skewjoin alone has no effect.

SET odps.sql.skewjoin=true;
SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  -- skewed_src is the traffic table; skewed_value is the hot key value

Examples:

-- Single skewed value in a single field
SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];

-- Multiple skewed values in a single field
SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];

Manual hot key splitting

Filter records with hot key values from the primary table, apply MapJoin to them, apply MergeJoin to the remaining records, and merge the results with UNION ALL.

SELECT
/*+ MAPJOIN (t2) */
eleme_uid,
...
FROM (
    SELECT
    eleme_uid,
    ...
    FROM <viewtable>
    WHERE eleme_uid = <skewed_value>
)t1
LEFT JOIN(
    SELECT
    eleme_uid,
    ...
    FROM <customertable>
    WHERE eleme_uid = <skewed_value>
)  t2
ON t1.eleme_uid = t2.eleme_uid
UNION ALL
SELECT
eleme_uid,
...
FROM (
    SELECT
    eleme_uid,
    ...
    FROM <viewtable>
    WHERE eleme_uid != <skewed_value>
)t3
LEFT JOIN(
    SELECT
    eleme_uid,
    ...
    FROM <customertable>
    WHERE eleme_uid != <skewed_value>
)  t4
ON t3.eleme_uid = t4.eleme_uid
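
The split-and-merge pattern above can be checked on toy data. The Python sketch below assumes the right side has at most one row per key and models it as a dictionary; split_join and the sample values are hypothetical and only mirror the two UNION ALL branches.

```python
def split_join(left_rows, right_lookup, hot_keys):
    """Left join in two branches: hot keys first, everything else second."""
    hot = set(hot_keys)

    # Branch 1: only hot keys; the filtered right side is tiny, so it can be broadcast
    hot_lookup = {k: v for k, v in right_lookup.items() if k in hot}
    hot_part = [(k, x, hot_lookup.get(k)) for k, x in left_rows if k in hot]

    # Branch 2: all remaining keys, joined the regular way
    cold_lookup = {k: v for k, v in right_lookup.items() if k not in hot}
    cold_part = [(k, x, cold_lookup.get(k)) for k, x in left_rows if k not in hot]

    # UNION ALL of the two branches
    return hot_part + cold_part


left = [("u1", "a"), ("u1", "b"), ("u2", "c"), ("u3", "d")]
right = {"u1": "gold", "u2": "silver"}

plain = [(k, x, right.get(k)) for k, x in left]
split = split_join(left, right, ["u1"])
print(sorted(split, key=repr) == sorted(plain, key=repr))  # True
```

The two branches partition the left rows, so the merged result has exactly the rows of a single join.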

Modulo-equi join with a multiplier table

This approach uses a multiplier table — a single-column INT table with values from 1 to N, where N reflects the degree of skew. Join the user behavior table against the multiplier table to expand it N times, then join using both the user ID and the number column. This distributes what was a single-key join into N parallel joins, reducing skew to 1/N of the original level.

This method expands the data N times overall. It fundamentally changes the business logic SQL and is generally not recommended.

SELECT
eleme_uid,
...
FROM (
    SELECT
    eleme_uid,
    ...
    FROM <viewtable>
)t1
LEFT JOIN(
    SELECT
    /*+mapjoin(<multipletable>)*/
    eleme_uid,
    number
    ...
    FROM <customertable>
    JOIN <multipletable>
)  t2
ON t1.eleme_uid = t2.eleme_uid
AND mod(t1.<value_col>,10)+1 = t2.number;
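
A small Python model helps show why the extra join condition still returns the same rows. It assumes an integer column on the left side to derive the number from, matching the role of <value_col> in the query; salted_join and the sample rows are hypothetical.

```python
def salted_join(left_rows, right_rows, n):
    """Left join on (key, number) after expanding the right side n times."""
    # Multiplier step: every right row is copied once per number 1..n
    expanded = {}
    for key, value in right_rows:
        for number in range(1, n + 1):
            expanded.setdefault((key, number), []).append(value)

    joined = []
    for key, value_col, payload in left_rows:
        number = value_col % n + 1      # plays the role of mod(t1.<value_col>, N) + 1
        for value in expanded.get((key, number), [None]):
            joined.append((key, payload, value))
    return joined


left = [("hot", i, "p%d" % i) for i in range(6)] + [("cold", 7, "p7")]
right = [("hot", "H")]

joined = salted_join(left, right, n=3)
plain = [(key, payload, "H" if key == "hot" else None) for key, _, payload in left]
print(joined == plain)  # True
```

With n=3, the six rows of the hot key are spread over three (key, number) pairs with two rows each, while the right side holds three copies of its row. That is the trade-off described above: less skew in exchange for N times the data.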

GROUP BY skew

GROUP BY skew occurs when key values are unevenly distributed, causing some reducers to handle far more rows than others.

Skewed query:

SELECT  shop_id
        ,sum(is_open) AS business_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;

Three solutions are available:

Solution | When to use
Set the anti-skew parameter | Quickest fix; no SQL rewrite needed
Add a random number | More control; requires SQL rewrite
Create a rolling table | Long-running daily tasks reading many partitions

Set the anti-skew parameter (recommended)

SET odps.sql.groupby.skewindata=true;

This is the simplest option. Resource consumption and runtime are comparable to the random number approach, but it requires no SQL changes. For most cases with multiple long-tail keys, this is more cost-effective than finding and rewriting each one manually.

Add a random number

If you have identified the specific key causing the long tail, redistribute its work using a two-stage aggregation. This changes the execution plan from M->R to M->R->R, processing the long-tail key in two steps. Although the number of execution steps increases, the overall runtime is reduced. The resource consumption and time efficiency are similar to the anti-skew parameter approach.

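-- Assume that the key that causes a long tail is KEY001
-- Stage 2: add up the partial counts for each key
SELECT  a.Key
        ,SUM(a.Cnt) AS Cnt
FROM(SELECT  Key
            ,COUNT(*) AS Cnt
            FROM    <TableName>
            -- Stage 1: spread only the hot key across random buckets; all other keys use bucket 0
            GROUP BY Key
            ,CASE WHEN Key = 'KEY001' THEN Hash(RAND()) % 50
             ELSE 0
            END
        ) a
GROUP BY a.Key;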
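
The same two-stage idea can be simulated in Python to confirm that the final counts are unchanged while the hot key's work is divided. This is a sketch on synthetic data; two_stage_count, the bucket count of 50, and the fixed seed are illustrative assumptions.

```python
import random
from collections import Counter


def two_stage_count(keys, hot_key, buckets=50, seed=0):
    """Count rows per key in two stages, salting only the hot key."""
    rng = random.Random(seed)

    # Stage 1: partial counts per (key, bucket); the hot key gets a random bucket
    partial = Counter()
    for key in keys:
        bucket = rng.randrange(buckets) if key == hot_key else 0
        partial[(key, bucket)] += 1

    # Stage 2: add up the partial counts per key
    total = Counter()
    for (key, _bucket), cnt in partial.items():
        total[key] += cnt
    return partial, total


keys = ["KEY001"] * 1000 + ["KEY002"] * 3
partial, total = two_stage_count(keys, "KEY001")
print(total == Counter(keys))  # True
print(max(cnt for (key, _), cnt in partial.items() if key == "KEY001"))
```

The second print shows the largest group any single first-stage worker would have handled for KEY001, which is far below the 1,000 rows a single reducer would otherwise receive.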

Create a rolling table

For daily tasks that aggregate data over a 365-day window, reading all 365 partitions every run is expensive. A rolling table pre-aggregates historical data and updates incrementally, so each run reads only 2 partitions instead of 365. This significantly reduces shop_id duplication and cuts resource consumption.

Initialize the rolling table once:

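-- Column types and comment text are assumed from how the columns are populated below:
-- last_update_ds stores max(ds), and 365d_open_days stores sum(is_open)
CREATE TABLE IF NOT EXISTS m_xxx_365_df
(
  shop_id STRING COMMENT 'Shop ID',
  last_update_ds STRING COMMENT 'Date of the most recent update',
  365d_open_days BIGINT COMMENT 'Business days in the 365-day window'
)
PARTITIONED BY
(
  ds STRING COMMENT 'Date partition'
)LIFECYCLE 7;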

-- Initialize with 365 days of data (example: May 1, 2021 to May 1, 2022)
INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
  SELECT shop_id,
         max(ds) as last_update_ds,
         sum(is_open) AS 365d_open_days
  FROM table_xxx_di
  WHERE dt BETWEEN '20210501' AND '20220501'
  GROUP BY shop_id;

Then run this incremental update daily:

INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
  SELECT aa.shop_id,
         aa.last_update_ds,
         365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Prevent infinite rolling of business days
  FROM (
    SELECT shop_id,
           max(last_update_ds) AS last_update_ds,
           sum(365d_open_days) AS 365d_open_days
    FROM (
      SELECT shop_id,
             ds AS last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE ds = '${bizdate}'
      GROUP BY shop_id
      UNION ALL
      SELECT shop_id,
             last_update_ds,
             365d_open_days
      FROM m_xxx_365_df
      WHERE ds = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}'
    )
    GROUP BY shop_id
  ) AS aa
  LEFT JOIN (
    SELECT shop_id,
           is_open
    FROM table_xxx_di
    WHERE ds = '${bizdate_366}'
  ) AS bb
  ON aa.shop_id = bb.shop_id;
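
The arithmetic behind the incremental update is a sliding-window sum: add the newest day and subtract the day that just left the window. The Python sketch below checks this against a full recomputation, using a 4-day window as a stand-in for 365 days; the function names and the sample series are hypothetical.

```python
def roll_forward(previous_total, today_value, expired_value=0):
    """New window total = old total + newest day - day that left the window."""
    return previous_total + today_value - expired_value


def window_sum(values, end, size):
    """Reference implementation: recompute the whole window ending at index end."""
    return sum(values[max(0, end - size + 1): end + 1])


is_open = [1, 0, 1, 1, 0, 1, 1, 1, 0, 1]   # one flag per day for a single shop
size = 4                                    # stand-in for the 365-day window

total = window_sum(is_open, size - 1, size)  # one-time initialization
history = [total]
for day in range(size, len(is_open)):
    total = roll_forward(total, is_open[day], is_open[day - size])
    history.append(total)

print(history)
```

Each incremental step touches two values instead of the whole window, which is the saving the rolling table provides at partition scale.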

COUNT(DISTINCT) skew

COUNT(DISTINCT) is prone to skew when the GROUP BY key has a heavily skewed data distribution. Consider a table with this partition distribution:

Partition (ds) | Row count
20220416 | 73,025,514
20220415 | 2,292,806
20220417 | 2,319,160

The following query skews because the 20220416 partition has roughly 30 times more rows than the others:

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

Three solutions are available:

Solution | When to use
Set the anti-skew parameter | Simplest option; effective when the GROUP BY data is skewed but the DISTINCT field is not
Two-stage aggregation | GROUP BY or DISTINCT field data is non-uniform
Pre-deduplicate then count | Both the GROUP BY and DISTINCT fields are uniform

Set the anti-skew parameter (recommended)

SET odps.sql.groupby.skewindata=true;

Two-stage aggregation

Attach a random number to the partition key to spread the load across multiple reducers, then aggregate again to get the final count.

-- Method 1: Concatenate a random number to ds to create rand_ds, then split it back
-- Note: COUNT(*) equals COUNT(DISTINCT shop_id) only if each (ds, shop_id) pair falls into a single random bucket.
-- If a shop_id can appear on several rows of the same ds, deduplicate on (ds, shop_id) again before counting.
SELECT  SPLIT_PART(rand_ds, '_',2) ds
        ,COUNT(*) id_cnt
  FROM (
        SELECT  rand_ds
                ,shop_id
        FROM    (SELECT  CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
                         ,shop_id
                 FROM    demo_data0
                )
        GROUP BY rand_ds,shop_id
        )
GROUP BY SPLIT_PART(rand_ds, '_',2);

-- Method 2: Add a separate random integer column (the same note as in Method 1 applies)
SELECT  ds
        ,COUNT(*) id_cnt
FROM    (SELECT  ds
                 ,randint10
                 ,shop_id
           FROM  (SELECT  ds
                          ,ROUND(RAND(),1)*10 AS randint10
                          ,shop_id
                    FROM  demo_data0
                 )
        GROUP BY ds,randint10,shop_id
        )
GROUP BY ds;

Pre-deduplicate then count

When both the GROUP BY field (ds) and the DISTINCT field (shop_id) are uniformly distributed, first deduplicate by grouping on both fields, then count the distinct rows.

SELECT  ds
        ,COUNT(*) AS cnt
FROM(SELECT  ds
            ,shop_id
            FROM    demo_data0
            GROUP BY ds ,shop_id
    )
GROUP BY ds;
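
The equivalence between COUNT(DISTINCT shop_id) and deduplicating first can be verified on a few rows. This Python sketch uses made-up (ds, shop_id) pairs; count_after_dedup is a hypothetical helper that mirrors the two GROUP BY levels.

```python
def count_after_dedup(rows):
    """Deduplicate (ds, shop_id) pairs, then count the rows per ds."""
    pairs = set(rows)                      # inner query: GROUP BY ds, shop_id
    counts = {}
    for ds, _shop_id in pairs:             # outer query: COUNT(*) ... GROUP BY ds
        counts[ds] = counts.get(ds, 0) + 1
    return counts


rows = [
    ("20220416", "s1"),
    ("20220416", "s1"),
    ("20220416", "s2"),
    ("20220415", "s1"),
]
print(count_after_dedup(rows))
```

The first stage is spread across many workers because it groups on two columns, and the second stage only sees one row per distinct pair.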

ROW_NUMBER() skew (TopN)

ROW_NUMBER() skew occurs when the PARTITION BY key is heavily skewed, causing the window function to accumulate too many rows in a single reducer.

Skewed query (top 10 per main_id):

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

Two solutions are available:

Solution | Trade-off
Two-stage SQL aggregation | More code; no external dependency
UDAF with min-heap | Less code; requires deploying a Python UDAF

Two-stage SQL aggregation

Add a random column to the innermost query and include it in the PARTITION BY clause to spread the data across reducers in the first stage. The second stage collects the final top N.

-- Version 1: Use modulo on a random number to create src_pt (0–10)
SELECT  main_id
        ,type
  FROM  (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM (SELECT  main_id
                          ,type
                        FROM (SELECT  main_id
                                      ,type
                                      ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                 FROM (SELECT  main_id
                                               ,type
                                               ,ceil(110 * rand()) % 11 AS src_pt
                                         FROM  data_demo2
                                            )
                                    ) B
                        WHERE   B.rn <= 10
                    )
        ) A
WHERE   A.rn <= 10;

-- Version 2: Use a simple random number as src_pt (1–10)
SELECT  main_id
        ,type
  FROM  (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM (SELECT  main_id
                          ,type
                    FROM(SELECT  main_id
                                 ,type
                                 ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                           FROM  (SELECT  main_id
                                          ,type
                                          ,ceil(10 * rand()) AS src_pt
                                          FROM    data_demo2
                                  )
                                ) B
                        WHERE B.rn <= 10
                    )
        ) A
WHERE   A.rn <= 10;
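
The reason the two-stage rewrite is safe is that the global top N is always contained in the union of the per-bucket top N lists. The Python sketch below demonstrates this on synthetic rows; top_n_two_stage, the bucket count, and the fixed seed are illustrative assumptions.

```python
import heapq
import random


def top_n_two_stage(rows, n=10, buckets=10, seed=0):
    """Top n values per main_id, computed via random buckets and a final merge."""
    rng = random.Random(seed)

    # Stage 1: group by (main_id, src_pt) with src_pt drawn from 1..buckets
    stage1 = {}
    for main_id, value in rows:
        src_pt = rng.randint(1, buckets)
        stage1.setdefault((main_id, src_pt), []).append(value)

    # Keep only the top n of each bucket (the inner WHERE rn <= n)
    survivors = {}
    for (main_id, _src_pt), values in stage1.items():
        survivors.setdefault(main_id, []).extend(heapq.nlargest(n, values))

    # Stage 2: final top n per main_id over the much smaller survivor set
    return {main_id: heapq.nlargest(n, values) for main_id, values in survivors.items()}


rows = [("a", v) for v in range(1000)] + [("b", 5)]
result = top_n_two_stage(rows)
print(result["a"])  # [999, 998, 997, 996, 995, 994, 993, 992, 991, 990]
print(result["b"])  # [5]
```

For the skewed main_id "a", the second stage sorts at most buckets × n values instead of all 1,000 rows.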

UDAF with min-heap

For complex TopN queries, a Python user-defined aggregate function (UDAF) using a min-heap priority queue reduces the code significantly. The UDAF keeps only the top K elements at each stage:

  • iterate: Pushes the first K elements, then compares each subsequent element with the current minimum and swaps if larger.

  • merge: Merges two heaps and keeps the top K elements.

  • terminate: Returns the heap as an array.

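import heapq

from odps.udf import annotate, BaseUDAF


@annotate('* -> array<string>')
class GetTopN(BaseUDAF):
    def new_buffer(self):
        # buffer[0] is a min-heap of the top K values seen so far; buffer[1] caches K
        return [[], None]

    def iterate(self, buffer, order_column_val, k):
        if not buffer[1]:
            buffer[1] = k
        if len(buffer[0]) < k:
            heapq.heappush(buffer[0], order_column_val)
        else:
            # Push the new value, then drop the smallest so that only the K largest remain
            heapq.heappushpop(buffer[0], order_column_val)

    def merge(self, buffer, pbuffer):
        first_buffer, first_k = buffer
        second_buffer, second_k = pbuffer
        k = first_k or second_k
        merged_heap = first_buffer + second_buffer
        if k:
            merged_heap = heapq.nlargest(k, merged_heap)
        # Restore the heap invariant in case iterate() runs on this buffer again
        heapq.heapify(merged_heap)
        buffer[0] = merged_heap
        buffer[1] = k

    def terminate(self, buffer):
        # Return the top K values in descending order
        return sorted(buffer[0], reverse=True)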

After deploying the UDAF, use it in SQL:

SET odps.sql.python.version=cp37;
SELECT main_id,type_val FROM (
  SELECT  main_id ,get_topn(type, 10) AS type_array
  FROM data_demo2
  GROUP BY main_id
)
LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;

Dynamic partition skew

Dynamic partitioning inserts data into partitions determined at runtime, based on the partition column value in the SELECT clause. This is useful when you have many partitions and want to avoid writing one INSERT OVERWRITE per partition.

Example:

CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);

INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;

Why skew occurs: By default, MaxCompute introduces an extra reduce stage (odps.sql.reshuffle.dynamicpt=true) to group rows destined for the same partition into a small number of reduce instances. This prevents too many small files, but it also creates an uneven data distribution across reducers — causing long tails.

When the extra reduce stage causes more harm than good: If you have only a small number of dynamic partitions (50 or fewer), the extra reduce stage provides no benefit and wastes resources. Disabling it can dramatically reduce runtime.

Two solutions are available:

Solution | When to use
Disable reshuffle | Dynamic partitions 50 or fewer; no risk of small file explosion
Isolate large partitions | A small number of partitions hold most of the data

Disable the dynamic partition reshuffle (recommended for 50 or fewer partitions)

Set odps.sql.reshuffle.dynamicpt=false for any query that meets all three conditions:

  1. Uses dynamic partitions

  2. Creates 50 or fewer partitions

  3. Does not already set odps.sql.reshuffle.dynamicpt=false

Example impact: A task that took 1 hour 30 minutes with default settings — with the last reduce taking 1 hour 20 minutes (90% of total runtime) — completed in 9 minutes after setting this parameter to false. Historical data confirmed only about 2 dynamic partitions were created per day.

SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
SELECT /*+ mapjoin(t2) */
    '20150503' AS ds,
    t1.lv AS lv,
    t1.type AS tp
FROM
    (SELECT  ...
    FROM tbbi.ads_tb_cornucopia_user_d
    WHERE ds = '20150503'
    AND lv IN ('flat', '3rd')
    AND tp = 'T'
    AND pref_cat2_id > 0
    ) t1
JOIN
    (SELECT ...
    FROM tbbi.ads_tb_cornucopia_auct_d
    WHERE ds = '20150503'
    AND tp = 'T'
    AND is_all = 'N'
    AND cat2_id > 0
    ) t2
ON t1.pref_cat2_id = t2.cat2_id;

This optimization applies to both slow and fast tasks. The urgency of applying it scales with the last Fuxi instance's runtime:

Last Fuxi instance runtime | Priority
> 30 minutes | Critical
20–30 minutes | High
10–20 minutes | Medium
< 10 minutes | Low

Isolate large partitions

When a small number of partitions contain the vast majority of rows, isolating them and inserting separately prevents them from overloading a single reducer.

Step 1: Identify the large partitions.

SELECT  ds
        ,hh
        ,COUNT(*) AS cnt
FROM    dwd_alsc_ent_shop_info_hi
GROUP BY ds
         ,hh
ORDER BY cnt DESC;

Example output:

ds | hh | cnt
20200928 | 17 | 1,052,800
20191017 | 17 | 1,041,234
20210928 | 17 | 1,034,332
20190328 | 17 | 1,000,321
20210504 | 1 | 19
20191003 | 20 | 18
20200522 | 1 | 18
20220504 | 1 | 18

Step 2: Insert the remaining partitions and the large partitions separately, each with reshuffle disabled.

-- Insert all rows except the large partitions
SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
SELECT  *
FROM    dwd_alsc_ent_shop_info_hi
WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');

-- Insert the large partitions separately
SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
SELECT  *
FROM    dwd_alsc_ent_shop_info_hi
WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
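
Choosing which partitions to isolate can be scripted from the Step 1 counts. The Python sketch below is one possible approach: it treats any partition holding at least 10% of all rows as large and renders the list used in the IN and NOT IN filters. The 10% cutoff, the helper names, and the subset of counts are assumptions for illustration.

```python
def split_partitions(counts, min_share=0.1):
    """Split (ds, hh) partitions into large and remaining ones by row share."""
    total = sum(counts.values())
    large = sorted(p for p, cnt in counts.items() if cnt / total >= min_share)
    rest = sorted(p for p in counts if p not in large)
    return large, rest


def concat_in_list(partitions):
    """Render partitions as the quoted CONCAT(ds,hh) values for an IN list."""
    return ", ".join("'{}{}'".format(ds, hh) for ds, hh in partitions)


# A subset of the Step 1 example output
counts = {
    ("20200928", "17"): 1052800,
    ("20191017", "17"): 1041234,
    ("20210928", "17"): 1034332,
    ("20190328", "17"): 1000321,
    ("20210504", "1"): 19,
    ("20191003", "20"): 18,
}

large, rest = split_partitions(counts)
print(concat_in_list(large))
# '2019032817', '2019101717', '2020092817', '2021092817'
```

The printed values are the same four partitions that the two INSERT statements above filter on.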

Also consider adjusting the mapper split size (in MB) to control the number of map instances when doing a full table scan:

SET odps.sql.mapper.split.size=128;
INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
SELECT  *
FROM    dwd_alsc_ent_shop_info_hi;