ご利用のプロジェクトのノードとワークフローは、多くの場合、定期的なスケジュールで実行する必要があります。スケジュールに基づいて実行するには、各ノードまたはワークフローのスケジューリング設定パネルで、スケジュール期間、スケジュール依存関係、スケジュールパラメーターなどのスケジューリングプロパティを設定する必要があります。このトピックでは、これらのスケジュールプロパティの設定方法について説明します。
前提条件
ノードが作成されていること。DataWorks では、ノードを作成してタスクを定義します。異なるエンジンタスクは、異なるノードタイプで表されます。ビジネスニーズに応じて適切なノードタイプを選択できます。詳細については、「ノードの開発」をご参照ください。
ワークスペースのスケジューリングスイッチが有効になっていること。DataWorks ワークスペース内のタスクは、ワークスペースの [定期スケジューリングを有効にする] スイッチをオンにした後にのみ、設定に基づいて自動的に実行できます。これを行うには、ワークスペースの [スケジューリング設定] ページに移動します。詳細については、「システム設定」をご参照ください。
重要
これらの設定は、タスクが本番環境に公開された後にのみ有効になります。
スケジュール時刻は、タスクの期待される実行時刻のみを定義します。実際の実行時刻は、その先祖ノードのステータスにも依存します。タスクの実行条件の詳細については、「タスク実行の診断」をご参照ください。
DataWorks は、さまざまなタイプのタスク間の依存関係をサポートしています。依存関係を設定する前に、「複雑な依存関係シナリオにおけるスケジューリング設定の原則と例」を読んで、複雑なシナリオにおける DataWorks のデフォルトの依存関係の動作を理解することを推奨します。
DataWorks では、定期タスクは、そのスケジュールタイプと期間に基づいて、対応する定期インスタンスを生成します。たとえば、時間単位のタスクは、毎日特定の数の時間単位のインスタンスを生成します。これらのインスタンスは、タスクを自動的に実行します。
スケジュールパラメーターを使用する場合、各実行のスケジュール時刻とパラメーター式によって、コードに渡されるパラメーター値が決まります。スケジュールパラメーターの設定と置換方法の詳細については、「スケジュールパラメーターのソースと式」をご参照ください。
ワークフローには、ワークフローノード自体とその内部ノードが含まれ、複雑な依存関係が作成されます。このトピックでは、個々のノードのスケジューリングと依存関係の設定のみを説明します。ワークフローのスケジュール依存関係の詳細については、「定期的なワークフローのオーケストレーション」をご参照ください。
スケジューリング設定ページへの移動
DataWorks コンソールの [ワークスペース] ページに移動します。トップナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[操作] 列で を選択します。
スケジューリング設定ページに移動します。
DataStudio インターフェイスで、ターゲットノードを見つけてそのエディターページを開きます。
ノードエディターページの右側のナビゲーションウィンドウで [スケジューリング設定] をクリックします。
ノードスケジューリングプロパティの設定
ノードのスケジューリング設定ページで、ノードの [スケジュールパラメーター]、[スケジュールポリシー]、[スケジュール時間]、[スケジュール依存関係]、および [ノード出力パラメーター] を設定する必要があります。
スケジュールパラメーター (オプション)
ノードのコードで変数を定義している場合は、ここでそれらに値を割り当てる必要があります。
スケジュールパラメーターは、定期タスクのビジネス日付とパラメーター式のフォーマットに基づいて、特定の価に自動的に置き換えられます。これにより、実行時に動的なパラメーター置換が可能になります。
スケジュールパラメーターの設定
スケジュールパラメーターは、次の 2 つの方法で定義できます。
メソッド | 説明 | 例 |
パラメーターの追加 | 1 つのタスクに複数のスケジュールパラメーターを設定できます。パラメーターを追加するには、[パラメーターの追加] をクリックします。
|
|
コードからパラメーターを読み込む | この機能は、タスクのコードで定義された変数名を自動的に識別し、定期実行で使用するためのスケジュールパラメーターとして追加します。 説明 通常、変数はコード内で PyODPS ノードと一般的な Shell ノードの変数定義形式は、他のノードタイプとは異なります。さまざまなノードタイプのスケジュールパラメーター形式の詳細については、「さまざまなノードタイプのスケジュールパラメーター設定の例」をご参照ください。 |
|
サポートされているスケジュールパラメーターのフォーマット
詳細については、「スケジュールパラメーターのソースと式」をご参照ください。
本番環境でのスケジュールパラメーターの検証
不正なスケジュールパラメーターによるタスクの失敗を防ぐため、タスクを公開した後、オペレーションセンターの [自動トリガータスク] ページに移動して、本番環境でのタスクのスケジュールパラメーター設定を確認することを推奨します。自動トリガータスクの表示方法の詳細については、「自動トリガータスクの管理」をご参照ください。
スケジュールポリシー
スケジュールポリシーは、自動トリガータスクのインスタンス生成モード、スケジュールタイプ、コンピューティングリソース、およびリソースグループを定義します。
パラメーター | 説明 |
インスタンス生成モード | ノードが本番スケジューリングシステムに公開されると、プラットフォームは設定された [インスタンス生成モード] に基づいて、自動化された [定期インスタンス] を生成します。
|
スケジュールタイプ |
|
タイムアウト期間 | タイムアウト期間を設定すると、実行時間がこの期間を超えた場合にタスクは自動的に終了します。次のルールが適用されます。
|
再実行プロパティ | ノードを再実行できるかどうか、またいつ再実行できるかを指定します。 再実行プロパティを指定する必要があります。サポートされているタイプとそのユースケースは次のとおりです。
|
失敗時の自動再実行 | 有効にすると、タスクが失敗した場合 (手動終了を除く)、スケジューリングシステムは再試行回数と再試行間隔に基づいて自動的に再実行をトリガーします。
説明
|
コンピューティングリソース | タスクの実行に必要なエンジンリソースを設定します。新しいリソースを作成するには、コンピューティングリソースの管理に移動できます。 |
コンピューティングクォータ | MaxCompute SQL ノードおよびMaxCompute Script ノードでタスクの実行に必要なコンピューティングクォータを設定できます。クォータは、コンピューティングジョブにコンピューティングリソース (CPU とメモリ) を提供します。 |
スケジュールリソースグループ | タスクの実行に使用するスケジュールリソースグループを設定します。必要に応じてリソースグループを選択します。
|
最大並列インスタンス数 | 単一タスクの最大並列インスタンス数を制限して、同時実行制御とリソース保護を提供します。デフォルトでは、並列インスタンスの数は制限されていません。この制限を有効にすると、並列インスタンスの数を設定できます。デフォルト値は
|
データセット |
|
スケジュール時間
スケジュール時間を使用して、定期タスクの自動実行の期間、時刻、およびその他の情報を設定します。
ワークフロー内のノードの場合、[スケジュール時間] と関連パラメーターは、ワークフローページの [スケジューリング設定] で設定されます。ワークフロー内にないノードの場合、[スケジュール時間] は各個々のノードの [スケジューリング設定] で設定されます。
重要
タスクのスケジューリング頻度は先祖タスクの期間とは無関係です
タスクがスケジュールされる頻度は、自身の定義されたスケジュール期間に依存し、先祖タスクの期間には依存しません。
DataWorks は異なるスケジュール期間を持つタスク間の依存関係をサポートします
DataWorks では、定期タスクは、そのスケジュールタイプと期間に基づいて対応する定期インスタンスを生成し (たとえば、時間単位のタスクは毎日多数の時間単位のインスタンスを生成します)、これらのインスタンスを通じて実行されます。定期タスク間に設定された依存関係は、本質的にそれらが生成するインスタンス間の依存関係です。定期インスタンスの数とその依存関係は、異なるスケジュールタイプを持つ先祖タスクと子孫タスクで異なります。異なるスケジュール期間を持つタスク間の依存関係の詳細については、「スケジューリング依存関係メソッドの選択 (クロスサイクル依存)」をご参照ください。
タスクはスケジュールされた時間外にドライランを実行します
日次以外のタスク (週次や月次など) は、スケジュールされていない日にドライランを実行し、成功ステータスを返します。これにより、日次子孫タスクは自身のスケジュールで正常に実行できます。
タスク実行時間
このセクションでは、タスクの期待されるスケジュール時間のみを設定します。実際の実行時間は、先祖タスクの完了時間、リソースの可用性、タスクの実際の実行条件など、複数の要因に依存します。詳細については、「タスクの実行条件」をご参照ください。
スケジュール時間の設定
パラメーター | 説明 |
スケジュール期間 | スケジュール期間は、タスクが自動的に実行される頻度を定義します。これにより、本番環境でノード内のコードロジックが実行される頻度が決まります。定期タスクは、そのスケジュールタイプと期間に基づいて対応する定期インスタンスを生成し (たとえば、時間単位のタスクは毎日多数の時間単位のインスタンスを生成します)、これらの定期インスタンスを通じて自動的に実行されます。
重要 週次、月次、および年次のタスクは、スケジュールされた実行時間外でも毎日インスタンスを生成します。これらのインスタンスは成功ステータスを示しますが、実際にはドライランを実行し、タスクを実行しません。 |
有効日 | 定期ノードは、有効日範囲内で有効であり、自動的に実行されます。有効日を過ぎたタスクは、もはや自動的にスケジュールされません。これらは期限切れのタスクと見なされます。O&M ダッシュボードで期限切れのタスクの数を確認し、廃止などのアクションを実行できます。 |
cron 式 | この式は、時間プロパティの設定に基づいて自動的に生成され、設定する必要はありません。 |
スケジュール依存関係
DataWorks では、スケジュール依存関係はノード間の先祖-子孫関係を定義します。子孫ノードは、すべての先祖ノードが正常に実行された後にのみ実行されます。この構造により、子孫ノードが先祖ノードがデータの生成を完了する前にデータにアクセスするのを防ぎ、データ整合性の問題を回避します。
重要
ノードの依存関係が設定された後、デフォルトでは、子孫ノードが実行される条件の 1 つは、すべての先祖ノードが正常に実行されたことです。そうでない場合、現在のタスクがデータを取得する際にデータ品質の問題が発生する可能性があります。
タスクの実際の実行時間は、自身のスケジュールされた時間 (スケジューリングシナリオでの期待される実行時間) だけでなく、先祖タスクの完了時間にも依存します。先祖タスクが実行を完了していない場合、子孫タスクのスケジュールされた時間が先祖タスクよりも早くても、子孫タスクは実行されません。タスクの実行条件の詳細については、「タスク実行の診断」をご参照ください。
スケジュール依存関係の設定
DataWorks のタスク依存関係は、最終的に子孫ノードがデータを正しく取得することを保証するように設計されています。これは、実際には先祖テーブルと子孫テーブルの間のデータリネージに依存することを意味します。ビジネスニーズに応じて、テーブルリネージに基づいてスケジュール依存関係を設定するかどうかを選択できます。ノードのスケジュール依存関係を設定するプロセスは次のとおりです。
依存関係は、強いデータリネージ関係を意味します。つまり、子孫ノードの出力は先祖ノードの出力に依存します。依存関係を設定する前に、この関係が必要であることを確認してください。「先祖のデータが準備できていない場合、タスクは失敗するか、誤った結果を生成しますか?」と自問してください。もし「はい」であれば、強い依存関係が存在します。
ステップ | 説明 |
① | 現在のタスクの予期しない実行時間を避けるために、まずテーブル間に強い依存関係が存在するかどうかを評価し、データリネージに基づいてスケジュール依存関係を設定する必要があるかどうかを判断します。 |
② | データが自動トリガータスクによって生成されたテーブルからのものであるかどうかを確認します。DataWorks は、定期スケジューラによって生成されていないテーブルのデータ生成をタスク実行ステータスを通じて監視できません。したがって、一部のテーブルにはスケジュール依存関係を設定できません。 DataWorks の定期スケジュールによって生成されないテーブルには、次のタイプが含まれますが、これらに限定されません。
|
③④ | 先祖の昨日または今日のデータに依存する必要があるか、または時間単位または分単位のタスクが自身の前のインスタンスに依存する必要があるかに応じて、先祖の同一サイクルまたは前サイクルに依存することを選択します。
説明 データリネージに基づく依存シナリオの設定の詳細については、「スケジューリング依存関係メソッドの選択 (同一サイクル依存)」をご参照ください。 |
⑤⑥⑦ | 依存関係を設定して本番環境に公開した後、オペレーションセンターの [自動トリガータスク] でタスクの依存関係を確認し、それらが正しいことを検証できます。 |
カスタムノード依存関係の設定
DataWorks 上のタスク間に強いデータリネージ依存関係がない場合 (たとえば、タスクが先祖の特定のパーティションに強く依存せず、現在の時刻で最新のパーティションを取得するだけの場合)、または依存するデータが自動トリガータスクによって生成されたテーブルからのものでない場合 (たとえば、ローカルにアップロードされたテーブルデータ)、ノードの依存関係をカスタマイズできます。カスタム依存関係の設定は次のとおりです。
ワークスペースのルートノードに依存
ソースデータが別のビジネスデータベースからのものである同期タスクや、リアルタイム同期タスクによって生成されたテーブルデータを処理する SQL タスクなどのシナリオでは、直接ワークスペースのルートノードに依存関係をマウントすることを選択できます。
ゼロロードノードに依存
ワークスペースに多数または複雑なビジネスプロセスが含まれている場合、ゼロロードノードを使用してそれらを管理できます。統一された制御が必要なノードの依存関係を特定のゼロロードノードにマウントすることで、ワークスペース内のデータフローパスをより明確にすることができます。たとえば、ビジネスプロセス全体のスケジューリング時間を制御したり、スケジューリングを有効/無効 (凍結) したりできます。
ノード出力パラメーター
先祖ノードから子孫ノードに値を渡すことができます。これを行うには、先祖ノードで出力パラメーターを定義し、次に子孫ノードでそれを参照する入力パラメーターを作成します。
重要
ノードの [出力パラメーター] は、子孫ノードの入力パラメーターとしてのみ使用できます (子孫ノードのスケジュールパラメーターセクションでパラメーターを追加し、
アイコンをクリックして先祖のパラメーターにバインドします)。一部のノードは、クエリ結果を直接子孫ノードに渡すことができません。先祖ノードから子孫ノードにクエリ結果を渡す必要がある場合は、代入ノードを使用できます。詳細については、「代入ノード」をご参照ください。出力パラメーターをサポートするノードは、
EMR Hive、EMR Spark SQL、ODPS Script、Hologres SQL、AnalyticDB for PostgreSQL、およびMySQLノードです。
ノード出力パラメーターの設定
[ノード出力パラメーター] の値は、[定数] または [変数] にすることができます。
出力パラメーターを定義し、現在のノードを送信した後、子孫ノードのスケジュールパラメーターを設定する際に、[先祖ノードの出力パラメーターをバインド] して入力パラメーターとして使用できます。

