すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:データウェアハウスシナリオ: アドホッククエリソリューション

最終更新日:Jan 11, 2025

このトピックでは、StarRocksビューを使用してデータウェアハウスシナリオでアドホッククエリソリューションを構築する方法について説明します。

前提条件

  • Dataflowまたはカスタムクラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。

  • StarRocksクラスターが作成されていること。詳細については、「StarRocksクラスターの作成」をご参照ください。

  • ApsaraDB for RDS for MySQL インスタンスが作成されます。詳細については、「ApsaraDB for RDS for MySQL インスタンスの作成」をご参照ください。

    説明

    この例では、Dataflow クラスタは EMR V3.42.0、StarRocks クラスタは EMR V5.8.0、ApsaraDB RDS for MySQL インスタンスの MySQL バージョンは 5.7 です。

制限事項

  • Dataflowクラスター、StarRocksクラスター、およびApsaraDB RDS for MySQLインスタンスは、同じ仮想プライベートクラウド(VPC)内の同じゾーンにデプロイする必要があります。

  • DataflowクラスターとStarRocksクラスターは、インターネット経由でアクセスできる必要があります。

  • ApsaraDB RDS for MySQLインスタンスのMySQLバージョンは 5.7 以降である必要があります。

注意事項

このトピックはテスト目的でのみ提供されています。本番環境でFlinkジョブを実行するには、Alibaba Cloud Realtime Compute for Apache Flinkを使用してFlinkジョブを構成するか、YARNまたはKubernetesを使用してジョブを送信します。

詳細については、「Apache Hadoop YARN」および「Native Kubernetes」をご参照ください。

概要

ベクトル化、コストベースオプティマイザー(CBO)、単一ノードマルチコアスケジューリングなどのテクノロジーの適用により、StarRocksの計算能力が向上しています。StarRocksを使用してデータウェアハウスに階層モデリングを適用する場合、ほとんどのデータはデータウェアハウス詳細(DWD)層またはデータウェアハウスサマリー(DWS)層に書き込まれます。実際のビジネスシナリオでは、StarRocksを使用してDWD層またはDWS層のデータをクエリできます。また、StarRocksを使用して、柔軟でインタラクティブな方法でアドホッククエリを実行することもできます。

アーキテクチャ

このソリューションは、次の手順で構成されています。

  1. Flinkを使用してKafkaにインポートされたログをクレンジするか、Flink Change Data Capture(CDC)コネクタとFlink StarRocksコネクタを使用してMySQLからバイナリログを取得し、バイナリログをStarRocksにインポートします。ビジネス要件に基づいてデータモデルを選択し、Operational Data Store(ODS)層にデータを書き込むことができます。データモデルには、詳細モデル、集計モデル、更新モデル、および主キーモデルが含まれます。

  2. StarRocksビューを使用して、ODS層からDWD層およびDWS層にデータを転送できます。複数テーブル結合や入れ子になったサブクエリなどの複雑なSQLステートメントに、StarRocksのベクトル化とCBOを使用できます。メトリックのロールアップとドリルダウンの深さがソースデータの深さと同じになるように、クエリ中にメトリックがオンサイトで計算されます。

機能

計算はStarRocksで実行されます。このソリューションは、ビジネスシステムでデータが頻繁に更新されるシナリオに適しています。エンティティデータは、ODS層またはDWD層にのみ保存されます。

  • メリット

    • ビジネスロジックに基づいてビューを柔軟に調整できます。

    • メトリックを簡単に変更できます。 DWD層とDWS層は、ビューのロジックに基づいてカプセル化されています。ソーステーブルデータを更新するだけで済みます。

  • デメリット

    クエリのパフォーマンスは、ビューのロジックとデータ量の影響を受けます。ビューのロジックが複雑でデータ量が多い場合、クエリのパフォーマンスは低下します。

  • シナリオ

    • データはデータベースとイベント追跡システムから発信され、高い柔軟性が要求され、1秒あたりのクエリ数(QPS)を大量に処理する機能は要求されず、計算リソースが十分にあります。

    • 高いリアルタイムパフォーマンスが要求されます。データがStarRocksに書き込まれた直後にデータがクエリに利用できる必要があり、データの更新をリアルタイムでStarRocksに同期する必要があります。アドホッククエリが必要であり、リソースが十分にあり、クエリの複雑さが低い。

