すべてのプロダクト
Search
ドキュメントセンター

:DataWorks OpenLake ワンストップインテリジェントデータレイクハウス開発

最終更新日:Oct 29, 2025

この実験では、OpenLake データレイクハウス環境における小売 e コマースのデータ開発および分析シナリオを体験します。DataWorks を使用して、マルチエンジン共同開発、ビジュアルワークフローオーケストレーション、およびデータカタログ管理を行います。また、Python プログラミングとデバッグを実践し、Notebook を使用して AI を活用したインタラクティブなデータ探索と分析を行います。

背景

DataWorks の概要

DataWorks は、データレイクハウスの開発とガバナンスのためのインテリジェントなプラットフォームです。Alibaba の 15 年にわたるビッグデータ構築手法に基づいて構築されています。MaxCompute、E-MapReduce、Hologres、Flink、PAI など、数十の Alibaba Cloud ビッグデータおよび AI コンピューティングサービスと深く統合されています。DataWorks は、データウェアハウス、データレイク、および OpenLake データレイクハウスアーキテクチャ向けに、インテリジェントな抽出・変換・書き出し (ETL) 開発、データ分析、およびプロアクティブなデータ資産ガバナンスサービスを提供します。これにより、「Data+AI」ライフサイクル全体を管理できます。2009 年以来、DataWorks は Alibaba のデータアーキテクチャを継続的に製品化してきました。政府、金融、小売、インターネット、自動車、製造など、さまざまな業界にサービスを提供しています。何万もの顧客が DataWorks を使用してデジタルトランスフォーメーションを推進し、価値を創造しています。

DataWorks Copilot の概要

DataWorks Copilot は DataWorks のインテリジェントアシスタントです。DataWorks では、DataWorks のデフォルトモデルである Qwen3-235B-A22B、DeepSeek-R2-0528、または Qwen3-Coder 大規模モデルを選択して Copilot 操作を実行できます。DeepSeek-R2 の深い推論機能により、DataWorks Copilot は自然言語対話を通じて SQL コードの生成、最適化、テストなどの複雑な操作を実行するのに役立ちます。これにより、ETL (抽出·変換·書き出し) 開発とデータ分析の効率が大幅に向上します。

DataWorks Notebook の概要

DataWorks Notebook は、データ開発と分析のためのインテリジェントでインタラクティブなツールです。複数のデータエンジンにまたがる SQL または Python 分析をサポートしています。コードを即座に実行またはデバッグし、視覚化されたデータ結果を表示できます。DataWorks Notebook は、他のタスクノードとオーケストレーションしてワークフローに組み込み、スケジューling システムに送信して実行することもできます。これにより、複雑なビジネスシナリオを柔軟に実装できます。

使用上の注意

制限事項

  • OpenLake は Data Lake Formation (DLF) 2.0 のみをサポートします。

  • データカタログは Data Lake Formation (DLF) 2.0 のみをサポートします。

  • Qwen3-235B-A22B/DeepSeek-R2 モデルは、中国 (杭州)、中国 (上海)、中国 (北京)、中国 (張家口)、中国 (深圳)、および中国 (成都) のリージョンで利用できます。

  • Qwen3-Coder モデルは、中国 (杭州)、中国 (上海)、中国 (北京)、中国 (張家口)、中国 (ウランチャブ)、中国 (深圳)、および中国 (成都) で利用できます。

前提条件

  1. Alibaba Cloud アカウントを準備したか、RAM ユーザーを準備したこと。

  2. ワークスペースを作成したこと。

    説明

    [Data Development (DataStudio) (New) のパブリックプレビューに参加] を選択します。

  3. 計算資源をアタッチしたこと。

手順

ステップ 1: データカタログを管理する

