このトピックでは、DataWorks Notebook の高度な機能を紹介し、対話型開発から本番スケジューリングへのスムーズな移行を支援します。コードの再利用、データマウント、パラメーター管理などのエンジニアリングプラクティスを使用して開発効率を向上させる方法を学習します。また、MaxCompute Spark、EMR Serverless Spark、AnalyticDB for Spark など、さまざまなコンピュートエンジンに接続してデバッグする方法も学習します。
先に進む前に、「Notebook の基本的な開発」をお読みになることを推奨します。
開発環境と本番環境の違い
DataWorks Notebook は、実行をスケジューリングできる開発および分析ツールです。これは、2 つの異なる環境で実行されることを意味します。
開発環境:DataStudio の Notebook ノード編集ページでセルを直接実行すると、コードは個人用の開発環境インスタンスで実行されます。この環境は、コードロジックを迅速に検証およびデバッグするために設計されています。
本番環境:Notebook ノードが送信、公開され、定期的なスケジュールまたはデータバックフィルによって実行がトリガーされると、コードは独立した一時的なタスクインスタンスで実行されます。この環境は、本番タスクを安定的かつ信頼性の高い方法で実行するために設計されています。
これら 2 つの環境は、機能サポートにおいて大きな違いがあります。これらの違いを理解することが、効率的な開発の鍵となります。
開発環境と本番環境の違い
特徴 | 開発環境 (セルの実行) | 本番環境 (定期的なスケジュールまたはデータバックフィル) |
プロジェクトリソースの参照 ( |
| リソースは自動的に適用されます。 |
データセット (OSS/NAS) の読み書き | 開発環境でデータセットをマウントします。 | スケジューリング設定でデータセットをマウントします。 |
ワークスペースパラメーターの参照 ( | 適用されます。コード実行前にテキストが自動的に置換されます。 | 適用されます。タスク実行前にテキストが自動的に置換されます。 |
Spark セッション管理 | Spark セッションのデフォルトのアイドルタイムアウトは 2 時間です。2 時間以内に新しいコードが実行されない場合、セッションは自動的に解放されます。 | タスクインスタンスレベルの短命なセッションです。タスクインスタンスとともに自動的に作成および破棄されます。 |
本番環境でのコードとデータの再利用
プロジェクトリソース (.py ファイル) の参照
共通の関数やクラスを別の .py ファイルにカプセル化し、##@resource_reference{"custom_name.py"} を使用して MaxCompute リソースとして参照できます。これにより、コードをモジュール化して再利用し、コードの保守性を向上させることができます。
Python リソースの作成と公開
DataWorks の DataStudio の左側のナビゲーションウィンドウで、
アイコンをクリックして [リソース管理] に移動します。リソース管理のディレクトリツリーで、対象のフォルダを右クリックするか、右上隅の + アイコンをクリックします。[新規リソース] > [MaxCompute Python] を選択し、リソースに
my_utils.pyという名前を付けます。[ファイルコンテンツ] で、[オンライン編集] をクリックします。デバッグ済みのユーティリティ関数コードをエディターに貼り付け、[保存] をクリックします。
# my_utils.py def greet(name): return f"Hello, {name} from resource file!"ツールバーの [保存] をクリックし、次にリソースを [公開] して、開発タスクと本番タスクで利用できるようにします。
Notebook でのリソースの参照
Notebook の Python セルの最初の行で、
##@resource_reference構文を使用して公開されたリソースを参照します。##@resource_reference{"my_utils.py"} # リソースが my_folder/my_utils.py のようなフォルダ内にある場合でも、フォルダ名なしで ##@resource_reference{"my_utils.py"} と参照します。 from my_utils import greet message = greet('DataWorks') print(message)開発環境でのテスト
Python セルを実行します。次の結果が出力されます。
Hello, DataWorks from resource file!重要開発環境でテストする際、システムがコード内で
##@resource_referenceを検出すると、リソース管理からオブジェクトファイルを個人のディレクトリのworkspace/_dataworks/resource_referencesパスに自動的にダウンロードします。これにより、オブジェクトファイルを参照できるようになります。ModuleNotFoundErrorエラーが発生した場合は、エディターのツールバーにある [再起動] ボタンをクリックしてリソースを再読み込みしてから、再試行してください。本番環境への公開と検証
Notebook ノードを [保存] して [公開] します。次に、[オペレーションセンター] > [自動トリガータスク] に移動し、[テスト] をクリックして実行します。タスクが正常に実行されると、ログに
Hello, DataWorks from resource file!という出力が表示されます。重要There is no file with id ...エラーが発生した場合は、Python リソースが本番環境に公開されていることを確認してください。
詳細については、「MaxCompute のリソースと関数」をご参照ください。
データセット (OSS/NAS) の読み書き
Notebook タスクの実行中に、OSS または NAS に保存されている大規模なファイルデータを簡単に読み書きできます。
開発環境でのデバッグ
データセットのマウント:開発環境の詳細ページに移動します。[ストレージ設定] > [データセット] でデータセットを設定します。
コードでのアクセス:データセットは開発環境の指定されたパスにマウントされます。コード内でこのパスを直接読み書きできます。
# データセットが開発環境の /mnt/data/dataset にマウントされていると仮定します import pandas as pd # マウントパスを直接使用します file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Successfully wrote data to the MaxCompute table mc_test_table.")
本番環境へのデプロイ
データセットのマウント:Notebook ノードの編集ページで、右側のナビゲーションウィンドウの [スケジューリング設定] > [スケジューリングポリシー] に移動します。データセットを追加します。
コードでのアクセス:ノードを送信して公開すると、データセットは本番環境の指定されたパスにマウントされます。コード内でこのパスを直接読み書きできます。
# データセットが開発環境の /mnt/data/dataset にマウントされていると仮定します import pandas as pd # マウントパスを直接使用します file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Successfully wrote data to the MaxCompute table mc_test_table.")
詳細については、「開発環境でのデータセットの使用」をご参照ください。
ワークスペースパラメーターの参照
この機能は、DataWorks Professional Edition 以上でのみ利用可能です。
既存の スケジューリングパラメーター に加えて、DataWorks はワークスペースパラメーターを提供します。この機能は、タスクやノード間でのグローバルな設定の再利用と環境の隔離に関する問題を解決するのに役立ちます。SQL および Python セルで、${workspace.param} 形式を使用してワークスペースパラメーターを参照できます。ここで、param は作成したワークスペースパラメーターの名前です。
1. ワークスペースパラメーターの作成:開始する前に、DataWorks コンソールの [オペレーションセンター] > [スケジューリング設定] > [ワークスペースパラメーター] に移動して、必要なパラメーターを作成します。
2. ワークスペースパラメーターの参照:
SQL セルでワークスペースパラメーターを参照します。
SELECT '${workspace.param}';ワークスペースパラメーターをクエリします。セルが正常に実行されると、ワークスペースパラメーターの値が出力されます。
Python セルでワークスペースパラメーターを参照します。
print('${workspace.param}')ワークスペースパラメーターを出力します。セルが正常に実行されると、ワークスペースパラメーターの値が出力されます。
詳細については、「ワークスペースパラメーターの使用」をご参照ください。
マジックコマンドを使用したコンピュートエンジンとの連携
マジックコマンドは、% または %% で始まる特別なコマンドです。これらは、Python セルとさまざまなコンピューティングリソースとの間の連携を簡素化します。
MaxCompute への接続
MaxCompute コンピュートリソースに接続する前に、MaxCompute コンピュートリソースをアタッチしていることを確認してください。
%odps:PyODPS エントリオブジェクトの取得このコマンドは、現在の MaxCompute プロジェクトにアタッチされた認証済みの PyODPS オブジェクトを返します。これは、コード内での AccessKey のハードコーディングを回避するため、MaxCompute と連携する推奨の方法です。
マジックコマンドを使用して MaxCompute 接続を作成します。
%odpsと入力します。右下隅に MaxCompute コンピュートリソースセレクターが表示され、リソースが自動的に選択されます。右下隅の MaxCompute プロジェクト名をクリックしてプロジェクトを切り替えることができます。o=%odps取得した MaxCompute コンピュートリソースを使用して PyODPS スクリプトを実行します。
たとえば、現在のプロジェクト内のすべてのテーブルを取得するには、次のようにします。
with o.execute_sql('show tables').open_reader() as reader: print(reader.raw)
%maxframe:MaxFrame 接続の確立このコマンドは MaxFrame セッションを作成し、MaxCompute に Pandas ライクな分散データ処理機能を提供します。
# MaxCompute MaxFrame セッションに接続してアクセスします mf_session = %maxframe df = mf_session.read_odps_table('your_mc_table') print(df.head()) # 開発とテストの後、セッションを手動で破棄してリソースを解放します mf_session.destroy()
Spark コンピュートリソースへの接続
DataWorks Notebooks は、複数の Spark エンジンへの接続をサポートしています。これらのエンジンは、接続方法、実行コンテキスト、およびリソース管理が異なります。
単一の Notebook ノードは、マジックコマンドを使用して 1 種類のコンピュートリソースにのみ接続できます。
エンジンの比較
特徴 | MaxCompute Spark | EMR Serverless Spark | AnalyticDB for Spark |
接続コマンド |
|
|
|
説明 実行後、Notebook カーネル全体の実行コンテキストがリモートの PySpark 環境に切り替わります。その後、後続のセルで直接 PySpark コードを記述できます。 | |||
前提条件 | MaxCompute リソースをアタッチします。 | EMR コンピュートリソースをアタッチし、Livy ゲートウェイを作成します。 | AnalyticDB for Spark コンピュートリソースをアタッチします。 |
開発環境モード | Livy セッションを自動的に作成または再利用します。 | 既存の Livy ゲートウェイに接続してセッションを作成します。 | Spark Connect Server を自動的に作成または再利用します。 |
本番環境モード | Livy モード:Livy サービスを通じて Spark ジョブを送信します。 | spark-submit バッチ処理モード:セッション状態を保持しない純粋なバッチ処理。 | Spark Connect Server モード:Spark Connect サービスを通じて連携します。 |
本番リソースの解放 | タスクインスタンスの終了後、セッションは自動的に解放されます。 | タスクインスタンスの終了後、リソースは自動的にクリーンアップされます。 | タスクインスタンスの終了後、リソースは自動的に解放されます。 |
利用シーン | MaxCompute エコシステムと緊密に統合された、一般的なバッチ処理および ETL タスク。 | 柔軟な設定と、Hudi や Iceberg などのオープンソースのビッグデータエコシステムとの連携を必要とする複雑な分析タスク。 | AnalyticDB for MySQL の C-Store テーブルに対する高性能な対話型クエリおよび分析。 |
MaxCompute Spark
MaxCompute コンピュートリソースに接続する前に、MaxCompute コンピュートリソースをアタッチしていることを確認してください。
Livy を通じて、MaxCompute プロジェクトに組み込まれている Spark エンジンに接続できます。
接続の確立:Python セルで次のコマンドを実行します。システムは自動的に Spark セッションを作成または再利用します。
# Spark セッションを作成し、Livy を停止します。 %maxcompute_sparkPySpark コードの実行:接続が確立された後、
%%sparkセルマジックを使用して、新しい Python セルで PySpark コードを実行します。# MaxCompute Spark を使用する場合、Python セルは %%spark で始める必要があります。 %%spark df = spark.sql("SELECT * FROM your_mc_table LIMIT 10") df.show()接続の手動解放:開発とテストの後、セッションを手動で停止または削除できます。本番環境では、システムは現在のタスクインスタンスの Livy セッションを自動的に停止および削除します。手動での操作は必要ありません。
# Spark セッションをクリーンアップし、Livy を停止します。 %maxcompute_spark stop # Spark セッションをクリーンアップし、Livy を停止してから削除します。 %maxcompute_spark delete
EMR Serverless Spark
コンピュートリソースに接続する前に、ワークスペースに EMR Serverless Spark コンピュートリソースをアタッチし、Livy ゲートウェイを作成してください。
作成した Livy ゲートウェイに接続することで、EMR Serverless Spark と連携できます。
接続の確立:コマンドを実行する前に、セルの右下隅で接続する EMR コンピュートリソースと Livy ゲートウェイを選択します。
# 基本的な接続 %emr_serverless_spark # または、接続時にカスタム Spark パラメーターを渡します。カスタム Spark パラメーターを定義する際は、2 つのパーセント記号 (%%) を使用する必要があることに注意してください。 %%emr_serverless_spark { "spark_conf": { "spark.emr.serverless.environmentId": "<EMR Serverless Spark ランタイム環境 ID>", "spark.emr.serverless.network.service.name": "<EMR Serverless Spark ネットワーク接続 ID>", "spark.driver.cores": "1", "spark.driver.memory": "8g", "spark.executor.cores": "1", "spark.executor.memory": "2g", "spark.driver.maxResultSize": "32g" } }説明カスタムパラメーターとグローバル設定の関係
デフォルトの動作:ここで定義するカスタムパラメーターは、現在の接続 (セッション) に対してのみ有効であり、1 回限りの設定です。カスタムパラメーターを指定しない場合、システムは管理センターで設定されたグローバルパラメーターを使用します。
推奨される使用法:複数のタスクや複数のユーザーで再利用する必要がある設定については、[管理センター] > [Serverless Spark] > [SPARK パラメーター] でグローバルに設定します。これにより、一貫性が確保され、管理が簡素化されます。
優先順位ルール:同じパラメーターがカスタムパラメーターとグローバル設定の両方で設定されている場合、どちらが有効になるかは、管理センターの [グローバル設定を優先] オプションによって決まります。
選択済み:グローバル設定がこのセッションのカスタムパラメーターを上書きします。
未選択:このセッションのカスタムパラメーターがグローバル設定を上書きします。
(オプション) 再接続:管理者が誤って Livy ゲートウェイページでトークンを削除した場合、このコマンドを使用して再作成できます。
# 現在の開発環境の Livy トークンを更新するために再接続します。 %emr_serverless_spark refresh_tokenPySpark または SQL コードの実行:接続が確立されると、カーネルが切り替わります。Python セルで直接 PySpark コードを記述するか、EMR Spark SQL セルで SQL を記述できます。
EMR Spark SQL を使用してコンピューティングリソースに SQL コードを送信および実行する
%emr_serverless_sparkを使用して接続が正常に確立された後、EMR Spark SQL セルで直接 SQL 文を記述できます。セルでコンピュートリソースを選択する必要はありません。EMR Spark SQL セルは
%emr_serverless_sparkからの接続を再利用し、コードをターゲットのコンピュートリソースに送信して実行します。
Python を使用してコンピューティングリソースに PySpark コードを送信して実行する
%emr_serverless_sparkを使用して接続が正常に確立された後、新しい Python セルで PySpark コードを送信して実行できます。セルに `%%spark` プレフィックスを追加する必要はありません。
接続の手動解放
重要複数のユーザーが Livy ゲートウェイを共有している場合、`stop` または `delete` コマンドは、そのゲートウェイを現在使用しているすべてのユーザーに影響します。これらのコマンドは注意して使用してください。
# Spark セッションをクリーンアップし、Livy を停止します。 %emr_serverless_spark stop # Spark セッションをクリーンアップし、Livy を停止してから削除します。 %emr_serverless_spark delete
AnalyticDB for Spark
コンピュートリソースに接続する前に、ワークスペースに AnalyticDB for Spark コンピュートリソースをアタッチしてください。
Spark Connect Server を作成することで、AnalyticDB for Spark エンジンに接続できます。
接続の確立:ネットワーク接続を確保するには、接続パラメーターで vSwitch ID とセキュリティグループ ID を正しく設定する必要があります。コマンドを実行する前に、セルの右下隅で ADB Spark コンピュートリソースを選択します。
# ネットワーク接続を確立するには、vSwitch ID とセキュリティグループ ID を設定する必要があります。 %adb_spark add \ --spark-conf spark.adb.version=3.5 \ --spark-conf spark.adb.eni.enabled=true \ --spark-conf spark.adb.eni.vswitchId=<ADB の vSwitch ID> \ --spark-conf spark.adb.eni.securityGroupId=<開発環境のセキュリティグループ ID>PySpark コードの実行:接続が確立された後、新しい Python セルで PySpark コードを実行できます。
# C-Store テーブルに対する操作のみ実行できます。 df = spark.sql("SELECT * FROM my_adb_cstore_table LIMIT 10") df.show()注:AnalyticDB for Spark エンジンは現在、
'storagePolicy'='COLD'属性を持つ C-Store テーブルのみを処理できます。接続の手動解放:開発環境でのテストが終了したら、リソースを節約するために接続セッションを手動でクリーンアップします。本番環境では、システムが自動的にリソースをクリーンアップします。
%adb_spark cleanup
付録:マジックコマンドクイックリファレンス
マジックコマンド | マジックコマンドの定義 | 適用可能なコンピュートエンジン |
| PyODPS エントリオブジェクトを取得します。 | MaxCompute |
| MaxFrame 接続を確立します。 | |
| Spark セッションを作成します。 | MaxCompute Spark |
| Spark セッションをクリーンアップし、Livy を停止します。 | |
| Spark セッションをクリーンアップし、Livy を停止して削除します。 | |
| Python セルで、作成された Spark コンピュートリソースに接続します。 | |
| Spark セッションを作成します。 | EMR Serverless Spark |
| Livy ゲートウェイの詳細を表示します。 | |
| Spark セッションをクリーンアップし、Livy を停止します。 | |
| Spark セッションをクリーンアップし、Livy を停止して削除します。 | |
| 開発環境の Livy トークンを更新します。 | |
| 再利用可能な ADB Spark セッションを作成して接続します。 | AnalyticDB for Spark |
| Spark セッション情報を表示します。 | |
| 現在の Spark 接続セッションを停止してクリーンアップします。 |
よくある質問
Q:ワークスペースリソースを参照すると、
ModuleNotFoundErrorまたはThere is no file with id ...エラーが発生します。なぜですか?A:以下を確認してください。
[データ開発] > [リソース管理] に移動し、MaxCompute Python リソースが保存されていることを確認します。このエラーが本番環境で発生した場合は、リソースが本番環境に公開されていることを確認してください。
Notebook エディターのツールバーにある [再起動] ボタンをクリックして、リソースを再読み込みします。
Q:ワークスペースリソースを更新した後、なぜ古いリソースがまだ参照されるのですか?
A:リソースを変更して再公開した場合、DataStudio の設定で
Dataworks › Notebook › リソース参照: ダウンロード戦略を autoOverwrite に設定します。次に、Notebook ツールバーの [カーネルの再起動] をクリックします。Q:データセットを参照すると、開発環境で
FileNotFoundErrorが発生します。なぜですか?A:選択した開発環境にデータセットがマウントされていることを確認してください。
Q:データセットを参照すると、開発環境では動作しますが、本番環境では
Execute mount dataset exception! Please check your dataset configエラーが発生します。なぜですか?A:Notebook ノードの [スケジューリング設定] でデータセットがマウントされ、OSS データセットに必要な権限が付与されていることを確認してください。

Q:開発環境のバージョンを確認するにはどうすればよいですか?
A:開発環境に入った後、CMD+SHIFT+P を押し、ABOUT と入力して現在のバージョンを表示します。製品の更新でバージョン 0.5.69 以降の開発環境インスタンスが必要な場合は、アップグレードプロンプトのポップアップウィンドウで [ワンクリックアップグレード] をクリックできます。
Q:Spark エンジンに接続できないのはなぜですか?
一般的な確認:ワークスペース詳細ページのコンピュートリソースリストに移動します。対応するコンピュートリソース (MaxCompute、EMR、または ADB) が現在のワークスペースに正しくアタッチされており、ご利用のアカウントに必要な権限があることを確認します。
EMR Serverless Spark:Livy ゲートウェイが作成され、正常な状態であることを確認します。
AnalyticDB for Spark:ネットワーク問題のトラブルシューティングに重点を置きます。
vswitchIdとsecurityGroupIdが正しく設定されていることを確認します。開発環境と ADB Spark インスタンス間のネットワークが接続されていることを確認します。セキュリティグループルールが必要なポートでの通信を許可しているかどうかを確認します。