すべてのプロダクト
Search
ドキュメントセンター

DataWorks:上級: 注文のベストセラー商品カテゴリを分析する

最終更新日:Oct 29, 2025

DataWorks は、ビッグデータ開発とガバナンスのための統一されたエンドツーエンドのプラットフォームです。MaxCompute、Hologres、EMR、AnalyticDB、CDP などのビッグデータエンジンと統合し、データウェアハウス、データレイク、データレイクハウスなどのソリューションをサポートします。このトピックでは、DataWorks のコア機能を使用して、データの統合、ビジネスロジックの処理、定期的なタスクのスケジューリング、データの可視化を行う方法の例を示します。

はじめに

このチュートリアルでは、E コマースシナリオを例として、生データの統合からデータの分析、計算、可視化まで、完全なデータパイプラインを構築する方法を示します。標準化された開発プロセスに従うことで、再利用可能なデータ生産パイプラインを迅速に構築する方法を学びます。このプロセスにより、スケジューリングの信頼性と運用の可観測性が保証されます。このアプローチにより、ビジネス担当者は深い技術知識を必要とせずにデータを価値に変換でき、企業がビッグデータアプリケーションを採用する際の障壁を下げることができます。

このチュートリアルは、次の操作を迅速に学ぶのに役立ちます:

  1. データ同期: DataWorks の Data Integration モジュールを使用してオフライン同期タスクを作成します。このタスクは、ビジネスデータを MaxCompute などのビッグデータコンピューティングプラットフォームに同期します。

  2. データクレンジング: DataWorks の DataStudio モジュールを使用して、ビジネスデータの処理、分析、マイニングを行います。

  3. データの可視化: DataWorks のデータ分析モジュールを使用して、分析結果をビジネス担当者が理解しやすいチャートに変換します。

  4. 定期的なスケジューリング: データ同期とデータクレンジングのワークフローに定期的なスケジュールを構成し、スケジュールされた時間に実行します。

image

このチュートリアルでは、公開データソースから商品と注文の生データを MaxCompute に同期します。その後、次のデータ分析ワークフローを使用して、日次ベストセラー商品カテゴリのランキングを生成します:

前提条件

このチュートリアルの手順を完了するには、AliyunDataWorksFullAccess 権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーを使用する必要があります。詳細については、「Alibaba Cloud アカウントを準備する」または「RAM ユーザーを準備する」をご参照ください。

説明

DataWorks は、製品およびモジュールレベルでの権限管理をサポートする包括的なアクセスの制御メカニズムを提供します。より詳細なアクセスの制御については、「DataWorks 権限モデルの概要」をご参照ください。

準備

DataWorks のアクティベート

このチュートリアルでは、中国 (上海) リージョンを例として使用します。DataWorks コンソールにログインし、中国 (上海) リージョンに切り替えて、このリージョンで DataWorks がアクティベートされているかどうかを確認します。

説明

このチュートリアルでは、中国 (上海) リージョンを例として使用します。実際には、ビジネスデータが配置されているリージョンを選択する必要があります:

  • ビジネスデータが他の Alibaba Cloud サービス上にある場合は、それらのサービスがデプロイされているのと同じリージョンを選択します。

  • ビジネスがオンプレミスにあり、インターネットアクセスが必要な場合は、アクセス待機時間を短縮するために、地理的に近いリージョンを選択します。

新規ユーザー

選択したリージョンで初めて DataWorks を使用する場合、次のページが表示されます。これは DataWorks がまだアクティベートされていないことを示しています。[0 元の組み合わせ購入] をクリックします。

image

  1. 組み合わせ購入ページでパラメーターを構成します。

    パラメーター

    説明

    リージョン

    DataWorks をアクティベートするリージョンを選択します。

    中国 (上海)

    DataWorks エディション

    購入する DataWorks エディションを選択します。

    説明

    このチュートリアルでは、Basic Edition を例として使用します。すべてのエディションがこのチュートリアルでカバーされている機能をサポートしています。DataWorks エディションの機能 を参照して、必要に応じて適切な DataWorks エディションを選択できます。

    Basic Edition

  2. [注文の確認と支払い] をクリックして支払いを完了します。

アクティベート済みだが期限切れ

以前に 中国 (上海) リージョンで DataWorks をアクティベートしたが、サブスクリプションの有効期限が切れている場合、次のメッセージが表示されます。[エディションの購入] をクリックします。

image

  1. 購入ページでパラメーターを構成します。

    パラメーター

    説明

    エディション

    購入する DataWorks エディションを選択します。

    説明

    このチュートリアルでは、Basic Edition を例として使用します。すべてのエディションがこのチュートリアルでカバーされている機能をサポートしています。DataWorks エディションの機能 を参照して、必要に応じて適切な DataWorks エディションを選択できます。

    Basic Edition

    リージョン

    DataWorks をアクティベートするリージョンを選択します。

    中国 (上海)

  2. [今すぐ購入] をクリックして支払いを完了します。

重要

購入した DataWorks エディションが見つからない場合は、次のことを試してください:

  • 数分待ってからページを更新して、システムの更新遅延の可能性を考慮してください。

  • 現在のリージョンが DataWorks エディションを購入したリージョンと同じであることを確認してください。リージョンが一致しないと、エディションが表示されないことがあります。

すでにアクティベート済み

中国 (上海) リージョンで DataWorks をすでにアクティベートしている場合は、DataWorks の概要ページに移動します。次のステップに進むことができます。

ワークスペースの作成

  1. DataWorks ワークスペースリストページに移動し、中国 (上海) リージョンに切り替えて、[ワークスペースの作成] をクリックします。

  2. [ワークスペースの作成] ページで、カスタムの [ワークスペース名] を入力し、[DataStudio のパブリックプレビューに参加] を有効にしてから、[ワークスペースの作成] をクリックします。

    説明

    2025 年 2 月 18 日以降、Alibaba Cloud アカウントが DataWorks をアクティベートし、初めて中国 (上海) リージョンにワークスペースを作成すると、新しいバージョンの DataStudio がデフォルトで有効になります。[DataStudio のパブリックプレビューに参加] オプションは表示されません。

