このガイドでは、コード再利用、データセットのマウント、パラメーター管理などの手法を活用して開発効率を向上させる方法について説明します。また、MaxCompute Spark、EMR Serverless Spark、AnalyticDB for Spark を含むコンピュートエンジンへの接続に関するベストプラクティスおよびデバッグ手法についても解説します。
次に進む前に、「基本的なノートブック開発」をご参照ください。
開発環境と本番環境
DataWorks ノートブックはスケジュール実行可能な開発・分析ツールとして設計されており、以下の 2 種類の環境で動作します。
開発環境:コードの迅速な検証およびデバッグを目的とした個人用開発環境インスタンスでコードが実行されます。
本番環境:定期スケジューリングまたはデータバックフィルによってトリガーされ、分離された一時的なタスクインスタンスでコードが実行され、安定かつ信頼性の高い実行を保証します。
これらの環境では、主要機能の動作が異なります。
| 機能 | 開発環境 | 本番環境 |
|---|---|---|
| プロジェクトリソース(`.py` ファイル)の参照 | 初回参照時:自動的にダウンロードされ、即座に有効になります。更新後:ツールバーの [再起動] をクリックして、更新された .py モジュールを再読み込みします。DataStudio の設定項目で Dataworks › Notebook › Resource Reference: Download Strategy を autoOverwrite に設定してください。 | 自動的に有効になります。 |
| データセット(OSS/NAS)の読み取り/書き込み | 個人用開発環境でデータセットをマウントします。 | スケジューリング構成でデータセットをマウントします。 |
| ワークスペースパラメーター(`${...}`)の参照 | コード実行前にテキスト置換が自動的に行われます。 | タスク実行前にテキスト置換が自動的に行われます。 |
| Spark セッション管理 | デフォルトのアイドルタイムアウト: 2 時間。この期間内に新しいコードが実行されない場合、セッションは自動的に解放されます。 | タスクインスタンスレベルの短期間セッションが自動的に作成され、タスクインスタンス終了時に破棄されます。 |
コードおよびデータの再利用
コード再利用方法の選択
DataWorks ノートブックでは、複数のタスク間でコードやデータを再利用するためのいくつかの方法をサポートしています。次の表を参考に適切なアプローチを選択してください。
| 方法 | 使用タイミング | 注記 |
|---|---|---|
| プロジェクトリソース(`.py` ファイル) | ノートブックタスク間で Python ユーティリティ関数を共有する場合 (推奨) | MaxCompute リソース管理に公開され、開発環境および本番環境の両方で利用可能です。 |
| データセット(OSS/NAS) | タスク実行中に大容量ファイルを読み取りまたは書き込む場合 | 開発環境および本番環境それぞれで個別にマウントする必要があります。 |
| ワークスペースパラメーター | 複数のタスク間でグローバル設定値を共有する場合 | DataWorks Professional Edition 以上で利用可能です。オペレーションセンターで作成する必要があります。 |
プロジェクトリソースの参照
共通の関数やクラスを .py ファイルにカプセル化し、##@resource_reference{"custom_name.py"} を使用して MaxCompute リソースとして参照します。これにより、コードのモジュール化、再利用性の向上、メンテナンスの簡素化が実現できます。
Python リソースの作成および公開
DataWorks DataStudio の左側ナビゲーションウィンドウで、
をクリックして Resource Management に移動します。対象ディレクトリを右クリックするか、右上隅の + アイコンをクリックします。[リソースの作成] > [MaxCompute Python] を選択し、ファイル名を
my_utils.pyとします。[ファイルの内容] セクションで、[オンライン編集] をクリックし、ユーティリティ機能コードを貼り付けて、[保存] をクリックします。
# my_utils.py def greet(name): return f"Hello, {name} from resource file!"[保存] をクリックした後、ツールバーの [公開] をクリックします。これにより、リソースが開発および本番タスクの両方で利用可能になります。
ノートブックでのリソース参照
Python セルの先頭行で、##@resource_reference を使用して公開済みリソースを参照します。
##@resource_reference{"my_utils.py"}
# リソースがサブディレクトリにある場合 (例: my_folder/my_utils.py)、
# ディレクトリ名を除いて参照します: ##@resource_reference{"my_utils.py"}
from my_utils import greet
message = greet('DataWorks')
print(message)開発環境でのデバッグ
Python セルを実行すると、次の出力が表示されます。
Hello, DataWorks from resource file!開発環境では、システムが ##@resource_reference 宣言を検出し、ファイルを個人ディレクトリ内の workspace/_dataworks/resource_references に自動的にダウンロードします。ModuleNotFoundError が発生した場合は、エディターツールバーの [再起動] をクリックしてリソースを再読み込みしてください。
本番環境への公開
ノートブックノードを保存および公開した後、[オペレーションセンター] > [定期実行タスク] に移動し、[テスト] をクリックします。タスクが成功すると、ログに Hello, DataWorks from resource file! という出力が表示されます。
There is no file with id ... というエラーが発生した場合は、まず Python リソースを本番環境に公開してください。
詳細については、「MaxCompute リソースおよび関数」をご参照ください。
データセットの読み取りおよび書き込み
ノートブックタスクは、実行中に OSS または NAS に保存された大容量ファイルを読み取りまたは書き込みできます。
開発環境でのデバッグ
データセットのマウント:個人用開発環境の詳細ページで、[ストレージ構成] > [データセット] の下にデータセットを設定します。
コードでのアクセス:データセットは個人用開発環境内のパスにマウントされます。このパスを直接読み取りまたは書き込みします。
# データセットが /mnt/data/dataset にマウントされていると仮定します。 import pandas as pd file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます。 o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Data successfully written to MaxCompute table mc_test_table.")
本番環境へのデプロイ
データセットのマウント:ノートブックノード編集ページで、右側ナビゲーションウィンドウの [スケジューリング構成] > [スケジューリングポリシー] の下にデータセットを追加します。
コードでのアクセス:ノードをコミットおよび公開すると、データセットは本番環境内のパスにマウントされます。コード内で同じパスを使用します。
# データセットが /mnt/data/dataset にマウントされていると仮定します。 import pandas as pd file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます。 o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Data successfully written to MaxCompute table mc_test_table.")
詳細については、「個人用開発環境でのデータセットの使用」をご参照ください。
ワークスペースパラメーターの参照
この機能は DataWorks Professional Edition 以上でのみ利用可能です。
ワークスペースパラメーターを使用すると、複数のタスクおよびノード間でグローバル設定を再利用し、環境を分離できます。SQL セルまたは Python セルで、形式 ${workspace.param} を使用してワークスペースパラメーターを参照します。ここで、param はパラメーター作成時に割り当てた名前です。
ワークスペースパラメーターの作成:[オペレーションセンター] > [スケジューリング設定] > [ワークスペースパラメーター] に移動してパラメーターを作成します。
コードでのパラメーター参照:SQL セルの場合:
SELECT '${workspace.param}';Python セルの場合:
print('${workspace.param}')セル実行後、ワークスペースパラメーターの解決済み値が出力されます。
詳細については、「ワークスペースパラメーターの使用」をご参照ください。
マジックコマンドを使用したコンピュートエンジンとの連携
マジックコマンドは、% または %% をプレフィックスとする特殊コマンドで、Python セルと各種コンピュートリソースとの連携を簡素化します。
1 つのノートブックノードは、マジックコマンドを使用して 1 種類のコンピュートリソースにのみ接続できます。
MaxCompute への接続
MaxCompute コンピュートリソースのバインドを接続前に完了させてください。
%odps — PyODPS エントリーオブジェクトの取得
現在の MaxCompute プロジェクトにバインドされた認証済み PyODPS オブジェクトを返します。これにより、コード内に AccessKey をハードコードする必要がなくなります。
o = %odpsコマンド実行後、右下隅にコンピュートリソースセレクターが表示され、プロジェクトが自動的に選択されます。プロジェクト名をクリックして別のプロジェクトに切り替えることができます。
このオブジェクトを使用して PyODPS スクリプトを実行します。たとえば、現在のプロジェクト内のすべてのテーブルを一覧表示するには、次のとおりです。
with o.execute_sql('show tables').open_reader() as reader:
print(reader.raw)%maxframe — MaxFrame 接続の確立
MaxCompute データ上で分散型かつ pandas 風のデータ処理を行うための MaxFrame セッションを作成します。
# MaxFrame を使用して MaxCompute に接続します。
mf_session = %maxframe
df = mf_session.read_odps_table('your_mc_table')
print(df.head())
# デバッグ後に手動でセッションを破棄してリソースを解放します。
mf_session.destroy()Spark リソースへの接続
DataWorks ノートブックは複数の Spark エンジンへの接続をサポートしています。これらのエンジンは、接続方法、実行コンテキスト、リソース管理の点で異なります。
エンジン比較
| 機能 | MaxCompute Spark | EMR Serverless Spark | AnalyticDB for Spark |
|---|---|---|---|
| 接続コマンド | %maxcompute_spark | %emr_serverless_spark | %adb_spark add |
| 前提条件 | MaxCompute リソースをバインドする | EMR コンピュートリソースのバインドおよび Livy Gateway | ADB Spark 計算リソースのバインド |
| 開発環境 | Livy セッションを自動的に作成または再利用 | 既存の Livy Gateway に接続してセッションを作成 | Spark Connect Server を自動的に作成または再利用 |
| 本番環境 | Livy モード:Livy サービス経由で Spark ジョブを送信 | spark-submit バッチ処理モード:純粋なバッチ処理で、セッション状態は保持されません | Spark Connect Server モード:Spark 接続サービス経由で連携 |
| 本番環境でのリソース解放 | タスクインスタンス終了後にセッションが自動的に解放 | タスクインスタンス終了後にリソースが自動的にクリーンアップ | タスクインスタンス終了後にリソースが自動的に解放 |
| ユースケース | MaxCompute エコシステムと緊密に統合された汎用バッチ処理および ETL タスク | 柔軟な構成およびオープンソースエコシステム統合(Hudi や Iceberg など)を必要とする複雑な分析タスク | AnalyticDB for MySQL の [C-Store テーブル] に対する高性能インタラクティブクエリ |
Spark エンジンに接続後、ノートブックカーネル全体の実行コンテキストがリモート PySpark 環境に切り替わります。以降のセルでは直接 PySpark コードを記述してください。
MaxCompute Spark
MaxCompute コンピュートリソースのバインドを接続前に完了させてください。
MaxCompute プロジェクトに組み込まれた Spark エンジンに Livy 経由で接続します。
接続の確立:Python セルで次のコマンドを実行します。システムが自動的に Spark セッションを作成または再利用します。
# Spark セッションを作成し、Livy を開始します。 %maxcompute_sparkPySpark コードの実行:新しい Python セルで
%%sparkセルマジックを使用します。# MaxCompute Spark を使用する Python セルは %%spark で始める必要があります。 %%spark df = spark.sql("SELECT * FROM your_mc_table LIMIT 10") df.show()接続の解放:デバッグ後は、セッションを停止または削除します。本番環境では、タスクインスタンス終了時にシステムが Livy セッションを自動的に停止および削除します。
# Spark セッションおよび Livy を停止します。 %maxcompute_spark stop # Livy を停止して削除します。 %maxcompute_spark delete
EMR Serverless Spark
EMR Serverless Spark コンピュートリソースのバインドおよび Livy Gateway の作成を接続前に完了させてください。
事前に作成した Livy Gateway に接続して EMR Serverless Spark と連携します。
接続の確立:セルの右下隅で EMR コンピュートリソースおよび Livy gateway を選択し、次のいずれかのコマンドを実行します。
選択済み:グローバル構成がセッションのカスタムパラメーターをオーバーライドします。
未選択:セッションのカスタムパラメーターがグローバル構成をオーバーライドします。
# 基本的な接続 %emr_serverless_sparkカスタム Spark パラメーターを渡すには、
%%emr_serverless_spark(パーセント記号 2 つ)を使用します。%%emr_serverless_spark { "spark_conf": { "spark.emr.serverless.environmentId": "<EMR_Serverless_Spark_Runtime_Environment_ID>", "spark.emr.serverless.network.service.name": "<EMR_Serverless_Spark_Network_Connection_ID>", "spark.driver.cores": "1", "spark.driver.memory": "8g", "spark.executor.cores": "1", "spark.executor.memory": "2g", "spark.driver.maxResultSize": "32g" } }カスタムパラメーターは現在のセッションにのみ適用されます。省略した場合、システムは Admin Center で設定されたグローバルパラメーターを使用します。複数のタスクおよびユーザー間で構成を共有するには、[Admin Center] > [Serverless Spark] > [SPARK parameters] でグローバルに設定します。[Prioritize Global Configurations] オプションは、両方に同じパラメーターが存在する場合の優先度を制御します。
(任意)再接続:管理者が Livy gateway ページでトークンを削除した場合は、次のコマンドで再作成します。
# Livy トークンを再接続およびリフレッシュします。 %emr_serverless_spark refresh_tokenPySpark または SQL コードの実行:接続成功後、カーネルが切り替わります。Python セルで直接 PySpark コードを記述するか、EMR Spark SQL セルで SQL を記述します。
EMR Spark SQL セルを使用して EMR Serverless Spark に SQL を送信 — セルは
%emr_serverless_sparkから接続を再利用し、ジョブを自動的に送信します。セル内でコンピュートリソースを選択する必要はありません。
Python セルで PySpark コードを送信 —
%%sparkプレフィックスは不要です。
接続の解放:
重要複数のユーザーが Livy Gateway を共有している場合、
stopまたはdeleteはそのゲートウェイを使用中のすべてのユーザーに影響します。これらのコマンドは慎重に使用してください。# Spark セッションおよび Livy を停止します。 %emr_serverless_spark stop # Livy を停止して削除します。 %emr_serverless_spark delete
AnalyticDB for Spark
AnalyticDB for Spark コンピュートリソースのバインドを接続前に完了させてください。
Spark Connect Server を作成して AnalyticDB for Spark エンジンに接続します。
接続の確立:セルの右下隅で ADB Spark コンピュートリソース を選択します。ネットワーク接続を確保するために vSwitch ID および Security Group ID を設定し、次のコマンドを実行します。
vSwitch ID (`vswitchId`):Alibaba Cloud AnalyticDB MySQL コンソールで、インスタンス詳細ページの [ネットワーク情報] 内の [vSwitch ID] を確認します。
Security Group ID (`securityGroupId`):個人用開発環境の詳細ページの [ネットワーク設定] で、選択した [セキュリティグループ] の ID(
sg-で始まる)を確認します。
重要ネットワーク接続を確保するには、個人用開発環境を作成する際に AnalyticDB for Spark インスタンスと同じ Virtual Private Cloud (VPC) および vSwitch を選択してください。
# ネットワーク接続のための vSwitch ID および Security Group ID を設定します。 %adb_spark add --spark-conf spark.adb.version=3.5 --spark-conf spark.adb.eni.enabled=true --spark-conf spark.adb.eni.vswitchId=<vSwitch_ID_of_ADB> --spark-conf spark.adb.eni.securityGroupId=<Security_Group_ID_of_personal_development_environment>vSwitch および Security Group ID の確認方法:
PySpark コードの実行:接続確立後、新しい Python セルで PySpark を実行します。
AnalyticDB for Spark エンジンは、
'storagePolicy'='COLD'属性を持つ C-Store テーブルのみを処理できます。# AnalyticDB for Spark は C-Store テーブルのみを処理できます。 df = spark.sql("SELECT * FROM my_adb_cstore_table LIMIT 10") df.show()接続の解放:デバッグ後は、リソースを節約するために接続セッションをクリーンアップします。本番環境では、システムが自動的にクリーンアップします。
%adb_spark cleanup
Lindorm Ray への接続
Lindorm コンピュートエンジンの RAY リソースグループは、エンドツーエンドの AI ワークロードをサポートする分散コンピューティングサービスを提供します。ノートブックで Lindorm Ray リソースに接続してインタラクティブ開発を行い、その後ノートブックを本番スケジューリングタスクとして公開します。
接続の確立
Python セルで %lindorm_ray を実行します。右下隅にコンピュートリソースセレクターが表示されるので、Lindorm コンピュートリソースおよび作成済みの RAY リソースグループ を選択します。
# 指定された Lindorm Ray リソースグループに接続します。
%lindorm_rayLindorm Ray コンピュートリソースに接続後、同一ノートブック内の SQL セルは実行できなくなります。Lindorm Ray は Python および Ray コードの実行に特化しています。
同じコードセルを複数回実行すると、前の Ray ジョブが自動的に終了され、新しいジョブが開始されます。これにより、リソースの浪費やタスクの競合を防止します。
Ray コードの実行
接続成功後、新しい Python セルで Ray コードを記述および実行します。ログはリアルタイムでセルの出力エリアにストリームバックされます。
次の例では、@ray.remote デコレーターを使用してリモートタスクを定義します。このタスクは Ray クラスター上で実行され、そのログおよび結果が出力エリアに返されます。
import ray
import time
@ray.remote
def hello_world():
print("Hello from Lindorm Ray!")
time.sleep(5)
return "Task finished."
# リモートタスクを送信します。
result_ref = hello_world.remote()
print(ray.get(result_ref))カスタム起動パラメーターの指定(任意)
サードパーティ製 Python パッケージをインストールする場合やローカルコードファイルをアップロードする場合は、%%lindorm_ray を使用してカスタムランタイム環境構成とともに接続を確立します。
例 1:依存関係のインストール
pip パラメーターを使用して、Ray 環境内に jieba パッケージをインストールします。
%%lindorm_ray
{
"runtime_env": {
"pip": ["jieba"]
}
}環境準備後、後続の Ray タスクでパッケージをインポートおよび使用します。
import ray
@ray.remote
def do_work(x):
import jieba
return "/".join(jieba.cut(x))
print(ray.get(do_work.remote("Welcome to the DataWorks+LindormRay solution")))例 2:DataWorks リソースのアップロードおよび使用
working_dir パラメーターを使用して、DataWorks リソース管理から Ray クラスターにリソースをアップロードします。
working_dir経由でアップロードされるファイルは、開発環境から直接取得されます。ファイルサイズは 100 MB までです。より大きなファイル(100 MB 超)の場合は、OSS にアップロードしてコード内で OSS から読み取るか、カスタムイメージにパッケージ化してください。開発環境では、
##@resource_referenceを含むセルを実行した後、%%lindorm_rayセルを再度実行して、ダウンロードされたリソースをworking_dirに含めてください。本番環境ではこの手順は不要です。
# DataStudio リソース管理からリソースを参照し、そのパスを宣言します。
%%lindorm_ray
{
"runtime_env": {
"working_dir": "/mnt/workspace/_dataworks/resource_references"
}
}ray_resource.py が DataStudio リソース管理にアップロード済みであると仮定します。
ray_resource.py:
def fun():
print("This is a test function in ray_resource.py")これを Ray タスクで参照および使用します。
import ray
##@resource_reference{"ray_resource.py"}
@ray.remote
def do_work(x):
print('Ray says:', x)
from ray_resource import fun
fun()
return x
worker = do_work.remote("Welcome to the DataWorks+LindormRay solution")
print(ray.get(worker))本番スケジューリングおよび運用管理
開発およびデバッグ後、ノートブックノードをコミットおよび公開します。これにより、ノードは DAG 内で定期スケジュールに基づき Lindorm Ray ノードとして実行されます。
パラメーター化:標準の DataWorks スケジューリングパラメーター(例:
${bizdate})を使用します。ログの確認:過剰なログがパフォーマンスに影響を与えないようにするため、システムはデフォルトで最初の 1 MB のログのみを読み込みます。ログが切り捨てられた場合、完全なタスブログへのリンクが Lindorm コンソールに表示されます。
リソース解放:スケジュールされた本番タスク終了後、Lindorm Ray タスクは望ましい状態に入り、リソースを解放します。インタラクティブ開発中は、カーネルを再起動するかノートブックを閉じてリソースを解放します。
マジックコマンド早見表
| マジックコマンド | 例 | 説明 | エンジン |
|---|---|---|---|
%odps | o = %odps | PyODPS エントリーオブジェクトを取得します。 | MaxCompute |
%maxframe | mf_session = %maxframe | MaxFrame 接続を確立します。 | MaxCompute |
%maxcompute_spark | %maxcompute_spark | Spark セッションを作成します。 | MaxCompute Spark |
%maxcompute_spark stop | %maxcompute_spark stop | Spark セッションをクリーンアップし、Livy を停止します。 | MaxCompute Spark |
%maxcompute_spark delete | %maxcompute_spark delete | Spark セッションをクリーンアップした後、Livy を停止および削除します。 | MaxCompute Spark |
%%spark | %%spark | Python セル内で、すでに作成済みの Spark コンピュートリソースに接続します。 | MaxCompute Spark |
%emr_serverless_spark | %emr_serverless_spark | Spark セッションを作成します。 | EMR Serverless Spark |
%emr_serverless_spark info | %emr_serverless_spark info | Livy Gateway の詳細情報を表示します。 | EMR Serverless Spark |
%emr_serverless_spark stop | %emr_serverless_spark stop | Spark セッションをクリーンアップし、Livy を停止します。 | EMR Serverless Spark |
%emr_serverless_spark delete | %emr_serverless_spark delete | Spark セッションをクリーンアップした後、Livy を停止および削除します。 | EMR Serverless Spark |
%emr_serverless_spark refresh_token | %emr_serverless_spark refresh_token | 個人用開発環境の Livy トークンをリフレッシュします。 | EMR Serverless Spark |
%adb_spark add | %adb_spark add --spark-conf ... | 再利用可能な ADB Spark セッションを作成および接続します。 | AnalyticDB for Spark |
%adb_spark info | %adb_spark info | Spark セッション情報を表示します。 | AnalyticDB for Spark |
%adb_spark cleanup | %adb_spark cleanup | 現在の Spark 接続セッションを停止およびクリーンアップします。 | AnalyticDB for Spark |
%lindorm_ray | %lindorm_ray | Lindorm Ray 接続を確立します。 | Lindorm Ray |
%%lindorm_ray | %%lindorm_ray with JSON config | Lindorm Ray 接続を確立し、カスタムランタイム環境を構成します。 | Lindorm Ray |
よくある質問
ワークスペースリソースを参照する際に `ModuleNotFoundError` または "There is no file with id ..." エラーが発生するのはなぜですか?
[データ開発] > [Resource Management] に移動し、MaxCompute Python リソースが保存済みであることを確認します。
本番環境では、リソースが本番環境に公開済みであることを確認します。
ノートブックエディターツールバーの [再起動] をクリックしてリソースを再読み込みします。
ワークスペースリソースを更新した後も古いバージョンが使用されるのはなぜですか?
DataStudio 設定項目でリソース競合処理ポリシー Dataworks › Notebook › Resource Reference: Download Strategy を autoOverwrite に設定し、ノートブックツールバーの [カーネルの再起動] をクリックします。
開発環境でデータセットを参照する際に `FileNotFoundError` が発生するのはなぜですか?
[ストレージ構成] > [データセット] の下で、現在選択中の個人用開発環境にデータセットがマウントされていることを確認してください。
開発環境ではデータセットアクセスが成功するのに、本番環境で `Execute mount dataset exception! Please check your dataset config` というエラーが発生するのはなぜですか?
ノートブックノードの [スケジューリング構成] にデータセットがマウントされていることを確認します。
OSS データセットに必要な権限が付与されていることを確認します。

個人用開発環境のバージョンを確認する方法を教えてください。
Cmd+Shift+P を押して ABOUT と入力すると、バージョンを確認できます。バージョン 0.5.69 以降が必要な場合は、アップグレードプロンプトの [ワンクリックアップグレード] オプションを使用してください。
Spark エンジンへの接続が失敗するのはなぜですか?
一般的なチェックから始めます。ワークスペース詳細ページで、コンピュートリソース(MaxCompute、EMR、または ADB)が正しくバインドされており、アカウントに必要な権限があることを確認します。
EMR Serverless Spark:Livy Gateway が作成および実行中であることを確認します。
AnalyticDB for Spark:
vswitchIdおよびsecurityGroupIdが正しいこと、個人用開発環境と ADB Spark インスタンス間のネットワークが接続されていること、セキュリティグループルールが必要なポートでの通信を許可していることを確認します。