すべてのプロダクト
Search
ドキュメントセンター

Hologres:Flink と Hologres を使用したリアルタイムデータウェアハウスの構築

最終更新日:Jan 11, 2025

Realtime Compute for Apache Flink の強力なリアルタイムデータ処理機能と、バイナリログ、ハイブリッド行 - 列ストレージ、強力なリソース分離などの Hologres の機能に基づいて、効率的かつスケーラブルなリアルタイムデータ処理と分析を実装するリアルタイムデータウェアハウスを構築できます。 これにより、増加するデータ量とリアルタイムのビジネス要件に適切に対応できます。 このトピックでは、Realtime Compute for Apache Flink と Hologres を使用してリアルタイムデータウェアハウスを構築する方法について説明します。

背景情報

デジタル化が進むにつれて、企業はデータの適時性に対する需要がますます高まっています。 ユーザーは、従来のオフラインデータ処理シナリオ以外の多くのビジネスシナリオで、データをリアルタイムで処理、保存、分析する必要があります。 従来のオフラインデータウェアハウスの場合、定期的なスケジューリングが実行され、運用データストア(ODS)、データウェアハウス詳細(DWD)、データウェアハウスサービス(DWS)、アプリケーションデータサービス(ADS)の各データレイヤーでデータが処理されます。 ただし、リアルタイムデータウェアハウスの方法論システムは明確ではありません。 Streaming Warehouse の概念を使用して、データレイヤー間でリアルタイムデータの効率的なフローを実装します。 これにより、リアルタイムデータウェアハウスのデータレイヤーに関連する問題を解決できます。

アーキテクチャ

Realtime Compute for Apache Flink は、大量のリアルタイムデータを効率的に処理できる強力なストリームコンピューティングエンジンです。 Hologres は、リアルタイムのデータ書き込みと更新をサポートするエンドツーエンドのリアルタイムデータウェアハウスです。 リアルタイムデータは、Hologres に書き込まれた直後にクエリできます。 Hologres は Realtime Compute for Apache Flink と緊密に統合されており、統合されたリアルタイムデータウェアハウスソリューションを提供します。 次の図は、Realtime Compute for Apache Flink と Hologres を使用して構築されたリアルタイムデータウェアハウスのアーキテクチャを示しています。

  1. Realtime Compute for Apache Flink は、データソースから Hologres にデータを書き込み、ODS レイヤーを形成します。

  2. Realtime Compute for Apache Flink は、ODS レイヤーのバイナリログデータを処理のためにサブスクライブし、Hologres にデータを書き戻して DWD レイヤーを形成します。

  3. Realtime Compute for Apache Flink は、DWD レイヤーのバイナリログデータを計算のためにサブスクライブし、Hologres にデータを書き戻して DWS レイヤーを形成します。

  4. Hologres はデータクエリのエントリポイントを提供します。

image.png

このソリューションには、次の利点があります。

  • Hologres の各データレイヤーは、データの書き込み時に効率的に更新、修正、クエリできます。 これにより、従来のリアルタイムデータウェアハウスソリューションでは中間レイヤーのデータのクエリ、更新、修正が困難であったという問題が解決されます。

  • Hologres の各データレイヤーは、外部サービスを提供するために独立して使用でき、効率的な方法で再利用して、データウェアハウスの階層的な再利用を実現できます。

  • このソリューションは統一モデルを提供し、簡素化されたアーキテクチャを使用します。 リアルタイムの抽出、変換、ロード(ETL)操作のロジックは、Flink SQL に基づいて実装されます。 ODS レイヤー、DWD レイヤー、DWS レイヤーのデータは Hologres に保存されます。 これにより、アーキテクチャの複雑さが軽減され、データ処理効率が向上します。

このソリューションは、Hologres の 3 つの主要機能に依存しています。 次の表に、主要機能を示します。

Hologres の主要機能

説明

バイナリロギング

