
Data Transmission Service: Configure an ETL task in Flink SQL mode

Last Updated: Mar 30, 2026
Important

This feature will be phased out soon and remains available, free of charge, only to specific users who have already activated it. To configure ETL going forward, use a data synchronization or migration instance instead. See Configure ETL in a data migration or synchronization task.

When you need to transform data in real time as it flows from a source database to a destination—such as joining two tables, enriching records with lookup data, or filtering rows—DTS streaming ETL lets you define that logic in SQL. Flink SQL mode gives you full SQL expressiveness, including statements not available in DAG mode, without requiring knowledge of stream processing internals.

This topic describes how to configure a streaming ETL task in Flink SQL mode.

Before you begin

Before configuring the task, make sure that:

  • The ETL task is in one of the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Zhangjiakou), China (Shenzhen), China (Guangzhou), or China (Hong Kong).

  • The source database is one of the following types: MySQL, PolarDB for MySQL, Oracle, PostgreSQL, iSeries DB2 (AS/400), Db2 for LUW, PolarDB-X 1.0, PolarDB for PostgreSQL, MariaDB, PolarDB for Oracle, SQL Server, or PolarDB-X 2.0.

  • The destination database is one of the following types: MySQL, PolarDB for MySQL, Oracle, AnalyticDB for MySQL V3.0, PolarDB for PostgreSQL, PostgreSQL, Db2 for LUW, iSeries DB2 (AS/400), AnalyticDB for PostgreSQL, SQL Server, MariaDB, PolarDB-X 1.0, PolarDB for Oracle, or Tablestore.

  • The source and destination databases are in the same region and under the same Alibaba Cloud account.

  • All stream tables belong to the same instance.

  • All database names and table names are unique.

  • The target schemas are created in the destination database. The ETL feature does not support schema migration, so you must create the destination tables manually before starting the task. For example, to join Table A (Field 1, Field 2, Field 3) and Table B (Field 2, Field 3, Field 4) into a result containing Field 2 and Field 3, first create Table C with those fields in the destination database (see the DDL sketch after this list).

  • The source and destination instances are registered with Data Management (DMS). See Instance Management.

  • The ETL task processes only incremental data; full data synchronization is not supported.
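
For instance, continuing the Table A/Table B example above, the destination-side DDL for Table C might look like the following. This is a minimal sketch for a MySQL destination; the table name and column types are illustrative and must match your actual schema:

CREATE TABLE table_c (
  field_2 BIGINT,
  field_3 BIGINT
);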

Key concepts

Table types

  • Stream table (source): Updates in real time. Can be joined with dimension tables for data enrichment.

  • Dimension table (source, used for lookups): Static or slowly changing. Used to enrich streaming data for analysis.

  • Output table (sink, destination): The table where transformed data is written.

Stream types

When you define a stream table with CREATE TABLE, set streamType to control how the dynamic table's changes are encoded when written to the destination:

  • append: Supports INSERT only. Use this value when data is only inserted, never updated or deleted. The destination receives only new rows.

  • upsert: Supports INSERT, UPDATE, and DELETE. Use this value when data can be inserted, updated, or deleted. Requires a unique key, which can be composite. INSERT and UPDATE operations are encoded as upsert messages; DELETE operations are encoded as delete messages.
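
For example, a stream table whose rows can be updated or deleted would be declared with streamType set to upsert. In open-source Flink SQL, the unique key is declared as a PRIMARY KEY ... NOT ENFORCED constraint; the following is a minimal sketch with illustrative table and column names:

CREATE TABLE `account_balance` (
  `account_id` BIGINT,
  `balance` DECIMAL(15,2),
  -- Unique key required by upsert mode; NOT ENFORCED because Flink does not
  -- validate the constraint itself.
  PRIMARY KEY (`account_id`) NOT ENFORCED
) WITH (
  'streamType'= 'upsert',
  'alias'= 'account_balance',
  'vertexType'= 'stream'
);

Append mode needs no key because rows are never revised after they are written.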

Configure a streaming ETL task in Flink SQL mode

The configuration has five steps:

  1. Create a data flow and select Flink SQL mode

  2. Add source and destination databases

  3. Write Flink SQL statements

  4. Validate and publish

  5. Purchase an instance and start the task

Step 1: Create a data flow

  1. Log on to the DTS console.

  2. In the left-side navigation pane, click ETL.

  3. Click Create Data Flow. In the Create Data Flow dialog box, enter a name in the Data Flow Name field and select FlinkSQL as the Development Method.

  4. Click OK.

Step 2: Add source and destination databases

On the Streaming ETL page, configure the source and destination databases in the Data Flow Information section.

  • Region: The region where the database resides.

  • Type: The role of this database entry. Select Stream Table for a real-time source, Dimension Table for a static lookup table, or Output for the destination.

  • Database type: The type of the source or destination database.

  • Instance: The name or ID of the instance. The instance must be registered in Data Management (DMS).

  • Database: The database containing the tables you want to transform.

  • Physical table: The source or destination table.

  • Alias of physical table: A readable name for the table. This alias is referenced in your Flink SQL statements to link each CREATE TABLE declaration to the physical table selected here.

Step 3: Write Flink SQL statements

In the script editor on the Streaming ETL page, write SQL statements to define your ETL logic.

Important

Each SQL statement must end with a semicolon (;).

A complete Flink SQL script requires three types of statements:

  • CREATE TABLE: Defines the source and destination tables, including their ETL parameters in the WITH clause.

  • CREATE VIEW: Describes the data transformation logic, such as a JOIN between a stream table and a dimension table.

  • INSERT INTO: Writes the transformed data from the view to the destination table.

Parameters in the WITH clause

Each CREATE TABLE statement uses a WITH clause to configure the table's ETL behavior:

  • streamType (stream tables only): How changes are encoded when writing to the destination. Valid values: append, upsert. See Stream types.

  • alias (all table types): Must exactly match the Alias of physical table value set in Step 2. This value links the CREATE TABLE declaration to the physical table you selected.

  • vertexType (all table types): The role of the table. Valid values: stream (stream table), lookup (dimension table), sink (destination table).

Example: join a stream table with a dimension table

The following script joins stream table test_orders with dimension table product and inserts the results into destination table test_orders_new.

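-- Stream table: the real-time source. streamType 'append' means only INSERT
-- operations flow through to the destination.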
CREATE TABLE `etltest_test_orders` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `dts_etl_schema_db_table` STRING,
  `dts_etl_db_log_time` BIGINT,
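  -- pt is a computed processing-time column used by the temporal join below;
  -- the watermark tolerates up to 5 seconds of out-of-order order_date values.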
  `pt` AS PROCTIME(),
  WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
  'streamType'= 'append',
  'alias'= 'test_orders',
  'vertexType'= 'stream'
);
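-- Dimension table: a static lookup source, joined below with FOR SYSTEM_TIME AS OF.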
CREATE TABLE `etltest_product` (
  `product_id` BIGINT,
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'product',
  'vertexType'= 'lookup'
);
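-- View: enriches each order with product attributes via a temporal (lookup) join.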
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
  `etltest_test_orders`.`order_id` AS `order_id`,
  `etltest_test_orders`.`user_id` AS `user_id`,
  `etltest_test_orders`.`product_id` AS `product_id`,
  `etltest_test_orders`.`total_price` AS `total_price`,
  `etltest_test_orders`.`order_date` AS `order_date`,
  `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
  `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
  `etltest_product`.`product_id` AS `product_id_0001011101`,
  `etltest_product`.`product_name` AS `product_name`,
  `etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders`
LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt`
  ON `etltest_test_orders`.`product_id` = `etltest_product`.`product_id`;
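-- Output table: the sink; its schema must already exist in the destination database.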
CREATE TABLE `test_orders_new` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'test_orders_new',
  'vertexType'= 'sink'
);
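-- Write the joined rows from the view into the destination table.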
INSERT INTO `test_orders_new` (
  `order_id`,
  `user_id`,
  `product_id`,
  `total_price`,
  `order_date`,
  `product_name`,
  `product_price`
)
SELECT
  `etltest_test_orders_JOIN_etltest_product`.`order_id`,
  `etltest_test_orders_JOIN_etltest_product`.`user_id`,
  `etltest_test_orders_JOIN_etltest_product`.`product_id`,
  `etltest_test_orders_JOIN_etltest_product`.`total_price`,
  `etltest_test_orders_JOIN_etltest_product`.`order_date`,
  `etltest_test_orders_JOIN_etltest_product`.`product_name`,
  `etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;

Step 4: Validate and publish

  1. Click Generate Flink SQL Validation to validate your SQL statements.

    • If validation succeeds, click ETL Validation Successful to review the details.

    • If validation fails, click ETL Validation Failed to see the error details, correct the SQL statements, and run validation again.

    Note

    Clicking Publish also triggers SQL validation and runs a precheck in one step.

  2. After validation succeeds, click Publish to run a precheck.

  3. Wait until the precheck success rate reaches 100%.

    Note

    If the precheck fails, click View Details next to each failed item, resolve the issues, and run the precheck again.

Step 5: Purchase an instance and start the task

  1. On the Purchase Instance page, select an Instance Class and confirm the Compute Units (CUs). The CU value is fixed at 2 during the public preview period.

  2. Read and select the checkboxes to agree to the Data Transmission Service (Pay-as-you-go) Service Terms and Service Terms for Public Preview.

  3. Click Buy and Start to start the ETL task.

Note

During the public preview, each user can create two ETL instances for free.
