すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Paimon と StarRocks を使用したストリーミングデータレイクハウスの構築

最終更新日:Nov 09, 2025

このトピックでは、Realtime Compute for Apache Flink、Apache Paimon、StarRocks を使用してストリーミングデータレイクハウスを構築する方法について説明します。

背景情報

社会のデジタル化が進むにつれて、ビジネスではデータへのより迅速なアクセスが求められています。従来のオフラインデータウェアハウスでは、スケジュールされたジョブを使用して、前の期間の新しい変更を、オペレーショナルデータストア (ODS)、データウェアハウス詳細 (DWD)、データウェアハウスサマリー (DWS)、アプリケーションデータストア (ADS) などの階層的なデータウェアハウスレイヤーにマージします。しかし、このアプローチには、高レイテンシーと高コストという 2 つの大きな欠点があります。オフラインジョブは通常、1 時間ごとまたは 1 日ごとに実行されるため、データコンシューマーは前の時間または日のデータしか表示できません。さらに、データ更新ではパーティション全体が上書きされることがよくあります。このプロセスでは、パーティション内の元のデータを再読み取りして新しい変更とマージし、新しい結果を生成する必要があります。

Realtime Compute for Apache Flink と Apache Paimon を使用してストリーミングデータレイクハウスを構築すると、これらの問題が解決されます。Flink のリアルタイムコンピューティング機能により、データはデータウェアハウスレイヤー間をリアルタイムで流れます。Paimon の効率的な更新機能により、データの変更が分単位のレイテンシーでダウンストリームのコンシューマーに配信されます。したがって、ストリーミングデータレイクハウスは、レイテンシーとコストの両面で利点があります。

Apache Paimon の機能の詳細については、「機能」を参照し、Apache Paimon 公式 Web サイトをご覧ください。

アーキテクチャと利点

アーキテクチャ

Realtime Compute for Apache Flink は、大量のリアルタイムデータを効率的に処理する強力なストリームコンピューティングエンジンです。Apache Paimon は、高スループットの更新と低レイテンシーのクエリをサポートする、ストリーミングとバッチの統合データレイクストレージフォーマットです。Paimon は Flink と深く統合されており、オールインワンのストリーミングデータレイクハウスソリューションを提供します。Flink と Paimon で構築されたストリーミングデータレイクハウスのアーキテクチャは次のとおりです。

  1. Flink はデータソースから Paimon にデータを書き込み、ODS レイヤーを作成します。

  2. Flink は ODS レイヤーで changelog をサブスクライブして処理し、データを Paimon に再書き込みして DWD レイヤーを作成します。

  3. Flink は DWD レイヤーで changelog をサブスクライブして処理し、データを Paimon に再書き込みして DWS レイヤーを作成します。

  4. 最後に、オープンソースのビッグデータプラットフォーム EMR 上の StarRocks は、Paimon 外部テーブルを読み取り、アプリケーションクエリをサポートします。

image

利点

このソリューションには、次の利点があります。

  • 各 Paimon レイヤーは、分単位のレイテンシーでダウンストリームに変更を配信できます。これにより、従来のオフラインデータウェアハウスのレイテンシーが時間または日から分に短縮されます。

  • 各 Paimon レイヤーは、パーティションを上書きすることなく、変更データを直接受け入れることができます。これにより、従来のオフラインデータウェアハウスでのデータ更新と修正のコストが大幅に削減されます。また、中間レイヤーのデータがクエリ、更新、または修正が困難であるという問題も解決されます。

  • モデルが統一され、アーキテクチャが簡素化されます。ETL (抽出·変換·書き出し) パイプラインのロジックは Flink SQL を使用して実装されます。ODS、DWD、および DWS レイヤーのデータは Paimon に一元的に保存されます。これにより、アーキテクチャの複雑さが軽減され、データ処理効率が向上します。

このソリューションは、次の表に示すように、Paimon の 3 つのコア機能に依存しています。

Paimon のコア機能

詳細

プライマリキーテーブルの更新

Paimon は、基盤となるレイヤーで Log-Structured Merge-tree (LSM ツリー) データ構造を使用して、効率的なデータ更新を実現します。