手順

次の手順を実行します。

  1. 手順 1: ソース MySQL テーブルを作成する

  2. 手順 2: StarRocksクラスターにテーブルを作成する

  3. 手順 3: Flinkジョブを実行してデータストリームを開始する

  4. 手順 4: アドホッククエリソリューションをテストする

手順 1: ソース MySQL テーブルを作成する

  1. テストデータベースとテストアカウントを作成します。アカウントとデータベースの作成方法については、「アカウントとデータベースの作成」をご参照ください。

    テストデータベースとアカウントを作成した後、データベースに対する読み取りおよび書き込み権限をアカウントに付与します。

    説明

    この例では、flink_cdcという名前のデータベースとemr_testという名前のアカウントが作成されます。

  2. テストアカウントを使用して、ApsaraDB RDS for MySQLインスタンスにログオンします。詳細については、「DMSを使用してApsaraDB RDS for MySQLインスタンスにログオンする」をご参照ください。

  3. 次のステートメントを実行して、ordersという名前のテーブルを作成します。

    CREATE TABLE flink_cdc.orders (
        order_id INT NOT NULL AUTO_INCREMENT,
        order_revenue FLOAT NOT NULL,
        order_region VARCHAR(40) NOT NULL,
        customer_id INT NOT NULL,
        PRIMARY KEY (order_id)
    );
  4. 次のステートメントを実行して、customersという名前のテーブルを作成します。

    CREATE TABLE flink_cdc.customers (
        customer_id INT NOT NULL,
        customer_age INT NOT NULL,
        customer_name VARCHAR(40) NOT NULL,
        PRIMARY KEY (customer_id)
    );

手順 2: StarRocksクラスターにテーブルを作成する

  1. SSHモードでStarRocksクラスターにログオンします。詳細については、「クラスターにログオンする」をご参照ください。

  2. 次のコマンドを実行して、StarRocksクラスターに接続します。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. 次のステートメントを実行して、データベースを作成します。

    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  4. 次のステートメントを実行して、customersという名前のテーブルを作成します。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
      `customer_id` INT NOT NULL  COMMENT "",  // 顧客ID
      `customer_age` FLOAT NOT NULL  COMMENT "", // 顧客の年齢
      `customer_name` STRING NOT NULL  COMMENT "" // 顧客名
    ) ENGINE=olap
    PRIMARY KEY(`customer_id`)
    COMMENT "" // 顧客テーブル
    DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  5. 次のステートメントを実行して、ordersという名前のテーブルを作成します。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
      `order_id` INT NOT NULL  COMMENT "", // 注文ID
      `order_revenue` FLOAT NOT NULL  COMMENT "", // 注文収益
      `order_region` STRING NOT NULL  COMMENT "", // 注文地域
      `customer_id` INT NOT NULL  COMMENT "" // 顧客ID
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT "" // 注文テーブル
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  6. 次のステートメントを実行して、ODSテーブルに基づいてDWDビューを作成します。

    CREATE VIEW flink_cdc.dwd_order_customer_valid (
      order_id,  // 注文ID
      order_revenue, // 注文収益
      order_region, // 注文地域
      customer_id, // 顧客ID
      customer_age, // 顧客の年齢
      customer_name // 顧客名
    )
    AS
    SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name
    FROM flink_cdc.customers c JOIN flink_cdc.orders o
    ON c.customer_id=o.customer_id
    WHERE c.customer_id != -1;
  7. 次のステートメントを実行して、DWDテーブルに基づいてDWSビューを作成します。

    CREATE VIEW flink_cdc.dws_agg_by_region (
      order_region, // 注文地域
      order_cnt, // 注文数
      order_total_revenue // 注文合計収益
    )
    AS
    SELECT order_region, count(order_region), sum(order_revenue)
    FROM flink_cdc.dwd_order_customer_valid
    GROUP BY order_region;

