すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:淘宝の母子注文のリアルタイムクエリとダッシュボード

最終更新日:Nov 10, 2025

このトピックでは、Realtime Compute for Apache Flink を使用して MySQL からの注文情報と乳児情報をリアルタイムで処理し、ワイドテーブルを構築し、結果を Elasticsearch に書き込む方法について説明します。その後、Kibana を使用してグループ化と集約を実行し、データをダッシュボードに表示して、注文量と乳児の出生数の間の潜在的な関係を明らかにします。

背景情報

「二人っ子政策」の実施と可処分所得の着実な増加に伴い、中国の母子消費市場は黄金時代を迎えています。同時に、国民消費のアップグレードと 1990 年代生まれの親の台頭が、消費需要と哲学の大きな変化を推進しています。Roland Berger の最近のレポートによると、母子産業は 2020 年までに市場規模が 3.6 兆人民元に達し、2016 年から 2020 年までの年平均成長率 (CAGR) は 17% になると予測されていました。これは、広大な成長の見通しを示しています。このような背景の中、母子人口の消費行動はどのようなものでしょうか?どの品目が彼らの支出を占めているのでしょうか?

このシナリオでは、注文情報と乳児情報の両方が MySQL データベースに保存されます。分析を簡素化するために、注文テーブルは乳児情報と結合されて、詳細なワイドテーブルが構築されます。次に、Realtime Compute for Apache Flink がデータをリアルタイムで Elasticsearch に書き込みます。その後、Kibana を使用してグループ化、集約、および動的なダッシュボードの可視化を行い、注文量と乳児の出生数の間の潜在的な関係を明らかにすることができます。

前提条件

ステップ 1: RDS MySQL テーブルの作成とデータのインポート

この例では、3 つのデータテーブルを作成します。`orders_dataset_tmp` テーブルはデータインポート用の一時テーブルです。他の 2 つのテーブルは、淘宝の母子注文のリアルタイムクエリのソーステーブルです。

  1. [インスタンス] ページに移動します。上部のナビゲーションバーで、RDS インスタンスが存在するリージョンを選択します。次に、RDS インスタンスを見つけて、インスタンスの ID をクリックします。

  2. ページ上部の [データベースにログイン] をクリックします。DMS ログインページで、データベースアカウントとパスワードを入力し、[ログイン] をクリックします。

  3. 左側のナビゲーションウィンドウで、[データベースインスタンス] をクリックします。[ログイン中のインスタンス] リストで、ターゲットデータベースの名前をダブルクリックします。

  4. 右側の SQL コンソールで、次の文を入力し、[実行] をクリックしてテーブルを作成します。

    create table orders_dataset_tmp(
        user_id bigint comment 'ユーザー ID 情報',            
        auction_id bigint comment '購入行動 ID',        
        cat_id bigint comment '製品カテゴリのシリアル番号',            
        cat1 bigint comment '製品シリアル番号 (ルートカテゴリ)',                
        property TEXT comment '製品プロパティ',            
        buy_mount int comment '購入数量',            
        day TEXT comment '購入日'                
    );
    
    create table orders_dataset(
        order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '注文 ID',
        user_id bigint comment 'ユーザー ID 情報',            
        auction_id bigint comment '購入行動 ID',        
        cat_id bigint comment '製品カテゴリのシリアル番号',            
        cat1 bigint comment '製品シリアル番号 (ルートカテゴリ)',                
        property TEXT comment '製品プロパティ',            
        buy_mount int comment '購入数量',            
        day TEXT comment '購入日'                
    );
    
    
    create table baby_dataset(
        user_id bigint NOT NULL PRIMARY KEY,    
        birthday text comment '乳児の誕生日',
        gender int comment '0 は女性、1 は男性、2 は不明を表します'
    );
  5. データをインポートします。

    E コマース乳児ユーザー ファイルを orders_dataset_tmp テーブルに、乳児情報 ファイルを baby_dataset テーブルにインポートします。

    1. 上部のメニューバーで、[データインポート] をクリックします。

    2. インポート設定を構成します。

      設定項目

      説明

      データベース

      データベース名をあいまい検索し、ターゲットの MySQL インスタンスをクリックします。

      ファイルエンコーディング

      自動検出。

      インポートモード

      高速モード。

      ファイルタイプ

      CSV 形式。

      ターゲットテーブル

      orders_dataset_tmp または baby_dataset。

      データの場所

      最初の行にプロパティが含まれています。

      書き込みモード

      INSERT。

      添付ファイル

      [ファイルをアップロード] をクリックし、テーブルに対応するファイルをインポートします。

    3. [リクエストを送信] をクリックします。ステップ 4 で、[変更を実行] をクリックします。タスク設定ウィンドウで、[今すぐ実行] を選択し、[確認して実行] をクリックします。

  6. インポートが完了したら、次の SQL 文を実行して、注文データをソーステーブル orders_dataset にインポートします。

    insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
    select * from orders_dataset_tmp;

