Progressive computation is a method that combines stream processing and batch processing to improve resource efficiency and reduce query latency. This topic describes the basic and tuning configurations of progressive computation and provides a sample job script.
Background information
During progressive computation, incremental data is captured and intermediate results are retained. This reduces resource consumption and query latency and provides greater flexibility in execution scheduling and data granularity. In the following example, a range query is used to calculate the weekly total sales of a product. Compared with traditional batch processing, progressive computation reduces the computational load by 70% in each run after the first run.
Traditional batch processing:
Starting from day n+1 (n>=7), the data that spans from day n-5 to day n is redundantly calculated.
Progressive computation:
Starting from day n+1 (n>=7), only the incremental data on the current day is calculated. The result is then combined with the calculated result for the previous six days to obtain the weekly result. This eliminates redundant processing and reduces the computational load by 70%.
Note: In the first run of progressive computation for a range query, intermediate results are calculated for each day in the range, which may take longer than batch processing. Therefore, we recommend that you perform a manual run in advance to ensure optimal performance in the baseline period.
During progressive computation, the intermediate results are stored until they are no longer used. The cost of additional storage is offset by the elimination of redundant calculation. Therefore, progressive computation is still more cost-efficient and faster than traditional batch processing.
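To make the range query in the background example concrete, the following sketch shows the shape of such a weekly-sales query. The table sample_project.product_sales and the columns product_id, sale_amount, and ds are hypothetical names used only for illustration; progressive computation itself is enabled through the configurations described in the following sections.
-- Hypothetical weekly total sales over the 7-day window ending on 20200807.
-- sample_project.product_sales, product_id, sale_amount, and ds are illustrative names.
SELECT  product_id, SUM(sale_amount) AS weekly_sales
FROM    sample_project.product_sales
WHERE   ds >= TO_CHAR(DATEADD(TO_DATE('20200807','yyyymmdd'),-6,'dd'),'yyyymmdd')
AND     ds <= '20200807'
GROUP BY product_id;
With traditional batch processing, this query rescans all seven daily partitions every day. With progressive computation, only the newest day is processed and then combined with the retained daily results.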
Basic configuration
The first time you use the progressive computation feature, we recommend that you test its performance in the development environment. If the performance is as expected, you can use the feature in the production environment. To use the feature, perform the following steps:
Enable the feature when you submit a job.
Format:
set odps.progressive.enable=[true|false];
Parameters:
true: enables the progressive computation feature.
false (default): disables the progressive computation feature.
Configure the computation mode.
Format:
set odps.progressive.range.query.input.partition.pattern=<pattern_value>;
Parameters:
Valid values of the pattern_value parameter:
PASS_BY_HOUR: uses an hour-level or minute-level sliding window to process data. Applicable scenarios include querying data of the last 5 minutes or the last 3 hours.
PASS_BY_HOUR_AND_DAY: uses a multi-day sliding window to process data and stores the data in the most recent window to different partitions by hour. Applicable scenarios include querying data of the last 3 hours or the last 3 days.
PASS_BY_DAY: uses a multi-day sliding window to process data. Applicable scenarios include querying data of the last 3 days.
INCREASING: uses an accumulation window that spans multiple days to process data and stores the data in the most recent window to different partitions by day. Applicable scenarios include querying data from the last 50 days or the last 3 years.
INCREASING_IN_A_MONTH: uses an accumulation window that spans from the first day of the current month to the current day (see the sketch after this list).
INCREASING_IN_A_YEAR: uses an accumulation window that spans from the first day of the current year to the current day.
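As an illustration of how a pattern maps to a query window, the following sketch pairs INCREASING_IN_A_MONTH with a month-to-date query. The set statements use the documented syntax; the table sample_project.product_sales, the output table sample_mtd_sales, and the columns are hypothetical names.
set odps.progressive.enable=true;
set odps.progressive.range.query.input.partition.pattern=INCREASING_IN_A_MONTH;
-- Month-to-date window: from the first day of the current month (20200801) to the current day (20200807).
-- The table and column names are illustrative only.
CREATE TABLE sample_mtd_sales AS
SELECT  product_id, SUM(sale_amount) AS mtd_sales
FROM    sample_project.product_sales
WHERE   ds >= '20200801' AND ds <= '20200807'
GROUP BY product_id;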
(Optional) If you set the pattern_value parameter to PASS_BY_HOUR in the previous step, you can specify a time column that represents a finer granularity based on your business requirements.
Format:
Option 1:
set odps.progressive.range.query.time.partition.col.names=default:<col_name_day>:<col_name_hour>:<col_name_minute>|<col_name_day>:<col_name_hour>:<col_name_minute>|...;
Option 2:
set odps.progressive.range.query.time.partition.col.names=<table_name>:<col_name_day>:<col_name_hour>:<col_name_minute>|<col_name_day>:<col_name_hour>:<col_name_minute>|...;
Option 3:
set odps.progressive.range.query.time.partition.col.names=default:<col_name_day>:<col_name_hour>:<col_name_minute>,<table_name>:<col_name_day>:<col_name_hour>:<col_name_minute>,...;
Parameters:
default: a keyword that indicates that all tables that meet the specified conditions are searched.
table_name: specifies the name of the table whose time column you want to query. You can specify multiple tables. Example: set odps.progressive.range.query.time.partition.col.names=table_1:day:hour,table_2:hour:minute;
<col_name_day>:<col_name_hour>:<col_name_minute>: specifies the time columns to query, from day to hour to minute granularity. You can specify multiple column groups for a table and separate them with vertical bars (|). Example: set odps.progressive.range.query.time.partition.col.names=table_1:day:hour|day:hour2,table_2:day:hour:minute|day:hour2;
Sample configuration:
Run progressive computation every hour.
set odps.progressive.range.query.input.partition.pattern=PASS_BY_HOUR:1;
Specify a finer-grained time column.
set odps.progressive.range.query.time.partition.col.names=default:ds:hh|dt:hour;
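The following sketch combines the preceding sample configuration with an hour-level query. It assumes a hypothetical table sample_project.access_log that is partitioned by ds (day) and hh (hour), which matches the first column group in the sample configuration, and queries the last 3 hours of 20200807.
set odps.progressive.enable=true;
set odps.progressive.range.query.input.partition.pattern=PASS_BY_HOUR:1;
set odps.progressive.range.query.time.partition.col.names=default:ds:hh|dt:hour;
-- Hypothetical hour-partitioned table: ds is the day partition column, hh is the hour partition column.
-- The window covers the last 3 hours of 20200807 (hours 21 through 23).
CREATE TABLE sample_hourly_pv AS
SELECT  uri, COUNT(*) AS pv
FROM    sample_project.access_log
WHERE   ds = '20200807' AND hh >= '21' AND hh <= '23'
GROUP BY uri;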
Tuning configuration
Specify a storage method for intermediate tables.
In progressive computation, data in intermediate tables is shuffled before calculation. To improve the computation performance, add the following configuration to store intermediate tables as cluster tables.
Format:
set range.query.force.cluster.table=[true|false];
Parameters:
true: stores intermediate tables as cluster tables.
false (default): the system automatically selects a storage method based on the presence of the shuffle operation.
Note: We recommend that you set this parameter to true.
Specify the maximum number of concurrent instances.
The first run of progressive computation is resource-intensive because all data within the specified time range is processed. To prevent excessive resource consumption during the first run, we recommend that you add the following configuration.
Format:
set odps.progressive.combine.exec.time.limit.num=<number>;
Parameters:
number: the maximum number of concurrent instances. Default value: 15. Set this parameter to a value greater than or equal to 1.
Note: Specify a value based on the actual running status of the job. An inappropriate value can reduce running efficiency.
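As a sketch of how the tuning parameters can be added before the first run, the following snippet combines both flags with the basic settings. The values true and 10 are illustrative only; choose values based on the actual running status of the job.
-- Illustrative tuning values; adjust them to the job's actual running status.
set odps.progressive.enable=true;
set odps.progressive.range.query.input.partition.pattern=PASS_BY_DAY;
set range.query.force.cluster.table=true;
set odps.progressive.combine.exec.time.limit.num=10;
-- The range query itself follows, for example the first-run script in the next section.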
Example
First run
In this example of progressive computation, a multi-day sliding time window is used. The first time window to query is [20200801,20200807]. Sample job script:
set odps.progressive.enable=true;
set odps.progressive.range.query.input.partition.pattern=PASS_BY_DAY;
CREATE TABLE adl_aegis_webshell_test_neoke AS
SELECT  request_datetime, host, uri, src_ip, src_port, dst_ip, dst_port, method, post_data, user_agent,
        ret_code, cookie, referer, x_forward_for, rsp_content_type, rqs_content_type, content_length,
        jump_location, set_cookie, ttl, get_bigwebshell_uri(uri) AS nopar_uri, internet_ip
FROM    secbase.adl_aegis_webshell_newadd_beaverlog
WHERE   ds >= TO_CHAR(DATEADD(TO_DATE('20200807','yyyymmdd'),-6,'dd'),'yyyymmdd')
AND     ds <= '20200807';
Second run
In the first run, the system calculates the result for each partition and saves the results to seven intermediate tables. Each table corresponds to a day in the specified time window. In the second run on the second day, the job processes data in the time window [20200802, 20200808]. Only the data in the 20200808 partition is calculated; the result is then combined with the intermediate results that were already calculated for 20200802 through 20200807 to obtain the final result.
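The original example does not include a second-run script. The following sketch assumes that the same query is resubmitted with the window shifted to [20200802, 20200808] and that the result table created in the first run is overwritten with INSERT OVERWRITE; the output-table handling is an assumption, not part of the documented example.
set odps.progressive.enable=true;
set odps.progressive.range.query.input.partition.pattern=PASS_BY_DAY;
-- Assumed second-run handling: overwrite the result table with the shifted window [20200802, 20200808].
INSERT OVERWRITE TABLE adl_aegis_webshell_test_neoke
SELECT  request_datetime, host, uri, src_ip, src_port, dst_ip, dst_port, method, post_data, user_agent,
        ret_code, cookie, referer, x_forward_for, rsp_content_type, rqs_content_type, content_length,
        jump_location, set_cookie, ttl, get_bigwebshell_uri(uri) AS nopar_uri, internet_ip
FROM    secbase.adl_aegis_webshell_newadd_beaverlog
WHERE   ds >= TO_CHAR(DATEADD(TO_DATE('20200808','yyyymmdd'),-6,'dd'),'yyyymmdd')
AND     ds <= '20200808';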