DataWorks menyediakan pemeriksaan bawaan seperti tinjauan kode sebelum penyebaran node dan pemeriksaan di Pusat Tata Kelola Data. DataWorks juga mendukung pemeriksaan kustom. Anda dapat mengembangkan program pemeriksaan kustom sesuai kebutuhan bisnis dan menghubungkannya ke DataWorks untuk mengelola proses node. Topik ini menjelaskan cara berlangganan perubahan status acara instance node yang dipicu otomatis menggunakan modul OpenEvent dari DataWorks Open Platform. Dalam topik ini, perubahan status instance node yang dipicu otomatis diperoleh dari Operation Center.
Informasi latar belakang
Untuk informasi lebih lanjut tentang fitur dan konsep dasar DataWorks Open Platform yang terlibat dalam topik ini, lihat Ikhtisar.
Deskripsi konfigurasi langganan
Saat mengonfigurasi parameter Konten Pola untuk aturan acara di EventBridge, atur tipe ke dataworks:InstanceStatusChanges:InstanceStatusChanges. Dengan cara ini, Anda dapat berlangganan perubahan status acara instance node yang dipicu otomatis.
Prasyarat
EventBridge telah diaktifkan. Untuk informasi lebih lanjut, lihat Penagihan.
DataWorks telah diaktifkan. Untuk informasi lebih lanjut, lihat Panduan Pembelian.
Ruang kerja dibuat di DataWorks. Untuk informasi lebih lanjut, lihat Buat Ruang Kerja.
Prosedur
Langkah 1: Konfigurasikan bus kustom
Berikut adalah langkah-langkah inti dan perhatian saat mengonfigurasi bus kustom. Untuk informasi lebih lanjut tentang cara mengaktifkan dan mengonfigurasi langganan pesan acara, lihat Aktifkan Langganan Pesan Acara.
Masuk ke konsol EventBridge. Di bilah navigasi di sebelah kiri, klik Event Buses.
Di pojok kanan atas halaman Bus Acara, klik Quickly Create untuk membuat bus kustom.
Di bilah navigasi di sebelah kiri, klik Event Buses. Di halaman Bus Acara, temukan bus kustom yang Anda buat dan klik namanya. Halaman Ikhtisar bus kustom akan muncul.
Di bilah navigasi di sebelah kiri, klik Aturan Acara. Di halaman yang muncul, klik Buat Aturan untuk membuat aturan acara.
Dalam contoh ini, bus kustom dikonfigurasi untuk menerima pesan acara komitmen node dan pesan acara penyebaran node. Berikut adalah contoh cara mengonfigurasi demo dan parameter inti untuk aturan acara:
Configure Basic Info: Dalam langkah ini, Anda harus mengonfigurasi parameter Nama.
Configure Event Pattern:
Event Source Type: Atur ke Sumber Acara Kustom.
Event Source: Biarkan parameter ini kosong.
Pattern Content: Konfigurasikan parameter ini dalam format JSON. Masukkan konten berikut.
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:InstanceStatusChanges:InstanceStatusChanges" ] }source: pengenal layanan tempat acara terjadi. Atur parameter ini ke acs.dataworks.
type: jenis acara yang terjadi di layanan. Atur parameter ini ke dataworks:InstanceStatusChanges:InstanceStatusChanges.
Event Pattern Debugging: Anda dapat mengubah nilai parameter source dan type di bagian ini lalu klik Uji. Jika pengujian berhasil, klik Langkah Berikutnya.

Configure Targets:
Service Type: Atur ke HTTPS atau HTTP. Untuk informasi lebih lanjut tentang tipe layanan, lihat Kelola Aturan Acara.
URL: Masukkan URL untuk menerima pesan yang didorong oleh bus kustom, seperti
https://Alamat server:Nomor port/event/konsumen.Body: Atur ke Acara Lengkap.
Network Type: Atur ke Internet.

Langkah 2: Konfigurasikan saluran distribusi acara
Pergi ke tab Backend Pengembang.
Masuk ke konsol DataWorks. Di bilah navigasi atas, pilih wilayah yang diinginkan. Di bilah navigasi di sebelah kiri, pilih . Di halaman yang muncul, klik Go to Open Platform. Tab Developer Backend akan muncul.
Di bilah navigasi di sebelah kiri halaman Backend Pengembang, klik OpenEvent. Di halaman yang muncul, klik Add Event Distribution Channel. Di kotak dialog Tambah Saluran Distribusi Acara, konfigurasikan parameter.
Workspace for Distribution of Event Messages: Pilih workspace yang diinginkan.
Specify Custom Event Bus in EventBridge for Distribution of Event Messages: Pilih bus acara yang Anda buat di Langkah 1.
Setelah Anda menyimpan saluran distribusi acara, temukan saluran distribusi acara di halaman OpenEvent dan klik Enable di kolom Actions.

