All Products
Search
Document Center

DataWorks:Alur Kerja yang Dipicu

Last Updated:Mar 18, 2026

Berbeda dengan Recurring Workflow, yang dijalankan sesuai jadwal tetap (misalnya pukul 01.00 setiap hari), Triggered Workflow merupakan model pemrosesan data berbasis permintaan yang dipicu oleh event. Eksekusinya dipicu secara real-time oleh sinyal eksternal—seperti unggahan file, kedatangan pesan, panggilan API, atau klik manual—sehingga memberikan performa real-time dan fleksibilitas tinggi dalam pemrosesan data.

Fitur

Recurring workflow

Triggered workflow

Mekanisme pemicu

Jadwal tetap (ekspresi Crontab)

Sinyal eksternal (event, API, manual)

Model eksekusi

Terkjadwal dan dapat diprediksi

Responsif dan berdasarkan permintaan

Kasus penggunaan

Pembuatan gudang data batch T+1, laporan terjadwal

Pemrosesan file saat tiba, integrasi dengan sistem bisnis, patching data manual

Keunggulan utama

Keandalan dan jaminan eksekusi periodik

Responsivitas real-time dan fleksibilitas

Metode pemicu yang didukung

Triggered Workflow mendukung tiga metode pemicu. Anda dapat memilih metode yang paling sesuai dengan skenario bisnis Anda.

Metode pemicu

Pihak yang memulai

Skenario inti

Poin penting

versi

Sumber event eksternal (seperti OSS atau Kafka)

ETL berbasis event: Proses file saat tiba atau picu komputasi real-time berdasarkan pesan.

Anda harus terlebih dahulu membuat Trigger dan mengaitkannya dengan alur kerja. Hanya berlaku di Lingkungan Produksi.

versi

User (developer atau Engineer O&M)

Tugas ad-hoc: Pemrosesan atau analisis data satu kali.

Anda dapat menjalankannya secara manual di Lingkungan Pengembangan maupun Produksi. Direkomendasikan sebagai pengganti One-time Tasks.

API Trigger

Sistem eksternal (melalui OpenAPI)

Integrasi sistem: Picu pemrosesan data melalui callback dari sistem bisnis seperti CRM atau ERP.

Memerlukan pemanggilan OpenAPI dengan izin yang sesuai.

Panduan cepat: Membuat alur kerja yang dipicu secara manual

Bagian ini memandu Anda membuat dan menjalankan Triggered Workflow sederhana secara manual untuk menunjukkan prosesnya.

Langkah 1: Buat alur kerja yang dipicu

  1. Buka halaman Workspaces di konsol DataWorks. Di bilah navigasi atas, pilih wilayah yang diinginkan. Temukan ruang kerja yang dituju, lalu pilih Shortcuts > Data Studio pada kolom Actions.

  2. Di bilah navigasi kiri, klik image, lalu di sebelah kanan Project Folder, klik image > New Workflow untuk membuka halaman New Workflow.

  3. Pada kotak dialog yang muncul, di halaman Create Workflow, atur Scheduling Type ke Trigger-based Scheduling. Masukkan Name alur kerja, lalu klik Confirm.

Langkah 2: Orkestrasi dan kembangkan node

  1. Klik + Add Node di bilah alat untuk membuka daftar node. Seret node Shell dari daftar tipe node ke kanvas dan masukkan nama.

  2. Klik ganda node Shell untuk membuka editor kode dan masukkan kode berikut:

    echo "Hello, Trigger Workflow! Current time is ${bizdate}"
  3. Klik Save di bilah alat.

Langkah 3: Debug dan jalankan (lingkungan pengembangan)

  1. Kembali ke kanvas alur kerja dan klik ikon image di bilah alat atas.

  2. Pada kotak dialog yang muncul, masukkan Value for This Run untuk alur kerja (misalnya, jika tanggal saat ini adalah 20260310, nilai untuk bizdate harus 20260309).

  3. Tak lama kemudian, di log waktu proses di bawah, Anda dapat melihat status berjalan node dan output perintah echo.