ステップ 2: Elasticsearch での自動インデックス作成の設定

  1. Alibaba Cloud Elasticsearch コンソールにログインします。上部のメニューバーで、リソースグループとリージョンを選択します。

  2. [Elasticsearch インスタンス] セクションで、ターゲットインスタンスの ID をクリックします。

  3. [基本情報] ページで、[設定と管理] > [ES クラスター設定] をクリックします。

    image

  4. 右上隅にある [設定を変更] をクリックします。[自動インデックス作成を許可] を選択し、[OK] をクリックします。

    image

    重要

    この操作によりインスタンスが再起動されます。続行することを確認してください。

ステップ 3: Flink SQL ストリーミングジョブの作成

  1. Realtime Compute for Apache Flink コンソールにログインします。ターゲットワークスペースについて、[アクション] 列の [コンソール] をクリックします。

  2. 左側のナビゲーションウィンドウで、[データ開発] > [ETL] をクリックします。

  3. image をクリックし、[新しいストリーミングジョブ] を選択します。[ファイル名] を入力し、[エンジンバージョン] を選択して、[作成] をクリックします。

    image

    ジョブパラメーター

    説明

    ファイル名

    ジョブの名前。

    説明

    ジョブ名は現在のプロジェクト内で一意である必要があります。

    flink-test

    エンジンバージョン

    現在のジョブで使用される Flink エンジンバージョン。エンジンバージョン番号、バージョンマッピング、および重要なライフサイクル日付の詳細については、「エンジンバージョン」をご参照ください。

    vvr-8.0.11-flink-1.17

  4. Flink SQL ストリーミングジョブのコードを編集します。

    次の SQL コードを SQL エディターにコピーし、パラメーター値を実際の値に置き換えます。

    このコードは、2 つの MySQL テーブル (orders_datasetbaby_dataset) をデータソースとして定義します。これらはそれぞれ注文情報とユーザー情報を保存します。データは、2 つの Elasticsearch 結果テーブル (es_sink1es_sink2) を介して単一のインデックス (enriched_orders_view) に書き込まれます。sink.delete-strategyNON_PK_FIELD_TO_NULL に設定することで、このコードは Elasticsearch の部分更新機能を使用します。プライマリキーが同一の場合、非プライマリキーフィールドのみが更新され、データ整合性が確保されます。

    CREATE TEMPORARY TABLE orders_dataset (
      `order_id` BIGINT,
      `user_id` bigint,            
      `auction_id` bigint,        
      `cat_id` bigint,            
      `cat1` bigint,                
      `property` varchar,            
      `buy_mount` int,            
      `day` varchar    ,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'orders_dataset'
    );
    
    CREATE TEMPORARY TABLE baby_dataset (
      `user_id` bigint,
      `birthday` varchar,
      `gender` int,
      PRIMARY KEY(user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'baby_dataset'
    );
    
    
    CREATE TEMPORARY TABLE es_sink1(
      `order_id` BIGINT,
      `user_id` BIGINT,
      `buy_mount` INT,
      `day` VARCHAR,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    
    CREATE TEMPORARY TABLE es_sink2(
      `user_id` BIGINT,
      `birthday` VARCHAR,
      `gender` INT,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    BEGIN STATEMENT SET;   
    INSERT INTO es_sink1
    SELECT 
        `order_id`,
        `user_id`,
        `buy_mount`,
        `day`
    FROM orders_dataset;
    
    
    INSERT INTO es_sink2
    SELECT 
        `user_id`,
        `birthday`,
        `gender`
    FROM baby_dataset;
    END;     

    ストレージクラス

    パラメーター

    必須

    説明

    MySQL

    connector

    はい

    テーブルのタイプ。値は mysql に固定されています。

    hostname

    はい

    MySQL データベースの IP アドレスまたはホスト名。VPC アドレスを使用します。

    port

    いいえ

    MySQL データベースサービスのポート番号。

    username

    はい

    MySQL データベースサービスのユーザー名。

    password

    はい

    MySQL データベースサービスのパスワード。

    database-name

    はい

    MySQL データベースの名前。

    table-name

    はい

    MySQL テーブルの名前。

    Elasticsearch

    connector

    はい

    結果テーブルのタイプ。

    hosts

    はい

    Elasticsearch エンドポイント。

    フォーマットは http://host_name:port です。

    index

    はい

    インデックス名。

    この例では、値は enriched_orders_view です。

  5. [デプロイ] をクリックします。

  6. [ジョブ O&M] ページで、[ステートレス開始] を選択し、[開始] をクリックします。

ステップ 4: Elasticsearch コンソールでデータ結果を表示する

Elasticsearch で enriched_orders_view インデックスが作成された後、次の手順に従って書き込まれたデータを表示します。

1. 準備

  1. Kibana を使用してクラスターに接続します

  2. Elasticsearch インスタンスを再起動します。

  3. 表示されたページで、[設定と管理] > [可視化コントロール] を選択します。Kibana セクションで、[インターネットエンドポイント] をクリックし、ユーザー名とパスワードを入力します。

    Kibana コンソールのデフォルトのユーザー名は elastic です。パスワードは、Alibaba Cloud Elasticsearch インスタンスを作成したときに設定したものです。

    image

  4. データフィールドのデータの型を処理します。

    後でヒストグラムを使用するには、day フィールドのデータの型を text から date に変換する必要があります。次のコマンドは [管理] > [開発ツール] で実行できます。

    1. enriched_orders_view_new などの新しいインデックスを作成し、そのマッピングを定義します。

      day フィールドのタイプを date に設定し、他のフィールドのマッピング構造は維持します。

      PUT enriched_orders_view_new
      {
        "mappings": {
          "properties": {
            "birthday": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              },
              "fielddata": true
            },
            "buy_mount": {
              "type": "long"
            },
            "day": {
              "type": "date",
              "format": "yyyy-MM-dd" // 生データとの整合性を確保するために日付形式を指定します。
            },
            "gender": {
              "type": "long"
            },
            "order_id": {
              "type": "long"
            },
            "user_id": {
              "type": "long"
            }
          }
        }
      }
      
    2. _reindex API を使用して、元のインデックスから新しいインデックスにデータをコピーします。このプロセス中に、day フィールドの値を日付形式に変換します。

      POST _reindex
      {
        "source": {
          "index": "enriched_orders_view"
        },
        "dest": {
          "index": "enriched_orders_view_new"
        },
        "script": {
          "source": """
            if (ctx._source['day'] != null) {
              // 日付を 'yyyyMMdd' 形式から 'yyyy-MM-dd' 形式に変換します。
              def originalDate = ctx._source['day'];
              if (originalDate.length() == 8) {
                ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8);
              } else {
                ctx.op = 'noop'; // 形式が正しくない場合は、ドキュメントをスキップします。
              }
            }
          """
        }
      }
      
      
    3. 新しいインデックスの day フィールドが yyyy-MM-dd などの正しいデータ形式に変換されていることを確認します。

      GET enriched_orders_view_new/_search
      {
        "size": 10
      }
  5. データビューを作成します。

    1. 左側のナビゲーションウィンドウで、[Discover] をクリックします。

      image

    2. [データビューを作成] をクリックします。名前を入力します。[インデックスパターン] を enriched_orders_view_new に、[タイムスタンプフィールド] を [day] に設定します。[データビューを Kibana に保存] をクリックします。

      image

2. データ書き込みステータスの表示

  1. ページの左上隅で、[分析] > [Discover] をクリックします。

  2. 作成したデータビューに切り替えます。

  3. [全時間範囲を検索] をクリックします。

    image

  4. データ書き込みステータスを表示します。

    image

3. 可視化チャートの設定

  1. day フィールドをクリックし、次に [可視化] をクリックします。

    image

  2. ページの右側で、縦棒チャートの水平軸と垂直軸を設定します。

    1 つの軸を設定した後、[閉じる] をクリックして、もう一方の軸を設定します。

    設定項目

    設定の説明

    水平軸

    • [関数] を [日付ヒストグラム] に設定します

    • [フィールド] を [day] に設定します

    • [名前] を [year_month] に設定します

    image

    垂直軸

    • [関数] を [合計] に設定します

    • [フィールド] を [buy_mount] に設定します

    • [名前] を [buy_num] に設定します

    • [サイド] を [左] に設定します

    image

  3. ページの右側で、チャートの水平軸と垂直軸を設定します。

    右下隅で、[レイヤーを追加] をクリックします。可視化タイプとして [線] を選択します。次に、水平軸と垂直軸を設定します。1 つの軸を設定し、[閉じる] をクリックしてから、もう一方の軸を設定します。

    設定項目

    設定の説明

    水平軸

    • [関数] を [日付ヒストグラム] に設定します

    • [フィールド] を [day] に設定します

    • [名前] を [year_month] に設定します

    image

    垂直軸

    • [関数] を [カウント] に設定します

    • [フィールド] を [birthday] に設定します

    • [名前] を [baby_num] に設定します

    • [サイド] を [右] に設定します

    image

4. 可視化結果の保存と表示

複合線グラフと縦棒グラフを保存するには、ページの右上隅にある [保存] をクリックします。

image

参考

  • Elasticsearch コネクタの構文、WITH パラメーター、および使用例の詳細については、「Elasticsearch」をご参照ください。

  • ApsaraDB RDS for MySQL コネクタの構文、WITH パラメーター、および使用例の詳細については、「ApsaraDB RDS for MySQL」をご参照ください。