すべてのプロダクト
Search
ドキュメントセンター

DataWorks:上級編:人気商品カテゴリの分析

最終更新日:Feb 13, 2026

DataWorks は、MaxCompute、Hologres、E-MapReduce(EMR)、AnalyticDB、CDP などのビッグデータエンジンを活用した、統合型・エンドツーエンドのビッグデータ開発およびガバナンスプラットフォームです。データウェアハウス、データレイク、レイクハウスアーキテクチャをサポートします。本チュートリアルでは、データのインジェスト、ビジネスワークフローのオーケストレーション、定期的なタスクのスケジュール設定、および DataWorks を用いたデータ可視化の方法について説明します。

はじめに

本チュートリアルでは、EC(電子商取引)のシナリオを用いて、生データのインジェストからデータ分析・データ可視化までの一連のエンドツーエンドデータパイプライン構築手順を実演します。この標準化されたプロセスに従うことで、信頼性の高いスケジュール実行と運用時の観測性(Observability)を確保できる再利用可能なデータワークフローを迅速に構築できます。このアプローチにより、高度な技術的知識を持たないビジネスユーザーでもデータからインサイトを導き出すことが可能となり、組織全体でのビッグデータアプリケーションの採用も容易になります。

本チュートリアルでは、以下の作業を行います:

  1. データ同期:DataWorks のデータ統合モジュールを使用して、単一テーブルのバッチタスクを作成し、ビジネスデータを MaxCompute などのビッグデータ計算プラットフォームへ同期します。

  2. データクリーニング:DataWorks の Data Studio モジュールを使用して、ビジネスデータの処理、分析、およびマイニングを行います。

  3. データ可視化:DataWorks のデータ分析モジュールを使用して、分析結果をビジネスユーザーが直感的に理解できるチャート形式に変換します。

  4. 定期スケジュール設定:データ同期およびデータクリーニングプロセスに対して定期スケジュールを設定します。

image

本チュートリアルでは、パブリックデータソースから取得した生の商品および注文データを MaxCompute へ同期・分析し、毎日の人気商品カテゴリランキングを生成するワークフローを使用します。

image

前提条件

本チュートリアルを完了するには、Alibaba Cloud アカウントまたは AliyunDataWorksFullAccess 権限を持つ RAM ユーザーが必要です。詳細については、「Alibaba Cloud アカウントの準備」または「RAM ユーザーの準備」をご参照ください。

説明

DataWorks は、製品およびモジュールレベルのアクセス制御をサポートする包括的な権限管理システムを提供します。より詳細なアクセス制御が必要な場合は、「DataWorks 権限管理システムの概要」をご参照ください。

前提条件

DataWorks の有効化

本チュートリアルでは、中国 (上海) リージョンを用いて DataWorks の使い始め方を説明します。まず、DataWorks コンソール にログインし、中国 (上海) リージョンに切り替えた上で、DataWorks が有効化されているか確認します。

説明

ビジネスデータの所在地に基づいてリージョンを選択してください:

  • ビジネスデータが他の Alibaba Cloud サービス上にある場合は、それらのサービスと同じリージョンを選択してください。

  • オンプレミス環境でパブリックネットワーク経由でアクセスする場合は、遅延を最小限にするため、お客様に最も近いリージョンを選択してください。

新規ユーザーの方

DataWorks を初めて使用される場合、現在のリージョンで DataWorks がまだ有効化されていないことを示す画面が表示されます。「製品ポートフォリオを無料で購入」をクリックします。

image

  1. 購入ページでパラメーターを設定します。

    パラメーター

    説明

    リージョン

    DataWorks を有効化するリージョンを選択します。

    中国 (上海)

    DataWorks エディション

    購入する DataWorks エディションを選択します。

    説明

    本チュートリアルでは、例として Basic Edition を使用します。すべてのエディションで本チュートリアルで紹介する機能が利用可能です。ご自身のビジネスニーズに最適なエディションを選択するには、「DataWorks のエディションと機能」をご参照ください。

    Basic Edition

  2. 注文確定および支払い をクリックして支払いを完了します。

有効化済みだが有効期限が切れた場合

以前に 中国 (上海) リージョンで DataWorks を有効化しましたが、サービスの有効期限が切れている場合、以下のメッセージが表示されます。「エディションの購入」をクリックします。

image

  1. 購入ページでパラメーターを設定します。

    パラメーター

    説明

    エディション

    購入する DataWorks エディションを選択します。

    説明

    本チュートリアルでは、例として Basic Edition を使用します。すべてのエディションで本チュートリアルで紹介する機能が利用可能です。ご自身のビジネスニーズに最適なエディションを選択するには、「DataWorks のエディションと機能」をご参照ください。

    Basic Edition

    リージョン

    DataWorks を有効化するリージョンを選択します。

    中国 (上海)

  2. 今すぐ購入 をクリックして支払いを完了します。

重要

購入した DataWorks エディションが見つからない場合は、以下の操作をお試しください:

  • 数分待ってページをリフレッシュしてください。システムの遅延が原因である可能性があります。

  • 現在選択中のリージョンが、DataWorks エディションを購入したリージョンと一致しているか確認してください。リージョンが異なる場合、該当エディションは表示されません。

すでに有効化済みの場合

中国 (上海) リージョンで DataWorks をすでに有効化している場合、DataWorks の概要ページが表示されます。次のステップに進むことができます。