Kembangkan program layanan
Kode contoh
Ubah kode untuk mendapatkan pesan yang didorong EventBridge ke layanan dan menghasilkan pesan.
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/event")
public class ExtensionsController {
/**
* Terima pesan acara yang dikirim dari EventBridge.
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam){
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
// Waktu ketika instance node yang dipicu otomatis mulai menunggu waktu penjadwalan.
System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
//DagId
System.out.println("dagId: "+ dataParam.getString("dagId"));
// Jenis grafik asiklik terarah (DAG). Nilai valid:
// 0: untuk node yang dipicu otomatis
// 1: untuk node yang dipicu manual
// 2: untuk pengujian asap
// 3: untuk node yang Anda isi ulang datanya
// 4: untuk alur kerja yang dipicu manual
// 5: untuk alur kerja sementara
System.out.println("dagType: "+dataParam.getString("dagType"));
// Jenis node. Nilai valid:
// NORMAL(0): Node adalah node yang dipicu otomatis. Sistem penjadwalan secara teratur menjalankan node.
// MANUAL(1): Node adalah node yang dipicu manual. Sistem penjadwalan tidak secara teratur menjalankan node.
// PAUSE(2): Node adalah node yang dibekukan. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Gagal ketika sistem penjadwalan mulai menjalankan node.
// SKIP(3): Node adalah node uji coba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// SKIP_UNCHOOSE(4): Node adalah node yang tidak dipilih dalam alur kerja sementara. Jenis node ini hanya ada di alur kerja sementara. Sistem penjadwalan menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// SKIP_CYCLE(5): Node dijadwalkan mingguan atau bulanan, dan sedang menunggu waktu penjadwalan tiba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// CONDITION_UNCHOOSE(6): Node tidak dipilih oleh node cabang leluhurnya dan dijalankan sebagai node uji coba.
// REALTIME_DEPRECATED(7): Node memiliki instance yang dihasilkan secara real-time tetapi sudah usang. Sistem penjadwalan menetapkan status node ke Berhasil.
System.out.println("taskType: "+dataParam.getString("taskType"));
// Waktu ketika instance node dimodifikasi.
System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
// Waktu ketika instance node dibuat.
System.out.println("createTime: "+dataParam.getString("createTime"));
// ID ruang kerja. Anda dapat memanggil operasi ListProjects untuk menanyakan ID.
System.out.println("appId: "+dataParam.getString("appId"));
// ID penyewa yang mengelola ruang kerja tempat instance node yang dipicu otomatis milik.
System.out.println("tenantId: "+dataParam.getString("tenantId"));
// Kode operasi instance node yang dipicu otomatis. Anda dapat mengabaikan nilai bidang ini.
System.out.println("opCode: "+dataParam.getString("opCode"));
// ID alur kerja. Untuk instance node yang dipicu otomatis, nilai bidang ini adalah 1. Untuk alur kerja yang dipicu manual atau instance node yang dipicu otomatis dari tipe alur kerja internal, nilai bidang ini adalah ID alur kerja aktual.
System.out.println("flowId: "+dataParam.getString("flowId"));
// ID node tempat instance node yang dipicu otomatis dihasilkan.
System.out.println("nodeId:"+dataParam.getString("nodeId"));
// Waktu ketika instance node yang dipicu otomatis mulai menunggu sumber daya.
System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
// ID instance node yang dipicu otomatis.
System.out.println("taskId: "+dataParam.getString("taskId"));
// Status node. Nilai valid:
// 0: Node tidak berjalan.
// 2: Node sedang menunggu waktu penjadwalan tiba. Waktu penjadwalan ditentukan oleh parameter dueTime atau cycleTime.
// 3: Node sedang menunggu sumber daya.
// 4: Node sedang berjalan.
// 7: Tabel yang ditentukan dalam node diterbitkan ke Data Quality dan data dalam tabel diperiksa berdasarkan aturan pemantauan di Data Quality.
// 8: Kondisi cabang sedang diperiksa.
// 5: Node gagal dijalankan.
// 6: Node berhasil dijalankan.
System.out.println("status: "+dataParam.getString("status"));
}else{
System.out.println("Gagal menyaring jenis acara lainnya. Periksa konfigurasi parameter.");
}
}
}
Contoh penyebaran proyek
Siapkan lingkungan dan proyek.
Persyaratan lingkungan: Java 8 atau lebih baru dan Maven. Maven adalah alat otomatisasi pembuatan untuk Java.
Tautan unduhan untuk file proyek: event-demo-instance-status.zip.
Setelah Anda mengunduh file proyek, masuk ke direktori root proyek dan jalankan perintah berikut:
mvn clean package -Dmaven.test.skip=true spring-boot:repackageSetelah Anda mendapatkan paket JAR yang dapat langsung diinstal, jalankan perintah berikut:
java -jar target/event-demo-instance-status-1.0.jarGambar berikut menunjukkan proyek yang berhasil dimulai.

Masukkan
http://localhost:8080/indexdi bilah alamat browser dan tekan Enter. Jika"hello world!"dikembalikan, ekstensi berhasil diterapkan. Anda dapat berlangganan pesan acara setelah koneksi jaringan dibuat antara DataWorks dan ekstensi Anda serta antara EventBridge dan ekstensi Anda.
Aktifkan dan konfigurasikan langganan pesan acara (OpenEvent)
Aktifkan Langganan Pesan Acara
Di konsol EventBridge, buat bus kustom. Anda tidak perlu mengonfigurasi parameter di langkah Sumber Acara, Aturan Acara, dan Target Acara.

Di konsol EventBridge, buat aturan acara untuk bus kustom.
Dalam contoh ini, bus kustom dikonfigurasi untuk menerima pesan untuk perubahan status acara instance node yang dipicu otomatis. Berikut adalah contoh cara mengonfigurasi demo dan parameter inti untuk aturan acara:
Konfigurasikan parameter di langkah Configure Event Pattern.

{ "source": [ "acs.dataworks" ], "type": [ "dataworks:InstanceStatusChanges:InstanceStatusChanges" ] }source: pengenal layanan tempat acara terjadi. Atur parameter ini ke acs.dataworks.
type: jenis acara yang terjadi di layanan. Atur parameter ini ke dataworks:InstanceStatusChanges:InstanceStatusChanges. Anda dapat mengubah nilai parameter source dan type di bagian Event Pattern Debugging lalu klik Uji. Jika pengujian berhasil, klik Next Step.

Di langkah Configure Targets, atur Service Type ke HTTPS dan masukkan URL yang valid. Gunakan pengaturan default untuk parameter lainnya.

Pergi ke halaman Platform Terbuka di konsol DataWorks. Di pohon navigasi di sebelah kiri, klik OpenEvent. Di halaman yang muncul, tambahkan saluran distribusi acara.

Tulis kode
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/event")
public class ExtensionsController {
/**
* Terima pesan acara yang dikirim dari EventBridge.
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam){
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
// Waktu ketika instance node yang dipicu otomatis mulai menunggu waktu penjadwalan.
System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
//DagId
System.out.println("dagId: "+ dataParam.getString("dagId"));
// Jenis grafik asiklik terarah (DAG). Nilai valid:
// 0: untuk node yang dipicu otomatis
// 1: untuk node yang dipicu manual
// 2: untuk pengujian asap
// 3: untuk node yang Anda isi ulang datanya
// 4: untuk alur kerja yang dipicu manual
// 5: untuk alur kerja sementara
System.out.println("dagType: "+dataParam.getString("dagType"));
// Jenis node. Nilai valid:
// NORMAL(0): Node adalah node yang dipicu otomatis. Sistem penjadwalan secara teratur menjalankan node.
// MANUAL(1): Node adalah node yang dipicu manual. Sistem penjadwalan tidak secara teratur menjalankan node.
// PAUSE(2): Node adalah node yang dibekukan. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Gagal ketika sistem penjadwalan mulai menjalankan node.
// SKIP(3): Node adalah node uji coba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// SKIP_UNCHOOSE(4): Node adalah node yang tidak dipilih dalam alur kerja sementara. Jenis node ini hanya ada di alur kerja sementara. Sistem penjadwalan menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// SKIP_CYCLE(5): Node dijadwalkan mingguan atau bulanan, dan sedang menunggu waktu penjadwalan tiba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
// CONDITION_UNCHOOSE(6): Node tidak dipilih oleh node cabang leluhurnya dan dijalankan sebagai node uji coba.
// REALTIME_DEPRECATED(7): Node memiliki instance yang dihasilkan secara real-time tetapi sudah usang. Sistem penjadwalan menetapkan status node ke Berhasil.
System.out.println("taskType: "+dataParam.getString("taskType"));
// Waktu ketika instance node dimodifikasi.
System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
// Waktu ketika instance node dibuat.
System.out.println("createTime: "+dataParam.getString("createTime"));
// ID ruang kerja. Anda dapat memanggil operasi ListProjects untuk menanyakan ID.
System.out.println("appId: "+dataParam.getString("appId"));
// ID penyewa yang mengelola ruang kerja tempat instance node yang dipicu otomatis milik.
System.out.println("tenantId: "+dataParam.getString("tenantId"));
// Kode operasi instance node yang dipicu otomatis. Anda dapat mengabaikan nilai bidang ini.
System.out.println("opCode: "+dataParam.getString("opCode"));
// ID alur kerja. Untuk instance node yang dipicu otomatis, nilai bidang ini adalah 1. Untuk alur kerja yang dipicu manual atau instance node yang dipicu otomatis dari tipe alur kerja internal, nilai bidang ini adalah ID alur kerja aktual.
System.out.println("flowId: "+dataParam.getString("flowId"));
// ID node tempat instance node yang dipicu otomatis dihasilkan.
System.out.println("nodeId:"+dataParam.getString("nodeId"));
// Waktu ketika instance node yang dipicu otomatis mulai menunggu sumber daya.
System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
// ID instance node yang dipicu otomatis.
System.out.println("taskId: "+dataParam.getString("taskId"));
// Status node. Nilai valid:
// 0: Node tidak berjalan.
// 2: Node sedang menunggu waktu penjadwalan tiba. Waktu penjadwalan ditentukan oleh parameter dueTime atau cycleTime.
// 3: Node sedang menunggu sumber daya.
// 4: Node sedang berjalan.
// 7: Tabel yang ditentukan dalam node diterbitkan ke Data Quality dan data dalam tabel diperiksa berdasarkan aturan pemantauan di Data Quality.
// 8: Kondisi cabang sedang diperiksa.
// 5: Node gagal dijalankan.
// 6: Node berhasil dijalankan.
System.out.println("status: "+dataParam.getString("status"));
}else{
System.out.println("Gagal menyaring jenis acara lainnya. Periksa konfigurasi parameter.");
}
}
}
Sebarkan dan jalankan kode di mesin lokal Anda
Unduh file proyek demo:
Persyaratan lingkungan: Java 8 atau lebih baru dan Maven. Maven adalah alat otomatisasi pembuatan untuk Java.
Tautan unduhan untuk file proyek: event-demo-instance-status.zip.
Pilih mode penyebaran.
Penyebaran Lokal: Setelah Anda mengemas file proyek menjadi paket JAR, jalankan perintah
java -jar yourapp.jardi server lokal tempat Java 8 dan Maven diterapkan untuk memulai program layanan.Penyebaran Berbasis Cloud: Setelah Anda mengemas file proyek menjadi paket JAR, unggah paket tersebut ke lingkungan runtime terkait, seperti kontainer Docker dan server cloud, untuk penyebaran.
CatatanSetelah Anda menerapkan program layanan, pastikan EventBridge dapat mengakses layanan melalui Internet.
Setelah Anda mengunduh file proyek, masuk ke direktori root proyek dan jalankan perintah berikut:
mvn clean package -Dmaven.test.skip=true spring-boot:repackageSetelah Anda mendapatkan paket JAR yang dapat langsung diinstal, jalankan perintah berikut:
java -jar target/event-demo-instance-status-1.0.jarGambar berikut menunjukkan proyek yang berhasil dimulai.
Masukkan http://localhost:8080/indexdi bilah alamat browser dan tekan Enter. Jika"hello world!"dikembalikan, ekstensi berhasil diterapkan. Anda dapat berlangganan pesan acara setelah koneksi jaringan dibuat antara DataWorks dan ekstensi Anda serta antara EventBridge dan ekstensi Anda.