Hologres はバイナリロギングをサポートしており、Realtime Compute for Apache Flink がリアルタイムコンピューティングを実行するように駆動します。 これにより、Realtime Compute for Apache Flink がストリームコンピューティングのアップストリームとして機能できます。 Hologres のバイナリロギングの詳細については、「Hologres バイナリログのサブスクライブ」をご参照ください。

ハイブリッド行 - 列ストレージ

Hologres はハイブリッド行 - 列ストレージをサポートしています。 テーブルには行指向データと列指向データの両方が保存され、行指向データと列指向データは厳密に一貫性があります。 この機能により、中間レイヤーのテーブルを Realtime Compute for Apache Flink のソーステーブルとして、またプライマリキーと JOIN 操作を使用したポイントクエリ用の Realtime Compute for Apache Flink のディメンションテーブルとして使用できます。 中間レイヤーのテーブルは、オンライン分析処理(OLAP)やオンラインサービスなどの他のアプリケーションからもクエリできます。 Hologres のハイブリッド行 - 列ストレージの詳細については、「テーブルのストレージモデル:行指向ストレージ、列指向ストレージ、行 - 列ハイブリッドストレージ」をご参照ください。

強力なリソース分離

Hologres インスタンスの負荷が高い場合、インスタンスの中間レイヤーのテーブルに対するポイントクエリの パフォーマンスが影響を受ける可能性があります。 Hologres は、プライマリインスタンスとセカンダリインスタンス(共有ストレージ)の読み取り/書き込み分割を設定することにより、強力なリソース分離をサポートしています。 これにより、Realtime Compute for Apache Flink が Hologres からバイナリログデータをプルするときに、オンラインサービスが影響を受けないようにします。 詳細については、「プライマリインスタンスとセカンダリインスタンス(共有ストレージ)の読み取り/書き込み分割を設定する」をご参照ください。

ベストプラクティス

この例では、e コマース プラットフォームのリアルタイムデータウェアハウスを構築する方法を示します。 リアルタイムデータウェアハウスは、データをリアルタイムで処理およびクレンジングし、上位レイヤーアプリケーションからデータをクエリするために構築されています。 このように、リアルタイムデータは階層化され、トランザクションダッシュボードデータ分析、行動データ分析、ユーザープロファイルタグ付け、パーソナライズされた推奨事項などの複数のビジネスシナリオをサポートするために再利用されます。

image.png

  1. ODS レイヤーの構築:ビジネスデータベースのデータをデータウェアハウスにリアルタイムで取り込む

    MySQL には、orders、orders_pay、product_catalog の 3 つのビジネステーブルがあります。 Realtime Compute for Apache Flink は、テーブルから Hologres にデータをリアルタイムで同期して、ODS レイヤーを形成します。

  2. DWD レイヤーの構築:リアルタイムワイドテーブル

    Realtime Compute for Apache Flink は、orders、orders_pay、product_catalog テーブルのデータをリアルタイムで結合して、DWD レイヤーにワイドテーブルを生成します。

  3. DWS レイヤーの構築:リアルタイムメトリック計算

    ワイドテーブルのバイナリログデータはリアルタイムで消費され、イベント駆動型集計に基づいて DWS レイヤーにメトリックテーブルが生成されます。

注意事項

  • Ververica Runtime(VVR)6.0.7 以降を使用する Realtime Compute for Apache Flink のみ、リアルタイムデータウェアハウスソリューションをサポートしています。

  • Hologres V1.3 以降の Hologres 専用インスタンスのみ、リアルタイムデータウェアハウスソリューションをサポートしています。

  • ApsaraDB RDS for MySQL インスタンスと Hologres インスタンスは、Realtime Compute for Apache Flink と同じ仮想プライベートクラウド(VPC)に存在する必要があります。 ApsaraDB RDS for MySQL インスタンスまたは Hologres インスタンスが Realtime Compute for Apache Flink と同じ VPC に存在しない場合は、VPC 間の接続を確立するか、Realtime Compute for Apache Flink がインターネット経由で ApsaraDB RDS for MySQL インスタンスまたは Hologres インスタンスにアクセスできるようにする必要があります。 詳細については、「Realtime Compute for Apache Flink は VPC 間でサービスにどのようにアクセスしますか?」および「Realtime Compute for Apache Flink はインターネットにどのようにアクセスしますか?」をご参照ください。

  • RAM ユーザーとして、または RAM ロールを使用して Realtime Compute for Apache Flink、Hologres、ApsaraDB RDS for MySQL リソースにアクセスする場合は、RAM ユーザーまたは RAM ロールに必要な権限があることを確認する必要があります。

