全部产品
Search
文档中心

:Praktik terbaik untuk berlangganan perubahan status acara instance node yang dipicu otomatis

更新时间:Jul 06, 2025

DataWorks menyediakan pemeriksaan bawaan seperti tinjauan kode sebelum penyebaran node dan pemeriksaan di Pusat Tata Kelola Data. DataWorks juga mendukung pemeriksaan kustom. Anda dapat mengembangkan program pemeriksaan kustom sesuai kebutuhan bisnis dan menghubungkannya ke DataWorks untuk mengelola proses node. Topik ini menjelaskan cara berlangganan perubahan status acara instance node yang dipicu otomatis menggunakan modul OpenEvent dari DataWorks Open Platform. Dalam topik ini, perubahan status instance node yang dipicu otomatis diperoleh dari Operation Center.

Informasi latar belakang

Untuk informasi lebih lanjut tentang fitur dan konsep dasar DataWorks Open Platform yang terlibat dalam topik ini, lihat Ikhtisar.

Deskripsi konfigurasi langganan

Saat mengonfigurasi parameter Konten Pola untuk aturan acara di EventBridge, atur tipe ke dataworks:InstanceStatusChanges:InstanceStatusChanges. Dengan cara ini, Anda dapat berlangganan perubahan status acara instance node yang dipicu otomatis.

Prasyarat

  • EventBridge telah diaktifkan. Untuk informasi lebih lanjut, lihat Penagihan.

  • DataWorks telah diaktifkan. Untuk informasi lebih lanjut, lihat Panduan Pembelian.

  • Ruang kerja dibuat di DataWorks. Untuk informasi lebih lanjut, lihat Buat Ruang Kerja.

Prosedur

Langkah 1: Konfigurasikan bus kustom

Berikut adalah langkah-langkah inti dan perhatian saat mengonfigurasi bus kustom. Untuk informasi lebih lanjut tentang cara mengaktifkan dan mengonfigurasi langganan pesan acara, lihat Aktifkan Langganan Pesan Acara.

  1. Masuk ke konsol EventBridge. Di bilah navigasi di sebelah kiri, klik Event Buses.

  2. Di pojok kanan atas halaman Bus Acara, klik Quickly Create untuk membuat bus kustom.

  3. Di bilah navigasi di sebelah kiri, klik Event Buses. Di halaman Bus Acara, temukan bus kustom yang Anda buat dan klik namanya. Halaman Ikhtisar bus kustom akan muncul.

    • Di bilah navigasi di sebelah kiri, klik Aturan Acara. Di halaman yang muncul, klik Buat Aturan untuk membuat aturan acara.

    • Dalam contoh ini, bus kustom dikonfigurasi untuk menerima pesan acara komitmen node dan pesan acara penyebaran node. Berikut adalah contoh cara mengonfigurasi demo dan parameter inti untuk aturan acara:

      1. Configure Basic Info: Dalam langkah ini, Anda harus mengonfigurasi parameter Nama.

      2. Configure Event Pattern:

        • Event Source Type: Atur ke Sumber Acara Kustom.

        • Event Source: Biarkan parameter ini kosong.

        • Pattern Content: Konfigurasikan parameter ini dalam format JSON. Masukkan konten berikut.

          {
              "source": [
                  "acs.dataworks"
              ],
              "type": [
                  "dataworks:InstanceStatusChanges:InstanceStatusChanges"
              ]
          }
          • source: pengenal layanan tempat acara terjadi. Atur parameter ini ke acs.dataworks.

          • type: jenis acara yang terjadi di layanan. Atur parameter ini ke dataworks:InstanceStatusChanges:InstanceStatusChanges.

        • Event Pattern Debugging: Anda dapat mengubah nilai parameter source dan type di bagian ini lalu klik Uji. Jika pengujian berhasil, klik Langkah Berikutnya.测试3

      3. Configure Targets:

        • Service Type: Atur ke HTTPS atau HTTP. Untuk informasi lebih lanjut tentang tipe layanan, lihat Kelola Aturan Acara.

        • URL: Masukkan URL untuk menerima pesan yang didorong oleh bus kustom, seperti https://Alamat server:Nomor port/event/konsumen.

        • Body: Atur ke Acara Lengkap.

        • Network Type: Atur ke Internet.

        image