ワークスペースの作成

  1. DataWorks ワークスペース一覧 ページへ移動し、中国 (上海) リージョンに切り替えて、「ワークスペースの作成」をクリックします。

  2. ワークスペースの作成 ページで、任意の ワークスペース名 を入力し、「[Data Studio (新バージョン) を使用]」を有効化した後、「ワークスペースの作成」をクリックします。

    説明

    2025 年 2 月 18 日以降、中国 (上海) リージョンで Alibaba Cloud アカウントが初めて DataWorks ワークスペースを作成する場合、Data Studio の新バージョンがデフォルトで有効化されます。そのため、「Data Studio(新バージョン)の使用」パラメーターは表示されません。

リソースグループの設定

  1. DataWorks リソースグループ一覧 ページへ移動し、中国 (上海) リージョンに切り替えて、「リソースグループの作成」をクリックします。

  2. リソースグループ購入ページで、以下のパラメーターを設定します。

    パラメーター

    説明

    リソースグループ名

    任意の名前を入力します。

    VPCvSwitch

    既存の VPC および vSwitch を選択します。現在のリージョンに存在しない場合は、パラメーターの説明内にあるコンソールリンクをクリックして作成します。

    サービスリンクロール

    画面上の指示に従って、「AliyunServiceRoleForDataWorks サービスリンクロール」を作成します。

  3. 今すぐ購入 をクリックして支払いを完了します。

  4. DataWorks リソースグループ一覧 ページへ移動し、中国 (上海) リージョンに切り替えて、作成したリソースグループを見つけ、操作 列の ワークスペースとの関連付け をクリックします。

  5. ワークスペースとの関連付け ページで、作成した DataWorks ワークスペースを見つけ、その 操作 列の 関連付け をクリックします。

パブリックネットワークへのアクセスの有効化

本チュートリアルでは、パブリックネットワーク経由でアクセスされる EC(電子商取引)のテストデータを使用します。デフォルトでは、前述の手順で作成したリソースグループはパブリックネットワークへのアクセスを許可していません。リソースグループが関連付けられた仮想プライベートクラウド(VPC)にインターネット NAT Gateway を設定し、Elastic IP Address(EIP)を追加することで、パブリックネットワークへのアクセスを有効化し、データを取得できます。

  1. VPC - インターネット NAT Gateway コンソール にログインします。トップナビゲーションバーで 中国 (上海) リージョンに切り替えて、「インターネット NAT Gateway の作成」をクリックします。以下のパラメーターを設定します。

    説明

    表に記載されていないパラメーターについては、デフォルト値のままにしてください。

    パラメーター

    リージョン

    中国 (上海)。

    ネットワークおよびゾーン

    リソースグループに関連付けられた VPC および vSwitch を選択します。

    VPC および vSwitch は、DataWorks リソースグループ一覧 ページで確認できます。まず 中国 (上海) リージョンに切り替え、リソースグループを見つけ、ネットワーク設定 をクリックします。操作 列に表示される VPC バインディング および vSwitch は、データスケジュールおよびデータ統合 セクションにあります。VPC に関する詳細については、「仮想プライベートクラウド(VPC)とは」をご参照ください。

    ネットワークタイプ

    インターネット NAT Gateway。

    Elastic IP アドレス(EIP)

    新しい EIP を購入することを選択します。

    サービスリンクロールの作成

    初めてインターネット NAT Gateway を作成する場合は、「サービスリンクロールの作成」をクリックしてサービスリンクロールを作成します。

  2. 今すぐ購入 をクリックして支払いを完了し、インターネット NAT Gateway インスタンスを作成します。

    image

  3. インターネット NAT Gateway インスタンスの作成後、コンソールに戻り、SNAT エントリを作成します。

    説明

    SNAT エントリを設定しない限り、リソースグループはこの VPC 経由でパブリックネットワークにアクセスできません。

    1. 新しいインスタンスの操作列で、[管理] をクリックし、次に [SNAT の設定] タブをクリックします。

    2. SNAT エントリ一覧 で、「SNAT エントリの作成」をクリックし、以下の主要パラメーターを設定します:

      パラメーター

      SNAT エントリ

      VPC の指定 を選択して、インターネット NAT Gateway の VPC 内にあるすべてのリソースグループが、設定済みの EIP を使用してパブリックネットワークにアクセスできるようにします。

      EIP の選択

      現在のインターネット NAT Gateway インスタンスにバインドされた EIP を選択します。

      SNAT エントリのパラメーターを設定したら、「OK」をクリックして作成します。

    SNAT エントリ一覧 で、新しい SNAT エントリの ステータス利用可能 に変わると、リソースグループが関連付けられた VPC がパブリックネットワークへのアクセスを取得したことになります。

MaxCompute リソースの設定

本チュートリアルでは、MaxCompute プロジェクトを作成し、それを DataWorks の計算リソースとして関連付けます。このリソースを用いてデータのインジェストおよびビッグデータ分析を実行します。

  1. DataWorks ワークスペース一覧 ページへ移動し、中国 (上海) リージョンに切り替えて、作成したワークスペースを見つけ、その名前をクリックして ワークスペースの詳細 ページへ移動します。

  2. 左側のナビゲーションペインで 計算リソース をクリックし、「計算リソースの関連付け」をクリックして MaxCompute を選択します。以下の主要パラメーターを設定して MaxCompute プロジェクトを作成し、DataWorks の計算リソースとして関連付けます。

    説明

    表に記載されていないパラメーターについては、デフォルト値のままにしてください。

    パラメーター

    説明

    MaxCompute プロジェクト

    ドロップダウンリストから、[作成] をクリックし、以下のパラメーターを設定します:

    • プロジェクト名:グローバルに一意となる任意の名前を入力します。

    • 課金方法:「従量課金制」を選択します。

      説明

      従量課金制」が利用できない場合は、横の「有効化」をクリックして MaxCompute サービスを有効化します。

    • デフォルトクォータ:ドロップダウンリストから既存のクォータを選択します。

    デフォルトアクセス ID

    Alibaba Cloud アカウント」を選択します。

    計算リソースインスタンス名

    タスク実行時にこの計算リソースを選択するために使用する名前です。分かりやすい名前(例:MaxCompute_Source)を使用してください。

  3. OK をクリックします。

