This feature will be discontinued soon and is available free of charge only to users who have already activated it. To configure ETL going forward, use a data synchronization or migration instance instead. See Configure ETL in a data migration or synchronization task.
When you need to transform data in real time as it flows from a source database to a destination—such as joining two tables, enriching records with lookup data, or filtering rows—DTS streaming ETL lets you define that logic in SQL. Flink SQL mode gives you full SQL expressiveness, including statements not available in DAG mode, without requiring knowledge of stream processing internals.
This topic describes how to configure a streaming ETL task in Flink SQL mode.
Before you begin
Before configuring the task, make sure that:
The ETL task is in one of the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Zhangjiakou), China (Shenzhen), China (Guangzhou), or China (Hong Kong).
The source database is one of the following types: MySQL, PolarDB for MySQL, Oracle, PostgreSQL, iSeries DB2 (AS/400), Db2 for LUW, PolarDB-X 1.0, PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, or PolarDB-X 2.0.
The destination database is one of the following types: MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, Db2 for LUW, iSeries DB2 (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X 1.0, PolarDB for Oracle, or Tablestore.
The source and destination databases are in the same region and under the same Alibaba Cloud account.
All stream tables belong to the same instance.
All database names and table names are unique.
The target schemas are created in the destination database. The ETL feature does not support schema migration, so you must create the destination tables manually before starting the task. For example, to join Table A (Field 1, Field 2, Field 3) and Table B (Field 2, Field 3, Field 4) into a result containing Field 2 and Field 3, create Table C with those fields in the destination database first (a sketch follows this list).
The source and destination instances are registered with Data Management (DMS). See Instance Management.
The ETL task supports only incremental data. Full data synchronization is not supported.
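For instance, a minimal sketch of the Table C DDL from the example above, assuming a MySQL destination; the table name, column names, and types are hypothetical placeholders for your actual schema:
CREATE TABLE `table_c` (
  `field_2` BIGINT,        -- hypothetical type; match the type of Field 2 in Tables A and B
  `field_3` VARCHAR(255)   -- hypothetical type; match the type of Field 3 in Tables A and B
);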
Key concepts
Table types
| Type | Role | Description |
|---|---|---|
| Stream table | Source | Updates in real time. Can be joined with dimension tables for data enrichment. |
| Dimension table | Source (lookup) | Static or slowly changing. Used to enrich streaming data for analysis. |
| Output (sink) | Destination | The table where transformed data is written. |
Stream types
When you define a stream table with CREATE TABLE, set streamType to control how the dynamic table's changes are encoded when written to the destination:
| Value | Supported operations | When to use | Notes |
|---|---|---|---|
| append | INSERT only | Data is only inserted, never updated or deleted. | Destination receives only new rows. |
| upsert | INSERT, UPDATE, DELETE | Data can be inserted, updated, or deleted. | Requires a unique key (can be composite). INSERT and UPDATE are encoded as upsert messages; DELETE as delete messages. |
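For example, a minimal sketch of an upsert stream table declaration. The table, columns, and alias are hypothetical, and the unique key is declared with standard Flink SQL PRIMARY KEY ... NOT ENFORCED syntax, which is an assumption about how this dialect expects the key:
CREATE TABLE `etl_order_status` (
  `order_id` BIGINT,
  `status` STRING,
  PRIMARY KEY (`order_id`) NOT ENFORCED  -- upsert mode requires a unique key (can be composite)
) WITH (
  'streamType' = 'upsert',    -- INSERT and UPDATE arrive as upsert messages; DELETE as delete messages
  'alias' = 'order_status',   -- must match the Alias of physical table set in Step 2
  'vertexType' = 'stream'
);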
Configure a streaming ETL task in Flink SQL mode
The configuration has five steps:
Create a data flow and select Flink SQL mode
Add source and destination databases
Write Flink SQL statements
Validate and publish
Purchase an instance and start the task
Step 1: Create a data flow
Log on to the DTS console.
In the left-side navigation pane, click ETL.
Click Create Data Flow. In the Create Data Flow dialog box, enter a name in the Data Flow Name field and select FlinkSQL as the Development Method. Click OK.
Step 2: Add source and destination databases
On the Streaming ETL page, configure the source and destination databases in the Data Flow Information section.
| Parameter | Description |
|---|---|
| Region | The region where the database resides. |
| Type | The role of this database entry. Select Stream Table for a real-time source, Dimension Table for a static lookup table, or Output for the destination. |
| Database type | The type of the source or destination database. |
| Instance | The name or ID of the instance. The instance must be registered in Data Management (DMS). |
| Database | The database containing the tables you want to transform. |
| Physical table | The source or destination table. |
| Alias of physical table | A readable name for the table. This alias is referenced in your Flink SQL statements to link each CREATE TABLE declaration to the physical table selected here. |
Step 3: Write Flink SQL statements
In the script editor on the Streaming ETL page, write SQL statements to define your ETL logic.
Each SQL statement must end with a semicolon (;).
A complete Flink SQL script requires three types of statements:
| Statement | Purpose |
|---|---|
| CREATE TABLE | Defines source and destination tables, including their ETL parameters in the WITH clause. |
| CREATE VIEW | Describes the data transformation logic, such as a JOIN between a stream table and a dimension table. |
| INSERT INTO | Writes transformed data from the view to the destination table. |
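As a minimal sketch of how the three statement types fit together, the following script passes an append-only stream through a trivial transformation. All table names, columns, and aliases are hypothetical; the aliases must match the values you set in Step 2:
CREATE TABLE `etl_src` (
  `id` BIGINT,
  `val` STRING
) WITH (
  'streamType' = 'append',   -- insert-only stream
  'alias' = 'src',           -- hypothetical alias configured in Step 2
  'vertexType' = 'stream'
);
CREATE VIEW `etl_view` AS
SELECT `id`, UPPER(`val`) AS `val`   -- transformation logic (here: uppercase a column)
FROM `etl_src`;
CREATE TABLE `etl_dst` (
  `id` BIGINT,
  `val` STRING
) WITH (
  'alias' = 'dst',           -- hypothetical alias configured in Step 2
  'vertexType' = 'sink'
);
INSERT INTO `etl_dst` (`id`, `val`)
SELECT `id`, `val` FROM `etl_view`;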
Parameters in the WITH clause
Each CREATE TABLE statement uses a WITH clause to configure the table's ETL behavior:
| Parameter | Applies to | Description |
|---|---|---|
| streamType | Stream tables only | How changes are encoded when writing to the destination. Valid values: append, upsert. See Stream types. |
| alias | All table types | Must exactly match the Alias of physical table value set in Step 2. This value links the CREATE TABLE declaration to the physical table you selected. |
| vertexType | All table types | The role of the table. Valid values: stream (stream table), lookup (dimension table), sink (destination table). |
Example: join a stream table with a dimension table
The following script joins stream table test_orders with dimension table product and inserts the results into destination table test_orders_new.
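-- Stream table (vertexType = 'stream'): receives incremental changes from the physical table test_orders in real time.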
CREATE TABLE `etltest_test_orders` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`dts_etl_schema_db_table` STRING,
`dts_etl_db_log_time` BIGINT,
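  -- Computed processing-time column and event-time watermark; `pt` drives the lookup join below.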
`pt` AS PROCTIME(),
WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
'streamType'= 'append',
'alias'= 'test_orders',
'vertexType'= 'stream'
);
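-- Dimension table (vertexType = 'lookup'): static product data used to enrich the stream.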
CREATE TABLE `etltest_product` (
`product_id` BIGINT,
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'product',
'vertexType'= 'lookup'
);
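-- Transformation logic: a lookup join that enriches each order with its product name and price.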
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
`etltest_test_orders`.`order_id` AS `order_id`,
`etltest_test_orders`.`user_id` AS `user_id`,
`etltest_test_orders`.`product_id` AS `product_id`,
`etltest_test_orders`.`total_price` AS `total_price`,
`etltest_test_orders`.`order_date` AS `order_date`,
`etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
`etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
`etltest_product`.`product_id` AS `product_id_0001011101`,
`etltest_product`.`product_name` AS `product_name`,
`etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders`
LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt`
ON `etltest_test_orders`.`product_id` = `etltest_product`.`product_id`;
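-- Destination table (vertexType = 'sink'): its schema must already exist in the destination database.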
CREATE TABLE `test_orders_new` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'test_orders_new',
'vertexType'= 'sink'
);
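-- Write the transformed rows from the view to the destination table.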
INSERT INTO `test_orders_new` (
`order_id`,
`user_id`,
`product_id`,
`total_price`,
`order_date`,
`product_name`,
`product_price`
)
SELECT
`etltest_test_orders_JOIN_etltest_product`.`order_id`,
`etltest_test_orders_JOIN_etltest_product`.`user_id`,
`etltest_test_orders_JOIN_etltest_product`.`product_id`,
`etltest_test_orders_JOIN_etltest_product`.`total_price`,
`etltest_test_orders_JOIN_etltest_product`.`order_date`,
`etltest_test_orders_JOIN_etltest_product`.`product_name`,
`etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;
Step 4: Validate and publish
Click Generate Flink SQL Validation to validate your SQL statements.
If validation succeeds, you can review the validation details. If validation fails, review the error details, correct the SQL statements, and run validation again.
Note: Clicking Publish also triggers SQL validation and runs a precheck in one step.
After validation succeeds, click Publish to run a precheck.
Wait until the precheck success rate reaches 100%.
Note: If the precheck fails, click View Details next to each failed item, resolve the issues, and run the precheck again.
Step 5: Purchase an instance and start the task
On the Purchase Instance page, select an Instance Class and confirm the Compute Units (CUs). The CU value is fixed at 2 during the public preview period.
Read and select the checkboxes to agree to the Data Transmission Service (Pay-as-you-go) Service Terms and Service Terms for Public Preview.
Click Buy and Start to start the ETL task.
During the public preview, each user can create two ETL instances for free.