Runtime filters cut join query time by pruning the left table before a hash join runs. Instead of scanning every row, ApsaraDB for SelectDB generates a filter at runtime from the right table and pushes it down to the scan layer — so only matching rows enter the join.
Runtime filters are enabled by default. ApsaraDB for SelectDB automatically generates IN predicates and bloom filters based on the query and table statistics. Use session variables to tune behavior for specific queries.
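For example, because ApsaraDB for SelectDB speaks the MySQL protocol, the current settings can be inspected and tuned with standard session-variable syntax (a sketch; the pattern `runtime_filter%` matches the variables documented below):

```sql
-- Inspect the current runtime filter settings for this session
SHOW VARIABLES LIKE 'runtime_filter%';

-- Tune for the current session only; the change reverts when the session ends
SET runtime_filter_type = "IN_OR_BLOOM_FILTER";
```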
How it works
In a hash join, the right table is loaded first to build a hash table. A runtime filter captures the key values from that hash table and sends the filter to the OlapScanNode scanning the left table. The OlapScanNode discards non-matching rows before they reach the HashJoinNode.
The following example uses two tables: T1 (a fact table with 1,000,000 rows) and T2 (a dimension table with 200 rows).
Without a runtime filter — all 1,000,000 rows from T1 flow up to the join:
```
        HashJoinNode
         |           |
         | 1,000,000 | 200
         |           |
   OlapScanNode   OlapScanNode
         ^           ^
         | 1,000,000 | 200
    T1 (fact)    T2 (dimension)
```

With a runtime filter applied at the scan layer — only 6,000 rows reach the join:
```
        HashJoinNode
         |           |
         | 6,000     | 200
         |           |
   OlapScanNode   OlapScanNode
         ^           ^
         | 1,000,000 | 200
    T1 (fact)    T2 (dimension)
```

With the filter pushed further down to the storage engine — indexes prune the data before it is read:
```
        HashJoinNode
         |           |
         | 6,000     | 200
         |           |
   OlapScanNode   OlapScanNode
         ^           ^
         | 6,000     | 200
    T1 (fact)    T2 (dimension)
```

Unlike predicate pushdown or partition pruning, the filter condition is not known at query planning time. It is computed from the actual data in the right table during execution, then broadcast to the OlapScanNode reading the left table.
Key concepts
Left table — the table on the left side of a join; used for the probe operation. Join reorder can adjust which table is on which side.
Right table — the table on the right side of a join; used to build the hash table. Join reorder can adjust this as well.
Fragment — a unit of query execution. The frontend (FE) node splits an SQL statement into fragments and dispatches them to backend (BE) nodes in the distributed cluster.
When runtime filters help
Runtime filters are most effective when:
The left table is significantly larger than the right table. Generating a filter has a memory and compute cost; this only pays off at scale.
The join result is much smaller than the left table — meaning the filter can discard most left-table rows.
If the right table is large or the join result is nearly as large as the left table, runtime filters may add overhead without benefit.
Filter types
ApsaraDB for SelectDB supports five runtime filter types:
| Type | How it works | Best for | Limitations |
|---|---|---|---|
| IN predicate | Builds a HashSet of all right-table key values; filters left table with IN | Small right tables in broadcast joins | Only works for broadcast joins; becomes invalid when right-table rows exceed runtime_filter_max_in_num (default: 1,024) |
| Bloom filter | Builds a probabilistic structure from the hash table; has a small false positive rate | Large right tables, most data types | Higher creation and application overhead; may hurt performance if filtering rate is low or left table is small |
| MinMax filter | Extracts the min/max range from the right table; filters rows outside that range | Numeric key columns with non-overlapping ranges | Ineffective on non-numeric columns (e.g., VARCHAR); no benefit when ranges overlap completely |
| IN_OR_BLOOM_FILTER | Automatically selects IN predicate or bloom filter based on right-table row count at runtime | General-purpose use (default) | Threshold controlled by runtime_filter_max_in_num; uses IN below 102,400 rows, bloom filter above |
| Bitmap filter | Filters using a bitmap column returned by an IN subquery | Queries with bitmap IN subqueries | Only supported in the vectorized engine |
Configure runtime filters
Use the following session variables to tune runtime filter behavior for specific queries.
Parameters
| Parameter | Default | Description |
|---|---|---|
| runtime_filter_mode | GLOBAL | Controls how far filters propagate across execution fragments. Values: OFF, LOCAL, GLOBAL. |
| runtime_filter_type | IN_OR_BLOOM_FILTER | Filter type(s) to generate. Values: IN, BLOOM_FILTER, MIN_MAX, IN_OR_BLOOM_FILTER, BITMAP_FILTER. Combine with commas. |
| runtime_filter_wait_time_ms | 1000 | Maximum wait time (ms) the OlapScanNode holds before starting a scan. Applies per filter: three filters means up to 3,000 ms total. |
| runtime_filters_max_num | 10 | Maximum number of bloom filters per query. Bloom filters with higher selectivity are kept when the limit is exceeded. |
| runtime_bloom_filter_min_size | 1048576 (1 MiB) | Minimum bloom filter size in bytes. |
| runtime_bloom_filter_max_size | 16777216 (16 MiB) | Maximum bloom filter size in bytes. |
| runtime_bloom_filter_size | 2097152 (2 MiB) | Default bloom filter size when right-table cardinality is unavailable. |
| runtime_filter_max_in_num | 1024 | Row count threshold above which no IN predicate is generated. Also controls the IN vs. bloom filter switch in IN_OR_BLOOM_FILTER mode (threshold: 102,400). |
runtime_filter_mode
Controls the scope of filter propagation between query execution fragments.
LOCAL — the filter producer (HashJoinNode) and consumer (OlapScanNode) must be in the same fragment. Suitable for broadcast joins; lower overhead.
GLOBAL — filters are merged and transferred across fragment boundaries over the network. Required for shuffle joins where the producer and consumer are in different fragments.
GLOBAL covers all scenarios that LOCAL handles, plus shuffle joins. Switch to LOCAL if the overhead of merging and transmitting filters outweighs the scan savings for a particular shuffle join. Set to OFF to disable runtime filters entirely.
For the technical design of cross-fragment filter merging, see ISSUE 6116.
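The mode is switched per session; for example (values taken from the table above):

```sql
-- Restrict filters to producer/consumer in the same fragment (broadcast joins)
SET runtime_filter_mode = "LOCAL";

-- Disable runtime filters entirely for this session
SET runtime_filter_mode = "OFF";
```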
runtime_filter_type
Specify one type or combine multiple types:
```sql
-- By name (comma-separated, quoted)
SET runtime_filter_type = "BLOOM_FILTER,IN,MIN_MAX";

-- Equivalent numeric form (1=IN, 2=BLOOM_FILTER, 4=MIN_MAX)
SET runtime_filter_type = 7;
```

IN predicate behavior:
Uses a merge strategy for distributed execution.
When IN and other filter types are both specified and the right table stays within runtime_filter_max_in_num, the system drops the other filters — an IN predicate is exact, so additional filters add no benefit. This optimization applies only when the producer and consumer are in the same fragment.
Bloom filter behavior:
Has a non-zero false positive rate, meaning it may pass some non-matching rows. Results remain correct; only filtering efficiency is affected.
Can be pushed down to the storage engine, but only for key columns in the left table. Without storage-layer pushdown, performance often deteriorates.
Avoid using bloom filters on columns with low selectivity or on small left tables.
MinMax filter behavior:
Most effective when the value ranges of the right and left tables do not overlap (e.g., right-table max is below left-table min, or vice versa).
On numeric columns (INT, BIGINT, DOUBLE), fully overlapping ranges make the filter ineffective.
On non-numeric columns (VARCHAR), typically degrades performance.
IN_OR_BLOOM_FILTER behavior:
Uses an IN predicate when right-table rows < 102,400; switches to a bloom filter above that threshold. Adjust the threshold with runtime_filter_max_in_num.
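A sketch of raising the switch threshold (the value is illustrative, not a recommendation):

```sql
-- Keep using exact IN predicates for right tables up to ~200,000 rows
SET runtime_filter_max_in_num = 200000;
```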
Bitmap filter behavior:
Only applicable when the IN subquery returns a bitmap column.
Requires the vectorized engine.
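A minimal sketch of a query shape that qualifies, using hypothetical tables: orders(user_id INT, ...) and user_tags(tag VARCHAR, user_ids BITMAP):

```sql
-- The IN subquery returns a bitmap column, so a bitmap filter can apply
SET runtime_filter_type = "BITMAP_FILTER";

SELECT COUNT(*)
FROM orders
WHERE user_id IN (SELECT user_ids FROM user_tags WHERE tag = 'vip');
```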
runtime_filter_wait_time_ms
The OlapScanNode waits up to this duration for each assigned runtime filter before starting a scan. With three filters, the maximum wait is three times this value.
Filters that arrive within the wait window are pushed down to the storage engine. Filters that arrive after the scan has started are applied as expression filters on already-scanned data — effective but less efficient than storage-layer pushdown.
Tuning guidance:
Busy clusters running long-running joins — increase this value so filters have time to be built and transmitted before the scan starts.
Light clusters running many small queries — decrease this value to avoid adding unnecessary latency to queries that finish in seconds.
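For example, per-session adjustments in either direction (the values are illustrative):

```sql
-- Busy cluster, long-running joins: give filters more time to arrive
SET runtime_filter_wait_time_ms = 3000;

-- Light cluster, many small queries: keep waits short
SET runtime_filter_wait_time_ms = 200;
```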
runtime_filters_max_num
Caps the number of bloom filters per query. IN predicates and MinMax filters are not counted.
When the limit is exceeded, the system retains bloom filters with higher selectivity — those expected to filter out more rows:
Selectivity = HashJoinNode cardinality / HashJoinNode left child cardinality

FE-estimated cardinality can be inaccurate, so selectivity-based ranking may not perfectly reflect actual filtering effectiveness.
Adjust this parameter only when tuning slow join queries between large tables.
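For example, a session-level adjustment for a query joining many large tables (the value is illustrative):

```sql
-- Allow more bloom filters in a multi-join query
SET runtime_filters_max_num = 20;
```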
Bloom filter size parameters
The FE calculates bloom filter length at query planning time. All HashJoinNode bloom filters in a query must have the same length to be mergeable.
If right-table cardinality is available in statistics, the FE estimates the optimal size and rounds up to the nearest power of 2.
If cardinality is unavailable, the FE uses runtime_bloom_filter_size as the default. runtime_bloom_filter_min_size and runtime_bloom_filter_max_size bound the final size regardless of the estimate.
Larger bloom filters handle high-cardinality columns more accurately but consume more memory. If filtering accuracy is insufficient for a high-cardinality column (millions of distinct values), increase runtime_bloom_filter_size and benchmark the result.
Adjust bloom filter size per query, not globally.
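For example, a per-session adjustment before a problem query, reset afterwards (the sizes are illustrative):

```sql
-- Grow the default bloom filter for a high-cardinality join key (8 MiB)
SET runtime_bloom_filter_size = 8388608;

-- Run the problem query, compare profiles, then restore the default (2 MiB)
SET runtime_bloom_filter_size = 2097152;
```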
Verify that runtime filters are applied
Run EXPLAIN to view the query plan and confirm filters are generated and consumed on the expected columns.
Join side (filter generated):
runtime filters: RF000[in] <- table.column

Scan side (filter applied):
runtime filters: RF000[in] -> table.column
Example:
```sql
CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2;
INSERT INTO test VALUES (1), (2), (3), (4);

CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2;
INSERT INTO test2 VALUES (3), (4), (5);

EXPLAIN SELECT t1 FROM test JOIN test2 WHERE test.t1 = test2.t2;
```

```
+-------------------------------------------------------------------+
| Explain String                                                    |
+-------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                   |
|  OUTPUT EXPRS:`t1`                                                |
|                                                                   |
|   4:EXCHANGE                                                      |
|                                                                   |
| PLAN FRAGMENT 1                                                   |
|  OUTPUT EXPRS:                                                    |
|  PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1`   |
|                                                                   |
|   2:HASH JOIN                                                     |
|   |  join op: INNER JOIN (BUCKET_SHUFFLE)                         |
|   |  equal join conjunct: `test`.`t1` = `test2`.`t2`              |
|   |  runtime filters: RF000[in] <- `test2`.`t2`                   |
|   |                                                               |
|   |----3:EXCHANGE                                                 |
|   |                                                               |
|   0:OlapScanNode                                                  |
|      TABLE: test                                                  |
|      runtime filters: RF000[in] -> `test`.`t1`                    |
|                                                                   |
| PLAN FRAGMENT 2                                                   |
|  OUTPUT EXPRS:                                                    |
|  PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2`  |
|                                                                   |
|   1:OlapScanNode                                                  |
|      TABLE: test2                                                 |
+-------------------------------------------------------------------+
```

Join side — 2:HASH JOIN in PLAN FRAGMENT 1 generates an IN predicate (RF000) from test2.t2. The values are only known at runtime.
Scan side — 0:OlapScanNode applies RF000 to filter test.t1 before rows reach the join.
Running the query returns [3, 4] — only the two rows where t1 = t2.
Check filter effectiveness with a profile:
Enable profiling, then run the query:
```sql
SET enable_profile = true;
```

In the profile, look for the RuntimeFilter section under OLAP_SCAN_NODE:
```
RuntimeFilter:in:
    HasPushDownToEngine: true   -- filter reached the storage engine
    AWaitTimeCost: 0ns          -- no wait; filter arrived before scan started
    EffectTimeCost: 2.76ms      -- time spent applying the filter
```

And the filtering result:

```
RowsVectorPredFiltered: 9,320,008   -- rows discarded by the filter
VectorPredEvalTime: 364.39ms        -- time spent on filter evaluation
```

A high RowsVectorPredFiltered count confirms the filter is effective. HasPushDownToEngine: true confirms it reached the storage layer.
Planning rules
Runtime filters are generated and applied according to the following rules. When a condition is not met, the filter is skipped, because applying it could produce incorrect results.
Generation rules:
Filters are generated only for equality conditions in JOIN ON clauses. NULL-safe equality (<=>) is excluded because null values in the left table could be incorrectly filtered.
The source expression type cannot be HLL or BITMAP.
Source and target expressions cannot be constants.
Source and target expressions cannot be the same expression.
Types of the source and target expressions must match (bloom filters are hash-based). If types differ, the system attempts to cast the target expression to the source type.
Filters from PlanNode.Conjuncts are not pushed down — these can produce incorrect results. For example, when an IN subquery is rewritten as a join, the system stores the auto-generated JOIN condition in PlanNode.Conjuncts; applying a runtime filter there may cause rows to be dropped from the results.
Pushdown rules:
Filters can only be pushed down to OlapScanNode. Other scan node types are not supported.
Filters cannot be pushed to the left table of left outer joins, full outer joins, or anti-joins.
The target expression must reference a column that exists in the original base table.
The target expression cannot contain null-checking expressions such as COALESCE, IFNULL, or CASE.
Column conduction rules:
If the JOIN ON clause contains A.k = B.k AND B.k = C.k, the filter for C.k can be pushed down to B.k only — not propagated further to A.k.
If the JOIN ON clause contains A.a + B.b = C.c and A.a is equivalent to B.a, A.a can be replaced with B.a and the filter pushed to B. If A.a and B.a are not equivalent, the filter cannot be pushed to B because the target expression must be bound to a single left table.
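The first rule can be seen in a query shape like the following sketch, with hypothetical tables A, B, and C:

```sql
-- The runtime filter built from C.k is applied at the scan of B (B.k),
-- not propagated onward to the scan of A
SELECT *
FROM A
JOIN B ON A.k = B.k
JOIN C ON B.k = C.k;
```

Running EXPLAIN on such a query would be expected to show the filter's consumer arrow on the OlapScanNode for B only.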