データレイクハウスの データカタログ 管理機能を使用すると、DLF、MaxCompute、Hologres などのサービスのデータカタログを管理および作成できます。

  1. DataStudio で、左側のメニューの image アイコンをクリックして [データカタログ] を開きます。ナビゲーションウィンドウで、管理するメタデータタイプを見つけ、[プロジェクトを追加] をクリックします。このボタンの名前はメタデータタイプによって異なる場合があります。このトピックでは MaxCompute を例として使用します。

    DataWorks ワークスペースからデータソースを追加できます。[MaxCompute-Project] タブで権限を持つ MaxCompute プロジェクトを選択することもできます。

    image

  2. プロジェクトを追加すると、対応するメタデータタイプの下に表示されます。プロジェクト名をクリックして、データカタログの詳細ページに移動します。

  3. データカタログの詳細ページで、スキーマを選択し、任意のテーブル名をクリックしてテーブルの詳細を表示します。

  4. データカタログでは、テーブルを視覚的に作成できます。

    データカタログを指定したスキーマの [テーブル] レベルまで展開します。右側の image アイコンをクリックして [テーブルを作成] ページを開きます。

    image

  5. [テーブルを作成] ページでは、次の方法でテーブルを作成できます。

    • エリア ① で、[テーブル名][フィールド情報] を入力します。

    • エリア ② では、[DDL] 文を直接入力してテーブルを作成できます。

    image

  6. ページ上部の [公開] ボタンをクリックしてテーブルを作成します。

ステップ 2:ワークフローを調整する

ワークフローを使用すると、ビジネスロジックに基づいてさまざまなタイプのデータ開発ノードをドラッグアンドドロップしてオーケストレーションできます。各ノードのスケジューリング時間などの共通パラメーターを設定する必要はありません。これにより、複雑なタスクプロジェクトを簡単に管理できます。

  1. DataStudio で、左側のプライマリメニューの image アイコンをクリックして [データ開発] を開きます。左側のナビゲーションウィンドウで [プロジェクトフォルダ] を見つけ、その横にある image アイコンをクリックし、[新しいワークフロー] を選択します。

  2. ワークフローの [名前] を入力し、[OK] をクリックしてワークフローエディターを開きます。

  3. ワークフローエディターで、キャンバス上の [ノードをドラッグまたはクリックして追加] をクリックします。[ノードを追加] ダイアログボックスで、[ノードタイプ][ゼロロードノード] に設定し、カスタムの [ノード名] を入力してから [確認] をクリックします。

  4. 左側のノードタイプのリストから、必要なノードタイプを見つけてキャンバスにドラッグします。[ノードを追加] ダイアログボックスで、[ノード名] を入力し、[確認] をクリックします。

    image

  5. キャンバス上で、依存関係を作成する 2 つのノードを見つけます。一方のノードの下端の中央にマウスポインターを合わせます。ポインターが + に変わったら、矢印をもう一方のノードにドラッグしてマウスを離します。図のように依存関係を設定したら、上部のツールバーで [保存] をクリックします。

    image

  6. [保存] をクリックした後、必要に応じてキャンバスのレイアウトを調整できます。image

  7. ワークフローキャンバスの右側で、[スケジューリング構成] をクリックします。[スケジューリング構成] パネルを使用して、ワークフローのスケジューリングパラメーターとノードの依存関係を構成します。[スケジューリングパラメーター] セクションで、[パラメーターを追加] をクリックします。パラメーター名フィールドに bizdate と入力します。パラメーター値のドロップダウンリストから、$[yyyymmdd-1] を選択します。

    image

  8. [ワークスペースルートノードを使用] をクリックして、このノードをワークフローの上流依存関係として設定します。

    image

  9. ワークフローキャンバスの上にある [公開] をクリックします。右下隅に [公開コンテンツ] パネルが表示されます。パネルで、[本番環境への公開を開始] をクリックし、プロンプトに従って確認します。

    image

ステップ 3: マルチエンジン共同開発を使用する

