全部產品
Search
文件中心

MaxCompute:MapReduce支援SQL運行時執行模式

更新時間:May 22, 2025

MaxCompute新增支援將MapReduce作業指定為SQL運行時(Runtime)執行模式,基於SQL運行時,MapReduce作業可以應用SQL引擎的各種新特性,實現之前不支援的功能。本文為您介紹如何將MapReduce作業指定為SQL運行時執行模式。

背景資訊

MaxCompute提供了MapReduce編程介面,您可以使用MapReduce提供的介面(Java API)編寫MapReduce程式處理MaxCompute中的資料。

新版MaxCompute支援將MapReduce作業指定為SQL運行時執行模式。基於SQL運行時,MapReduce可以使用MaxCompute SQL引擎編譯器、基於代價的最佳化器和向量化執行的執行引擎,同時可以複用SQL引擎開發的各種新特性,包括功能、效能、穩定性等方面不斷的最佳化改進。

MapReduce指定為SQL運行時執行後可以使用MaxCompute SQL的新特性,可以滿足原來MapReduce不支援的功能。相比原來MapReduce,新增如下功能:

  • 支援輸入源為視圖。

  • 支援輸入源為外部表格。

  • 支援Distributed File System讀寫。

  • 支援Hash或Range聚簇表讀寫。

此外還新增支援如下能力:

  • 可以持續獲得SQL在CBO最佳化器和向量化執行引擎等方面的持久性能最佳化收益。

  • 可以使用新的儲存格式壓縮機制。

  • 支援動態調整並發度,提高輸入表為HashCluster等超大表Join情境的效能。

  • 可以獲得經過大量作業和壓力驗證的SQL引擎穩定性優勢,Failover和PVC等機制更有保障。

  • MaxCompute Studio和Logview針對SQL作業提供了更詳細的執行資訊、執行計畫、編譯資訊、作業配置資訊等,可以對任務運行各階段輸入輸出及整體流程一目瞭然,能方便的定位問題和最佳化,提高開發和營運效率。

注意事項

  • 該功能不需要您對原介面和作業邏輯做任何改動,只需要指定執行模式。

  • 該功能僅支援MapReduce介面編寫的MapReduce作業,MapReduce介面請參見原生SDK概述

  • 基於SQL運行時執行模式運行MapReduce作業,MapReduce作業仍然按照MapReduce計費規則進行計費,詳情請參見MapReduce隨用隨付

使用說明

  1. 設定執行模式。

    通過odps.mr.run.mode開關控制,取值如下:

    • lot(預設):使用MapReduce引擎執行任務。

    • sql:使用SQL引擎執行任務。如果執行不通過,則會報錯。

    • hybrid:優先嘗試使用SQL引擎執行,不通過時會回退到MapReduce引擎執行。

    您可通過兩種方式設定執行模式:

    Project層級控制

    全域性統一開啟,針對所有作業有效,需要Project管理員執行如下命令:

    setproject odps.mr.run.mode=<lot/sql/hybrid>;

    Session級控制

    只針對當前作業有效,有如下兩種方式:

    • 運行JAR語句前添加set odps.mr.run.mode=<lot/sql/hybrid>語句。

    • 在作業代碼中通過job設定,如下所示:

      JobConf job = new JobConf();
      job.set("odps.mr.run.mode","hybrid")
    說明

    對於特殊功能情境StreamJob和SecondarySort的功能,需要設定以下Flag:

    • StreamJob: set odps.mr.sql.stream.enable=true;

    • SecondarySort: set odps.mr.sql.group.enable=true;

  2. 查看運行詳情。

    您可通過Logview、MaxCompute Studio等查看MapReduce基於SQL運行時執行在用戶端產生的SQL運算式,以及作業的運行詳情,Logview操作說明請參見使用Logview 2.0查看作業運行資訊

    • Logview XML

      開啟Logview,在Source XML頁簽中查看用戶端提交的XML資訊,顯示了MapReduce作業同義的SQL表達。樣本如下所示:

      create temporary function mr2sql_mapper_152955927079392291755 as   'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ; 
      create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ; 
      
      @sub_query_mapper :=
      SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code
      FROM(
        SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
        FROM ae_antispam.product_sku_tt_inc
        WHERE ds = "20180615"  AND hh = "21"                     
        UNION ALL
        SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
        FROM ae_antispam.product_sku
      ) open_mr_alias1
      DISTRIBUTE BY k_id SORT BY k_id ASC;
      
      @sub_query_reducer := 
      SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code)
      FROM @sub_query_mapper;
      FROM @sub_query_reducer	
      INSERT OVERWRITE TABLE ae_antispam.product_sku
      SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ;
    • Logview Summary

      在Logview的Summary頁簽您可以看到作業的執行採用了SQL運行時的執行引擎execution engine,樣本如下:

      說明

      未開啟SQL運行時執行模式的MapReduce作業無執行引擎資訊,未開啟SQL運行時執行模式的MaxCompute擴充MapReduce(MR2)作業的執行引擎為cganjiang

       Job run mode: fuxi job
       Job run engine: execution engine
    • Logview JSONSummary

      MapReduce的JSONSummary資訊僅包含了簡單的Map和Reduce輸入輸出資訊。 SQL的JSONSummary資訊可以查看SQL執行各階段的詳細資料,包含所有執行參數、邏輯計劃、物理計劃和執行詳情等。樣本如下所示:

       "midlots" : 
       [
       "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4])
      OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)])
      OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]])
      ]