このトピックでは、Realtime Compute for Apache Flink で BITMAP データ型を使用して完全な重複排除を行う方法について説明します。また、典型的な利用シーンにおける SQL の例も紹介します。
背景情報
Flink のリアルタイムコンピューティングにおいて、従来の重複排除ソリューションには以下の制限があります。
リソースオーバーヘッドが高い:
COUNT DISTINCT関数は、すべての重複排除キーをステートに保持する必要があります。大量のデータを処理するとステートが増大し、大量のメモリと CPU リソースを消費します。多次元での拡張が困難: 事前集約ソリューションは柔軟性に欠けます。N 個のディメンションの任意の組み合わせでのクエリをサポートするには、2N 個のグループをリストアップする必要があり、ジョブのロジックが複雑になります。
精度が損なわれる:
APPROX_COUNT_DISTINCTのような近似アルゴリズムには統計的な誤差があります。一部のビジネスシナリオにおける厳格な精度要件を満たすことができません。
BITMAP ソリューション
精度とパフォーマンスのバランスを取るため、Realtime Compute for Apache Flink では BITMAP データ型が導入されています。
仕組み: このソリューションは、一般的に RoaringBitmap アルゴリズムに基づいています。詳細な整数データをビットマップオブジェクトに圧縮します。計算ロジックは、コストのかかるセットのメンテナンスから、和集合、積集合、差集合などの効率的なビットマップ論理演算に変わります。
主な利点: BITMAP はステートのストレージ要件を大幅に削減します。また、精度を損なうことなく、任意のディメンションで柔軟なロールアップ、ドリルダウン、クロス分析を実行できます。
制限事項
Realtime Compute for Apache Flink の Ververica Runtime (VVR) 11.5 以降のバージョンのみが、BITMAP データ型と関連関数をサポートします。
使用方法
BITMAP 型は、RoaringBitmap 標準に基づく 32 ビット符号なし整数のセットです。定数時間でのカーディナリティ計算と効率的な集合代数演算をサポートします。主な使用方法は 2 つあります。
リアルタイムでの完全な重複排除
生の明細データを直接処理できます。これにより、COUNT DISTINCT と同等の重複排除機能を提供しつつ、大規模データに対してより優れたメモリ効率と計算パフォーマンスを実現します。(例 1 をご参照ください)
多次元分析のための事前集約
ビジネスディメンションでスライスされたビットマップを生成し、ストレージに永続化できます。これにより、集合演算関数を使用して、明細データにアクセスすることなく、任意のディメンションで柔軟にロールアップおよびドリルダウン分析を実行できます。(例 2、3、4 をご参照ください)
BITMAP ビルトイン関数の詳細とその使用方法については、「BITMAP 関数」をご参照ください。
例
以下の 4 つの例は、BITMAP がさまざまなビジネス段階でパフォーマンスと柔軟性の問題をどのように解決するかを示しています。
基本シナリオ (例 1): リアルタイムの結果出力に焦点を当てます。この例では、
COUNT DISTINCTを BITMAP に直接置き換えて、より低いリソース消費で同じ完全な重複排除結果を得る方法を示します。高度なシナリオ: データウェアハウスの階層化と柔軟な分析に焦点を当てます。これら 3 つの例は、事前集約と事後計算の完全なワークフローを構成します。
ストレージ (例 2): 明細データをビットマップスライスに変換し、それらを永続化して再利用可能な中間レイヤーを構築します。
分析 (例 3): 中間レイヤーのデータに基づき、ビットマップ演算を使用して、既存データを再処理することなく、任意のディメンションでロールアップやクロス分析を実行できます。
統合 (例 4): リアルタイムストリームとオフラインの既存ビットマップを組み合わせます。これにより、前月比、前年比、保持率など、ストリームとバッチを統合したメトリックを低コストで計算できます。
1. リアルタイムでの完全な重複排除
BITMAP_BUILD_CARDINALITY_AGG 関数を使用して、リアルタイムログから分単位のユニークビジター (UV) をカウントします。
DDL
-- ソーステーブル
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- イベントタグ
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分単位の UV 統計情報のシンクテーブル
CREATE TEMPORARY TABLE minute_uv
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- イベントタグ
,minute_uv BIGINT -- UV 数
)
WITH ('connector' = 'jdbc' ...)
;DML
コアロジック:
BITMAP_BUILD_CARDINALITY_AGG関数を使用して、ウィンドウ計算が完了した時点で重複排除とカウントを同時に実行します。シナリオ:リアルタイムダッシュボードや、単純な分単位のモニタリングメトリック。
注意:出力はスカラー値であり、加算的ではありません。セットの重複排除の原則により、異なるタイムウィンドウの統計値を直接足し合わせて、より長い期間の合計 UV を得ることはできません。例えば、2 つの「1 分間の UV」数を単純に足して「2 分間の重複排除された UV」数を得ることはできません。
INSERT INTO minute_uv
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- コア関数:ビットマップを構築し、そのカーディナリティ (重複排除された数) を直接返します。
-- このロジックは COUNT(DISTINCT user_id) と同等ですが、より優れたパフォーマンスを提供します。
BITMAP_BUILD_CARDINALITY_AGG(user_id) AS uv
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;2. 分単位のビットマップの保存
リアルタイムログからの分単位のビットマップデータを外部ストレージシステムに永続化します。
DDL
-- ソーステーブル
CREATE TEMPORARY TABLE user_events
(
user_id INT
,tag STRING -- イベントタグ
,event_time TIMESTAMP(3)
,WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH ('connector' = 'kafka' ...)
;
-- 分単位のビットマップのシンクテーブル
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- イベントタグ
,minute_bm BYTES -- シリアル化された分単位のビットマップ
)
WITH ('connector' = 'jdbc' ...)
;DML
コアロジック:Flink 側で、明細データからビットマップへの軽量な変換を実行します。カウント値を直接出力する代わりに、シリアル化されたバイナリデータを出力できます。
技術詳細:
BITMAP_BUILD_AGGを使用してビットマップオブジェクトを構築し、次にBITMAP_TO_BYTESを使用して、RoaringBitmap 標準に準拠したバイト配列 (VARBINARY/BYTES) にシリアル化します。シナリオ:中間データウェアハウス (DWD/DWS) の構築、または StarRocks や Hologres などのビットマップをサポートするオンライン分析処理 (OLAP) エンジンへの接続。
主な利点:加算的です。出力されたバイナリのビットマップデータはセット情報を保持します。下流のジョブやクエリエンジンは、いつでもこのデータをロードできます。その後、ビットマップの和集合 (OR) 演算を使用して、複数の分単位のビットマップを時間単位または日単位のビットマップにマージできます。これにより、タイムウィンドウをまたいだ柔軟なロールアップが可能になります。
INSERT INTO minute_bitmaps
SELECT
DATE_FORMAT(window_start, 'yyyy-MM-dd') AS ts_date,
DATE_FORMAT(window_start, 'HH') AS ts_hour,
DATE_FORMAT(window_start, 'mm') AS ts_minute,
tag,
-- コアプロセス:ビットマップの構築とシリアル化。
-- 1. BITMAP_BUILD_AGG:ウィンドウ内の user_id を集約し、メモリ内にビットマップオブジェクトを構築します。
-- 2. BITMAP_TO_BYTES:ビットマップオブジェクトを永続ストレージ用の標準バイナリ形式にエンコードします。
BITMAP_TO_BYTES(
BITMAP_BUILD_AGG(user_id)
) AS user_id_bitmap
FROM TABLE(
TUMBLE(
TABLE user_events,
DESCRIPTOR(event_time),
INTERVAL '1' MINUTE
)
)
GROUP BY
window_start,
window_end,
tag;3. ユーザープロファイルとグループフィルター
分単位のビットマップデータに基づき、1 時間あたりにキーワード (A) を検索し、プロダクトをショッピングカートに追加 (B) したが、支払い (C) を完了しなかったユーザー数をカウントします。
DDL
-- 事前計算された分単位のビットマップテーブル
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- イベントタグ
,minute_bm BYTES -- 事前計算されたシリアル化済みの分単位ビットマップ (例 2 より)
)
WITH ('connector' = 'jdbc' ...)
;
-- 時間単位の UV 分析テーブル
CREATE TEMPORARY TABLE hour_combined_uv
(
ts_date STRING
,ts_hour STRING
,tag STRING -- イベントタグ
,uv BIGINT
)
WITH ('connector' = 'jdbc' ...)
;DML
コアロジック:
デシリアライズとロールアップ:ストレージレイヤーのバイナリデータをビットマップオブジェクトに戻します。次に、和集合演算を使用して複数の分単位のビットマップを時間単位のビットマップにマージします。
集合代数演算:メモリ内で異なるディメンションのビットマップに対して、積集合、差集合などの演算を実行します。
カーディナリティ計算:最終的な結果セットの要素数をカウントします。
ビジネスシナリオ:ユーザーペルソナ分析、ファネル分析、または特定のグループフィルター。
主な利点:計算ロジックは完全にデカップリングされています。フィルター条件の追加など、ビジネスロジックの変更は SQL クエリの修正のみで済みます。上流のデータ生成パイプラインを変更する必要はありません。これにより、高度な分析の柔軟性が得られます。
INSERT INTO hour_combined_uv
SELECT
ts_date,
ts_hour,
'A and B andnot C' AS metric_name,
-- ステップ 3:カーディナリティを計算します。
-- 最終的なビットマップ内のユーザー数をカウントし、BIGINT として出力します。
BITMAP_CARDINALITY(
-- ステップ 2:集合代数演算を実行します。
-- 論理式:(タグ A ∩ タグ B) - タグ C
BITMAP_ANDNOT(
BITMAP_AND(hour_bm_a, hour_bm_b),
hour_bm_c
)
) AS uv
FROM (
SELECT
ts_date,
ts_hour,
-- ステップ 1:デシリアライズし、時間でロールアップします。
-- 保存されているバイナリデータ (BYTES) をビットマップに戻し、
-- BITMAP_OR_AGG を使用して、時間内のすべての分単位のビットマップをマージ (和集合) します。
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'A') AS hour_bm_a,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'B') AS hour_bm_b,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm))
FILTER (WHERE tag = 'C') AS hour_bm_c
FROM minute_bitmaps
WHERE tag IN ('A', 'B', 'C')
GROUP BY
ts_date,
ts_hour
);4. リアルタイムリテンション分析
分単位および日単位のビットマップデータに基づき、この例では、特定ページの当日特定分までのリアルタイム再訪問ユーザー数を計算します。
DDL
-- 事前計算された分単位のビットマップテーブル
CREATE TEMPORARY TABLE minute_bitmaps
(
ts_date STRING
,ts_hour STRING
,ts_minute STRING
,tag STRING -- ページ名
,minute_bm BYTES -- 事前計算されたシリアル化済みの分単位ビットマップ
)
WITH ('connector' = 'jdbc' ...)
;
-- 事前計算された日単位のビットマップテーブル
CREATE TEMPORARY TABLE daily_bitmaps
(
ts_date STRING
,tag STRING -- ページ名
,daily_bm BYTES -- 事前計算されたシリアル化済みの日単位ビットマップ
)
WITH ('connector' = 'jdbc' ...)
;
-- リアルタイムリテンションメトリックのシンクテーブル
CREATE TEMPORARY TABLE realtime_retention_metrics
(
ts_date STRING
,tag STRING -- ページ名
,retained_users BIGINT -- 当日の再訪問数
,total_users BIGINT -- 前日の総訪問者数
,update_time TIMESTAMP(3)
,PRIMARY KEY (ts_date, tag) NOT ENFORCED
)
WITH ('connector' = 'jdbc' ...)
;DML
コアロジック:
リアルタイム集約:当日の分単位のビットマップストリームをリアルタイムで集約し、その日の累積ビットマップを作成します。
ストリームとバッチの JOIN:JOIN 操作を使用して、当日のリアルタイムビットマップを、ストレージレイヤーからの前日の完全なビットマップと関連付けます。
積集合演算:2 日間のユーザーグループのビットマップに対して
BITMAP_AND(積集合) 演算を実行し、重複するユーザーのセットを取得します。
シナリオ:販売促進に対するユーザーの応答のリアルタイムモニタリング、またはゲームにおけるデイリーアクティブユーザーの定着率分析。
主な利点:
極めて高速な計算:従来のソリューションでは、
COUNT DISTINCTを計算するために、数億行を持つ 2 つの大きなテーブルを JOIN する必要があります。このプロセスは、莫大なオーバーヘッドと高いレイテンシーを伴います。BITMAP ソリューションでは、いくつかのバイナリオブジェクトに対してビットマップ演算を実行するだけで、計算はミリ秒単位で完了します。ストレージのデカップリング:daily_bitmaps などの既存データは、HDFS、OSS、JDBC などの外部システムに保存できます。これにより、Flink のステートリソースを長期間使用することを回避できます。
INSERT INTO realtime_retention_metrics
SELECT
T.ts_date,
T.tag,
-- メトリック 1:当日のリアルタイム再訪問 (リテンションユーザー)。
-- ロジック:本日出現したユーザー ∩ 昨日出現したユーザー。
BITMAP_CARDINALITY(
BITMAP_AND(
T.today_bm, -- 本日のリアルタイムビットマップ
BITMAP_FROM_BYTES(Y.daily_bm) -- 昨日の既存ビットマップ (デシリアライズが必要)
)
) AS retained_users,
-- メトリック 2:昨日のベースラインとなる訪問者数。
-- ロジック:昨日のビットマップのカーディナリティを直接読み取り、リテンション率計算の分母として使用します。
BITMAP_CARDINALITY(
BITMAP_FROM_BYTES(Y.daily_bm)
) AS total_users,
CURRENT_TIMESTAMP AS update_time
FROM (
-- サブクエリ T:リアルタイムストリーム処理。
-- 当日の分単位のスライスをリアルタイムで集約し、本日の「当日現在まで」のビットマップを作成します。
SELECT
ts_date,
tag,
BITMAP_OR_AGG(BITMAP_FROM_BYTES(minute_bm)) AS today_bm
FROM minute_bitmaps
GROUP BY ts_date, tag
) T
-- コア JOIN:ストリーム (本日) とバッチ (昨日) を結合します。
INNER JOIN daily_bitmaps Y
ON T.tag = Y.tag
-- JOIN 条件:T の前日のデータと一致させます。
AND CAST(T.ts_date AS DATE) = CAST(Y.ts_date AS DATE) + INTERVAL '1' DAY;