リソースグループを作成してワークスペースにアタッチする

  1. DataWorks リソースグループリストページに移動し、中国 (上海) リージョンに切り替えて、[リソースグループの作成] をクリックします。

  2. リソースグループ購入ページで、次のパラメーターを構成します。

    パラメーター

    説明

    リソースグループ名

    カスタム。

    仮想プライベートクラウド (VPC)V-Switch

    既存の VPC と vSwitch を選択します。現在のリージョンで利用可能なものがない場合は、パラメーターの説明にあるコンソールリンクをクリックして作成します。

    サービスリンクロール

    画面の指示に従って AliyunServiceRoleForDataWorks サービスリンクロールを作成します。

  3. [今すぐ購入] をクリックして支払いを完了します。

  4. DataWorks リソースグループリストページに移動し、中国 (上海) リージョンに切り替え、作成したリソースグループを見つけて、[アクション] 列の [ワークスペースのアタッチ] をクリックします。

  5. [ワークスペースのアタッチ] ページで、作成した DataWorks ワークスペースを見つけ、その [アクション] 列の [アタッチ] をクリックします。

リソースグループのインターネットアクセスを有効にする

このチュートリアルで使用される E コマースプラットフォームからの公開テストデータは、インターネット経由でアクセスする必要があります。前のステップで作成した汎用リソースグループは、デフォルトではインターネットアクセスがありません。データを取得するには、インターネット NAT ゲートウェイを構成し、リソースグループにアタッチされている VPC に Elastic IP Address (EIP) を追加して、インターネットアクセスを有効にする必要があります。

  1. VPC - インターネット NAT ゲートウェイコンソールにログインします。トップメニューバーで 中国 (上海) リージョンに切り替え、Create Internet NAT Gateway をクリックします。関連するパラメーターを構成します。

    説明

    表に記載されていないパラメーターについては、デフォルト値を保持してください。

    パラメーター

    リージョン

    中国 (上海)。

    ネットワークとゾーン

    リソースグループにアタッチされている VPC と vSwitch を選択します。

    DataWorks リソースグループリストページに移動し、中国 (上海) リージョンに切り替え、作成したリソースグループを見つけて、[アクション] 列の [ネットワーク設定] をクリックします。[データスケジューリング & データ統合] エリアで、[アタッチされた VPC][VSwitch] を表示します。VPC と vSwitch の詳細については、「VPC とは」をご参照ください。

    ネットワークタイプ

    インターネット NAT ゲートウェイ。

    Elastic IP Address

    新しい Elastic IP Address を購入。

    関連付けられたロールの作成

    初めて NAT ゲートウェイを作成する場合、サービスリンクロールを作成する必要があります。[関連付けられたロールの作成] をクリックします。

  2. [今すぐ購入] をクリックして支払いを完了し、NAT ゲートウェイインスタンスを作成します。

    image

  3. NAT ゲートウェイインスタンスを購入した後、コンソールに戻り、新しいインスタンスの SNAT エントリを作成します。

    説明

    リソースグループは、その VPC に SNAT エントリが構成された後にのみインターネットにアクセスできます。

    1. 新しいインスタンスのアクション列の [管理] をクリックして、ターゲット NAT ゲートウェイインスタンスの管理ページに移動し、[SNAT 管理] タブに切り替えます

    2. [SNAT エントリリスト] で、[SNAT エントリの作成] をクリックします。主要な構成パラメーターは次のとおりです:

      パラメーター

      SNAT エントリの粒度

      VPC 粒度を選択して、NAT ゲートウェイが存在する VPC 内のすべてのリソースグループが、構成された Elastic IP Address を介してインターネットにアクセスできるようにします。

      Elastic IP Address の選択

      現在の NAT ゲートウェイインスタンスにアタッチされている Elastic IP Address を構成します。

      SNAT エントリパラメーターを構成した後、[確認] ボタンをクリックして SNAT エントリを作成します。

    [SNAT エントリリスト] で、新しく作成された SNAT エントリの [ステータス][アクティブ] に変わると、リソースグループにアタッチされた VPC はインターネットアクセスを取得します。

MaxCompute 計算資源の作成とアタッチ

このチュートリアルでは、MaxCompute プロジェクトを作成し、それを DataWorks 計算資源としてアタッチする必要があります。このリソースは、データを受信し、ビッグデータ分析を実行するために使用されます。

  1. DataWorks ワークスペースリストページに移動し、中国 (上海) リージョンに切り替え、作成したワークスペースを見つけて、ワークスペース名をクリックして [ワークスペース詳細] ページに移動します。

  2. 左側のナビゲーションウィンドウで [計算資源] をクリックします。計算資源ページで [計算資源のアタッチ] をクリックし、[MaxCompute] タイプを選択します。次の主要なパラメーターを構成して MaxCompute プロジェクトを作成し、それを DataWorks 計算資源としてアタッチします。

    説明

    表に記載されていないパラメーターについては、デフォルト値を保持してください。

    パラメーター

    説明

    MaxCompute プロジェクト

    ドロップダウンリストで [作成] をクリックし、次のパラメーターを入力します。

    • プロジェクト名: カスタムでグローバルに一意。

    • 計算資源の課金方法: [従量課金] を選択します。

      説明

      従量課金が選択できない場合は、[従量課金] の後にある [アクティベート] をクリックして MaxCompute サービスをアクティベートします。

    • デフォルトクォータ: ドロップダウンリストからデフォルトの既存クォータを選択します。

    デフォルトのアクセス ID

    [Alibaba Cloud アカウント] を選択します。

    計算資源インスタンス名

    後続のタスク実行時に、計算資源インスタンス名はタスクの計算資源を選択するために使用されます。識別しやすい名前を使用してください。たとえば、このチュートリアルでは MaxCompute_Source という名前を付けます。

  3. [確認] をクリックします。

