DataWorks オープンプラットフォームは、OpenEvent や OpenAPI などのオープンな機能を提供します。オープンプラットフォームを使用して、サードパーティのスケジューリングシステムを DataWorks と統合し、サードパーティシステムのタスクを DataWorks のワークフローに埋め込むことができます。このトピックでは、例を用いてサードパーティのスケジューリングシステムを統合するための主要な構成について説明します。
背景情報
主要なデータ処理フローが DataWorks にあるものの、別のスケジューリングシステムからのスケジューリングタスクを統合する必要がある場合、DataWorks オープンプラットフォームと HTTP トリガーノードを使用できます。次の図にそのプロセスを示します。
サードパーティのスケジューリングシステムを統合した後、全体的なタスク実行フローは次のようになります。
サードパーティのスケジューリングシステムは、DataWorks の OpenEvent 機能を使用して、依存する DataWorks ノードのステータスをサブスクライブできます。依存ノードの実行が完了すると、サードパーティのスケジューリングシステム内のタスクが開始されます。
サードパーティシステム内のタスクの実行が完了すると、DataWorks の RunTriggerNode API を呼び出して HTTP トリガーノードをトリガーできます。その後、HTTP トリガーノードは DataWorks 内の子孫ノードをトリガーして実行します。
使用される主要な DataWorks の機能と概念は次のとおりです:
以下のセクションでは、例を用いてこの統合の主要なワークフローについて説明します。
DataWorks の構成:メッセージサブスクリプション (OpenEvent) の有効化と構成
メッセージサブスクリプションの有効化と構成に関する詳細な手順については、「メッセージサブスクリプションの有効化」をご参照ください。このセクションでは、このベストプラクティスにおける中核となる構成プロセスと重要な考慮事項について説明します。
ステップ 1:カスタムバスの構成
このセクションでは、カスタムバスを構成するための主要な構成手順と注意事項について説明します。イベントメッセージサブスクリプションの有効化と構成に関する詳細については、「イベントメッセージサブスクリプションの有効化」をご参照ください。
EventBridge コンソールにログインします。左側のナビゲーションウィンドウで、[Event Buses] をクリックします。
[Event Buses] ページの右上隅にある [Quickly Create] をクリックして、カスタムバスを作成します。
左側のナビゲーションウィンドウで、[Event Buses] をクリックします。[Event Buses] ページで、作成したカスタムバスを見つけてその名前をクリックします。カスタムバスの [概要] ページが表示されます。
左側のナビゲーションウィンドウで、[Event Rules] をクリックします。表示されたページで、[Create Rule] をクリックしてイベントルールを作成します。
この例では、カスタムバスはノードコミットイベントメッセージとノードデプロイイベントメッセージを受信するように構成されています。以下に、デモの構成方法とイベントルールの主要なパラメーターの例を示します:
[Configure Basic Info]:このステップでは、[Name] パラメーターを構成する必要があります。
[Configure Event Pattern]:
イベントソースタイプ: カスタムイベントソースに設定します。
イベントソース:このパラメーターは空のままにします。
[Pattern Content]:このパラメーターを JSON 形式で構成します。次の内容を入力します。
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:InstanceStatusChanges:InstanceStatusChanges" ] }source:イベントが発生したサービスの識別子。このパラメーターを acs.dataworks に設定します。
type:サービスで発生したイベントのタイプ。このパラメーターを dataworks:InstanceStatusChanges:InstanceStatusChanges に設定します。
[Event Pattern Debugging]:このセクションで source および type パラメーターの値を変更し、[Test] をクリックできます。テストが成功したら、[Next Step] をクリックします。
[Configure Targets]:
[サービスタイプ]:HTTPS または HTTP に設定します。詳細については、「イベントルールを管理する」をご参照ください。
[URL]:カスタムバスからプッシュされたメッセージを受信するための URL を入力します。例:
https://Server address:Port number/event/consumer。本文: Complete Event に設定します。
ネットワークタイプ: インターネットに設定します。