DataStudio は、Data Integration、MaxCompute、Hologres、EMR、Flink、Python、Notebook、ADB など、さまざまなエンジンにまたがる数十種類のノードタイプのデータウェアハウス開発をサポートしています。複雑なスケジューリング依存関係をサポートし、開発環境を本番環境から分離する開発モデルを提供します。この実験では、Flink SQL Streaming ノードの作成を例として使用します。

  1. DataStudio で、左側のナビゲーションウィンドウの image アイコンをクリックして [データ開発] ページを開きます。ナビゲーションウィンドウで [プロジェクトフォルダ] を見つけ、その横にある image アイコンをクリックします。カスケードメニューで [Flink SQL Streaming] をクリックしてノードエディターを開きます。エディターが開く前に、[ノード名] を入力して Enter キーを押します。

    プリセットノード名: ads_ec_page_visit_log

    image

  2. ノードエディターで、プリセットの Flink SQL Stream コードをコードエディターに貼り付けます。

    image

    プリセット Flink SQL Stream コード

    CREATE TEMPORARY VIEW log_ri_base
    AS 
    SELECT 
      visit_time
      ,substring(visit_time,1,8) as stat_date
      ,substring(visit_time,9,2) as stat_hour
      ,visitor_id
      ,item_id
      ,cate_id
      ,ext_pv
    FROM vvp_ec_ads.dws_log_all_itm_ipv_ri
    WHERE
      bc_type IN ('b', 'z')
      AND coalesce(cate_id, '') <> ''
      AND visitor_type = 'uid'
      and coalesce(item_id, '') <> ''
      AND substring(visit_time,1,8) >= '${bizdate}'
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_day
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,cate_id
      ,item_id
    FROM log_ri_base
    GROUP BY stat_date
      ,cate_id
      ,item_id
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_hh_00
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,stat_hour
      ,item_id
      ,cate_id
    FROM log_ri_base
    GROUP BY stat_date
      ,stat_hour
      ,cate_id
      ,item_id
    ;
    
    BEGIN STATEMENT SET;
    
    INSERT INTO vvp_ec_ads.ads_ec_log
    SELECT
      a.stat_date
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1d
      ,a.uv as mbr_ipv_uv_1d
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_day a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_dt_rtcdm.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    --書き込み
    INSERT INTO vvp_ec_ads.ads_ec_log_hh
    
    SELECT
      a.stat_date
      ,a.stat_hour
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1h
      ,a.uv as mbr_ipv_uv_1h
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_hh_00 a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_ec_ads.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    END;
  3. ノードエディターで、コードエディターの右側にある [リアルタイム構成] をクリックして、[Flink リソース情報][スクリプトパラメーター]、および [Flink 実行時パラメーター] を構成します。

    image

    プリセット Flink SQL Stream リアルタイム構成 - エキスパートモードコード

    {
      "nodes": [
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "0"  -- グループ
          },
          "id": 1,  -- ID
          "type": "StreamExecTableSourceScan",  -- タイプ
          "desc": "Source: vvp_dt_rtcdm_dwd_tb_trd_ord_pay_nrt_ri[71980]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "0"  -- グループ
          },
          "id": 2,  -- ID
          "type": "StreamExecCalc",  -- タイプ
          "desc": "Calc[71981]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "0"  -- グループ
          },
          "id": 3,  -- ID
          "type": "StreamExecLookupJoin",  -- タイプ
          "desc": "LookupJoin[71982]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "0"  -- グループ
          },
          "id": 4,  -- ID
          "type": "StreamExecCalc",  -- タイプ
          "desc": "Calc[71983]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "1"  -- グループ
          },
          "id": 6,  -- ID
          "state": [
            {
              "userDefined": false,  -- ユーザー定義かどうか
              "name": "groupAggregateState",  -- 名前
              "index": 0,  -- インデックス
              "ttl": "36 h"  -- TTL
            }
          ],
          "type": "StreamExecGroupAggregate",  -- タイプ
          "desc": "GroupAggregate[71985]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "1"  -- グループ
          },
          "id": 7,  -- ID
          "type": "StreamExecCalc",  -- タイプ
          "desc": "Calc[71986]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "1"  -- グループ
          },
          "id": 8,  -- ID
          "type": "StreamExecSink",  -- タイプ
          "desc": "ConstraintEnforcer[71987]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "2"  -- グループ
          },
          "id": 10,  -- ID
          "state": [
            {
              "userDefined": false,  -- ユーザー定義かどうか
              "name": "sinkMaterializeState",  -- 名前
              "index": 0,  -- インデックス
              "ttl": "36 h"  -- TTL
            }
          ],
          "type": "StreamExecSink",  -- タイプ
          "desc": "SinkMaterializer[71987]"  -- 説明
        },
        {
          "profile": {
            "parallelism": 256,  -- 並列度
            "maxParallelism": 32768,  -- 最大並列度
            "minParallelism": 1,  -- 最小並列度
            "group": "2"  -- グループ
          },
          "id": 11,  -- ID
          "type": "StreamExecSink",  -- タイプ
          "desc": "Sink: vvp_dt_ads_tb_dev_ads_tb_idec_seckill_cate_bc_trd_flow_htr_000[71987]"  -- 説明
        }
      ],
      "vertices": {  -- 頂点
        "2d95a2974e3b3137fd533ecfd3490bc5": [  -- ハッシュ値
          10,  -- ID
          11  -- ID
        ],
        "717c7b8afebbfb7137f6f0f99beb2a94": [  -- ハッシュ値
          1,  -- ID
          2,  -- ID
          3,  -- ID
          4  -- ID
        ],
        "44b79c13fdb45883c7f21ee510155f4d": [  -- ハッシュ値
          6,  -- ID
          7,  -- ID
          8  -- ID
        ]
      },
      "edges": [  -- エッジ
        {
          "mode": "PIPELINED",  -- モード
          "source": 1,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 2  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 2,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 3  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 3,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 4  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 4,  -- ソース
          "strategy": "HASH",  -- 戦略
          "target": 6  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 6,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 7  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 7,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 8  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 8,  -- ソース
          "strategy": "HASH",  -- 戦略
          "target": 10  -- ターゲット
        },
        {
          "mode": "PIPELINED",  -- モード
          "source": 10,  -- ソース
          "strategy": "FORWARD",  -- 戦略
          "target": 11  -- ターゲット
        }
      ],
      "ssgProfiles": [  -- SSG プロファイル
        {
          "managed": {},  -- 管理対象
          "name": "0",  -- 名前
          "cpu": 0.25,  -- CPU
          "offHeap": "32 mb",  -- オフヒープ
          "heap": "992 mb",  -- ヒープ
          "extended": {}  -- 拡張
        },
        {
          "managed": {  -- 管理対象
            "STATE_BACKEND": "512 mb"  -- STATE_BACKEND
          },
          "name": "1",  -- 名前
          "cpu": 0.25,  -- CPU
          "offHeap": "32 mb",  -- オフヒープ
          "heap": "480 mb",  -- ヒープ
          "extended": {}  -- 拡張
        },
        {
          "managed": {  -- 管理対象
            "STATE_BACKEND": "512 mb"  -- STATE_BACKEND
          },
          "name": "2",  -- 名前
          "cpu": 0.25,  -- CPU
          "offHeap": "32 mb",  -- オフヒープ
          "heap": "480 mb",  -- ヒープ
          "extended": {}  -- 拡張
        }
      ]
    }

    プリセット Flink SQL Stream リアルタイム構成 - Flink 実行時パラメーター - その他の構成

    blob.fetch.backlog: 1000
    taskmanager.debug.memory.log-interval: 5000
  4. リアルタイム設定を構成した後、コードエディターの上にある [保存] をクリックします。次に [公開] をクリックします。右下隅に表示される [公開コンテンツ] パネルで、[本番環境への公開を開始] をクリックし、プロンプトに従って確認します。

    image

