In Data Studio of DataWorks, Flink SQL Streaming nodes allow you to define the processing logic of real-time tasks by using standard SQL statements. These nodes are easy to use, support a variety of SQL syntax, and provide powerful state management and fault tolerance capabilities. They support both event time and processing time, can be flexibly scaled, integrate easily with services such as Kafka and Hadoop Distributed File System (HDFS), and provide detailed logs and performance monitoring tools. You need only to create Flink SQL Streaming tasks in your DataWorks workspace and write SQL statements for these tasks to process data in real time. This topic describes how to develop Flink SQL Streaming tasks in the DataWorks console and use DataWorks to process real-time data in Realtime Compute for Apache Flink.
Prerequisites
A DataWorks workspace is created and Realtime Compute for Apache Flink computing resources are associated with the DataWorks workspace in Management Center of the DataWorks console.
A Flink SQL Streaming node is created.
Step 1: Develop a task based on the Flink SQL Streaming node
On the configuration tab of the Flink SQL Streaming node, you can perform the following operations to develop a task based on the node:
Develop SQL code
In the SQL editor, develop the task code. You can define variables in the ${Variable name} format in the task code and configure scheduling parameters in the Script Parameters section of the Real-time Configurations tab to assign values to the variables. When the Flink SQL Streaming task is scheduled to run, the variables in the task code are dynamically replaced with the values of the scheduling parameters. Sample code:
-- Create a source table named datagen_source.
CREATE TEMPORARY TABLE datagen_source(
name VARCHAR
) WITH (
'connector' = 'datagen'
);
-- Create a result table named blackhole_sink.
CREATE TEMPORARY TABLE blackhole_sink(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
-- Insert data from the source table datagen_source into the result table blackhole_sink.
INSERT INTO blackhole_sink
SELECT
name
FROM datagen_source WHERE LENGTH(name) > ${name_length};

In this example, the value of the name_length parameter is 5. With this configuration, the task retains only the rows whose name is longer than 5 characters and filters out the rows whose name is 5 characters or shorter.
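As an illustration of the substitution, if the scheduling parameter name_length is assigned the value 5, the statement that is submitted to Flink at run time is equivalent to the following sketch:

```sql
-- Sketch only: the ${name_length} variable has been replaced with the
-- configured scheduling parameter value (5 in this example).
INSERT INTO blackhole_sink
SELECT
  name
FROM datagen_source WHERE LENGTH(name) > 5;
```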
Step 2: Configure the Flink SQL Streaming task
You can refer to the parameter descriptions in the following tables to configure the Flink SQL Streaming task based on your business requirements.
Configure parameters in the Flink Resource Information section
In the Flink Resource Information section of the Real-time Configurations tab, configure the parameters based on the resource configuration mode. The following table describes the parameters. For more information, see Configure resources for a deployment.
Parameter | Description |
Flink Cluster | The name of the Realtime Compute for Apache Flink workspace that is associated with the DataWorks workspace in Management Center. |
Flink Engine Version | Select an engine version based on your business requirements. |
Resource Group | Select the serverless resource group that is connected to the Realtime Compute for Apache Flink workspace. |
Two resource configuration modes are supported: basic mode and expert mode. For more information, see Configure resources for a deployment. Configure the parameters based on the resource configuration mode that you select. You can configure the parameters more efficiently after you understand the architecture of Apache Flink. For more information, see Flink Architecture.
Basic Mode | |
Job Manager CPU | The best practices of Realtime Compute for Apache Flink show that the JobManager requires at least 0.5 CPU cores and 2 GiB of memory to ensure the stable running of the deployment. We recommend that you configure 1 CPU core and 4 GiB of memory for each JobManager. You can configure a maximum of 16 CPU cores. You must configure this parameter based on the size of the Realtime Compute for Apache Flink workspace and deployment complexity. |
Job Manager Memory | The memory configuration of the JobManager affects the task scheduling and management capabilities of the JobManager. We recommend that you specify a value from 2 to 64 for this parameter to ensure the stable and efficient running of the system. Unit: GiB. You must configure this parameter based on the size of the Realtime Compute for Apache Flink workspace and deployment requirements. |
Task Manager CPU | The CPU resource configuration of a TaskManager affects the ability of the TaskManager to process data in tasks. The best practices of Realtime Compute for Apache Flink show that a TaskManager requires at least 0.5 CPU cores and 2 GiB of memory to ensure the stable running of the deployment. We recommend that you configure 1 CPU core and 4 GiB of memory for each TaskManager. You can configure a maximum of 16 CPU cores. You must configure this parameter based on your business requirements. |
Task Manager Memory | The memory configuration of a TaskManager determines the data volume and performance of the TaskManager to process data in tasks. To ensure task stability and efficiency, we recommend that you specify a value from 2 to 64 for this parameter. Unit: GiB. |
Parallelism | The number of tasks that can run in parallel in a deployment. Higher parallelism can improve the processing speed and resource utilization. You must configure this parameter based on workspace resources and deployment characteristics. |
Slots For Each TaskManager | The number of slots in each TaskManager. This parameter specifies the number of tasks that can be run in parallel. You can adjust the slot configuration to optimize resource utilization and deployment parallel processing. |
Expert Mode | |
Job Manager CPU | The best practices of Realtime Compute for Apache Flink show that the JobManager requires at least 0.25 CPU cores and 1 GiB of memory to ensure the stable running of the deployment. You can configure a maximum of 16 CPU cores. You must configure this parameter based on the size of the Realtime Compute for Apache Flink workspace and deployment complexity. |
Job Manager Memory | The memory configuration of the JobManager affects the task scheduling and management capabilities of the JobManager. We recommend that you specify a value from 1 to 64 for this parameter to ensure the stable and efficient running of the system. Unit: GiB. You must configure this parameter based on the size of the Realtime Compute for Apache Flink workspace and deployment requirements. |
Slots For Each TaskManager | The number of slots in each TaskManager. This parameter specifies the number of tasks that can be run in parallel. You can adjust the slot configuration to optimize resource utilization and deployment parallel processing. |
Multi-SSG Mode | By default, all operators are placed in one slot sharing group (SSG), and you cannot separately modify the resource configuration of each operator. If you want to configure resources for individual operators, you must enable Multi-SSG mode to ensure that each operator has an independent slot. This way, you can configure resources for each operator in a slot. |
(Optional) Configure parameters in the Script Parameters section
In the right-side navigation pane of the node configuration page, click the Real-time Configurations tab. In the Script Parameters section of the Real-time Configurations tab, click Add Parameter and configure Parameter Name and Parameter Value to dynamically use the parameter in the code.
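For example, to supply the ${name_length} variable used in the sample code of Step 1, you could add a script parameter whose Parameter Name is name_length and whose Parameter Value is 5 (both values here are illustrative), and then reference the parameter in the task code:

```sql
-- Assumes a script parameter with Parameter Name "name_length" and
-- Parameter Value "5" is configured in the Script Parameters section.
SELECT
  name
FROM datagen_source WHERE LENGTH(name) > ${name_length};
```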
(Optional) Configure parameters in the Flink Runtime Parameters section
In the right-side navigation pane of the node configuration page, click the Real-time Configurations tab. In the Flink Runtime Parameters section of the Real-time Configurations tab, configure the parameters described in the following table. For more information, see Configure a deployment.
Parameter | Description |
Checkpointing Interval | The interval at which checkpoints are generated. A shorter interval reduces the fault recovery time but increases the system overhead. If you do not configure this parameter, the checkpointing feature is disabled. |
The Minimum Interval Between Two Checkpoints | The minimum interval between two checkpoints, which prevents excessively frequent checkpoints from affecting system performance. If the maximum parallelism of checkpoints is 1, this parameter specifies the minimum interval between two checkpoints. |
State Data Expiration Time | The maximum period of time for which state data in a deployment is retained without being accessed or updated. Default value: 36. Unit: hours. The default value indicates that the state data of a deployment expires after 36 hours. The system automatically removes expired data to optimize state storage and resource usage. Important: The default value is determined based on the best practices of Alibaba Cloud and is different from the default time-to-live (TTL) value in Apache Flink, which is 0 and indicates that state data never expires. |
Other Configurations | Other Realtime Compute for Apache Flink settings. Note: For more information about other deployment parameters, see Create a deployment. |
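As a hedged sketch, the checkpointing and state expiration settings in the preceding table correspond to standard Apache Flink configuration keys. In environments where the Flink SQL SET statement is supported, equivalent values could be declared as follows (all values are illustrative):

```sql
-- Illustrative values only; in DataWorks these settings are normally
-- configured in the Flink Runtime Parameters section instead.
SET 'execution.checkpointing.interval' = '60s';   -- Checkpointing Interval
SET 'execution.checkpointing.min-pause' = '10s';  -- minimum interval between checkpoints
SET 'table.exec.state.ttl' = '36 h';              -- State Data Expiration Time (36 hours)
```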
After the configuration is complete, click Save to save the task.
Step 3: Start the Flink SQL Streaming task
Deploy the Flink SQL Streaming task.
The task can be run only after it is deployed to Operation Center. You can deploy the Flink SQL Streaming task as prompted.
Note: When you deploy the Flink SQL Streaming task in the DataWorks console, the task is also deployed to the Ververica Platform (VVP) of Realtime Compute for Apache Flink. You can view the tasks deployed from DataWorks on the Deployments page of the development console of Realtime Compute for Apache Flink.
Start the Flink SQL Streaming task.
After you deploy the task, you can click Perform O&M below Prod Online in the DataWorks console. In the left-side navigation pane of Operation Center, choose . On the page that appears, find the task and click Start in the Actions column to start the task and view its running status.