このトピックでは、Realtime Compute for Apache Flink を使用して MySQL からの注文情報と乳児情報をリアルタイムで処理し、ワイドテーブルを構築し、結果を Elasticsearch に書き込む方法について説明します。その後、Kibana を使用してグループ化と集約を実行し、データをダッシュボードに表示して、注文量と乳児の出生数の間の潜在的な関係を明らかにします。
背景情報
「二人っ子政策」の実施と可処分所得の着実な増加に伴い、中国の母子消費市場は黄金時代を迎えています。同時に、国民消費のアップグレードと 1990 年代生まれの親の台頭が、消費需要と哲学の大きな変化を推進しています。Roland Berger の最近のレポートによると、母子産業は 2020 年までに市場規模が 3.6 兆人民元に達し、2016 年から 2020 年までの年平均成長率 (CAGR) は 17% になると予測されていました。これは、広大な成長の見通しを示しています。このような背景の中、母子人口の消費行動はどのようなものでしょうか?どの品目が彼らの支出を占めているのでしょうか?
このシナリオでは、注文情報と乳児情報の両方が MySQL データベースに保存されます。分析を簡素化するために、注文テーブルは乳児情報と結合されて、詳細なワイドテーブルが構築されます。次に、Realtime Compute for Apache Flink がデータをリアルタイムで Elasticsearch に書き込みます。その後、Kibana を使用してグループ化、集約、および動的なダッシュボードの可視化を行い、注文量と乳児の出生数の間の潜在的な関係を明らかにすることができます。
前提条件
Flink ワークスペースを作成済みであること。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
ApsaraDB RDS for MySQL インスタンス、データベース、およびアカウントを作成済みであること。詳細については、「ApsaraDB RDS for MySQL インスタンスをすばやく作成する」および「データベースとアカウントを作成する」をご参照ください。
Elasticsearch インスタンスを作成済みであること。このトピックでは、バージョン 8.17.0 を例として使用します。詳細については、「Alibaba Cloud Elasticsearch インスタンスを作成する」をご参照ください。
重要Flink ワークスペース、ApsaraDB RDS for MySQL インスタンス、および Elasticsearch インスタンスは、同じ VPC (Virtual Private Cloud) 内にある必要があります。また、ApsaraDB RDS for MySQL および Elasticsearch インスタンスの IP アドレスホワイトリストを設定する必要があります。詳細については、「ApsaraDB RDS for MySQL インスタンスのホワイトリストを設定する」および「Elasticsearch インスタンスのホワイトリストを設定する」をご参照ください。
ステップ 1: RDS MySQL テーブルの作成とデータのインポート
この例では、3 つのデータテーブルを作成します。`orders_dataset_tmp` テーブルはデータインポート用の一時テーブルです。他の 2 つのテーブルは、淘宝の母子注文のリアルタイムクエリのソーステーブルです。
[インスタンス] ページに移動します。上部のナビゲーションバーで、RDS インスタンスが存在するリージョンを選択します。次に、RDS インスタンスを見つけて、インスタンスの ID をクリックします。
ページ上部の [データベースにログイン] をクリックします。DMS ログインページで、データベースアカウントとパスワードを入力し、[ログイン] をクリックします。
左側のナビゲーションウィンドウで、[データベースインスタンス] をクリックします。[ログイン中のインスタンス] リストで、ターゲットデータベースの名前をダブルクリックします。
右側の SQL コンソールで、次の文を入力し、[実行] をクリックしてテーブルを作成します。
create table orders_dataset_tmp( user_id bigint comment 'ユーザー ID 情報', auction_id bigint comment '購入行動 ID', cat_id bigint comment '製品カテゴリのシリアル番号', cat1 bigint comment '製品シリアル番号 (ルートカテゴリ)', property TEXT comment '製品プロパティ', buy_mount int comment '購入数量', day TEXT comment '購入日' ); create table orders_dataset( order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '注文 ID', user_id bigint comment 'ユーザー ID 情報', auction_id bigint comment '購入行動 ID', cat_id bigint comment '製品カテゴリのシリアル番号', cat1 bigint comment '製品シリアル番号 (ルートカテゴリ)', property TEXT comment '製品プロパティ', buy_mount int comment '購入数量', day TEXT comment '購入日' ); create table baby_dataset( user_id bigint NOT NULL PRIMARY KEY, birthday text comment '乳児の誕生日', gender int comment '0 は女性、1 は男性、2 は不明を表します' );データをインポートします。
E コマース乳児ユーザー ファイルを orders_dataset_tmp テーブルに、乳児情報 ファイルを baby_dataset テーブルにインポートします。
上部のメニューバーで、[データインポート] をクリックします。
インポート設定を構成します。
設定項目
説明
データベース
データベース名をあいまい検索し、ターゲットの MySQL インスタンスをクリックします。
ファイルエンコーディング
自動検出。
インポートモード
高速モード。
ファイルタイプ
CSV 形式。
ターゲットテーブル
orders_dataset_tmp または baby_dataset。
データの場所
最初の行にプロパティが含まれています。
書き込みモード
INSERT。
添付ファイル
[ファイルをアップロード] をクリックし、テーブルに対応するファイルをインポートします。
[リクエストを送信] をクリックします。ステップ 4 で、[変更を実行] をクリックします。タスク設定ウィンドウで、[今すぐ実行] を選択し、[確認して実行] をクリックします。
インポートが完了したら、次の SQL 文を実行して、注文データをソーステーブル orders_dataset にインポートします。
insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day) select * from orders_dataset_tmp;
ステップ 2: Elasticsearch での自動インデックス作成の設定
Alibaba Cloud Elasticsearch コンソールにログインします。上部のメニューバーで、リソースグループとリージョンを選択します。
[Elasticsearch インスタンス] セクションで、ターゲットインスタンスの ID をクリックします。
[基本情報] ページで、 をクリックします。