手順

このセクションでは、次のシナリオを例として、DataWorks の機能について説明します:

E コマースプラットフォームが商品と注文情報を MySQL データベースに保存していると仮定します。定期的に注文データを分析し、日次ベストセラー商品カテゴリのランキングを視覚的な形式で表示する必要があります。

1. データ同期

データソースの作成

DataWorks は、データソース接続を使用してデータソースと宛先に接続します。このステップでは、ビジネスデータを格納するソース MySQL データベースに接続するためにMySQL データソースを作成する必要があります。このデータソースは、このチュートリアルの生データを提供します。

説明

このチュートリアルのために生のビジネスデータを準備する必要はありません。テストと学習の目的で、DataWorks はテストデータセットを提供します。テーブルデータは公開 MySQL データベースに保存されており、MySQL データソースを作成することで接続できます。

  1. DataWorks 管理センターページに移動し、中国 (上海) リージョンに切り替え、ドロップダウンリストから作成したワークスペースを選択し、[管理センターへ移動] をクリックします。

  2. 左側のナビゲーションウィンドウで [データソース] をクリックして [データソースリスト] ページに移動します。[データソースの追加] をクリックし、MySQL タイプを選択して、MySQL データソースのパラメーターを構成します。

    説明
    • 表に記載されていないパラメーターについては、デフォルト値を保持してください。

    • 初めてデータソースを追加する際には、[クロスサービス承認] を完了するように求められます。画面の指示に従って AliyunDIDefaultRole サービスリンクロールを付与します。

    パラメーター

    説明

    データソース名

    この例では MySQL_Source を使用します。

    構成モード

    [接続文字列モード] を選択します。

    接続アドレス

    • ホスト IP: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

    • ポート: 3306

    重要

    このチュートリアルで提供されるデータは、DataWorks の実践練習専用です。すべてのデータはテストデータであり、Data Integration モジュールでのみ読み取り可能です。

    データベース名

    retail_e_commerce に設定します。

    ユーザー名

    ユーザー名 workshop を入力します。

    パスワード

    パスワード workshop#2017 を入力します。

  3. [接続構成] エリアで、[Data Integration] タブに切り替え、ワークスペースにアタッチされたリソースグループを見つけ、[接続ステータス] 列の [接続性のテスト] をクリックします。

    説明

    MySQL データソースの接続性テストが失敗した場合は、次の手順を実行してください:

    • 接続診断ツールの指示に従ってください。

    • リソースグループにアタッチされている VPC に EIP が構成されているか確認してください。MySQL データソースは、リソースグループがインターネットにアクセスできる必要があります。詳細については、「リソースグループのインターネットアクセスを有効にする」をご参照ください。

  4. [完了] をクリックします。

同期パイプラインの構築

このステップでは、E コマースの商品および注文データを MaxCompute テーブルに同期するための同期パイプラインを構築します。これにより、後続の処理のためにデータが準備されます。

  1. 左上隅の 图标 アイコンをクリックし、[すべての製品] > [データ開発と O&M] > [DataStudio (データ開発)] を選択してデータ開発ページに移動します。

  2. ページの上部で、このチュートリアル用に作成したワークスペースに切り替えます。左側のナビゲーションウィンドウで image をクリックして、データ開発 - プロジェクトディレクトリページに移動します。

  3. [プロジェクトディレクトリ] エリアで image をクリックし、[新しいワークフロー] を選択して、ワークフロー名を設定します。このチュートリアルでは dw_quickstart に設定します。

  4. ワークフローオーケストレーションページで、左側から [ゼロロードノード][オフライン同期] ノードをキャンバスにドラッグし、名前を設定します。

    このチュートリアルでのノード名の例とその機能は次のとおりです:

    ノードタイプ

    ノード名

    ノード機能

    image 仮想ノード

    workshop

    ユーザーペルソナ分析ワークフロー全体を管理し、データ転送パスをより明確にするために使用されます。このノードは [ドライランタスク] であり、コード編集は不要です。

    imageオフライン同期ノード

    ods_item_info

    商品情報ソーステーブル item_info を MySQL から MaxCompute テーブル ods_item_info に同期するために使用されます。

    imageオフライン同期ノード

    ods_trade_order

    注文情報ソーステーブル trade_order を MySQL から MaxCompute テーブル ods_trade_order に同期するために使用されます。

    ノード間に線をドラッグして手動で接続します。workshop ノードを 2 つのオフライン同期ノードの先祖ノードとして設定します。結果は次の図のようになります:

  5. ワークフローのスケジューリング構成。

    ワークフローオーケストレーションページの右側のパネルで [スケジューリング構成] をクリックし、パラメーターを構成します。このチュートリアルでは、次の主要なパラメーターを構成する必要があります。他のすべてのパラメーターはデフォルト値のままにできます。

    スケジューリング構成パラメーター

    説明

    スケジューリングパラメーター

    ワークフロー全体のスケジューリングパラメーターを設定します。ワークフロー内の内部ノードは直接使用できます。

    このチュートリアルでは、前日の日付を取得するために bizdate=$[yyyymmdd-1] として構成します。

    説明

    DataWorks は、コード内で動的なパラメーター入力を可能にするスケジューリングパラメーターを提供します。SQL コード内で ${variable_name} の形式で変数を定義し、[スケジューリング構成] > [スケジューリングパラメーター] でこれらの変数に値を割り当てることができます。スケジューリングパラメーターでサポートされている形式の詳細については、「スケジューリングパラメーターでサポートされている形式」をご参照ください。

    スケジューリングサイクル

    このチュートリアルでは 日次 として構成します。

    スケジューリング時間

    このチュートリアルでは、[スケジューリング時間]00:30 に設定します。ワークフローは毎日 00:30 に開始されます。

    定期的な依存関係

    ワークフローには上流依存関係がないため、これは構成しなくてもかまいません。統一管理のために、[ワークスペースルートノードを使用] をクリックしてワークフローをワークスペースルートノードにアタッチできます。

    ワークスペースルートノードは workspace_name_root の形式で命名されます。