Paimon プライマリキーテーブルと基盤となるデータ構造の詳細については、「プライマリキーテーブル」および「ファイルレイアウト」をご参照ください。

Changelog プロデューサー

Paimon は、任意の入力データストリームに対して完全な changelog を生成できます。すべての `update_after` データには、対応する `update_before` データがあります。これにより、データの変更がダウンストリームに完全に渡されることが保証されます。詳細については、「Changelog 生成メカニズム」をご参照ください。

マージエンジン

Paimon プライマリキーテーブルが同じプライマリキーを持つ複数のレコードを受信すると、結果テーブルはそれらを 1 つのレコードにマージしてプライマリキーの一意性を維持します。Paimon は、重複排除、部分更新、事前集約など、さまざまなマージ動作をサポートしています。詳細については、「データマージメカニズム」をご参照ください。

シナリオ

このトピックでは、e コマースプラットフォームを例として、ストリーミングデータレイクハウスを構築してデータを処理およびクレンジングし、上位レイヤーのアプリケーションからのデータクエリをサポートする方法を示します。ストリーミングデータレイクハウスは、データの階層化と再利用を実装します。トランザクションダッシュボードのレポートクエリ、行動データ分析、ユーザーペルソナのタグ付け、パーソナライズされた推奨など、さまざまなビジネスシナリオをサポートします。

image

  1. ODS レイヤーの構築: ビジネスデータベースからデータウェアハウスにデータをリアルタイムで取り込みます。
    MySQL データベースには、`orders`、`orders_pay`、`product_catalog` の 3 つのビジネス テーブルが含まれています。Flink は、これらのテーブルのデータをリアルタイムで OSS に書き込み、Paimon フォーマットで保存して ODS レイヤーを作成します。

  2. DWD レイヤーの構築: トピックベースのワイドテーブルを作成します。
    Paimon の部分更新マージメカニズムを使用して、`orders`、`product_catalog`、`orders_pay` テーブルをワイド化します。これにより、DWD レイヤーのワイドテーブルが生成され、分単位のレイテンシーで changelog が生成されます。

  3. DWS レイヤーの構築: メトリックを計算します。
    Flink は、ワイドテーブルの changelog をリアルタイムで消費します。Paimon の集約マージメカニズムを使用して、DWM レイヤーで `dwm_users_shops` 中間テーブル (ユーザー-マーチャント集約) を生成します。最後に、DWS レイヤーで `dws_users` (ユーザー集約メトリック) テーブルと `dws_shops` (マーチャント集約メトリック) テーブルを生成します。

前提条件

説明

StarRocks インスタンスと DLF は、Flink ワークスペースと同じリージョンにある必要があります。

制限事項

Ververica Runtime (VVR) 11.1.0 以降を使用する Realtime Compute for Apache Flink のみが、このストリーミングデータレイクハウスソリューションをサポートします。

ストリーミングデータレイクハウスを構築する

準備: MySQL CDC データソース

この例では、ApsaraDB RDS for MySQL インスタンスを使用します。`order_dw` という名前のデータベースと、データを含む 3 つのビジネス テーブルを作成します。

  1. ApsaraDB RDS for MySQL インスタンスを作成する

    重要

    ApsaraDB RDS for MySQL インスタンスは、Flink ワークスペースと同じ VPC にある必要があります。同じ VPC にない場合は、「VPC をまたいで他のサービスにアクセスするにはどうすればよいですか?」をご参照ください。

  2. データベースとアカウントを作成する

    `order_dw` という名前のデータベースを作成します。`order_dw` データベースに対する読み取りおよび書き込み権限を持つ特権アカウントまたは標準アカウントを作成します。

    3 つのテーブルを作成し、データを挿入します。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- データを準備します。
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

メタデータの管理

