This topic describes how to use the computing pushdown feature of Data Lake Analytics (DLA) to accelerate computations on data stored in Tablestore.

Background information

When the serverless Presto engine of DLA queries and computes data, it typically reads the data from a data source. If a large amount of data must be read from the data source, computing efficiency drops significantly. To address this issue, DLA provides the computing pushdown feature, which allows the serverless Presto engine to push some computations down to the data source. The data source performs these computations and returns only the results to the serverless Presto engine. This reduces the amount of data transferred between the data source and the serverless Presto engine and improves computing efficiency in some scenarios, such as aggregate operations.
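The following Python sketch is a toy model, not DLA code, of why pushdown helps an aggregate query. Both functions compute the same per-group sums, but when the sums are computed at the source, only one row per group is transferred. The sample data and function names are hypothetical.

```python
from collections import defaultdict


def aggregate_in_engine(rows):
    """No pushdown: the source ships every row and the engine sums them."""
    shipped = list(rows)  # every row crosses the network
    totals = defaultdict(int)
    for flag, qty in shipped:
        totals[flag] += qty
    return dict(totals), len(shipped)


def aggregate_at_source(rows):
    """Pushdown: the source sums per group and ships one row per group."""
    totals = defaultdict(int)
    for flag, qty in rows:  # this loop stands in for work done by the data source
        totals[flag] += qty
    shipped = list(totals.items())  # only the aggregates cross the network
    return dict(shipped), len(shipped)


# Hypothetical sample data: 3 flags x 1,000 rows each.
rows = [(flag, qty) for flag in "ANR" for qty in range(1, 1001)]
engine_totals, engine_shipped = aggregate_in_engine(rows)
source_totals, source_shipped = aggregate_at_source(rows)
print(engine_shipped, source_shipped)  # 3000 3
```

The results are identical; only the volume of transferred data differs, which is the effect the computing pushdown feature relies on.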

Supported operators

The following list groups the operators that can be pushed down by operator type.

FILTER
  • OR/AND pushdown.
  • LIKE/NOT LIKE pushdown.
  • IS NULL/IS NOT NULL pushdown.
  • BETWEEN ... AND ... pushdown.
  • Comparison operator pushdown. The following comparison operators are supported: =, <, <=, >, >=, and <>.
  • IN/NOT IN pushdown.
LIMIT
  • LIMIT pushdown.
  • ORDER BY ... LIMIT N pushdown.
AGGREGATE
  • MIN(COL) pushdown.
  • MAX(COL) pushdown.
  • SUM(COL) pushdown.
  • COUNT(*), COUNT(COL), and COUNT(DISTINCT COL) pushdown.
  • AVG(COL) pushdown.
  • GROUP BY pushdown.
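If you review queries against this list programmatically, the following Python sketch encodes it as a lookup table. The operator names are transcribed from the list; the dictionary and helper names are hypothetical. A supported operator can still be evaluated by Presto if the query breaks one of the limits in the Limits section.

```python
# Operators transcribed from the supported-operator list.
# This table and the helpers below are hypothetical, not part of DLA.
SUPPORTED_PUSHDOWN = {
    "FILTER": {"OR", "AND", "LIKE", "NOT LIKE", "IS NULL", "IS NOT NULL",
               "BETWEEN", "IN", "NOT IN", "=", "<", "<=", ">", ">=", "<>"},
    "LIMIT": {"LIMIT", "ORDER BY LIMIT"},
    "AGGREGATE": {"MIN", "MAX", "SUM", "COUNT", "COUNT DISTINCT", "AVG",
                  "GROUP BY"},
}


def pushdown_category(operator):
    """Return the operator type under which `operator` can be pushed down, or None."""
    # Normalize case and collapse repeated whitespace before the lookup.
    normalized = " ".join(operator.upper().split())
    for category, operators in SUPPORTED_PUSHDOWN.items():
        if normalized in operators:
            return category
    return None


def not_pushed_down(operators):
    """Return the operators that are not in the supported list."""
    return [op for op in operators if pushdown_category(op) is None]


print(not_pushed_down(["sum", "stddev", ">="]))  # ['stddev']
```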

Limits

The computing pushdown feature has the following limits:

  • Only simple expressions are supported, such as a > 10, sum(a), and avg(a). The expressions that are not supported include a + b > 10, abs(a) > 10, and sum(a + b).
  • All the columns involved must belong to the same Tablestore index. Otherwise, some or all of the operators cannot be pushed down.
  • The pattern in a LIKE expression must be 1 to 20 characters in length, including wildcard characters. For example, the pattern in a like '%123456789%' is 11 characters long, including the two percent signs (%), so it meets this limit.
  • A maximum of four columns can be specified in a GROUP BY clause, for example, GROUP BY a1,a2,a3,a4. If more than four columns are specified in a GROUP BY clause, data computations cannot be pushed down to Tablestore.
  • GROUP BY pushdown is disabled by default. When GROUP BY pushdown is enabled, Tablestore can return a maximum of 2,000 aggregate records to DLA. If the aggregation produces more than 2,000 records, the aggregate results are incorrect. For information about how to enable GROUP BY pushdown, see GROUP BY pushdown.
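The following Python sketch turns some of these limits into a hypothetical client-side pre-check that you could run before you submit a query. The regular expressions are only a rough heuristic for "simple expressions" and are an assumption; the DLA planner makes the actual pushdown decision. All names are illustrative.

```python
import re

# Limits quoted from the Limits list; the names are hypothetical.
MAX_LIKE_PATTERN_CHARS = 20
MAX_GROUP_BY_COLUMNS = 4

# Heuristic: a bare column compared with a literal, such as a > 10.
_SIMPLE_COMPARISON = re.compile(r"^\s*\w+\s*(?:=|<>|<=|>=|<|>)\s*[\w'.-]+\s*$")
# Heuristic: an aggregate over a bare column or *, such as sum(a) or count(*).
_SIMPLE_AGGREGATE = re.compile(
    r"^\s*(?:min|max|sum|avg|count)\s*\(\s*(?:distinct\s+)?(?:\*|\w+)\s*\)\s*$",
    re.IGNORECASE,
)


def is_simple_expression(expr):
    """Rough check for expressions such as a > 10, sum(a), and avg(a)."""
    return bool(_SIMPLE_COMPARISON.match(expr) or _SIMPLE_AGGREGATE.match(expr))


def like_pattern_ok(pattern):
    """The LIKE pattern, wildcards included, must be 1 to 20 characters."""
    return 1 <= len(pattern) <= MAX_LIKE_PATTERN_CHARS


def group_by_ok(columns):
    """At most four GROUP BY columns can be pushed down."""
    return len(columns) <= MAX_GROUP_BY_COLUMNS


print(is_simple_expression("a + b > 10"))  # False
print(like_pattern_ok("%123456789%"))      # True
```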

Enable the computing pushdown feature

By default, the computing pushdown feature is disabled. To enable this feature, add the hints ots-index-first=auto and ots-pushdown-enabled=true at the beginning of the SQL statement. Sample statements:

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 order by l_commitdate desc limit 10;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select count(l_orderkey),sum(l_partkey), avg(l_linenumber) from lineitem;
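Because the hints are plain text at the start of the statement, an application that builds SQL strings can add them in one place. The following Python sketch shows one way to do this. with_hints and PUSHDOWN_HINTS are hypothetical names, and the sketch only formats the /*+key=value,...*/ prefix used in the preceding statements; it does not check that hint names are valid.

```python
def _format_value(value):
    # Hint values are lowercase literals such as auto and true.
    return str(value).lower() if isinstance(value, bool) else str(value)


def with_hints(sql, hints):
    """Prefix `sql` with a hint comment of the form /*+key=value,...*/."""
    body = ",".join(f"{key}={_format_value(value)}" for key, value in hints.items())
    return f"/*+{body}*/ {sql}"


# Hypothetical default: the two hints that enable computing pushdown.
PUSHDOWN_HINTS = {"ots-index-first": "auto", "ots-pushdown-enabled": "true"}

query = with_hints("select * from lineitem where l_orderkey > 100 limit 100;", PUSHDOWN_HINTS)
print(query)
```

The printed statement is identical to the second sample statement. Keeping the hints in one dictionary makes it easy to add ots-groupby-pushdown-enabled only for queries that need it.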

GROUP BY pushdown

To enable GROUP BY pushdown, enable the computing pushdown feature and also add the hint ots-groupby-pushdown-enabled=true. Sample statement:

/*+ots-index-first=auto,ots-pushdown-enabled=true,ots-groupby-pushdown-enabled=true*/
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
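Because of the 2,000-record limit, GROUP BY pushdown is only safe when the number of groups is known to be small. One hedged way to estimate this is to multiply the number of distinct values in each GROUP BY column, which gives an upper bound on the number of groups. The following Python sketch assumes TPC-H data, in which l_returnflag has three values (A, N, R) and l_linestatus has two (F, O), so the preceding query produces at most 6 groups. The distinct counts could come from separate COUNT(DISTINCT col) queries; the helper names are hypothetical.

```python
from math import prod

# Limits quoted from the Limits section; names are hypothetical.
MAX_GROUP_BY_COLUMNS = 4
MAX_PUSHED_DOWN_GROUPS = 2000


def max_groups(distinct_counts):
    """Upper bound on result groups: product of per-column distinct counts."""
    return prod(distinct_counts)


def groupby_pushdown_safe(distinct_counts):
    """True if GROUP BY pushdown stays within both documented limits."""
    return (len(distinct_counts) <= MAX_GROUP_BY_COLUMNS
            and max_groups(distinct_counts) <= MAX_PUSHED_DOWN_GROUPS)


# Assumed TPC-H cardinalities: l_returnflag = 3, l_linestatus = 2.
print(max_groups([3, 2]))             # 6
print(groupby_pushdown_safe([3, 2]))  # True
```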

Verify computing pushdown

You can add the EXPLAIN keyword before a SELECT statement to check whether data computations are pushed down to Tablestore. Sample statement:

explain select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus;

The following results are returned:

- Output[l_returnflag, l_linestatus, sum_qty, sum_base_price, avg_qty, avg_price, avg_disc, count_order] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        sum_qty := sum
        sum_base_price := sum_7
        avg_qty := avg
        avg_price := avg_8
        avg_disc := avg_9
        count_order := count
    - RemoteStreamingMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        - LocalMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
            - PartialSort[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                - RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                    - TableScan[TableHandle {connectorId='ots', connectorHandle='table=ots20201208_mzl.lineitem', layout='Optional[table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                            LAYOUT: table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]
                            avg_8 := OtsColumnHandle{columnName=avg(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            avg := OtsColumnHandle{columnName=avg(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            sum := OtsColumnHandle{columnName=sum(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            avg_9 := OtsColumnHandle{columnName=avg(l_discount), mappedName=L_DISCOUNT, primaryKey=false, columnType=double}
                            count := OtsColumnHandle{columnName=count(*), mappedName=_ID, primaryKey=false, columnType=bigint}
                            l_returnflag := OtsColumnHandle{columnName=l_returnflag, mappedName=L_RETURNFLAG, primaryKey=false, columnType=varchar}
                            sum_7 := OtsColumnHandle{columnName=sum(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            l_linestatus := OtsColumnHandle{columnName=l_linestatus, mappedName=L_LINESTATUS, primaryKey=false, columnType=varchar}

In the preceding execution plan, no Aggregate node appears, and the OtsQueryGeneratorContext in the TableScan node contains the pushdown information, including the filter (L_SHIPDATE <= 1998-12-01), the aggregations, and the GROUP BY columns. This indicates that the computations have been pushed down to Tablestore.
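If you check many queries, you could automate this reading of the plan. The following Python sketch is a heuristic based only on the sample output in this topic: it treats a plan as pushed down when a TableScan line contains OtsQueryGeneratorContext and no line starts with an Aggregate node. It assumes that aggregation left in Presto appears as a node such as Aggregate(FINAL); the function name is hypothetical, and the plan format may differ between DLA versions.

```python
def pushdown_applied(plan_text):
    """Heuristic check of EXPLAIN output for Tablestore computing pushdown."""
    lines = [line.strip() for line in plan_text.splitlines()]
    # The TableScan node carries the pushed-down filter, aggregations, and so on.
    scan_has_pushdown = any(
        line.startswith("- TableScan[") and "OtsQueryGeneratorContext" in line
        for line in lines
    )
    # Assumption: aggregation still performed by Presto shows up as an Aggregate node.
    engine_aggregates = any(line.startswith("- Aggregate") for line in lines)
    return scan_has_pushdown and not engine_aggregates


# Abridged from the sample plan above.
sample_plan = """
- Output[l_returnflag, count_order] => [l_returnflag:varchar, count:bigint]
    - RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, count:bigint]
        - TableScan[TableHandle {connectorId='ots', layout='Optional[table=lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01}]]'}] => [l_returnflag:varchar, count:bigint]
"""
print(pushdown_applied(sample_plan))  # True
```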