パラメーター名:定義された出力パラメーターの名前。
パラメーター値:出力パラメーターの値。値のタイプには、定数と変数が含まれます。
定数は固定の文字列です。
変数には、システムがサポートするグローバル変数、組み込みのスケジュールパラメーター、およびカスタムパラメーターが含まれます。
ノードのリンクされたロールの設定
DataWorks のリンクされたロールを使用すると、特定のタスクノードに事前設定された RAM ロールを割り当てることができます。タスクが実行されると、Alibaba Cloud セキュリティトークンサービス (STS) を通じてロールの一時的なアクセス認証情報を動的に取得します。これにより、コードに永続的な AccessKey (AK) をプレーンテキストで含めることなく、他のクラウドリソースにアクセスできます。
リソースグループの制限:Serverless リソースグループで実行されるノードのみがサポートされます。
ノードタイプの制限:Python、Shell、Notebook、PyODPS 2、および PyODPS 3 ノードのみがサポートされます。
1. DataWorks ノードでリンクされたロールを設定する
ノード編集ページの右側で、Run Configuration を見つけてクリックします。
スケジューリング設定パネルで、[リンクされたロール] タブに切り替えます。
[RAM ロール] ドロップダウンリストから、準備した RAM ロールを選択します。
重要ドロップダウンリストが空であるか、必要なロールが見つからない場合は、「STS を使用して他のクラウドサービスにアクセスするためのリンクされたロールの設定」を参照して、RAM ロールの設定を完了してください。
設定が完了したら、ノードを送信します。この設定はデバッグ実行にのみ有効です。
2. コードで一時的な認証情報を取得して使用する
リンクされたロールを設定すると、タスクの実行時に DataWorks は取得した一時的な認証情報を実行時環境に注入します。コードでは、次の 2 つの方法で取得できます。
方法 1:環境変数を読み取る (Shell および Python に推奨)
システムは自動的に次の 3 つの環境変数を設定します。コードで直接読み取ることができます。
LINKED_ROLE_ACCESS_KEY_ID:一時的な AccessKey ID。LINKED_ROLE_ACCESS_KEY_SECRET:一時的な AccessKey Secret。LINKED_ROLE_SECURITY_TOKEN:一時的なセキュリティトークン。
コードサンプル (Python):
この場合、実行時環境として oss2 がインストールされたカスタム Python イメージを選択する必要があります。詳細については、「カスタムイメージ」をご参照ください。
import os
import oss2
# 1. 環境変数から一時的な認証情報を取得します。
access_key_id = os.environ.get('LINKED_ROLE_ACCESS_KEY_ID')
access_key_secret = os.environ.get('LINKED_ROLE_ACCESS_KEY_SECRET')
security_token = os.environ.get('LINKED_ROLE_SECURITY_TOKEN')
# 認証情報が取得されたか確認します。
if not all([access_key_id, access_key_secret, security_token]):
raise Exception("環境変数からリンクされたロールの認証情報を取得できませんでした。")
# 2. 一時的な認証情報を使用して OSS クライアントを初期化します。
# ロールに 'your-bucket-name' へのアクセス権限が付与されていると仮定します。
auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
bucket = oss2.Bucket(auth, 'http://oss-<regionID>-internal.aliyuncs.com', 'your-bucket-name')
# 3. クライアントを使用して OSS リソースにアクセスします。
try:
# バケット内のオブジェクトをリストアップします。
for obj in oss2.ObjectIterator(bucket):
print('object name: ' + obj.key)
print("リンクされたロールで OSS に正常にアクセスしました。")
except oss2.exceptions.OssError as e:
print(f"OSS へのアクセス中にエラーが発生しました: {e}")コードサンプル (Shell):
#!/bin/bash
access_key_id=${LINKED_ROLE_ACCESS_KEY_ID}
access_key_secret=${LINKED_ROLE_ACCESS_KEY_SECRET}
security_token=${LINKED_ROLE_SECURITY_TOKEN}
# OSS にアクセスするには、regionID、bucket_name、および file_name を実際の情報に置き換えてください。
echo "ID: "$access_key_id
echo "token: "$security_token
ls -al /home/admin/usertools/tools/
# この例では、ossutil を使用して指定された OSS パスからファイルをローカルの test_dw.py ファイルにダウンロードし、ファイルの内容を出力する方法を示します。
/home/admin/usertools/tools/ossutil64 cp --access-key-id $access_key_id --access-key-secret $access_key_secret --sts-token $security_token --endpoint http://oss-<regionID>-internal.aliyuncs.com oss://<bucket_name>/<file_name> test_dw.py
echo "************************ 成功 ************************, 結果を出力します"
cat test_dw.py方法 2:Credentials Client を使用する (Python に推奨)
コードサンプル (Python):
この場合、実行時環境として oss2 と alibabacloud_credentials がインストールされたカスタム Python イメージを選択する必要があります。詳細については、「カスタムイメージ」をご参照ください。
from alibabacloud_credentials.client import Client as CredentialClient
import oss2
# 1. SDK を使用して自動的に認証情報を取得します。
# 環境変数内の LINKED_ROLE_* などの認証情報を自動的に検索します。
cred_client = CredentialClient()
credential = cred_client.get_credential()
access_key_id = credential.get_access_key_id()
access_key_secret = credential.get_access_key_secret()
security_token = credential.get_security_token()
if not all([access_key_id, access_key_secret, security_token]):
raise Exception("SDK 経由でリンクされたロールの認証情報を取得できませんでした。")
# 2. 認証情報を使用して OSS クライアントを初期化します。
auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'your-bucket-name')
# 3. OSS にアクセスします。
print("バケット内のオブジェクトをリストアップしています...")
for obj in oss2.ObjectIterator(bucket):
print(' - ' + obj.key)
print("SDK 経由でリンクされたロールで OSS に正常にアクセスしました。")3. タスクを実行して検証する
Shell および Python:タスクが実行されると、指定された RAM ロールを使用して他のクラウドサービスにアクセスします。
PyODPS:OSS などの他のクラウドサービスにアクセスする場合、タスクは設定した RAM ロールの ID を使用します。ただし、MaxCompute データにアクセスする場合は、プロジェクトレベルでコンピューティングリソースに設定されたアクセス ID を自動的に使用します。
スケジューリングプロパティの設定
ノードのデバッグが完了したら、[実行設定] の Run Configuration を [スケジューリング設定] の 設定に同期する必要があります。公開後、タスクはこのロールとして実行されます。
Run Configuration でカスタムイメージを設定した場合、この設定もスケジューリング設定に同期する必要があります。
オペレーションセンターで実行ロールを表示する
タスクが実行された後、[オペレーションセンター] でタスクインスタンスの詳細を表示して、指定されたロールが使用されたことを確認します。
「」に移動します。
実行したノードのインスタンスを見つけてクリックし、詳細ページに移動します。
インスタンス詳細ページの[プロパティ]セクションで、[実行アイデンティティ]フィールドを表示します。このフィールドには、この実行に実際に使用されたリンクされたロールの Alibaba Cloud リソース名 (ARN) が表示されます。
ARN は一意のリソース識別子です。詳細については、「ポリシーの基本要素」をご参照ください。
参考
スケジュールパラメーター:スケジュールパラメーターのフォーマットリファレンス。
スケジュールポリシー:
スケジュール時間:スケジュール時間リファレンス。
スケジュール依存関係:
ノード出力パラメーター:ノードコンテキストパラメーターの設定と使用。
その他のリファレンス:夏時間の変更が定期タスクの実行に与える影響。


をクリックして、