Paimon カタログの作成

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. 左側のナビゲーションウィンドウで、[Catalogs] をクリックし、次に [Create Catalog] をクリックします。

  3. [Built-in Catalog] タブで、[Apache Paimon] をクリックし、次に [Next] をクリックします。

  4. 次のパラメーターを設定し、ストレージタイプとして DLF を選択し、[OK] をクリックします。

    パラメーター

    説明

    必須

    注:

    metastore

    metastore タイプ。

    はい

    この例では、dlf を選択します。

    catalog name

    DLF データカタログ名。

    重要

    Resource Access Management (RAM) ユーザーまたはロールを使用する場合は、DLF に対する読み取りおよび書き込み権限があることを確認してください。詳細については、「権限管理」をご参照ください。

    はい

    DLF 2.5 を使用します。AccessKey ペアを入力する必要はありません。既存の DLF データカタログを選択できます。データカタログの作成方法の詳細については、「データカタログ」をご参照ください。

    この例では、paimoncatalog という名前のデータカタログを選択します。

  5. データカタログに `order_dw` データベースを作成して、MySQL の `order_dw` データベースからすべてのテーブルデータを同期します。

    左側のナビゲーションウィンドウで、[Scripts] > [Query Scripts] を選択し、[New] をクリックして一時的なクエリを作成します。

    -- paimoncatalog データソースを使用します。
    USE CATALOG paimoncatalog;
    -- order_dw データベースを作成します。
    CREATE DATABASE order_dw;

    The following statement has been executed successfully! というメッセージは、データベースが作成されたことを示します。

Paimon カタログの使用方法の詳細については、「Paimon カタログの管理」をご参照ください。

MySQL カタログを作成する

  1. [Catalogs] ページで、[Create Catalog] をクリックします。

  2. [Built-in Catalog] タブで、[MySQL] をクリックし、次に [Next] をクリックします。

  3. mysqlcatalog という名前の MySQL カタログを作成するには、次のパラメーターを設定し、[OK] をクリックします。

    パラメーター

    説明

    必須

    注:

    カタログ名

    カタログの名前。

    はい

    カスタム名を入力します。この例では mysqlcatalog を使用します。

    ホスト名

    MySQL データベースの IP アドレスまたはホスト名。

    はい

    詳細については、「インスタンスのエンドポイントとポートの表示と管理」をご参照ください。ApsaraDB RDS for MySQL インスタンスとフルマネージド Flink ワークスペースは同じ VPC にあるため、内部エンドポイントを入力します。

    ポート

    MySQL データベースサービスのポート番号。デフォルト値は 3306 です。

    いいえ

    詳細については、「インスタンスのエンドポイントとポートを表示および管理する」を参照してください。

    デフォルトデータベース

    デフォルトの MySQL データベースの名前。

    はい

    同期するデータベースの名前 `order_dw` を入力します。

    ユーザー名

    MySQL データベースサービスのユーザー名。

    はい

    これは、「MySQL CDC データソースの準備」セクションで作成されたアカウントです。

    パスワード

    MySQL データベースサービスのパスワード。

    はい

    これは、「MySQL CDC データソースの準備」セクションで作成されたパスワードです。

ODS レイヤーの構築: ビジネスデータベースからデータウェアハウスにデータをリアルタイムで取り込む

