All Products
Search
Document Center

MaxCompute:Diagnostic cases of Logview

Last Updated:Jun 28, 2023

In most cases, enterprises need job results to be generated earlier than expected. This way, they can make business development decisions based on the results at the earliest opportunity. In this case, job developers must pay attention to the job status to identify and optimize the jobs that run slowly. You can use Logview of MaxCompute to diagnose jobs that run slowly. This topic provides the causes for which jobs run slowly and the related solutions. This topic also describes how to view information about the jobs that run slowly.

Diagnose a job that fails to run

If a job fails to run, you can view error information on the Result tab of Logview. The Result tab is automatically displayed when you open Logview for a failed job.

Possible causes:

  • The SQL syntax is incorrect. In this case, no directed acyclic graph (DAG) or Fuxi job exists because the job is not submitted to the computing cluster for execution.

  • An error occurs in the user-defined function (UDF) that is used. You can click the DAG on the Result tab of Logview to identify the UDF that causes the error. Then, view the error information in StdOut or StdError.

  • Other errors occur. For more information about other errors, see Error code overview.

Diagnose a job that runs slowly

Compilation stage

When a job is at the compilation stage, you can view information about the job in Logview but the execution plan for the job is not run. The compilation stage of a job can be classified into the following substages based on the information that is displayed on the SubStatusHistory tab of Logview: scheduling, optimization, generation of an execution plan, and data replication across clusters. Compilation stageAt the compilation stage, the following issue may occur: A job is stuck at a specific substage and remains suspended for a long period of time. This section describes the possible causes of job suspension at each substage and their solutions.

  • Scheduling

    Problem description: The sub-status of the job is Waiting for cluster resource. The job is waiting to be compiled.

    Cause: The resources of the computing cluster are insufficient.

    Solution: Check the status of the computing cluster and the required resources of the computing cluster. If you use a subscription cluster, you can scale out resources.

  • Optimization

    Problem description: The sub-status of the job is SQLTask is optimizing query. The optimizer is optimizing the execution plan.

    Cause: The execution plan is complex. The optimizer requires a long period of time to optimize the execution plan.

    Solution: Wait for the optimizer to complete the optimization. This process takes less than 10 minutes in most cases.

  • Generation of an execution plan

    Problem description: The sub-status of the job is SQLTask is generating execution plan.

    Cause 1: Data of an excessive number of partitions is read.

    Solution: Optimize SQL statements to reduce the number of partitions. For example, you can perform partition pruning, filter out partitions from which data does not need to be read, and split large jobs into smaller jobs. For more information about how to determine whether partition pruning takes effect in SQL statements and common scenarios in which partition pruning fails, see Check whether partition pruning is effective.

    Cause 2: Excessive small files are generated. Small files are generated in the following scenarios:

    1. An incorrect operation is performed when Tunnel commands are used to upload data. For example, a new upload session is created each time a data record is uploaded. For more information, see FAQ about Tunnel commands.

    2. When you execute an INSERT INTO statement on a partitioned table, a new file is generated in the partition directory.

    Solutions:

    1. Use the TunnelBufferedWriter interface to upload data in a more efficient manner. This prevents excessive small files from being generated.

    2. Manually merge small files. For more information, see Merge small files.

    Note

    If the number of small files is greater than 10,000, you can enable automatic merging of small files. The system automatically merges small files every day. However, if the system fails to merge small files in special scenarios, you must manually merge the small files.

  • Data replication across clusters

    Problem description: Task rerun appears multiple times on the SubStatusHistory tab, and FAILED: ODPS-0110141:Data version exception appears on the Result tab. In this case, the job does not fail. Instead, the job is replicating data across clusters.

    Cause 1: Data is migrated between clusters for the project. In this case, a large number of jobs that replicate data across clusters are running in the first one or two days after the migration is complete.

    Solution: Wait until data replication across clusters is complete as expected.

    Cause 2: Data is migrated between clusters for the project. However, partitions are not filtered as expected. As a result, old data is read from specific partitions.

    Solution: Filter out the partitions that contain old data.

Execution stage

