この実験では、OpenLake House プラットフォームに基づく小売 E コマース データ開発および分析シナリオを体験し、DataWorks を使用してマルチエンジン共同開発を実行し、ワークフローを調整し、データカタログを視覚的に管理できます。 さらに、Python ベースのプログラミングとデバッグを実行し、ノートブックと AI を組み合わせてインタラクティブなデータ探索と分析を実行できます。
背景情報
DataWorks の概要
DataWorks は、15 年にわたるビッグデータの経験に基づく Alibaba グループのビッグデータ開発手法を活用した、インテリジェントなレイクハウスベースのデータ開発およびガバナンスプラットフォームです。 DataWorks は、MaxCompute、E-MapReduce(EMR)、Hologres、Realtime Compute for Apache Flink、Platform for AI(PAI)など、Alibaba Cloud が提供する数十のビッグデータおよび AI コンピューティングサービスと互換性があります。 DataWorks は、データウェアハウス、データレイク、OpenLake レイクハウスアーキテクチャ向けのインテリジェントな抽出、変換、ロード(ETL)開発、データ分析、プロアクティブなデータ資産ガバナンスをサポートし、Data+AI ライフサイクル全体でデータ管理を促進します。 2009 年以来、DataWorks は Alibaba データシステムを継続的に製品化および改良し、公共サービス部門、金融、小売、インターネット、自動車、製造などのさまざまな業界にサービスを提供してきました。 DataWorks は、デジタルトランスフォーメーションと価値創造のために DataWorks を選択した数万の顧客の信頼を獲得しています。
DataWorks Copilot の概要
DataWorks Copilot は、DataWorks のインテリジェントアシスタントです。 DataWorks では、デフォルトの大規模言語モデル(LLM)、DeepSeek-R1-671B(フルパワーエディション)、または DeepSeek-R1-Distill-Qwen-32B を使用して、DataWorks での関連操作を完了できます。 DeepSeek-R1 の高度な推論能力により、DataWorks Copilot では、自然言語インタラクションに基づいて、SQL コードの生成、最適化、テストなどの複雑なタスクを実行できます。 これにより、抽出、変換、ロード(ETL)開発とデータ分析の効率が大幅に向上します。
DataWorks ノートブック機能の概要
DataWorks のノートブック機能は、エンジン固有の SQL または Python コード分析を実行し、コードをリアルタイムで実行またはデバッグするために使用できる、インテリジェントなインタラクティブデータ開発および分析ツールを提供します。 このようにして、視覚化されたデータ処理結果を取得できます。 さらに、ノートブックと他のタイプのノードを組み合わせてワークフローを形成し、ワークフローをスケジューリングシステムにコミットして実行できます。 これにより、複雑なビジネスシナリオを柔軟に実装できます。
使用上の注意
DataWorks Copilot パブリックプレビューが利用可能なリージョンと DataWorks エディションは限られています。 詳細については、「DataWorks Copilot」トピックの パブリックプレビュー セクションをご参照ください。
Data Studio で Python とノートブック機能を使用するには、最初に 個人開発環境 に切り替える必要があります。
制限事項
OpenLake は DLF 2.0 のみをサポートしています。
データカタログは DLF 2.0 のみをサポートしています。
環境の準備
- 説明
ワークスペースで [data Studio のパブリックプレビューに参加する] がオンになっていることを確認してください。
「Data Studio のパブリックプレビューに参加する」がオンになっているワークスペースに計算リソースを関連付ける。
実験手順
ステップ 1:データカタログを管理する
データレイクハウスソリューションのデータカタログ管理機能を使用すると、Data Lake Formation(DLF)、MaxCompute、Hologres などのコンピュートエンジンのタイプのデータカタログを作成および管理できます。
Data Studio ページの左側のナビゲーションウィンドウで、
アイコンをクリックします。[DATA CATALOG] ウィンドウが表示されます。[DATA CATALOG] ウィンドウで、管理するメタデータタイプを見つけ、目的のデータカタログの名前にポインターを移動し、データカタログの右側にある
アイコンをクリックして、[開く] をクリックします。データカタログの構成タブが表示されます。
データカタログの構成タブで、スキーマの名前をクリックします。 表示されるタブで、テーブルの名前をクリックします。 テーブルの詳細ページが表示されます。
Data Studio
データカタログ
テーブルの作成 ページの左側のナビゲーションウィンドウで、 アイコンをクリックします。 ウィンドウが表示されます。 [データカタログ] ウィンドウで、管理するメタデータタイプを見つけ、目的のデータカタログの名前にポインタを移動し、データカタログの右側にある アイコンをクリックして、 をクリックします。 [テーブルの作成] タブが表示されます。
[テーブルの作成] タブの左側のエリアで、[テーブル名] と [フィールド名] を指定します。または、[テーブルの作成] タブの右側のエリアに [DDL] 文を入力してテーブルを作成します。次に、上部のツールバーの [デプロイ] をクリックします。
ステップ 2:ワークフローを調整する
DataWorks では、視覚的にドラッグ操作を実行して、ビジネスの観点からワークフロー内のさまざまなタイプのデータ開発ノードを調整できます。 スケジュール時間などの共通パラメータを個別に構成する必要はありません。 これにより、複雑なタスクプロジェクトを簡単に管理できます。
Data Studio
DATASTUDIOワークスペースディレクトリ
[ワークフローの作成] ページの左側のナビゲーションウィンドウで、 アイコンをクリックします。 ウィンドウが表示されます。 [DATASTUDIO] ウィンドウで、 をクリックし、右側にある アイコンをクリックして、 をクリックします。
表示されるポップオーバーに、ワークフロー名を入力し、Enter キーを押します。
プリセットワークフロー名:
小売 E コマースビジネスの概要
ワークフローの構成タブで、キャンバスの中央にある [ノードを追加するには、ドラッグまたはクリックします] をクリックします。[ノードの作成] ダイアログボックスで、[ノードタイプ] パラメーターと [ノード名] パラメーターを構成し、[確認] をクリックします。
プリセットノード名:
小売 E コマースの概要
プリセットノードタイプ:
ゼロロードノード
ワークフローの構成タブの [ノードの作成] セクションで、使用するノードタイプを見つけ、ノードタイプをキャンバスにドラッグして、リリースします。 [ノードの作成] ダイアログボックスで、[ノード名] を指定し、[確認] をクリックします。
次の表は、プリセットノード名とタイプを示しています。
ノードタイプ
ノード名
Data Integration - バッチ同期
ods_mbr_user_info
MaxCompute-MaxCompute SQL
dim_ec_mbr_user_info
MaxCompute-MaxCompute SQL
dws_ec_mbr_cnt_nd
ノートブック
ads_ec_kpi_report
キャンバスで、依存関係を構成する 2 つのノードを見つけ、一方のノードの長方形ボックスの下端の中央にポインタを移動します。 + アイコンが表示されたら、接続線をドラッグして他のノードに接続し、リリースします。
必要なノードを順番に作成し、ノードの依存関係を構成して、保存 をクリックします。
構成を 保存 した後、ビジネス要件に基づいて、トップ ツールバーでキャンバス上のノードの レイアウト モード を変更できます。
ワークフローの構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[プロパティ] タブで、ワークフローのスケジューリングパラメーターとノードの依存関係を構成します。[スケジューリング パラメーター] セクションで、[パラメーターの追加] をクリックします。表示される入力ボックスで、[パラメーター名] フィールドに bizdate と入力し、[パラメーター値] ドロップダウンリストから $[yyyymmdd-1] を選択します。
タブの [スケジューリングの依存関係] セクションで、[依存関係の追加] をクリックします。表示されるセクションの [祖先オブジェクト] フィールドに、ワークフローに属していないノードの名前を入力し、Enter キーを押します。結果が返されるまで待ちます。結果リストから必要な祖先オブジェクトを選択し、[追加] をクリックします。
上部のツールバーで、[デプロイ] をクリックします。 右下に [デプロイメント] タブが表示されます。 [デプロイメントの説明] フィールドの右側にある [本番環境へのデプロイを開始] をクリックし、画面上の指示に基づいてチェックと確認操作を順番に実行します。
ステップ 3:マルチエンジン共同開発を実行する
Data Studio は、データ同期ノード、MaxCompute、Hologres、EMR、Flink、ADB などの数十種類のコンピュートエンジンのノード、およびノートブックと Python を使用して開発されたノードのウェアハウス開発をサポートしています。 Data Studio は、これらのノードの複雑なスケジューリング構成もサポートしています。 Data Studio は、開発 - 本番環境分離の R&D モードを提供します。 この実験では、Flink SQL Streaming ノードが作成されます。
Data Studio
DATASTUDIOワークスペースディレクトリ
Flink SQL ストリーミングノード名 ページの左側のナビゲーションウィンドウで、 アイコンをクリックします。 ウィンドウが表示されます。 [DATASTUDIO] ウィンドウで、 をクリックし、右側にある アイコンをクリックして、[ノードの作成] > [Flink] > を選択します。 表示されるポップオーバーに、 を入力し、Enter キーを押します。
プリセットノード名:
ads_ec_page_visit_log
ノードの構成タブで、Flink SQL Streaming ノードのプリセットコードをコードエディタに貼り付けます。
ノードの構成タブの右側のナビゲーションウィンドウで、リアルタイム構成Flink リソース関連パラメータースクリプト パラメーターFlink ランタイム パラメーター をクリックします。 表示されるタブで、、、 を構成します。 次の図は、パラメータ値を示しています。
[リアルタイム構成] タブでパラメータを構成した後、上部のツールバーで [保存] をクリックし、次に [デプロイ] をクリックします。 右下に [デプロイメント] タブが表示されます。 [デプロイメントの説明] フィールドの右側にある [本番環境へのデプロイを開始] をクリックし、画面上の指示に基づいてチェックと確認操作を順番に実行します。
ステップ 4:個人開発環境に入る
個人開発環境では、カスタムコンテナイメージ、NAS ファイルシステム(NAS)および Git への接続、Python ベースのプログラミングとノートブックがサポートされています。
Data Studio[個人用開発環境を選択] ページの上部のナビゲーションバーで、 をクリックし、入力する個人開発環境を選択します。
ステップ 5:Python コードを記述してデバッグする
DataWorks は DSW と緊密に統合されています。 Data Studio は、個人開発環境での Python コードの記述、デバッグ、スケジューリング、および実行をサポートしています。
この手順の操作は、ステップ 4:個人開発環境に入る を完了した後にのみ実行できます。
Data Studio
ワークスペース
個人用ディレクトリページの選択した個人開発環境で、 の下の をクリックし、 アイコンをクリックします。 名前のないファイルがリストに追加されます。 プリセットファイル名を入力し、Enter キーを押して、ファイルが生成されるまで待ちます。
プリセットファイル名:
ec_item_rec.py
Python ファイルの構成タブにあるコードエディタに、プリセット Python コードを入力します。 上部のツールバーで、[python ファイルの実行] を選択します。 構成タブの下部にある [ターミナル] タブで、実行結果をクエリします。
Python ファイルの構成タブの上部のツールバーで、[python ファイルのデバッグ] を選択します。 コードエディタでコード行番号にポインタを移動します。 赤い点が表示されます。 赤い点をクリックして、ブレークポイントを追加します。 [DATASTUDIO] ウィンドウの上にある左上で、
をクリックしてコードをデバッグします。
ステップ 6:ノートブックに基づいてデータを探索する
ノートブックに基づいてデータを探索する場合、関連する操作は個人開発環境で実行されます。 したがって、この手順の操作を実行する前に、ステップ 4:個人開発環境に入る を完了する必要があります。
ノートブックを作成する
Data Studio ページに移動します。
個人用ディレクトリ セクションで、目的のフォルダを右クリックし、[ノートブックの作成] を選択します。
名前のないファイルがリストに追加されます。 ノートブック名を入力し、Enter キー空白 を押すか、 をクリックして、ノートブック名を有効にします。
[個人ディレクトリ] セクションでノートブック名をクリックします。 ノートブックの構成タブが表示されます。クリック
ノートブックを使用する
[ノートブックを使用する] セクションの操作は、特定の実行順序のない独立した操作です。 ビジネス要件に基づいて操作を実行できます。
ノートブックでのマルチエンジン開発
EMR Spark SQL
ノートブックの構成タブで、
をクリックして [SQL セル] を追加します。
SQL セルに、次の文を入力して dim_ec_mbr_user_info テーブルをクエリします。
SQL セルの右下隅で、EMR Spark SQL
openlake サーバーレス Spark
を選択し、計算リソースとして を選択します。[実行] をクリックします。実行が完了するまで待って、データ結果を表示します。
StarRocks SQL
ノートブックの構成タブで、
をクリックして [SQL セルを追加] します。
SQL セルに、次の文を入力して dws_ec_trd_cate_commodity_gmv_kpi_fy テーブルをクエリします。
SQL セルの右下隅で、StarRocks SQL
openlake_starrocks
を選択し、計算リソースとして を選択します。[実行] をクリックします。実行が完了するまで待って、データ結果を表示します。
Hologres SQL
ノートブックの構成タブで、
をクリックして [SQL セル] を追加します。
SQL セルに、次の文を入力して dws_ec_mbr_cnt_std テーブルをクエリします。
SQL セルの右下隅で、Hologres SQL
openlake_hologres
を選択し、計算リソースとして を選択します。[実行] をクリックします。実行が完了するまで待って、データ結果を表示します。
MaxCompute SQL
ノートブックの構成タブで、
をクリックして [SQL セル] を追加します。
SQL セルに、次の文を入力して dws_ec_mbr_cnt_std テーブルをクエリします。
SQL セルの右下隅で、MaxCompute SQL
openlake_maxcompute
を選択し、計算リソースとして を選択します。[実行] をクリックします。実行が完了するまで待って、データ結果を表示します。
ノートブックのインタラクティブデータ
ノートブックの構成タブで、
をクリックして Python セルを追加します。
Python セルの右上隅にある
をクリックして、インテリジェントプログラミングアシスタントである DataWorks Copilot を起動します。
表示される入力ボックスに、ipywidgets を使用してメンバーの年齢をクエリできるウィジェットを生成するための要件を入力します。
説明要件の説明:Python を使用して、メンバーの年齢のスライダーウィジェットを生成します。 値の範囲は 1 ~ 100 で、デフォルト値は 20 です。値の変更をリアルタイムで監視し、値を query_age グローバル変数に保存します。
DataWorks Copilot によって生成された Python コードを確認し、承諾 をクリックします。
Python セルで [実行] アイコンをクリックし、実行が完了するまで待ちます。 ウィジェットの生成を表示します。 ウィジェットは、DataWorks Copilot によって生成されたコードまたはプリセットコードを実行することによって生成されます。 ウィジェットでスライダーを動かして、目的の年齢を選択できます。
ノートブックの構成タブで、
をクリックして [SQL セル] を追加します。
SQL セルに次のステートメントを入力します。 ステートメントには、Python で定義されているメンバー年齢変数
${query_age}
が含まれています。SELECT * FROM openlake_win.default.dim_ec_mbr_user_info -- メンバー基本情報ディメンションテーブルからすべての列を選択 WHERE CAST(id_age AS INT) >= ${query_age}; -- 年齢が query_age 以上のメンバーを選択
SQL セルの右下隅で、Hologres SQL
openlake_hologres
を選択し、計算リソースとして を選択します。[実行] をクリックします。実行が完了するまで待って、データ結果を表示します。
実行結果で、
をクリックしてグラフを生成します。
ノートブックでのモデル開発とトレーニング
ノートブックの構成タブで、
をクリックして [SQL セル] を追加します。
SQL セルに、次の文を入力して ods_trade_order テーブルをクエリします。
SELECT * FROM openlake_win.default.ods_trade_order; -- 注文テーブルからすべての列を選択
SQL クエリの結果を DataFrame 変数に書き込みます。 [df] がある場所をクリックして、
df_ml
などの DataFrame 変数のカスタム名を指定します。SQL セルで [実行] アイコンをクリックし、実行が完了するまで待ってから、データ結果を表示します。
ノートブックの構成タブで、
をクリックして Python セルを追加 します。
Python セルに次のステートメントを入力して、Pandas を使用してデータをクレンジングおよび処理し、データを DataFrame の新しい変数
df_ml_clean
に格納します。import pandas as pd # pandas をインポート def clean_data(df_ml): # データをクレンジングする関数 # 新しい列を生成します:推定注文総額 = 商品単価 × 商品数量。 -- Generate a new column: Estimated total order amount = Product unit price × Product quantity. df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values # 推定注文総額を計算 # total_fee 列の名前を actual_total_fee に変更します。 -- Rename the total_fee column to actual_total_fee. df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'}) # 列名を変更 return df_ml # クレンジングされたデータを返す df_ml_clean = clean_data(df_ml.copy()) # データをクレンジング df_ml_clean.head() # クレンジングされたデータの最初の 5 行を表示
Python セルで [実行] アイコンをクリックし、実行が完了するまで待って、データクレンジング結果を表示します。実行
ノートブックの構成タブで、
をクリックして [Python セルを追加] します。
Python セルに次のステートメントを入力して、線形回帰機械学習モデルを構築し、トレーニングとテストを実行します。
import pandas as pd # pandas をインポート from sklearn.model_selection import train_test_split # sklearn.model_selection から train_test_split をインポート from sklearn.linear_model import LinearRegression # sklearn.linear_model から LinearRegression をインポート from sklearn.metrics import mean_squared_error # sklearn.metrics から mean_squared_error をインポート # 商品価格と総費用を取得します。 -- Obtain the product price and total costs. X = df_ml_clean[['predict_total_fee']].values # 商品価格 y = df_ml_clean['actual_total_fee'].astype(float).values # 総費用 # データを準備します。 -- Prepare data. X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42) # データをトレーニングセットとテストセットに分割 # モデルを作成してトレーニングします。 -- Create and train a model. model = LinearRegression() # 線形回帰モデルを作成 model.fit(X_train, y_train) # トレーニングセットでモデルをトレーニング # 予測と評価を実行します。 -- Perform prediction and evaluation. y_pred = model.predict(X_test) # テストセットで予測を行う for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)): # 予測結果と実際の値を表示 print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}")) # 平均二乗誤差(MSE)を計算します。 -- Calculate the mean squared error (MSE). mse = mean_squared_error(y_test, y_pred) # MSE を計算 print("MSE:", mse) # MSE を表示
[実行] をクリックし、実行が完了するまで待って、モデルのトレーニングのテスト結果を表示します。