準備

ApsaraDB RDS for MySQL インスタンスを作成し、MySQL CDC データソースを準備する

  1. ApsaraDB RDS for MySQL インスタンスを作成します。 詳細については、「ApsaraDB RDS for MySQL インスタンスを作成する」をご参照ください。

  2. ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントを作成します。

    order_dw という名前のデータベースと、データベースに対する読み取りおよび書き込み権限を持つ標準アカウントを作成します。 詳細については、「アカウントとデータベースを作成する」および「データベースを管理する」をご参照ください。

  3. MySQL CDC データソースを準備します。

    1. 目的のインスタンスの詳細ページの右上隅にある [データベースにログオン] をクリックします。

    2. [データベースインスタンスにログオン] ダイアログボックスで、[データベースアカウント] パラメーターと [データベースパスワード] パラメーターを設定し、[ログイン] をクリックします。

    3. ログオンが成功したら、左側のナビゲーションペインで order_dw データベースをダブルクリックして、データベースを切り替えます。

    4. [SQLコンソール] タブで、order_dw データベースに 3 つのビジネステーブルを作成し、ビジネステーブルにデータを挿入するために使用される DDL ステートメントを記述します。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee numeric(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      
      CREATE TABLE `orders_pay` (
        pay_id bigint not null primary key,
        order_id bigint not null,
        pay_platform int not null,
        create_time timestamp not null
      );
      
      
      CREATE TABLE `product_catalog` (
        product_id bigint not null primary key,
        catalog_name varchar(50) not null
      );
      
      -- データを準備します。
      INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2001, 100001, 1, '2023-02-15 17:40:56'),
      (2002, 100002, 1, '2023-02-15 17:40:56'),
      (2003, 100003, 0, '2023-02-15 17:40:56'),
      (2004, 100004, 0, '2023-02-15 17:40:56'),
      (2005, 100005, 0, '2023-02-15 18:40:56'),
      (2006, 100006, 0, '2023-02-15 18:40:56'),
      (2007, 100007, 0, '2023-02-15 18:40:56');
  4. [実行] をクリックします。 表示されるページで、[実行] をクリックします。

Hologres インスタンスを作成する

  1. Hologres 専用インスタンスを作成します。 詳細については、「Hologres インスタンスを購入する」をご参照ください。

  2. HoloWeb コンソールで、Hologres インスタンスに接続し、データベースを作成し、ユーザーにデータベースに対する権限を付与します。

    簡易許可モデル(SPM)を使用して order_dw という名前のデータベースを作成し、管理者ロールをユーザーに割り当てます。 データベースを作成し、ユーザーにデータベースに対する権限を付与する方法の詳細については、「データベースを管理する」をご参照ください。

    説明
    • [ユーザー] ドロップダウンリストに必要な RAM ユーザーが見つからない場合は、RAM ユーザーが現在のインスタンスにユーザーとして追加されていません。 この場合は、[ユーザー管理] ページで RAM ユーザーをインスタンスのスーパーユーザーとして追加します。

    • デフォルトでは、バイナリログの拡張は Hologres V2.0 以降で有効になっています。 この操作を手動で実行する必要はありません。 Hologres V1.3 でデータベースを作成した後、create extension hg_binlog コマンドを実行して、バイナリログの拡張を有効にする必要があります。