操作手順

本チュートリアルでは、実践的な例を通じて DataWorks の主要機能を紹介します。

MySQL データベースに商品および注文情報を格納している EC(電子商取引)プラットフォームを想定します。目的は、これらのデータを定期的に分析し、毎日の人気商品カテゴリランキングを可視化することです。

ステップ 1:データの同期

データソースの作成

このステップでは、チュートリアルのソースデータベースに接続する MySQL データソース を作成します。

説明

ご自身のビジネスデータを準備する必要はありません。DataWorks では、本チュートリアル用にパブリック MySQL データベース上のサンプルデータセットを提供しています。このデータセットに接続するために MySQL データソースを作成します。

  1. DataWorks 管理センター ページへ移動します。リージョンを 中国東部 2(上海) に切り替え、ドロップダウンリストからご利用のワークスペースを選択し、「管理センターへ移動」をクリックします。

  2. 左側のナビゲーションペインで データソース をクリックして データソース一覧 ページへ移動します。「データソースの追加」をクリックし、MySQL 型を選択してデータソースのパラメーターを設定します。

    説明
    • 表に記載されていないパラメーターについては、デフォルト値のままにしてください。

    • データソースを初めて追加する場合は、クロスサービス権限付与 を完了する必要があります。画面上の指示に従って AliyunDIDefaultRole サービスリンクロールを付与します。

    パラメーター

    説明

    データソース名

    本チュートリアルでは、MySQL_Source を入力します。

    構成モード

    接続文字列モード を選択します。

    エンドポイント

    • ホストアドレス IP:rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com

    • ポート:3306

    重要

    本チュートリアルで使用するデータは、DataWorks のハンズオン学習用であり、テスト目的専用です。データ統合モジュール内では読み取り専用となっています。

    データベース名

    retail_e_commerce を入力します。

    ユーザー名

    workshop を入力します。

    パスワード

    workshop#2017 を入力します。

  3. 接続設定 セクションで、データ統合 タブに切り替えます。ワークスペースにバインドされたリソースグループを見つけ、接続状態 列の ネットワーク接続性のテスト をクリックします。

    説明

    MySQL データソースの接続性テストが失敗した場合は、以下の操作を行ってください:

    • 接続性診断ツールの後続ステップを完了してください。

    • リソースグループがバインドされた VPC に Elastic IP アドレス(EIP)が設定されているか確認してください。MySQL データソースは、リソースグループがパブリックネットワークへのアクセスを有効化している必要があります。詳細については、「リソースグループのパブリックネットワークアクセスの有効化」をご参照ください。

  4. 作成完了 をクリックします。

同期パイプラインの構築

このステップでは、EC(電子商取引)の商品および注文データを MaxCompute テーブルへ同期し、その後の処理を行う同期パイプラインを構築します。

  1. 左上隅の 图标 アイコンをクリックし、全製品 > データ開発および O&M > DataStudio(データ開発) を選択して DataStudio ページへ移動します。

  2. ページ上部でご利用のワークスペースに切り替えます。左側のナビゲーションペインで image をクリックして DataStudio ページへ移動します。

  3. ワークスペースディレクトリ セクションで image をクリックし、「ワークフローの作成」を選択して、名前を dw_quickstart とします。

  4. ワークフローキャンバス上で、左側パネルから ゼロロード ノードおよび 2 つの バッチ同期 ノードをドラッグ&ドロップします。バッチ同期 ノードの設定は以下の通りです:

    • データソースタイプMySQL

    • データ送信先タイプMaxCompute

    • 特定タイプバッチ同期ノード

    以下に、本チュートリアルで使用するノード名とその機能を示します:

    ノードタイプ

    ノード名

    機能

    imageゼロロード

    workshop

    ワークフローのエントリーポイントとして機能し、明確なデータフローを定義します。これは、[ドライラン] タスクであり、コードを必要としません。

    imageバッチ同期ノード

    ods_item_info

    MySQL の商品情報ソーステーブル item_info を MaxCompute の ods_item_info テーブルへ同期します。

    imageバッチ同期ノード

    ods_trade_order

    MySQL の注文情報ソーステーブル trade_order を MaxCompute の ods_trade_order テーブルへ同期します。

    ノードをドラッグ&ドロップして接続し、workshop ノードを両方のバッチ同期ノードの上流ノードとします。最終的な構成は以下のようになります:

    image
  5. ワークフローのスケジュール設定を行います。

    ワークフローキャンバスの右側で スケジュール設定 をクリックし、パラメーターを設定します。以下に、本チュートリアルで重要なパラメーターを示します。その他のパラメーターについてはデフォルト値のままにしてください。

    スケジュールパラメーター

    説明

    スケジュールパラメーター

    ワークフロー全体のスケジュールパラメーターを設定します。これらのパラメーターは、ワークフロー内のノードで直接使用できます。

    本チュートリアルでは、前日の日付を取得するために bizdate=$[yyyymmdd-1] を設定します。

    説明

    DataWorks では、コード内で動的な値を指定できるスケジュールパラメーターを提供しています。${variable_name} 形式で SQL コード内に変数を定義し、スケジュール設定 > スケジュールパラメーター で値を割り当てることができます。サポートされるパラメーター形式については、「スケジュールパラメーターのサポート形式」をご参照ください。

    スケジュール周期

    本チュートリアルでは、毎日 を設定します。

    スケジュール時刻

    本チュートリアルでは、スケジュール時刻00:30 に設定します。ワークフローは毎日 00:30 に開始します。

    スケジュール依存関係

    本ワークフローには上流の依存関係がないため、設定を省略できます。統一管理のために、ワークスペースのルートノードを使用 をクリックして、ワークフローをワークスペースのルートノードにアタッチできます。

    ワークスペースのルートノードの名前は WorkspaceName_root の形式です。