Flink CDC を使用して、YAML データインジェストジョブを介して MySQL から Paimon にデータを同期します。これにより、ODS レイヤーが 1 ステップで構築されます。

  1. YAML データインジェストジョブを作成して開始します。

    1. Realtime Compute for Apache Flink コンソールで、[Development] > [Data Ingestion] ページに移動し、ods という名前の空の YAML ドラフトを作成します。

    2. 次のコードをエディターにコピーします。ユーザー名やパスワードなどのパラメーターを必ず変更してください。

      source:
        type: mysql
        name: MySQL Source
        hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
        port: 3306
        username: ${secret_values.username}
        password: ${secret_values.password}
        tables: order_dw.\.*  # 正規表現を使用して、order_dw データベース内のすべてのテーブルを読み取ります。
      
      sink:
        type: paimon
        name: Paimon Sink
        catalog.properties.metastore: rest
        catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com
        catalog.properties.warehouse: paimoncatalog
        catalog.properties.token.provider: dlf
        
      pipeline:
        name: MySQL to Paimon Pipeline

      パラメーター

      説明

      必須

      catalog.properties.metastore

      Metastore タイプ。これを `rest` に設定します。

      はい

      rest

      catalog.properties.token.provider

      トークンプロバイダー。これを `dlf` に設定します。

      はい

      dlf

      catalog.properties.uri

      DLF Rest Catalog Server にアクセスするための URI。フォーマットは http://[region-id]-vpc.dlf.aliyuncs.com です。詳細については、「サービスエンドポイント」のリージョン ID をご参照ください。

      はい

      http://cn-beijing-vpc.dlf.aliyuncs.com

      catalog.properties.warehouse

      DLF カタログ名。

      はい

      paimoncatalog

      Paimon の書き込みパフォーマンスの最適化方法の詳細については、「Paimon パフォーマンスの最適化」をご参照ください。

    3. 右上隅にある [Deploy] をクリックします。

    4. [O&M] > [Deployments] に移動します。デプロイしたばかりの `ods` ジョブを見つけます。[Actions] 列で、[Start] をクリックし、[Start Without Initial State] を選択します。ジョブの起動設定の詳細については、「ジョブの開始」をご参照ください。

  2. MySQL から Paimon に同期された 3 つのテーブルのデータを表示します。

    Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] ページに移動します。[Query Scripts] タブで、次のコードをクエリスクリプトにコピーします。コードスニペットを選択し、右上隅にある [Run] をクリックします。

    SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;

    截屏2024-09-02 14

DWD レイヤーの構築: トピックベースのワイドテーブルを作成する

  1. DWD レイヤーの Paimon ワイドテーブル dwd_orders の作成

    Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] ページに移動します。[Query Scripts] タブで、次のコードをクエリスクリプトにコピーします。コードスニペットを選択し、右上隅にある [Run] をクリックします。

    CREATE TABLE paimoncatalog.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'プラットフォーム 0: 電話, 1: PC',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update', -- 部分更新マージエンジンを使用してワイドテーブルを生成します。
        'changelog-producer' = 'lookup' -- lookup changelog プロデューサーを使用して、低レイテンシーで changelog を生成します。
    );

    Query has been executed というメッセージは、テーブルが作成されたことを示します。

  2. ODS レイヤーの orders および orders_pay テーブルの changelog をリアルタイムで消費する

    Realtime Compute for Apache Flink コンソールで、[Development] > [ETL] ページに移動します。`dwd` という名前の新しい SQL ストリーミングジョブを作成し、次のコードを SQL エディターにコピーします。次に、ジョブを [Deploy] し、初期状態なしで [Start] します。 ​

    この SQL ジョブは、`orders` テーブルを `product_catalog` テーブルと結合します。結合された結果と `orders_pay` テーブルは `dwd_orders` テーブルに書き込まれます。Paimon の部分更新マージエンジンは、同じ `order_id` を持つ `orders` テーブルと `orders_pay` テーブルのデータをワイド化します。

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- Paimon は現在、単一のジョブで同じテーブルに複数の INSERT 文を記述することをサポートしていません。したがって、UNION ALL を使用します。
    INSERT INTO paimoncatalog.order_dw.dwd_orders 
    SELECT 
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,
        NULL,
        NULL
    FROM
        paimoncatalog.order_dw.orders o 
        LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        NULL,
        pay_id,
        pay_platform,
        create_time
    FROM
        paimoncatalog.order_dw.orders_pay;
  3. dwd_orders ワイドテーブルのデータを表示する

    Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] ページに移動します。[Query Scripts] タブで、次のコードをクエリスクリプトにコピーします。コードスニペットを選択し、右上隅にある [Run] をクリックします。

    SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;

    截屏2024-09-02 14