右上隅にある [設定を変更] をクリックします。[自動インデックス作成を許可] を選択し、[OK] をクリックします。
重要この操作によりインスタンスが再起動されます。続行することを確認してください。
ステップ 3: Flink SQL ストリーミングジョブの作成
Realtime Compute for Apache Flink コンソールにログインします。ターゲットワークスペースについて、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。
をクリックし、[新しいストリーミングジョブ] を選択します。[ファイル名] を入力し、[エンジンバージョン] を選択して、[作成] をクリックします。
ジョブパラメーター
説明
例
ファイル名
ジョブの名前。
説明ジョブ名は現在のプロジェクト内で一意である必要があります。
flink-test
エンジンバージョン
現在のジョブで使用される Flink エンジンバージョン。エンジンバージョン番号、バージョンマッピング、および重要なライフサイクル日付の詳細については、「エンジンバージョン」をご参照ください。
vvr-8.0.11-flink-1.17
Flink SQL ストリーミングジョブのコードを編集します。
次の SQL コードを SQL エディターにコピーし、パラメーター値を実際の値に置き換えます。
このコードは、2 つの MySQL テーブル (
orders_datasetとbaby_dataset) をデータソースとして定義します。これらはそれぞれ注文情報とユーザー情報を保存します。データは、2 つの Elasticsearch 結果テーブル (es_sink1とes_sink2) を介して単一のインデックス (enriched_orders_view) に書き込まれます。sink.delete-strategyをNON_PK_FIELD_TO_NULLに設定することで、このコードは Elasticsearch の部分更新機能を使用します。プライマリキーが同一の場合、非プライマリキーフィールドのみが更新され、データ整合性が確保されます。CREATE TEMPORARY TABLE orders_dataset ( `order_id` BIGINT, `user_id` bigint, `auction_id` bigint, `cat_id` bigint, `cat1` bigint, `property` varchar, `buy_mount` int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, `birthday` varchar, `gender` int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'baby_dataset' ); CREATE TEMPORARY TABLE es_sink1( `order_id` BIGINT, `user_id` BIGINT, `buy_mount` INT, `day` VARCHAR, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); CREATE TEMPORARY TABLE es_sink2( `user_id` BIGINT, `birthday` VARCHAR, `gender` INT, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); BEGIN STATEMENT SET; INSERT INTO es_sink1 SELECT `order_id`, `user_id`, `buy_mount`, `day` FROM orders_dataset; INSERT INTO es_sink2 SELECT `user_id`, `birthday`, `gender` FROM baby_dataset; END;ストレージクラス
パラメーター
必須
説明
MySQL
connector
はい
テーブルのタイプ。値は
mysqlに固定されています。hostname
はい
MySQL データベースの IP アドレスまたはホスト名。VPC アドレスを使用します。
port
いいえ
MySQL データベースサービスのポート番号。
username
はい
MySQL データベースサービスのユーザー名。
password
はい
MySQL データベースサービスのパスワード。
database-name
はい
MySQL データベースの名前。
table-name
はい
MySQL テーブルの名前。
Elasticsearch
connector
はい
結果テーブルのタイプ。
hosts
はい
Elasticsearch エンドポイント。
フォーマットは
http://host_name:portです。index
はい
インデックス名。
この例では、値は enriched_orders_view です。
[デプロイ] をクリックします。
[ジョブ O&M] ページで、[ステートレス開始] を選択し、[開始] をクリックします。
ステップ 4: Elasticsearch コンソールでデータ結果を表示する
Elasticsearch で enriched_orders_view インデックスが作成された後、次の手順に従って書き込まれたデータを表示します。
1. 準備
Elasticsearch インスタンスを再起動します。
表示されたページで、 を選択します。Kibana セクションで、[インターネットエンドポイント] をクリックし、ユーザー名とパスワードを入力します。
Kibana コンソールのデフォルトのユーザー名は elastic です。パスワードは、Alibaba Cloud Elasticsearch インスタンスを作成したときに設定したものです。