同期タスクの設定

初期ノード
  1. ワークフローキャンバス上で workshop ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. workshop ノードエディターの右側で スケジュール設定 をクリックし、パラメーターを設定します。以下に、本チュートリアルで重要なパラメーターを示します。その他のパラメーターについてはデフォルト値のままにしてください。

    スケジュールパラメーター

    説明

    スケジュールタイプ

    本チュートリアルでは、ダミーラン を設定します。

    リソースグループ

    本チュートリアルでは、リソースグループの作成およびワークスペースへの関連付け で作成したサーバーレスリソースグループを選択します。

    ノード依存関係の設定

    workshop は初期ノードであり、上流の依存関係がないため、ワークスペースのルートノードを使用 をクリックして、ワークフローをワークスペースのルートノードによってトリガーできます。

    ワークスペースのルートノードの名前は WorkspaceName_root の形式です。

  3. ノードツールバーの 保存 をクリックしてノードを保存します。

商品情報の同期(ods_item_info)
  1. ワークフローキャンバス上で ods_item_info ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. データソースおよびリソースグループを設定します。

    パラメーター

    説明

    ソース

    データソース:MySQL_Source

    送信先

    データソース:MaxCompute 計算リソースの作成および関連付け で関連付けた MaxCompute 計算リソースを選択します。本例では MaxCompute_Source を使用します。

    リソースグループ

    リソースグループの作成およびワークスペースへの関連付け で購入したサーバーレスリソースグループを選択します。

  3. 同期設定を設定します。

    1. データソースおよび送信先の設定

      説明

      表に記載されていないパラメーターについては、デフォルト値のままにしてください。

      設定領域

      パラメーター

      説明

      データソース

      テーブル

      item_info

      データ送信先

      テーブル

      送信先テーブルスキーマの生成」をクリックして、MaxCompute テーブルを素早く作成します。テーブル作成文 領域に以下の文を貼り付け、「テーブルの作成」をクリックします。このテーブルは、データソースから受け取った商品情報を格納します。

      テーブル作成 SQL

      CREATE TABLE IF NOT EXISTS ods_item_info(
        `id`                              BIGINT COMMENT '',
        `cate_id`                         BIGINT COMMENT '',
        `cate_name`                       STRING COMMENT '',
        `commodity_id`                    BIGINT COMMENT '',
        `commodity_name`                  STRING COMMENT '',
        `desc_path`                       STRING COMMENT '',
        `duration`                        BIGINT COMMENT '',
        `features`                        STRING COMMENT '',
        `gmt_create`                      DATETIME COMMENT '',
        `gmt_modified`                    DATETIME COMMENT '',
        `is_deleted`                      BIGINT COMMENT '',
        `is_virtual`                      STRING COMMENT '',
        `item_id`                         BIGINT COMMENT '',
        `item_status`                     BIGINT COMMENT '',
        `last_offline_time`               DATETIME COMMENT '',
        `last_online_quantity`            BIGINT COMMENT '',
        `last_online_time`                DATETIME COMMENT '',
        `pict_url`                        STRING COMMENT '',
        `reserve_price`                   DECIMAL(38,18) COMMENT '',
        `secure_trade_ems_post_fee`       DECIMAL(38,18) COMMENT '',
        `secure_trade_fast_post_fee`      DECIMAL(38,18) COMMENT '',
        `secure_trade_ordinary_post_fee`  DECIMAL(38,18) COMMENT '',
        `shop_id`                         BIGINT COMMENT '',
        `shop_nick`                       STRING COMMENT '',
        `sub_title`                       STRING COMMENT '',
        `title`                           STRING COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      パーティション情報

      本チュートリアルでは、${bizdate} を入力します。この変数は、テスト時には定数値を使用し、スケジュール実行時には動的値を使用します。DataStudio における変数形式および設定方法の詳細については、「スケジュールパラメーター」をご参照ください。

    2. フィールドマッピング および チャネル制御 を確認します。

      DataWorks は、ソースフィールドを対応する送信先フィールドにマッピングします。また、右側のチャネル制御パネルで、タスクの並列実行数や不正データポリシーなどの設定も行えます。本チュートリアルでは、不正データレコードのポリシー不正データレコードを許可しない に設定し、その他の設定はデフォルトのままにします。詳細については、「ウィザードモードでのバッチ同期ノードの設定」をご参照ください。

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

注文データの同期(ods_trade_order)
  1. ワークフローキャンバス上で ods_trade_order ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. データソースおよびリソースグループを設定します。

    パラメーター

    説明

    ソース

    データソース:MySQL_Source

    送信先

    データソース:MaxCompute 計算リソースの作成および関連付け で関連付けた MaxCompute 計算リソースを選択します。本例では MaxCompute_Source を使用します。

    リソースグループ

    リソースグループの作成およびワークスペースへの関連付け で購入したサーバーレスリソースグループを選択します。

  3. 次へ をクリックして同期タスクを設定します。

    1. データソースおよび送信先の設定

      説明

      表に記載されていないパラメーターについては、デフォルト値のままにしてください。

      設定領域

      パラメーター

      説明

      データソース

      テーブル

      trade_order

      データ送信先

      テーブル

      送信先テーブルスキーマの生成」をクリックして、MaxCompute テーブルを素早く作成します。テーブル作成文 領域に以下の文を貼り付け、「テーブルの作成」をクリックします。このテーブルは、データソースから受け取った商品情報を格納します。

      テーブル作成 SQL

      CREATE TABLE IF NOT EXISTS ods_trade_order(
        `id`                BIGINT COMMENT '',
        `biz_type`          BIGINT COMMENT '',
        `buy_amount`        BIGINT COMMENT '',
        `buyer_id`          BIGINT COMMENT '',
        `buyer_memo`        STRING COMMENT '',
        `buyer_nick`        STRING COMMENT '',
        `end_time`          DATETIME COMMENT '',
        `gmt_create`        DATETIME COMMENT '',
        `gmt_modified`      DATETIME COMMENT '',
        `ip`                STRING COMMENT '',
        `is_parent`         BIGINT COMMENT '',
        `is_sub`            BIGINT COMMENT '',
        `item_id`           BIGINT COMMENT '',
        `item_price`        DECIMAL(38,18) COMMENT '',
        `logistics_status`  BIGINT COMMENT '',
        `memo`              STRING COMMENT '',
        `parent_order_id`   BIGINT COMMENT '',
        `pay_status`        BIGINT COMMENT '',
        `pay_time`          DATETIME COMMENT '',
        `seller_memo`       STRING COMMENT '',
        `shop_id`           BIGINT COMMENT '',
        `status`            BIGINT COMMENT '',
        `sub_order_id`      BIGINT COMMENT '',
        `total_fee`         DECIMAL(38,18) COMMENT ''
      )
      COMMENT ''
      PARTITIONED BY (pt STRING) 
      lifecycle 36500;

      パーティション情報

      本チュートリアルでは、${bizdate} を入力します。この変数は、テスト時には定数値を使用し、スケジュール実行時には動的値を使用します。DataStudio における変数形式および設定方法の詳細については、「スケジュールパラメーター」をご参照ください。

    2. フィールドマッピング および チャネル制御 を確認します。

      DataWorks は、ソースフィールドを対応する送信先フィールドにマッピングします。また、右側のチャネル制御パネルで、タスクの並列実行数や不正データポリシーなどの設定も行えます。本チュートリアルでは、不正データレコードのポリシー不正データレコードを許可しない に設定し、その他の設定はデフォルトのままにします。詳細については、「ウィザードモードでのバッチ同期ノードの設定」をご参照ください。

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

ステップ 2:データのクリーニングおよび処理

データを MaxCompute へ同期した後、DataStudio モジュールを使用して ods_item_info および ods_trade_order テーブルをクリーニング、処理、分析し、毎日の人気商品カテゴリランキングを生成します。

データ処理パイプラインの構築

  1. DataStudio の左側ナビゲーションペインで image をクリックして DataStudio ページへ移動します。ワークスペースディレクトリ セクションで、作成したワークフローを見つけ、クリックします。ワークフローキャンバス上で左側パネルから MaxCompute SQL ノードをドラッグ&ドロップし、それぞれに適切な名前を付けます。

    以下に、本チュートリアルで使用するノード名の例とその機能を示します:

    ノードタイプ

    ノード名

    機能

    imageMaxCompute SQL

    dim_item_info

    ods_item_info テーブルから商品データを処理し、dim_item_info ディメンションテーブルを作成します。

    imageMaxCompute SQL

    dwd_trade_order

    ods_trade_order テーブルから取引データをクリーニングおよび変換し、dwd_trade_order ファクトテーブルを作成します。

    imageMaxCompute SQL

    dws_daily_category_sales

    dwd_trade_order および dim_item_info テーブルからクリーニングおよび標準化された詳細データを集約し、日次商品カテゴリ売上サマリーテーブル dws_daily_category_sales を作成します。

    imageMaxCompute SQL

    ads_top_selling_categories

    dws_daily_category_sales テーブルに基づき、毎日の人気商品カテゴリランキングテーブル ads_top_selling_categories を生成します。

  2. ノードをドラッグ&ドロップして接続し、上流の依存関係を設定します。最終的な構成は以下のようになります:

    image
    説明

    依存関係 は、ノードを手動で接続するか、各ノード内のコードを自動解析することによって設定できます。本チュートリアルでは手動接続方式を使用します。自動解析の詳細については、「自動依存関係解析」をご参照ください。

データ処理ノードの設定

dim_item_info ノード

このノードは、ods_item_info テーブルから商品ディメンションデータを処理し、商品情報ディメンションテーブル dim_item_info を生成します。

  1. ワークフローキャンバス上で dim_item_info ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. 以下のコードをノードエディターに貼り付けます。

    CREATE TABLE IF NOT EXISTS dim_item_info (
        gmt_modified                   STRING COMMENT '商品の最終更新日',
        gmt_create                     STRING COMMENT '商品の作成日時',
        item_id                        BIGINT COMMENT '商品の数値 ID',
        title                          STRING COMMENT '商品タイトル',
        sub_title                      STRING COMMENT '商品サブタイトル',
        pict_url                       STRING COMMENT 'メイン画像の URL',
        desc_path                      STRING COMMENT '商品説明のパス',
        item_status                    BIGINT COMMENT '商品ステータス:1 = 承認済み、0 = 未承認',
        last_online_time               DATETIME COMMENT '商品が販売開始された最新日時',
        last_offline_time              DATETIME COMMENT '販売終了日時(オークション商品のみ)',
        duration                       BIGINT COMMENT '有効期間または販売サイクル(7 日または 14 日)',
        reserve_price                  DOUBLE COMMENT '現在価格',
        secure_trade_ordinary_post_fee DOUBLE COMMENT '通常郵便料金',
        secure_trade_fast_post_fee     DOUBLE COMMENT '宅配便料金',
        secure_trade_ems_post_fee      DOUBLE COMMENT 'EMS 料金',
        last_online_quantity           BIGINT COMMENT '商品が最後に出品された際の在庫数',
        features                       STRING COMMENT '商品の特徴',
        cate_id                        BIGINT COMMENT '商品の葉カテゴリ ID',
        cate_name                      STRING COMMENT '商品の葉カテゴリ名',
        commodity_id                   BIGINT COMMENT '商品カテゴリ ID',
        commodity_name                 STRING COMMENT '商品カテゴリ名',
        is_virtual                     STRING COMMENT '仮想商品かどうかを示す',
        shop_id                        BIGINT COMMENT 'ショップ ID',
        shop_nick                      STRING COMMENT 'ショップニックネーム',
        is_deleted                     BIGINT COMMENT 'カテゴリが削除済みかどうかを示す'
    )
    COMMENT '商品情報ディメンションテーブル'
    PARTITIONED BY (pt STRING COMMENT '業務日付(yyyymmdd)')
    LIFECYCLE 365;
    
    
    -- dim_item_info テーブルへのデータ挿入
    INSERT OVERWRITE TABLE dim_item_info PARTITION(pt='${bizdate}')
    SELECT
        gmt_create,
        gmt_modified,
        item_id,
        title,
        sub_title,
        pict_url,
        desc_path,
        item_status,
        last_online_time,
        last_offline_time,
        duration,
        cast(reserve_price as DOUBLE),
        cast(secure_trade_ordinary_post_fee as DOUBLE),
        cast(secure_trade_fast_post_fee as DOUBLE),
        cast(secure_trade_ems_post_fee as DOUBLE),
        last_online_quantity,
        features,
        cate_id,
        cate_name,
        commodity_id,
        commodity_name,
        is_virtual,
        shop_id,
        shop_nick,
        is_deleted
    FROM ods_item_info
    WHERE pt = '${bizdate}';
  3. 実行時パラメーターを設定します。

    MaxCompute SQL ノードエディターの右側で 実行設定 をクリックします:

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

dwd_trade_order ノード

このノードは、ods_trade_order テーブルから取引の詳細データを初期クリーニング、変換、およびビジネスロジック処理を行い、取引詳細ファクトテーブル dwd_trade_order を生成します。

  1. ワークフローキャンバス上で dwd_trade_order ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. 以下のコードをノードエディターに貼り付けます。

    CREATE TABLE IF NOT EXISTS dwd_trade_order (
        id               BIGINT COMMENT '主キー:重複排除後の最新 ID',
        gmt_create       DATETIME COMMENT '作成日時',
        gmt_modified     DATETIME COMMENT '更新日時',
        sub_order_id     BIGINT COMMENT 'サブ注文 ID',
        parent_order_id  BIGINT COMMENT '親注文 ID',
        buyer_id         BIGINT COMMENT '購入者数値 ID',
        buyer_nick       STRING COMMENT '購入者ニックネーム(null の場合は空文字列)',
        item_id          BIGINT COMMENT '商品数値 ID',
        item_price       DECIMAL(38,18) COMMENT '商品価格(センチ単位)',
        buy_amount       BIGINT COMMENT '購入数量',
        biz_type         BIGINT COMMENT '取引タイプ',
        memo             STRING COMMENT 'メモ(null の場合は空文字列)',
        pay_status       BIGINT COMMENT '支払ステータス',
        logistics_status BIGINT COMMENT '物流ステータス',
        status           BIGINT COMMENT 'ステータス',
        seller_memo      STRING COMMENT '取引に対する販売者メモ',
        buyer_memo       STRING COMMENT '取引に対する購入者メモ',
        clean_ip         STRING COMMENT 'クリーニング済み購入者 IP;無効な形式をフィルター',
        end_time         DATETIME COMMENT '取引終了日時',
        pay_time         DATETIME COMMENT '支払日時',
        is_sub           BIGINT COMMENT 'サブ注文かどうかを示す(1 の場合)',
        is_parent        BIGINT COMMENT '親注文かどうかを示す(1 の場合)',
        shop_id          BIGINT COMMENT 'ショップ ID',
        total_fee        DECIMAL(38,18) COMMENT '割引および調整後のサブ注文料金',
        is_large_order_flag BOOLEAN COMMENT '大口注文かどうかを示すフラグ'
    )
    COMMENT '取引詳細ファクトテーブル(初期クリーニングおよびビジネスロジックを含む)'
    PARTITIONED BY (pt STRING COMMENT '業務日付(yyyymmdd)')
    LIFECYCLE 365; -- データライフサイクルを 365 日に設定
    
    
    INSERT OVERWRITE TABLE dwd_trade_order PARTITION(pt='${bizdate}')
    SELECT
        id,
        gmt_create,
        gmt_modified,
        sub_order_id,
        parent_order_id,
        buyer_id,
        COALESCE(buyer_nick, '') AS buyer_nick, -- null の buyer_nick を処理
        item_id,
        item_price,
        buy_amount,
        biz_type,
        COALESCE(memo, '') AS memo, -- null の memo を処理
        pay_status,
        logistics_status,
        status,
        seller_memo,
        buyer_memo,
        CASE 
            WHEN ip LIKE '__.__.__.__' THEN NULL -- 無効な IP 形式をフィルター
            ELSE ip 
        END AS clean_ip,
        end_time,
        pay_time,
        is_sub,
        is_parent,
        shop_id,
        total_fee,
        CASE 
            WHEN total_fee >= 10000 THEN TRUE -- 10,000 センチ以上の注文を大口注文とみなす
            ELSE FALSE 
        END AS is_large_order_flag -- ビジネスロジックフラグを追加
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER(PARTITION BY buyer_id, item_id, gmt_create ORDER BY id DESC) AS rn -- 重複排除のための行番号
        FROM ods_trade_order
        WHERE pt = '${bizdate}'
    ) AS sub_query
    WHERE rn = 1;
  3. 実行時パラメーターを設定します。

    MaxCompute SQL ノードエディターの右側で 実行設定 をクリックします:

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