ステップ 4:個人開発環境に入る

個人開発環境は、カスタムコンテナイメージ、ユーザー NAS および Git への接続、Notebook を使用した Python でのプログラミングをサポートしています。

DataStudio で、ページ上部の image アイコンをクリックします。ドロップダウンメニューで、入りたい個人開発環境を選択し、ページが読み込まれるのを待ちます。

image

ステップ 5: Python でプログラミングとデバッグを行う

DataWorks は DSW と深く統合されています。個人開発環境に入った後、DataStudio は Python コードの記述、デバッグ、実行、スケジューリングをサポートします。

重要

このステップを開始する前に、ステップ 4: 個人開発環境に入る を完了する必要があります。

  1. DataStudio ページで、個人の開発者環境の workspace フォルダをクリックします。[個人用フォルダ] の右側にある image アイコンをクリックします。左側のリストに無題のファイルが表示されます。ファイルに名前を付け、Enter キーを押し、生成されるのを待ちます。

    プリセットファイル名: ec_item_rec.py

    image

  2. Python ファイルページのコードエディターに、プリセットの Python コードを入力します。次に、コードエディターの上にある [Python ファイルを実行] をクリックし、ページ下部の [ターミナル] で結果を確認します。

    image

    image

    プリセット Python コード

    import pandas as pd
    from surprise import Dataset, Reader, SVD
    from surprise.model_selection import train_test_split
    from surprise import accuracy
    
    # サンプルデータを作成
    data_dict = {
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 4],
        'item_id': [101, 102, 103, 101, 104, 105, 102, 105, 101],
        'rating': [5, 3, 4, 2, 4, 5, 4, 5, 3]
    }
    
    # データを DataFrame に変換
    df = pd.DataFrame(data_dict)
    
    # Surprise ライブラリを使用してデータセットを準備
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['user_id', 'item_id', 'rating']], reader)
    
    # データセットをトレーニングセットとテストセットに分割
    trainset, testset = train_test_split(data, test_size=0.2)
    
    # SVD アルゴリズムをレコメンデーションに使用
    model = SVD()
    model.fit(trainset)
    
    # 予測を行う
    predictions = model.test(testset)
    
    # RMSE を計算
    rmse = accuracy.rmse(predictions)
    print(f'RMSE: {rmse:.2f}')
    
    # ユーザーにおすすめの商品を取得
    def get_recommendations(user_id, model, all_items, n=3):
        item_ids = all_items['item_id'].unique()
        user_item_col = all_items[(all_items['user_id'] == user_id)]['item_id']
        unseen_items = [item for item in item_ids if item not in user_item_col.values]
    
        # 未評価のアイテムの評価を予測
        predictions = []
        for item in unseen_items:
            pred = model.predict(user_id, item)
            predictions.append((item, pred.est))
    
        # 予測評価でソート
        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n]
    
    # 商品のレコメンデーションを取得
    all_items = df
    user_id = 1  # レコメンデーションを取得するユーザーの ID
    recommendations = get_recommendations(user_id, model, all_items)
    
    print(f'Recommended products for user {user_id}:')
    for item_id, score in recommendations:
        print(f'Product ID: {item_id}, Predicted score: {score:.2f}')

    Python 環境のインストール

    pip install pandas scikit-surprise  # pandas と scikit-surprise をインストール
  3. コードをデバッグするには、コードエディターの上にある [Python ファイルをデバッグ] をクリックするか、コードエディターの左側にあるパネルの image アイコンをクリックします。行番号の左側をクリックしてブレークポイントを設定できます。

    image

