
Realtime Compute for Apache Flink:CREATE TABLE AS (CTAS)

Last Updated: Jun 19, 2025

You can execute the CREATE TABLE AS (CTAS) statement to synchronize data and table schema changes from upstream to downstream systems in real time. This improves the efficiency of creating a table in the destination system and of synchronizing schema changes. This topic describes the syntax of the CTAS statement and provides examples for various scenarios.

Note

We recommend that you create a job using YAML for data ingestion. You can convert existing SQL drafts containing CTAS or CDAS statements to YAML drafts:

  • Introduction: You can develop a job by using YAML to synchronize data from the source to the destination.

  • Advantages: Key capabilities of CTAS and CDAS statements are supported, including synchronization of databases, tables, table schemas, and custom computed columns. Additionally, real-time schema evolution, synchronization of raw binary log data, the WHERE clause, and column pruning are also supported.

For more information, see Use YAML deployments to ingest data.

Features

Data synchronization

  • Synchronize a table: Synchronizes full and incremental data from a source table to a sink table in real time. (Example: Synchronize a table)

  • Consolidate and synchronize table shards: Uses regular expressions to match the names of database shards and table shards. The matched table shards are consolidated and their data is synchronized to a single sink table. (Example: Consolidate and synchronize table and database shards)

Note

Carets (^) cannot be used to match the beginning of a table name.

  • Synchronize custom computed columns: Adds computed columns that convert and process specific columns. You can use system functions or user-defined functions (UDFs) in computed columns and specify the position of each new column. Newly added computed columns are used as physical columns in the sink table, and their results are synchronized to the sink table in real time. (Example: Synchronize custom computed columns)

  • Execute multiple CTAS statements: Executes multiple CTAS statements as a group in a single job. (Example: Execute multiple CTAS statements in a single job)

Schema evolution

During data synchronization, the CTAS statement supports replicating schema changes, including table creation and schema modification, from the source table to the sink table.

  • Supported schema changes

    • Add a nullable column: Automatically adds the column to the end of the sink table's schema and synchronizes data to it. The new column is nullable by default, and rows written before the change are set to NULL in this column.

    • Add a non-null column: Automatically adds the column to the end of the sink table's schema and synchronizes data to it.

    • Delete a nullable column: Fills the column in the sink table with null values instead of deleting it from the table.

    • Rename a column: Treated as adding a column and deleting a column. After a column is renamed in the source table, a column with the new name is added to the end of the sink table, and the column with the original name is filled with null values. For example, if the col_a column is renamed col_b, the col_b column is added to the end of the sink table and the col_a column is automatically filled with null values.

    • Modify a column's data type:

      • For downstream systems that support column type modifications: Currently, Paimon is the only such system. The CTAS statement supports modifying a regular column's type, such as from INT to BIGINT. Compatibility depends on system-specific rules (refer to the connector documentation).

      • For downstream systems that do not support column type modifications: Currently, only Hologres can handle column type changes, by using type widening. At job startup, a Hologres table with wider data types is created, and column type changes are then supported based on the compatibility of the sink. For more information, see Example: Synchronize data in the type normalization mode.

        Important

        To enable type widening in Hologres, activate the type normalization mode at the initial job launch. For existing jobs, delete the Hologres table and restart the job without states to apply the type normalization settings.

    Important

    When the CTAS statement is used for synchronization, the system only compares schema differences and does not identify the specific DDL type. For example:

    • If a column is dropped and subsequently added again, with no data changes in between, the system detects no schema change.

    • If a column is dropped and subsequently added again, with data changes in between, the system detects and synchronizes the schema changes.

  • Unsupported schema changes

    • Modify constraints, such as primary keys or indexes.

    • Delete non-nullable columns.

    • Change from NOT NULL to NULLABLE.

    Important

    To synchronize unsupported schema changes, manually drop the sink table and restart your job to re-synchronize historical data.

Synchronization process

The following flowchart shows the process of synchronizing data from MySQL to Hologres with the CTAS statement.


When executing the CTAS statement, Realtime Compute for Apache Flink does the following:

  1. Verifies the existence of a sink table in the destination system.

    • If absent, Realtime Compute for Apache Flink creates a sink table mirroring the source table's schema using the destination store's catalog.

    • If present, Realtime Compute for Apache Flink skips table creation and verifies the consistency of the sink table's schema with the source table's schema. If the schemas are different, an error is reported.

  2. Commits and runs the data synchronization job.

    Realtime Compute for Apache Flink synchronizes data and schema changes from the source to the sink.

Prerequisites

A catalog of the destination store is created in your workspace. For more information, see Manage catalogs.
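In addition to the console workflow described in Manage catalogs, catalogs can typically be declared in Flink SQL. The following is a hedged sketch of creating a Hologres catalog; the option names (endpoint, username, password, dbname) and the placeholder values are assumptions that you should verify against Manage catalogs:

```sql
CREATE CATALOG holo WITH (
  'type' = 'hologres',             -- Catalog type; determines the destination store.
  'endpoint' = '<endpoint>',       -- Hologres instance endpoint (assumed option name).
  'username' = '<access-key-id>',
  'password' = '<access-key-secret>',
  'dbname' = '<database>'          -- Default Hologres database for the catalog.
);
```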

Limits

Limits on syntax

  • Debugging an SQL draft containing the CTAS statement is not supported.

  • The CTAS statement cannot be used with the INSERT INTO statement in the same SQL draft.

  • Data cannot be synchronized to a StarRocks partitioned table.

  • MiniBatch is not supported.


Supported upstream and downstream systems

The following list describes the upstream and downstream data stores for which you can use the CTAS statement.

  • MySQL: supported as a source table; not supported as a sink table.

    • Database and table names are automatically synchronized during the consolidation and synchronization of sharded databases and tables.

    • During single-table synchronization, to synchronize the database and table names, create a MySQL catalog by using Flink SQL and include the catalog.table.metadata-columns option. For more information, see Manage MySQL catalogs.

    • Views cannot be synchronized.
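For example, a MySQL catalog that exposes the database and table names as metadata columns might be declared as follows. This is a hedged sketch: the connection options and the metadata column names (database_name, table_name) are assumptions to verify against Manage MySQL catalogs.

```sql
CREATE CATALOG mysql WITH (
  'type' = 'mysql',
  'hostname' = '<hostname>',
  'port' = '3306',
  'username' = '<username>',
  'password' = '<password>',
  'default-database' = '<database>',
  -- Append the source database and table names as columns in each sink table.
  'catalog.table.metadata-columns' = 'database_name,table_name'
);
```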

  • Kafka: supported as a source table; not supported as a sink table.

  • MongoDB: supported as a source table; not supported as a sink table.

    • Consolidation and synchronization of sharded databases and tables are not supported.

    • A MongoDB database's metadata cannot be synchronized.

    • New tables in the source database cannot be captured or synchronized.

    • Data and table schema changes can be synchronized from MongoDB to other systems. For more information, see Synchronize data from a MongoDB source table to a Hologres table.

  • Upsert Kafka: not supported as a source table; supported as a sink table.

  • StarRocks: not supported as a source table; supported as a sink table. Support is limited to StarRocks on Alibaba Cloud EMR.

  • Hologres: not supported as a source table; supported as a sink table.

    When Hologres serves as the destination of data synchronization, the system automatically creates connections for each table based on the value of the connectionSize option. To share one connection pool across multiple tables, set the same connectionPoolName option for those tables.

    Note

    If the source table contains data types that are not supported by the fixed plan feature of Hologres, use the INSERT INTO statement instead of the CTAS statement for data synchronization. Otherwise, writing performance is lower because fixed plans cannot be used.

  • Paimon: not supported as a source table; supported as a sink table.
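For the Hologres sink described above, connection-pool sharing might look like the following sketch. The catalog, database, and table names are hypothetical, and the option values are illustrative only:

```sql
BEGIN STATEMENT SET;

-- Both sink tables reference the same pool name, so they share one
-- connection pool of up to 5 connections instead of holding 5 each.
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH ('connectionSize' = '5', 'connectionPoolName' = 'shared_pool')
AS TABLE `mysql`.`tpcds`.`orders`;

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`customer`
WITH ('connectionSize' = '5', 'connectionPoolName' = 'shared_pool')
AS TABLE `mysql`.`tpcds`.`customer`;

END;
```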

Syntax

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2, 
  ...
 )
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

The CTAS statement uses the basic syntax of the CREATE TABLE statement. The following table describes some of the arguments:

  • sink_table: The name of the sink table. Optionally, use the table's fully qualified name, which includes its catalog and database.

  • COMMENT: The description of the sink table. By default, the description of source_table is used.

  • PARTITIONED BY: Specifies the partition columns.

    Important

    Data cannot be synchronized to a StarRocks partitioned table.

  • table_constraint: The primary key, which uniquely identifies each record in the table.

  • WITH: The connector options for the sink table. For more information, see the "Connector options in the WITH clause" section in Upsert Kafka connector, Hologres connector, StarRocks connector, or Paimon connector.

    Note

    Both the key and the value must be of the STRING type, such as 'jdbcWriteBatchSize' = '1024'.

  • source_table: The name of the source table. Optionally, use a fully qualified name, which includes the table's catalog and database.

  • OPTIONS: The connector options for the source table. For more information, see the "Connector options in the WITH clause" section in MySQL connector or Kafka connector.

    Note

    Both the key and the value must be of the STRING type, such as 'server-id' = '65500'.

  • ADD COLUMN: Adds columns to the sink table during data synchronization. Only computed columns are supported.

  • column_component: The description of the new columns.

  • computed_column_expression: The expression of the computed column.

  • FIRST: Places the new column as the first field in the sink table. By default, a new column is added at the end of the sink table.

  • AFTER: Places the new column after the specified field.
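As an illustration of the FIRST and AFTER keywords, the following hedged sketch positions two computed columns; the table names and column expressions are hypothetical:

```sql
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
AS TABLE `mysql`.`tpcds`.`orders`
ADD COLUMN (
  -- Place the ingestion timestamp as the first column of the sink table.
  `etl_ts` AS NOW() FIRST,
  -- Place a converted column right after the original order_id column.
  `order_id_str` AS CAST(`order_id` AS STRING) AFTER `order_id`
);
```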

Note
  • The IF NOT EXISTS keyword is required. It prompts the system to check the sink table's existence in the destination store. If it is absent, the system will create a sink table. If it is present, table creation is skipped.

  • The created sink table shares the source table's schema, including the primary key and physical field names and types, but excludes computed columns, metadata fields, and watermark configurations.

  • Realtime Compute for Apache Flink performs data type mappings from the source table to the sink table during data synchronization. For more information about data type mappings, see the specific connector document.

Examples

Synchronize a table

Description: Synchronize the web_sales table from MySQL to Hologres.

Prerequisites:

  • A Hologres catalog named holo is created.

  • A MySQL catalog named mysql is created.

Sample code:

The CTAS statement is often used together with source and destination catalogs to support full and incremental data synchronization. The source catalog automatically parses the source table's schema and properties, so no explicit DDL is required.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales   -- Synchronize data to the web_sales table in the default database. 
WITH ('jdbcWriteBatchSize' = '1024')   -- Optionally configure the connector options for the sink table. 
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */; -- Optionally configure additional options for the MySQL CDC source table.

Consolidate and synchronize table and database shards

Description: Consolidate sharded MySQL tables and databases before synchronizing data to a Hologres table.

Method: Use the MySQL catalog and regular expressions to match the database and tables that you want to synchronize.

The database and table names are written to the sink table as the values of two additional fields. To ensure that the primary key is unique, the sink table's primary key consists of the database name, the table name, and the original primary key.

Code and results:


Consolidate and synchronize table shards:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

Results:

Change the source table's schema: add a new column named age to the user02 table and insert a new record. Data and schema changes to the user02 table are synchronized to the sink table in real time, even though the schemas of the source tables differ.

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);


Synchronize custom computed columns

Description: During the synchronization of consolidated table and database shards from MySQL to Hologres, add custom computed columns.

Code and results:


USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `class` AS 3 AFTER `id`
);


Execute multiple CTAS statements in a single job

Description: Synchronize the web_sales and user table shards from MySQL to Hologres in a single job.

Method: Use STATEMENT SET to execute multiple CTAS statements as a group. This approach reuses the source vertex to read data from multiple tables, reducing the number of server IDs, database connections, and overall read load.

Important
  • To reuse the source and optimize performance, ensure the connector options for each source table are identical.

  • For information about the configuration of server IDs, see Set a different server ID for each client.

Sample code:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Synchronize data from the web_sales table. 
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- Synchronize data from the user table shards. 
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Synchronize data from a source to multiple sinks by using multiple CTAS statements

  • No computed columns are added to the sink table

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize data from the user MySQL table to the user table in database1 of Hologres.
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Synchronize data from the user table of the MySQL database to the user table in database2 of Hologres.
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • Computed columns are added to the sink table

    -- Create a temporary table named user_with_changed_id based on the source table user. Define the computed_id column based on the source table's id column. 
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- Create a temporary table named user_with_changed_age based on the source table user. Define the computed_age column based on the source table's age column. 
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize data from the user table of the MySQL database to the user_with_changed_id table of Hologres. The user_with_changed_id table contains the IDs that are obtained from the calculation based on the id column of the source table. The obtained IDs are in the computed_id column.  
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Synchronize data from the user table of the MySQL database to the user_with_changed_age table of Hologres. The user_with_changed_age table contains the age values that are obtained from the calculation based on the age column of the source table. The obtained age values are in the computed_age column.  
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

Synchronize new tables by using multiple CTAS statements

Scenario description: After a job that uses multiple CTAS statements for synchronization is started, add a CTAS statement to replicate new tables.

Method: Enable new table detection for the job, add a CTAS statement to the job's SQL code, and restart from a savepoint. After the new table is captured, data will be replicated.

Limits:

  • New table detection is supported for VVR 8.0.1 or later.

  • When data is synchronized from a CDC source table, only jobs started in the initial mode can detect new tables.

  • The configuration of the source table that is added by using the new CTAS statement must be the same as the configuration of the original source tables. This way, the source vertex can be reused.

  • Job configuration parameters before and after adding a CTAS statement must be the same. For example, the startup mode should be the same.

Procedure:

  1. On the Deployments page, find the target deployment and click Cancel in the Actions column.

  2. In the dialog, expand the More Strategies section, select Stop With Savepoint, and click OK.

  3. In the job's SQL draft, enable new table detection and add a CTAS statement.

    1. Add the following statement to enable new table detection.

      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    2. Add a CTAS statement. The complete code of the job looks like this:

      -- Enable new table detection
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
      USE CATALOG holo;
      
      BEGIN STATEMENT SET;
      
      -- Synchronize data from the web_sales table. 
      CREATE TABLE IF NOT EXISTS web_sales
      AS TABLE mysql.tpcds.web_sales
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Synchronize data from the user table shards. 
      CREATE TABLE IF NOT EXISTS user
      AS TABLE mysql.`wp.*`.`user[0-9]+`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Synchronize data from the product table.  
      CREATE TABLE IF NOT EXISTS product
      AS TABLE mysql.tpcds.product
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    3. Click Deploy.

  4. Recover the job from the savepoint.

    1. On the Deployments page, click the name of your deployment.

    2. On the deployment details page, click the State tab. Then, click the History subtab.

    3. In the Savepoints list, find the savepoint created when the job was canceled.

    4. Choose More > Start job from this savepoint in the Actions column. For more information, see Start a deployment.

Synchronize to a partitioned table in Hologres

Scenario description: Replicate data from MySQL to a Hologres partitioned table.

Usage notes: If a primary key is defined for the Hologres table, partition columns must be included in the primary key.

Sample code:

Create a MySQL table:

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL,
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

Depending on whether the partition columns are part of the primary key, choose a proper method:

  • If the source primary key contains partition columns:

    • Use the CTAS statement directly.

      Hologres will automatically verify whether partition columns are included in the primary key.

      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
      PARTITIONED BY (product_id)
      AS TABLE `mysql`.`tpcds`.`orders`;
  • If the source primary key excludes partition columns:

    • Declare the sink table's primary key in the CTAS statement and include the partition columns in primary key definition.

      In this case, if you do not redeclare the primary key or do not include the partition columns in it, the job fails.

      -- Declare the order_id, product_id, and city fields as the primary key of the Hologres partitioned table. 
      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
          CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
      )
      PARTITIONED BY (city)
      AS TABLE `mysql`.`tpcds`.`orders`;

Widen data types during replication

Scenario description: During data synchronization, change the precision of a column, such as from VARCHAR(10) to VARCHAR(20), or change a column's data type, like from SMALLINT to INT.

Method:

  • New jobs: Enable type normalization mode at first launch.

  • Existing jobs: Drop the Hologres sink table, and restart without states to apply type normalization.

Type normalization rules:

If the new and original data types are normalized into the same data type, the data type can be successfully changed and the job will run normally. Otherwise, an exception will be reported. Details are as follows:

  • TINYINT, SMALLINT, INT, and BIGINT are converted into BIGINT.

  • CHAR, VARCHAR, and STRING are converted into STRING.

  • FLOAT and DOUBLE are converted into DOUBLE.

  • Other data types are converted based on the data type mappings between Hologres and Flink fields. For more information, see Data type mappings.

Sample code:

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
  'connector' = 'hologres',
  'enableTypeNormalization' = 'true'  -- Enable the type normalization mode.
) AS TABLE `mysql`.`tpcds`.`orders`;

Synchronize data from MongoDB to Hologres

Limits:

  • Support is limited to VVR 8.0.6 or later and MongoDB version 6.0 or later.

  • In connector options for the source table, scan.incremental.snapshot.enabled and scan.full-changelog must be set to true.

  • The preimage and postimage features must be enabled for the MongoDB database. For more information, see Document Preimages.

  • To synchronize data from multiple MongoDB collections in a single job, ensure the configurations of the following connector options are identical for all tables:

    • MongoDB database-related options, including hosts, scheme, username, password, and connectionOptions

    • scan.startup.mode

Sample code:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;
