This topic describes common data skew scenarios in MaxCompute and provides corresponding solutions.
MapReduce
To understand data skew, you must first understand MapReduce. MapReduce is a distributed computing framework that uses a divide-and-conquer method. This method splits large or complex problems into smaller, more manageable subproblems, solves them, and then merges the results into a final solution. MapReduce offers higher fault tolerance, ease of use, and better extensibility than traditional parallel programming frameworks. When you use MapReduce for parallel programming, you do not need to manage non-programming issues in distributed clusters, such as data storage, internode communication, and transmission mechanisms. This feature greatly simplifies distributed programming.
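As a rough illustration, even a simple SQL aggregation follows this pattern. The following sketch assumes a hypothetical word_table with a single word column:
-- Hypothetical example: a GROUP BY aggregation follows the MapReduce pattern.
SELECT word             -- map stage: each mapper reads one split of the input
       ,COUNT(*) AS cnt -- reduce stage: each reducer merges the counts for its keys
FROM word_table
GROUP BY word;          -- shuffle: rows with the same key go to the same reducer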
The following figure shows the workflow of MapReduce.
Data skew
Data skew often occurs during the reduce stage. Mappers typically split data uniformly based on input files. Data skew occurs when data in a table is unevenly distributed among different workers. This uneven distribution causes some workers to finish their computations quickly while others take much longer. In production environments, most data is skewed, often following the 80/20 rule. For example, 20% of active users on a forum might contribute 80% of the posts, or 20% of users might generate 80% of a website's traffic. In the era of big data, with its explosive growth in data volume, data skew can severely impact the performance of distributed programs. A common symptom is a job's execution progress getting stuck at 99%.
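Before you tune anything, it can help to measure the key distribution directly. The following sketch, with placeholder table and column names, counts the rows per key so that hot keys surface at the top of the result:
-- Placeholder names: replace <table_name> and key_col with your own.
SELECT key_col
       ,COUNT(*) AS cnt
FROM <table_name>
GROUP BY key_col
ORDER BY cnt DESC
LIMIT 20;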
How to identify data skew
In MaxCompute, you can use Logview to identify data skew. The following steps describe the process:
1. In Fuxi Jobs, sort the jobs by latency in descending order and select the job stage with the longest runtime.
2. In the Fuxi Instance list for the Fuxi Stage, sort the tasks by latency in descending order. Select a task with a runtime that is significantly longer than the average. Typically, you can focus on the first task in the list. View its StdOut log.
3. Use the information from StdOut to view the corresponding job execution graph.
4. Use the key information from the job execution graph to locate the SQL code snippet that is causing the data skew.
The following example shows how to use this method.
Find the Logview log from the task's operational logs. For more information, see Logview entry points.

To quickly identify the problem, go to the Logview interface, sort the Fuxi tasks by Latency in descending order, and select the task with the longest runtime.

The task R31_26_27 has the longest runtime. Click the R31_26_27 task to open the instance details page, as shown in the following figure.
The value Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} indicates that for all instances of this task, the minimum runtime is 6 s, the average runtime is 13 s, and the maximum runtime is 26 minutes and 40 seconds. You can sort by Latency (instance runtime) in descending order. You will see four instances with long runtimes. MaxCompute considers a Fuxi instance to be long-tailed if its runtime is more than twice the average. This means that any task instance with a runtime longer than 26 s is considered long-tailed. In this case, 21 instances have a runtime longer than 26 s. The presence of long-tailed instances does not always indicate data skew. You must also compare the avg and max values of the instance runtime. If the max value is much larger than the avg value, the data skew is severe and the task requires optimization.
Click the icon in the StdOut column to view the output log, as shown in the following figure.
After you identify the problem, go to the Job Details tab. Right-click R31_26_27 and select Expand All to expand the task. For more information, see Use Logview 2.0 to view job information.
Examine the step before StreamLineRead22, which is StreamLineWriter21. This step reveals the keys that cause the data skew: new_uri_path_structure, cookie_x5check_userid, and cookie_userid. Use this information to find the SQL snippet that is causing the data skew.
Data skew troubleshooting and solutions
The most common causes of data skew are as follows:
Join
GroupBy
Count (Distinct)
ROW_NUMBER (TopN)
Dynamic partition
The frequency of occurrence, from most to least common, is: JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > Dynamic partition.
Join
Data skew from a join operation can occur in various situations, such as joining a large table with a small table, joining a large table with a medium table, or when hot key values cause long tails.
Joining a large table with a small table.
Data skew example
In the following example, t1 is a large table, and t2 and t3 are small tables.
SELECT t1.ip
       ,t1.is_anon
       ,t1.user_id
       ,t1.user_agent
       ,t1.referer
       ,t2.ssl_ciphers
       ,t3.shop_province_name
       ,t3.shop_city_name
FROM <viewtable> t1
LEFT OUTER JOIN <other_viewtable> t2
ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (
    SELECT shop_id
           ,city_name AS shop_city_name
           ,province_name AS shop_province_name
    FROM <tenanttable>
    WHERE ds = MAX_PT('<tenanttable>')
    AND is_valid = 1
) t3
ON t1.shopid = t3.shop_id
Solution
Use the MAPJOIN HINT syntax, as shown in the following example.
SELECT /*+ mapjoin(t2,t3)*/
       t1.ip
       ,t1.is_anon
       ,t1.user_id
       ,t1.user_agent
       ,t1.referer
       ,t2.ssl_ciphers
       ,t3.shop_province_name
       ,t3.shop_city_name
FROM <viewtable> t1
LEFT OUTER JOIN <other_viewtable> t2
ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (
    SELECT shop_id
           ,city_name AS shop_city_name
           ,province_name AS shop_province_name
    FROM <tenanttable>
    WHERE ds = MAX_PT('<tenanttable>')
    AND is_valid = 1
) t3
ON t1.shopid = t3.shop_id
Notes
When you reference a small table or subquery, you must use its alias.
MapJoin supports using a subquery as a small table.
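For example, the following sketch (with hypothetical table names big_table and small_table) references a subquery as the small table by its alias b:
-- Hypothetical example: the subquery is hinted by its alias.
SELECT /*+ mapjoin(b) */ a.id, b.cnt
FROM big_table a
JOIN (
    SELECT id, COUNT(*) AS cnt
    FROM small_table
    GROUP BY id
) b
ON a.id = b.id;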
In a MapJoin, you can use non-equi joins or connect multiple conditions with OR. You can also compute a Cartesian product by specifying ON 1 = 1 in a MapJoin, for example, select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;. However, this operation can cause data bloat.
In a MapJoin, separate multiple small tables with commas (,), such as /*+ mapjoin(a,b,c)*/.
During the map stage, MapJoin loads all data from the specified tables into memory. Therefore, the specified tables must be small. The total memory occupied by the tables after they are loaded must not exceed 512 MB. Because MaxCompute uses compressed storage, the data size of a small table expands significantly after it is loaded into memory. The 512 MB limit applies to the size after the data is loaded. You can increase this limit to a maximum of 8192 MB by setting the following parameter.
SET odps.sql.mapjoin.memory.max=2048;
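The flag is session-level, so submit it together with the query it applies to. A minimal sketch, with hypothetical table names:
SET odps.sql.mapjoin.memory.max=2048;
SELECT /*+ mapjoin(b) */ a.id
FROM big_table a
JOIN small_table b
ON a.id = b.id;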
Limits on MapJoin operations
In a LEFT OUTER JOIN, the left table must be the large table.
In a RIGHT OUTER JOIN, the right table must be the large table.
FULL OUTER JOIN is not supported.
In an INNER JOIN, either the left or the right table can be the large table.
MapJoin supports a maximum of 128 small tables. A syntax error occurs if you specify more.
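For example, the following sketch (hypothetical table names) places the large table on the left of a LEFT OUTER JOIN, as the first rule requires:
SELECT /*+ mapjoin(b) */ a.id, b.name
FROM big_table a
LEFT OUTER JOIN small_table b
ON a.id = b.id;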
Joining a large table with a medium table.
Data skew example
In the following example, t0 is a large table and t1 is a medium table.
SELECT request_datetime
       ,host
       ,URI
       ,eagleeye_traceid
FROM <viewtable> t0
LEFT JOIN (
    SELECT traceid, eleme_uid, isLogin_is
    FROM <servicetable>
    WHERE ds = '${today}'
    AND hh = '${hour}'
) t1
ON t0.eagleeye_traceid = t1.traceid
WHERE ds = '${today}'
AND hh = '${hour}'
Solution
Use the DISTRIBUTED MAPJOIN hint to resolve data skew, as shown in the following example.
SELECT /*+distmapjoin(t1)*/
       request_datetime
       ,host
       ,URI
       ,eagleeye_traceid
FROM <viewtable> t0
LEFT JOIN (
    SELECT traceid, eleme_uid, isLogin_is
    FROM <servicetable>
    WHERE ds = '${today}'
    AND hh = '${hour}'
) t1
ON t0.eagleeye_traceid = t1.traceid
WHERE ds = '${today}'
AND hh = '${hour}'
Long tails caused by hot key values in a join.
Data skew example
In the following example, the eleme_uid field contains many hot key values, which can cause data skew.
SELECT eleme_uid, ...
FROM (
    SELECT eleme_uid, ...
    FROM <viewtable>
) t1
LEFT JOIN (
    SELECT eleme_uid, ...
    FROM <customertable>
) t2
ON t1.eleme_uid = t2.eleme_uid;
Solution
You can use one of the following four methods to resolve this issue.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Manually split hot key values | Analyze and identify the hot key values. Filter the records with hot key values from the primary table and perform a MapJoin on them. Then, perform a MergeJoin on the remaining records with non-hot key values. Finally, merge the results of the two joins. |
| Solution 2 | Set the SkewJoin parameter | set odps.sql.skewjoin=true; |
| Solution 3 | SkewJoin Hint | Use a hint: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. This method adds an extra step to find the skewed keys, which increases the query runtime. If you already know the skewed keys, set the SkewJoin parameter instead to save time. |
| Solution 4 | Perform a modulo-equi-join with a multiplier table | Use a multiplier table. |
Manually split hot key values.
This method involves analyzing and identifying the hot key values. First, filter the records that contain hot key values from the primary table and perform a MapJoin on them. Then, perform a MergeJoin on the remaining records that contain non-hot key values. Finally, merge the results of the two joins. The following code provides an example:
SELECT /*+ MAPJOIN (t2) */
       eleme_uid, ...
FROM (
    SELECT eleme_uid, ...
    FROM <viewtable>
    WHERE eleme_uid = <skewed_value>
) t1
LEFT JOIN (
    SELECT eleme_uid, ...
    FROM <customertable>
    WHERE eleme_uid = <skewed_value>
) t2
ON t1.eleme_uid = t2.eleme_uid
UNION ALL
SELECT eleme_uid, ...
FROM (
    SELECT eleme_uid, ...
    FROM <viewtable>
    WHERE eleme_uid != <skewed_value>
) t3
LEFT JOIN (
    SELECT eleme_uid, ...
    FROM <customertable>
    WHERE eleme_uid != <skewed_value>
) t4
ON t3.eleme_uid = t4.eleme_uid
Set the SkewJoin parameter.
This is a common solution. MaxCompute provides the set odps.sql.skewjoin=true; parameter to enable the SkewJoin feature. However, simply enabling SkewJoin does not affect the task's execution. You must also set the odps.sql.skewinfo parameter for the feature to take effect. The odps.sql.skewinfo parameter specifies the details for join optimization. The following is an example of the command syntax.
SET odps.sql.skewjoin=true;
SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]; -- skewed_src is the traffic table, and skewed_value is the hot key value.
The following examples show how to use the command:
-- For a single skewed value in a single field
SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
-- For multiple skewed values in a single field
SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
SkewJoin Hint.
To use SkewJoin in a SELECT statement, add the following hint: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/. In this hint, table_name is the skewed table, column_name is the skewed column, and value is the skewed key value. The following examples show how to use this hint.
-- Method 1: Hint the table name. Note that you hint the table's alias.
SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;
-- Method 2: Hint the table name and the columns that you think might be skewed. For example, columns c0 and c1 in table a have data skew.
SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
-- Method 3: Hint the table name and columns, and provide the skewed key values. If the type is STRING, enclose the values in quotation marks. For example, the values for (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") both have data skew.
SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
Note: The SkewJoin Hint method that directly specifies values is more efficient than manually splitting hot key values or than setting the SkewJoin parameter without specifying values.
Join types supported by SkewJoin Hint:
Inner Join: You can hint either side of the join.
Left Join, Semi Join, and Anti Join: You can only hint the left table.
Right Join: You can only hint the right table.
Full Join: Skew Join Hint is not supported.
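For example, based on the rules above, a LEFT JOIN can hint only its left table. The following sketch reuses the T0 and T1 tables from the earlier examples:
SELECT /*+ skewjoin(a) */ *
FROM T0 a
LEFT JOIN T1 b
ON a.c0 = b.c0;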
Add hints only to joins that are certain to have data skew because the hint runs an aggregate operation, which incurs a cost.
For a hinted join, the data type of the left join key must match the data type of the right join key. Otherwise, the SkewJoin hint does not work. For example, in the previous example, the type of a.c0 must match the type of b.c0, and the type of a.c1 must match the type of b.c1. You can use CAST in a subquery to ensure that the join key types are consistent. The following examples show how to do this:
CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
CREATE TABLE T1(c0 string, c1 int, c2 int);
-- Method 1:
SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
-- Method 2:
SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;
After you add a SkewJoin hint, the optimizer runs an aggregate operation to retrieve the top 20 hot key values. The value 20 is the default. You can change this value by running set odps.optimizer.skew.join.topk.num = xx;.
The SkewJoin hint supports hinting only one side of a join.
A hinted join must have a left key = right key condition and does not support Cartesian product joins.
You cannot add a SkewJoin hint to a join that already has a MapJoin hint.
Perform a modulo-equi-join with a multiplier table.
The logic of this solution differs from the previous three. It is not a divide-and-conquer approach. Instead, it uses a multiplier table that has a single column of the INT data type. The values range from 1 to N, where N is determined by the degree of data skew. This multiplier table is used to expand the user behavior table by N times. When you perform the join, you can then use both the user ID and the number column as association keys. Because of the added number condition, the original data skew, which was caused by distributing data only by user ID, is reduced to 1/N of its original level. However, this method also causes the data to bloat by N times.
SELECT eleme_uid, ...
FROM (
    SELECT eleme_uid, ...
    FROM <viewtable>
) t1
LEFT JOIN (
    SELECT /*+mapjoin(<multipletable>)*/ eleme_uid, number ...
    FROM <customertable>
    JOIN <multipletable>
) t2
ON t1.eleme_uid = t2.eleme_uid
AND mod(t1.<value_col>, 10) + 1 = t2.number;
To address the data bloat issue, you can limit the expansion to only the records that have hot key values in both tables. Records with non-hot key values remain unchanged. First, find the records with hot key values. Then, process the traffic table and the user behavior table separately by adding a new column named eleme_uid_join. If a user ID is a hot key value, use CONCAT to append a randomly assigned positive integer (for example, from 0 to 1,000). If it is not a hot key value, keep the original user ID. When you join the two tables, use the eleme_uid_join column. This method increases the multiplier for hot key values to reduce skew and avoids unnecessary data bloat for non-hot key values. However, this logic completely changes the original business SQL and is therefore not recommended.
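For completeness, the following hedged sketch shows one possible shape of this approach. The <hotkeytable> table (holding the known hot user IDs) and the 1,000-suffix range are illustrative assumptions, and <multipletable> is assumed to hold the numbers 1 to 1000:
SELECT t1.eleme_uid_join, ...
FROM (
    -- Traffic table: hot keys get a random suffix; other keys keep the original ID.
    SELECT IF(h.eleme_uid IS NULL,
              CAST(v.eleme_uid AS STRING),
              CONCAT(v.eleme_uid, '_', FLOOR(RAND() * 1000))) AS eleme_uid_join, ...
    FROM <viewtable> v
    LEFT JOIN <hotkeytable> h
    ON v.eleme_uid = h.eleme_uid
) t1
LEFT JOIN (
    -- Behavior table: expand each hot key into all 1,000 suffixed copies so that
    -- every randomized traffic row still finds its match.
    SELECT /*+ mapjoin(h, m) */
           CONCAT(c.eleme_uid, '_', m.number - 1) AS eleme_uid_join, ...
    FROM <customertable> c
    JOIN <hotkeytable> h ON c.eleme_uid = h.eleme_uid
    JOIN <multipletable> m ON 1 = 1
    UNION ALL
    SELECT CAST(c.eleme_uid AS STRING) AS eleme_uid_join, ...
    FROM <customertable> c
    LEFT ANTI JOIN <hotkeytable> h ON c.eleme_uid = h.eleme_uid
) t2
ON t1.eleme_uid_join = t2.eleme_uid_join;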
GroupBy
The following example shows pseudocode with a GroupBy clause.
SELECT shop_id
,sum(is_open) AS business_days
FROM table_xxx_di
WHERE dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;
If data skew occurs, you can resolve it using one of the following three solutions:
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Set the anti-skew parameter for GROUP BY | SET odps.sql.groupby.skewindata=true; |
| Solution 2 | Add a random number | Split the key that is causing the long tail. |
| Solution 3 | Create a rolling table | Reduce costs and improve efficiency. |
Solution 1: Set the anti-skew parameter for `GROUP BY`.
SET odps.sql.groupby.skewindata=true;
Solution 2: Add a random number.
Unlike Solution 1, this solution requires you to rewrite the SQL statement. Adding a random number to split the key that causes a long tail is an effective way to resolve `GROUP BY` long tails.
For the SQL statement SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;, if a combiner is not used, the map node shuffles data to the reduce node, and the reduce node then performs the COUNT operation. The corresponding execution plan is M->R.
If you have found the key that causes the long tail, you can redistribute the work for that key as follows:
-- Assume that the key that causes a long tail is KEY001.
SELECT a.Key
       ,SUM(a.Cnt) AS Cnt
FROM (
    SELECT Key
           ,COUNT(*) AS Cnt
    FROM <TableName>
    GROUP BY Key
             ,CASE WHEN Key = 'KEY001' THEN Hash(Random()) % 50
                   ELSE 0
              END
) a
GROUP BY a.Key;
After the change, the execution plan becomes M->R->R. Although the number of execution steps increases, the long-tail key is processed in two steps, which can reduce the overall runtime. The resource consumption and time efficiency are similar to Solution 1. However, in real-world scenarios, a long tail is often caused by more than one key. Considering the cost of finding long-tail keys and rewriting SQL statements, Solution 1 is usually more cost-effective.
Solution 3: Create a rolling table.
The core of this solution is to reduce costs and improve efficiency. The main requirement is to retrieve merchant data from the past year. For online tasks, reading all partitions from T-1 to T-365 each time wastes significant resources. You can create a rolling table to reduce the number of partitions that are read without affecting the retrieval of data from the past year. The following example shows how to do this.
First, initialize 365 days of merchant business data that is aggregated using Group By. Mark the data update date and store the data in a rolling table (m_xxx_365_df in the following example). For subsequent online tasks, you can join the T-2 partition of the rolling table with the table_xxx_di table, and then use Group By. This way, the number of partitions read each day is reduced from 365 to 2. The duplication of the primary key shop_id is greatly reduced, which also reduces resource consumption.
-- Create a rolling table.
CREATE TABLE IF NOT EXISTS m_xxx_365_df
(
    shop_id STRING,
    last_update_ds STRING,
    365d_open_days BIGINT
)
PARTITIONED BY
(
    ds STRING COMMENT 'Date partition'
)
LIFECYCLE 7;
-- Assume that 365d is from May 1, 2021 to May 1, 2022. Initialize the table first.
INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
SELECT shop_id,
       max(ds) AS last_update_ds,
       sum(is_open) AS 365d_open_days
FROM table_xxx_di
WHERE dt BETWEEN '20210501' AND '20220501'
GROUP BY shop_id;
-- Then, the online task to be executed is as follows.
INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
SELECT aa.shop_id,
       aa.last_update_ds,
       365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Eliminate the infinite rolling of business days.
FROM (
    SELECT shop_id,
           max(last_update_ds) AS last_update_ds,
           sum(365d_open_days) AS 365d_open_days
    FROM (
        SELECT shop_id,
               ds AS last_update_ds,
               sum(is_open) AS 365d_open_days
        FROM table_xxx_di
        WHERE ds = '${bizdate}'
        GROUP BY shop_id
        UNION ALL
        SELECT shop_id,
               last_update_ds,
               365d_open_days
        FROM m_xxx_365_df
        WHERE ds = '${bizdate_2}'
        AND last_update_ds >= '${bizdate_365}'
    )
    GROUP BY shop_id
) AS aa
LEFT JOIN (
    SELECT shop_id,
           is_open
    FROM table_xxx_di
    WHERE ds = '${bizdate_366}'
) AS bb
ON aa.shop_id = bb.shop_id;
Count (Distinct)
Suppose a table has the following data distribution.
| ds (partition) | cnt (number of records) |
| --- | --- |
| 20220416 | 73025514 |
| 20220415 | 2292806 |
| 20220417 | 2319160 |
Using the following statement can cause data skew:
SELECT ds
,COUNT(DISTINCT shop_id) AS cnt
FROM demo_data0
GROUP BY ds;
The following solutions are available:
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Parameter setting optimization | SET odps.sql.groupby.skewindata=true; |
| Solution 2 | General two-stage aggregation | Concatenate a random number to the partition field value. |
| Solution 3 | Similar to two-stage aggregation | First group by the ds and shop_id fields, and then count the deduplicated rows. |
Solution 1: Optimize by setting a parameter.
Set the following parameter.
SET odps.sql.groupby.skewindata=true;
Solution 2: Use general two-stage aggregation.
If the data in the shop_id field is not uniform, you cannot use Solution 1 for optimization. A more general method is to concatenate a random number to the value of the partition field.
-- Method 1: Concatenate a random number, for example CONCAT(ROUND(RAND(),1)*10, '_', ds) AS rand_ds.
SELECT SPLIT_PART(rand_ds, '_', 2) ds
       ,COUNT(*) id_cnt
FROM (
    SELECT CONCAT(ROUND(RAND(),1)*10, '_', ds) AS rand_ds
           ,shop_id
    FROM demo_data0
    GROUP BY rand_ds, shop_id
)
GROUP BY SPLIT_PART(rand_ds, '_', 2);
-- Method 2: Add a random number field, for example ROUND(RAND(),1)*10 AS randint10.
SELECT ds
       ,COUNT(*) id_cnt
FROM (
    SELECT ds
           ,ROUND(RAND(),1)*10 AS randint10
           ,shop_id
    FROM demo_data0
    GROUP BY ds, randint10, shop_id
)
GROUP BY ds;
Solution 3: Use a method similar to two-stage aggregation.
If the data in both the GroupBy and Distinct fields is uniform, you can optimize the query by first grouping by the two fields (ds and shop_id) to deduplicate shop_id, and then counting the rows, which replaces the COUNT(DISTINCT) operation.
SELECT ds
       ,COUNT(*) AS cnt
FROM (
    SELECT ds
           ,shop_id
    FROM demo_data0
    GROUP BY ds
             ,shop_id
)
GROUP BY ds;
ROW_NUMBER (TopN)
The following example retrieves the top 10 records in each group.
SELECT main_id
,type
FROM (SELECT main_id
,type
,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
FROM <data_demo2>
) A
WHERE A.rn <= 10;
If data skew occurs, you can resolve it using one of the following methods:
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Two-stage aggregation using SQL | Add a random column or concatenate a random number and use it in the PARTITION BY clause. |
| Solution 2 | Two-stage aggregation using a user-defined aggregate function (UDAF) | Optimize by using a UDAF with a min-heap priority queue. |
Solution 1: Use two-stage aggregation with SQL.
To make the data in each partition group as uniform as possible during the map stage, you can add a random column and include it in the PARTITION BY clause.
-- 1. Add a random column as part of the partition key.
SELECT main_id
       ,type
FROM (
    SELECT main_id
           ,type
           ,ROW_NUMBER() OVER (PARTITION BY main_id ORDER BY type DESC) rn
    FROM (
        SELECT main_id
               ,type
        FROM (
            SELECT main_id
                   ,type
                   ,ROW_NUMBER() OVER (PARTITION BY main_id, src_pt ORDER BY type DESC) rn
            FROM (
                SELECT main_id
                       ,type
                       ,ceil(110 * rand()) % 11 AS src_pt
                FROM data_demo2
            )
        ) B
        WHERE B.rn <= 10
    )
) A
WHERE A.rn <= 10;
-- 2. Customize the random number.
SELECT main_id
       ,type
FROM (
    SELECT main_id
           ,type
           ,ROW_NUMBER() OVER (PARTITION BY main_id ORDER BY type DESC) rn
    FROM (
        SELECT main_id
               ,type
        FROM (
            SELECT main_id
                   ,type
                   ,ROW_NUMBER() OVER (PARTITION BY main_id, src_pt ORDER BY type DESC) rn
            FROM (
                SELECT main_id
                       ,type
                       ,ceil(10 * rand()) AS src_pt
                FROM data_demo2
            )
        ) B
        WHERE B.rn <= 10
    )
) A
WHERE A.rn <= 10;
Solution 2: Use two-stage aggregation with a UDAF.
The SQL method can result in a large amount of code that is difficult to maintain. In this case, you can use a user-defined aggregate function (UDAF) with a min-heap priority queue for optimization. This means that in the iterate stage, only the top N items are processed, and in the merge stage, only N elements are merged. The process is as follows.
iterate: Pushes the first K elements. For subsequent elements, continuously compares them with the minimum top element and swaps elements in the heap.
merge: Merges two heaps and returns the top K elements in place.
terminate: Returns the heap as an array.
In the SQL statement, split the array into rows.
import heapq

from odps.udf import annotate
from odps.udf import BaseUDAF

@annotate('* -> array<string>')
class GetTopN(BaseUDAF):
    def new_buffer(self):
        # buffer[0] is a min-heap of the current top K values; buffer[1] is K.
        return [[], None]

    def iterate(self, buffer, order_column_val, k):
        if not buffer[1]:
            buffer[1] = k
        if len(buffer[0]) < k:
            heapq.heappush(buffer[0], order_column_val)
        else:
            # Replace the heap minimum if the new value is larger.
            heapq.heappushpop(buffer[0], order_column_val)

    def merge(self, buffer, pbuffer):
        first_buffer, first_k = buffer
        second_buffer, second_k = pbuffer
        k = first_k or second_k
        merged_heap = first_buffer + second_buffer
        merged_heap.sort(reverse=True)
        merged_heap = merged_heap[0:k] if len(merged_heap) > k else merged_heap
        buffer[0] = merged_heap
        buffer[1] = k

    def terminate(self, buffer):
        return buffer[0]

SET odps.sql.python.version=cp37;
SELECT main_id
       ,type_val
FROM (
    SELECT main_id
           ,get_topn(type, 10) AS type_array
    FROM data_demo2
    GROUP BY main_id
)
LATERAL VIEW EXPLODE(type_array) type_ar AS type_val;
Dynamic partition
Dynamic partitioning is a feature that lets you specify a partition key column when you insert data into a partitioned table, but without providing a specific value. Instead, the corresponding column in the Select clause provides the partition value. Therefore, you do not know which partitions will be created before the SQL statement runs. You can determine which partitions are created only after the SQL statement is run, based on the values of the partition column. For more information, see Insert or overwrite data into dynamic partitions (DYNAMIC PARTITION). The following is an SQL example.
CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);
INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;
In many scenarios, creating tables with dynamic partitions can lead to data skew. If data skew occurs, you can use the following solutions to resolve it.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Parameter configuration optimization | Optimize through parameter configuration. |
| Solution 2 | Partition pruning optimization | Find partitions with many records, prune them, and insert them separately. |
Solution 1: Optimize by configuring parameters.
Dynamic partitioning lets you put data that meets different conditions into different partitions. This feature eliminates the need for multiple Insert Overwrite operations to write to the table, especially when there are many partitions, and can greatly simplify your code. However, dynamic partitioning can also create too many small files.
Data skew example
Take the following simple SQL as an example.
INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM part_test;
Assume there are K Map instances and N target partitions. Each of the K Map instances (cfile1 to cfilek) may write a file to each of the N target partitions (ds=1 to ds=n).
In the most extreme case, K*N small files can be generated. Too many small files place an enormous management strain on the file system. Therefore, MaxCompute handles dynamic partitions by introducing an extra reduce stage. It assigns the same target partition to a single reduce instance or a few reduce instances to write to. This method avoids creating too many small files. This reduce operation is always the last reduce task operation. In MaxCompute, this feature is enabled by default, which means the following parameter is set to `true`.
SET odps.sql.reshuffle.dynamicpt=true;
Enabling this feature by default resolves the issue of too many small files and prevents tasks from failing because a single instance generates too many files. However, it also introduces new problems, such as data skew and the consumption of computing resources by the extra reduce operation. Therefore, you must carefully balance these factors.
Solution
The purpose of enabling set odps.sql.reshuffle.dynamicpt=true; and introducing an extra reduce stage is to resolve the issue of too many small files. However, if the number of target partitions is small and there is no risk of creating too many small files, enabling this feature by default wastes computing resources and reduces performance. In this situation, you can disable this feature by setting set odps.sql.reshuffle.dynamicpt=false; to significantly improve performance. The following is an example.
INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
SELECT /*+ mapjoin(t2) */
       '20150503' AS ds,
       t1.lv AS lv,
       t1.type AS tp
FROM (
    SELECT ...
    FROM tbbi.ads_tb_cornucopia_user_d
    WHERE ds = '20150503'
    AND lv IN ('flat', '3rd')
    AND tp = 'T'
    AND pref_cat2_id > 0
) t1
JOIN (
    SELECT ...
    FROM tbbi.ads_tb_cornucopia_auct_d
    WHERE ds = '20150503'
    AND tp = 'T'
    AND is_all = 'N'
    AND cat2_id > 0
) t2
ON t1.pref_cat2_id = t2.cat2_id;
If you use the default parameters for the preceding code, the entire task takes about 1 hour and 30 minutes to run. The last reduce task takes about 1 hour and 20 minutes, which is approximately 90% of the total runtime. Introducing the extra reduce task makes the data distribution of each reduce instance very uneven, which leads to a long tail.
For the preceding example, an analysis of the historical number of dynamic partitions created shows that only about two dynamic partitions are created each day. Therefore, you can safely set set odps.sql.reshuffle.dynamicpt=false;. The task can then be completed in only 9 minutes. In this case, setting this parameter to false can significantly improve performance, save computing time and resources, and provide a high marginal benefit from setting a single parameter.
This optimization is not limited to large tasks that take a long time and consume a large amount of resources. For small, ordinary tasks that run quickly and consume few resources, you can also set the odps.sql.reshuffle.dynamicpt parameter to false as long as they use dynamic partitions and the number of dynamic partitions is not large. This setting saves resources and improves performance in all cases.
Uses dynamic partitions
Number of dynamic partitions <= 50
Does not have `set odps.sql.reshuffle.dynamicpt=false;`
The urgency of setting this parameter for a node is determined by the execution time of the last Fuxi instance, which is identified by the diag_level field, based on the following rules:
Last_Fuxi_Inst_Time greater than 30 minutes: Diag_Level=4 ('Critical').
Last_Fuxi_Inst_Time between 20 and 30 minutes: Diag_Level=3 ('High').
Last_Fuxi_Inst_Time between 10 and 20 minutes: Diag_Level=2 ('Medium').
Last_Fuxi_Inst_Time less than 10 minutes: Diag_Level=1 ('Low').
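Expressed in SQL, these rules map to a simple CASE expression. The <task_stats> table and the last_fuxi_inst_time_min column in the following sketch are hypothetical:
SELECT node_id
       ,CASE WHEN last_fuxi_inst_time_min > 30 THEN 4 -- 'Critical'
             WHEN last_fuxi_inst_time_min > 20 THEN 3 -- 'High'
             WHEN last_fuxi_inst_time_min > 10 THEN 2 -- 'Medium'
             ELSE 1                                   -- 'Low'
        END AS diag_level
FROM <task_stats>;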
Solution 2: Optimize by pruning partitions.
To resolve the data skew that occurs in the map stage when you insert data into dynamic partitions, you can find the partitions with many records, prune them, and then insert them separately. Based on the actual scenario, you can modify the parameter settings for the map stage as shown in the following example:
SET odps.sql.mapper.split.size=128;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds, hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi;
The results show that a full table scan was performed. To further optimize performance, you can disable the system-generated reduce job as follows:
SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds, hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi;
To further reduce the skew, identify the partitions that contain many records, isolate them, and insert them separately. The steps are as follows.
Use the following command to query for partitions that contain many records.
SELECT ds
       ,hh
       ,COUNT(*) AS cnt
FROM dwd_alsc_ent_shop_info_hi
GROUP BY ds
         ,hh
ORDER BY cnt DESC;
Some of the partitions are as follows:
| ds | hh | cnt |
| --- | --- | --- |
| 20200928 | 17 | 1052800 |
| 20191017 | 17 | 1041234 |
| 20210928 | 17 | 1034332 |
| 20190328 | 17 | 1000321 |
| 20210504 | 1 | 19 |
| 20191003 | 20 | 18 |
| 20200522 | 1 | 18 |
| 20220504 | 1 | 18 |
Use the following commands to insert the data. Insert the partitions identified above that contain many records in a separate operation from the rest, and then verify the result.
SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds, hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi
WHERE CONCAT(ds, hh) NOT IN ('2020092817', '2019101717', '2021092817', '2019032817');

SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds, hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi
WHERE CONCAT(ds, hh) IN ('2020092817', '2019101717', '2021092817', '2019032817');

SELECT ds
       ,hh
       ,COUNT(*) AS cnt
FROM dwd_alsc_ent_shop_info_hi
GROUP BY ds, hh
ORDER BY cnt DESC;