Hologres V2.0 以降では、ランタイムフィルターがサポートされています。複数テーブルの結合シナリオでは、この機能により結合中のフィルタリングが自動的に最適化され、クエリのパフォーマンスが向上します。このトピックでは、Hologres でランタイムフィルターを使用する方法について説明します。
背景情報
シナリオ
Hologres V2.0 以降では、ランタイムフィルターがサポートされています。これは通常、2 つ以上のテーブルが関与するハッシュ結合シナリオで使用され、特に大きなテーブルと小さなテーブルを結合する場合に有効です。手動での構成は不要です。オプティマイザーと実行エンジンは、クエリの実行中にフィルタリングを自動的に最適化します。これにより、I/O オーバーヘッドが削減され、結合クエリのパフォーマンスが向上します。
仕組み
ランタイムフィルターの仕組みを理解するには、まず結合プロセスを理解する必要があります。以下は、2 つのテーブルを結合するためのサンプル SQL 文です。
select * from test1 join test2 on test1.x = test2.x;対応する実行計画を以下に示します。

実行計画に示すように、2 つのテーブルが結合されると、test2 テーブルからハッシュテーブルが構築されます。次に、test1 テーブルのデータがハッシュテーブルと照合されます。最後に、結果が返されます。この結合プロセスには、2 つの重要な用語が関わっています。
Build side: 2 つのテーブルまたはサブクエリがハッシュ結合を使用して結合される場合、一方のテーブルまたはサブクエリのデータを使用してハッシュテーブルが構築されます。この側を build side と呼びます。これは実行計画の Hash ノードに対応します。
Probe side: ハッシュ結合のもう一方の側は、データを読み取り、build side からのハッシュテーブルと照合します。この側を probe side と呼びます。
通常、実行計画が正しければ、小さい方のテーブルが build side に、大きい方のテーブルが probe side になります。
ランタイムフィルターは、build side のデータ分布を使用して軽量なフィルターを生成します。このフィルターは probe side に送信され、そのデータをプルーニング (削減) します。このプロセスにより、ハッシュ結合に関与し、ネットワーク経由で送信される probe side からのデータ量が削減され、ハッシュ結合のパフォーマンスが向上します。
したがって、ランタイムフィルターは、データ量の差が著しい大きなテーブルと小さなテーブルの間の結合に最も効果的です。これらのシナリオでは、標準の結合よりも大幅なパフォーマンス向上が得られます。
制限事項とトリガー条件
制限事項
Hologres V2.0 以降でのみランタイムフィルターがサポートされます。
ランタイムフィルターは、結合条件に単一のフィールドが含まれている場合にのみトリガーされます。ただし、Hologres V2.1 以降では、ランタイムフィルターは複数フィールドでの結合をサポートし、複数の結合フィールドが条件を満たす場合にトリガーされます。
Hologres V4.0 以降でのみ TopN ランタイムフィルターがサポートされます。この機能は、単一テーブルで TopN を計算するシナリオのパフォーマンスを向上させるために使用されます。
トリガー条件
Hologres はパフォーマンス専有型の結合を提供します。SQL 文が次のすべての条件を満たす場合、エンジンは自動的にランタイムフィルターをトリガーします。
probe side のデータ量が 100,000 行以上である。
スキャンされたデータ量の比率:
build side / probe side <= 0.1。比率が小さいほど、Runtime Filter がトリガーされる可能性が高くなります。結合されたデータ量の比率:
build side / probe side <= 0.1。比率が小さいほど、Runtime Filter がトリガーされる可能性が高くなります。
Runtime Filter の種類
ランタイムフィルターは、次の 2 つのディメンションに基づいて分類できます。
ハッシュ結合の probe side がシャッフルを必要とするかどうかに基づいて、フィルターは Local または Global に分類されます。
Local: Hologres V2.0 以降でサポートされています。ハッシュ結合の probe side がシャッフルを必要としない場合、build side データの次の 3 つのシナリオのいずれかで Local ランタイムフィルターを使用できます。
build side と probe side の結合キーが同じデータ分布を持つ。
build side のデータが probe side にブロードキャストされる。
build side のデータが probe side のデータ分布に基づいて probe side にシャッフルされる。
Global: Hologres V2.2 以降でサポートされています。probe side データがシャッフルを必要とする場合、ランタイムフィルターを使用する前にマージする必要があります。この場合、Global ランタイムフィルターが必要です。
Local ランタイムフィルターは、スキャンされるデータ量とハッシュ結合によって処理されるデータを削減できます。Global ランタイムフィルターは、probe side データがシャッフルされる前にデータをフィルター処理するため、ネットワークトラフィックも削減されます。フィルターの種類を指定する必要はありません。エンジンが適切な種類を自動的に選択します。
フィルターの種類に基づいて、Bloom Filter、In Filter、MinMAX Filter に分類されます。
Bloom Filter: Hologres V2.0 以降でサポートされています。Bloom フィルターには誤検知の可能性があります。つまり、除外されるべき一部のデータが除外されないことがあります。ただし、幅広いアプリケーションがあり、build side のデータ量が大きい場合でも、高いフィルタリング効率を提供し、クエリのパフォーマンスを向上させることができます。
In Filter: Hologres V2.0 以降でサポートされています。In フィルターは、build side の個別値の数 (NDV) が少ない場合に使用されます。build side データから HashSet を構築し、フィルタリングのために probe side に送信します。In フィルターの利点は、データを正確にフィルタリングでき、ビットマップインデックスと併用できることです。
MinMAX Filter: Hologres V2.0 以降でサポートされています。MinMAX フィルターは、build side データから最大値と最小値を取得し、フィルタリングのために probe side に送信します。その利点は、メタデータ情報に基づいてファイル全体またはデータのバッチを直接フィルタリングできることであり、これにより I/O コストが削減されます。
フィルターの種類を指定する必要はありません。Hologres は、実行時に結合条件に基づいて適切なフィルターの種類を自動的に選択して使用します。
Runtime Filter の検証
以下の例は、Runtime Filter をよりよく理解するのに役立ちます。
例 1: 1 つの列を持つ結合条件に Local Runtime Filter を使用する
サンプルコード:
begin; create table test1(x int, y int); call set_table_property('test1', 'distribution_key', 'x'); create table test2(x int, y int); call set_table_property('test2', 'distribution_key', 'x'); end; insert into test1 select t, t from generate_series(1, 100000) t; insert into test2 select t, t from generate_series(1, 1000) t; analyze test1; analyze test2; explain analyze select * from test1 join test2 on test1.x = test2.x;実行計画:

test2テーブルには 1,000 行、test1テーブルには 100,000 行あります。build side と probe side のデータ量の比率は 0.01 で、結合されたデータ量の比率も 0.01 です。どちらも 0.1 未満です。これらの条件がデフォルトのトリガー条件を満たすため、エンジンは自動的にランタイムフィルターを使用します。probe side の
test1テーブルにはRuntime Filter Target Exprノードがあります。これは、ランタイムフィルターが probe side にプッシュダウンされたことを示します。probe side では、`scan_rows` はストレージから読み取られたデータを表し、100,000 行です。`rows` は、ランタイムフィルターでフィルタリングされた後のスキャンオペレーターからの行数を表し、1,000 行です。これら 2 つの値の差は、ランタイムフィルターのフィルタリング効果を示しています。
例 2: 複数の列を持つ結合条件に Local Runtime Filter を使用する (Hologres V2.1 でサポート)
サンプルコード:
drop table if exists test1, test2; begin; create table test1(x int, y int); create table test2(x int, y int); end; insert into test1 select t, t from generate_series(1, 1000000) t; insert into test2 select t, t from generate_series(1, 1000) t; analyze test1; analyze test2; explain analyze select * from test1 join test2 on test1.x = test2.x and test1.y = test2.y;実行計画:

結合条件には複数の列があり、Runtime Filter も複数の列に対して生成されます。
build side はブロードキャストされるため、Local ランタイムフィルターを使用できます。
例 3: シャッフル結合に Global Runtime Filter を使用する (Hologres V2.2 でサポート)
サンプルコード:
SET hg_experimental_enable_result_cache = OFF; drop table if exists test1, test2; begin; create table test1(x int, y int); create table test2(x int, y int); end; insert into test1 select t, t from generate_series(1, 100000) t; insert into test2 select t, t from generate_series(1, 1000) t; analyze test1; analyze test2; explain analyze select * from test1 join test2 on test1.x = test2.x;実行計画:

実行計画は、probe side データが Hash Join オペレーターにシャッフルされることを示しています。エンジンはクエリを高速化するために自動的に Global ランタイムフィルターを使用します。
例 4: In フィルターとビットマップインデックスを組み合わせる (Hologres V2.2 でサポート)
サンプルコード:
set hg_experimental_enable_result_cache=off; drop table if exists test1, test2; begin; create table test1(x text, y text); call set_table_property('test1', 'distribution_key', 'x'); call set_table_property('test1', 'bitmap_columns', 'x'); call set_table_property('test1', 'dictionary_encoding_columns', ''); create table test2(x text, y text); call set_table_property('test2', 'distribution_key', 'x'); end; insert into test1 select t::text, t::text from generate_series(1, 10000000) t; insert into test2 select t::text, t::text from generate_series(1, 50) t; analyze test1; analyze test2; explain analyze select * from test1 join test2 on test1.x = test2.x;実行計画:

実行計画は、probe side のスキャンオペレーターでビットマップが使用されていることを示しています。In フィルターは正確なフィルタリングを提供し、出力をわずか 50 行に削減します。`scan_rows` の値は 700 万を超えており、元の 1,000 万行よりも少なくなっています。この削減は、In フィルターをストレージエンジンにプッシュダウンできるためであり、これにより I/O コストを削減できます。In タイプのランタイムフィルターとビットマップを組み合わせることは、結合キーが STRING 型の場合に特に効果的です。
例 5: MinMax フィルターで I/O を削減する (Hologres V2.2 でサポート)
サンプルコード:
set hg_experimental_enable_result_cache=off; drop table if exists test1, test2; begin; create table test1(x int, y int); call set_table_property('test1', 'distribution_key', 'x'); create table test2(x int, y int); call set_table_property('test2', 'distribution_key', 'x'); end; insert into test1 select t::int, t::int from generate_series(1, 10000000) t; insert into test2 select t::int, t::int from generate_series(1, 100000) t; analyze test1; analyze test2; explain analyze select * from test1 join test2 on test1.x = test2.x;実行計画:

実行計画は、probe side のスキャンオペレーターがストレージエンジンから 320,000 行強しか読み取らないことを示しています。これは元の 1,000 万行よりもはるかに少ないです。この削減は、ランタイムフィルターがストレージエンジンにプッシュダウンされるために発生します。データブロックのメタデータを使用してブロック全体をフィルタリングするため、I/O コストを大幅に削減できます。このメソッドは、結合キーが数値型で、build side の値の範囲が probe side の値の範囲よりも小さい場合に特に効果的です。
例 6: TopN Runtime Filter (Hologres V4.0 でサポート)
Hologres では、データはストリーミング方式でブロックごとに処理されます。したがって、SQL 文に topN オペレーターが含まれている場合、Hologres はすべての結果を一度に処理しません。代わりに、動的フィルターを生成してデータを事前フィルタリングします。
次の SQL 文を例にとります。
select o_orderkey from orders order by o_orderdate limit 5;この SQL 文の実行計画は次のとおりです。
QUERY PLAN
Limit (cost=0.00..116554.70 rows=0 width=8)
-> Sort (cost=0.00..116554.70 rows=100 width=12)
Sort Key: o_orderdate
[id=6 dop=1 time=317/317/317ms rows=5(5/5/5) mem=1/1/1KB open=317/317/317ms get_next=0/0/0ms]
-> Gather (cost=0.00..116554.25 rows=100 width=12)
[20:1 id=100002 dop=1 time=317/317/317ms rows=100(100/100/100) mem=6/6/6KB open=0/0/0ms get_next=317/317/317ms * ]
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]
-> Local Gather (cost=0.00..9.59 rows=150000000 width=12)
[id=2 dop=20 time=316/280/256ms rows=1372205(68691/68610/68498) mem=0/0/0B open=0/0/0ms get_next=316/280/256ms local_dop=1/1/1 * ]
-> Seq Scan on orders (cost=0.00..8.24 rows=150000000 width=12)
Runtime Filter Target Expr: o_orderdate
[id=1 split_count=20 time=286/249/222ms rows=1372205(68691/68610/68498) mem=179/179/179KB open=0/0/0ms get_next=286/249/222ms physical_reads=27074(1426/1353/1294) scan_rows=144867963(7324934/7243398/7172304)]
Query id:[1001003033996040311]
QE version: 2.0
Query Queue: init_warehouse.default_queue
======================cost======================
Total cost:[343] ms
Optimizer cost:[13] ms
Build execution plan cost:[0] ms
Init execution plan cost:[6] ms
Start query cost:[6] ms
- Queue cost: [0] ms
- Wait schema cost:[0] ms
- Lock query cost:[0] ms
- Create dataset reader cost:[0] ms
- Create split reader cost:[0] ms
Get result cost:[318] ms
- Get the first block cost:[318] ms
====================resource====================
Memory: total 7 MB. Worker stats: max 3 MB, avg 3 MB, min 3 MB, max memory worker id: 189*****.
CPU time: total 5167 ms. Worker stats: max 2610 ms, avg 2583 ms, min 2557 ms, max CPU time worker id: 189*****.
DAG CPU time stats: max 5165 ms, avg 2582 ms, min 0 ms, cnt 2, max CPU time dag id: 1.
Fragment CPU time stats: max 5137 ms, avg 1721 ms, min 0 ms, cnt 3, max CPU time fragment id: 2.
Ec wait time: total 90 ms. Worker stats: max 46 ms, max(max) 2 ms, avg 45 ms, min 44 ms, max ec wait time worker id: 189*****, max(max) ec wait time worker id: 189*****.
Physical read bytes: total 799 MB. Worker stats: max 400 MB, avg 399 MB, min 399 MB, max physical read bytes worker id: 189*****.
Read bytes: total 898 MB. Worker stats: max 450 MB, avg 449 MB, min 448 MB, max read bytes worker id: 189*****.
DAG instance count: total 3. Worker stats: max 2, avg 1, min 1, max DAG instance count worker id: 189*****.
Fragment instance count: total 41. Worker stats: max 21, avg 20, min 20, max fragment instance count worker id: 189*****.TopN Filter がないと、Scan ノードは `orders` テーブルから各データブロックを読み取り、それらを TopN ノードに渡します。TopN ノードはヒープソートを使用して、これまでに見られた上位 5 行を維持します。
例:
各データブロックには約 8,192 行が含まれています。最初のブロックを処理した後、TopN ノードはそのブロックで 5 番目にランク付けされた o_orderdate を特定します。この日付が 1995-01-01 であると仮定します。
Scan ノードが 2 番目のブロックを読み取るとき、フィルター条件として 1995-01-01 を使用します。o_orderdate ≤ 1995-01-01 である行のみを TopN ノードに送信します。しきい値は動的に更新されます。2 番目のブロックで 5 番目にランク付けされた o_orderdate が現在のしきい値よりも小さい場合、TopN ノードはこの新しい、より小さい値でしきい値を更新します。
EXPLAIN コマンドを使用して、オプティマイザーによって生成された TopN ランタイムフィルターを表示できます。
-> Limit (cost=0.00..116554.25 rows=0 width=12)
-> Sort (cost=0.00..116554.25 rows=150000000 width=12)
Sort Key: o_orderdate
Runtime Filter Sort Column: o_orderdate
[id=3 dop=20 time=318/282/258ms rows=100(5/5/5) mem=96/96/96KB open=318/282/258ms get_next=1/0/0ms]例に示すように、TopN ノードには Runtime Filter Sort Column が表示されます。これは、TopN ノードが TopN Runtime Filter を生成することを示します。