同期タスクの構成

初期ノードの構成
  1. ワークフローオーケストレーションページで、workshop ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. workshop ノード編集ページの右側のパネルで [スケジューリング構成] をクリックし、パラメーターを構成します。このチュートリアルでは、次の主要なパラメーターを構成する必要があります。他のすべてのパラメーターはデフォルト値のままにできます。

    スケジューリング構成パラメーター

    説明

    スケジューリングタイプ

    このチュートリアルでは ドライランスケジューリング に設定します。

    スケジューリングリソースグループ

    このチュートリアルでは、「リソースグループを作成してワークスペースにアタッチする」で作成したサーバーレスリソースグループに設定します。

    ノード依存関係の構成

    workshop は初期ノードであり、上流依存関係がないため、[ワークスペースルートノードを使用] をクリックしてワークスペースルートノードにワークフローの実行をトリガーさせることができます。

    ワークスペースルートノードの名前は workspace_name_root です。

  3. ノードツールバーで [保存] をクリックしてノードを保存します。

商品情報同期パイプライン (ods_item_info) の構成
  1. ワークフローオーケストレーションページで、ods_item_info ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. 同期パイプラインのネットワークとリソース設定を構成します。

    パラメーター

    説明

    データソース

    • データソース: MySQL

    • データソース名: MySQL_Source

    マイリソースグループ

    リソースグループを作成してワークスペースにアタッチする」で購入したサーバーレスリソースグループを選択します。

    データ宛先

  3. [次へ] をクリックして同期タスクを構成します。

    1. [データソースと宛先の構成]

      説明

      表に記載されていないパラメーターについては、デフォルト値を保持してください。

      構成エリア

      パラメーター

      構成の説明

      データソース

      テーブル

      item_info

      データ宛先

      テーブル

      [ワンクリックでターゲットテーブルスキーマを生成] をクリックして MaxCompute テーブルを迅速に作成します。次のテーブル作成ステートメントを [テーブル作成ステートメント] エリアに貼り付け、[テーブルの作成] をクリックします。このテーブルは、データソースから商品情報を受け取るために使用されます。

      テーブル作成 SQL

      CREATE TABLE IF NOT EXISTS ods_item_info(
      `id`                            BIGINT COMMENT '',
      `cate_id`                       BIGINT COMMENT '',
      `cate_name`                     STRING COMMENT '',
      `commodity_id`                  BIGINT COMMENT '',
      `commodity_name`                STRING COMMENT '',
      `desc_path`                     STRING COMMENT '',
      `duration`                      BIGINT COMMENT '',
      `features`                      STRING COMMENT '',
      `gmt_create`                    DATETIME COMMENT '',
      `gmt_modified`                  DATETIME COMMENT '',
      `is_deleted`                    BIGINT COMMENT '',
      `is_virtual`                    STRING COMMENT '',
      `item_id`                       BIGINT COMMENT '',
      `item_status`                   BIGINT COMMENT '',
      `last_offline_time`             DATETIME COMMENT '',
      `last_online_quantity`          BIGINT COMMENT '',
      `last_online_time`              DATETIME COMMENT '',
      `pict_url`                      STRING COMMENT '',
      `reserve_price`                 DECIMAL(38,18) COMMENT '',
      `secure_trade_ems_post_fee`     DECIMAL(38,18) COMMENT '',
      `secure_trade_fast_post_fee`    DECIMAL(38,18) COMMENT '',
      `secure_trade_ordinary_post_fee`DECIMAL(38,18) COMMENT '',
      `shop_id`                       BIGINT COMMENT '',
      `shop_nick`                     STRING COMMENT '',
      `sub_title`                     STRING COMMENT '',
      `title`                         STRING COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      パーティション情報

      このチュートリアルでは ${bizdate} を入力します。これは、テストフェーズ中に bizdate パラメーターに定数値を割り当て、スケジュールされた実行中に bizdate パラメーターに動的に値を割り当てるために使用されます。DataStudio でサポートされている変数形式と構成メソッドの詳細については、「スケジューリングパラメーター」をご参照ください。

    2. [フィールドマッピング][チャネル制御] を確認します。

      DataWorks は、構成されたフィールドマッピングに基づいて、ソースフィールドから宛先フィールドにデータを書き込みます。タスクの同時実行数やダーティデータポリシーなどの設定も構成できます。このチュートリアルでは、[ダーティデータポリシー][ダーティデータを許容しない] に設定します。他のすべての設定はデフォルト値のままにできます。詳細については、「コードレス UI でオフライン同期タスクを構成する」をご参照ください。

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

注文データ同期パイプライン (ods_trade_order) の構成
  1. ワークフローオーケストレーションページで、ods_trade_order ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. 同期パイプラインのネットワークとリソース設定を構成します。

    パラメーター

    説明

    データソース

    • データソース: MySQL

    • データソース名: MySQL_Source

    マイリソースグループ

    リソースグループを作成してワークスペースにアタッチする」で購入したサーバーレスリソースグループを選択します。

    データ宛先

  3. [次へ] をクリックして同期タスクを構成します。

    1. [データソースと宛先の構成]

      説明

      表に記載されていないパラメーターについては、デフォルト値を保持してください。

      構成エリア

      パラメーター

      構成の説明

      データソース

      テーブル

      trade_order

      データ宛先

      テーブル

      [ワンクリックでターゲットテーブルスキーマを生成] をクリックして MaxCompute テーブルを迅速に作成します。次のテーブル作成ステートメントを [テーブル作成ステートメント] エリアに貼り付け、[テーブルの作成] をクリックします。このテーブルは、データソースから商品情報を受け取るために使用されます。

      テーブル作成 SQL

      CREATE TABLE IF NOT EXISTS ods_trade_order(
      `id`                            BIGINT COMMENT '',
      `biz_type`                      BIGINT COMMENT '',
      `buy_amount`                    BIGINT COMMENT '',
      `buyer_id`                      BIGINT COMMENT '',
      `buyer_memo`                    STRING COMMENT '',
      `buyer_nick`                    STRING COMMENT '',
      `end_time`                      DATETIME COMMENT '',
      `gmt_create`                    DATETIME COMMENT '',
      `gmt_modified`                  DATETIME COMMENT '',
      `ip`                            STRING COMMENT '',
      `is_parent`                     BIGINT COMMENT '',
      `is_sub`                        BIGINT COMMENT '',
      `item_id`                       BIGINT COMMENT '',
      `item_price`                    DECIMAL(38,18) COMMENT '',
      `logistics_status`              BIGINT COMMENT '',
      `memo`                          STRING COMMENT '',
      `parent_order_id`               BIGINT COMMENT '',
      `pay_status`                    BIGINT COMMENT '',
      `pay_time`                      DATETIME COMMENT '',
      `seller_memo`                   STRING COMMENT '',
      `shop_id`                       BIGINT COMMENT '',
      `status`                        BIGINT COMMENT '',
      `sub_order_id`                  BIGINT COMMENT '',
      `total_fee`                     DECIMAL(38,18) COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      パーティション情報

      このチュートリアルでは ${bizdate} を入力します。これは、テストフェーズ中に bizdate パラメーターに定数値を割り当て、スケジュールされた実行中に bizdate パラメーターに動的に値を割り当てるために使用されます。DataStudio でサポートされている変数形式と構成メソッドの詳細については、「スケジューリングパラメーター」をご参照ください。

    2. [フィールドマッピング][チャネル制御] を確認します。

      DataWorks は、構成されたフィールドマッピングに基づいて、ソースフィールドから宛先フィールドにデータを書き込みます。タスクの同時実行数やダーティデータポリシーなどの設定も構成できます。このチュートリアルでは、[ダーティデータポリシー][ダーティデータを許容しない] に設定します。他のすべての設定はデフォルト値のままにできます。詳細については、「コードレス UI でオフライン同期タスクを構成する」をご参照ください。

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

2. データクレンジング

データが MySQL から MaxCompute に同期された後、商品情報テーブル ods_item_info と注文情報テーブル ods_trade_order の 2 つのデータテーブルができます。DataWorks の DataStudio モジュールを使用して、これらのテーブルのデータをクレンジング、処理、分析し、日次ベストセラー商品カテゴリのランキングを生成できます。

データ変換パイプラインの構築

  1. DataStudio の左側のナビゲーションウィンドウで image をクリックしてデータ開発ページに移動し、[プロジェクトディレクトリ] エリアで作成したワークフローを見つけ、ワークフローオーケストレーションページを開き、左側のコンポーネントリストから MaxCompute SQL ノードをキャンバスにドラッグします。次に、ノードに名前を付けます。

    このチュートリアルでのノード名の例とその機能は次のとおりです:

    ノードタイプ

    ノード名

    ノード機能

    imageMaxCompute SQL

    dim_item_info

    ods_item_info テーブルに基づいて、商品ディメンションデータを処理し、商品基本情報ディメンションテーブル dim_item_info を生成します。

    imageMaxCompute SQL

    dwd_trade_order

    ods_trade_order テーブルに基づいて、注文の詳細なトランザクションデータに対して初期のクレンジング、変換、ビジネスロジック処理を行い、トランザクション注文詳細ファクトテーブル dwd_trade_order を生成します。

    imageMaxCompute SQL

    dws_daily_category_sales

    dwd_trade_order テーブルと dim_item_info テーブルに基づいて、DWD レイヤーからクレンジングおよび標準化された詳細データを集計し、日次商品カテゴリ売上集計テーブル dws_daily_category_sales を生成します。

    imageMaxCompute SQL

    ads_top_selling_categories

    dws_daily_category_sales テーブルに基づいて、日次ベストセラー商品カテゴリのランキングテーブル ads_top_selling_categories を生成します。

  2. ノード間に線をドラッグして手動で接続し、各ノードの先祖ノードを構成します。結果は次の図のようになります:

    説明

    ワークフローでは、各ノードの上流および下流の依存関係を手動で接続して設定できます。また、子ノードでコード解析を使用してノードの依存関係を自動的に識別することもできます。このチュートリアルでは手動接続方法を使用します。コード解析の詳細については、「自動依存関係解析」をご参照ください。

データ変換ノードの構成

dim_item_info ノードの構成

このノードは、ods_item_info テーブルから商品ディメンションデータを処理し、商品ディメンションテーブル dim_item_info を生成します。

  1. ワークフローオーケストレーションページで、dim_item_info ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. ノード編集ページに次のコードを貼り付けます。

    CREATE TABLE IF NOT EXISTS dim_item_info (
        gmt_modified                   STRING COMMENT '商品最終更新日',
        gmt_create                     STRING COMMENT '商品作成時間',
        item_id                        BIGINT COMMENT '商品数値 ID',
        title                          STRING COMMENT '商品タイトル',
        sub_title                      STRING COMMENT '商品サブタイトル',
        pict_url                       STRING COMMENT 'メイン画像 URL',
        desc_path                      STRING COMMENT '商品説明のパス',
        item_status                    BIGINT COMMENT '商品ステータス 1: 確認済み 0: 未確認',
        last_online_time               DATETIME COMMENT '最終販売開始時間, 商品出品時間',
        last_offline_time              DATETIME COMMENT '販売終了時間, オークション商品のみの販売サイクルの終了を示す',
        duration                       BIGINT COMMENT '有効期間, 販売サイクル, 7 日または 14 日の 2 つの値のみ',
        reserve_price                  DOUBLE COMMENT '現在価格',
        secure_trade_ordinary_post_fee DOUBLE COMMENT '標準郵便料金',
        secure_trade_fast_post_fee     DOUBLE COMMENT '速達郵便料金',
        secure_trade_ems_post_fee      DOUBLE COMMENT 'EMS 郵便料金',
        last_online_quantity           BIGINT COMMENT '商品が最後に出品されたときの在庫数',
        features                       STRING COMMENT '商品特徴',
        cate_id                        BIGINT COMMENT '商品リーフカテゴリ ID',
        cate_name                      STRING COMMENT '商品リーフカテゴリ名',
        commodity_id                   BIGINT COMMENT 'カテゴリ ID',
        commodity_name                 STRING COMMENT 'カテゴリ名',
        is_virtual                     STRING COMMENT '仮想商品かどうか',
        shop_id                        BIGINT COMMENT 'ショップ ID',
        shop_nick                      STRING COMMENT 'ショップ NICK',
        is_deleted                     BIGINT COMMENT 'カテゴリが削除されたかどうか'
    )
    COMMENT '商品基本情報ディメンションテーブル'
    PARTITIONED BY (pt STRING COMMENT 'データタイムスタンプ, yyyymmdd')
    LIFECYCLE 365;
    
    
    -- dim_item_info テーブルにデータを挿入
    INSERT OVERWRITE TABLE dim_item_info PARTITION(pt='${bizdate}')
    SELECT
        gmt_create,
        gmt_modified,
        item_id,
        title,
        sub_title,
        pict_url,
        desc_path,
        item_status,
        last_online_time,
        last_offline_time,
        duration,
        cast(reserve_price as DOUBLE),
        cast(secure_trade_ordinary_post_fee as DOUBLE),
        cast(secure_trade_fast_post_fee as DOUBLE),
        cast(secure_trade_ems_post_fee as DOUBLE),
        last_online_quantity,
        features,
        cate_id,
        cate_name,
        commodity_id,
        commodity_name,
        is_virtual,
        shop_id,
        shop_nick,
        is_deleted
    FROM ods_item_info
    WHERE pt = '${bizdate}';
  3. デバッグパラメーターを構成します。

    MaxCompute SQL ノード編集ページの右側のパネルで、[デバッグ構成] をクリックします:

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

dwd_trade_order ノードの構成

このノードは、ods_trade_order テーブルからの詳細なトランザクションデータに対して初期のクレンジング、変換、ビジネスロジック処理を行い、トランザクション注文詳細ファクトテーブル dwd_trade_order を生成します。

  1. ワークフローオーケストレーションページで、dwd_trade_order ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. ノード編集ページに次のコードを貼り付けます。

    CREATE TABLE IF NOT EXISTS dwd_trade_order (
        id               BIGINT COMMENT '主キー, 重複排除後の最新 ID',
        gmt_create       DATETIME COMMENT '作成時間',
        gmt_modified     DATETIME COMMENT '変更時間',
        sub_order_id     BIGINT COMMENT 'サブオーダー ID',
        parent_order_id  BIGINT COMMENT '親オーダー ID',
        buyer_id         BIGINT COMMENT '購入者数値 ID',
        buyer_nick       STRING COMMENT '購入者ニックネーム, null 値を処理',
        item_id          BIGINT COMMENT '商品数値 ID',
        item_price       DECIMAL(38,18) COMMENT '商品価格, セント単位',
        buy_amount       BIGINT COMMENT '購入数量',
        biz_type         BIGINT COMMENT 'トランザクションタイプ',
        memo             STRING COMMENT 'メモ, null 値を処理',
        pay_status       BIGINT COMMENT '支払いステータス',
        logistics_status BIGINT COMMENT '物流ステータス',
        status           BIGINT COMMENT 'ステータス',
        seller_memo      STRING COMMENT 'トランザクションの販売者メモ',
        buyer_memo       STRING COMMENT 'トランザクションの購入者メモ',
        clean_ip         STRING COMMENT 'クレンジングされた購入者 IP, 無効なフォーマットをフィルター',
        end_time         DATETIME COMMENT 'トランザクション終了時間',
        pay_time         DATETIME COMMENT '支払い時間',
        is_sub           BIGINT COMMENT 'サブオーダーかどうか, 1 はサブオーダーを示す',
        is_parent        BIGINT COMMENT '親オーダーかどうか, 1 は親オーダーを示す',
        shop_id          BIGINT COMMENT 'ショップ ID',
        total_fee        DECIMAL(38,18) COMMENT '割引および調整後のサブオーダー料金',
        is_large_order_flag BOOLEAN COMMENT '大口注文のフラグ'
    )
    COMMENT 'トランザクション注文詳細ファクトテーブル, 初期クレンジングとビジネスロジック処理を含む'
    PARTITIONED BY (pt STRING COMMENT 'データタイムスタンプ, yyyymmdd')
    LIFECYCLE 365; -- データ存続期間を 365 日に設定
    
    
    INSERT OVERWRITE TABLE dwd_trade_order PARTITION(pt='${bizdate}')
    SELECT
        MAX(id) AS id, -- 最新の ID を重複排除の基準として使用すると仮定
        gmt_create,
        gmt_modified,
        sub_order_id,
        parent_order_id,
        buyer_id,
        COALESCE(buyer_nick, '') AS buyer_nick, -- null の buyer_nick を処理
        item_id,
        item_price,
        buy_amount,
        biz_type,
        COALESCE(memo, '') AS memo, -- null の memo を処理
        pay_status,
        logistics_status,
        status,
        seller_memo,
        buyer_memo,
        CASE 
            WHEN ip LIKE '__.__.__.__' THEN NULL -- 無効な IP フォーマットをフィルター
            ELSE ip 
        END AS clean_ip,
        end_time,
        pay_time,
        is_sub,
        is_parent,
        shop_id,
        total_fee,
        CASE 
            WHEN total_fee >= 10000 THEN TRUE -- 10000 セントを超える注文を大口注文と仮定
            ELSE FALSE 
        END AS is_large_order_flag -- ビジネスロジックフラグを追加
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY buyer_id, item_id, gmt_create ORDER BY id DESC) AS rn -- 重複排除のための行番号
        FROM ods_trade_order
        WHERE pt = '${bizdate}'
    ) AS sub_query
    WHERE rn = 1 -- 各重複排除グループの最初のレコードのみを保持
    GROUP BY 
        gmt_create,
        gmt_modified,
        sub_order_id,
        parent_order_id,
        buyer_id,
        buyer_nick,
        item_id,
        item_price,
        buy_amount,
        biz_type,
        memo,
        pay_status,
        logistics_status,
        status,
        seller_memo,
        buyer_memo,
        clean_ip,
        end_time,
        pay_time,
        is_sub,
        is_parent,
        shop_id,
        total_fee,
        is_large_order_flag;
  3. デバッグパラメーターを構成します。

    MaxCompute SQL ノード編集ページの右側のパネルで、[デバッグ構成] をクリックします:

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

