MaxCompute allows you to configure MapReduce jobs to use the SQL runtime execution mode. With SQL runtime, new features of the SQL engine can be applied to MapReduce jobs. This topic describes how to configure MapReduce jobs to use the SQL runtime execution mode.
Background information
MaxCompute provides MapReduce APIs. You can use the Java APIs provided by MapReduce to write MapReduce programs that process data in MaxCompute.
The new version of MaxCompute allows you to configure MapReduce jobs to use the SQL runtime execution mode. You can use the MaxCompute SQL engine compiler, cost-based optimizer (CBO), and vectorized execution engine based on SQL runtime to process MapReduce jobs. Various new features of the SQL engine can also be applied to MapReduce jobs. This helps improve the features, performance, and stability of MapReduce jobs.
After a MapReduce job is configured to use the SQL runtime execution mode, the job can use the new features of the SQL engine of MaxCompute. Compared with original MapReduce jobs, MapReduce jobs in SQL runtime execution mode support the following features:
Use views as the input (see the sketch after the lists below).
Use external tables as the input.
Support read and write operations on a distributed file system.
Support read and write operations on hash-clustered tables or range-clustered tables.
MapReduce jobs in SQL runtime execution mode also support the following capabilities:
Continuously improve performance by using the CBO and the vectorized execution engine of SQL.
Support the compression mechanism of the new storage format.
Support dynamic adjustment of the parallelism to improve the performance in scenarios where a JOIN operation is performed on an ultra-large input table, such as a hash-clustered table.
Gain the stability advantages of the SQL engine, which have been verified by a large number of jobs and stress tests. Mechanisms such as failover and persistent volume claim (PVC) are ensured.
Obtain information about the input and output of tasks at each stage and the overall procedure from the detailed execution information, execution plans, compilation information, and job configurations provided by MaxCompute Studio and LogView. This information helps you identify issues, optimize tasks, and improve development and O&M efficiency.
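As an illustration of using a view as the input, the following minimal sketch assumes that, with SQL runtime enabled, a view (or an external table) can be referenced through the same TableInfo-based input API as a regular table. The my_view name and the rest of the job setup are placeholders; verify the exact usage against the MapReduce API reference.
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;

public class ViewInputSketch {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Run in SQL runtime execution mode so that views and external tables are accepted as input.
        job.set("odps.mr.run.mode", "sql");
        // Hypothetical view used as job input; an external table would be added the same way.
        InputUtils.addTable(TableInfo.builder().tableName("my_view").build(), job);
        // ... configure the mapper, reducer, schemas, and output as usual, then submit the job.
    }
}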
Precautions
To allow a MapReduce job to run in SQL runtime execution mode, you only need to configure the execution mode for the job. You do not need to modify the original interfaces or job logic.
Only MapReduce jobs that are compiled by using MapReduce APIs can run in SQL runtime execution mode. For more information about MapReduce APIs, see Overview.
If you run a MapReduce job in SQL runtime execution mode, you are charged for the MapReduce job based on the MapReduce billing rules. For more information, see Pay-as-you-go billing for MapReduce jobs.
Usage notes
Configure the execution mode of MapReduce tasks.
You can configure the odps.mr.run.mode parameter to specify the execution mode of MapReduce tasks. Valid values:
lot: The MapReduce engine is used to run tasks. This is the default value.
sql: The SQL engine is used to run tasks. If the execution fails, an error is returned.
hybrid: The SQL engine is preferentially used to run MapReduce tasks. If the execution fails, the MapReduce engine is used to run the tasks.
You can configure the execution mode at the project level or session level.
Configure the execution mode at the project level
To configure the execution mode for all MapReduce jobs in a project, the administrator of the project must run the following command:
setproject odps.mr.run.mode=<lot/sql/hybrid>;
Configure the execution mode at the session level
To configure the execution mode for the current MapReduce job, use one of the following methods:
Add the set odps.mr.run.mode=<lot/sql/hybrid>; statement before the JAR statement.
Configure the parameter on the JobConf object in the job code. Sample code:
JobConf job = new JobConf();
job.set("odps.mr.run.mode", "hybrid");
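For context, the following is a minimal sketch of a complete driver that sets the execution mode in the job code. It follows the standard WordCount-style usage of the MaxCompute MapReduce Java API; the RunModeWordCount class name and the wc_in and wc_out table names are placeholders, and the output table is assumed to have a (word string, count bigint) schema.
import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.TaskContext;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class RunModeWordCount {

    public static class TokenizerMapper extends MapperBase {
        private Record word;
        private Record one;

        @Override
        public void setup(TaskContext context) throws IOException {
            word = context.createMapOutputKeyRecord();
            one = context.createMapOutputValueRecord();
            one.set(new Object[] {1L});
        }

        @Override
        public void map(long recordNum, Record record, TaskContext context) throws IOException {
            // Emit every column value as a word with a count of 1.
            for (int i = 0; i < record.getColumnCount(); i++) {
                word.set(new Object[] {record.get(i).toString()});
                context.write(word, one);
            }
        }
    }

    public static class SumReducer extends ReducerBase {
        private Record result;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
            long count = 0;
            while (values.hasNext()) {
                count += (Long) values.next().get(0);
            }
            result.set(0, key.get(0));
            result.set(1, count);
            context.write(result);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Prefer the SQL engine; fall back to the MapReduce engine if the SQL run fails.
        job.set("odps.mr.run.mode", "hybrid");

        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));

        // Placeholder input and output tables.
        InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);

        JobClient.runJob(job);
    }
}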
Note: In scenarios where StreamJob or SecondarySort is used, you must add one of the following configurations:
StreamJob: set odps.mr.sql.stream.enable=true;
SecondarySort: set odps.mr.sql.group.enable=true;
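As an illustration, a client session that submits a StreamJob-based JAR in SQL runtime execution mode might look like the following sketch. Only the set statements come from this topic; the JAR name, main class, and table names are hypothetical placeholders.
set odps.mr.run.mode=sql;
set odps.mr.sql.stream.enable=true;
-- Hypothetical JAR, main class, and input/output tables:
jar -resources stream_example.jar -classpath stream_example.jar com.example.StreamJobDriver wc_in wc_out;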
View the job details.
You can use LogView or MaxCompute Studio to view the SQL expressions that are generated by a MapReduce job in SQL runtime execution mode on the client and view the running details of the job. For more information about how to use LogView, see Use LogView V2.0 to view job information.
LogView XML
Open LogView and view the XML information that is submitted from the client on the SourceXML tab. The SQL expression that has the same meaning as the MapReduce job is displayed. Example:
create temporary function mr2sql_mapper_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ;
create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ;
@sub_query_mapper :=
SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code
FROM (
  SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
  FROM ae_antispam.product_sku_tt_inc
  WHERE ds = "20180615" AND hh = "21"
  UNION ALL
  SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
  FROM ae_antispam.product_sku
) open_mr_alias1
DISTRIBUTE BY k_id SORT BY k_id ASC;
@sub_query_reducer :=
SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code)
FROM @sub_query_mapper;
FROM @sub_query_reducer
INSERT OVERWRITE TABLE ae_antispam.product_sku
SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code;
LogView Summary
On the Summary tab of the LogView page, you can see that the execution engine of SQL runtime is used to run the MapReduce job. Example:
Job run mode: fuxi job
Job run engine: execution engine
Note: For MapReduce jobs that do not run in SQL runtime execution mode, no execution engine information is displayed. For the extended MapReduce (MR2) jobs of MaxCompute that do not run in SQL runtime execution mode, the execution engine is cganjiang.
LogView Json Summary
The Json Summary tab for MapReduce contains only the input and output information about Map and Reduce. The Json Summary tab for SQL contains the details of each phase of SQL execution, including all execution parameters, logical plans, physical plans, and execution details. Example:
"midlots" : [ "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4]) OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)]) OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]]) ]