手順 3: Flinkジョブを実行してデータストリームを開始する

  1. CDCコネクタFlink StarRocksコネクタのパッケージをダウンロードし、Dataflowクラスターの/opt/apps/FLINK/flink-current/libディレクトリにアップロードします。

  2. SSHモードでDataflowクラスターにログオンします。詳細については、「クラスターにログオンする」をご参照ください。

  3. ポートを追加し、並列ジョブ実行のスロット数を変更します。

    1. 次のコマンドを実行して、flink-conf.yamlファイルを開きます。

      vim /etc/taihao-apps/flink-conf/flink-conf.yaml
    2. ファイルの末尾に次のコンテンツを追加します。

      rest.port: 8083
    3. taskmanager.numberOfTaskSlots パラメーターの値を 3 に変更します。

      説明

      taskmanager.numberOfTaskSlots パラメーターのデフォルト値は 1 です。これは、TaskManager で実行できるジョブが 1 つだけであることを示します。後続の操作では、demo.sql ファイルには 2 つのジョブが含まれています。ジョブが TaskManager で並列に実行できるようにするには、taskmanager.numberOfTaskSlots パラメーターを少なくとも 2 に設定することをお勧めします。

  4. 次のコマンドを実行して、Dataflowクラスターを起動します。

    重要

    このトピックの例は、テスト目的でのみ提供されています。本番環境でFlinkジョブを実行するには、YARNまたはKubernetesを使用してジョブを送信します。詳細については、「Apache Hadoop YARN」および「Native Kubernetes」をご参照ください。

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. Flink SQLジョブのコードを記述し、demo.sqlファイルとして保存します。

    次のコマンドを実行して、demo.sqlファイルを開きます。ビジネス要件に基づいてファイルを編集します。

    vim demo.sql

    次の例は、ファイル内のコードを示しています。

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL, // 顧客ID
      `customer_age` FLOAT NOT NULL, // 顧客の年齢
      `customer_name` STRING NOT NULL, // 顧客名
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
      `customer_id` INT NOT NULL, // 顧客ID
      `customer_age` FLOAT NOT NULL, // 顧客の年齢
      `customer_name` STRING NOT NULL, // 顧客名
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'load-url' = '10.0.**.**:8030',
      'database-name' = 'flink_cdc',
      'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'root',
      'table-name' = 'customers',
      'sink.properties.strip_outer_array' = 'true',
      'password' = '',
      'sink.max-retries' = '10',
      'connector' = 'starrocks'
    );
    INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
      `order_id` INT NOT NULL, // 注文ID
      `order_revenue` FLOAT NOT NULL, // 注文収益
      `order_region` STRING NOT NULL, // 注文地域
      `customer_id` INT NOT NULL, // 顧客ID
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'database-name' = 'flink_cdc',
      'table-name' = 'orders',
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
      `order_id` INT NOT NULL, // 注文ID
      `order_revenue` FLOAT NOT NULL, // 注文収益
      `order_region` STRING NOT NULL, // 注文地域
      `customer_id` INT NOT NULL, // 顧客ID
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'sink.properties.strip_outer_array' = 'true',
      'password' = '',
      'sink.max-retries' = '10',
      'connector' = 'starrocks',
      'table-name' = 'orders',
      'jdbc-url' = 'jdbc:mysql://10.0.**.**:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'root',
      'load-url' = '10.0.**.**:8030',
      'database-name' = 'flink_cdc'
    );
    
    INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;

    次の表に、コードに含まれるパラメーターを示します。

    • customers_srcテーブルの作成に使用されるパラメーター

      パラメーター

      説明

      connector

      mysql-cdcに設定します。

      hostname

      ApsaraDB RDS for MySQLインスタンスの内部エンドポイント。

      ApsaraDB RDSコンソールのApsaraDB RDS for MySQLインスタンスの [データベース接続] ページで内部エンドポイントをコピーできます。例: rm-2ze8398257383****.mysql.rds.aliyuncs.com。

      port

      3306に設定します。

      username

      手順 1: ソース MySQL テーブルを作成するで作成されたアカウントの名前。この例では、emr_testが使用されます。

      password

      手順 1: ソース MySQL テーブルを作成するで作成されたアカウントのパスワード。この例では、Yz12****が使用されます。

      database-name

      手順 1: ソース MySQL テーブルを作成するで作成されたデータベースの名前。この例では、flink_cdcが使用されます。

      table-name

      手順 1: ソース MySQL テーブルを作成するで作成されたテーブルの名前。この例では、customersが使用されます。

    • customers_sinkテーブルとorders_sinkテーブルの作成に使用されるパラメーター

      パラメーター

      説明

      load-url

      フロントエンドのIPアドレスとHTTPポート。StarRocksクラスターの内部IPアドレス:8030の形式。この例では、ポート 8030 が使用されています。クラスターのバージョンに基づいてポートを選択します。

      • 18030: EMR V5.9.0以降のマイナーバージョンのクラスター、およびEMR V3.43.0以降のマイナーバージョンのクラスターの場合は、このポートを選択します。

      • 8030: EMR V5.8.0、EMR V3.42.0、またはEMR V5.8.0またはEMR V3.42.0より前のマイナーバージョンのクラスターの場合は、このポートを選択します。

      説明

      詳細については、「UIとポートへのアクセス」をご参照ください。

      database-name

      手順 1: ソース MySQL テーブルを作成するで作成されたデータベースの名前。この例では、flink_cdcが使用されます。

      jdbc-url

      StarRocksに接続し、StarRocksでクエリを実行するために使用されるJavaDatabase Connectivity(JDBC)URL。

      例: jdbc:mysql://10.0.**.**:9030。この例では、10.0.**.**はStarRocksクラスターの内部IPアドレスです。

      username

      StarRocksクラスターへの接続に使用されるユーザー名。rootに設定します。

      table-name

      テーブルの名前。この例では、customersに設定します。

      connector

      コネクタのタイプ。starrocksに設定します。

  6. 次のコマンドを実行して、Flinkジョブを開始します。

     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