dws_daily_category_sales ノードの構成

このノードは、dwd_trade_order テーブルと dim_item_info テーブルからクレンジングおよび標準化された詳細データを集計し、日次商品カテゴリ売上集計テーブル dws_daily_category_sales を生成します。

  1. ワークフローオーケストレーションページで、dws_daily_category_sales ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. ノード編集ページに次のコードを貼り付けます。

    CREATE TABLE IF NOT EXISTS dws_daily_category_sales (
        cate_id             BIGINT COMMENT '商品リーフカテゴリ ID',
        cate_name           STRING COMMENT '商品リーフカテゴリ名',
        total_sales_amount  DECIMAL(38,18) COMMENT '商品カテゴリの総売上高, セント単位',
        order_count         BIGINT COMMENT '注文数'
    )
    COMMENT '日次商品カテゴリ売上集計テーブル'
    PARTITIONED BY (pt STRING COMMENT 'データタイムスタンプ, yyyymmdd')
    LIFECYCLE 365;
    
    
    INSERT OVERWRITE TABLE dws_daily_category_sales PARTITION(pt='${bizdate}')
    SELECT
        i.cate_id,
        i.cate_name,
        SUM(t.total_fee) AS total_sales_amount,
        COUNT(DISTINCT t.id) AS order_count
    FROM dwd_trade_order t
    JOIN dim_item_info i ON t.item_id = i.item_id AND t.pt = i.pt
    WHERE t.pt = '${bizdate}'
    GROUP BY t.pt, i.cate_id, i.cate_name;
  3. デバッグパラメーターを構成します。

    MaxCompute SQL ノード編集ページの右側のパネルで、[デバッグ構成] をクリックします:

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

