MaxCompute: SQL query optimization

Last Updated: Jan 06, 2026

When a SQL query in MaxCompute runs slowly, the root cause is often not the SQL logic itself but how the workload is distributed. An improperly configured degree of parallelism (DOP), that is, the number of parallel instances that execute your job, can create severe performance bottlenecks: all data may be forced onto a single worker, which cripples performance, or a small task may be split across so many instances that startup overhead and queue times dominate. This guide provides practical techniques to diagnose and tune the DOP for your specific workload. By aligning the DOP with your data and query structure, you can dramatically improve execution speed and optimize resource utilization.

Degree of parallelism (DOP) optimization

The degree of parallelism (DOP) is the number of parallel instances used to run a job. In an execution plan, if a task with the ID M1 uses 1,000 instances, the DOP of M1 is 1,000. Properly configuring and tuning the DOP can significantly improve job execution efficiency.

The following sections describe common scenarios for DOP optimization.

Force a single instance for execution

Certain operations force a job to run on a single instance, which can create a severe performance bottleneck. These operations include:

  • Performing an aggregation without a GROUP BY clause or with a GROUP BY clause that uses a constant.

  • Using a window function where the OVER clause specifies PARTITION BY a constant.

  • Using a DISTRIBUTE BY or CLUSTER BY clause with a constant.

Solution: These operations consolidate all data onto a single worker and eliminate the benefits of parallel processing. Review your business logic to determine whether they are truly necessary. If a global aggregation is required, consider a two-stage aggregation pattern, as sketched below: first group by a high-cardinality key to perform a partial aggregation in parallel, then perform the final aggregation on the much smaller intermediate result set.
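
The following sketch illustrates the rewrite; the table orders and the columns user_id and amount are hypothetical. The inner query aggregates in parallel across many instances, so only a small per-key result set reaches the final single-instance stage.

-- Anti-pattern: a global aggregation without GROUP BY runs on a single instance.
-- SELECT SUM(amount) FROM orders;

-- Two-stage rewrite: partial aggregation in parallel, then a cheap final pass
-- over the small intermediate result.
SELECT SUM(partial_sum) AS total_amount
FROM (
    SELECT user_id, SUM(amount) AS partial_sum
    FROM orders
    GROUP BY user_id
) t;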

Impact of incorrect instance count

Important

Higher parallelism does not always lead to better performance. Using too many instances can slow down execution for the following reasons:

  • Excessive parallelism increases resource wait times and queueing.

  • Each instance requires an initialization phase. A higher DOP increases the total time spent on initialization, which reduces the time available for actual computation.

The following scenarios can lead to a suboptimal number of instances:

  • Reading a large number of small partitions: For example, if a query needs to read data from 10,000 partitions, the system may launch 10,000 instances.

    Solution: Refactor your SQL query to reduce the number of partitions it scans, as in the sketch below. You can achieve this through more effective partition pruning, by filtering out unnecessary partitions early in the query, or by breaking a large job into smaller, more targeted jobs.
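
    A minimal sketch, assuming a table partitioned by day with a hypothetical partition column ds. An explicit partition filter lets the optimizer prune the scan to 7 partitions instead of 10,000.

    -- Partition pruning: only 7 partitions are read, so only a few
    -- instances are launched.
    SELECT col1
    FROM my_partitioned_table
    WHERE ds BETWEEN '20250101' AND '20250107';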

  • A small mapper split size: A split size that is small relative to the input (even the default 256 MB for a very large dataset) can divide the input across too many instances. Each instance then runs only briefly, and most of the elapsed time is spent queueing for resources.

    Solution: Increase the amount of data processed by each mapper by setting a larger split size. This reduces the total number of mapper instances. You can also manually set the number of reducer instances.

    -- Set a split size larger than the default of 256 MB. Unit: MB.
    SET odps.stage.mapper.split.size=<value>;
    -- Set the number of reducer instances.
    SET odps.stage.reducer.num=<number of instances>;

Configure the number of instances

  • For table read tasks (mappers)

    • Method 1: Adjust the DOP by setting a parameter.

      -- Configure the maximum amount of input data of a mapper. Unit: MB.
      -- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
      SET odps.sql.mapper.split.size=<value>;
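
      For example, the following minimal sketch (the table name large_table is hypothetical) doubles the split size, which roughly halves the number of mapper instances for the same input.

      -- Use 512 MB per mapper instead of the default 256 MB.
      SET odps.sql.mapper.split.size=512;
      SELECT COUNT(*) FROM large_table;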
    • Method 2: Use a hint to adjust the DOP. MaxCompute provides a split_size hint that lets you adjust the DOP for a specific table read operation.

      -- Set the split size to 1 MB. This setting indicates that a task is split into subtasks based on a size of 1 MB when data in the src table is read.
      SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
    • Method 3: Split data at the table level by size, row count, or a specified DOP.

    The odps.sql.mapper.split.size parameter applies globally to all Mapper stages in the query and has a minimum value of 1 MB. In some cases, you may need more granular control. For example, if a table contains rows that are small in size but computationally expensive to process, you can increase the overall DOP by processing fewer rows per instance (see the sketch after the note below).

    Use the following parameters for table-level DOP tuning:

    • Set the data split size for a single instance at the table level. Unit: MB.

      SET odps.sql.split.size = {"table1": 1024, "table2": 512};
    • Set the number of rows processed by a single instance at the table level.

      SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
    • Set the DOP at the table level.

      SET odps.sql.split.dop = {"table1": 1, "table2": 5};
    Note

    The odps.sql.split.row.count and odps.sql.split.dop parameters apply only to internal tables that are neither transactional nor clustered.
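
    For example, the following minimal sketch (the table scoring_input and the UDF my_expensive_udf are hypothetical) caps each instance at 1,000 rows for a table whose rows are cheap to store but expensive to process:

      SET odps.sql.split.row.count = {"scoring_input": 1000};
      -- Each instance now processes at most 1,000 rows, which raises the DOP
      -- for the compute-heavy UDF.
      SELECT my_expensive_udf(features) FROM scoring_input;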

  • For non-read tasks (e.g., reducers)

    Adjust the DOP for these tasks in the following ways:

    • Method 1: Adjust the odps.stage.reducer.num value. Use the following command to force a specific DOP for Reducer tasks. This setting affects all relevant tasks in the query.

      -- Set the number of instances used to run reducer tasks.
      -- Valid values: [1,99999].
      SET odps.stage.reducer.num=<value>;
    • Method 2: Adjust the odps.stage.joiner.num value. Use the following command to force a specific DOP for Joiner tasks. This setting affects all relevant tasks in the query.

      -- Set the number of instances used to run joiner tasks.
      -- Valid values: [1,99999].
      SET odps.stage.joiner.num=<value>;
    • Method 3: Adjust the input task's parallelism. The parallelism of a non-read task (such as a Reducer) depends on the parallelism of its preceding task (such as a Mapper). By adjusting the number of Mappers, you can indirectly influence the number of Reducers, as in the sketch below.
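
      A minimal sketch of the indirect approach: raising the mapper split size lowers the mapper count, and the downstream reducer stage inherits a correspondingly lower default DOP.

      -- Fewer, larger mappers; the reducer DOP follows the reduced
      -- parallelism of the preceding stage.
      SET odps.sql.mapper.split.size=1024;
      SELECT key, COUNT(*) AS cnt FROM src GROUP BY key;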

Window function optimization

If your SQL query contains multiple window functions, each function typically triggers a separate Reduce job, which can consume excessive resources. MaxCompute merges multiple window functions into a single Reduce job when they meet both of the following criteria:

  • The OVER clauses are identical, that is, they have the same PARTITION BY and ORDER BY conditions.

  • The window functions are executed in the same SELECT statement.

The following example shows a query with two window functions that meet both conditions and are automatically merged into a single Reduce job.

SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rank_val,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;

Subquery optimization

Consider the following query that uses a subquery with an IN clause.

SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);

If the subquery on table_b returns more than 9,999 values for col1, the system reports the error "records returned from subquery exceeded limit of 9999". To work around this limitation, rewrite the query by using a JOIN statement.

SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
Note
  • If the DISTINCT keyword is omitted and the subquery aliased as c returns duplicate values for col1, the final result set may contain more rows from table_a than expected.

  • Using DISTINCT can force the subquery to run on a single reducer, which becomes a bottleneck if it processes a large amount of data.

  • If you can guarantee the uniqueness of the col1 values from the subquery based on your business logic, you can remove the DISTINCT keyword to improve performance.

JOIN statement optimization

To ensure efficient partition pruning during a JOIN, always filter partitioned tables as early as possible. Follow these rules:

  • Apply partition-limiting conditions on the primary table within a subquery before the JOIN.

  • Place other WHERE clauses that filter the primary table at the end of the SQL statement.

  • Apply partition-limiting conditions on the secondary table in the ON clause or within a subquery, not in the final WHERE clause.

The following examples illustrate these practices.

-- Recommended: the secondary table B is filtered in a subquery, and the filter on the primary table A appears in the final WHERE clause.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;
-- Not recommended: the system performs the JOIN before it prunes partitions, which increases the amount of data processed and degrades query performance.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
-- Recommended: both tables are filtered in subqueries before the JOIN.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;

Aggregate function optimization

For string aggregation, the wm_concat function generally offers better performance than collect_list. The following examples show how to use both functions.

-- Using collect_list to build an ordered, comma-separated string.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Using wm_concat for the same result with better performance.
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;

-- Using collect_list without ordering.
SELECT array_join(collect_list(key), ',') FROM src;
-- Using wm_concat for the same result with better performance.
SELECT wm_concat(',', key) FROM src;