Langkah 2: Konfigurasikan saluran distribusi acara

  1. Pergi ke tab Backend Pengembang.

    Masuk ke konsol DataWorks. Di bilah navigasi atas, pilih wilayah yang diinginkan. Di bilah navigasi di sebelah kiri, pilih More > Open Platform. Di halaman yang muncul, klik Go to Open Platform. Tab Developer Backend akan muncul.

  2. Di bilah navigasi di sebelah kiri halaman Backend Pengembang, klik OpenEvent. Di halaman yang muncul, klik Add Event Distribution Channel. Di kotak dialog Tambah Saluran Distribusi Acara, konfigurasikan parameter.

    • Workspace for Distribution of Event Messages: Pilih workspace yang diinginkan.

    • Specify Custom Event Bus in EventBridge for Distribution of Event Messages: Pilih bus acara yang Anda buat di Langkah 1.

  3. Setelah Anda menyimpan saluran distribusi acara, temukan saluran distribusi acara di halaman OpenEvent dan klik Enable di kolom Actions.image

Kembangkan program layanan

Kode contoh

Ubah kode untuk mendapatkan pesan yang didorong EventBridge ke layanan dan menghasilkan pesan.

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * Terima pesan acara yang dikirim dari EventBridge.
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            // Waktu ketika instance node yang dipicu otomatis mulai menunggu waktu penjadwalan.
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            // Jenis grafik asiklik terarah (DAG). Nilai valid:
            // 0: untuk node yang dipicu otomatis
            // 1: untuk node yang dipicu manual
            // 2: untuk pengujian asap
            // 3: untuk node yang Anda isi ulang datanya
            // 4: untuk alur kerja yang dipicu manual
            // 5: untuk alur kerja sementara
            System.out.println("dagType: "+dataParam.getString("dagType"));
            // Jenis node. Nilai valid:
            // NORMAL(0): Node adalah node yang dipicu otomatis. Sistem penjadwalan secara teratur menjalankan node.
            // MANUAL(1): Node adalah node yang dipicu manual. Sistem penjadwalan tidak secara teratur menjalankan node.
            // PAUSE(2): Node adalah node yang dibekukan. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Gagal ketika sistem penjadwalan mulai menjalankan node.
            // SKIP(3): Node adalah node uji coba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // SKIP_UNCHOOSE(4): Node adalah node yang tidak dipilih dalam alur kerja sementara. Jenis node ini hanya ada di alur kerja sementara. Sistem penjadwalan menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // SKIP_CYCLE(5): Node dijadwalkan mingguan atau bulanan, dan sedang menunggu waktu penjadwalan tiba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // CONDITION_UNCHOOSE(6): Node tidak dipilih oleh node cabang leluhurnya dan dijalankan sebagai node uji coba.
            // REALTIME_DEPRECATED(7): Node memiliki instance yang dihasilkan secara real-time tetapi sudah usang. Sistem penjadwalan menetapkan status node ke Berhasil.
            System.out.println("taskType: "+dataParam.getString("taskType"));
            // Waktu ketika instance node dimodifikasi.
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            // Waktu ketika instance node dibuat.
            System.out.println("createTime: "+dataParam.getString("createTime"));
            // ID ruang kerja. Anda dapat memanggil operasi ListProjects untuk menanyakan ID.
            System.out.println("appId: "+dataParam.getString("appId"));
            // ID penyewa yang mengelola ruang kerja tempat instance node yang dipicu otomatis milik.
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            // Kode operasi instance node yang dipicu otomatis. Anda dapat mengabaikan nilai bidang ini.
            System.out.println("opCode: "+dataParam.getString("opCode"));
            // ID alur kerja. Untuk instance node yang dipicu otomatis, nilai bidang ini adalah 1. Untuk alur kerja yang dipicu manual atau instance node yang dipicu otomatis dari tipe alur kerja internal, nilai bidang ini adalah ID alur kerja aktual.
            System.out.println("flowId: "+dataParam.getString("flowId"));
            // ID node tempat instance node yang dipicu otomatis dihasilkan.
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            // Waktu ketika instance node yang dipicu otomatis mulai menunggu sumber daya.
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            // ID instance node yang dipicu otomatis.
            System.out.println("taskId: "+dataParam.getString("taskId"));
            // Status node. Nilai valid:
            // 0: Node tidak berjalan.
            // 2: Node sedang menunggu waktu penjadwalan tiba. Waktu penjadwalan ditentukan oleh parameter dueTime atau cycleTime.
            // 3: Node sedang menunggu sumber daya.
            // 4: Node sedang berjalan.
            // 7: Tabel yang ditentukan dalam node diterbitkan ke Data Quality dan data dalam tabel diperiksa berdasarkan aturan pemantauan di Data Quality.
            // 8: Kondisi cabang sedang diperiksa.
            // 5: Node gagal dijalankan.
            // 6: Node berhasil dijalankan.
            System.out.println("status: "+dataParam.getString("status"));
        }else{
            System.out.println("Gagal menyaring jenis acara lainnya. Periksa konfigurasi parameter.");
        }
    }
}
            