ステップ 2:イベント配信チャネルの構成
[開発者バックエンド] タブに移動します。
DataWorks コンソールにログインします。上部のナビゲーションバーで、目的のリージョンを選択します。左側のナビゲーションウィンドウで、を選択します。表示されたページで、[オープンプラットフォームへ] をクリックします。[開発者バックエンド] タブが表示されます。
[開発者バックエンド] ページで、左側のナビゲーションペインにある [OpenEvent] をクリックします。表示されたページで、[イベント配信チャネルの追加] をクリックし、ダイアログボックスでパラメーターを構成します。
[イベント配信用のワークスペース]:作成したワークスペースを選択します。
[配信用のカスタム EventBridge バス]:ステップ 1 で作成したイベントバスを選択します。
イベント配信チャネルを保存した後、そのチャネルを見つけ、[操作] 列の [有効化] をクリックします。

サードパーティシステムの構成:サードパーティタスクをトリガーするロジックの開発
依存する DataWorks ノードをサブスクライブした後、DataWorks インスタンスのステータスに基づいてタスクをトリガーするようにサードパーティのスケジューリングシステムを構成する必要があります。システムが依存する DataWorks ノードの実行成功通知を受信すると、サードパーティシステム内のタスクが開始されます。以下のサンプルコードに構成例を示します。
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.services.LocalScheduleEventService;
import com.aliyun.dataworks.utils.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/event")
public class EventController {
@Autowired
private LocalScheduleEventService localScheduleEventService;
/**
* EventBridge からプッシュされたメッセージを受信します。
*
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam) {
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if (Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)) {
JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
// スケジュールされたタスクインスタンスが待機を開始した特定の時刻。
System.out.println("beginWaitTimeTime: " + dataParam.getString("beginWaitTimeTime"));
// DagId
System.out.println("dagId: " + dataParam.getString("dagId"));
// DAG のタイプ。有効な値:
// 0:定期的にスケジュールされたタスク
// 1:ワンタイムタスク
// 2:スモークテスト
// 3:データバックフィル
// 4:手動トリガーワークフロー
// 5:一時的なワークフロー
System.out.println("dagType: " + dataParam.getString("dagType"));
// タスクインスタンスのスケジューリングタイプ。有効な値:
// NORMAL(0):通常スケジュールされたタスク。このタスクは毎日スケジュールされます。
// MANUAL(1):ワンタイムタスク。このタスクは毎日スケジュールされません。
// PAUSE(2):凍結されたタスク。このタスクは毎日スケジュールされますが、スケジューリング開始時にステータスが直接「失敗」に設定されます。
// SKIP(3):ドライランタスク。このタスクは毎日スケジュールされますが、スケジューリング開始時にステータスが直接「成功」に設定されます。
// SKIP_UNCHOOSE(4):一時的なワークフローで選択されなかったタスク。このタイプのタスクは一時的なワークフローにのみ存在し、スケジューリング開始時にステータスが直接「成功」に設定されます。
// SKIP_CYCLE(5):週次または月次のタスクで、スケジューリングサイクルがまだ到来していないもの。このタスクは毎日スケジュールされますが、スケジューリング開始時にステータスが直接「成功」に設定されます。
// CONDITION_UNCHOOSE(6):アップストリームインスタンスに分岐 (IF) ノードがあるが、この子孫ノードが分岐ノードによって選択されなかった場合。タスクは直接ドライランタスクとして設定されます。
// REALTIME_DEPRECATED(7):リアルタイムで生成されたが期限切れになった定期インスタンス。このタイプのタスクのステータスは直接「成功」に設定されます。
System.out.println("taskType: " + dataParam.getString("taskType"));
// タスクインスタンスが変更された時刻。
System.out.println("modifyTime: " + dataParam.getString("modifyTime"));
// タスクインスタンスが作成された時刻。
System.out.println("createTime: " + dataParam.getString("createTime"));
// ワークスペースの ID。ListProjects を呼び出してワークスペース ID を表示できます。
System.out.println("appId: " + dataParam.getString("appId"));
// スケジュールされたタスクインスタンスが存在するワークスペースのテナント ID。
System.out.println("tenantId: " + dataParam.getString("tenantId"));
// スケジュールされたタスクインスタンスの操作コード。このフィールドは無視できます。
System.out.println("opCode: " + dataParam.getString("opCode"));
// ワークフローの ID。定期タスクインスタンスのワークフロー ID はデフォルトで 1 です。手動トリガーワークフローまたは内部ワークフロータスクインスタンスの場合、これは実際のワークフロー ID です。
System.out.println("flowId: " + dataParam.getString("flowId"));
// スケジュールされたタスクインスタンスに対応するノードの ID。
System.out.println("nodeId:" + dataParam.getString("nodeId"));
// スケジュールされたタスクインスタンスがリソースを待機し始めた特定の時刻。
System.out.println("beginWaitResTime: " + dataParam.getString("beginWaitResTime"));
// スケジュールされたタスクインスタンス ID。
System.out.println("taskId: " + dataParam.getString("taskId"));
// タスクのステータス。有効な値:
// 0 (未実行)
// 2 (dueTime または cycleTime で指定されたスケジュール時刻を待機中)
// 3 (リソースを待機中)
// 4 (実行中)
// 7 (データ検証のために Data Quality に送信済み)
// 8 (分岐条件をチェック中)
// 5 (失敗)
// 6 (成功)
System.out.println("status: " + dataParam.getString("status"));
// DataWorks からノード完了イベントをサブスクライブした後、ローカルスケジューリングノードをトリガーして実行します。
localScheduleEventService.triggerLocalNode(dataParam);
} else {
System.out.println("他のイベントのフィルタリングに失敗しました。構成手順を確認してください。");
}
}
}
DataWorks の構成:HTTP トリガーノードの作成
サードパーティシステム内のタスクが正常に実行された後、DataWorks の RunTriggerNode API を呼び出して HTTP トリガーノードをトリガーする必要があります。その後、HTTP トリガーノードは子孫ノードをトリガーします。したがって、必要に応じて HTTP トリガーノードを作成する必要があります。
HTTP トリガーノードとその作成方法の詳細については、「HTTP トリガーノード」をご参照ください。
サードパーティシステムの構成:HTTP トリガーノードをトリガーするロジックの開発
以下のサンプルコードは、サードパーティシステムが RunTriggerNode API を呼び出して HTTP トリガーノードをトリガーする方法を示しています。
HttpTriggerNodeServiceクラスのサンプル実装。package com.aliyun.dataworks.services; import com.aliyun.dataworks_public20200518.models.RunTriggerNodeRequest; import com.aliyun.dataworks_public20200518.models.RunTriggerNodeResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author dataworks demo */ @Service public class HttpTriggerNodeService { @Autowired private DataWorksOpenApiClient dataWorksOpenApiClient; /** * @return */ public boolean triggerNode(Long appId, Long nodeId, Long bizDate, Long cycleTime) { try { RunTriggerNodeRequest runTriggerNodeRequest = new RunTriggerNodeRequest(); // NodeId を設定します。これはトリガーノードの ID です。ListNodes API を呼び出すことでノード ID を取得できます。 runTriggerNodeRequest.setNodeId(nodeId); // CycleTime を設定します。これはトリガーノードのタスクのランタイムの UNIX タイムスタンプです。HTTP トリガーノードのスケジューリング構成でノードに指定されたランタイムをタイムスタンプに変換する必要があります。 // HTTP トリガーノードとスケジューリングシステムが異なるタイムゾーンにある場合は、トリガーノードのタイムゾーンに基づいて時刻を構成します。 // たとえば、HTTP トリガーノードが中国 (北京) リージョンにあり、その Cyctime が 18:00 (UTC+8) であるが、スケジューリングシステムが米国 (西部) リージョンにある場合、18:00 (UTC+8) に対応するタイムスタンプを構成する必要があります。 runTriggerNodeRequest.setCycleTime(cycleTime); // BizDate を設定します。これはトリガーノードインスタンスのデータタイムスタンプです。データタイムスタンプを UNIX タイムスタンプに変換する必要があります。 // データタイムスタンプはランタイムの前日を指し、日単位で正確です (時刻部分は 00:00:00)。たとえば、ランタイムが 2020 年 11 月 25 日の場合、ビジネス時刻は 2020112400000000 です。この時刻を UNIX タイムスタンプに変換します。 // HTTP トリガーノードとスケジューリングシステムが異なるタイムゾーンにある場合は、トリガーノードのタイムゾーンに基づいて時刻を構成します。 runTriggerNodeRequest.setBizDate(bizDate); // AppId (appId=projectId) を設定します。これはトリガーノードが属する DataWorks ワークスペースの ID です。GetNode を呼び出すことで、ノードに対応する projectId を見つけることができます。 runTriggerNodeRequest.setAppId(appId); RunTriggerNodeResponse runTriggerNodeResponse = dataWorksOpenApiClient.createClient().runTriggerNode(runTriggerNodeRequest); System.out.println(runTriggerNodeResponse.getBody().getRequestId()); return runTriggerNodeResponse.getBody().getSuccess(); } catch (Exception e) { e.printStackTrace(); } return false; } }サードパーティのスケジューリング状態機械のサンプル実行。
@Scheduled(cron = "0 0/30 * * * ? ") public void schedule() { //TODO:ローカルスケジューリングをシミュレートして、HTTP トリガーノードを介して DataWorks の定期ノードを呼び出します。 triggerDwScheduleNode(); }triggerDwScheduleNodeメソッドのサンプル実装/** * この例では、日次スケジューリング頻度のトリガーノードを使用して、トリガーノードを呼び出すために必要なパラメーター情報を取得する方法を示します。 */ public void triggerDwScheduleNode() { Date gmtDate = getTimeByZeroEnd(new Date()); GregorianCalendar gc = (GregorianCalendar) GregorianCalendar.getInstance(); gc.setTime(gmtDate); gc.add(Calendar.DATE, -1); Long bizDate = gc.getTimeInMillis(); GetNodeResponseBody.GetNodeResponseBodyData node = nodeService.getNode(nodeId); if (node != null) { String bizDateStr = getTimeInExpress(gc.getTime(), "yyyy-MM-dd HH:mm:ss"); ListInstancesResponseBody.ListInstancesResponseBodyData instances = nodeService.getInstance(nodeId, node.getProjectId(), bizDateStr); if (!CollectionUtils.isEmpty(instances.getInstances())) { ListInstancesResponseBody.ListInstancesResponseBodyDataInstances instance = instances.getInstances().get(0); httpTriggerNodeService.triggerNode(node.getProjectId(), node.getNodeId(), bizDate, instance.getCycTime()); } } }
ローカルでのデプロイと実行
プロジェクトのダウンロード:
前提条件:Java 8 以降および Maven ビルドツール。
プロジェクトのダウンロードリンク:schedule-integration-demo.zip。
プロジェクトをダウンロードした後、プロジェクトのルートディレクトリに移動し、次のコマンドを実行します:
mvn clean package -Dmaven.test.skip=true spring-boot:repackage実行可能な JAR パッケージを取得した後、次のコマンドを実行します:
java -jar target/schedule-integration-demo-1.0.jarコマンドが実行された後、ブラウザでhttp://localhost:8080/indexと入力します。"hello world!"が返された場合、アプリケーションは正常にデプロイされています。