dws_daily_category_sales ノード

このノードは、dwd_trade_order および dim_item_info テーブルからクリーニングおよび標準化された詳細データを集約し、日次商品カテゴリ売上サマリーテーブル dws_daily_category_sales を生成します。

  1. ワークフローキャンバス上で dws_daily_category_sales ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. 以下のコードをノードエディターに貼り付けます。

    CREATE TABLE IF NOT EXISTS dws_daily_category_sales (
        cate_id             BIGINT COMMENT '商品の葉カテゴリ ID',
        cate_name           STRING COMMENT '商品の葉カテゴリ名',
        total_sales_amount  DECIMAL(38,18) COMMENT 'カテゴリ総売上額(センチ単位)',
        order_count         BIGINT COMMENT '注文件数'
    )
    COMMENT '日次商品カテゴリ売上サマリーテーブル'
    PARTITIONED BY (pt STRING COMMENT '業務日付(yyyymmdd)')
    LIFECYCLE 365;
    
    
    INSERT OVERWRITE TABLE dws_daily_category_sales PARTITION(pt='${bizdate}')
    SELECT
        i.cate_id,
        i.cate_name,
        SUM(t.total_fee) AS total_sales_amount,
        COUNT(DISTINCT t.id) AS order_count
    FROM dwd_trade_order t
    JOIN dim_item_info i ON t.item_id = i.item_id AND t.pt = i.pt
    WHERE t.pt = '${bizdate}'
    GROUP BY t.pt, i.cate_id, i.cate_name;
  3. 実行時パラメーターを設定します。

    MaxCompute SQL ノードエディターの右側で 実行設定 をクリックします:

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

