Unlike a scheduled workflow, which runs on a predefined schedule (such as at 1:00 AM every day), a triggered workflow is an on-demand, event-driven data processing model. Its execution is triggered in real time by an external signal — such as a file upload, message arrival, API call, or manual click — providing high real-time performance and flexibility for data processing.
| Feature | Scheduled workflow | Triggered workflow |
|---|---|---|
| Trigger mechanism | Fixed schedule (cron expression) | External signal (event, API, manual) |
| Execution model | Scheduled and predictable | Reactive and on-demand |
| Use cases | T+1 batch data warehousing, scheduled reports | Processing files upon arrival, integrating with business systems, manual data repair |
| Key advantages | Stability and periodic guarantee | Real-time responsiveness and flexibility |
Supported trigger methods
A triggered workflow supports three trigger methods. Choose one based on your scenario.
| Trigger method | Initiator | Core scenarios | Key points |
|---|---|---|---|
| Event trigger | External event source (such as OSS or ApsaraMQ for Kafka) | Event-driven ETL: process files as they arrive, or trigger real-time computation from messages | Create a trigger first and associate it with the workflow. Only takes effect in the production environment. |
| Manual trigger | User (developer or operator) | Ad-hoc tasks: one-time data processing or analysis | Runs in both the development and production environments. Recommended as a replacement for manually triggered workflows. |
| API trigger | External system (via OpenAPI) | System integration: trigger data processing through a callback from a business system such as a CRM or ERP | Requires calling an OpenAPI with the necessary permissions. |
Quick start: Create a manually triggered workflow
This section walks you through creating and manually running a simple triggered workflow.
Prerequisites
Before you begin, ensure that you have:
-
An Alibaba Cloud account with access to DataWorks
-
A DataWorks workspace
Step 1: Create a triggered workflow
-
Go to the Workspaces page in the DataWorks console. In the top navigation bar, select a region. Find the target workspace and choose Shortcuts > Data Studio in the Actions column.
-
In the left navigation bar, click
, and then next to Workspace Directories, click  > Create Workflow to open the Create Workflow page. -
Set Scheduling Type to Trigger-based Scheduling, enter a workflow Name, and click Confirm.
Step 2: Orchestrate and develop nodes
-
Click + Add Node in the toolbar to open the node list. Drag a Shell node to the canvas and enter a name.
-
Double-click the Shell node to open the code editor, then enter the following code:
bizdate=$1 echo "Hello, Trigger Workflow! Current time is ${bizdate}" -
Click Save in the toolbar.
Step 3: Debug in the development environment
-
Return to the workflow canvas and click the
icon in the top toolbar. -
Enter the run value for the workflow. For example, if the current date is 20260310, set
bizdateto20260309. -
Check the runtime log below the canvas to see the node status and the output of the
echocommand.
Step 4: Deploy and run in the production environment
-
On the workflow canvas, click Publish
and follow the prompts to publish the workflow. -
Go to Operation Center > Manually Triggered Node O&M > Manually Triggered Node > Triggered Workflow.
-
Find the deployed workflow and click Run in the Actions column.
-
In the dialog box that appears, click Run to trigger a workflow instance in the production environment. View the details of this run on the manual instance page.
You now know the basics of a triggered workflow. The following sections cover the more powerful event-triggering capabilities.
Advanced use cases: Event-triggered workflows
Scenario 1: Process new OSS files
Objective: When a new CSV file is uploaded to a specified directory in Object Storage Service (OSS), automatically trigger a workflow that prints the file path.
Step 1: Create an OSS trigger
-
Go to Operation Center > Tenant Schedule Setting > Trigger Management.
-
Click Create Trigger and configure the following parameters:
-
Trigger Name: Enter a custom name, such as
oss_new_file_trigger. -
Workspace: Select the workspace where the workflow is located.
-
Trigger Type: Select OSS.
-
Trigger Event: Select
oss:ObjectCreated:PutObject(or another upload event). -
Bucket name: Select the OSS bucket to monitor.
-
File Name: Specify the file path and format to monitor. Wildcards are supported. To monitor the
input/directory for all CSV files, enterinput/*.csv. -
Configure Role: For first-time use, click authorization and select the generated role named **DataWorks-EventBridge-OSS-MNS-Role-\*\*\*\*\*\*\*\*\*\*\*\*\*** (the 13-digit suffix ensures uniqueness).
For detailed parameter descriptions, see OSS trigger.
-
-
Click OK to create the trigger.
Step 2: Create and associate the workflow
-
Follow Step 1: Create a triggered workflow to create a triggered workflow named
process_oss_file_workflow. -
In the right-side panel of the workflow canvas, select Properties > Scheduling Policies.
-
From the Trigger dropdown list, select the
oss_new_file_triggeryou just created.
Step 3: Parse the event payload
When a trigger fires, DataWorks passes the event information through the built-in variable ${workflow.triggerMessage}. This variable contains the complete event message in JSON format. Use ${workflow.triggerMessage.data.oss.object.key} to access the path of the uploaded file.
-
Click + Add Node in the toolbar, drag a Shell node to the canvas, and enter a name.
-
Double-click the node and enter the following code:
To inspect the full OSS event message format, go to EventBridge and navigate to Event Buses >
DATAWORKS_TRIGGER_FOR_BUCKET_<OSS_Bucket_Name>> Event Tracking > Event Detail. An example message is shown below.echo "========= Start Processing OSS File =========" message="${workflow.triggerMessage}" echo "Raw Value: ${message}" # Extract the file path from the event message FILE_PATH="${workflow.triggerMessage.data.oss.object.key}" echo "A new file has arrived: ${FILE_PATH}" # Add your specific processing logic here echo "========= Finish Processing OSS File ========="
Step 4: Debug and deploy
-
Debug: Return to the workflow canvas and click the Run
button. In the Trigger Message Body input box, paste a sample OSS event in JSON format. Copy the message format from the trigger configuration page and modify the keyvalue. The following is a minimal example:{ "data": { "oss":{ "object": { "key": "input/test_file_20260310.csv" } } } }Click Run and confirm that
input/test_file_20260310.csvis printed in the logs. -
Deploy: After successful debugging, click Deploy to deploy the workflow to the production environment.
ImportantEvent triggering only takes effect in the production environment.
Step 5: Verify in production
-
Using the OSS console or a client, upload a CSV file to the bucket and path configured in the trigger (for example, the
input/directory). -
Go to DataWorks Operation Center > Manually Triggered Node O&M > Manually Triggered Node > Triggered Workflow. The deployed
process_oss_file_workflowis listed there.
-
After a short wait, go to Operation Center > Manually Triggered Node O&M > Triggered Workflow Instance. A new workflow instance should appear. Click to view its logs and confirm that the file path was processed correctly.
Best practice: Idempotency design OSS events may be delivered more than once due to network fluctuations. To avoid duplicate processing, implement idempotency in your business logic. A common approach is to check a record table — such as a MaxCompute table — before processing a file, using the file's ETag or unique path as the identifier. If the file has already been processed, skip it.
Scenario 2: Process Kafka messages
Objective: Monitor an ApsaraMQ for Kafka topic for user behavior logs. When a new message arrives, trigger a workflow that parses it and executes different logic based on the content.
Step 1: Create a Kafka trigger
-
Go to Operation Center > Tenant Schedule Setting > Trigger Management and click Create Trigger.
-
Configure the following parameters:
-
Trigger Name:
kafka_user_action_trigger. -
Trigger Type: Select ApsaraMQ for Kafka.
-
Kafka Instance and Topic: Select the instance and topic to monitor.
-
ConsumerGroupId: Select Quick Create to have the system auto-generate a consumer group ID and avoid conflicts with other applications.
-
Key (Optional): Specify a message key. Only messages with an exact key match trigger the workflow.
-
-
Click OK.
Step 2: Create and associate the workflow
-
Follow Step 1: Create a triggered workflow to create a triggered workflow named
handle_user_action_workflow. -
In the right-side panel of the workflow canvas, select Properties > Scheduling Policies.
-
In the Trigger dropdown list, select the
kafka_user_action_triggeryou just created.
-
Because Kafka messages can arrive at a high frequency, set Maximum Parallel Instances for Internal Tasks to a value such as
100to prevent message spikes from overwhelming the scheduling resources.
Step 3: Parse the event payload
The workflow.triggerMessage variable holds the complete Kafka event in JSON format. However, the value field inside the message is a JSON-encoded string, not a JSON object. This means your code must parse the value field a second time after retrieving it.
For example, if the value field contains {"user_id": "1001", "action_type": "login", "timestamp": 1688888888}:
-
Click + Add Node in the toolbar, drag a Python node to the canvas, and enter a name.
-
Enter the following code:
import json # Step 1: Retrieve the 'value' field from the Kafka message. # This is a JSON-encoded string, not a JSON object. message_value_str = '${workflow.triggerMessage.value}' print(f'Received raw message value string: {message_value_str}') try: # Step 2: Parse the string into a Python dictionary. message_data = json.loads(message_value_str) user_id = message_data.get("user_id") action_type = message_data.get("action_type") print(f"Successfully parsed message. User ID: {user_id}, Action: {action_type}") # Step 3: Execute different logic based on action_type. if action_type == 'login': # o.run_sql(f"INSERT OVERWRITE TABLE user_login_record PARTITION(ds='{bizdate}') VALUES ('{user_id}');") print("Processing login action...") elif action_type == 'purchase': print("Processing purchase action...") else: print("Unknown action type.") except json.JSONDecodeError as e: print(f"Error decoding JSON: {e}") # Write the error to a dedicated log table for troubleshooting. raise e # Re-raise to mark the node as failed.
Step 4: Debug and deploy
-
Debug: Return to the workflow canvas and click the Run
button. In the Trigger Message Body input box, paste the following simulated Kafka event. Note that the valuefield is an escaped JSON string:{ "topic": "user-behavior-topic", "key": "some-key", "value": "{\"user_id\": \"1001\", \"action_type\": \"login\", \"timestamp\": 1688888888}" }Run and check the logs to confirm that the Python node correctly parses
user_idandaction_type. -
Deploy: After successful debugging, deploy the workflow to the production environment.
Step 5: Verify in production
-
Send a message in the correct format to the configured Kafka topic.

-
Go to Operation Center > Manually Triggered Node O&M > Manually Triggered Node > Triggered Workflow. The deployed
handle_user_action_workflowis listed there.
-
Go to Operation Center > Manually Triggered Node O&M > Manually Triggered Node Instance > Triggered Workflow Instance to confirm that a new instance was triggered, and check the runtime log.

Best practice: Concurrency and ordering
-
Concurrency control: Always set a reasonable maximum number of parallel instances to handle message spikes.
-
Order guarantee: DataWorks scheduling does not guarantee strict message ordering. For ordered processing per user or partition, implement a distributed lock (for example, using Redis or MaxCompute) in your business code. Alternatively, delegate the processing to a computing engine that guarantees ordered consumption per partition, such as Flink.
Core design and configuration
Workflow orchestration
Orchestrating a triggered workflow follows the same process as a periodic workflow. For details, see Orchestrate nodes and workflows.
Scheduling parameters
Set global parameters for the workflow in the Properties panel on the right side of the workflow canvas. All nodes within the workflow can reference these parameters.
-
Reference syntax: In node code, reference a workflow parameter as
${workflow.parameter_name}. -
Parameter priority: Node parameters take precedence over workflow parameters.
For details, see Parameter design and flow.
Scheduling policy
When multiple workflows are triggered simultaneously and compete for scheduling resources, use priority and priority weighting to ensure critical tasks run first.
| Configuration item | Description |
|---|---|
| Priority | The absolute priority level of a workflow instance in the scheduling queue. Available levels are 1, 3, 5, 7, and 8, where a higher number indicates higher priority. High-priority tasks or workflows always receive scheduling resources before low-priority ones. |
| Priority weighting policy | Determines how the weights of internal nodes are dynamically calculated within the same priority level. No weighting: all nodes have a fixed baseline weight. Downward weighting: a node's weight is based on the number of upstream dependencies — the more upstream nodes, the higher the weight. This helps prioritize nodes on the critical path in a Directed Acyclic Graph (DAG). Formula: Initial weight + Sum of the priorities of all upstream nodes. |
| Maximum parallel instances | The maximum number of instances of this workflow that can run concurrently. Set to Unlimited or a custom value up to 100,000. When the limit is reached, new triggered instances enter a waiting state. Note
The actual concurrency is also bounded by the resource group's capacity. |
Priority settings follow a hierarchical override:
-
Workflow-level configuration (baseline): Set in the workflow's Scheduling Policies. Applies to all nodes by default.
-
Node-level configuration (local): Set in Properties > Scheduling Policies of an individual node, overriding the workflow-level setting. Only Priority is configurable at this level — not the priority weighting policy.
-
Runtime specification (temporary): Set via the Runtime Priority Reset switch in Operation Center when manually triggering a run. This has the highest precedence and applies only to the current run without changing permanent settings.
O&M and management
-
Instance monitoring: View, rerun, terminate, and check logs of all triggered or manually run instances at Operation Center > Manually Triggered Node O&M > Manually Triggered Node Instance.
-
Clone a workflow: In Workspace Directories, right-click a workflow and select Clone to copy it — including all nodes and dependencies — into a new workflow. For details, see Clone a workflow.
-
Version management: In the version panel on the right side of the workflow canvas, view, compare, and revert to historical versions. For details, see Version Management.
Limitations
-
Effective environment: The event trigger mechanism only takes effect after the workflow is deployed to the production environment.
-
Node count: A single workflow supports a maximum of 400 nodes. Keep the count under 100 to simplify maintenance.
-
Concurrency limit: The maximum number of parallel instances is 100,000, but the actual concurrent capacity is limited by the specifications of the purchased resource group for scheduling.
-
Node-level scheduling: At the node level, only Priority is configurable — the priority weighting policy is not.