Langkah 4: Publikasikan dan jalankan (lingkungan produksi)

  1. Di kanvas alur kerja, klik tombol Publish image dan ikuti petunjuk untuk mempublikasikannya.

  2. Setelah alur kerja dipublikasikan, buka Operation Center > One-time Task O&M > One-time Task > Triggered Workflow.

  3. Temukan alur kerja yang telah dipublikasikan, lalu klik Run di kolom Operation.

  4. Pada jendela yang muncul, klik Run lagi untuk memicu instans alur kerja di Lingkungan Produksi. Anda dapat melihat detail eksekusi ini di halaman One-time Instance.

Anda kini telah mempelajari dasar-dasar Triggered Workflow. Selanjutnya, panduan ini akan menjelajahi kemampuan pemicu berbasis event yang lebih canggih.

Kasus penggunaan lanjutan: Alur kerja yang dipicu oleh event

Skenario 1: Memproses file OSS baru

Tujuan: Saat file CSV baru diunggah ke direktori tertentu di Object Storage Service (OSS), otomatis picu alur kerja yang mencetak path file tersebut.

Langkah 1: Buat pemicu OSS

  1. Buka Operation Center > Scheduling Settings > Trigger Management.

  2. Klik New Trigger dan konfigurasikan sebagai berikut:

    Catatan

    Untuk deskripsi parameter lengkap, lihat OSS Trigger.

    • Trigger Name: Masukkan nama kustom, misalnya oss_new_file_trigger.

    • Applicable workspace: Pilih ruang kerja target tempat alur kerja Anda berada.

    • Trigger Event Type: Pilih Object Storage Service (OSS).

    • Trigger Event: Pilih oss:ObjectCreated:PutObject (atau event unggah lainnya).

    • Bucket name: Pilih bucket OSS Anda.

    • File Name: Tentukan path dan format file yang akan dipantau. Wildcard didukung. Misalnya, untuk memantau direktori input/ terhadap semua file .csv, Anda dapat memasukkan input/*.csv.

    • Role Configuration: Untuk penggunaan pertama kali, lakukan One-click Authorization dan pilih role yang dihasilkan bernama DataWorks-EventBridge-OSS-MNS-Role-*************.

      ************* merepresentasikan ID acak 13 digit yang digunakan untuk memastikan keunikan.
  3. Klik Confirm untuk membuat Trigger.

Langkah 2: Buat dan kaitkan alur kerja

  1. Ikuti langkah-langkah di Panduan Cepat: Buat Alur Kerja yang Dipicu Manual untuk membuat Triggered Workflow bernama process_oss_file_workflow.

  2. Di panel kanan kanvas alur kerja, pilih Schedule Configuration > Scheduling Policy.

  3. Dari daftar drop-down Trigger, pilih oss_new_file_trigger yang baru saja Anda buat.

    image

Langkah 3: Uraikan parameter event

  1. Klik + Add Node di bilah alat untuk membuka daftar node. Seret node Shell dari daftar tipe node ke kanvas dan masukkan nama untuk membuatnya.

  2. Klik ganda node tersebut dan tulis kode untuk mengambil serta mencetak path file dari event pemicu.

    # Saat Trigger memulai alur kerja, informasi event diteruskan melalui variabel bawaan workflow.triggerMessage.
    # Anda dapat mengakses path lengkap file yang diunggah menggunakan ${workflow.triggerMessage.data.oss.object.key}.
    
    echo "========= Start Processing OSS File ========="
    message="${workflow.triggerMessage}"
    echo "Raw Value: ${message}"
    
    # Ekstrak nama file dari pesan event
    FILE_PATH="${workflow.triggerMessage.data.oss.object.key}"
    echo "A new file has arrived: ${FILE_PATH}"
    
    # Tambahkan logika pemrosesan spesifik Anda di sini
    
    echo "========= Finish Processing OSS File ========="
    Catatan

    ${workflow.triggerMessage}: Mengambil isi pesan event lengkap dalam format JSON. Anda dapat memperoleh format pesan spesifik untuk OSS di EventBridge di bawah Event Bus > DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name> > Event Tracing > Event Details.

    Contoh format pesan OSS

    {
        "datacontenttype": "application/json;charset=utf-8",
        "aliyunaccountid": "1***********9",
        "data": {
            "eventVersion": "1.0",
            "responseElements": {
                "requestId": "69B1***********C0A8"
            },
            "eventSource": "acs:oss",
            "eventTime": "2026-03-11T05:40:45.000Z",
            "requestParameters": {
                "sourceIPAddress": "***********"
            },
            "eventName": "ObjectCreated:PostObject",
            "userIdentity": {
                "principalId": "1***********9"
            },
            "region": "cn-hangzhou",
            "oss": {
                "bucket": {
                    "name": "******",
                    "arn": "acs:oss:cn-hangzhou:1***********9:******",
                    "virtualBucket": "",
                    "ownerIdentity": "1***********9"
                },
                "ossSchemaVersion": "1.0",
                "object": {
                    "size": 59537,
                    "objectMeta": {
                        "mimeType": "text/csv"
                    },
                    "deltaSize": 0,
                    "eTag": "63***********D32",
                    "key": "input/***********.csv"
                }
            }
        },
        "subject": "acs:oss:cn-hangzhou:1***********9:dwoss1024/input/******.csv",
        "aliyunoriginalaccountid": "1***********9",
        "source": "acs.oss",
        "type": "oss:ObjectCreated:PostObject",
        "aliyunpublishtime": "2026-03-11T05:40:45.682Z",
        "specversion": "1.0",
        "aliyuneventbusname": "DATAWORKS_TRIGGER_FOR_BUCKET_******",
        "id": "69B1***********0A8",
        "time": "2026-03-11T05:40:45.000Z",
        "aliyunregionid": "cn-hangzhou"
    }

Langkah 4: Debug dan publikasikan

  1. Debug:

    • Kembali ke kanvas alur kerja dan klik tombol Run image.

    • Di kotak input Trigger Message Body, tempel contoh event OSS dalam format JSON. Anda dapat menyalin contoh format pesan dari halaman konfigurasi trigger dan mengubah nilai key. Berikut contoh sederhananya.

      {
        "data": {
          "oss":{
            "object": {
              "key": "input/test_file_20260310.csv" 
            }
          } 
        }
      }
    • Klik Run, lalu periksa apakah input/test_file_20260310.csv berhasil dicetak di log.

  2. Publish: Setelah debugging berhasil, klik tombol Publish untuk mempublikasikan alur kerja ke Lingkungan Produksi. Pemicu berbasis event hanya berlaku di Lingkungan Produksi.

Langkah 5: Verifikasi di produksi

  1. Dengan menggunakan konsol OSS atau klien, unggah file CSV ke bucket dan path yang telah Anda konfigurasikan di trigger (misalnya, direktori input/).

    Verifikasi pemicu event

    Buka https://eventbridge.console.alibabacloud.com/<regionId>/event-bus/DATAWORKS_TRIGGER_FOR_BUCKET_<OssBucketName>/event-tracing untuk melihat daftar event yang baru dipicu. Anda juga dapat mengklik Event Details untuk melihat pesan yang dipicu secara spesifik (workflow.triggerMessage).

    image

  2. Buka DataWorks Operation Center > One-time Task Operations > One-time Task > Triggered Workflow. Alur kerja process_oss_file_workflow yang berhasil dipublikasikan akan ditampilkan.

    image

  3. Setelah menunggu sebentar, buka DataWorks Operation Center > One-time Task O&M > Triggered Workflow Instances. Instans alur kerja baru akan dipicu secara otomatis. Klik untuk melihat log-nya dan pastikan path file diproses dengan benar.

Penting

Praktik terbaik: Desain idempotensi

Event OSS mungkin dikirimkan berulang kali karena faktor seperti fluktuasi jaringan. Untuk menghindari pemrosesan data duplikat, kami menyarankan Anda menerapkan idempotensi dalam logika bisnis Anda. Solusi umum adalah memeriksa tabel catatan, seperti tabel MaxCompute, sebelum memproses file. Gunakan ETag atau path unik file sebagai pengenal. Jika file sudah diproses, lewati pemrosesannya.

Skenario 2: Memproses pesan Kafka

Tujuan: Pantau topik Kafka untuk log perilaku pengguna. Saat pesan baru tiba, picu alur kerja untuk menguraikannya dan mengeksekusi logika berbeda berdasarkan isinya.

Langkah 1: Buat pemicu Kafka

  1. Buka Operation Center > Scheduling Settings > Trigger Management dan klik New Trigger.

  2. Konfigurasikan parameter berikut:

    • Trigger Name: kafka_user_action_trigger.

    • Trigger event type: Pilih Message Queue for Apache Kafka.

    • Kafka instance dan Topic: Pilih instans dan topik yang ingin Anda pantau.

    • ConsumerGroupId: Pilih Quick Create agar sistem secara otomatis menghasilkan ID kelompok konsumen dan menghindari konflik dengan aplikasi lain.

    • Key (Opsional): Anda dapat menentukan kunci pesan. Hanya pesan dengan kunci yang persis sama yang akan memicu alur kerja.

  3. Klik Confirm.

Langkah 2: Buat dan kaitkan alur kerja

  1. Ikuti langkah-langkah di Panduan Cepat: Buat Alur Kerja yang Dipicu Manual untuk membuat Triggered Workflow baru bernama handle_user_action_workflow.

  2. Di panel kanan kanvas alur kerja, pilih Schedule Configuration > Scheduling Policy.

  3. Di daftar drop-down Trigger, pilih kafka_user_action_trigger yang baru dibuat.

    image

  4. Penting: Karena pesan dapat tiba dengan frekuensi tinggi, kami menyarankan Anda mengatur Maximum Parallel Instances for Internal Tasks ke nilai seperti 100 untuk mencegah lonjakan pesan tiba-tiba membebani sumber daya penjadwalan.

Langkah 3: Uraikan JSON bersarang

Asumsikan bidang value dari pesan Kafka berupa string JSON dengan format berikut: {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}.

  1. Klik + Add Node di bilah alat untuk membuka daftar node. Seret node Python ke kanvas.

  2. Tulis kode untuk mengurai pesan tersebut. Karena value itu sendiri berupa string, Anda perlu melakukan penguraian JSON kedua dalam kode Anda.

    import json
    
    # 1. Gunakan variabel bawaan untuk mendapatkan bidang 'value' dari pesan Kafka, yang berupa string JSON.
    message_value_str = '${workflow.triggerMessage.value}'
    
    print(f'Received raw message value string: {message_value_str}')
    
    try:
        # 2. Dalam kode Python, uraikan string ini menjadi objek JSON (dictionary).
        message_data = json.loads(message_value_str)
        
        user_id = message_data.get("user_id")
        action_type = message_data.get("action_type")
        print(f"Successfully parsed message. User ID: {user_id}, Action: {action_type}")
        
        # 3. Anda kemudian dapat mengeksekusi logika bisnis berbeda berdasarkan action_type.
        if action_type == 'login':
            # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');")
            print("Processing login action...")
        elif action_type == 'purchase':
            print("Processing purchase action...")
        else:
            print("Unknown action type.")
            
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # Tambahkan logika penanganan exception, seperti menulis pesan error ke tabel log khusus.
        raise e # Bangkitkan kembali exception untuk menandai node sebagai gagal guna mempermudah troubleshooting.

Langkah 4: Debug dan publikasikan

  1. Debug:

    • Kembali ke kanvas alur kerja dan klik tombol Run image.

    • Di Trigger Message Body, tempel event Kafka simulasi. Perhatikan bahwa bidang value berupa string JSON yang telah di-escape.

      {
        "topic": "user-behavior-topic",
        "key": "some-key",
        "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}"
      }
    • Jalankan dan periksa log untuk memastikan node Python berhasil mengurai user_id dan action_type.

  2. Publish: Setelah debugging berhasil, publikasikan alur kerja ke Lingkungan Produksi.

Langkah 5: Verifikasi di produksi

  1. Kirim pesan dengan format yang benar ke topik Kafka yang telah Anda konfigurasikan.

    image

  2. Buka Operation Center > One-time Task O&M > One-time Task > Triggered Workflow di DataWorks. Alur kerja handle_user_action_workflow yang berhasil dipublikasikan akan ditampilkan.

    image

  3. Di Operation Center > One-time Task O&M > One-time Instance > Triggered Workflow Instances, verifikasi bahwa instans alur kerja baru telah dipicu dan periksa log eksekusinya.

    image

Penting

Praktik terbaik: Kontrol konkurensi dan jaminan urutan

  • Kontrol konkurensi: Selalu atur jumlah maksimum instans paralel yang wajar untuk menangani lonjakan pesan.

  • Jaminan urutan: Penjadwalan DataWorks tidak menjamin urutan pemrosesan pesan yang ketat. Jika Anda perlu memastikan pesan untuk pengguna yang sama (atau partisi) diproses berurutan, Anda harus menerapkan kunci terdistribusi (misalnya, menggunakan Redis atau MaxCompute) dalam kode bisnis Anda. Alternatifnya, Anda dapat mendelegasikan logika pemrosesan ke mesin komputasi yang menjamin konsumsi berurutan per partisi, seperti Flink.

Desain dan konfigurasi inti

Orkestrasi alur kerja

Proses inti orkestrasi Triggered Workflow mirip dengan Recurring Workflow. Untuk informasi selengkapnya, lihat Orchestrate nodes and workflows.

Parameter penjadwalan

Di panel Schedule Configuration di sisi kanan kanvas alur kerja, Anda dapat mengatur parameter global untuk workflow. Semua node di dalamnya dapat mereferensikan parameter ini.

  • Sintaks referensi: Di kode node, referensikan parameter alur kerja dengan format ${workflow.parameter_name}.

  • Prioritas parameter: Parameter di DataWorks memiliki hubungan override hierarkis. Urutan prioritasnya adalah: Parameter node > Parameter alur kerja.

    Untuk informasi selengkapnya tentang parameter, lihat Parameter design and flow.

Kebijakan penjadwalan

Saat beberapa alur kerja atau tugas dipicu secara bersamaan hingga menyebabkan bottleneck sumber daya sistem, Anda dapat menggunakan Priority dan Priority Weighting Policy untuk menerapkan penjadwalan sumber daya cerdas. Hal ini memastikan tugas paling penting dieksekusi terlebih dahulu.

  • Menjamin kelangsungan bisnis inti: Atur prioritas lebih tinggi untuk alur kerja bisnis inti agar selalu dijalankan sebelum alur kerja non-inti lainnya.

  • Mengurangi durasi jalur kritis: Dalam satu instans alur kerja, Anda dapat menggunakan Priority Weighting Policy untuk memengaruhi urutan eksekusi node. Misalnya, dengan kebijakan Downward Weighting, node pada jalur kritis yang memiliki lebih banyak dependensi hulu akan diberi bobot dinamis lebih tinggi. Hal ini memprioritaskan eksekusinya, sehingga dapat memperpendek waktu proses alur kerja secara keseluruhan.

    version

    versi

    Priority

    Menentukan tingkat prioritas absolut suatu instans alur kerja dalam antrian penjadwalan. Tingkat yang tersedia adalah 1, 3, 5, 7, dan 8, di mana angka lebih tinggi menunjukkan prioritas lebih tinggi. Tugas atau alur kerja berprioritas tinggi selalu menerima sumber daya penjadwalan sebelum yang berprioritas rendah.

    Priority Weighting Policy

    Menentukan cara bobot node internal (tugas) dihitung secara dinamis dalam tingkat prioritas yang sama. Node dengan bobot lebih tinggi diprioritaskan untuk dieksekusi.

    • Tanpa pembobotan: Semua node memiliki bobot garis dasar tetap.

    • Downward Weighting: Bobot node disesuaikan secara dinamis. Semakin banyak dependensi hulu yang dimiliki node, semakin tinggi bobotnya. Strategi ini membantu memprioritaskan eksekusi node pada jalur kritis dalam DAG (Directed Acyclic Graph). Bobot dihitung sebagai berikut: Bobot awal + Jumlah prioritas semua node hulunya.

    Maximum Parallel Instances

    Mengontrol jumlah maksimum instans alur kerja yang dapat berjalan secara konkuren. Ini digunakan untuk kontrol konkurensi dan perlindungan sumber daya. Saat jumlah instans yang sedang berjalan mencapai batas, instans baru yang dipicu akan masuk ke status menunggu. Anda dapat mengatur nilai ini ke Unlimited atau menentukan nilai maksimum kustom hingga 100.000.

    Catatan

    Jika batas yang ditentukan melebihi kapasitas maksimum kelompok sumber daya, batas fisik kelompok sumber daya tersebut menjadi bottleneck konkurensi aktual.

Sistem prioritas DataWorks mengikuti aturan override hierarkis: Spesifikasi runtime > Konfigurasi tingkat node > Konfigurasi tingkat alur kerja.

  1. Konfigurasi tingkat alur kerja (Garis dasar): Diatur di Scheduling Policy alur kerja, berfungsi sebagai pengaturan default untuk semua node.

  2. Konfigurasi tingkat node (Lokal): Di Schedule Configuration > Scheduling Policy node individual dalam alur kerja, Anda dapat mengatur Priority lebih tinggi untuk node tersebut, yang menggantikan pengaturan tingkat alur kerja.

  3. Spesifikasi runtime (Sementara): Saat memicu eksekusi manual di Operation Center, Anda dapat menentukan konfigurasi menggunakan sakelar Reset Priority At Runtime. Konfigurasi ini memiliki prioritas tertinggi, hanya berlaku untuk eksekusi saat ini, dan tidak mengubah pengaturan permanen apa pun.

O&M dan manajemen

  • Pemantauan instans: Anda dapat melihat, menjalankan ulang, menghentikan, dan melakukan troubleshooting semua instans yang dipicu atau dijalankan secara manual di halaman Operation Center > One-time Task O&M > One-time Instance.

  • Clone alur kerja: Di Business Flow, klik kanan alur kerja dan pilih Clone untuk menyalinnya dengan cepat menjadi alur kerja baru, termasuk semua node dan dependensinya. Untuk informasi selengkapnya, lihat Clone a workflow untuk recurring workflow.

  • Manajemen versi: Di panel Version di sisi kanan kanvas alur kerja, Anda dapat melihat, membandingkan, dan mengembalikan ke versi historis alur kerja. Untuk informasi selengkapnya, lihat Version Management untuk recurring workflow.

Batasan dan catatan

  • Lingkungan berlaku: Mekanisme Event Trigger hanya berlaku setelah alur kerja dipublikasikan ke Lingkungan Produksi (Operation Center).

  • Jumlah node: Satu alur kerja mendukung maksimal 400 node. Kami menyarankan agar jumlahnya di bawah 100 untuk mempermudah pemeliharaan.

  • Batas konkurensi: Jumlah maksimum instans paralel adalah 100.000, tetapi kapasitas konkuren aktual dibatasi oleh spesifikasi kelompok sumber daya penjadwalan yang dibeli.

  • Penjadwalan tingkat node: Saat mengonfigurasi penjadwalan di tingkat node, Anda hanya dapat mengatur Priority, bukan Priority Weighting Policy.

Dokumentasi terkait