ads_top_selling_categories ノード

このノードは、dws_daily_category_sales テーブルに基づき、毎日の人気商品カテゴリランキングテーブル ads_top_selling_categories を生成します。

  1. ワークフローキャンバス上で ads_top_selling_categories ノードにマウスを合わせ、「ノードを開く」をクリックします。

  2. 以下のコードをノードエディターに貼り付けます。

    CREATE TABLE IF NOT EXISTS ads_top_selling_categories (
        rank                BIGINT COMMENT '売上順位',
        cate_id             BIGINT COMMENT '商品の葉カテゴリ ID',
        cate_name           STRING COMMENT '商品の葉カテゴリ名',
        total_sales_amount  DECIMAL(38,18) COMMENT '商品カテゴリの総売上額(センチ単位)',
        order_count         BIGINT COMMENT '注文数'
    )
    COMMENT '毎日の人気商品カテゴリランキングテーブル'
    PARTITIONED BY (pt STRING COMMENT '業務日付(yyyymmdd)');
    
    
    INSERT OVERWRITE TABLE ads_top_selling_categories PARTITION(pt='${bizdate}')
    SELECT
        rank,
        cate_id,
        cate_name,
        total_sales_amount,
        order_count
    FROM (
        SELECT
            DENSE_RANK() OVER(ORDER BY total_sales_amount DESC) AS rank,
            cate_id,
            cate_name,
            total_sales_amount,
            order_count
        FROM (
            SELECT
                cate_id,
                cate_name,
                SUM(total_sales_amount) AS total_sales_amount,
                SUM(order_count) AS order_count
            FROM dws_daily_category_sales
            WHERE pt = '${bizdate}'
            GROUP BY cate_id, cate_name
        ) agg_sub
    ) agg_outer
    WHERE rank <= 10;
  3. 実行時パラメーターを設定します。

    MaxCompute SQL ノードエディターの右側で 実行設定 をクリックします:

  4. ノードツールバーの 保存 をクリックしてノードを保存します。