手順 4: アドホッククエリソリューションをテストする

  1. 手順 1: ソース MySQL テーブルを作成するで作成したテストアカウントを使用して、ApsaraDB RDS for MySQLインスタンスにログオンします。詳細については、「DMSを使用してApsaraDB RDS for MySQLインスタンスにログオンする」をご参照ください。

  2. ApsaraDB RDS for MySQLデータベースの [SQLコンソール] タブで、次のステートメントを実行して、ordersテーブルとcustomersテーブルにデータを挿入します。

    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. SSHモードでStarRocksクラスターにログオンします。詳細については、「クラスターにログオンする」をご参照ください。

  4. 次のコマンドを実行して、StarRocksクラスターに接続します。

    mysql -h127.0.0.1 -P 9030 -uroot
  5. ODS層のデータをクエリします。

    1. 次のステートメントを実行して、ordersテーブルからデータをクエリします。

      SELECT * FROM flink_cdc.orders;

      次の出力が返されます。

      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    2. 次のステートメントを実行して、customersテーブルからデータをクエリします。

      SELECT * FROM flink_cdc.customers;

      次の出力が返されます。

      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  6. 次のステートメントを実行して、DWD層のデータをクエリします。

    SELECT * FROM flink_cdc.dwd_order_customer_valid;

    次の出力が返されます。

    +----------+---------------+--------------+-------------+--------------+---------------+
    | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
    +----------+---------------+--------------+-------------+--------------+---------------+
    |        1 |            10 | beijing      |           1 |           22 | emr_test      |
    |        2 |            10 | beijing      |           1 |           22 | emr_test      |
    +----------+---------------+--------------+-------------+--------------+---------------+
    2 rows in set (0.00 sec)
  7. 次のステートメントを実行して、DWS層のデータをクエリします。

    SELECT * FROM flink_cdc.dws_agg_by_region;

    次の出力が返されます。

    +--------------+-----------+---------------------+
    | order_region | order_cnt | order_total_revenue |
    +--------------+-----------+---------------------+
    | beijing      |         2 |                  20 |
    +--------------+-----------+---------------------+
    1 row in set (0.01 sec)