Flink SQL is a programming language developed by Alibaba Cloud to simplify the computing model of extract, transform, load (ETL) and to lower the skill requirements for users. Flink SQL is compatible with standard SQL syntax and provides more advanced capabilities than the directed acyclic graph (DAG) mode: in the Flink SQL script editor, you can enter statements that the DAG mode does not support. This topic describes how to configure an ETL task in Flink SQL mode.

Background information

Note The ETL feature is in public preview. You can apply for a free trial of this feature. If you have questions during the free trial, join the DingTalk group 32326646 for technical support.
  • Before you configure an ETL task, take note of the following information:
    • Input/Dimension Table indicates the source database of the ETL task.
    • Output indicates the destination database of the ETL task.
  • DTS provides the streaming ETL feature for the data synchronization process. You can add transformation components between the source and destination databases. You can transform data and write the processed data to the destination database in real time. For example, you can join two stream tables into a large table and write the data of the large table to the destination database. You can also add a field to the source table and configure a function to assign values to the field. Then, you can write the field to the destination database.
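
As a minimal sketch of the second scenario, a view can add a derived field whose values are assigned by an expression. The table `etltest_test_orders` and its columns follow the naming used in the procedure of this topic. The view name `orders_with_level`, the field `order_level`, and the threshold 1000 are hypothetical and serve only as an illustration.

```sql
-- Hypothetical view: adds a derived field `order_level` to each row of a stream table.
-- `orders_with_level`, `order_level`, and the threshold 1000 are assumptions for illustration.
CREATE VIEW `orders_with_level` AS
SELECT
  `order_id`,
  `user_id`,
  `total_price`,
  -- Assign a value to the new field based on an existing field.
  CASE WHEN `total_price` >= 1000 THEN 'large' ELSE 'normal' END AS `order_level`
FROM `etltest_test_orders`;
```

To write the new field to the destination database, the destination table would also need a matching column.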

Prerequisites

  • An ETL task is created in the China (Hangzhou), China (Beijing), or China (Zhangjiakou) region.
  • The source database belongs to one of the following types: self-managed MySQL databases, ApsaraDB RDS for MySQL, PolarDB for MySQL, PolarDB-X V1.0 (formerly DRDS), self-managed Oracle databases, self-managed PostgreSQL databases, ApsaraDB RDS for PostgreSQL, Db2 for LUW, and Db2 for i.
  • The destination database belongs to one of the following types: self-managed MySQL databases, ApsaraDB RDS for MySQL, PolarDB for MySQL, AnalyticDB for MySQL V3.0, self-managed Oracle databases, self-managed PostgreSQL databases, ApsaraDB RDS for PostgreSQL, Db2 for LUW, and Db2 for i.
  • The table schemas are created in the destination database in advance, because the ETL feature does not support schema migration. For example, Table A contains Field 1, Field 2, and Field 3, and Table B contains Field 2, Field 3, and Field 4. If you need to join Table A and Table B into a table that contains Field 2 and Field 3, you must first create Table C, which contains Field 2 and Field 3, in the destination database.
  • The ETL feature does not support full data synchronization. Therefore, you can transform only incremental data in real time.
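
The schema prerequisite above could be met with a statement like the following, run in the destination database before you configure the ETL task. This is only a sketch: the names `table_c`, `field_2`, and `field_3` mirror the Table C example, and the data types are assumptions that you would replace with the types of the corresponding source fields.

```sql
-- Run in the destination database, not in the Flink SQL script editor.
-- Table and column names mirror the Table C example; the data types are assumed.
CREATE TABLE table_c (
  field_2 VARCHAR(64),
  field_3 VARCHAR(64)
);
```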

Precautions

  • You cannot configure the source and destination databases by using the same connection template. You must create different connection templates for the source and destination databases. For more information, see Create a connection template. You can also register the source and destination instances in Data Management (DMS). For more information, see Instance management.
  • The source and destination databases must reside in the same region.
  • All stream tables must belong to the same instance.
  • All database names and table names must be unique.

Procedure

  1. Go to the ETL page.
    Note

    You can also perform the following steps to configure an ETL task in the DMS console:

    • Log on to the DMS console.
    • In the top navigation bar, click DTS.
    • In the left-side navigation pane, click Streaming ETL.
    • Click Create Data Flow in the upper-left corner. In the Create Data Flow dialog box, specify an ETL task name in the Data Flow Name field, set Processing Method to Stream Processing, and set Development Method to FlinkSQL.
    • Click OK.
  2. In the Data Flow Information section of the Streaming ETL page, add the source and destination databases.
    Parameter Description
    Region Select the region where the source database resides.
    Note You can create an ETL task only in the China (Hangzhou), China (Beijing), or China (Zhangjiakou) region. Select a region based on the actual scenario.
    Type Select a table type.
    • If the source table is a stream table, select Stream Table. If the source table is a dimension table, select Dimension Table.
    • If you configure the destination table, select Output.

    Database Type Select the type of the source or destination database.
    Instances Enter the name or ID of the source or destination instance.
    Notice Before you set this parameter, you must register the source and destination instances in DMS. For more information, see Instance management.
    Database Select the source or destination database to which the data transformation object belongs.
    Physical Table Select the source or destination table to which the data transformation object belongs.
    Alias of Physical Table Set a readable alias for the source or destination table. The alias helps you identify the table when you run SQL statements in ETL.
  3. On the Streaming ETL page, enter SQL statements in the script editor to configure an ETL task.
    The following SQL statements show how to configure an ETL task to combine a stream table named test_orders and a dimension table named product into the destination table test_orders_new.
    Notice You must separate multiple SQL statements with semicolons (;).
    CREATE TABLE `etltest_test_orders` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'append',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    CREATE TABLE `etltest_product` (
      `product_id` BIGINT,
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'product',
      'vertexType'= 'lookup'
    );
    CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
    SELECT
      `etltest_test_orders`.`order_id` AS `order_id`,
      `etltest_test_orders`.`user_id` AS `user_id`,
      `etltest_test_orders`.`product_id` AS `product_id`,
      `etltest_test_orders`.`total_price` AS `total_price`,
      `etltest_test_orders`.`order_date` AS `order_date`,
      `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
      `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
      `etltest_product`.`product_id` AS `product_id_0001011101`,
      `etltest_product`.`product_name` AS `product_name`,
      `etltest_product`.`product_price` AS `product_price`
    FROM `etltest_test_orders`
    LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt`
      ON `etltest_test_orders`.`product_id` = `etltest_product`.`product_id`;
    CREATE TABLE `test_orders_new` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `product_name` STRING,
      `product_price` DECIMAL(15,2)
    ) WITH (
      'alias'= 'test_orders_new',
      'vertexType'= 'sink'
    );
    INSERT INTO `test_orders_new` (
      `order_id`,
      `user_id`,
      `product_id`,
      `total_price`,
      `order_date`,
      `product_name`,
      `product_price`
    )
    SELECT
      `etltest_test_orders_JOIN_etltest_product`.`order_id`,
      `etltest_test_orders_JOIN_etltest_product`.`user_id`,
      `etltest_test_orders_JOIN_etltest_product`.`product_id`,
      `etltest_test_orders_JOIN_etltest_product`.`total_price`,
      `etltest_test_orders_JOIN_etltest_product`.`order_date`,
      `etltest_test_orders_JOIN_etltest_product`.`product_name`,
      `etltest_test_orders_JOIN_etltest_product`.`product_price`
    FROM `etltest_test_orders_JOIN_etltest_product`;
    Type Description
    Source and destination tables
    • You must use the CREATE TABLE statement to define the source and destination tables.
    • The WITH clause accepts three parameters: streamType, alias, and vertexType. A stream table requires all three parameters. A dimension table or an output table requires only the alias and vertexType parameters.
      • streamType: The type of the stream.
        • upsert: upsert stream. The data in a dynamic table can be modified by using the INSERT, UPDATE, and DELETE operations. When the dynamic table is converted into a stream, the INSERT and UPDATE operations are encoded as upsert messages and the DELETE operations are encoded as delete messages.
          Note A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key.
        • append: append-only stream. The data in a dynamic table can be modified only by the INSERT operation. When the dynamic table is converted into a stream, only the inserted rows are emitted.
      • alias: The physical table alias that is set when you configure the source and destination databases in Step 2.
      • vertexType: The type of the table.
        • stream: stream table.
        • lookup: dimension table.
        • sink: destination table.
    Computing logic of data transformation
    • You must use the CREATE VIEW statement to describe the computing logic of data transformation.
    Transformed destination table
    • You must use the INSERT INTO statement to write the transformed data to the destination table.
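
    The example in this step uses an append-only stream. As a hedged sketch, a stream table whose source rows can be updated or deleted might set streamType to upsert instead. The definition below copies the stream table from the example and changes only the table name and the streamType value. The name `etltest_test_orders_upsert` is hypothetical, and the lowercase value 'upsert' is assumed by analogy with 'append'. An upsert stream requires a unique key, but this topic does not describe how to declare it, so the declaration is not shown.

    ```sql
    -- Sketch only: same columns as the stream table in the example above.
    -- The table name and the 'upsert' value are assumptions; the unique key declaration is omitted.
    CREATE TABLE `etltest_test_orders_upsert` (
      `order_id` BIGINT,
      `user_id` BIGINT,
      `product_id` BIGINT,
      `total_price` DECIMAL(15,2),
      `order_date` TIMESTAMP(6),
      `dts_etl_schema_db_table` STRING,
      `dts_etl_db_log_time` BIGINT,
      `pt` AS PROCTIME(),
      WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
    ) WITH (
      'streamType'= 'upsert',
      'alias'= 'test_orders',
      'vertexType'= 'stream'
    );
    ```

    Run Flink SQL validation to confirm whether the task accepts such a definition before you publish it.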
  4. After you configure the source database, destination database, and SQL statements, click Generate Flink SQL Validation.
    Note
    • You can also click Publish to validate the SQL statements and run a precheck.
    • If the Flink SQL validation succeeds, you can click ETL validation succeeded to view the details.
    • If the Flink SQL validation fails, you can click ETL validation failed to view the details. You can edit the SQL statements based on the error message and then perform Flink SQL validation again.
  5. After Flink SQL validation is completed, click Publish to run a precheck.
  6. Wait until the precheck success rate reaches 100%. Then, click Next: Purchase Instance (Free).
    Note If the task fails to pass the precheck, click View Details next to each failed item. Troubleshoot the issues based on the causes and run a precheck again.
  7. On the Purchase Instance page, select a value for Instance Class and confirm the number of Compute Units (CUs), which is fixed at 2 during public preview. Then, read and select Data Transmission Service (Pay-as-you-go) Service Terms and Service Terms for Public Preview.
    Note During public preview, each user can create two ETL instances for free.
  8. Click Buy and Start to start the ETL task.