MaxCompute: Control computing costs

Last Updated: Jan 26, 2024

If your MaxCompute bill constantly increases and costs become difficult to manage, you can optimize SQL jobs and MapReduce jobs to reduce computing costs. This topic describes how to control the computing costs of SQL jobs and MapReduce jobs.

Estimate computing costs

You can estimate computing costs before you run computing jobs. For more information, see TCO tools in the "Select a billing method" topic. You can also configure alerts for resource consumption to prevent extra costs from being generated. If computing costs are high, you can use the methods described in this topic to reduce the costs.

Control the computing costs of SQL jobs

SQL jobs that trigger full table scans incur high computing costs. Frequent scheduling of SQL jobs can also cause jobs to accumulate, which further increases computing costs. If jobs accumulate under the pay-as-you-go billing method, they are queued and consume more resources, and the bill generated the next day can be abnormally high. You can use the following methods to control the computing costs of SQL jobs:

  • Avoid frequent scheduling. MaxCompute is a batch computing service that processes large amounts of data at a time; it is not a real-time computing service. If SQL jobs are executed at short intervals, the computing frequency increases. Higher computing frequency and poorly written SQL jobs both increase computing costs. If you require frequent scheduling, use CostSQL to estimate the costs of SQL jobs in advance to avoid extra costs.

  • Reduce full table scans. You can use the following methods:

    • Specify the required parameters to disable the full table scan feature. You can disable the feature for a session or project.

      -- Disable the feature for a session. 
      set odps.sql.allow.fullscan=false;
      -- Disable the feature for a project. 
      setproject odps.sql.allow.fullscan=false;
    • Prune columns. Column pruning allows the system to read data only from the required columns. We recommend that you do not use the SELECT * statement, because it reads all columns of the table and prevents column pruning.

      SELECT a,b FROM T WHERE e < 10;

      Assume that table T contains the columns a, b, c, d, and e. When this statement is executed, only the columns a, b, and e are read.

    • Prune partitions. Partition pruning allows you to specify filter conditions on partition key columns so that the system reads data only from the required partitions. This avoids the errors and wasted resources caused by full table scans.

      SELECT a,b FROM T WHERE partitiondate='2017-10-01';
    • Optimize SQL keywords that incur costs. The keywords include JOIN, GROUP BY, ORDER BY, DISTINCT, and INSERT INTO. You can optimize the keywords based on the following rules:

      • Before a JOIN operation, you must prune partitions. Otherwise, a full table scan may be performed. For more information about scenarios in which partition pruning is invalid, see Scenarios where partition pruning does not take effect in the "Check whether partition pruning is effective" topic.

      • Use UNION ALL instead of FULL OUTER JOIN.

        SELECT COALESCE(t1.id, t2.id) AS id, SUM(t1.col1) AS col1
         , SUM(t2.col2) AS col2
        FROM (
         SELECT id, col1
         FROM table1
        ) t1
        FULL OUTER JOIN (
         SELECT id, col2
         FROM table2
        ) t2
        ON t1.id = t2.id
        GROUP BY COALESCE(t1.id, t2.id);
        -- Optimized: 
        SELECT t.id, SUM(t.col1) AS col1, SUM(t.col2) AS col2
        FROM (
         SELECT id, col1, 0 AS col2
         FROM table1
         UNION ALL
         SELECT id, 0 AS col1, col2
         FROM table2
        ) t
        GROUP BY t.id;
      • Do not use GROUP BY inside the subqueries of a UNION ALL. Perform the GROUP BY once, outside the UNION ALL.

        SELECT t.id, SUM(t.val) AS val
        FROM (
         SELECT id, SUM(col3) AS val
         FROM table3
         GROUP BY id
         UNION ALL
         SELECT id, SUM(col4) AS val
         FROM table4
         GROUP BY id
        ) t
        GROUP BY t.id;
        -- Optimized:
        SELECT t.id, SUM(t.val) AS val
        FROM (
         SELECT id, col3 AS val
         FROM table3
         UNION ALL
         SELECT id, col4 AS val
         FROM table4
        ) t
        GROUP BY t.id;
      • To sort data that you temporarily export, sort it by using a tool such as Excel instead of ORDER BY.

      • Do not use DISTINCT. Use GROUP BY instead.

        SELECT COUNT(DISTINCT id) AS cnt
        FROM table1;
        -- Optimized:
        SELECT COUNT(1) AS cnt
        FROM (
         SELECT id
         FROM table1
         GROUP BY id
        ) t;
      • Avoid using INSERT INTO to append data. Instead, add a partition key column and write the data to the corresponding partitions. This reduces SQL complexity and saves computing costs.

  • Do not execute SQL statements to preview table data. You can use the table preview feature to view table data. This method does not consume computing resources. If you use DataWorks, you can preview a table and query details about the table on the Data Map page. For more information, see View the details of a table. If you use MaxCompute Studio, double-click a table to preview its data.

  • Select an appropriate tool for data computing. MaxCompute returns query results within minutes and is therefore not suitable for serving frontend queries. Computing results are typically synchronized to an external storage system; most users store the results in relational databases. We recommend that you perform the computing in MaxCompute and serve lightweight frontend queries from a relational database, such as ApsaraDB for RDS. Frontend queries require results in real time, and the data displayed in the frontend usually needs no further computation: no filter conditions, no aggregation, no dictionary joins, and often not even a WHERE clause.

Control the computing costs of MapReduce jobs

You can use the following methods to control the computing costs of MapReduce jobs:

  • Configure the required settings

    • Split size

      The default split size for a mapper is 256 MB. The split size determines the number of mappers. If the logic of your mapper is time-consuming, you can use JobConf#setSplitSize to reduce the split size, as shown in the sketch after these two settings. Configure an appropriate split size; otherwise, excessive computing resources may be consumed.

    • Number of reducers

      By default, the number of reducers that are used to complete a job is one fourth of the number of mappers. You can set the number of reducers to a value from 0 to 2,000, as shown in the sketch after these two settings. More reducers require more computing resources, which increases costs, so configure the number of reducers appropriately.
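
    The following sketch shows how these two settings might be configured on a MaxCompute JobConf from the com.aliyun.odps.mapred SDK. The class name, the 128 MB split size, and the reducer count of 32 are illustrative placeholders, not recommendations.

    import com.aliyun.odps.mapred.JobClient;
    import com.aliyun.odps.mapred.conf.JobConf;

    public class CostTuningExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Reduce the split size from the default 256 MB to 128 MB so that
        // time-consuming mapper logic is spread across more mappers.
        job.setSplitSize(128);
        // Explicitly set the number of reducers (0 to 2,000). By default it is
        // one fourth of the number of mappers; more reducers cost more.
        job.setNumReduceTasks(32);
        // ... set the mapper and reducer classes and the input and output tables, then submit.
        JobClient.runJob(job);
      }
    }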

  • Reduce the number of MapReduce jobs

    If multiple MapReduce jobs are correlated and the output of a job is the input of the next job, we recommend that you use the pipeline mode. The pipeline mode allows you to merge multiple serial MapReduce jobs into a single job. This reduces redundant disk I/O operations caused by intermediate tables and improves performance. This also simplifies job scheduling and enhances process maintenance efficiency. For more information, see Pipeline examples.
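
    The following minimal sketch illustrates the pipeline mode. It is modeled on the Pipeline builder API in the MaxCompute MapReduce SDK (com.aliyun.odps.pipeline.Pipeline); the mapper and reducer class names and the schemas are placeholders, and the exact builder calls should be verified against the Pipeline examples.

    import com.aliyun.odps.mapred.Job;
    import com.aliyun.odps.mapred.utils.SchemaUtils;
    import com.aliyun.odps.pipeline.Pipeline;

    // Merge one map stage and two serial reduce stages into a single pipeline job,
    // so that no intermediate table is written between the stages.
    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
        .addMapper(TokenizerMapper.class)
        .setOutputKeySchema(SchemaUtils.fromString("word:string"))
        .setOutputValueSchema(SchemaUtils.fromString("count:bigint"))
        .addReducer(SumReducer.class)
        .setOutputKeySchema(SchemaUtils.fromString("word:string"))
        .setOutputValueSchema(SchemaUtils.fromString("count:bigint"))
        .addReducer(IdentityReducer.class)
        .createPipeline();
    job.setPipeline(pipeline);
    // ... add the input and output tables on the job, then submit.
    job.submit();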

  • Prune the columns of input tables

    For input tables that contain a large number of columns, only a few columns are processed by a mapper. When you add an input table, you can specify the columns to reduce the amount of data that needs to be read. For example, to process data in the c1 and c2 columns, use the following configuration:

    InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);

    After this configuration, the mapper reads data only from the c1 and c2 columns. Data that is obtained by column name is not affected. However, data that is obtained by column index (subscript) may be affected.

  • Avoid duplicate reads of resources

    We recommend that you read resources in the setup stage. This prevents the performance loss caused by reading the same resources repeatedly. Resources can be read at most 64 times. For more information, see Resource usage example.
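
    A minimal sketch of this pattern follows. It assumes that a file resource named dict.txt has already been added to the job (the resource name and the mapper class are placeholders) and uses TaskContext#readResourceFileAsStream from the MaxCompute MapReduce SDK.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.HashSet;
    import java.util.Set;

    import com.aliyun.odps.mapred.MapperBase;

    public static class DictMapper extends MapperBase {

      private Set<String> dict = new HashSet<String>();

      @Override
      public void setup(TaskContext context) throws IOException {
        // Read the file resource once in the setup stage instead of in map(),
        // so that the resource is not read again for every input record.
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(context.readResourceFileAsStream("dict.txt")));
        String line;
        while ((line = reader.readLine()) != null) {
          dict.add(line.trim());
        }
        reader.close();
      }
    }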

  • Reduce the overheads of object construction

    For Java objects that are used in every call to the map or reduce function, do not construct them inside the map or reduce function. Construct them once in the setup stage instead to avoid the overhead of repeated construction.

    {
        ...
        Record word;
        Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
          // Create the Java objects once in the setup stage. This prevents
          // the repeated creation of the objects on every call to map.
          word = context.createMapOutputKeyRecord();
          one = context.createMapOutputValueRecord();
          one.set(new Object[]{1L});
        }
        ...
    }
  • Use a combiner in the proper manner

    If the output of a map task contains many duplicate keys, you can use a combiner to merge them locally. This reduces transmission bandwidth and shuffle overhead. If the output of a map task contains few duplicate keys, using a combiner may incur extra overhead. A combiner implements the same interface as a reducer. The following code defines the combiner in a WordCount program:

      /**
       * A combiner class that combines map output by summing the values.
       */
      public static class SumCombiner extends ReducerBase {
    
        private Record count;
    
        @Override
        public void setup(TaskContext context) throws IOException {
          count = context.createMapOutputValueRecord();
        }
    
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
          long c = 0;
          while (values.hasNext()) {
            Record val = values.next();
            c += (Long) val.get(0);
          }
          count.set(0, c);
          context.write(key, count);
        }
      }
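
    After you define the combiner, register it in the job configuration so that map output is pre-aggregated locally before it is shuffled to the reducers. A minimal sketch, assuming a JobConf named job as in the WordCount example:

    job.setCombinerClass(SumCombiner.class);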
  • Appropriately select partition key columns or customize a partitioner

    You can use JobConf#setPartitionColumns to specify partition key columns. The default partition key columns are defined in the key schema. If you use this method, data is transferred to reducers based on the hash values of the specified columns. This prevents long-tail issues caused by data skew. You can also customize a partitioner if necessary. The following code shows how to customize a partitioner:

    import com.aliyun.odps.mapred.Partitioner;

    public static class MyPartitioner extends Partitioner {

      @Override
      public int getPartition(Record key, Record value, int numPartitions) {
        // numPartitions indicates the number of reducers.
        // This function determines the reducer to which each key emitted by a map task is sent.
        String k = key.get(0).toString();
        return k.length() % numPartitions;
      }
    }

    Configure the following setting in jobconf:

    jobconf.setPartitionerClass(MyPartitioner.class);

    Specify the number of reducers in jobconf:

    jobconf.setNumReduceTasks(num);
  • Configure JVM memory parameters as required

    Configuring a large amount of memory for a MapReduce job increases computing costs. The standard configuration is 1 CPU core with 4 GB of memory, and odps.stage.reducer.jvm.mem is set to 4006. Requesting memory beyond this 1 core : 4 GB ratio also increases computing costs.
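
    For example, if the reducer logic does not need 4 GB of memory per instance, you can request less. The following sketch assumes that the odps.stage.reducer.jvm.mem property mentioned above can be set as a plain key-value pair on the JobConf; both the property name and the 2048 value are assumptions to verify for your job.

    JobConf job = new JobConf();
    // Assumption: request about 2 GB for each reducer JVM instead of the default 4 GB,
    // which lowers the memory-based part of the computing cost.
    job.set("odps.stage.reducer.jvm.mem", "2048");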

References