ads_top_selling_categories ノードの構成

このノードは、dws_daily_category_sales テーブルに基づいて、日次ベストセラー商品カテゴリのランキングテーブル ads_top_selling_categories を生成します。

  1. ワークフローオーケストレーションページで、ads_top_selling_categories ノードにマウスを合わせ、[ノードを開く] をクリックします。

  2. ノード編集ページに次のコードを貼り付けます。

    CREATE TABLE IF NOT EXISTS ads_top_selling_categories (
        rank                BIGINT COMMENT '売上ランキング',
        cate_id             BIGINT COMMENT '商品リーフカテゴリ ID',
        cate_name           STRING COMMENT '商品リーfカテゴリ名',
        total_sales_amount  DECIMAL(38,18) COMMENT '商品カテゴリの総売上高, セント単位',
        order_count         BIGINT COMMENT '注文数'
    )
    COMMENT '日次ベストセラー商品カテゴリのランキングテーブル'
    PARTITIONED BY (pt STRING COMMENT 'データタイムスタンプ, yyyymmdd');
    
    
    INSERT OVERWRITE TABLE ads_top_selling_categories PARTITION(pt='${bizdate}')
    SELECT
        rank,
        cate_id,
        cate_name,
        total_sales_amount,
        order_count
    FROM (
        SELECT
            DENSE_RANK() OVER(ORDER BY total_sales_amount DESC) AS rank,
            cate_id,
            cate_name,
            total_sales_amount,
            order_count
        FROM (
            SELECT
                cate_id,
                cate_name,
                SUM(total_sales_amount) AS total_sales_amount,
                SUM(order_count) AS order_count
            FROM dws_daily_category_sales
            WHERE pt = '${bizdate}'
            GROUP BY cate_id, cate_name
        ) agg_sub
    ) agg_outer
    WHERE rank <= 10;
  3. デバッグパラメーターを構成します。

    MaxCompute SQL ノード編集ページの右側のパネルで、[デバッグ構成] をクリックします:

  4. ノードツールバーで [保存] をクリックしてノードを保存します。