DWS レイヤーの構築: メトリックを計算する

  1. DWS レイヤーの集約テーブル dws_users と dws_shops を作成します

    Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] ページに移動します。[Query Scripts] タブで、次のコードをクエリスクリプトにコピーし、コードスニペットを選択して、右上隅にある [Run] をクリックします。

    -- ユーザーディメンション集約メトリックテーブル。
    CREATE TABLE paimoncatalog.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'その日に完了した支払いの合計金額',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 集約マージエンジンを使用して集約テーブルを生成します。
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum' -- payed_buy_fee_sum データを合計して集約結果を生成します。
        -- dws_users テーブルはストリーミング方式でダウンストリームで消費されなくなったため、changelog プロデューサーを指定する必要はありません。
    );
    
    -- マーチャントディメンション集約メトリックテーブル。
    CREATE TABLE paimoncatalog.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'その日に完了した支払いの合計金額',
        uv BIGINT COMMENT 'その日の個別購入ユーザーの総数',
        pv BIGINT COMMENT 'その日のユーザーによる購入総数',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 集約マージエンジンを使用して集約テーブルを生成します。
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- payed_buy_fee_sum データを合計して集約結果を生成します。
        'fields.uv.aggregate-function' = 'sum', -- uv データを合計して集約結果を生成します。
        'fields.pv.aggregate-function' = 'sum' -- pv データを合計して集約結果を生成します。
        -- dws_shops テーブルはストリーミング方式でダウンストリームで消費されなくなったため、changelog プロデューサーを指定する必要はありません。
    );
    
    -- ユーザー視点とマーチャント視点の両方の集約テーブルを計算するには、主キーとしてユーザー + マーチャントを持つ中間テーブルを作成します。
    CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        payed_buy_fee_sum BIGINT COMMENT 'その日にユーザーがマーチャントで支払った合計金額',
        pv BIGINT COMMENT 'その日にユーザーがマーチャントで行った購入数',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation', -- 集約マージエンジンを使用して集約テーブルを生成します。
        'fields.payed_buy_fee_sum.aggregate-function' = 'sum', -- payed_buy_fee_sum データを合計して集約結果を生成します。
        'fields.pv.aggregate-function' = 'sum', -- pv データを合計して集約結果を生成します。
        'changelog-producer' = 'lookup', -- lookup changelog プロデューサーを使用して、低レイテンシーで changelog を生成します。
        -- DWM レイヤーの中間テーブルは、通常、上位レイヤーのアプリケーションから直接クエリされないため、書き込みパフォーマンスを最適化できます。
        'file.format' = 'avro', -- avro ローストアフォーマットは、より効率的な書き込みパフォーマンスを提供します。
        'metadata.stats-mode' = 'none' -- 統計情報を破棄すると OLAP クエリのコストが増加しますが (連続ストリーム処理には影響しません)、書き込みパフォーマンスがより効率的になります。
    );

    Query has been executed というメッセージは、テーブルが作成されたことを示します。

  2. DWD レイヤーの dwd_orders テーブルの changelog を消費する

    Realtime Compute for Apache Flink コンソールで、[Development] > [ETL] タブに移動します。`dwm` という名前の SQL ストリーミングジョブを作成します。次のコードを SQL エディターにコピーします。次に、ジョブを [Deploy] し、初期状態なしで [Start] します。

    この SQL ジョブは、`dwd_orders` テーブルから `dwm_users_shops` テーブルにデータを書き込みます。Paimon の事前集約マージエンジンを使用して、`order_fee` を自動的に合計し、マーチャントでのユーザーの総支出を計算します。また、`1` を合計して、ユーザーがマーチャントから購入した回数を計算します。

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO paimoncatalog.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        order_fee,
        1 -- 1 つの入力レコードは 1 つの購入を表します。
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. DWM レイヤーの dwm_users_shops テーブルの changelog をリアルタイムで消費する

    Realtime Compute for Apache Flink コンソールで、[Development] > [ETL] ページに移動します。`dws` という名前の新しい SQL ストリーミングジョブを作成します。次のコードを SQL エディターにコピーします。次に、ジョブを [Deploy] し、初期状態なしで [Start] します。

    この SQL ジョブは、`dwm_users_shops` テーブルから `dws_users` および `dws_shops` テーブルにデータを書き込みます。Paimon の事前集約マージエンジンを使用して、`dws_users` テーブルの各ユーザーの総支出 (`payed_buy_fee_sum`) を計算します。`dws_shops` テーブルでは、マーチャントの総収益 (`payed_buy_fee_sum`)、`1` を合計することによる購入ユーザー数、および総購入数 (`pv`) を計算します。

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    -- DWD とは異なり、ここの各 INSERT 文は異なる Paimon テーブルに書き込むため、同じジョブに含めることができます。
    BEGIN STATEMENT SET;
    
    INSERT INTO paimoncatalog.order_dw.dws_users
    SELECT 
        user_id,
        ds,
        payed_buy_fee_sum
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    -- マーチャントを主キーとすると、一部の人気のあるマーチャントは他のマーチャントよりもはるかに多くのデータを持つ可能性があります。
    -- したがって、ローカルマージを使用してメモリ内で事前集約してから Paimon に書き込み、データスキューを軽減します。
    INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        payed_buy_fee_sum,
        1, -- 1 つの入力レコードは、このマーチャントでのユーザーのすべての購入を表します。
        pv
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    END;
  4. dws_users および dws_shops テーブルのデータを表示する

    Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] に移動します。[Query Scripts] タブで、次のコードをエディターにコピーします。コードスニペットを選択し、右上隅にある [Run] をクリックします。

    --dws_users テーブルのデータを表示
    SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

    image

    --dws_shops テーブルのデータを表示
    SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

    截屏2024-09-02 14

