従来の定期的なスケジュールでは、リソースの浪費やデータ処理の遅延が発生する可能性があります。これは、Object Storage Service (OSS) への新しいファイルのアップロードや Kafka の新しいメッセージなど、予測不可能な外部イベントにデータ処理が依存している場合に発生する可能性があります。これらのスケジュールは、固定時間ポーリングに依存しています。DataWorks のトリガーは、このようなシナリオ向けに設計されています。外部イベントのシグナルをリアルタイムでリッスンし、関連するデータパイプラインをオンデマンドで開始します。これにより、イベント駆動のデータパイプラインが可能になり、データ処理の自動化と適時性が向上します。このトピックでは、イベント駆動スケジューリングのトリガーを作成、設定、管理する方法について説明します。
機能の概要
DataWorks のトリガーは、事前定義されたイベントをモニターするイベント駆動スケジューリング用のコンポーネントです。イベントが発生すると、トリガーは関連するイベントトリガーワークフローを開始し、イベント情報をパラメーターとしてワークフローに渡します。
複数ソースのイベントモニタリング:異なるソースからのイベントをモニターするようにトリガーを設定できます。
外部ストレージイベント:OSS バケット内の新しいファイルの作成をモニターします。
外部メッセージイベント:Kafka や RocketMQ などのメッセージキューにおける新しいメッセージの到着をモニターします。
動的なパラメーターのキャプチャと受け渡し:トリガーがイベントを検出すると、イベントのコンテキスト情報をキャプチャし、この情報をパラメーターとして下流ノードに渡します。
OSS イベントの場合、キャプチャされる情報にはファイル詳細が含まれます。
メッセージキューイベントの場合、キャプチャされる情報にはキー、値、ヘッダーなど、完全なメッセージ内容が含まれます。
トリガーされたワークフローの内部ノードは、実行中にこれらのパラメーターを参照できます。これにより、イベント内容に基づいたデータ処理が可能になります。
サポートタイプ:
メッセージキュー:Kafka、RocketMQ、RabbitMQ
ストレージオブジェクト:OSS
適用範囲
リージョン:この機能は、中国 (杭州)、中国 (北京)、中国 (張家口)、中国 (ウランチャブ)、中国 (深セン)、中国 (香港)、およびシンガポールのリージョンでのみ利用可能です。
エディション:この機能は、DataWorks Professional Edition 以降でのみ利用可能です。現在のエディションがこの機能をサポートしていない場合は、DataWorks を Professional Edition またはそれ以上のエディションにアップグレードできます。
課金
トリガーベースのワークフローがイベントに基づいて実行されるように設定されている場合、定期タスクの実行料金に加えて、EventBridge の料金が発生します。料金の詳細については、「イベントストリームの課金ルール」をご参照ください。課金はイベント数に基づきます。
事前準備
トリガーは EventBridge のイベントルールに依存します。EventBridge を有効化して必要な権限を付与し、Simple Message Queue (旧称:MNS) を有効化する必要があります。
モニター対象のオブジェクト、例えば OSS インスタンスやクラウドメッセージキュー (Kafka、RocketMQ、または RabbitMQ) は、ワークスペースと同じリージョンにある必要があります。また、これらのインスタンスに対するアクセス権限も必要です。
ワークスペースにユーザーを追加し、開発者、O&M、またはワークスペース管理者のロールを付与していること。詳細については、「ワークスペースメンバーの追加とロール権限の管理」をご参照ください。
トリガー管理ページへの移動
オペレーションセンターページに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、を選択します。表示されたページで、ドロップダウンリストから目的のワークスペースを選択し、[オペレーションセンターへ] をクリックします。
オペレーションセンターページの左側のナビゲーションウィンドウで、を選択し、トリガー管理タブをクリックします。
トリガーの作成
ワークスペースの「開発者」、「O&M」、または「管理者」ロールを持っている場合は、トリガー管理タブでトリガーを作成できます。
[トリガーの作成] をクリックして、トリガーの作成設定ページに移動します。
説明DataWorks のイベントトリガーは EventBridge に依存します。この機能を初めて使用する場合、または AliyunServiceRoleForDataWorksScheduler サービスリンクロールがない場合は、[ワンクリック承認] をクリックして必要な権限を付与できます。
必要な権限:
ram:CreateServiceLinkedRole。詳細については、「サービスリンクロールの作成に必要な最小権限」をご参照ください。
次の表の説明に従ってパラメーターを設定します。
Kafka トリガー
パラメーター
設定内容
適用ワークスペース
トリガーを使用できるワークスペースを選択します。新しい Data Studio をサポートするワークスペースのみが利用可能です。
[適用環境]
トリガーは本番環境にのみ適用されます。開発環境で実行する場合は、パラメーターを手動で入力する必要があります。
所有者
ドロップダウンリストからトリガーのオーナーを選択します。
トリガータイプ
[Message Queue For Apache Kafka] を選択します。
トリガーイベント
alikafka:Topic:Messageトリガーイベントタイプをサポートします。このイベントは Kafka メッセージによってトリガーされます。メッセージの送信方法については、「Kafka メッセージの送受信」をご参照ください。Kafka インスタンス
ご利用のワークスペースと同じリージョンにある Kafka インスタンスを選択します。利用可能なインスタンスがない場合は、「Kafka インスタンスの購入」に移動して作成します。
トピック
トリガーがリッスンする Topic を指定します。利用可能な Topic がない場合は、「Topic の作成」で作成します。
[キー]
メッセージのキーを事前に設定できます。キーが完全に一致する場合にのみタスクがトリガーされます。このパラメーターはオプションです。空のままにすると、どのメッセージでもワークフローがトリガーされます。
コンシューマーグループ ID
[クイック作成] または [既存のものを使用] を選択できます。[クイック作成] を選択すると、システムは生成された名前でグループ ID を自動的に作成します。
[メッセージ形式の例]
Kafka メッセージの固定例です。トリガーされたワークフローでは、
${workflow.triggerMessage} を使用して完全なメッセージ本文を取得できます。また、${workflow.triggerMessage.xxx} を使用してメッセージ本文の特定のフィールドの値を取得することもできます。
RocketMQ トリガー
パラメーター
設定内容
[適用ワークスペース]
トリガーを使用できるワークスペースを選択します。新しい Data Studio をサポートするワークスペースのみが利用可能です。
適用環境
トリガーは本番環境にのみ適用されます。開発環境で実行する場合は、パラメーターを手動で入力する必要があります。
所有者
ドロップダウンリストからトリガーのオーナーを選択します。
トリガータイプ
[Message Queue For Apache RocketMQ] を選択します。
説明5.x より前のバージョンはサポートされていません。デフォルトでバージョン 5.x が使用されます。
トリガーイベント
RocketMQ メッセージを消費することでトリガーされる
mq:Topic:SendMessageトリガーイベントをサポートします。[RocketMQ インスタンス]
ワークスペースと同じリージョンにある RocketMQ インスタンスを選択します。アクティブなインスタンスがない場合は、「RocketMQ インスタンス管理」で作成します。
トピック
トリガーがリッスンする Topic オブジェクトを指定します。利用可能な Topic がない場合は、「Topic 管理」に移動して作成します。
タグ
メッセージのタグを事前に設定できます。タグが完全に一致する場合にのみタスクがトリガーされます。このパラメーターはオプションです。空のままにすると、どのメッセージでもワークフローがトリガーされます。
セキュリティグループ
RocketMQ インスタンスがサブスクリプションインスタンスである場合は、セキュリティグループを選択できます。
コンシューマーグループ
[クイック作成] または [既存のものを使用] を選択できます。[クイック作成] を選択すると、システムは生成された名前でグループ ID を自動的に作成します。
[メッセージ形式の例]
RocketMQ メッセージの固定例です。トリガーされたワークフローでは、
${workflow.triggerMessage} を使用して完全なメッセージ本文を取得できます。また、${workflow.triggerMessage.xxx} を使用してメッセージ本文の特定のフィールドの値を取得することもできます。
RabbitMQ トリガー
パラメーター
設定内容
[適用ワークスペース]
トリガーを使用できるワークスペースを選択します。新しい Data Studio をサポートするワークスペースのみが利用可能です。
[適用環境]
トリガーは本番環境にのみ適用されます。開発環境で実行する場合は、パラメーターを手動で入力する必要があります。
所有者
ドロップダウンリストからトリガーのオーナーを選択します。
トリガータイプ
[Message Queue For RabbitMQ] を選択します。
トリガーイベント
amqp:Queue:SendMessageトリガーイベントタイプをサポートします。これは RabbitMQ メッセージを消費することでトリガーされます。RabbitMQ インスタンス
ワークスペースと同じリージョンにある RabbitMQ インスタンスを選択します。アクティブなインスタンスがない場合は、「RabbitMQ インスタンスの作成」で作成します。
仮想ホスト
RabbitMQ の仮想ホストの名前で、キューを論理的に分離するために使用されます。Vhost がない場合は、「Vhost 管理」を参照して作成します。
キュー
トリガーがリッスンするキューオブジェクトを指定します。適切なキューがない場合は、「キュー管理」を参照して作成します。
メッセージフォーマットの例
RabbitMQ メッセージ本文の例です。トリガーされたワークフローでは、
${workflow.triggerMessage} を使用して完全なメッセージ本文を取得できます。また、${workflow.triggerMessage.xxx} を使用してメッセージ本文の特定のフィールドの値を取得することもできます。
OSS トリガー
パラメーター
設定内容
[適用ワークスペース]
トリガーを使用できるワークスペースを選択します。新しい Data Studio をサポートするワークスペースのみが利用可能です。
[適用環境]
トリガーは本番環境にのみ適用されます。開発環境で実行する場合は、パラメーターを手動で入力する必要があります。
所有者
ドロップダウンリストからトリガーのオーナーを選択します。
トリガータイプ
Object Storage Service (OSS)
トリガーイベント
次の 3 つのイベントタイプがサポートされています:
[oss:ObjectCreated:PutObject]:ファイルのアップロード。
[oss:ObjectCreated:PostObject]:HTML フォームを使用したファイルのアップロード。
[oss:ObjectCreated:CompleteMultipartUpload]:マルチパートアップロードの完了。
バケット名
ドロップダウンリストから、イベントソースとして使用する OSS バケットの名前を選択します。バケットを作成していない場合は、「OSS バケットの作成」で作成できます。
ファイル名
イベントをトリガーするファイルの名前を指定します。ワイルドカード文字がサポートされています:
ファイルプレフィックス一致:
例:
task*説明:
taskプレフィックスを持つファイル (例:task10.txt) を OSS にアップロードすると、イベントがトリガーされます。
ファイルサフィックス一致:
例:
*task.txt説明:
task.txtサフィックスを持つファイル (例:work_task.txt) を OSS にアップロードすると、イベントがトリガーされます。
フレキシブル一致:
例:
*task*説明:名前に
taskを含むファイル (例:work_task.txt) を OSS にアップロードすると、イベントがトリガーされます。
[確認] をクリックしてトリガーを作成します。
トリガーの使用
トリガーはイベントトリガーワークフローと併用する必要があります。トリガーによって実行できるのは、オペレーションセンターに公開済みのイベントトリガーワークフローのみです。
イベント駆動スケジューリングを実装するには、トリガーをイベントトリガーワークフローに関連付ける必要があります。イベントトリガーワークフローをオペレーションセンターに送信すると、モニター対象のイベントが発生したときに、ワークフロー内のタスクが自動的にトリガーされます。
トリガーベースのワークフローを作成する際に、[スケジューリングポリシー] セクションで作成したトリガーを設定します。
イベントトリガーワークフローの内部ノードは、通常のワークフローの内部ノードと同じ方法で設定されます。
主な違いは実行メカニズムです。イベントトリガーワークフローは定期的なスケジュールに依存せず、代わりに外部イベントに応答するトリガーによって駆動されます。
トリガーの管理
トリガー管理タブでは、トリガーを検索して、その参照タスクの表示、変更、バージョンの表示、またはロールバックを実行できます。
[参照タスクの表示]:トリガーがイベントトリガーワークフローによって参照されている場合、[操作] 列の [参照タスクの表示] をクリックできます。[参照タスクの表示] ページでは、トリガーを参照しているイベントトリガーワークフローを表示できます。
トリガーの変更:[操作] 列の [変更] をクリックします。[トリガーの変更] ページで、トリガー情報を編集し、[確認] をクリックします。
説明トリガーを変更すると、システムは自動的に新しいバージョンを作成します。
[バージョン] の 表示:
[操作] 列の [バージョン] をクリックします。[バージョンの表示] ページでは、トリガーのすべての履歴バージョンを表示できます。
[操作] 列の [表示] をクリックすると、特定のバージョンの詳細を確認できます。
ロールバックがサポートされています。以前のバージョンにロールバックするには、そのバージョンの横にある [ロールバック] をクリックします。表示されるダイアログボックスで、[備考] フィールドにコメントを入力し、[OK] をクリックします。
説明ロールバックを実行すると、システムは選択した履歴バージョンに基づいて新しいバージョンを自動的に作成します。
トリガーの削除:トリガーを削除する前に、それを参照しているすべてのタスクが非公開になり、削除されていることを確認する必要があります。次に、[削除] をクリックし、確認ダイアログボックスで [確認] をクリックします。
関連操作
トリガーを作成して設定した後、イベントトリガーワークフローで使用できます。詳細については、「イベントトリガーワークフロー」をご参照ください。