定期実行ワークフロー(例:毎日午前 1 時に実行)とは異なり、トリガーワークフローはオンデマンドで実行されるイベント駆動型のデータ処理モデルです。その実行は、ファイルのアップロード、メッセージの到着、API 呼び出し、または手動クリックといった外部信号によってリアルタイムで開始され、データ処理において高いリアルタイム性能と柔軟性を提供します。
|
機能 |
定期実行ワークフロー |
トリガーワークフロー |
|
トリガー機構 |
固定スケジュール(Crontab 式) |
外部信号(イベント、API、手動) |
|
実行モデル |
スケジュールに基づき、予測可能な実行 |
リアクティブかつオンデマンド |
|
適用範囲 |
T+1 バッチデータウェアハウス構築、定期レポート作成 |
ファイル到着時の処理、業務システムとの統合、手動によるデータパッチ適用 |
|
主な利点 |
信頼性および定期実行の保証 |
リアルタイム応答性および柔軟性 |
サポートされるトリガーメソッド
トリガーワークフローでは、3 種類のトリガーメソッドがサポートされています。ご使用のビジネスシナリオに最も適したものを選択してください。
|
トリガーメソッド |
発信元 |
主な利用シーン |
キーポイント |
|
バージョン |
外部イベントソース(OSS や Kafka など) |
イベント駆動型 ETL:ファイル到着時に処理する、またはメッセージに基づいてリアルタイム計算を開始する。 |
まずトリガーを作成し、それをワークフローに関連付ける必要があります。本番環境でのみ有効です。 |
|
バージョン |
ユーザー(開発者または O&M エンジニア) |
アドホックタスク:ワンタイムのデータ処理または分析。 |
開発環境および本番環境の両方で手動実行できます。ワンタイムタスク の代替として推奨されます。 |
|
API トリガー |
外部システム(OpenAPI 経由) |
システム統合:CRM や ERP などの業務システムからのコールバックにより、データ処理を開始する。 |
必要な権限を持つ OpenAPI を呼び出す必要があります。 |
クイックスタート:手動トリガーワークフローの作成
このセクションでは、シンプルなトリガーワークフローを作成・手動実行する手順を説明し、全体の流れを確認します。
ステップ 1:トリガーワークフローの作成
DataWorks コンソールの ワークスペース ページに移動します。上部ナビゲーションバーから対象のリージョンを選択し、目的のワークスペースを見つけ、操作 列の を選択します。
-
左側ナビゲーションバーで
をクリックし、プロジェクトフォルダー の右側にある をクリックして、新しいワークフロー ページに移動します。 -
表示されたダイアログボックスで、ワークフローの作成 ページにて、スケジューリングタイプ を トリガー方式スケジューリング に設定します。名前 を入力し、確認 をクリックします。
ステップ 2:ノードのオーケストレーションと開発
-
ツールバーの + ノードを追加 をクリックしてノード一覧を開き、ノードタイプ一覧から Shell ノードをキャンバスにドラッグ&ドロップして名前を入力します。
-
Shell ノードをダブルクリックしてコードエディタを開き、以下のコードを入力します:
echo "Hello, Trigger Workflow! Current time is ${bizdate}" -
ツールバーの 保存 をクリックします。
ステップ 3:デバッグおよび実行(開発環境)
-
ワークフローキャンバスに戻り、上部ツールバーの
アイコンをクリックします。 -
表示されたダイアログボックスで、ワークフローの 今回の実行値 を入力します(例:現在日付が 20260310 の場合、
bizdateの値は20260309となります)。 -
すぐに下部の実行時ログに、ノードの実行状況および
echoコマンドの出力が表示されます。
ステップ 4:公開および実行(本番環境)
-
ワークフローキャンバスで 公開
ボタンをクリックし、プロンプトに従って公開します。 -
ワークフローが公開された後、オペレーションセンター > ワンタイムタスク O&M > ワンタイムタスク > トリガーワークフロー に移動します。
-
公開済みのワークフローを見つけ、実行 を 操作 列からクリックします。
-
表示されたウィンドウで、再度 実行 をクリックして、本番環境でワークフローのインスタンスを起動します。この実行の詳細は ワンタイムインスタンス ページで確認できます。
これで、トリガーワークフローの基本的な使い方が理解できました。次に、より高度なイベントトリガー機能について解説します。
高度なユースケース:イベントトリガー型ワークフロー
シナリオ 1:新規 OSS ファイルの処理
目的:Object Storage Service (OSS) の指定ディレクトリに新しい CSV ファイルがアップロードされた際に、そのファイルパスを出力するワークフローを自動起動します。
ステップ 1:OSS トリガーの作成
-
オペレーションセンター > スケジューリング設定 > トリガーマネジメント に移動します。
-
新規トリガー をクリックし、以下のように設定します:
説明各パラメーターの詳細については、「OSS トリガー」をご参照ください。
-
トリガー名:任意の名前(例:
oss_new_file_trigger)を入力します。 -
適用ワークスペース:ワークフローが配置されている対象のワークスペースを選択します。
-
トリガーイベントタイプ:
Object Storage Service (OSS)を選択します。 -
トリガーイベント:
oss:ObjectCreated:PutObject(またはその他のアップロードイベント)を選択します。 -
バケット名:ご利用の OSS バケットを選択します。
-
ファイル名:監視対象のファイルパスおよび形式を指定します。ワイルドカードが使用可能です。たとえば、
input/ディレクトリ内のすべての.csvファイルを監視する場合は、input/*.csvを入力します。 -
ロール設定:初回利用時は ワンクリック承認 を実行し、生成されたロール
DataWorks-EventBridge-OSS-MNS-Role-*************を選択します。*************は、一意性を保証するためのランダム生成された 13 桁の ID 番号を表します。
-
-
確認 をクリックしてトリガーを作成します。
ステップ 2:ワークフローの作成および関連付け
-
クイックスタート:手動トリガーワークフローの作成 の手順に従い、トリガーワークフロー を作成し、名前を
process_oss_file_workflowとします。 -
ワークフローキャンバスの右側パネルで、スケジュール設定 > スケジューリングポリシー を選択します。
-
トリガー のドロップダウンリストから、先ほど作成した
oss_new_file_triggerを選択します。
ステップ 3:イベントパラメーターの解析
-
ツールバーの + ノードを追加 をクリックしてノード一覧を開き、ノードタイプ一覧から Shell ノードをキャンバスにドラッグ&ドロップして名前を入力します。
-
ノードをダブルクリックし、トリガーイベントからファイルパスを取得・出力するコードを記述します。
# トリガーがワークフローを起動すると、イベント情報は組み込み変数 workflow.triggerMessage を通じて渡されます。 # アップロードされたファイルの完全なパスは、${workflow.triggerMessage.data.oss.object.key} を使用して取得できます。 echo "========= OSS ファイル処理を開始 =========" message="${workflow.triggerMessage}" echo "生の値: ${message}" # イベントメッセージからファイル名を抽出 FILE_PATH="${workflow.triggerMessage.data.oss.object.key}" echo "新しいファイルが到着しました: ${FILE_PATH}" # ここに具体的な処理ロジックを追加してください echo "========= OSS ファイル処理を完了 ========="説明${workflow.triggerMessage}:JSON 形式で完全なイベントメッセージ本文を取得します。OSS のメッセージ形式の詳細については、EventBridge の イベントバス >
DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name>> イベントトレーシング > イベント詳細 をご参照ください。
ステップ 4:デバッグおよび公開
-
デバッグ:
-
ワークフローキャンバスに戻り、実行
ボタンをクリックします。 -
トリガーメッセージ本文 入力欄に、サンプルの OSS イベント(JSON 形式)を貼り付けます。トリガー設定ページからメッセージ形式の例をコピーし、
keyの値を修正してください。以下は簡易な例です。{ "data": { "oss":{ "object": { "key": "input/test_file_20260310.csv" } } } } -
実行 をクリックし、ログに
input/test_file_20260310.csvが正しく出力されるか確認します。
-
-
公開:デバッグが成功したら、公開 ボタンをクリックして、ワークフローを本番環境に公開します。イベントトリガーは本番環境でのみ有効です。
ステップ 5:本番環境での検証
-
OSS コンソールまたはクライアントを使用して、トリガーで設定したバケットおよびパス(例:
input/ディレクトリ)に CSV ファイルをアップロードします。 -
DataWorks の オペレーションセンター > ワンタイムタスク運用管理 > ワンタイムタスク > トリガーワークフロー に移動します。正常に公開された
process_oss_file_workflowが表示されます。
-
しばらく待った後、DataWorks の オペレーションセンター > ワンタイムタスク O&M > トリガーワークフローインスタンス に移動します。新しいワークフローインスタンスが自動的に起動されます。これをクリックしてログを確認し、ファイルパスが正しく処理されたことを確認します。
ベストプラクティス:冪等性設計
ネットワークの不安定などにより、OSS イベントが重複して配信される可能性があります。重複したデータ処理を回避するため、ビジネスロジックに冪等性を実装することを推奨します。一般的な解決策として、ファイル処理前に MaxCompute テーブルなどのレコードテーブルをチェックし、ファイルの ETag や一意のパスを識別子として使用します。すでに処理済みの場合は、スキップします。
シナリオ 2:Kafka メッセージの処理
目的:Kafka トピックをユーザー行動ログ向けにモニターし、新しいメッセージが到着した際に、その内容に応じて異なるロジックを実行するワークフローを起動します。
ステップ 1:Kafka トリガーの作成
-
オペレーションセンター > スケジューリング設定 > トリガーマネジメント に移動し、新規トリガー をクリックします。
-
以下のパラメーターを設定します:
-
トリガー名:
kafka_user_action_trigger。 -
トリガーイベントタイプ:Message Queue for Apache Kafka を選択します。
-
Kafka インスタンス および トピック:モニター対象のインスタンスおよびトピックを選択します。
-
ConsumerGroupId:クイック作成 を選択して、システムが自動的にコンシューマーグループ ID を生成し、他のアプリケーションとの競合を回避します。
-
キー(任意):メッセージキーを指定できます。完全一致するキーを持つメッセージのみがワークフローをトリガーします。
-
-
確認 をクリックします。
ステップ 2:ワークフローの作成および関連付け
-
クイックスタート:手動トリガーワークフローの作成 の手順に従い、新しい トリガーワークフロー を作成し、名前を
handle_user_action_workflowとします。 -
ワークフローキャンバスの右側パネルで、スケジュール設定 > スケジューリングポリシー を選択します。
-
トリガー のドロップダウンリストから、先ほど作成した
kafka_user_action_triggerを選択します。
-
重要: メッセージは高頻度で到着する可能性があるため、急激なメッセージ流入によるスケジューリングリソースの過負荷を防ぐため、内部タスクの最大並列インスタンス数 を
100のような値に設定することを推奨します。
ステップ 3:ネストされた JSON の解析
Kafka メッセージの value フィールドが、以下の形式の JSON 文字列であると仮定します:{"user_id": "1001", "action_type": "login", "timestamp": 1688888888}。
-
ツールバーの + ノードを追加 をクリックしてノード一覧を開き、ノードタイプ一覧から Python ノードをキャンバスにドラッグ&ドロップします。
-
メッセージを解析するコードを記述します。
value自体が文字列であるため、コード内で 2 回目の JSON 解析を行う必要があります。import json # 1. 組み込み変数を使用して、Kafka メッセージの 'value' フィールド(JSON 文字列)を取得します。 message_value_str = '${workflow.triggerMessage.value}' print(f'受信した生のメッセージ value 文字列: {message_value_str}') try: # 2. Python コード内で、この文字列を JSON オブジェクト(辞書)に解析します。 message_data = json.loads(message_value_str) user_id = message_data.get("user_id") action_type = message_data.get("action_type") print(f"メッセージの解析に成功しました。ユーザー ID: {user_id}, アクション: {action_type}") # 3. action_type に応じて、異なるビジネスロジックを実行できます。 if action_type == 'login': # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');") print("ログインアクションを処理中...") elif action_type == 'purchase': print("購入アクションを処理中...") else: print("不明なアクションタイプです。") except json.JSONDecodeError as e: print(f"JSON 解析エラー: {e}") # エラー処理ロジック(専用ログテーブルへのエラーメッセージ書き込みなど)を追加してください。 raise e # トラブルシューティングを容易にするため、ノードを失敗状態としてマークするために例外を再送出します。
ステップ 4:デバッグおよび公開
-
デバッグ:
-
ワークフローキャンバスに戻り、実行
ボタンをクリックします。 -
トリガーメッセージ本文 に、シミュレートされた Kafka イベントを貼り付けます。
valueフィールドはエスケープされた JSON 文字列であることに注意してください。{ "topic": "user-behavior-topic", "key": "some-key", "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}" } -
実行し、ログを確認して、Python ノードが
user_idおよびaction_typeを正しく解析しているか確認します。
-
-
公開:デバッグが成功したら、ワークフローを本番環境に公開します。
ステップ 5:本番環境での検証
-
正しい形式のメッセージを、設定済みの Kafka トピックに送信します。

-
DataWorks の オペレーションセンター > ワンタイムタスク O&M > ワンタイムタスク > トリガーワークフロー に移動します。正常に公開された
handle_user_action_workflowが表示されます。
-
オペレーションセンター > ワンタイムタスク O&M > ワンタイムインスタンス > トリガーワークフローインスタンス で、新しいワークフローインスタンスが起動されたことを確認し、実行ログを確認します。

ベストプラクティス:同時実行制御および順序保証
-
同時実行制御:メッセージの急増に対応するため、常に適切な最大並列インスタンス数を設定してください。
-
順序保証:DataWorks のスケジューリングは、厳密なメッセージ処理順序を保証しません。同一ユーザー(またはパーティション)に対するメッセージを順序通りに処理する必要がある場合は、ビジネスコード内に分散ロック(Redis や MaxCompute を使用)を実装するか、パーティション単位で順序消費を保証する Flink などのコンピューティングエンジンに処理ロジックを委譲する必要があります。
コア設計および構成
ワークフローのオーケストレーション
トリガーワークフローのオーケストレーションのコアプロセスは、定期実行ワークフローと同様です。詳細については、「ノードおよびワークフローのオーケストレーション」をご参照ください。
スケジューリングパラメーター
ワークフローキャンバス右側の スケジュール設定 パネルで、ワークフロー 全体のグローバルパラメーターを設定できます。ワークフロー内のすべてのノードでこれらのパラメーターを参照できます。
-
参照構文:ノードのコード内で、ワークフローパラメーターは
${workflow.parameter_name}の形式で参照します。 -
パラメーター優先順位:DataWorks のパラメーターには階層的なオーバーライド関係があります。優先順位は次のとおりです:ノードパラメーター > ワークフローパラメーター。
パラメーターの詳細については、「パラメーター設計およびフロー」をご参照ください。
スケジューリングポリシー
複数のワークフローまたはタスクが同時にトリガーされ、システムリソースのボトルネックが発生した場合、優先度 および 優先度重み付けポリシー を使用して、インテリジェントなリソーススケジューリングを実現できます。これにより、最も重要なタスクが優先的に実行されます。
-
コア業務の継続性確保:コア業務ワークフローに高い優先度を設定することで、他の非コアワークフローよりも常に先に実行されるようにします。
-
クリティカルパスの所要時間短縮:単一のワークフローインスタンス内で、優先度重み付けポリシー を使用してノードの実行順序に影響を与えることができます。たとえば、下向き重み付け ポリシーでは、クリティカルパス上の依存関係の多いノードに動的に高い重みが割り当てられます。これにより、それらのノードの実行が優先され、ワークフロー全体の実行時間を短縮できます。
バージョン
バージョン
優先度
スケジューリングキュー内におけるワークフローインスタンスの絶対的優先度レベルを定義します。利用可能なレベルは 1、3、5、7、8 であり、数値が大きいほど優先度が高くなります。高優先度のタスクまたはワークフローは、低優先度のものよりも常に先にスケジューリングリソースを獲得します。
優先度重み付けポリシー
同一優先度レベル内で、内部ノード(タスク)の重みをどのように動的に算出するかを定義します。重みが高いノードが実行を優先されます。
-
重み付けなし:すべてのノードが固定のベースライン重みを持ちます。
-
下向き重み付け:ノードの重みは動的に調整されます。ノードが持つ上流依存関係が多いほど、その重みは高くなります。この戦略は、DAG(有向非巡回グラフ)におけるクリティカルパス上のノードの実行を優先するのに役立ちます。重みは以下の式で算出されます:
初期重み + すべての上流ノードの優先度の合計。
最大並列インスタンス数
このワークフローが同時に実行できるインスタンスの最大数を制御します。これは同時実行制御およびリソース保護のために使用されます。実行中のインスタンス数が上限に達すると、新たにトリガーされたインスタンスは待機状態になります。この値を 無制限 に設定するか、または最大 100,000 までのカスタム上限値を指定できます。
説明指定された上限がリソースグループの最大容量を超えた場合、リソースグループの物理的な制限が実際の同時実行ボトルネックとなります。
-
DataWorks の優先度システムは、階層的なオーバーライドルールに従います:ランタイム仕様 > ノードレベル設定 > ワークフローレベル設定。
-
ワークフローレベル設定(ベースライン):ワークフローの スケジューリングポリシー で設定され、すべてのノードのデフォルト設定として機能します。
-
ノードレベル設定(ローカル):ワークフロー内の個々のノードの スケジュール設定 > スケジューリングポリシー で、特定のノードに対してより高い 優先度 を設定できます。これはワークフローレベル設定をオーバーライドします。
-
ランタイム仕様(一時的): オペレーションセンター で手動実行する際に、ランタイムで優先度をリセット スイッチを使用して設定を指定できます。この設定は最優先され、現在の実行にのみ適用され、永続的な設定を変更しません。
O&M および管理
-
インスタンスモニタリング: オペレーションセンター > ワンタイムタスク O&M > ワンタイムインスタンス ページで、トリガーまたは手動で実行されたすべてのインスタンスを表示、再実行、終了、トラブルシューティングできます。
-
ワークフローのクローン作成: ビジネスフロー でワークフローを右クリックし、クローン を選択すると、すべてのノードおよび依存関係を含む新しいワークフローをすばやくコピーできます。「ワークフローのクローン作成」(定期実行ワークフロー向け)をご参照ください。
-
バージョン管理:ワークフローキャンバス右側の バージョン パネルで、ワークフローの履歴バージョンを表示、比較、復元できます。「バージョン管理」(定期実行ワークフロー向け)をご参照ください。
制限事項および留意事項
-
有効な環境: イベントトリガー 機構は、ワークフローが 本番環境(オペレーションセンター)に公開された後にのみ有効になります。
-
ノード数:単一のワークフローでは最大 400 ノードをサポートしますが、保守性を考慮し、100 ノード未満に抑えることを推奨します。
-
同時実行制限:最大並列インスタンス数は 100,000 ですが、実際の同時実行可能数は、購入済みのスケジューリングリソースグループの仕様によって制限されます。
-
ノードレベルのスケジューリング:ノードレベルでスケジューリングを設定する場合、優先度 のみ設定可能であり、優先度重み付けポリシー は設定できません。