Realtime Compute for Apache Flink ワークスペースとカタログを作成する

  1. ワークスペースを作成します。 詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。

  2. Realtime Compute for Apache Flink コンソール にログオンします。 管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

  3. セッションクラスターを作成します。これは、カタログとスクリプトを作成するための実行環境を提供します。 詳細については、「手順 1:セッションクラスターを作成する」をご参照ください。

  4. Hologres カタログを作成します。

    Realtime Compute for Apache Flink コンソール[SQLエディター] ページの [スクリプト] タブで、次のコードをスクリプトエディターにコピーし、パラメーターを設定し、実行するコードを選択して、コードの左側に表示される [実行] をクリックします。

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '<PASSWORD>',
      'dbname' = 'order_dw',
      'binlog' = 'true', -- カタログを作成するときに、ソーステーブル、ディメンションテーブル、および結果テーブルでサポートされている WITH 句のパラメーターを設定できます。パラメーターを設定すると、テーブルを使用するときに、カタログ内のテーブルにパラメーターが自動的に追加されます。
      'sdkMode' = 'jdbc', -- JDBC モードを使用することをお勧めします。
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true', -- ワイドテーブルにデータをマージするときは、データの取り消しを防ぐために、ignoredelete パラメーターを true に設定する必要があります。
      'partial-insert.enabled' = 'true', -- ワイドテーブルにデータをマージするときは、特定の列でデータ更新を実行するために、partial-insert.enabled パラメーターを true に設定する必要があります。
      'mutateType' = 'insertOrUpdate', -- ワイドテーブルにデータをマージするときは、特定の列でデータ更新を実行するために、mutateType パラメーターを insertOrUpdate に設定する必要があります。
      'table_property.binlog.level' = 'replica', -- Hologres テーブルの永続プロパティは、カタログの作成時に渡すこともできます。その後、カタログを使用して作成されたすべての Hologres テーブルに対して、バイナリロギングがデフォルトで有効になります。
      'table_property.binlog.ttl' = '259200'
    );
    

    Hologres 情報に基づいて、次のパラメーターを設定します。

    パラメーター

    説明

    備考

    endpoint

    Hologres インスタンスのエンドポイント。

    詳細については、「インスタンス設定」をご参照ください。

    username

    Alibaba Cloud アカウントの AccessKey ID。

    AccessKey ID が属する Alibaba Cloud アカウントには、すべての Hologres データベースにアクセスするための権限が付与されている必要があります。 Hologres データベース権限の詳細については、「Hologres 権限モデル」をご参照ください。

    password

    Alibaba Cloud アカウントの AccessKey シークレット。

    説明

    カタログを作成するときに、ソーステーブル、ディメンションテーブル、および結果テーブルでデフォルトで使用される WITH 句のパラメーターを設定できます。 前のコードの table_property で始まるパラメーターなど、Hologres 物理テーブルの作成のデフォルトパラメーターを設定することもできます。 詳細については、「Hologres カタログを管理する」および「Hologres コネクター」の「WITH 句のパラメーター」をご参照ください。

  5. MySQL カタログを作成します。

    Realtime Compute for Apache Flink コンソールの [SQLエディター] ページの [スクリプト] タブで、次のコードをスクリプトエディターにコピーし、パラメーターを設定し、実行するコードを選択して、コードの左側に表示される [実行] をクリックします。

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = 'order_dw'
    );

    MySQL データベースの情報に基づいてパラメーターを設定します。 次の表に、パラメーターを示します。

    パラメーター

    説明

    hostname

    MySQL データベースへのアクセスに使用される IP アドレスまたはホスト名。

    port

    MySQL データベースのポート番号。 デフォルト値:3306。

    username

    ApsaraDB RDS for MySQL データベースへのアクセスに使用されるユーザー名。

    password

    ApsaraDB RDS for MySQL データベースへのアクセスに使用されるパスワード。

リアルタイムデータウェアハウスを構築する

ODS レイヤーの構築:ビジネスデータベースのデータをデータウェアハウスにリアルタイムで取り込む