3. デバッグと実行

ワークフローを構成した後、本番環境に公開する前に、ワークフロー全体を実行して構成が正しいことを確認する必要があります。

  1. DataStudio の左側のナビゲーションウィンドウで image をクリックしてデータ開発ページに移動し、[プロジェクトディレクトリ] エリアで作成したワークフローを見つけます。

  2. ツールバーで [実行] をクリックします。[現在の実行値] に、現在の日付の前日の日付を入力します (例: 20250416)。

    説明

    ワークフローノードの構成では、DataWorks が提供するスケジューリングパラメーターを使用して、コード内で動的なパラメーター入力を有効にします。ノードをデバッグする際には、テストのためにこのパラメーターに定数値を割り当てる必要があります。

  3. [OK] をクリックしてデバッグ実行ページに移動します。

  4. 実行が完了するのを待ちます。期待される結果は次の図のようになります:

    image

4. データのクエリと可視化

MySQL からの生のテストデータを処理し、ads_top_selling_categories テーブルに集計しました。これで、このテーブルをクエリして分析結果を表示できます。

  1. 左上隅の image アイコンをクリックします。ポップアップページで [すべての製品] > [データ分析] > [SQL クエリ] をクリックします。

  2. マイファイルの後ろにある image > [新しいファイル] をクリックします。カスタムの [ファイル名] を入力し、[OK] をクリックします。

  3. SQL クエリページで、次の SQL を構成します。

    SELECT * FROM ads_top_selling_categories WHERE pt=${bizdate};
  4. 右上隅で MaxCompute データソースを選択し、[OK] をクリックします。

  5. 上部の [実行] ボタンをクリックします。[コスト見積もり] ページで [実行] をクリックします。

  6. クエリ結果で image をクリックして結果をチャートとして表示します。チャートの右上隅にある image をクリックしてチャートスタイルをカスタマイズできます。

  7. また、チャートの右上隅にある [保存] をクリックしてカードとして保存することもできます。その後、左側のナビゲーションウィンドウで [カード] (image) をクリックして保存したカードを表示できます。

