DataWorks は、定期スケジュールとトリガースケジュールの 2 種類のワークフロースケジューリングをサポートしています。固定時刻に実行される定期ワークフローとは異なり、トリガーワークフローは、手動操作、API 呼び出し、イベントメッセージなどの外部シグナルによって開始されるオンデマンド実行モデルです。このモデルは、データ処理に対してより高いリアルタイム性と柔軟性を提供します。
機能紹介
DataWorks は、定期スケジュールとトリガースケジュールの 2 つのワークフロースケジューリングモードをサポートしています。
トリガースケジュールは、オンデマンドのスケジューリングモードです。固定時刻に自動的に実行される定期ワークフローとは異なり、トリガーワークフローは外部シグナルによって開始されます。このモードは高い柔軟性を持ち、プログラムによる統合や外部イベントへの応答が必要なシナリオに適しています。
以下の 3 つのトリガーメソッドがサポートされています:
手動トリガー:DataWorks コンソールでワークフローを手動で実行できます。
OpenAPI トリガー:外部システムは、OpenAPI を呼び出すことでワークフローの実行をトリガーできます。
イベントトリガー:事前に設定されたトリガーが、OSS ファイルのアップロードや Kafka などのメッセージキューへのメッセージ到着といった特定のイベントをリッスンし、ワークフローを自動的に開始します。イベントトリガーは、ワークフローが本番環境に公開された後にのみ有効になります。
トリガーワークフロー内の PyODPS ノードや Shell ノードなどの内部ノードの設定は、定期ワークフローと同じですが、スケジュール期間を設定する必要はありません。
割り当てと制限
ノード数:単一のワークフローは最大 400 の内部ノードをサポートします。ワークフローの表示とメンテナンスを簡素化するため、ノード数を 100 未満に保つことを推奨します。
同時実行インスタンス:最大並列インスタンス数の最大値は 100,000 です。
ワークフローがイベントによって自動的にトリガーされるのは、本番環境 (オペレーションセンター) に公開された後のみです。
設定の制限:ノードレベルのスケジューリングでは、優先度のみ設定可能で、優先度重み付けポリシーは設定できません。
機能へのアクセス
DataWorks コンソールの Workspaces ページに移動します。上部のナビゲーションバーで、目的のリージョンを選択します。目的のワークスペースを見つけ、[操作] 列の を選択します。
左側のナビゲーションウィンドウで
をクリックします。次に、[プロジェクトフォルダ] の右側にある をクリックして、[新規ワークフロー] ページを開きます。初めて [プロジェクトフォルダ] を使用する場合、[新規ワークフロー] ボタンをクリックしてワークフローを作成できます。
トリガーワークフローの作成
[新規ワークフロー] ページで、[スケジュールタイプ] を [トリガースケジュール] に設定します。
ワークフローの [名前] を入力し、[確認] をクリックします。
(任意) キャンバスの右側で、[スケジュール設定] をクリックします。[スケジュールポリシー] をクリックし、ドロップダウンリストからトリガーを選択して現在のワークフローに関連付けます。
トリガーワークフローの設計
ノードのオーケストレーション
ワークフローキャンバスの左側のペインで、タスクに応じて必要なノードタイプを選択します。ノードをキャンバスにドラッグし、ノードを手動で接続して依存関係を設定します。
ノードの設定は定期ワークフローと似ていますが、スケジュール期間を設定する必要はありません。
ワークフローのスケジュールパラメーターの設定
ワークフローキャンバスの右側で、スケジュール設定ボタンをクリックします。[スケジュールパラメーター] インターフェイスで、[パラメーターの追加] をクリックして、ワークフローのスケジュールパラメーターを設定します。これらのパラメーターは現在のワークフローにスコープ指定され、そのすべての内部ノードから参照できます。
説明ワークフロー内の内部ノードに、ワークフローのスケジュールパラメーターと同じ名前のスケジュールパラメーターが設定されている場合、ノードレベルのパラメーターが優先されます。
トリガーの関連付け (任意)
イベントでワークフローを自動的にトリガーするには、まずオペレーションセンターでトリガーを設定します。次に、ワークフローキャンバスの右側で、[スケジュール設定] > [スケジュールポリシー] に移動し、作成したトリガーを選択してワークフローに関連付けます。
優先度と同時実行数の設定 (詳細設定)
複数のワークフローやタスクが同時にトリガーされ、システムリソースのボトルネックが発生した場合、[優先度] と [重み付けポリシー] を使用してインテリジェントなリソーススケジューリングを行うことができます。これにより、最も重要なタスクが最初に実行されるようになります。
コア業務の継続性の確保:コア業務のワークフローに高い優先度を設定することで、他の非コアワークフローよりも常に先に実行されるようにできます。
クリティカルプロセスの期間短縮:単一のワークフローインスタンス内で、[優先度重み付けポリシー] を使用してノードの実行順序に影響を与えることができます。たとえば、[下方重み付け] ポリシーでは、クリティカルパス上にあり、より多くのアップストリーム依存関係を持つノードに、より高い動的な重みが割り当てられます。これにより、それらのノードが最初に実行され、ワークフロー全体の実行時間が短縮されます。
設定項目
機能説明
優先度
スケジューリングキューにおけるワークフローインスタンスの絶対的な優先度レベルを定義します。利用可能なレベルは 1、3、5、7、8 です。数値が大きいほど優先度が高くなります。優先度の高いタスクやワークフローは、常に優先度の低いものより先にスケジューリングリソースを取得します。
優先度重み付けポリシー
同じ優先度レベル内で、内部ノード (タスク) の重みがどのように動的に計算されるかを定義します。重みが大きいノードが先に実行機会を得ます。
重み付けなし:すべてのノードが固定のベースライン重みを持ちます。
下方重み付け:ノードの重みが動的に調整されます。ノードが持つ上流の依存関係が多いほど、その重みは高くなります。このポリシーは、有向非巡回グラフ (DAG) のクリティカルパス上のノードが先に実行されるのに役立ちます。重みは次のように計算されます:
初期重み値 + すべての上流ノードの優先度の合計。
最大並列インスタンス数
このワークフローが同時に実行できるインスタンスの最大数を制御します。これは同時実行制御とリソース保護のために使用されます。実行中のインスタンス数が制限に達すると、後続のトリガーされたインスタンスは待機状態になります。これを [無制限] に設定するか、最大 100,000 までのカスタム値を指定できます。
説明設定された制限がリソースグループの最大容量を超える場合、実際の同時実行のボトルネックはリソースグループの物理的な制限によって決まります。
DataWorks の優先度システムは、
実行時の指定>ノードレベルの設定>ワークフローレベルの設定という階層的なオーバーライドルールに従います。ワークフローレベルの設定 (ベースライン):ワークフローの [スケジュールポリシー] で設定され、すべてのノードのデフォルト設定として機能します。
ノードレベルの設定 (ローカル):ワークフロー内の個々のノードの [スケジュール設定] > [スケジュールポリシー] ページで、そのノードに対してより高い [優先度] を設定できます。この設定は、ワークフローレベルの設定をオーバーライドします。
[オペレーションセンター] で手動で実行をトリガーする場合、[実行時に優先度をリセット] スイッチを使用して一時的な設定を指定できます。この設定は最も高い優先順位を持ち、現在の実行にのみ適用され、永続的な設定は変更しません。
トリガーワークフローの開発
ノードの開発
ノード編集ページで、ノードコードを編集できます。コード開発中は以下の点にご注意ください:
コードの構文は、選択したノードタイプによって異なります。タスクタイプが異なれば設定も異なります。設定の詳細については、「ノード開発」をご参照ください。
Copilot インテリジェントプログラミングアシスタントを有効にすると、インテリジェントなコード補完の提案を得て、開発効率を向上させることができます。
ほとんどのノードタイプでは、
${variable_name}形式を使用して変数を定義できます。これにより、次のステップで異なる値を代入することで、タスクコードを迅速にデバッグできます。スケジューリングタスクを実行する際、
${workflow.Parameter_name}形式を使用して、内部ノードでワークフローパラメーターの値を取得できます。
トリガーから渡されたパラメーターの使用 (任意) トリガーは、ファイルパスやメッセージ内容などのイベント情報をイベントパラメーターとして渡し、ワークフロー内の内部ノードが使用できるようにします。
Kafka トリガーを例に説明します。Kafka メッセージは以下の通りです:
メッセージ本文の形式は、トリガー詳細の [メッセージ形式の例] セクションで提供されます。
{ "headers": { "headers": [], "isReadOnly": false }, "partition": 2, "offset": 1, "topic": "demo-topic", "key": "demo-key", "value": "{\"number\":100,\"name\":\"EventBridge\"}", "timestamp": 1713852706576 }ワークフローの内部ノードで、${workflow.triggerMessage} パラメーターを使用して完全なメッセージ本文を取得します。また、${workflow.triggerMessage.xxx} を使用してメッセージ本文の特定のフィールドの値を取得することもできます。トリガーがタスクを開始すると、パラメーターは自動的に置き換えられます。例:
${workflow.triggerMessage} #メッセージ本文全体を取得 ${workflow.triggerMessage.key} #JSON から key フィールドの値を取得。結果:demo-key ${workflow.triggerMessage.value} #JSON から value フィールドの値を取得。結果:{"number":100,"name":"EventBridge"}
トリガーワークフローのデバッグ
ノードのデバッグ
コードを編集した後、ノード編集ページの右側にある [デバッグ設定] をクリックして、リソースグループやスクリプトパラメーターなどのデバッグパラメーターを設定します。設定が完了したら、ツールバーの [実行] ボタンをクリックします。ノードは [デバッグ設定] で指定したパラメーターを使用して実行されます。
ワークフローのデバッグ
Data Studio でトリガーワークフローをデバッグするには、ワークフローキャンバスの上にあるツールバーの
実行アイコンをクリックします。表示されるダイアログボックスで、[トリガーメッセージ本文] を入力してイベントをシミュレートします。
トリガーワークフローの公開
ワークフローをデバッグした後、ツールバーの
公開ボタンをクリックして公開パネルを開きます。[本番環境への公開を開始] をクリックします。タスクはリリースチェックプロセスに従って公開されます。詳細については、「ノードまたはワークフローの公開」をご参照ください。
トリガーワークフローの実行
ワークフローがオペレーションセンターに公開されると、スタンバイ状態になり、シグナルを待ちます。トリガーワークフローは、以下のいずれかの方法で実行できます。
イベントトリガー
新規作成したトリガーのイベントタイプに基づき、トリガーの監視対象で関連する操作を実行します。たとえば、OSS トリガーの場合、トリガーで設定した OSS バケットにファイルをアップロードできます。メッセージキュートリガーの場合、トリガーで設定した Topic またはキューにメッセージを送信できます。
トリガーがイベントメッセージを受信すると、オペレーションセンターでトリガーワークフローをワンタイムタスクとして開始します。
に移動し、実行されたトリガーワークフローインスタンスを表示および管理します。インスタンスログを確認して、ワークフローが正常にトリガーされ、実行されたことを確認できます。
手動トリガー
に移動します。実行したいトリガーワークフローを見つけ、[実行] をクリックします。その後、実行範囲、データタイムスタンプ、トリガーメッセージ本文などの実行時パラメーターを設定できます。
その他の操作
トリガーワークフローのクローン
クローン機能を使用して、既存のワークフローから新しいワークフローを迅速に作成できます。クローン操作は、ワークフローの内部ノード ([コード]、[デバッグ設定]、[スケジュール設定] を含む)、ノード間の依存関係、およびワークフローの [スケジュール設定] をコピーします。
左側の [プロジェクトフォルダ] で、クローンしたいトリガーワークフローを右クリックします。
ポップアップメニューで [クローン] を選択します。クローンウィンドウが開きます。
ウィンドウで、任意でトリガーワークフローの [名前] とストレージの [パス] を変更できます。[確認] をクリックしてクローンを開始します。
クローン中は、ポップアップウィンドウで [現在の進捗]、[期間]、[完了したノード数] を監視できます。
クローンが完了したら、[プロジェクトフォルダ] で生成されたトリガーワークフローを表示できます。
トリガーワークフローに新しいノードを追加するには、既存のノードをクローンして迅速にノードを作成するか、ドラッグアンドドロップで内部ノードを作成できます。
トリガーワークフローのバージョン管理
システムでは、バージョン管理機能を使用して、トリガーワークフローを指定の履歴バージョンに復元できます。この機能は、バージョンの表示と比較機能も提供し、差異の分析と調整を支援します。
左側の [プロジェクトフォルダ] で、トリガーワークフローをダブルクリックしてワークフローキャンバスを開きます。
ワークフローキャンバスの右側にある [バージョン] をクリックします。[バージョン] ページで、[開発レコード] と [公開レコード] を表示および管理できます。
表示バージョン:
[開発レコード] または [公開レコード] タブで、目的のトリガーワークフローのバージョンを見つけます。
[操作] 列で [表示] をクリックします。詳細ページで、トリガーワークフローの[コード]と[スケジュール設定]を表示できます。
説明[スケジュール設定] の情報は [コードエディター] または可視化モードで表示でき、[スケジュール設定] タブの右上隅で切り替えることができます。
バージョンの比較:
[開発レコード] または [公開レコード] タブで、トリガーワークフローのバージョンを比較できます。以下の例では、[開発レコード] タブでの操作を示します。
開発環境または公開環境でのバージョンの比較:[開発レコード] タブで、2 つのバージョンを選択し、上部にある [選択して比較] ボタンをクリックします。その後、2 つのトリガーワークフローバージョン間のコードとスケジュール設定を比較できます。
開発環境と公開環境またはビルド環境間のバージョンの比較:
[開発記録] タブで、トリガーされたワークフローの特定のバージョンを探します。
[操作] 列の [比較] ボタンをクリックします。[比較するコンテンツを選択してください] ウィンドウで、[公開レコード] または [ビルドレコード] から比較するバージョンを選択します。
以前のバージョンへの復元:
トリガーワークフローは、[開発レコード] 内の特定の履歴バージョンにのみ復元できます。[開発レコード] タブで、ターゲットバージョンを見つけ、[操作] 列の [復元] ボタンをクリックします。この操作により、トリガーワークフローはターゲットバージョンに復元されます。
説明ワークフローを復元すると、システムはターゲットバージョンを復元し、新しいバージョンレコードを作成します。
関連操作
トリガーワークフローが公開された後、オペレーションセンターで O&M (運用保守) 操作を実行できます。詳細については、「ワンタイムタスク O&M」をご参照ください。