カタログに関連する CREATE DATABASE AS ステートメントを実行して、ODS レイヤーを作成できます。 ほとんどの場合、ODS レイヤーは、OLAP 操作を直接実行したり、キーバリューポイントクエリなどのサービスを提供したりするのではなく、ストリーミングデプロイのイベントドライバーとして使用されます。 したがって、ビジネス要件に合わせてバイナリロギングを有効にすることができます。 バイナリロギングは、Hologres の主要機能の 1 つです。 Hologres コネクターを使用して、完全なバイナリログデータを読み取り、次に増分バイナリログデータを消費できます。

  1. CREATE DATABASE AS ステートメントを実行するために、ODS という名前の同期デプロイを作成します。

    1. Realtime Compute for Apache Flink コンソール で、ODS という名前の SQL ストリーミングドラフトを作成し、次のコードを SQL エディターにコピーします。

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- table_property.binlog.level パラメーターはカタログの作成時に設定されます。したがって、CREATE DATABASE AS ステートメントを使用して作成されたすべてのテーブルに対してバイナリロギングが有効になります。
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- アップストリームデータベース内のテーブルを選択して、データウェアハウスに取り込むことができます。
      /*+ OPTIONS('server-id'='8001-8004') */; -- MySQL CDC ソーステーブルのパラメーターを設定します。

      説明
      • この例では、データはデフォルトで order_dw データベースのパブリックスキーマに同期されます。 また、宛先 Hologres データベースの指定されたスキーマにデータを同期することもできます。 詳細については、「CREATE TABLE AS ステートメントで使用される宛先ストアのカタログとして、作成した Hologres カタログを使用する」をご参照ください。 スキーマを指定すると、カタログを使用するときにテーブル名の形式が変更されます。 詳細については、「Hologres カタログを使用する」をご参照ください。

      • ソーステーブルのスキーマが変更された場合、結果テーブルのスキーマは、ソーステーブルのデータが変更された場合にのみ変更されます。 ソーステーブルのデータは、ソーステーブルでデータが更新、削除、または挿入された場合に変更されます。

    2. [SQLエディター] ページの右上隅にある [デプロイ] をクリックして、ドラフトをデプロイします。

    3. 左側のナビゲーションペインで、[デプロイ] をクリックします。 [デプロイ] ページで、デプロイしたドラフトの ODS デプロイを見つけ、[アクション] 列の [開始] をクリックします。 [ジョブを開始] ダイアログボックスで、[初期モード] をクリックして、初期状態なしでデプロイを開始します。

  2. HoloWeb コンソールで、SQLエディターで次のステートメントを実行して、MySQL からデータが同期される 3 つの Hologres テーブルのデータを表示します。

    --- orders テーブルのデータをクエリします。
    SELECT * FROM orders;
    
    --- orders_pay テーブルのデータをクエリします。
    SELECT * FROM orders_pay;
    
    --- product_catalog テーブルのデータをクエリします。
    SELECT * FROM product_catalog;

    image.png

DWD レイヤーの構築:リアルタイムワイドテーブル