5. 定期的なスケジューリング

前の手順を完了することで、前日のさまざまな商品カテゴリの売上データを取得しました。毎日最新の売上データを取得するには、ワークフローを本番環境に公開して、スケジュールされた時間に定期的に実行できます。

説明

データ同期およびデータ変換タスクを構成した際に、ワークフローとそのノードのスケジューリングパラメーターも構成しました。再度構成する必要はありません。ワークフローを本番環境に公開するだけで済みます。スケジューリング構成の詳細については、「ノードスケジューリング構成」をご参照ください。

  1. 左上隅の image アイコンをクリックします。ポップアップページで [すべての製品] > [データ開発と O&M] > [DataStudio (データ開発)] をクリックします。

  2. DataStudio の左側のナビゲーションウィンドウで image をクリックしてデータ開発ページに移動します。このケースで使用されているプロジェクトスペースに切り替えます。次に、[プロジェクトディレクトリ] エリアで作成したワークフローを見つけます。

  3. ツールバーで [公開] をクリックします。公開パネルで [本番環境への公開を開始] をクリックします。[デプロイメントパッケージのビルド][本番チェッカー] のステップが完了するのを待ち、[公開の確認] をクリックします。

  4. [本番環境への公開] ステータスが [完了] に変わったら、[O&M へ移動] をクリックしてオペレーションセンターに移動します。

    image

  5. [自動トリガータスク O&M] > [自動トリガータスク] で、ワークフローの自動トリガータスク (このチュートリアルでは dw_quickstart という名前) を確認できます。

  6. ワークフロー内の子ノードの自動トリガータスクの詳細を表示するには、ワークフローの自動トリガータスクを右クリックし、[内部タスクの表示] を選択します。

    image

    期待される結果は次のとおりです:

    image

次のステップ

付録: リソースのリリースとクリーンアップ

このチュートリアルで作成したリソースをリリースするには、次の手順を実行します:

  1. 自動トリガータスクを停止します。

    1. DataWorks コンソールにログインします。トップナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[データ開発と O&M] > [オペレーションセンター] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[オペレーションセンターへ移動] をクリックします。

    2. [自動トリガータスク O&M] > [自動トリガータスク] ページで、作成したすべての自動トリガータスクを選択します。ワークスペースルートノードは非公開にする必要はありません。次に、ページの下部で [操作] > [ノードの非公開] をクリックします。

  2. データ開発ノードを削除し、MaxCompute 計算資源をデタッチします。

    1. DataWorks コンソールの ワークスペースページに移動します。トップナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[アクション] 列の [ショートカット] > [Data Studio] を選択します。

    2. DataStudio の左側のナビゲーションウィンドウで image をクリックしてデータ開発ページに移動します。次に、[プロジェクトディレクトリ] エリアで作成したワークフローを見つけ、ワークフローを右クリックして [削除] をクリックします。

    3. 左側のナビゲーションウィンドウで image > [計算資源管理] をクリックし、アタッチした MaxCompute 計算資源を見つけて [デタッチ] をクリックします。確認ウィンドウでチェックボックスを選択し、画面の指示に従ってデタッチを完了します。

  3. MySQL データソースを削除します。

    1. DataWorks コンソールにログインします。トップナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、[その他] > [管理センター] を選択します。表示されるページで、ドロップダウンリストから目的のワークスペースを選択し、[管理センターへ移動] をクリックします。

    2. 左側のナビゲーションウィンドウで [データソース] をクリックして [データソースリスト] ページに移動します。作成した MySQL データソースを見つけ、[アクション] 列の [削除] をクリックし、指示に従って削除を完了します。

  4. MaxCompute プロジェクトを削除します。

    MaxCompute プロジェクト管理ページに移動し、作成した MaxCompute プロジェクトを見つけ、[アクション] 列の [削除] をクリックし、画面の指示に従って削除を完了します。

  5. インターネット NAT ゲートウェイを削除し、Elastic IP Address をリリースします。

    1. VPC - インターネット NAT ゲートウェイコンソールに移動します。トップメニューバーで 中国 (上海) リージョンに切り替えます。

    2. 作成したインターネット NAT ゲートウェイを見つけます。[アクション] 列で image > [削除] をクリックします。確認ウィンドウで [強制削除] を選択し、[OK] をクリックします。

    3. 左側のナビゲーションウィンドウで [インターネットアクセス] > [Elastic IP Address] をクリックします。作成した EIP を見つけます。[アクション] 列で image > [インスタンス管理] > [リリース] をクリックします。確認ウィンドウで [OK] をクリックします。