When your SQL query in MaxCompute runs slowly, the root cause is often not the SQL logic itself, but how the workload is distributed. An improperly configured degree of parallelism (DOP)—the number of parallel instances executing your job—can create severe performance bottlenecks. This can cause all data to be forced onto a single worker, crippling performance, or create excessive overhead from launching too many instances for a small task, leading to long queue times. This guide provides practical techniques to diagnose and tune the DOP for your specific workload. By correctly aligning the DOP with your data and query structure, you can dramatically improve execution speed and optimize resource utilization.
Degree of parallelism (DOP) optimization
The degree of parallelism (DOP) measures how many parallel Instances are used to run a job. In an execution plan, if a task with the ID M1 uses 1,000 Instances, the DOP of M1 is 1,000. Properly configuring and tuning the DOP can significantly improve job execution efficiency.
The following sections describe common scenarios for DOP optimization.
Force a single instance for execution
Certain operations force a job to run on a single Instance, which can create a severe performance bottleneck. These operations include:
- Performing an aggregation without a `GROUP BY` clause, or with a `GROUP BY` clause that uses a constant.
- Using a window function whose `OVER` clause specifies `PARTITION BY` a constant.
- Using a `DISTRIBUTE BY` or `CLUSTER BY` clause with a constant.
Solution: These operations consolidate all data onto a single worker, eliminating the benefits of parallel processing. Review your business logic to determine if these operations are truly necessary. If a global aggregation is required, consider using a two-stage aggregation pattern. This involves first grouping by a high-cardinality key to perform a partial aggregation in parallel, and then performing the final aggregation on the much smaller intermediate result set.
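As a sketch of the two-stage aggregation pattern (the table `visit_log` and column `user_id` are hypothetical), a global distinct count can be rewritten so that the first stage runs in parallel:

```sql
-- Stage 1 (inner query): partial aggregation grouped by the high-cardinality key
-- user_id, which runs in parallel across many Instances.
-- Stage 2 (outer query): the final COUNT runs on the much smaller deduplicated result.
SELECT COUNT(*) AS distinct_users
FROM (
    SELECT user_id
    FROM visit_log
    GROUP BY user_id
) t;
```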
Impact of incorrect instance count
Higher parallelism does not always lead to better performance. Using too many Instances can slow down execution for the following reasons:
- Excessive parallelism increases resource wait times and queueing.
- Each Instance requires an initialization phase. A higher DOP increases the total time spent on initialization, which reduces the time available for actual computation.
The following scenarios can lead to a suboptimal number of Instances:
- Reading a large number of small partitions: For example, if a query needs to read data from 10,000 partitions, the system may launch 10,000 Instances.

  Solution: Refactor your SQL query to reduce the number of partitions it needs to scan. Achieve this by implementing more effective partition pruning, filtering out unnecessary partitions early in the query, or breaking a large job into smaller, more targeted jobs.
- A small mapper split size: A small split size (for example, the default 256 MB) can cause a large input dataset to be divided among too many Instances. This leads to short execution times per Instance, with most of the time spent queueing for resources.

  Solution: Increase the amount of data processed by each mapper by setting a larger split size. This reduces the total number of mapper Instances. You can also manually set the number of reducer Instances.

  ```sql
  SET odps.stage.mapper.split.size=<value greater than 256>;
  SET odps.stage.reducer.num=<maximum number of concurrent instances>;
  ```
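The partition-pruning advice above can be sketched as follows (the table `sale_detail`, its partition column `sale_date`, and the column `region` are hypothetical):

```sql
-- Instead of scanning every partition of sale_detail, prune the scan to one month
-- so the system launches Instances only for the partitions that are actually needed.
SELECT region, COUNT(*) AS cnt
FROM sale_detail
WHERE sale_date BETWEEN '20240101' AND '20240131'  -- filter on the partition column
GROUP BY region;
```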
Configure the number of instances
For table read tasks (mappers)
Method 1: Adjust the DOP by setting a parameter.

```sql
-- Configure the maximum amount of input data for a mapper. Unit: MB.
-- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
SET odps.sql.mapper.split.size=<value>;
```

Method 2: Use a hint to adjust the DOP. MaxCompute provides a `split_size` hint that lets you adjust the DOP for a specific table read operation.

```sql
-- Set the split size to 1 MB. This setting indicates that the task is split into
-- subtasks based on a size of 1 MB when data in the src table is read.
SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
```

Method 3: Split data at the table level by size, row count, or a specified DOP.
The `odps.sql.mapper.split.size` parameter applies globally to all Mapper stages in the query and has a minimum value of 1 MB. In some cases, you may need more granular control. For example, if a table contains rows that are small in size but computationally expensive to process, you can increase the overall DOP by processing fewer rows per Instance.

Use the following parameters for table-level DOP tuning:

- Set the data split size for a single Instance at the table level.

  ```sql
  SET odps.sql.split.size = {"table1": 1024, "table2": 512};
  ```

- Set the number of rows processed by a single Instance at the table level.

  ```sql
  SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
  ```

- Set the DOP at the table level.

  ```sql
  SET odps.sql.split.dop = {"table1": 1, "table2": 5};
  ```
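As a usage sketch (the table `dim_geo`, its columns, and the UDF `my_expensive_udf` are hypothetical), a table with few rows but expensive per-row processing can be read with a higher table-level DOP:

```sql
-- dim_geo has few rows, but each row is costly to process, so force 100 read
-- Instances for it while leaving other tables at their default split behavior.
SET odps.sql.split.dop = {"dim_geo": 100};
SELECT geo_id, my_expensive_udf(polygon) FROM dim_geo;
```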
Note: The `odps.sql.split.row.count` and `odps.sql.split.dop` parameters apply only to internal tables that are non-transactional and non-clustered.

For non-read tasks (e.g., reducers)
Adjust the DOP for these tasks in the following ways:
Method 1: Adjust the `odps.stage.reducer.num` value. Use the following command to force a specific DOP for Reducer tasks. This setting affects all relevant tasks in the query.

```sql
-- Configure the number of instances that are called to execute reducer tasks.
-- Valid values: [1,99999].
SET odps.stage.reducer.num=<value>;
```

Method 2: Adjust the `odps.stage.joiner.num` value. Use the following command to force a specific DOP for Joiner tasks. This setting affects all relevant tasks in the query.

```sql
-- Configure the number of instances that are called to execute joiner tasks.
-- Valid values: [1,99999].
SET odps.stage.joiner.num=<value>;
```

Method 3: Adjust the input task's parallelism. The parallelism of a non-read task (such as a Reducer) depends on the parallelism of its preceding task (such as a Mapper). By adjusting the number of Mappers, you can indirectly influence the number of Reducers.
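A combined usage sketch (the tables `src` and `src2` and the join key are hypothetical; choose values that suit your data volume):

```sql
-- Force 32 Instances for the reducer and joiner stages of the query below.
SET odps.stage.reducer.num=32;
SET odps.stage.joiner.num=32;
SELECT t1.id, COUNT(*) AS cnt
FROM src t1
JOIN src2 t2 ON t1.id = t2.id
GROUP BY t1.id;
```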
Window function optimization
If your SQL query contains multiple window functions, each function typically triggers a separate Reduce job, which can consume excessive resources. Optimize performance by merging window functions into a single Reduce job if they meet the following criteria:
- The `OVER` clauses are identical (e.g., have the same `PARTITION BY` and `ORDER BY` conditions).
- The window functions are executed in the same `SELECT` statement.
Window functions that meet these two conditions are automatically merged into a single Reduce job. The following example shows a query with two window functions that can be merged.
```sql
SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rnk,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;
```

Subquery optimization
Consider the following query that uses a subquery with an IN clause.
```sql
SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);
```

If the subquery on table_b returns more than 9,999 values for col1, the system reports the error `records returned from subquery exceeded limit of 9999`. To work around this limitation, rewrite the query using a JOIN statement.

```sql
SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
```

Keep the following points in mind:

- If the `DISTINCT` keyword is omitted and the subquery `c` returns duplicate values for `col1`, the final result set may contain more rows from table `a` than expected.
- Using `DISTINCT` can force the subquery to run on a single reducer, which becomes a bottleneck if it processes a large amount of data.
- If you can guarantee the uniqueness of the `col1` values from the subquery based on your business logic, you can remove the `DISTINCT` keyword to improve performance.
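If uniqueness cannot be guaranteed, another option worth evaluating is a semi-join, which tests membership without multiplying rows. This is a sketch and assumes your MaxCompute version supports `LEFT SEMI JOIN`; the filter `xxx` is the same placeholder as above:

```sql
-- Each row of table_a is returned at most once, regardless of duplicates in table_b,
-- so no DISTINCT (and no single-reducer deduplication step) is needed.
SELECT a.*
FROM table_a a
LEFT SEMI JOIN (SELECT col1 FROM table_b b WHERE xxx) c ON a.col1 = c.col1;
```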
JOIN statement optimization
To ensure efficient partition pruning during a JOIN, always filter partitioned tables as early as possible. Follow these rules:
- Apply partition-limiting conditions on the primary table within a subquery before the JOIN.
- Place other `WHERE` clauses that filter the primary table at the end of the SQL statement.
- Apply partition-limiting conditions on the secondary table in the `ON` clause or within a subquery, not in the final `WHERE` clause.
The following examples illustrate these practices.
```sql
-- Recommended.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;

-- Not recommended. The system performs the JOIN operation before it performs partition
-- pruning. This increases the amount of data and causes query performance to deteriorate.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;

-- Recommended.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;
```

Aggregate function optimization
For string aggregation, the wm_concat function generally offers better performance than collect_list. The following examples show how to use both functions.
```sql
-- Use the collect_list function.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Use the wm_concat function for better performance.
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;

-- Use the collect_list function.
SELECT array_join(collect_list(key), ',') FROM src;
-- Use the wm_concat function for better performance.
SELECT wm_concat(',', key) FROM src;
```