ステップ 3:ワークフローの実行およびデバッグ

ワークフローの設定が完了したら、本番環境にデプロイする前に、その設定を検証するために実行します。

  1. DataStudio の左側ナビゲーションペインで image をクリックして DataStudio ページへ移動し、ワークスペースディレクトリ セクションで作成したワークフローを見つけます。

  2. ノードツールバーの 実行 をクリックします。「実行パラメーターの入力」ダイアログボックスで、前日の日付(例:20250416)を入力します。

    説明

    ワークフローノードでは、動的コード用のスケジュールパラメーターを使用します。デバッグ時は、これらのパラメーターにテスト用の定数値を割り当てる必要があります。

  3. OK をクリックしてデバッグ実行ページへ移動します。

  4. 実行が完了するまで待ちます。期待される結果は以下の通りです:

    image

ステップ 4:データの照会および可視化

生データが処理・集約され、ads_top_selling_categories テーブルに格納されたので、その結果を照会できます。

  1. 左上隅の image アイコンをクリックし、「全製品 > データ分析 > SQL クエリ」をクリックします。

  2. マイファイル の隣にある image > ファイルの作成 をクリックします。任意の ファイル名 を入力し、「OK」をクリックします。

  3. SQL クエリエディターで、以下の SQL 文を入力します。

    SELECT * FROM ads_top_selling_categories WHERE pt=${bizdate};
  4. 右上隅で MaxCompute データソースを選択し、「OK」をクリックします。

  5. 上部の 実行 ボタンをクリックします。「コスト見積もり」ページで、「実行」をクリックします。

  6. クエリ結果で image をクリックしてチャートを表示します。チャートの右上隅にある image アイコンをクリックすると、スタイルをカスタマイズできます。

  7. また、チャートの右上隅にある 保存 をクリックして、カードとして保存することもできます。その後、左側ナビゲーションペインの カードimage)をクリックして表示できます。

