この記事では、DataWorks Notebooks を使用する高度なテクニックについて説明します。インタラクティブな開発とスケジュールされた本番ジョブの間にあるギャップを埋めます。コード再利用、データマウント、パラメーター管理などのエンジニアリングプラクティスを通じて、開発効率を向上させる方法を学びます。また、MaxCompute Spark、EMR Serverless Spark、AnalyticDB for Spark など、さまざまなコンピュートエンジンに接続するための実践的なテクニックやデバッグ方法についても学びます。
まず、「Notebook 開発」をお読みになることを推奨します。
開発環境と本番環境
DataWorks Notebooks はスケジュール実行をサポートする開発・分析ツールです。これは、以下の 2 つの異なるランタイム環境で動作することを意味します。
開発環境:DataStudio のノートブックエディターでセルを直接実行すると、コードはご利用の個人開発環境インスタンスで実行されます。この環境は、コードの迅速な検証およびデバッグを目的として設計されています。
本番環境:ノートブックがコミットされ公開された後、定期タスクまたはバックフィルプロセスによって実行されます。コードは分離された一時的なタスクインスタンスで実行されます。この環境は、本番タスクを安定かつ確実に実行することを目的として設計されています。
これらの 2 つの環境には大きな違いがあります。これらを理解することが、効率的な開発の鍵となります。
機能比較:開発環境と本番環境
特徴 | 開発環境 | 本番環境 |
プロジェクトリソース( |
| 自動的に有効になります。 |
データセット(OSS/NAS)の読み取りおよび書き込み | 個人開発環境でのマウントが必要です。 | [スケジューリング構成] でのマウントが必要です。 |
ワークスペースパラメーター( | サポートされています。コード実行前にテキストが自動的に置き換えられます。 | サポートされています。タスク実行前にテキストが自動的に置き換えられます。 |
Spark セッション管理 | Spark セッションのデフォルトのアイドルタイムアウトは 2 時間です。この期間中に新しいコードが実行されない場合、セッションは自動的に解放されます。 | 短命なタスクレベルのセッションです。タスクインスタンスとともに自動的に作成および破棄されます。 |
本番環境でのコードおよびデータの再利用
プロジェクトリソース (.py ファイル) の参照
共通の関数やクラスを個別の .py ファイルにカプセル化し、##@resource_reference{"custom_name.py"} を使用して MaxCompute リソースとして参照します。このアプローチにより、コードがモジュール化され、再利用性および保守性が向上します。
Python リソースの作成と公開
DataWorks DataStudio の左側ナビゲーションウィンドウで、
をクリックして [リソース管理] に移動します。リソース管理ツリーで、対象ディレクトリを右クリックするか、右上隅の + アイコンをクリックし、[リソースの作成] > [MaxCompute Python] を選択します。リソース名を
my_utils.pyとします。[ファイルコンテンツ] セクションで [オンライン編集] をクリックし、テスト済みのユーティリティコードをエディターに貼り付け、[保存] をクリックします。
# my_utils.py def greet(name): return f"Hello, {name} from resource file!"ツールバーで、[保存] をクリックした後、[公開] をクリックしてリソースを公開し、開発および本番タスクの両方で利用可能にします。
Notebook でのリソースの参照
ノートブックの Python セル で、公開済みリソースを参照するために、最初の行 に
##@resource_reference構文を使用します。##@resource_reference{"my_utils.py"} # リソースが my_folder/my_utils.py のようなディレクトリ内にある場合でも、ディレクトリ名を含めずに ##@resource_reference{"my_utils.py"} として参照します。 from my_utils import greet message = greet('DataWorks') print(message)開発環境での実行およびデバッグ
Python セルを実行します。次の出力が表示されます。
Hello, DataWorks from resource file!重要開発環境でデバッグする際、システムが
##@resource_referenceを検出すると、対象ファイルをリソース管理からご利用の個人ディレクトリ内のworkspace/_dataworks/resource_referencesパスに自動的にダウンロードし、参照を有効にします。ModuleNotFoundErrorが発生した場合は、ツールバーの [再起動] ボタンをクリックしてリソースを再読み込みしてください。本番環境への公開と検証
ノートブックを [保存] および [公開] します。次に、[オペレーションセンター] > [周期タスク] に移動し、[テスト] をクリックして実行します。タスクが成功すると、ログに
Hello, DataWorks from resource file!が出力されます。重要There is no file with id ...というエラーが発生した場合は、Python リソースが事前に本番環境に公開されていることを確認してください。
詳細については、「MaxCompute のリソースと関数」をご参照ください。
データセットの読み取りおよび書き込み
ノートブックタスク実行中に、Object Storage Service (OSS) または Network Attached Storage (NAS) に保存された大規模なファイルデータを簡単に読み書きできます。
開発環境でのデバッグ
データセットのマウント:ご利用の個人開発環境の詳細ページに移動し、[ストレージ構成] > [データセット] でデータセットを構成します。
コードでのデータアクセス:データセットは個人開発環境内のパスにマウントされます。コード内でこのパスを直接読み書きできます。
# 個人開発環境でマウントパス /mnt/data/dataset でデータセットが構成されていると仮定します。 import pandas as pd # マウントパスを直接使用します。 file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます。 o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Successfully wrote data to the MaxCompute table mc_test_table.")
本番デプロイメント
データセットのマウント:ノートブックエディターの右側ナビゲーションウィンドウで、[スケジューリング構成] > [スケジューリングポリシー] に移動し、データセットを追加します。
コードでのデータアクセス:ノートブックをコミットおよび公開すると、データセットは本番環境内のパスにマウントされます。コード内でこのパスを直接読み書きできます。
# マウントパス /mnt/data/dataset でデータセットが構成されていると仮定します。 import pandas as pd # マウントパスを直接使用します。 file_path = '/mnt/data/dataset/testfile.csv' df = pd.read_csv(file_path) # PyODPS を使用して MaxCompute にデータを書き込みます。 o = %odps o.write_table('mc_test_table', df, overwrite=True) print(f"Successfully wrote data to the MaxCompute table mc_test_table.")
詳細については、「個人開発環境でのデータセットの使用」をご参照ください。
ワークスペースパラメーターの参照
この機能は、DataWorks Professional Edition 以上でのみ利用可能です。
既存のスケジューリングパラメーターに加えて、DataWorks ではクロスタスク、クロスノードのグローバル設定の再利用および環境隔離のためのワークスペースパラメーターが導入されています。${workspace.param} の形式を使用して、SQL セルおよび Python セルでワークスペースパラメーターを参照できます。ここで、param はワークスペースパラメーターの名前です。
1. ワークスペースパラメーターの作成:DataWorks の [オペレーションセンター] > [スケジューリング設定] > [ワークスペースパラメーター] ページに移動し、必要なパラメーターを作成します。
2. ワークスペースパラメーターの参照:
SQL セルでワークスペースパラメーターを参照します。
SELECT '${workspace.param}';このクエリは、ワークスペースパラメーターの解決済み値を返します。
Python セルでワークスペースパラメーターを参照します。
print('${workspace.param}')これにより、ワークスペースパラメーターの解決済み値が出力されます。
詳細については、「ワークスペースパラメーターの使用」をご参照ください。
Magic コマンドおよびコンピュートエンジン
Magic コマンドは、% または %% で始まる特殊なコマンドです。これにより、Python セル とさまざまな計算リソースとのやり取りが簡素化されます。
MaxCompute への接続
MaxCompute 計算リソースに接続する前に、MaxCompute 計算リソースの関連付けを行っていることを確認してください。
%odps:PyODPS エントリポイントオブジェクトの取得このコマンドは、現在の MaxCompute プロジェクトにバインドされた認証済みの PyODPS オブジェクトを返します。コード内に AccessKey をハードコードすることを回避できるため、MaxCompute とやり取りする際の推奨方法です。
Magic コマンドを使用して MaxCompute 接続を作成します。
%odpsを入力します。右下隅に MaxCompute の計算リソースセレクターが表示され、計算リソースが自動的に選択されます。MaxCompute プロジェクト名をクリックして、別のプロジェクトに切り替えることができます。o=%odps返された MaxCompute 計算リソースを使用して、PyODPS スクリプトを実行します。
たとえば、現在のプロジェクト内のすべてのテーブルを取得するには、次のとおりです。
with o.execute_sql('show tables').open_reader() as reader: print(reader.raw)
%maxframe:MaxFrame 接続の確立このコマンドは MaxFrame セッションを作成し、MaxCompute に Pandas ライクな分散データ処理機能を提供します。
# MaxCompute MaxFrame セッションに接続してアクセスします。 mf_session = %maxframe df = mf_session.read_odps_table('your_mc_table') print(df.head()) # 開発およびデバッグ後は、リソースを解放するために手動でセッションを破棄する必要があります。 mf_session.destroy()
Spark コンピュートエンジンへの接続
DataWorks Notebooks は、複数の Spark エンジンへの接続をサポートしています。これらのエンジンは、接続方法、実行コンテキスト、リソース管理において異なります。
1 つのノートブックでは、Magic コマンドを使用して 1 種類の計算リソースにのみ接続できます。
エンジンの比較
特徴 | MaxCompute Spark | EMR Serverless Spark | AnalyticDB for Spark |
接続コマンド |
|
|
|
説明 実行後、ノートブックカーネルはリモートの PySpark 環境に切り替わります。その後、後続のセルに PySpark コードを記述できます。 | |||
前提条件 | MaxCompute リソースをバインドしていること。 | EMR リソースをバインドし、Livy Gateway を作成していること。 | ADB Spark リソースをバインドしていること。 |
開発環境モード | Livy セッションを自動作成または再利用します。 | 既存の Livy ゲートウェイに接続してセッションを作成します。 | Spark Connect Server を自動作成または再利用します。 |
本番環境モード | Livy モード:Livy サービス経由で Spark ジョブを送信します。 | spark-submit バッチモード:ステートレスなバッチ処理モードです。 | Spark Connect Server モード:Spark 接続サービス経由でやり取りします。 |
本番リソース解放 | タスクインスタンス終了後にセッションが自動的に解放されます。 | タスクインスタンス終了後にリソースが自動的にクリーンアップされます。 | タスクインスタンス終了後にリソースが自動的に解放されます。 |
ユースケース | MaxCompute エコシステムと密接に統合された一般的なバッチ処理および ETL タスク。 | Hudi や Iceberg などのオープンソースエコシステムとの連携や柔軟な設定を必要とする複雑な分析タスク。 | ADB for MySQL の C-Store テーブル に対する高性能なインタラクティブクエリおよび分析。 |
MaxCompute Spark
MaxCompute 計算リソースに接続する前に、MaxCompute 計算リソースの関連付けを行っていることを確認してください。
Livy を介して、MaxCompute プロジェクトに組み込まれた Spark エンジンに接続します。
接続の確立:Python セルで次のコマンドを実行します。システムは自動的に Spark セッションを作成または再利用します。
# Spark セッションを作成します。 %maxcompute_sparkPySpark コードの実行:接続が成功した後、新しい Python セル で
%%sparkセル Magic を使用して PySpark コードを実行します。# MaxCompute Spark を使用する場合、Python セルは %%spark で始める必要があります。 %%spark df = spark.sql("SELECT * FROM your_mc_table LIMIT 10") df.show()接続の手動解放:開発およびデバッグ後、セッションを手動で停止または削除できます。本番環境では、システムが現在のタスクインスタンスの Livy セッションを自動的に停止および削除します。
# Spark セッションをクリーンアップし、Livy を停止します。 %maxcompute_spark stop # Spark セッションをクリーンアップし、Livy を停止してから削除します。 %maxcompute_spark delete
EMR Serverless Spark
計算リソースに接続する前に、ワークスペースでEMR Serverless Spark 計算リソースの関連付けを行い、Livy Gateway の作成を行ってください。
Livy Gateway に接続して、EMR Serverless Spark とやり取りします。
接続の確立:コマンドを実行する前に、セルの右下隅で EMR 計算リソース および Livy Gateway を選択します。
# 基本的な接続 %emr_serverless_spark # または、接続時にカスタム Spark パラメーターを渡すこともできます。カスタム Spark パラメーターを指定する場合は、2 つのパーセント記号(%%)を使用する必要があることに注意してください。 %%emr_serverless_spark { "spark_conf": { "spark.emr.serverless.environmentId": "<EMR_Serverless_Spark_runtime_environment_ID>", "spark.emr.serverless.network.service.name": "<EMR_Serverless_Spark_network_connection_ID>", "spark.driver.cores": "1", "spark.driver.memory": "8g", "spark.executor.cores": "1", "spark.executor.memory": "2g", "spark.driver.maxResultSize": "32g" } }説明カスタムパラメーターとグローバル設定の関係
デフォルトの動作:ここで定義されたカスタムパラメーターは、現在の接続(セッション)にのみ適用され、1 回限りで使用されます。カスタムパラメーターを指定しない場合、システムは自動的に [管理センター] で構成されたグローバルパラメーターを使用します。
推奨される使用方法:複数のタスクまたは複数のユーザー間で再利用が必要な構成については、[管理センター] > [Serverless Spark] > [SPARK パラメーター] でグローバルパラメーターとして設定することを推奨します。これにより、一貫性が確保され、管理が簡素化されます。
優先順位ルール:カスタムパラメーターとグローバル構成の両方に同じパラメーターが設定されている場合、[管理センター] の [グローバル構成を優先] オプションが、どちらのパラメーターが使用されるかを決定します。
選択済み:グローバル構成がこのセッションのカスタムパラメーターをオーバーライドします。
未選択:このセッションのカスタムパラメーターがグローバル構成をオーバーライドします。
(オプション)再接続:管理者が誤って Livy Gateway ページのトークンを削除した場合、このコマンドを使用して再作成できます。
# 現在の個人開発環境の Livy トークンを再接続およびリフレッシュします。 %emr_serverless_spark refresh_tokenPySpark または SQL コードの実行:接続が成功すると、カーネルが切り替わります。Python セル に直接 PySpark コードを記述するか、EMR Spark SQL セル に SQL を記述できます。
EMR Spark SQL セルによる SQL コードの送信および実行
%emr_serverless_sparkを使用して接続を確立した後、EMR Spark SQL セルに SQL 文を直接記述できます。セル内で計算リソースを選択する必要はありません。EMR Spark SQL セルは、
%emr_serverless_sparkからの接続を再利用し、ジョブを対象の計算リソースに送信します。
Python セルによる PySpark コードの送信および実行
%emr_serverless_sparkを使用して接続を確立した後、新しい Python セルに PySpark コードを送信および実行できます。セルに%%sparkプレフィックスを追加する必要はありません。
接続の手動解放
重要複数のユーザーが単一の Livy Gateway を共有している場合、stop または delete コマンドは、そのゲートウェイを使用中のすべてのユーザーに影響します。これらのコマンドは慎重に使用してください。
# Spark セッションをクリーンアップし、Livy を停止します。 %emr_serverless_spark stop # Spark セッションをクリーンアップし、Livy を停止してから削除します。 %emr_serverless_spark delete
AnalyticDB for Spark
計算リソースに接続する前に、ワークスペースでAnalyticDB for Spark 計算リソースの関連付けを行ってください。
Spark Connect Server を作成して、AnalyticDB for Spark エンジンに接続します。
接続の確立:ネットワーク接続を確保するため、接続パラメーターで vSwitch ID およびセキュリティグループ ID を構成する必要があります。コマンドを実行する前に、セルの右下隅で ADB Spark 計算リソース を選択します。
# ネットワーク接続を確立するには、vSwitch ID およびセキュリティグループ ID を構成する必要があります。 %adb_spark add \ --spark-conf spark.adb.version=3.5 \ --spark-conf spark.adb.eni.enabled=true \ --spark-conf spark.adb.eni.vswitchId=<vSwitch_ID_of_ADB> \ --spark-conf spark.adb.eni.securityGroupId=<Security_Group_ID_of_personal_development_environment>PySpark コードの実行:接続が成功した後、新しい Python セル で PySpark コードを実行します。
# C-Store テーブルでのみ操作を実行できます。 df = spark.sql("SELECT * FROM my_adb_cstore_table LIMIT 10") df.show()注記:AnalyticDB for Spark エンジンは、
'storagePolicy'='COLD'属性を持つ C-Store テーブルのみを処理できます。接続の手動解放:開発環境でのデバッグ後、リソースを節約するために接続セッションを手動でクリーンアップします。本番環境では、システムがリソースを自動的にクリーンアップします。
%adb_spark cleanup
付録:Magic コマンドリファレンス
マジックコマンド | 説明 | 適用可能なコンピュートエンジン |
| PyODPS エントリポイントオブジェクトを取得します。 | MaxCompute |
| MaxFrame 接続を確立します。 | |
| Spark セッションを作成します。 | MaxCompute Spark |
| Spark セッションをクリーンアップし、Livy を停止します。 | |
| Spark セッションをクリーンアップし、Livy を停止および削除します。 | |
| Python セルで、既存の Spark リソースに接続します。 | |
| Spark セッションを作成します。 | EMR Serverless Spark |
| Livy Gateway の詳細を表示します。 | |
| Spark セッションをクリーンアップし、Livy を停止します。 | |
| Spark セッションをクリーンアップし、Livy を停止および削除します。 | |
| 個人開発環境の Livy トークンをリフレッシュします。 | |
| 再利用可能な ADB Spark セッションを作成して接続します。 | AnalyticDB for Spark |
| Spark セッション情報を表示します。 | |
| 現在の Spark 接続セッションを停止してクリーンアップします。 |
よくある質問
Q:ワークスペースリソースを参照する際に、
ModuleNotFoundErrorまたはThere is no file with id ...エラーが発生するのはなぜですか?A:以下を確認してください。
[データ開発] > [リソース管理] に移動し、MaxCompute Python リソースが保存されていることを確認します。本番環境でこのエラーが発生する場合は、リソースが公開されていることを確認してください。
ノートブックツールバーの [再起動] ボタンをクリックして、リソースを再読み込みします。
Q:ワークスペースリソースを更新した後も、ノートブックが古いバージョンを参照するのはなぜですか?
A:リソースを変更および再公開した後、DataStudio 設定でリソース競合ポリシー
Dataworks › Notebook › Resource Reference: Download Strategyを autoOverwrite に設定します。その後、ノートブックツールバーの [カーネルの再起動] をクリックします。Q:データセットを参照する際に、開発環境で
FileNotFoundErrorが発生するのはなぜですか?A:データセットが現在選択されている個人開発環境にマウントされていることを確認してください。
Q:データセットの参照は開発環境では正常に動作しますが、本番環境で
Execute mount dataset exception! Please check your dataset configエラーが発生するのはなぜですか?A:データセットがノートブックの [スケジューリング構成] にマウントされており、OSS データセットに対して必要な権限が付与されていることを確認してください。

Q:個人開発環境のバージョンを確認する方法を教えてください。
A:個人開発環境に入室後、`Cmd+Shift+P` を押して `ABOUT` と入力すると、現在のバージョンを確認できます。プロダクトのアップデートにより、個人開発環境インスタンスのバージョンが 0.5.69 以降を必要とする場合、表示されるアップグレードプロンプトから [ワンクリックアップグレード] を実行してください。
Q:Spark エンジンへの接続が失敗するのはなぜですか?
A:以下を確認してください。
一般的な確認事項:ワークスペースの計算リソース一覧に移動し、対応する計算リソース(MaxCompute、EMR、または ADB)が現在のワークスペースに正しくバインドされており、ご利用のアカウントに必要な権限が付与されていることを確認します。
EMR Serverless Spark:Livy ゲートウェイが作成され、正常な状態であることを確認します。
AnalyticDB for Spark:ネットワークの問題に焦点を当ててトラブルシューティングを行います。
vswitchIdおよびsecurityGroupIdが正しく構成されており、個人開発環境が ADB Spark インスタンスに接続できることを確認します。セキュリティグループルールを確認し、必要なポート通信が許可されていることを確認します。