An execution plan is displayed on the Job Detail tab of Logview. The execution plan is not complete and the job is in the Running state. If a job is stuck at the execution stage or requires an unexpectedly long period of time to complete at this stage, this issue may occur due to the following reasons: waiting for resources, data skew, inefficient UDF execution, and data bloat. This section describes the possible causes and solutions.

  • Waiting for resources

    Description: All instances of the job are in the Ready state. Alternatively, some instances are in the Running state and the other instances are in the Ready state. If an instance is in the Ready state but historical information exists in the Debug column for the instance, a retry is triggered due to an instance failure. The instance is not waiting for resources. Waiting for resources

    Solution

    1. Determine whether the queuing status of the job is normal. You can view the position of the job in the queue on the Basic Info page of Logview. The Queue Length parameter that is shown in the following figure indicates the position of the job in the queue. PositionYou can also view the resource usage of your quota group in the MaxCompute console. If the quota of a specific resource is nearly exhausted or the quota usage exceeds the specified limit, the resources in the quota group may be insufficient. In this case, the job needs to wait in the queue. The scheduling sequence of a job is related to the submission time and priority of the job and the memory or vCPU cores that are required by the job.

    2. View the jobs that use the quota group.

      Large jobs that have low priorities may be submitted, or multiple small jobs are submitted at a time. The jobs occupy a large number of resources. You can contact the owner of the jobs to terminate the jobs and release the resources that are occupied by the jobs.

    3. Change the quota group of the job to a quota group of another project.

    4. Scale out resources. This solution is suitable only for users who use subscription resources.

  • Data skew

    Description: Most instances in the task are complete, but several instances are not complete and have long tails. In the following figure, most instances are complete, but 21 instances are in the Running state. These instances may process a large amount of data. As a result, these instances run slowly. Data skew

    Solution: For more information about common causes of data skew and the related optimization methods, see Data skew tuning.

  • Inefficient UDF execution

    In this topic, UDFs refer to various user-defined extensions, including user-defined scalar functions (UDFs), user-defined aggregate functions (UDAFs), user-defined table-valued functions (UDTFs), user-defined joins (UDJs), and user-defined types (UDTs).

    Description: The execution efficiency of a task is low, and the task includes UDFs. The following error message that indicates a UDF execution timeout may even appear: Fuxi job failed - WorkerRestart errCode:252,errMsg:kInstanceMonitorTimeout, usually caused by bad udf performance.

    Troubleshooting: If an error is reported in a task, you can check whether a UDF is included in the task in the DAG on MaxCompute Studio. The following figure shows that the error is reported in Task R4_3 and this task includes a UDF written in Java. TroubleshootingDouble-click Task R4_3 to expand the operator view. You can view the names of all UDFs in the task. The following figure shows the operator view. ExpandYou can also view the number of input records, the number of output records, and the processing time of the UDF in the StdOut log of the task. The information helps you determine whether a performance issue occurs in the UDF. The following figure shows the StdOut log of the task. In normal cases, the value of Speed(records/s) is approximately one million or one hundred thousand. If the value is approximately ten thousand, a performance issue exists. UDF

    Solution: If a performance issue occurs, you can use the following method to troubleshoot the issue and optimize the performance.

    1. Check whether an error occurs in the UDF.

      In specific cases, the performance issue is caused by a specific data value. For example, an infinite loop occurs when a specific value appears. MaxCompute Studio allows you to download specific sample data of a table and use the data on your on-premises machine for troubleshooting. For more information, see Java UDFs and Python UDF in the MaxCompute Studio development manual.

    2. Check whether the name of the UDF is the same as a built-in function.

      A built-in function may be overwritten by a UDF whose name is the same. If a function appears to be a built-in function, you must determine whether a UDF whose name is the same can overwrite the built-in function.

    3. Use a built-in function to replace the UDF.

      If built-in functions that provide similar features exist, we recommend that you do not use UDFs. Built-in functions are verified and used in a more efficient manner. In addition, optimizers perform white-box testing on built-in functions. Therefore, more optimizations can be made. For more information about the usage of built-in functions, see Built-in functions.

    4. Replace specific UDFs with the built-in functions that provide similar features, and retain only the UDFs that cannot be implemented by using built-in functions.

    5. Optimize the evaluate method of UDFs.

      Use the evaluate method in only necessary operations related to parameters. Perform initialization-related operations or repeated calculations in advance because the evaluate method is repeatedly run.

    6. Estimate the period of time that is required to run a UDF.

      Simulate the amount of data that is processed by one instance on your on-promises machine to test the period of time that is required to run a UDF. Then, optimize the implementation of the UDF. By default, the maximum period of time that is required to run a UDF is 30 minutes. A UDF must return data within 30 minutes or use context.progress() to report heartbeats. If the estimated period of time that is required to run a UDF is longer than 30 minutes, you can configure a parameter to specify the timeout period of a UDF.

      Default value: 1800. Unit: seconds. Valid values: 1 to 3600.
      -- Specify the timeout period of a UDF. Unit: seconds. Default value: 600.
      -- You can manually adjust the timeout period of the UDF in the range of [0,3600].
    7. Modify memory parameters.

      The low efficiency of UDFs is not necessarily due to the computational complexity. The storage complexity may also affect the efficiency of UDFs. Examples:

      • Memory overflow occurs if a UDF performs in-memory computing on or sorts a large amount of data.

      • Insufficient memory causes high garbage collection (GC) frequency.

      You can modify memory parameters to temporarily handle the preceding issues. The specific optimization must be performed based on your business requirements. Example:

      set odps.sql.udf.jvm.memory=
      -- Specify the maximum memory size that can be used for the JVM heap of a UDF. Default value: 1024. Unit: MB.
      -- You can change the value of the odps.sql.udf.jvm.memory parameter in the range of [256,12288].
      Note

      If a UDF is used, partition pruning may become invalid. The UdfProperty annotation is supported from MaxCompute V2.0. When you define a UDF, you can use the annotation for the compiler to specify that the UDF is deterministic. Sample code:

      @com.aliyun.odps.udf.annotation.UdfProperty(isDeterministic = true)
      public class AnnotatedUdf extends com.aliyun.odps.udf.UDF {
          public String evaluate(String x) {
              return x;
          }
      }

      If you rewrite the SQL statement for a UDF, you can use the UDF in partition filtering.

      -- SQL statement before rewriting
      SELECT * FROM t WHERE pt = udf('foo');  -- pt indicates a partition key column of t.
      -- SQL statement after rewriting
      SELECT * FROM t WHERE pt = (SELECT udf('foo')); -- pt indicates a partition key column of t.
  • Data bloat

    Description: The amount of output data of a Fuxi task is significantly greater than the amount of input data. For example, the data size becomes 1 TB after 1 GB of data is processed. When 1 TB of data is processed on an instance, the operation efficiency is significantly reduced. After a job is run, you can view I/O Record and I/O Bytes of tasks to obtain the amounts of input and output data. The following figure shows an example.Output data amountIf a job is not complete for a long period of time at the join stage, you can view the logs in StdOut of specific Fuxi instances that are in the Running state. The following figure shows the related operations.View logs in StdOutStdOut indicates that logs for merge joins are continuously generated. This indicates that the related worker always performs merge joins. In the following figure, the value in the red box indicates that more than 143.3 billion data records are generated for merge joins. In this case, severe data bloat occurs. You must check whether the JOIN condition and join keys are correct.View logs in StdOut

    Solution

    • Check the following issues in the code: whether the JOIN condition is correct, whether the JOIN condition is written as a Cartesian product, whether the UDTF is normal, and whether excessive output data is generated.

    • Check whether data bloat is caused by aggregation.

      Most aggregators perform recursive aggregation. When an aggregator aggregates data, the aggregator first merges the intermediate results. The amount of the intermediate result data is not large and the computing complexity of most aggregators is low. Aggregation can be complete quickly even if the amount of data is large. In most cases, data bloat does not occur during aggregation. However, if you perform aggregation in the following scenarios, data bloat may occur:

      • Enable aggregation in the SELECT statement to perform the DISTINCT operation in different dimensions. Data expands each time the DISTINCT operation is performed.

      • If you use the GROUPING SETS or CUBE | ROLLUP statement, the size of the intermediate result data may be increased multiple times compared with the original data size. However, if you use a specific statement such as COLLECT_LIST and MEDIAN, you must retain all intermediate result data. This may cause specific issues.

    • Prevent data bloat that occurs due to JOIN operations.

      For example, you want to join two tables. The left table contains a large amount of population data. However, the processing efficiency is high because of the high parallelism of MaxCompute instances. The right table is a dimension table that records information about each gender, such as possible bad habits of each gender. The dimension table contains only two genders but hundreds of rows that correspond to each gender exist. If you join the tables by gender, data in the left table may be expanded hundreds of times. To prevent data bloat, you can aggregate the rows of data in the right table into two rows of data before you join the tables.

    • Check whether data bloat is caused by the GROUPING SET statement. When the GROUPING SET statement is executed, data is expanded and the output data is increased multiple times compared with the number of groups. The current execution plan cannot adapt to the GROUPING SET statement or change the degree of parallelism of downstream tasks. You can manually configure the degree of parallelism of downstream tasks. Sample statements:

      set odps.stage.reducer.num = xxx; 
      set odps.stage.joiner.num = xxx;