ステップ 5:定期スケジュールの設定

毎日の更新を実現するため、ワークフローを本番環境にデプロイして定期実行します。

説明

データ同期および処理のステップで、ワークフローおよびそのノードに対してスケジュールパラメーターが既に設定されています。そのため、ワークフローを本番環境にデプロイするだけで済みます。スケジュール設定の詳細については、「ノードのスケジュール設定」をご参照ください。

  1. 左上隅の image アイコンをクリックし、「全製品 > データ開発および O&M > DataStudio(データ開発)」をクリックします。

  2. DataStudio の左側ナビゲーションペインで image をクリックして DataStudio ページへ移動します。本チュートリアルで使用するワークスペースに切り替え、ワークスペースディレクトリ セクションで作成したワークフローを見つけます。

  3. ノードツールバーの デプロイ をクリックします。デプロイパネルで、「本番環境へのデプロイを開始」をクリックします。「パッケージのビルド」および「本番環境オンラインチェック」のステップが完了するまで待ち、その後「デプロイ」をクリックします。

  4. 本番環境オンライン ステータスが 完了 に変わったら、「O&M を実行」をクリックしてオペレーションセンターへ移動します。

    image

  5. 自動トリガーされたノードの O&M > 自動トリガーされたノード で、ワークフローの定期タスク(本チュートリアルでは dw_quickstart)を確認できます。

  6. ワークフロー内の子ノードの定期タスクを表示するには、ワークフローの定期タスクを右クリックし、「内部タスクの表示」を選択します。

    image

    期待される結果は以下の通りです:

    image

次のステップ

リソースのクリーンアップ

本チュートリアルで作成したリソースをクリーンアップするには、以下の手順に従ってください:

  1. 自動トリガーされたノードのデプロイを解除します。

    1. DataWorks コンソール にログインします。トップナビゲーションバーで目的のリージョンを選択し、左側ナビゲーションペインで データ開発および O&M > オペレーションセンター を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、「オペレーションセンターへ移動」をクリックします。

    2. 自動トリガーされたノードの O&M > 自動トリガーされたノード へ移動します。作成したすべての自動トリガーされたノードのチェックボックスをオンにします。ワークスペースのルートノードはデプロイ解除しないでください。その後、ページ下部で 操作 > デプロイ解除 をクリックします。

  2. データ開発ノードを削除し、MaxCompute 計算リソースの関連付けを解除します。

    1. DataWorks コンソールの ワークスペース ページへ移動します。トップナビゲーションバーで目的のリージョンを選択し、目的のワークスペースを見つけ、操作 列の ショートカット > Data Studio を選択します。

    2. Data Studio の左側ナビゲーションペインで image アイコンをクリックして データ開発 ページを開きます。ワークスペースディレクトリ セクションで、作成したワークフローを見つけ、右クリックして「削除」をクリックします。

    3. 左側ナビゲーションペインで image > 計算リソース をクリックします。関連付けられた MaxCompute 計算リソースを見つけ、「関連付け解除」をクリックします。確認ダイアログボックスでチェックボックスをオンにし、画面上の指示に従います。

  3. MySQL データソースを削除します。

    1. DataWorks コンソール にログインします。トップナビゲーションバーで目的のリージョンを選択し、左側ナビゲーションペインで その他 > 管理センター を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、「管理センターへ移動」をクリックします。

    2. 左側ナビゲーションペインで データソース をクリックします。データソース ページで、作成した MySQL データソースを見つけ、操作 列の「削除」をクリックし、画面上の指示に従います。

  4. MaxCompute プロジェクトを削除します。

    MaxCompute プロジェクト管理 ページへ移動します。作成した MaxCompute プロジェクトを見つけ、操作 列の「削除」をクリックし、画面上の指示に従います。

  5. インターネット NAT Gateway を削除し、Elastic IP Address(EIP)を解放します。

    1. VPC - インターネット NAT Gateway コンソール へ移動します。トップメニューバーでリージョンを 中国 (上海) に切り替えます。

    2. 作成したインターネット NAT Gateway を見つけ、[操作] 列で image > [削除] をクリックします。確認ダイアログボックスで、[強制削除] チェックボックスを選択し、[OK] をクリックします。

    3. 左側ナビゲーションペインで インターネットへのアクセス > Elastic IP アドレス をクリックします。作成した EIP を見つけ、操作 列で image > インスタンス管理 > 解放 を選択します。確認ダイアログボックスで「OK」をクリックします。