Hologres コネクターでサポートされている特定の列を更新する機能を使用して、DWD レイヤーを構築します。 INSERT ステートメントを使用して、特定の列を更新するセマンティクスを効率的な方法で表現できます。 Hologres の列指向データストレージとハイブリッド行 - 列データストレージに基づく高 パフォーマンス ポイントクエリは、デプロイでさまざまなディメンションテーブルのデータをクエリするのに役立ちます。 Hologres は強力なリソース分離アーキテクチャを使用しているため、書き込み、読み取り、分析のデプロイ間で干渉が発生しません。

  1. Realtime Compute for Apache Flink のカタログを使用して、Hologres の DWD レイヤーに dwd_orders という名前のワイドテーブルを作成します。

    Realtime Compute for Apache Flink コンソール[SQLエディター] ページの [スクリプト] タブで、次のコードをドラフトのスクリプトエディターにコピーし、コードを選択して、コードの左側に表示される [実行] をクリックします。

    -- 異なるストリームのデータが同じ結果テーブルに書き込まれると、テーブルの任意の列に NULL 値が表示される場合があります。したがって、ワイドテーブルのフィールドが NULL 許容であることを確認してください。
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'プラットフォーム 0:電話、1:PC',
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- カタログを使用して、Hologres 物理テーブルのプロパティを変更できます。
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' -- バイナリログデータのタイムアウト期間を 1 週間に変更します。
    );
  2. ODS レイヤーの orders テーブルと orders_pay テーブルのバイナリロギングを実装します。

    Realtime Compute for Apache Flink コンソール で、DWD という名前の SQL ドラフトを作成し、次のコードをドラフトの SQL エディターにコピーし、ドラフトを [デプロイ] してから、ドラフトのデプロイを [開始] します。 次の SQL ステートメントを実行して、orders テーブルをディメンションテーブル product_catalog と結合し、最終結果をワイドテーブル dwd_orders に書き込みます。 このようにして、データはリアルタイムでワイドテーブルに書き込まれます。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) SELECT o.*, dim.catalog_name 
       FROM dw.order_dw.orders as o
       LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
       ON o.product_id = dim.product_id;
    
    INSERT INTO dw.order_dw.dwd_orders 
      (pay_id, order_id, pay_platform, pay_create_time)
       SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. ワイドテーブル dwd_orders のデータを表示します。

    HoloWeb コンソールで、Hologres インスタンスに接続し、宛先データベースにログオンします。次に、SQL エディターで次のステートメントを実行します。

    SELECT * FROM dwd_orders;

    image

DWS レイヤーの構築:リアルタイムメトリック計算

  1. Realtime Compute for Apache Flink のカタログを使用して、Hologres に dws_users および dws_shops という名前の集計メトリックテーブルを作成します。

    Realtime Compute for Apache Flink コンソール[SQLエディター] ページの [スクリプト] タブで、次のコードをドラフトのスクリプトエディターにコピーし、コードを選択して、コードの左側に表示される [実行] をクリックします。

    -- ユーザーディメンション集計メトリックテーブルを作成します。
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'その日に完了した支払いの合計金額',
      primary key(user_id,ds) NOT ENFORCED
    );
    
    -- マーチャントディメンション集計メトリックテーブルを作成します。
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment 'その日に完了した支払いの合計金額',
      primary key(shop_id,ds) NOT ENFORCED
    );
  2. DWD レイヤーのワイドテーブル dw.order_dw.dwd_orders のデータをリアルタイムで消費し、Realtime Compute for Apache Flink で集計計算を実行してから、データを Hologres の DWS レイヤーのテーブルに書き込みます。

    Realtime Compute for Apache Flink コンソール で、DWS という名前の SQL ストリーミングドラフトを作成し、次のコードをドラフトの SQL エディターにコピーし、ドラフトを [デプロイ] してから、ドラフトのデプロイを [開始] します。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 注文フローデータと支払いフローデータの両方がワイドテーブルに書き込まれます。
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 注文フローデータと支払いフローデータの両方がワイドテーブルに書き込まれます。
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. DWS レイヤーの集計結果を表示します。 結果は、入力データの変更に基づいてリアルタイムで更新されます。

    1. Hologres コンソールで、更新前のデータを表示します。

      • dws_users テーブルのデータをクエリします。

        SELECT * FROM dws_users;

        image

      • dws_shops テーブルのデータをクエリします。

        SELECT * FROM dws_shops;

        image

    2. ApsaraDB RDS コンソールで、order_dw データベースの orders テーブルと orders_pay テーブルにデータレコードを挿入します。

      INSERT INTO orders VALUES
      (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, '2023-02-15 19:40:56');
    3. Hologres コンソールで、更新後のデータを表示します。

      • dwd_orders テーブル

        image

      • dws_users テーブル

        image

      • dws_shops テーブル

        image

データプロファイリングを実行する