Termination stage

Most SQL jobs immediately stop after Fuxi jobs are complete. In specific scenarios, the overall progress of an SQL job is still running when Fuxi jobs are complete. In the following figure of Logview, the Job Details tab on the right side of the Basic Info page shows that the stages of all Fuxi jobs are Terminated, but the Status parameter on the left side indicates that the overall progress of the SQL job is Running.runningThis issue may occur in the following scenarios:

  1. The SQL job includes multiple Fuxi jobs. For example, subqueries are run at multiple stages, or jobs that automatically merge small files are run because excessive small files are generated.

  2. At the termination stage, the SQL job takes a long period of time in the control cluster. For example, the job updates the metadata of dynamic partitions. The following section provides examples of common scenarios.

  • Subquery execution at multiple stages

    In most cases, subqueries of MaxCompute SQL are compiled into the same Fuxi DAG. Therefore, all subqueries and main queries are complete by one Fuxi job. However, specific special subqueries need to be separately run before main queries. Sample code:

    SELECT
         product,
        sum(price)
     FROM
         sales
     WHERE
         ds in (SELECT DISTINCT ds FROM t_ds_set)
     GROUP BY product;

    The subquery SELECT DISTINCT ds FROM t_ds_set is run first. The result of the subquery is used for partition pruning and is used to optimize the number of partitions that the main query needs to read. Two Fuxi jobs are run for the subquery and partition pruning. In Logview, two tabs are displayed to indicate the two jobs. The first tab shows that all Fuxi tasks of job_0 are successful, but the second tab shows that job_1 is running. The following figure shows the details.SubqueryYou need to only click the second tab to view the result of job_1. The following figure shows the details.Second tab

  • Excessive small files

    Excessive small files mainly affect the storage performance and computing performance.

    • Storage: Excessive small files increase the workload on the Apsara Distributed File System. This affects the storage usage.

    • Computing: The overall processing performance is affected because the processing efficiency of MaxCompute on a single large file is higher than the processing efficiency on multiple small files. Therefore, when an SQL job is complete, the operation that is used to merge small files is automatically triggered if specific conditions are met. This helps prevent the system from generating excessive small files.

  • Solution: Check whether the job triggers automatic merging of small files in Logview. The process of a merge job is similar to the process that is described in Subquery execution at multiple stages. In Logview, a merge job is displayed on a tab. The merge tasks that are generated to automatically merge small files increase the overall execution time of the current job. However, after the merge tasks are complete, the number and size of files that are generated in the result table are close to the expected values. This prevents heavy workloads on the file system. The read performance of the table is improved when the table is used by subsequent jobs. The following figure shows an example of a merge job.Tab

    If excessive small files exist, SELECT statements may be executed for a long period of time when a job is at the termination stage. When the system generates and displays the execution results of SELECT statements, the system needs to open a large number of small files to read data from the files. This process is time-consuming. To prevent the system from generating a large number of execution results, we recommend that you do not use SELECT statements. You can use Tunnel commands to download data. If the number of execution results is not large but the number of files is excessively large, we recommend that you check whether the odps.merge.smallfile.filesize.threshold parameter is properly configured. For more information about how to merge small files, see Merge small files.

  • Metadata update in dynamic partitions

    Problem description: After a Fuxi job is complete, you may need to perform specific operations related to the metadata. For example, if you want to move the result data to a specific directory and update the metadata of the table, a large number of partitions may be generated in the table during dynamic partitioning. As a result, the process is time-consuming. For example, the insert into ... values statement is executed for the partitioned table sales to add 2,000 partitions. Sample statement:

    INSERT INTO TABLE sales partition (ds)(ds, product, price)
    VALUES ('20170101','a',1),('20170102','b',2),('20170103','c',3), ...;

    After the Fuxi job is complete, the metadata of the table still needs to be updated in a specific period of time. The following figure shows that SQLTask is updating meta information is displayed on the SubStatusHistory tab of Logview. This information indicates that the job is stuck. Job is stuck

  • Increase in the size of the output file

    Problem description: When the number of input data records is similar to the number of output data records, the size of the result data may be increased multiple times compared with the size of the input data. Result

    Solution: This issue may be caused by the change in data distribution. When data is written to a table, the data is compressed. The compression algorithm applies the highest compression ratio on duplicate data. If duplicate data is arranged together during data writing, the compression ratio is high. The distribution of data that is written to a table varies based on how data is shuffled and sorted in the table during data writing. This process corresponds to the R12_7_8 task in the preceding figure. In the preceding figure, the last operation in the SQL statement that corresponds to the R12_7_8 task is JOIN. The following sample code shows the join key.

    on  t1.query = t2.query and t1.item_id=t2.item_id

    The characteristics of the data indicate that most columns are the attributes of items. In this case, all columns for the items that have the same item_id are the same. Therefore, all items in the query are sorted in random order. This causes a low compression ratio. The following sample code provides an example on how to modify the join sequence. After the modification, the data size is decreased to 1/3 of the original data size.

    on t1.item_id=t2.item_id and t1.query = t2.query

    After the modification, the data size is decreased to 1/3 of the original data size.

    In another case, the shuffle operations that are generated by JOIN or GROUP BY do not contain the most suitable sorting column for compression. In this case, you can use the ZORDER BY clause to sort the items on your on-premises machine. This way, you can obtain an expected compression ratio at a lower cost. You can also execute a DISTRIBUTED BY SORT BY SQL statement to manually rearrange the data distribution. This method requires a long period of time for data calculation and a high CPU utilization.