A Flink SQL batch node lets you use standard SQL statements to define and run data processing tasks. Use it to analyze and transform large datasets for tasks such as data cleansing and aggregation. The node supports visual configuration and provides an efficient and flexible solution for large-scale batch processing. This topic describes how to use a Flink SQL batch node to process data in batches.
Prerequisites
You have created a workspace and bound a Realtime Compute for Apache Flink computing resource in Administration. For more information, see Bind computing resources.
You have created a Flink SQL batch node. For more information, see Create a node for a scheduling workflow.
You have granted the following API permissions to the RAM user or RAM role that DataWorks uses to call Realtime Compute for Apache Flink APIs. These permissions are required to submit and deploy node tasks to a Flink cluster. For more information, see Grant permissions.
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "stream:CreateDeployment",
        "stream:UpdateDeployment",
        "stream:GetDeployment",
        "stream:DeleteDeployment"
      ],
      "Resource": ["*"]
    }
  ]
}
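Before you attach a policy, a quick local check can confirm that it lists all four actions. The following Python sketch is an illustration only: the helper missing_actions is hypothetical, it matches exact action names in Allow statements, and it does not evaluate wildcards, Deny statements, or the Resource element the way RAM does.

```python
import json

# The four actions listed in the policy above.
REQUIRED_ACTIONS = {
    "stream:CreateDeployment",
    "stream:UpdateDeployment",
    "stream:GetDeployment",
    "stream:DeleteDeployment",
}


def missing_actions(policy_json: str) -> set[str]:
    """Return the required actions that no Allow statement lists explicitly."""
    allowed = set()
    for statement in json.loads(policy_json).get("Statement", []):
        if statement.get("Effect") == "Allow":
            actions = statement.get("Action", [])
            # Action may be a single string or a list of strings.
            allowed.update([actions] if isinstance(actions, str) else actions)
    return REQUIRED_ACTIONS - allowed


policy = """
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["stream:CreateDeployment", "stream:GetDeployment"],
      "Resource": ["*"]
    }
  ]
}
"""
print(sorted(missing_actions(policy)))
```

An empty result means that every required action appears in at least one Allow statement.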
Limitations
Only a serverless resource group is supported. The legacy exclusive resource group for scheduling is not supported.
Step 1: Develop the Flink SQL batch node
On the edit page for the Flink SQL batch node, develop the node task.
Develop SQL code
Develop your task code in the SQL editing area. In your code, you can define variables by using the ${variable_name} format. Then, on the right side of the node editing page, assign a value to the variable in the Scheduling Parameters section of the Scheduling Settings pane. This allows you to dynamically pass parameters to your code in scheduling scenarios. For more information about how to use scheduling parameters, see Scheduling parameter sources and their expressions. The following is an example.
-- Create a source table named datagen_source_${var}.
CREATE TEMPORARY TABLE datagen_source_${var} (
  name VARCHAR
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000'
);
-- Create a result table named blackhole_sink_${var}.
CREATE TEMPORARY TABLE blackhole_sink_${var} (
  name VARCHAR
) WITH (
  'connector' = 'blackhole'
);
-- Insert data from the source table into the result table.
INSERT INTO blackhole_sink_${var}
SELECT
  name
FROM datagen_source_${var};
In this example, assign the variable var a value such as $[yyyymmdd] in the Scheduling Parameters section so that each scheduled run processes the new data for that day in a batch.
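To make the substitution concrete, the following Python sketch mimics what happens to the table names when var is assigned a date string. It is an illustration only, not the DataWorks implementation: the helper render_sql and the sample date are hypothetical, and the sketch assumes that $[yyyymmdd] resolves to the scheduled date in yyyymmdd form.

```python
from datetime import date
from string import Template


def render_sql(sql: str, params: dict[str, str]) -> str:
    """Replace ${name} placeholders with assigned values (illustrative only)."""
    # Template.substitute raises KeyError if a placeholder has no value.
    return Template(sql).substitute(params)


# Hypothetical scheduled date; assumes $[yyyymmdd] yields this format.
scheduled = date(2024, 1, 15)
var_value = scheduled.strftime("%Y%m%d")

sql = "INSERT INTO blackhole_sink_${var} SELECT name FROM datagen_source_${var};"
rendered = render_sql(sql, {"var": var_value})
print(rendered)
```

With these sample values, both table names end in _20240115, so each scheduled day reads from and writes to its own pair of temporary tables.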
Step 2: Configure the Flink SQL batch node
Configure the task parameters for the Flink SQL batch node based on your business requirements.
Configure Flink resources
You can configure the following parameters on the right side of the edit page in the Flink resource information section under Scheduling Settings. For more information, see Configure schedule settings.
Parameter | Description |
Flink cluster | The name of the fully managed Flink compute resource bound in Administration. |
Flink engine version | Select an engine version based on your requirements. |
Resource Group for Scheduling | Select a serverless resource group that has network connectivity with Flink. |
Job Manager CPU | Based on Flink best practices, the JobManager requires at least 0.5 CPU cores and 2 GiB of memory for stable operation. We recommend 1 CPU core and 4 GiB of memory, with a maximum of 16 CPU cores. Adjust the configuration based on your cluster scale and job complexity. |
Job Manager Memory | The JobManager memory configuration affects its ability to handle scheduling and management tasks. The recommended range is 2 GiB to 64 GiB for stable and efficient operation. Adjust the value based on your cluster scale and job requirements. |
Task Manager CPU | The TaskManager CPU configuration affects its task processing capability. Based on Flink best practices, each TaskManager requires at least 0.5 CPU cores and 2 GiB of memory. We recommend 1 CPU core and 4 GiB of memory, with a maximum of 16 CPU cores. Adjust the configuration based on your requirements. |
Task Manager Memory | The TaskManager memory configuration determines the data volume and performance for task processing. To ensure stable and efficient task execution, the memory size must be at least 2 GiB and can be set up to 64 GiB. |
Concurrency | This parameter determines the number of parallel task executions in a Flink job. A higher concurrency can improve processing speed and resource utilization. Set this value based on your cluster resources and job characteristics. |
Maximum number of slots | A slot represents a fixed-size resource unit on a Task Manager that can be allocated to tasks. Each slot can run one task or operator instance. You can adjust the maximum number of slots based on your available resources. |
Number of slots per TaskManager | The number of slots per TaskManager determines how many tasks it can execute in parallel. You can adjust the slot configuration to optimize resource utilization and parallel processing capability. |
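The way Concurrency, Maximum number of slots, and Number of slots per TaskManager interact can be sketched in Python. The sketch assumes Flink's default slot sharing, where a job requests one slot per unit of parallelism, capped by the maximum number of slots. The function name and the sample numbers are hypothetical, and the actual allocation is decided by the Flink cluster.

```python
import math


def estimate_task_managers(parallelism: int, slots_per_tm: int, max_slots: int) -> int:
    """Estimate how many TaskManagers a job starts (illustrative, not exact)."""
    if min(parallelism, slots_per_tm, max_slots) < 1:
        raise ValueError("all settings must be positive integers")
    # Assumption: with default slot sharing, the job asks for one slot per
    # unit of parallelism, capped by the maximum number of slots.
    slots_requested = min(parallelism, max_slots)
    # Each TaskManager contributes a fixed number of slots, so round up.
    return math.ceil(slots_requested / slots_per_tm)


print(estimate_task_managers(parallelism=10, slots_per_tm=4, max_slots=8))
```

Under these assumptions, a parallelism of 10 capped at 8 slots with 4 slots per TaskManager needs 2 TaskManagers. Multiplying that count by the Task Manager CPU and memory settings gives a rough estimate of the total resources that the job consumes.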
(Optional) Configure scheduling parameters
On the right side of the edit page, in the Scheduling Parameters section under Scheduling Settings, click Add parameters, and then set the Parameter name and Parameter Value. Your code can then reference the parameter in the ${variable_name} format.
(Optional) Configure Flink runtime parameters
You can configure runtime parameters on the right side of the edit page in the Flink running parameters section under Scheduling Settings. For more information, see Configure schedule settings.
The syntax of Flink runtime parameters is compatible with VVP (Ververica Platform). Write the configurations directly in YAML format, one per line. You do not need to add semicolons or other special characters to separate lines.
To run the node task on a periodic schedule, configure the scheduling information (Scheduling Policy, Scheduling time, Scheduling Dependency, and Node output parameters) based on your business requirements. For more information, see Configure schedule settings.
After you complete the task configuration, click Save.
Step 3: (Optional) Debug the Flink SQL batch node
Before deploying the node to the production environment, you can use the debug feature to perform a trial run of the node code with uploaded mock data. This allows you to verify the SQL logic and upstream and downstream data without deploying the task to Operation Center.
The debug feature is available through an allowlist. To use this feature, submit a ticket to request access.
Configure Flink resource information
On the right side of the node editing page, in the Flink resource information section of the Run Configuration pane, configure the parameters as described in the following table.
Parameter | Description |
Flink Debug Cluster | The Flink session cluster used to run the debug task. This parameter is required. The drop-down list displays the existing session clusters under the current compute resource along with their running status. Only clusters with a status of Running can be selected. If no clusters are available in the list, click Create Cluster to go to the Realtime Compute for Apache Flink console and create a session cluster. |
Flink Engine Version | The Flink engine version of the selected session cluster. This value is automatically displayed by the system based on the cluster and does not require manual input. |
Timeout | The maximum duration of a single debug task, in minutes. The default value is 30 minutes. The debug task is automatically stopped after the specified duration elapses. |
After you switch the compute resource for the current node, the selected Flink Debug Cluster and the uploaded debug data are cleared. You must reselect a cluster and re-upload the data.
Prepare debug data
In the Debug Data section of the Run Configuration pane, prepare mock data for the source tables referenced in your code.
Click Generate Template. The system parses the source tables referenced in the current SQL code and generates corresponding table name records in the list below. Previously uploaded data is not cleared.
In the Actions column of the table name record, click Download Template to download a CSV template that matches the structure of the source table.
Fill in the debug data in the downloaded template following the column order and save the file in CSV format.
In the Actions column of the table name record, click Upload and select the completed CSV file to upload. After the upload succeeds, the Status column displays Enabled.
(Optional) After the upload succeeds, you can click Preview in the bottom panel to view the data. To modify the data, re-upload a CSV file to overwrite the existing data.
If you do not want the mock data of a specific source table to be used in the current debug session, click Disable to change the status to Disabled. To re-enable the data, click Enable. Only data with the Enabled status is used in the current debug session.
Before uploading debug data, you must first select a Flink Debug Cluster. Otherwise, you are prompted to select a compute resource first.
Debug data supports only the CSV format, and the file size cannot exceed 1 MB. The first row of the CSV file must contain column names. We recommend that you use UTF-8 encoding.
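You can check a file against these constraints locally before you upload it. The following Python sketch is an illustration only: the helper check_debug_csv is hypothetical, it interprets 1 MB as 1,048,576 bytes, and it treats the UTF-8 recommendation as a requirement. The actual validation performed on upload may differ.

```python
import csv
import io

MAX_BYTES = 1024 * 1024  # Assumption: "1 MB" is treated as 1 MiB here.


def check_debug_csv(data: bytes, expected_columns: list[str]) -> list[str]:
    """Return a list of problems found in a debug CSV file (empty if none)."""
    problems = []
    if len(data) > MAX_BYTES:
        problems.append("file is larger than 1 MB")
    try:
        text = data.decode("utf-8-sig")  # tolerate a UTF-8 byte order mark
    except UnicodeDecodeError:
        return problems + ["file is not valid UTF-8"]
    rows = list(csv.reader(io.StringIO(text)))
    if not rows:
        return problems + ["file is empty"]
    # The first row must contain the column names, in table column order.
    header = [name.strip() for name in rows[0]]
    if header != expected_columns:
        problems.append(f"header {header} does not match {expected_columns}")
    for line_no, row in enumerate(rows[1:], start=2):
        if len(row) != len(header):
            problems.append(f"line {line_no} has {len(row)} fields")
    return problems


sample = "name\nalice\nbob\n".encode("utf-8")
print(check_debug_csv(sample, ["name"]))
```

An empty list means that the file passed every check in the sketch. Pass the column names from the downloaded template as expected_columns.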
Run the debug task
After the debug data is ready, click the Run button on the editor toolbar (or press F8). The system submits the code, mock data, and Flink resource information to the selected session cluster for execution.
If your code uses parameters in the ${variable_name} format, make sure that you have assigned values to the variables in the Scheduling Parameters section. During debugging, the system replaces the placeholders in the code with the assigned values before submission.
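A quick way to spot variables that still lack a value is to scan the code for placeholders and compare them with the assigned parameters. The following Python sketch shows the idea. The helper unassigned_variables and the table names in the sample SQL are hypothetical, and the regular expression assumes that variable names contain only letters, digits, and underscores.

```python
import re

# Matches ${name} and captures the variable name.
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def unassigned_variables(sql: str, assigned: dict[str, str]) -> list[str]:
    """List ${name} placeholders in the SQL that have no assigned value."""
    names = dict.fromkeys(PLACEHOLDER.findall(sql))  # keep order, drop repeats
    return [name for name in names if name not in assigned]


sql = "INSERT INTO sink_${var} SELECT name FROM source_${var} WHERE ds = '${ds}';"
print(unassigned_variables(sql, {"var": "20240115"}))
```

Any name in the returned list needs a value in the Scheduling Parameters section before you run the debug task.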
View the debug results
After the debug task runs, the result area at the bottom of the node provides the following information to help you quickly identify issues:
Code: The SQL code submitted to the Flink engine for this run (with variable substitutions applied).
Logs: The runtime log and error information of the debug task.
Query results: The output data of the debug task.
Step 4: Deploy and manage the Flink SQL batch node
After the node task is configured, you must deploy the node. For more information, see Deploy a node.
After the task is deployed, you can click Go to operation and maintenance below Deploy to Production to view the running status of scheduled tasks in Operation Center. For more information, see View scheduled tasks.