The Flink SQL Streaming node in the new Data Studio of DataWorks lets you define real-time processing logic with standard SQL. Flink SQL Streaming supports a broad range of SQL syntax, provides state management and fault tolerance, supports both event time and processing time, and scales flexibly. The node connects to systems such as Kafka and HDFS and provides detailed logs and performance monitoring tools. To get started, add a Flink SQL Streaming node to your DataWorks project and write SQL statements in it. This topic describes how to develop and use a Flink SQL Streaming node in DataWorks to process real-time data.
Prerequisites
You have associated a compute resource for Realtime Compute for Apache Flink in Administration. For more information, see Bind a compute engine.
You have created a Flink SQL Streaming node. For more information, see Create a node for a scheduling workflow.
You have granted the required OpenAPI permissions to the RAM user or RAM role that DataWorks uses to call Realtime Compute for Apache Flink APIs. These permissions allow DataWorks to submit and deploy the node task to a Flink cluster. The following policy grants the required permissions:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "stream:CreateDeployment",
        "stream:UpdateDeployment",
        "stream:GetDeployment",
        "stream:DeleteDeployment"
      ],
      "Resource": ["*"]
    }
  ]
}
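Before you attach the policy, you may want to confirm that it grants every action listed above. The following Python sketch is a hypothetical helper, not a DataWorks or RAM tool: it parses a policy document and reports which of the four actions no Allow statement names explicitly. It does not expand wildcards such as `stream:*`, does not account for Deny statements that override an Allow, and ignores conditions, so treat it as a quick sanity check rather than a policy evaluator.

```python
import json

# The policy document from this topic, as a JSON string.
POLICY_JSON = """
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["stream:CreateDeployment", "stream:UpdateDeployment",
                 "stream:GetDeployment", "stream:DeleteDeployment"],
      "Resource": ["*"]
    }
  ]
}
"""

REQUIRED_ACTIONS = {
    "stream:CreateDeployment",
    "stream:UpdateDeployment",
    "stream:GetDeployment",
    "stream:DeleteDeployment",
}


def missing_actions(policy_text: str, required: set[str]) -> set[str]:
    """Return the required actions that no Allow statement lists explicitly."""
    policy = json.loads(policy_text)
    granted: set[str] = set()
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue  # only Allow statements grant actions
        actions = stmt.get("Action", [])
        if isinstance(actions, str):  # Action may be a single string
            actions = [actions]
        granted.update(actions)
    return required - granted


print(missing_actions(POLICY_JSON, REQUIRED_ACTIONS))  # prints set()
```

An empty set means every required action appears in an Allow statement.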
Limitations
This node cannot be used in a workflow; it must be developed and run as a standalone node.
Only serverless resource groups are supported. Legacy exclusive resource groups for scheduling are not supported.
Step 1: Develop the Flink SQL Streaming node
On the Flink SQL Streaming node editing page, develop the node task as described below.
Develop SQL code
In the SQL editor, you can define variables using the ${variable_name} format. Assign values to these variables in the Script Parameters section of the Real-Time configuration panel to pass parameters dynamically in scheduling scenarios. For example:
--Create the source table datagen_source.
CREATE TEMPORARY TABLE datagen_source(
name VARCHAR
) WITH (
'connector' = 'datagen'
);
--Create the result table blackhole_sink.
CREATE TEMPORARY TABLE blackhole_sink(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
--Insert data from the source table into the result table.
INSERT INTO blackhole_sink
SELECT
name
FROM datagen_source WHERE LENGTH(name) > ${name_length};
In this example, the parameter name_length is assigned the value 5, so only records whose name is longer than 5 characters are processed.
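The placeholder replacement can be approximated with Python's `string.Template`, which happens to use the same `${name}` syntax. This is only a sketch of the idea: DataWorks' actual substitution logic is not described in this topic and may differ, for example in how it handles an unassigned parameter or a literal `$` in SQL. The `script_params` dictionary and the `render` function are hypothetical stand-ins for the values entered in the Script Parameters section.

```python
from string import Template

# Part of the SQL from the example above, with the ${name_length} placeholder.
SQL = """INSERT INTO blackhole_sink
SELECT name
FROM datagen_source WHERE LENGTH(name) > ${name_length};"""

# Hypothetical script parameters, as they might be entered in the UI.
script_params = {"name_length": "5"}


def render(sql: str, params: dict[str, str]) -> str:
    """Replace ${name} placeholders; raises KeyError if a parameter is unassigned."""
    return Template(sql).substitute(params)


rendered = render(SQL, script_params)
print(rendered.splitlines()[-1])
# prints: FROM datagen_source WHERE LENGTH(name) > 5;
```

In this sketch, a literal `$` in the SQL would have to be written as `$$`, and a missing parameter fails loudly instead of leaving the placeholder in the submitted code.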
Step 2: Configure the Flink SQL Streaming node
Configure the Flink SQL Streaming node task with the following parameters based on your business requirements.
Configure Flink resources
In the Flink resource information section of the Real-Time configuration panel, configure the following parameters based on the selected Resource Mode. For more information, see Configure Flink resources.
Parameter | Description |
Flink cluster | The name of the fully managed Flink compute resource associated in Administration. |
Flink engine version | Select the engine version based on your needs. |
Resource Group | Select a serverless resource group that has network connectivity with Flink. |
Resource Mode | Supports two modes: Basic mode and Expert mode. Configure the parameters that correspond to the selected mode. A good understanding of the Flink architecture helps you configure these parameters effectively. For more information, see Configure Flink resources and the Flink Architecture page in the Apache Flink documentation. |
Basic mode
Parameter | Description |
Job Manager CPU | Based on Flink best practices, JobManager requires at least 0.5 CPU cores and 2 GiB of memory for stable operation. The recommended configuration is 1 CPU core and 4 GiB of memory, with a maximum of 16 CPU cores. Adjust the configuration based on the cluster scale and job complexity. |
Job Manager Memory | The memory configuration of JobManager affects its ability to handle scheduling and management tasks. The recommended range is 2 GiB to 64 GiB to ensure stable and efficient operation. Adjust the size based on the cluster scale and job requirements. |
Task Manager CPU | The CPU resource configuration of TaskManager affects its task processing capability. Based on Flink best practices, at least 0.5 CPU cores and 2 GiB of memory are recommended, with a preferred configuration of 1 CPU core and 4 GiB of memory. The maximum is 16 CPU cores. Adjust the configuration based on your actual requirements. |
Task Manager Memory | The memory configuration of TaskManager determines the volume of data it can process and its performance. To ensure stable and efficient task execution, the memory size must be at least 2 GiB and can be set up to 64 GiB. |
Concurrency | The parallelism of the Flink job, that is, the number of parallel instances that run each operator. Higher parallelism can increase throughput and resource utilization but requires more slots. Set this value based on the available cluster resources and the characteristics of the job. |
Number of slots per TaskManager | The number of slots per TaskManager determines how many tasks it can execute in parallel. You can adjust the slot configuration to optimize resource utilization and the parallel processing capability of jobs. |
Expert mode
Parameter | Description |
Job Manager CPU | Based on Flink best practices, JobManager requires at least 0.25 CPU cores and 1 GiB of memory for stable operation, with a maximum of 16 CPU cores. Adjust the configuration based on the cluster scale and job complexity. |
Job Manager Memory | The memory configuration of JobManager affects its ability to handle scheduling and management tasks. The recommended range is 1 GiB to 64 GiB to ensure stable and efficient operation. Adjust the size based on the cluster scale and job requirements. |
Number of slots per TaskManager | The number of slots per TaskManager determines how many tasks it can execute in parallel. You can adjust the slot configuration to optimize resource utilization and the parallel processing capability of jobs. |
Multiple SSG mode | By default, all operators are placed in a single slot sharing group (SSG), so you cannot configure resources for individual operators. To set resources for individual operators, enable Multiple SSG mode. Each operator is then placed in its own slot sharing group and runs in an independent slot, and you can configure resources directly on the corresponding slot. |
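In basic mode, the number of TaskManagers follows from Concurrency and the number of slots per TaskManager. The following Python sketch estimates it, assuming the default single slot sharing group (so the job needs one slot per parallel instance) and identical CPU and memory for every TaskManager. The names `BasicModeConfig`, `estimate`, and `out_of_range` are hypothetical; the ranges in `LIMITS` are taken from the basic-mode table. Realtime Compute for Apache Flink may reserve or round resources differently, so treat the output as an estimate only.

```python
import math
from dataclasses import dataclass

# Basic-mode ranges stated in the table above: (minimum, maximum).
LIMITS = {
    "jm_cpu": (0.5, 16),
    "jm_memory_gib": (2, 64),
    "tm_cpu": (0.5, 16),
    "tm_memory_gib": (2, 64),
}


@dataclass
class BasicModeConfig:
    jm_cpu: float          # Job Manager CPU, in cores
    jm_memory_gib: float   # Job Manager Memory, in GiB
    tm_cpu: float          # Task Manager CPU, in cores
    tm_memory_gib: float   # Task Manager Memory, in GiB
    parallelism: int       # "Concurrency" in the UI
    slots_per_tm: int      # Number of slots per TaskManager


def out_of_range(cfg: BasicModeConfig) -> list[str]:
    """Names of fields that fall outside the documented basic-mode ranges."""
    return [name for name, (lo, hi) in LIMITS.items()
            if not lo <= getattr(cfg, name) <= hi]


def estimate(cfg: BasicModeConfig) -> dict[str, float]:
    """Rough totals, assuming all operators share one slot sharing group,
    so the job needs as many slots as its parallelism."""
    task_managers = math.ceil(cfg.parallelism / cfg.slots_per_tm)
    return {
        "task_managers": task_managers,
        "total_cpu": cfg.jm_cpu + task_managers * cfg.tm_cpu,
        "total_memory_gib": cfg.jm_memory_gib + task_managers * cfg.tm_memory_gib,
    }


cfg = BasicModeConfig(jm_cpu=1.0, jm_memory_gib=4.0, tm_cpu=1.0,
                      tm_memory_gib=4.0, parallelism=4, slots_per_tm=2)
print(out_of_range(cfg))  # prints []
print(estimate(cfg))
# prints {'task_managers': 2, 'total_cpu': 3.0, 'total_memory_gib': 12.0}
```

Raising the slots per TaskManager lowers the TaskManager count for the same parallelism, but each TaskManager then shares its CPU and memory among more parallel instances.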
(Optional) Configure script parameters
In the Script Parameters section of the Real-Time configuration panel in the right navigation pane, click Add parameters and enter a Parameter name and a Parameter Value. Your code can then reference the parameter in the ${parameter_name} format.
(Optional) Configure Flink running parameters
In the Flink running parameters section of the Real-Time configuration panel in the right navigation pane, configure the following parameters. For more information, see Configure Flink running parameters.
Parameter | Description |
System Checkpoint Interval | Specifies the time interval at which Flink performs periodic checkpoints for the job. A shorter interval reduces fault recovery time but increases system overhead. If this parameter is left empty, checkpoints are disabled. |
Minimum time interval between two system checkpoints | Specifies the minimum pause between the completion of one checkpoint and the start of the next, which prevents overly frequent checkpoints from degrading system performance. This minimum gap is enforced when the maximum number of concurrent checkpoints is 1. |
State Data Expiration Time | Specifies the maximum time that state data in a Flink job is retained without being accessed or updated. The default value is 36 hours, which means that state that has not been accessed or updated for 36 hours automatically expires and is cleared. This reduces state storage and resource usage. Important: The default value is based on cloud best practices and differs from the open-source Flink default of 0, which means that state never expires. |
Others | Additional Flink running parameters that are not covered by the preceding settings. Note: For more information about parameter configurations, see Configure Flink running parameters. |
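The interaction between System Checkpoint Interval and the minimum interval between checkpoints is easiest to see on a timeline. The following Python sketch is a simplified model, not Flink's scheduler: it assumes at most one checkpoint in flight, a fixed checkpoint duration, and a first checkpoint exactly one interval after the job starts (the real scheduler may time the first checkpoint differently). `checkpoint_starts` is a hypothetical name, and passing `None` mirrors leaving the interval empty.

```python
from typing import Optional


def checkpoint_starts(interval_s: Optional[float], min_pause_s: float,
                      duration_s: float, horizon_s: float) -> list[float]:
    """Simplified model: one checkpoint at a time, each taking duration_s.
    The next checkpoint starts at the later of (previous start + interval)
    and (previous completion + minimum pause)."""
    if interval_s is None:  # empty interval: checkpoints are disabled
        return []
    if interval_s <= 0:
        raise ValueError("interval must be positive")
    starts: list[float] = []
    t = interval_s  # assumption: first checkpoint one interval after start
    while t <= horizon_s:
        starts.append(t)
        completed = t + duration_s
        t = max(t + interval_s, completed + min_pause_s)
    return starts


# Fast checkpoints: the interval dominates.
print(checkpoint_starts(60, 30, 10, 300))  # prints [60, 120, 180, 240, 300]
# Slow checkpoints: the minimum pause pushes each start later.
print(checkpoint_starts(60, 30, 50, 300))  # prints [60, 140, 220, 300]
```

In this model, the effective spacing is the larger of the interval and the checkpoint duration plus the minimum pause, so slow checkpoints run less often than the configured interval suggests.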
After the task is configured, click Save to save the node task.
Step 3: (Optional) Debug the Flink SQL Streaming node
Before deploying the node to the production environment, you can use the debug feature to perform a trial run of the node code based on uploaded mock data. This allows you to verify the SQL logic and upstream and downstream data without deploying the task to Operation Center.
The debug feature is available through an allowlist. To use this feature, submit a ticket to request access.
Configure Flink resource information
In the Flink resource information section of the Run Configuration panel on the right side of the node editing page, configure the parameters as described in the following table.
Parameter | Description |
Flink Debug Cluster | The Flink session cluster used to run the debug task. This parameter is required. The drop-down list displays the existing session clusters under the current compute resource along with their running status. Only clusters in the Running state can be selected. If no clusters are available in the list, click Create Cluster to go to the Realtime Compute for Apache Flink console and create a session cluster. |
Flink Engine Version | The Flink engine version of the selected session cluster. This is automatically displayed by the system based on the cluster and does not require manual input. |
Timeout | The maximum runtime for a single debug task, in minutes. The default is 30 minutes. The debug task is automatically stopped after this duration. |
After you switch the compute resource for the current node, the selected Flink Debug Cluster and uploaded debug data are cleared. You must reselect a cluster and re-upload the data.
Prepare debug data
In the Debug Data section of the Run Configuration panel, prepare mock data for the source tables referenced in your code.
Click Generate Template. The system parses the source tables referenced in the current SQL and generates corresponding table name records in the list below. Previously uploaded data is not cleared.
In the Actions column of a table name record, click Download Template to download a CSV template that matches the schema of the source table.
Fill in the debug data locally in the column order of the template and save the file in CSV format.
In the Actions column of a table name record, click Upload and select the completed CSV file to upload. After the upload succeeds, the Status column displays Enabled.
(Optional) After the upload succeeds, you can click Preview to view the data content in the bottom panel. To modify the data, re-upload a CSV file to overwrite the existing data.
If you do not want the mock data of a specific source table to participate in the current debug session, click Disable to change the status to Disabled. To re-enable it, click Enable. Only data with the Enabled status is used in the debug session.
Select a Flink Debug Cluster before you upload debug data. Otherwise, the system prompts you to select a compute resource first.
Debug data supports only the CSV format, with a maximum file size of 1 MB. The first row of the CSV file must contain column names, and UTF-8 encoding is recommended.
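If you prefer to generate mock data with a script instead of editing the template by hand, the following Python sketch writes rows in a fixed column order, puts the column names in the first row, encodes the result as UTF-8, and checks the size limit. `build_debug_csv` is a hypothetical helper; the column list must match the template you download, and the single `name` column here assumes the `datagen_source` schema from the SQL example. The sketch conservatively treats 1 MB as 1,000,000 bytes.

```python
import csv
import io

# Conservative reading of the 1 MB limit stated above.
MAX_BYTES = 1_000_000


def build_debug_csv(columns: list[str], rows: list[list[str]]) -> bytes:
    """Build UTF-8 CSV bytes whose first row holds the column names."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns)  # header row with column names
    for row in rows:
        if len(row) != len(columns):
            raise ValueError(
                f"row {row!r} has {len(row)} fields, expected {len(columns)}")
        writer.writerow(row)
    data = buf.getvalue().encode("utf-8")
    if len(data) > MAX_BYTES:
        raise ValueError(f"CSV is {len(data)} bytes; limit is {MAX_BYTES}")
    return data


# Mock rows for datagen_source (one column: name).
data = build_debug_csv(["name"], [["alice"], ["bob_smith"]])
print(data.decode("utf-8"))
# To save it for upload: open("datagen_source.csv", "wb").write(data)
```

Python's csv module quotes values that contain commas or quotes; this topic does not state whether the uploader accepts quoted fields, so plain values are the safer choice.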
Run the debug task
After the debug data is prepared, click the Run button on the editor toolbar (or press F8). The system submits the code, mock data, and Flink resource information to the selected session cluster for execution.
If your code uses parameters in the ${variable_name} format, make sure that you have assigned values to the variables in the Script Parameters section. During debugging, the system replaces the placeholders in the code with the assigned values before submission.
View debug results
After the debug task runs, the result area at the bottom of the node provides the following information to help you quickly identify issues:
Code: The SQL code submitted to the Flink engine for this run (with variable substitution completed).
Logs: The runtime logs and error information of the debug task.
Query results: The output data of the debug task.
Step 4: Start the Flink SQL Streaming node
Deploy the Flink SQL Streaming node.
The task must be deployed to Operation Center before it can run. Follow the on-screen instructions to deploy the Flink SQL Streaming node. For more information, see Deploy a node.
Note: The deployment operation also deploys the task to the Flink VVP workspace. You can view the tasks deployed through DataWorks in Flink VVP Operation Center > Job O&M.
Start the Flink SQL Streaming node.
After the task is deployed, click Go to operation and maintenance below Deploy to production environment. In Operation Center, go to , find the task that you want to start, and click Start in the Operation column to start the real-time task and view its running status.