データフィールドのデータの型を処理します。
後でヒストグラムを使用するには、
dayフィールドのデータの型を text から date に変換する必要があります。次のコマンドは で実行できます。enriched_orders_view_newなどの新しいインデックスを作成し、そのマッピングを定義します。dayフィールドのタイプをdateに設定し、他のフィールドのマッピング構造は維持します。PUT enriched_orders_view_new { "mappings": { "properties": { "birthday": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }, "fielddata": true }, "buy_mount": { "type": "long" }, "day": { "type": "date", "format": "yyyy-MM-dd" // 生データとの整合性を確保するために日付形式を指定します。 }, "gender": { "type": "long" }, "order_id": { "type": "long" }, "user_id": { "type": "long" } } } }_reindexAPI を使用して、元のインデックスから新しいインデックスにデータをコピーします。このプロセス中に、dayフィールドの値を日付形式に変換します。POST _reindex { "source": { "index": "enriched_orders_view" }, "dest": { "index": "enriched_orders_view_new" }, "script": { "source": """ if (ctx._source['day'] != null) { // 日付を 'yyyyMMdd' 形式から 'yyyy-MM-dd' 形式に変換します。 def originalDate = ctx._source['day']; if (originalDate.length() == 8) { ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8); } else { ctx.op = 'noop'; // 形式が正しくない場合は、ドキュメントをスキップします。 } } """ } }新しいインデックスの day フィールドが
yyyy-MM-ddなどの正しいデータ形式に変換されていることを確認します。GET enriched_orders_view_new/_search { "size": 10 }
データビューを作成します。
左側のナビゲーションウィンドウで、[Discover] をクリックします。

[データビューを作成] をクリックします。名前を入力します。[インデックスパターン] を
enriched_orders_view_newに、[タイムスタンプフィールド] を [day] に設定します。[データビューを Kibana に保存] をクリックします。
2. データ書き込みステータスの表示
ページの左上隅で、 をクリックします。
作成したデータビューに切り替えます。
[全時間範囲を検索] をクリックします。

データ書き込みステータスを表示します。

3. 可視化チャートの設定
day フィールドをクリックし、次に [可視化] をクリックします。

ページの右側で、縦棒チャートの水平軸と垂直軸を設定します。
1 つの軸を設定した後、[閉じる] をクリックして、もう一方の軸を設定します。
設定項目
設定の説明
図
水平軸
[関数] を [日付ヒストグラム] に設定します
[フィールド] を [day] に設定します
[名前] を [year_month] に設定します

垂直軸
[関数] を [合計] に設定します
[フィールド] を [buy_mount] に設定します
[名前] を [buy_num] に設定します
[サイド] を [左] に設定します

ページの右側で、線チャートの水平軸と垂直軸を設定します。
右下隅で、[レイヤーを追加] をクリックします。可視化タイプとして [線] を選択します。次に、水平軸と垂直軸を設定します。1 つの軸を設定し、[閉じる] をクリックしてから、もう一方の軸を設定します。
設定項目
設定の説明
図
水平軸
[関数] を [日付ヒストグラム] に設定します
[フィールド] を [day] に設定します
[名前] を [year_month] に設定します

垂直軸
[関数] を [カウント] に設定します
[フィールド] を [birthday] に設定します
[名前] を [baby_num] に設定します
[サイド] を [右] に設定します

4. 可視化結果の保存と表示
複合線グラフと縦棒グラフを保存するには、ページの右上隅にある [保存] をクリックします。

参考
Elasticsearch コネクタの構文、WITH パラメーター、および使用例の詳細については、「Elasticsearch」をご参照ください。
ApsaraDB RDS for MySQL コネクタの構文、WITH パラメーター、および使用例の詳細については、「ApsaraDB RDS for MySQL」をご参照ください。