ビジネスデータベースの変更をキャプチャする

ストリーミングデータレイクハウスを構築したので、次のステップでは、ビジネスデータベースからの変更をキャプチャする能力をテストします。

  1. MySQL の `order_dw` データベースに次のデータを挿入します。

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. dws_users および dws_shops テーブルのデータを表示します。 Realtime Compute for Apache Flink コンソールで、[Development] > [Scripts] ページに移動します。[Query Scripts] タブで、次のコードをクエリスクリプトにコピーします。コードスニペットを選択し、右上隅にある [Run] をクリックします。

    • dws_users テーブル

      SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

      截屏2024-09-02 15

    • dws_shops テーブル

      SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

      截屏2024-09-02 15

ストリーミングデータレイクハウスを使用する

前のセクションでは、Paimon カタログを作成し、Flink で Paimon テーブルに書き込む方法を示しました。このセクションでは、ストリーミングデータレイクハウスが構築された後の StarRocks を使用した簡単なデータ分析シナリオについて説明します。

まず、StarRocks インスタンスにログインし、`oss-paimon` カタログを作成します。詳細については、「Paimon カタログ」をご参照ください。

CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
    'type' = 'paimon',
    'paimon.catalog.type' = 'filesystem',
    'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
    'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);

プロパティ

必須

備考

type

はい

データソースタイプ。これを `paimon` に設定します。

paimon.catalog.type

はい

Paimon が使用する metastore タイプ。この例では、metastore タイプとして `filesystem` を使用します。

aliyun.oss.endpoint

はい

ウェアハウスとして OSS または OSS-HDFS を使用する場合は、対応するエンドポイントを指定する必要があります。

paimon.catalog.warehouse

はい

フォーマットは oss://<bucket>/<object> です。ここで、

  • bucket: OSS バケットの名前。

  • object: データが保存されているパス。

OSS コンソールでバケットとオブジェクト名を表示できます。

ランキングクエリ

DWS レイヤーの集約テーブルを分析するために、次のサンプルコードは、StarRocks を使用して 2023 年 2 月 15 日に取引額が最も高かった上位 3 つのマーチャントをクエリする方法を示しています。

SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum 
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;

image

詳細クエリ

DWD レイヤーのワイドテーブルを分析するために、次のサンプルコードは、StarRocks を使用して 2023 年 2 月に特定の支払いプラットフォームで顧客が支払った注文の詳細をクエリする方法を示しています。

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;;

image

データレポート

DWD レイヤーのワイドテーブルを分析するために、次のサンプルコードは、StarRocks を使用して 2023 年 2 月の各カテゴリの総注文数と総注文額をクエリする方法を示しています。

SELECT
  order_create_time AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image

参考資料