バイナリロギング機能が有効になっています。 したがって、データプロファイリングを実行して、データの変更を直接表示できます。 ビジネスデータプロファイリングのための中間結果に対してアドホッククエリを実行したり、最終的な計算結果の正確性をチェックしたりする場合は、このソリューションの各データレイヤーが永続化されているため、このソリューションの各データレイヤーを使用して中間データのクエリを容易にすることができます。

  • ストリーミングモードでのデータプロファイリング

    1. データプロファイリングストリーミングドラフトを作成し、ドラフトのデプロイを開始します。

      Realtime Compute for Apache Flink コンソール で、Data-exploration という名前の SQL ストリーミングドラフトを作成し、次のコードをドラフトの SQL エディターにコピーし、ドラフトを [デプロイ] してから、ドラフトのデプロイを [開始] します。

      -- ストリーミングモードでデータプロファイリングを実行します。印刷テーブルを作成して、データの変更を表示できます。
      CREATE TEMPORARY TABLE print_sink(
        order_id bigint not null,
        order_user_id string,
        order_shop_id bigint,
        order_product_id bigint,
        order_product_catalog_name string,
        order_fee numeric(20,2),
        order_create_time timestamp,
        order_update_time timestamp,
        order_state int,
        pay_id bigint,
        pay_platform int,
        pay_create_time timestamp,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_sink SELECT *
      FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ -- startTime パラメーターは、バイナリログデータが生成される時刻を指定します。
      WHERE order_user_id = 'user_001';
    2. データプロファイリング結果を表示します。

      [デプロイ] ページで、クエリを実行するデータのデプロイ名をクリックします。[ログ] タブの [実行ログ] タブをクリックします。[実行ログ] タブで、[実行中のタスク マネージャー] タブをクリックし、[パス、ID] 列の値をクリックします。[Stdout] タブで、user_001 に関連するログを検索します。

      image.png

  • バッチモードでのデータプロファイリング

    Realtime Compute for Apache Flink コンソール で SQL ストリーミングドラフトを作成し、次のコードを SQL エディターにコピーして、[デバッグ] をクリックします。 詳細については、「デプロイをデバッグする」をご参照ください。

    バッチモードでのデータプロファイリングは、現在の時刻の最終状態データを取得するのに役立ちます。 次の図は、[SQLエディター] ページのデバッグ結果を示しています。

    SELECT *
    FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */
    WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; -- バッチモードでのデータプロファイリングは、フィルタープッシュダウンをサポートしています。これにより、バッチデプロイの実行効率が向上します。

    image.png

リアルタイムデータウェアハウスを使用する

前のセクションでは、Realtime Compute for Apache Flink のカタログを使用して、Realtime Compute for Apache Flink と Hologres の Streaming Warehouse に基づいて Realtime Compute for Apache Flink で階層型リアルタイムデータウェアハウスを構築する方法について説明しました。 このセクションでは、データウェアハウスの構築後の簡単な使用シナリオについて説明します。

キーバリューサービス

プライマリキーに基づいて DWS レイヤーの集計メトリックテーブルをクエリします。 1 秒あたり数百万レコード(RPS)がサポートされています。

次のサンプルコードは、HoloWeb コンソールで指定した日付の指定したユーザーの消費量を照会する方法の例を示しています。

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';

image.png

詳細クエリ

DWD レイヤーのワイドテーブルで OLAP 分析を実行します。

次のサンプルコードは、HoloWeb コンソールで 2023 年 2 月に特定の決済プラットフォームにおける顧客の注文の詳細をクエリする方法の例を示しています。

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

image.png

リアルタイムレポート

DWD レイヤーのワイドテーブルのデータに基づいてリアルタイムレポートを表示します。 Hologres のハイブリッド行 - 列データストレージと列指向データストレージは、高い OLAP 分析機能を提供します。 データクエリは数秒以内に応答できます。

次のサンプルコードは、HoloWeb コンソールで 2023 年 2 月の各カテゴリの注文総数と注文総額をクエリする方法の例を示しています。

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image.png

参考文献