ステップ 6: Notebook でデータを探索する

Notebook のデータ探索操作は、個人開発環境で実行されます。開始する前に ステップ 4: 個人開発環境に入る を完了する必要があります。

Notebook の作成

  1. [DataStudio] > [データ開発] に移動します。

  2. [個人フォルダ] で、ターゲットフォルダを右クリックし、[新しい Notebook] を選択します。

  3. Notebook の名前を入力し、Enter キーを押すか、ページ上の空白領域をクリックして名前を適用します。

  4. 個人フォルダで、Notebook 名をクリックしてエディターで開きます。

Notebook の使用

説明

以下のステップは独立しており、任意の順序で実行できます。

  • Notebook マルチエンジン開発

    EMR Spark SQL

    1. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    2. SQL Cell に、dim_ec_mbr_user_info テーブルをクエリする次の文を入力します。

      dim_ec_mbr_user_info

      -- 説明: メンバー情報ソーステーブルと地域ソーステーブルに基づいて、e コマースプラットフォームの基本メンバー情報をクエリします。
      USE openlake_win.default;
      SELECT  user.user_id AS user_id
              ,user.nick AS nick
              ,user.gmt_create AS gmt_modified
              ,user.gmt_modified AS gmt_create
              ,user.reg_fullname AS reg_fullname
              ,user.reg_mobile_phone AS reg_mobile_phone
              ,user.reg_email AS reg_email
              ,user.reg_gender AS reg_gender
              ,user.reg_gender_name AS reg_gender_name
              ,user.reg_birthdate AS reg_birthdate
              ,user.reg_address AS reg_address
              ,user.reg_nation_id AS reg_nation_id
              ,user.reg_nation_name AS reg_nation_name
              ,user.reg_prov_id AS reg_prov_id
              ,area_prov.name AS reg_prov_name
              ,user.reg_city_id AS reg_city_id
              ,area_city.name AS reg_city_name
              ,user.user_regip AS user_regip
              ,user.id_card_type AS id_card_type
              ,user.id_card_type_name AS id_card_type_name
              ,user.id_card_number AS id_card_number
              ,null as id_gender
              ,null as id_bday
              ,(2024 - CAST(SUBSTR(user.id_card_number,7,4) AS INT)) AS id_age
              ,user.user_regdate AS user_regdate
              ,user.user_active_type AS user_active_type
              ,user.user_active_name AS user_active_name
              ,user.user_active_time AS user_active_time
              ,user.vip_level AS vip_level
              ,user.vip_level_name AS vip_level_name
              ,user.is_delete AS is_delete
      FROM    (
                  SELECT  id    -- 主キー
                          ,gmt_create    -- 作成時間
                          ,gmt_modified    -- 変更時間
                          ,user_id    -- メンバー数値 ID
                          ,nick    -- メンバー NICK。メンバーニックネーム
                          ,reg_fullname    -- 個人ユーザーの実名、法人ユーザーの法人名
                          ,reg_mobile_phone    -- 登録時に紐付けられた携帯電話番号
                          ,reg_email    -- 登録時に入力されたメールアドレス (ユーザーによる変更が可能)
                          ,reg_gender    -- 登録時に入力された性別 (F は女性、M は男性、その他は不明、性別は非公開)
                          ,CASE    WHEN reg_gender='F' THEN 'Female'
                                   WHEN reg_gender='M' THEN 'Male' 
                                   ELSE 'Unknown' 
                           END AS reg_gender_name    -- 登録時に入力された性別 (F は女性、M は男性、その他は不明、性別は非公開)
                          ,reg_birthdate    -- 登録時に入力された誕生日 (ユーザーによる変更が可能)
                          ,reg_address    -- 登録時に入力された住所 (ユーザーによる変更が可能)
                          ,reg_nation_id    -- 登録時に入力された国 ID (現在は空)
                          ,CASE    WHEN reg_nation_id='cn' THEN 'China' 
                                   ELSE 'Outside China' 
                           END AS reg_nation_name
                          ,reg_prov_id    -- 登録時に入力された省 ID
                          ,reg_city_id    -- 登録時に入力された市 ID
                          ,user_regip    -- 登録 IP
                          ,id_card_type    -- メンバー証明書タイプ 0: 不明 1: ID カード 2: 事業者登録番号
                          ,CASE    WHEN id_card_type=0 THEN 'Unknown'
                                   WHEN id_card_type=1 THEN 'ID Card'
                                   WHEN id_card_type=2 THEN 'Business License Number' 
                                   ELSE 'Abnormal' 
                           END AS id_card_type_name
                          ,id_card_number    -- 個人ユーザーの ID カード番号、法人ユーザーの事業者登録番号、検証なしでは精度は保証されません
                          ,user_regdate    -- 登録時間
                          ,user_active_type    -- ユーザーアクティベーション方法
                          ,CASE    WHEN user_active_type='email' THEN 'Email'
                                   WHEN user_active_type='mobile_phone' THEN 'Mobile Phone' 
                                   ELSE 'Abnormal' 
                           END AS user_active_name    -- ユーザーアクティベーション方法
                          ,user_active_time    -- アクティベーション時間
                          ,cast(vip_level AS BIGINT) AS vip_level    -- VIP レベル
                          ,CASE    WHEN vip_level>0 AND vip_level<=3 THEN 'Junior'
                                   WHEN vip_level>3 AND vip_level<=6 THEN 'Intermediate'
                                   WHEN vip_level>6 AND vip_level<=10 THEN 'Senior' 
                                   WHEN vip_level>10  THEN 'Premium' 
                           ELSE 'Abnormal'
                           END  AS vip_level_name
                          ,is_delete    -- 削除済み
                  FROM    ods_mbr_user_info
              ) AS USER
      LEFT JOIN (
                    SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                    FROM    ods_t_area
                ) AS area_prov
      ON      user.reg_prov_id = area_prov.id 
      LEFT JOIN    (
                  SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                  FROM    ods_t_area
              ) AS area_city
      ON      user.reg_city_id = area_city.id
      ;
    3. SQL Cell の右下隅で、SQL Cell タイプを EMR Spark SQL に、計算資源を openlake_serverless_spark に設定します。

      image

    4. [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

    StarRocks SQL

    1. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    2. SQL Cell に、dws_ec_trd_cate_commodity_gmv_kpi_fy テーブルをクエリする次の文を入力します。

      dws_ec_trd_cate_commodity_gmv_kpi_fy

      -- 説明: 取引注文ファクトテーブルと基本商品情報ディメンションテーブルに基づいて、「会計年度_成功注文支払額」や「会計年度_取引額達成率」などのデータメトリックをクエリします。
      USE `openlake_win`.`default`;
      select   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name, round(10*sum(t1.total_fee),4) as pay_ord_amt_fy, round((10*sum(t1.total_fee)/30000000),4) as kpi_gmv_rate_fy
      from    (
                  select  DATE_FORMAT(a.gmt_create,'yyyymmdd') as stat_date
                          ,a.sub_order_id, a.buyer_id, a.item_id, a.biz_type, a.pay_status, a.total_fee/100 as total_fee, b.cate_id, b.cate_name, b.commodity_id, b.commodity_name 
                  from    `openlake_win`.`default`.dwd_ec_trd_create_ord_di a
                  left outer join (
                                      select  distinct item_id, cate_id, cate_name, commodity_id, commodity_name, shop_id, shop_nick
                                      from    `openlake_win`.`default`.dim_ec_itm_item_info
                                  ) b
                  on      a.item_id = b.item_id
                  and     a.shop_id = b.shop_id
              ) t1
      where   t1.pay_status in ('2')
      and     t1.biz_type in ('2','3','4')
      group by   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name
      ;
    3. SQL Cell の右下隅で、SQL Cell タイプを StarRocks SQL に、計算資源を openlake_starrocks に設定します。

      image

    4. [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

    Hologres SQL

    1. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    2. SQL Cell に、dws_ec_mbr_cnt_std テーブルをクエリする次の文を入力します。

      dws_ec_mbr_cnt_std

      -- 説明: 「基本メンバー情報ディメンションテーブル」のデータを計算・変換して「既存メンバー数」などのデータメトリックを取得し、当日までの既存メンバー数のキューブ統計を取得します。
      SELECT    IF(grouping(reg_prov_id) = 0, reg_prov_id, '-9999') as reg_prov_id
              , IF(grouping(reg_prov_name) = 0, reg_prov_name, 'All') as reg_prov_name
              , IF(grouping(reg_gender) = 0, reg_gender, '-9999') as reg_gender
              , IF(grouping(reg_gender_name) = 0, reg_gender_name, 'All') as reg_gender_name
              , IF(grouping(age_tag) = 0, age_tag, '-9999') as age_tag
              , IF(grouping(user_active_type) = 0, user_active_type, '-9999') as user_active_type
              , IF(grouping(user_active_name) = 0, user_active_name, 'All') as user_active_name
              , IF(grouping(vip_level) = 0, vip_level, '-9999') as vip_level
              , IF(grouping(vip_level_name) = 0, vip_level_name, 'All') as vip_level_name 
              , count(distinct user_id) as mbr_cnt
      from (
          select    reg_prov_id
                  , reg_prov_name
                  , reg_gender
                  , reg_gender_name
                  , case when cast(substr(reg_birthdate,1,4) as int)>=2010 and cast(substr(reg_birthdate,1,4) as int)<2020 then 'Post-2010s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=2000 and cast(substr(reg_birthdate,1,4) as int)<2010 then 'Post-2000s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1990 and cast(substr(reg_birthdate,1,4) as int)<2000 then 'Post-90s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1980 and cast(substr(reg_birthdate,1,4) as int)<1990 then 'Post-80s' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1970 and cast(substr(reg_birthdate,1,4) as int)<1980 then 'Post-70s' 
                          else 'Other' 
                    end as age_tag
                  , user_active_type
                  , user_active_name
                  , vip_level
                  , vip_level_name 
                  , user_id
          from    openlake_win.default.dim_ec_mbr_user_info
      ) _main       
      group by 
      grouping sets(
          (reg_prov_id, reg_prov_name)
         ,(reg_gender, reg_gender_name)
         ,(age_tag)
         ,(user_active_type, user_active_name)
         ,(vip_level, vip_level_name)
         ,()
      );
    3. SQL Cell の右下隅で、SQL Cell タイプを Hologres SQL に、計算資源を openlake_hologres に設定します。

      image

    4. [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

    MaxCompute SQL

    1. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    2. SQL Cell に、dws_ec_mbr_cnt_std テーブルをクエリする次の文を入力します。

      dws_ec_mbr_cnt_std

      -- 説明: データウェアハウス基盤レイヤーの「履歴から現在までの既存メンバー数キューブ統計テーブル」をクエリします。
      set odps.task.major.version=flighting;
      set odps.namespace.schema=true;
      set odps.sql.allow.namespace.schema=true;
      set odps.service.mode=only;
      set odps.sql.unstructured.data.split.size=1;
      
      SELECT * 
      FROM openlake_win.default.dws_ec_mbr_cnt_std 
      LIMIT 200;
    3. SQL Cell の右下隅で、SQL Cell タイプを MaxCompute SQL に、計算資源を openlake_maxcompute に設定します。

      image

    4. [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

  • Notebook インタラクティブデータ
    1. DataWorks Notebook で、image ボタンをクリックして新しい Python Cell を作成します。

    2. Python Cell の右上隅にある image ボタンをクリックして、DataWorks Copilot インテリジェントプログラミングアシスタントを開きます。

    3. DataWorks Copilot の入力ボックスに、メンバーの年齢をクエリするための ipywidgets インタラクティブコンポーネントを生成する次の要件を入力します。

      説明

      説明: Python を使用して、メンバーの年齢用のスライダーウィジェットを生成します。値の範囲は 1 から 100 で、デフォルト値は 20 です。ウィジェットの値の変更をリアルタイムで監視し、その値を query_age という名前のグローバル変数に保存します。

    4. DataWorks Copilot によって生成された Python コードを確認し、[承認] ボタンをクリックします。

      image

    5. Python Cell の実行ボタンをクリックし、実行が完了するのを待って、生成されたインタラクティブコンポーネントを表示します。Copilot によって生成されたコードまたはプリセットコードを実行できます。インタラクティブコンポーネントをスライドしてターゲットの年齢を選択することもできます。

      ipywidgets インタラクティブコンポーネントを生成するためのサンプルコード

      import ipywidgets as widgets
      
      # スライダーウィジェットを作成
      slider = widgets.IntSlider(
          min = 1,
          max = 100,
          value = 20,
          description = 'Age:',
      )
      
      # グローバル変数 query_age を定義
      query_age = None
      
      
      # スライダーの値の変更を処理する関数を定義
      def on_slider_change(change):
          global query_age
          query_age = change.new
      
      # 関数をスライダーの値変更イベントにバインド
      slider.observe(on_slider_change,names='value')
      
      # スライダーを表示
      display(slider)

      image

    6. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    7. SQL Cell に、Python で定義されたメンバー年齢変数 ${query_age} を含む次のクエリ文を入力します。

      SELECT * FROM openlake_win.default.dim_ec_mbr_user_info  -- メンバー基本情報ディメンションテーブルからすべての列を選択
      WHERE CAST(id_age AS INT) >= ${query_age};  -- 年齢が query_age 以上のメンバーを選択
      
    8. SQL Cell の右下隅で、SQL Cell タイプを Hologres SQL に、計算資源を openlake_hologres に設定します。

      image

    9. [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

    10. 結果で、image ボタンをクリックしてチャートを生成します。

  • Notebook モデルの開発とトレーニング
    1. DataWorks Notebook で、image ボタンをクリックして新しい SQL Cell を作成します。

    2. SQL Cell に、ods_trade_order テーブルをクエリする次の文を入力します。

      SELECT * FROM openlake_win.default.ods_trade_order;  -- 注文テーブルからすべての列を選択
      
    3. SQL クエリの結果を DataFrame 変数に書き込みます。df の場所をクリックし、df_ml などのカスタム DataFrame 変数名を入力します。

      image

    4. SQL Cell の [実行] ボタンをクリックし、実行が完了するのを待って、データ結果を表示します。

    5. DataWorks Notebook で、image ボタンをクリックして新しい Python Cell を作成します。

    6. Python Cell に、Pandas を使用してデータをクリーンアップおよび処理し、df_ml_clean という名前の新しい DataFrame 変数に格納する次の文を入力します。

      import pandas as pd
      
      def clean_data(df_ml):
          # 新しい列を生成: 推定注文合計 = 商品価格 * 数量
          df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values
          # 'total_fee' 列を 'actual_total_fee' に名前変更
          df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'})
          return df_ml
      
      df_ml_clean = clean_data(df_ml.copy())
      df_ml_clean.head()
    7. Python Cell の [実行] ボタンをクリックし、実行が完了するのを待って、データクリーニングの結果を表示します。

    8. DataWorks Notebook で、image ボタンをクリックして新しい Python Cell を作成します。

    9. Python Cell に、線形回帰機械学習モデルを構築、トレーニング、テストするための次の文を入力します。

      import pandas as pd  
      from sklearn.model_selection import train_test_split  
      from sklearn.linear_model import LinearRegression  
      from sklearn.metrics import mean_squared_error  
        
      # 商品価格と合計料金を取得
      X = df_ml_clean[['predict_total_fee']].values  
      y = df_ml_clean['actual_total_fee'].astype(float).values  
      
      # データを準備  
      X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)  
      
      # モデルを作成してトレーニング  
      model = LinearRegression()  
      model.fit(X_train, y_train)  
        
      # 予測と評価  
      y_pred = model.predict(X_test)  
      for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)):
          print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}"))
      
      # 平均二乗誤差 (MSE) を計算
      mse = mean_squared_error(y_test, y_pred)  
      print("Mean Squared Error (MSE):", mse)
    10. [実行] ボタンをクリックし、実行が完了するのを待って、モデルトレーニングのテスト結果を表示します。