Contoh penyebaran proyek

  1. Siapkan lingkungan dan proyek.

    1. Persyaratan lingkungan: Java 8 atau lebih baru dan Maven. Maven adalah alat otomatisasi pembuatan untuk Java.

    2. Tautan unduhan untuk file proyek: event-demo-instance-status.zip.

  2. Setelah Anda mengunduh file proyek, masuk ke direktori root proyek dan jalankan perintah berikut:

    mvn clean package -Dmaven.test.skip=true spring-boot:repackage
  3. Setelah Anda mendapatkan paket JAR yang dapat langsung diinstal, jalankan perintah berikut:

    java -jar target/event-demo-instance-status-1.0.jar

    Gambar berikut menunjukkan proyek yang berhasil dimulai.部署成功

  4. Masukkan http://localhost:8080/index di bilah alamat browser dan tekan Enter. Jika "hello world!" dikembalikan, ekstensi berhasil diterapkan. Anda dapat berlangganan pesan acara setelah koneksi jaringan dibuat antara DataWorks dan ekstensi Anda serta antara EventBridge dan ekstensi Anda.

Aktifkan dan konfigurasikan langganan pesan acara (OpenEvent)

Aktifkan Langganan Pesan Acara

  1. Di konsol EventBridge, buat bus kustom. Anda tidak perlu mengonfigurasi parameter di langkah Sumber Acara, Aturan Acara, dan Target Acara. Create Custom Event Bus panel

  2. Di konsol EventBridge, buat aturan acara untuk bus kustom.

    Dalam contoh ini, bus kustom dikonfigurasi untuk menerima pesan untuk perubahan status acara instance node yang dipicu otomatis. Berikut adalah contoh cara mengonfigurasi demo dan parameter inti untuk aturan acara:

    1. Konfigurasikan parameter di langkah Configure Event Pattern.事件规则3

      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:InstanceStatusChanges:InstanceStatusChanges"
          ]
      }
      • source: pengenal layanan tempat acara terjadi. Atur parameter ini ke acs.dataworks.

      • type: jenis acara yang terjadi di layanan. Atur parameter ini ke dataworks:InstanceStatusChanges:InstanceStatusChanges. Anda dapat mengubah nilai parameter source dan type di bagian Event Pattern Debugging lalu klik Uji. Jika pengujian berhasil, klik Next Step.测试3

    2. Di langkah Configure Targets, atur Service Type ke HTTPS dan masukkan URL yang valid. Gunakan pengaturan default untuk parameter lainnya. Configure Targets step

  3. Pergi ke halaman Platform Terbuka di konsol DataWorks. Di pohon navigasi di sebelah kiri, klik OpenEvent. Di halaman yang muncul, tambahkan saluran distribusi acara. Add an event distribution channel

Tulis kode

package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * Terima pesan acara yang dikirim dari EventBridge.
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            // Waktu ketika instance node yang dipicu otomatis mulai menunggu waktu penjadwalan.
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            // Jenis grafik asiklik terarah (DAG). Nilai valid:
            // 0: untuk node yang dipicu otomatis
            // 1: untuk node yang dipicu manual
            // 2: untuk pengujian asap
            // 3: untuk node yang Anda isi ulang datanya
            // 4: untuk alur kerja yang dipicu manual
            // 5: untuk alur kerja sementara
            System.out.println("dagType: "+dataParam.getString("dagType"));
            // Jenis node. Nilai valid:
            // NORMAL(0): Node adalah node yang dipicu otomatis. Sistem penjadwalan secara teratur menjalankan node.
            // MANUAL(1): Node adalah node yang dipicu manual. Sistem penjadwalan tidak secara teratur menjalankan node.
            // PAUSE(2): Node adalah node yang dibekukan. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Gagal ketika sistem penjadwalan mulai menjalankan node.
            // SKIP(3): Node adalah node uji coba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // SKIP_UNCHOOSE(4): Node adalah node yang tidak dipilih dalam alur kerja sementara. Jenis node ini hanya ada di alur kerja sementara. Sistem penjadwalan menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // SKIP_CYCLE(5): Node dijadwalkan mingguan atau bulanan, dan sedang menunggu waktu penjadwalan tiba. Sistem penjadwalan secara teratur menjalankan node tetapi menetapkan status node ke Berhasil ketika sistem penjadwalan mulai menjalankan node.
            // CONDITION_UNCHOOSE(6): Node tidak dipilih oleh node cabang leluhurnya dan dijalankan sebagai node uji coba.
            // REALTIME_DEPRECATED(7): Node memiliki instance yang dihasilkan secara real-time tetapi sudah usang. Sistem penjadwalan menetapkan status node ke Berhasil.
            System.out.println("taskType: "+dataParam.getString("taskType"));
            // Waktu ketika instance node dimodifikasi.
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            // Waktu ketika instance node dibuat.
            System.out.println("createTime: "+dataParam.getString("createTime"));
            // ID ruang kerja. Anda dapat memanggil operasi ListProjects untuk menanyakan ID.
            System.out.println("appId: "+dataParam.getString("appId"));
            // ID penyewa yang mengelola ruang kerja tempat instance node yang dipicu otomatis milik.
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            // Kode operasi instance node yang dipicu otomatis. Anda dapat mengabaikan nilai bidang ini.
            System.out.println("opCode: "+dataParam.getString("opCode"));
            // ID alur kerja. Untuk instance node yang dipicu otomatis, nilai bidang ini adalah 1. Untuk alur kerja yang dipicu manual atau instance node yang dipicu otomatis dari tipe alur kerja internal, nilai bidang ini adalah ID alur kerja aktual.
            System.out.println("flowId: "+dataParam.getString("flowId"));
            // ID node tempat instance node yang dipicu otomatis dihasilkan.
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            // Waktu ketika instance node yang dipicu otomatis mulai menunggu sumber daya.
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            // ID instance node yang dipicu otomatis.
            System.out.println("taskId: "+dataParam.getString("taskId"));
            // Status node. Nilai valid:
            // 0: Node tidak berjalan.
            // 2: Node sedang menunggu waktu penjadwalan tiba. Waktu penjadwalan ditentukan oleh parameter dueTime atau cycleTime.
            // 3: Node sedang menunggu sumber daya.
            // 4: Node sedang berjalan.
            // 7: Tabel yang ditentukan dalam node diterbitkan ke Data Quality dan data dalam tabel diperiksa berdasarkan aturan pemantauan di Data Quality.
            // 8: Kondisi cabang sedang diperiksa.
            // 5: Node gagal dijalankan.
            // 6: Node berhasil dijalankan.
            System.out.println("status: "+dataParam.getString("status"));
        }else{
            System.out.println("Gagal menyaring jenis acara lainnya. Periksa konfigurasi parameter.");
        }
    }
}
            

Sebarkan dan jalankan kode di mesin lokal Anda

  1. Unduh file proyek demo:

    • Persyaratan lingkungan: Java 8 atau lebih baru dan Maven. Maven adalah alat otomatisasi pembuatan untuk Java.

    • Tautan unduhan untuk file proyek: event-demo-instance-status.zip.

  2. Pilih mode penyebaran.

    • Penyebaran Lokal: Setelah Anda mengemas file proyek menjadi paket JAR, jalankan perintah java -jar yourapp.jar di server lokal tempat Java 8 dan Maven diterapkan untuk memulai program layanan.

    • Penyebaran Berbasis Cloud: Setelah Anda mengemas file proyek menjadi paket JAR, unggah paket tersebut ke lingkungan runtime terkait, seperti kontainer Docker dan server cloud, untuk penyebaran.

    Catatan

    Setelah Anda menerapkan program layanan, pastikan EventBridge dapat mengakses layanan melalui Internet.

  3. Setelah Anda mengunduh file proyek, masuk ke direktori root proyek dan jalankan perintah berikut:

    mvn clean package -Dmaven.test.skip=true spring-boot:repackage

    Setelah Anda mendapatkan paket JAR yang dapat langsung diinstal, jalankan perintah berikut:

    java -jar target/event-demo-instance-status-1.0.jar

    Gambar berikut menunjukkan proyek yang berhasil dimulai.部署成功Masukkan http://localhost:8080/index di bilah alamat browser dan tekan Enter. Jika "hello world!" dikembalikan, ekstensi berhasil diterapkan. Anda dapat berlangganan pesan acara setelah koneksi jaringan dibuat antara DataWorks dan ekstensi Anda serta antara EventBridge dan ekstensi Anda.