Realtime Compute for Apache Flink: CREATE TABLE AS (CTAS)

Last Updated: Mar 26, 2026

The CREATE TABLE AS (CTAS) statement synchronizes full data, incremental data, and schema changes from a source table to a sink table in real time. It handles sink table creation automatically — no manual DDL needed.

For new pipelines, consider using YAML-based data ingestion instead. YAML supports all key CTAS capabilities, including schema evolution, computed columns, column pruning, and raw binary log synchronization. See Data ingestion with Flink CDC for details. You can also convert an existing SQL draft to a YAML draft with one click.

How it works

When you run a CTAS statement, Realtime Compute for Apache Flink does the following:

  1. Checks whether the sink table already exists in the destination system.

    • If the sink table does not exist, Flink creates it by mirroring the source table's schema, using the destination catalog.

    • If the sink table already exists, Flink skips table creation and verifies that the sink table's schema matches the source table's. If the schemas differ, an error is reported.

  2. Starts the data synchronization job. Flink continuously replicates data and schema changes from the source to the sink.

The sink table inherits the source table's schema — primary key, physical column names, and data types — but excludes computed columns, metadata fields, and watermark configurations.

Prerequisites

Before you begin, ensure that you have:

  • A catalog for the destination store, created in your workspace. For more information, see Catalogs.

Syntax

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2,
  ...
)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ...) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

The IF NOT EXISTS keyword is required. It tells Flink to check whether the sink table exists before attempting to create it.

Required arguments

| Argument | Description |
| --- | --- |
| sink_table | The name of the sink table. Use the fully qualified name (catalog_name.db_name.table_name) to specify the catalog and database. |
| source_table | The name of the source table. Use the fully qualified name to specify the catalog and database. |

Optional arguments

| Argument | Description |
| --- | --- |
| COMMENT | A description for the sink table. Defaults to the source table's description. |
| PARTITIONED BY | The partition columns. Note: data cannot be synchronized to a StarRocks partitioned table. |
| table_constraint | The primary key definition, which uniquely identifies each record in the table. |
| WITH | Connector options for the sink table. Both key and value must be of the STRING type, for example, 'jdbcWriteBatchSize' = '1024'. For available options, see the connector documentation for Upsert Kafka, Hologres, StarRocks, or Paimon. |
| OPTIONS | Connector options for the source table. Both key and value must be of the STRING type, for example, 'server-id' = '65500'. For available options, see the connector documentation for MySQL and Kafka. |
| ADD COLUMN | Adds computed columns to the sink table. Can also be used to rename source columns, although a pure column mapping (for example, new_col AS col) may be optimized away. To guarantee the mapping, use an expression that leaves the value unchanged, such as new_col AS col + INTERVAL '0' SECOND for a timestamp column. |
| column_component | The definition of each new column. |
| computed_column_expression | The expression used to compute the column's value. |
| FIRST | Places the new column as the first field in the sink table. By default, new columns are appended at the end. |
| AFTER column_name | Places the new column immediately after the specified column. |
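
The optional clauses compose in the order given by the syntax above. The following is a minimal sketch that combines an explicit primary key, COMMENT, sink options, and source options. The catalog names `holo` and `mysql` follow the examples later on this page; the database `sales_db`, the table names, the `order_id` column, and the constraint name are assumptions made for illustration.

```sql
-- Hypothetical database, table, column, and constraint names.
CREATE TABLE IF NOT EXISTS `holo`.`sales_db`.`orders_copy` (
  CONSTRAINT `pk_orders_copy` PRIMARY KEY (`order_id`) NOT ENFORCED  -- table_constraint
)
COMMENT 'Replica of mysql.sales_db.orders'       -- Replaces the description inherited from the source.
WITH ('jdbcWriteBatchSize' = '1024')             -- Sink connector options (STRING keys and values).
AS TABLE `mysql`.`sales_db`.`orders`
/*+ OPTIONS('server-id'='8001-8004') */;         -- Source connector options.
```

Omit any clause you do not need. Only IF NOT EXISTS and the AS TABLE clause are required.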

Supported connectors

| Connector | Source table | Sink table | Notes |
| --- | --- | --- | --- |
| MySQL | Supported | Not supported | Views cannot be synchronized. For sharded table consolidation, database and table names are written to the sink automatically. For single-table sync, to include database and table names, create a MySQL catalog via Flink SQL with the catalog.table.metadata-columns option. See Manage MySQL catalogs. |
| Kafka | Supported | Not supported | |
| MongoDB | Supported | Not supported | Requires VVR 8.0.6 or later and MongoDB 6.0 or later. Sharded table/database consolidation is not supported. New tables in the source database cannot be detected or synchronized. Metadata cannot be synchronized. Data and table schema changes can be synchronized from MongoDB to other systems. See Synchronize from MongoDB to Hologres. |
| Upsert Kafka | Not supported | Supported | |
| StarRocks | Not supported | Supported | Limited to StarRocks on Alibaba Cloud EMR. |
| Hologres | Not supported | Supported | Flink creates a separate connection for each table based on the connectionSize option. To share a connection pool across tables, set connectionPoolName. If the source table uses data types unsupported by Hologres's fixed plan feature, use INSERT INTO instead, because CTAS delivers lower write performance when fixed plans cannot be used. |
| Paimon | Not supported | Supported | |

Schema evolution

During synchronization, the CTAS statement replicates DDL changes from the source to the sink.

Important

The CTAS statement compares schema differences rather than tracking individual DDL operations. For example, if a column is dropped and then re-added with no data changes in between, no schema change is detected. If data changes occur between the drop and the re-add, the schema change is detected and synchronized.
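
The two cases above can be illustrated with statements run on the MySQL source. This is a sketch only: the table `t_demo` and its columns `id` and `note` are hypothetical, `note` is assumed to be the last column, and the comments restate the behavior described above rather than observed output.

```sql
-- Run on the MySQL source. `t_demo`, `id`, and `note` are hypothetical names.

-- Case 1: drop and re-add with no data change in between.
ALTER TABLE `t_demo` DROP COLUMN `note`;
ALTER TABLE `t_demo` ADD COLUMN `note` VARCHAR(100);
-- The schema seen with the next record equals the schema seen before the drop,
-- so no schema change is detected.

-- Case 2: a data change occurs between the drop and the re-add.
ALTER TABLE `t_demo` DROP COLUMN `note`;
INSERT INTO `t_demo` (`id`) VALUES (1);                   -- Written while `note` is absent.
ALTER TABLE `t_demo` ADD COLUMN `note` VARCHAR(100);
INSERT INTO `t_demo` (`id`, `note`) VALUES (2, 'hello');  -- Written after `note` is back.
-- Records arrive with differing schemas, so the schema change is detected and synchronized.
```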

Supported schema changes

| Schema change | Behavior |
| --- | --- |
| Add a nullable column | The column is appended to the sink table's schema, and new data is written to it. Historical data in this column is set to NULL. |
| Add a non-null column | The column is appended to the sink table's schema, and data is synchronized. |
| Delete a nullable column | The column is retained in the sink table, but filled with NULL values going forward. |
| Rename a column | Renaming is treated as a drop + add: the new column name is appended to the sink table, and the original column is filled with NULL values. For example, renaming col_a to col_b adds col_b at the end of the sink table and fills col_a with NULL. |
| Modify a column's data type | For sinks that support type changes (currently Paimon only): standard type changes such as INT to BIGINT are supported, subject to connector-specific compatibility rules. For Hologres: use type normalization mode to handle type changes via type widening. At job startup, a Hologres table with wider data types is created, and changes to column types are supported based on the compatibility of the sink. See Widen data types during replication. |
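
As a rough illustration of the first rows of this table, the following source-side statements show the kind of DDL involved. The table `t_demo` and the column `remark` are hypothetical names, and the comments paraphrase the table above rather than report tested output.

```sql
-- Run on the MySQL source. All table and column names are hypothetical.

-- Add a nullable column.
ALTER TABLE `t_demo` ADD COLUMN `remark` VARCHAR(50) NULL;
-- Sink: `remark` is appended; historical rows hold NULL in it.

-- Rename a column (the RENAME COLUMN form requires MySQL 8.0 or later).
ALTER TABLE `t_demo` RENAME COLUMN `col_a` TO `col_b`;
-- Sink: `col_b` is appended at the end; `col_a` is kept and filled with NULL going forward.

-- Delete a nullable column.
ALTER TABLE `t_demo` DROP COLUMN `remark`;
-- Sink: `remark` is kept and filled with NULL going forward.
```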

Unsupported schema changes

The following schema changes are not replicated automatically:

  • Modifying constraints (such as primary keys or indexes)

  • Deleting non-nullable columns

  • Changing a column from NOT NULL to NULLABLE

To apply these changes, manually drop the sink table and restart the job to re-synchronize all data.

Limitations

  • Debugging a SQL draft that contains a CTAS statement is not supported.

  • A CTAS statement and an INSERT INTO statement cannot coexist in the same SQL draft.

  • Data cannot be synchronized to a StarRocks partitioned table.

  • MiniBatch is not supported.

Important

Before creating a SQL draft with CTAS or CDAS statements, remove any MiniBatch configurations:

  1. Go to O&M > Configurations.

  2. Select the Deployment Defaults tab.

  3. In the Other Configuration section, verify that MiniBatch settings are removed.

If you encounter the error "Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax" when creating or starting a deployment, see How do I fix this error?

Examples

Synchronize a single table

Synchronize the web_sales table from MySQL to Hologres.

Prerequisites:

  • A Hologres catalog named holo

  • A MySQL catalog named mysql

Use source and destination catalogs so that Flink can parse the source table's schema and properties automatically — no explicit DDL needed.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales          -- Sync to web_sales in the default database.
WITH ('jdbcWriteBatchSize' = '1024')          -- Optional: sink connector options.
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;      -- Optional: source connector options.

Merge and synchronize table shards

Consolidate sharded MySQL tables and databases, then synchronize to a single Hologres table.

Use regular expressions to match source databases and tables. The database name and table name are written to two additional columns in the sink table. The sink table's primary key includes these columns to ensure uniqueness across shards.

Carets (^) cannot be used to match the beginning of a table name.

Consolidate table shards:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS `user`             -- Quoted because USER is a reserved keyword in Flink SQL.
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

After the job starts, schema changes to individual shard tables are replicated in real time. For example, adding an age column to user02 and inserting a record:

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

Both the schema change and the new record are synchronized to the sink table automatically.

Add computed columns

During sharded table consolidation, add custom computed columns to the sink table.

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS `user`
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `class` AS 3 AFTER `id`
);

The computed columns are treated as physical columns in the sink table and synchronized in real time. Specify FIRST or AFTER <column_name> to control column position.
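
The example above uses AFTER. A minimal sketch of FIRST follows, using the single-column form of ADD COLUMN (without parentheses) from the syntax section. The column name `sync_source` and its constant value are hypothetical.

```sql
USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN `sync_source` AS 'mysql' FIRST;   -- `sync_source` becomes the first field of the sink table.
```

Without FIRST or AFTER, the new column would be appended at the end of the sink table.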

Run multiple CTAS statements in a single job

Use BEGIN STATEMENT SET ... END to commit multiple CTAS statements as one job. This reuses the source operator across all statements, reducing the number of server IDs, database connections, and overall read load on the source.

Important

To reuse the source operator, all CTAS statements that read from the same source must have identical connector options. For guidance on configuring server IDs, see Set the server ID to avoid binlog consumption conflicts.

USE CATALOG holo;

BEGIN STATEMENT SET;

-- Sync the web_sales table.
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sync the user table shards.
CREATE TABLE IF NOT EXISTS `user`
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Synchronize one source to multiple sinks

Without computed columns:

USE CATALOG `holo`;

BEGIN STATEMENT SET;

-- Sync to database1.
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sync to database2.
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

With computed columns: Create a temporary table for each sink to define the computed columns, then reference each in a CTAS statement.

-- Define a temporary table with a computed_id column.
CREATE TEMPORARY TABLE `user_with_changed_id` (
  `computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;

-- Define a temporary table with a computed_age column.
CREATE TEMPORARY TABLE `user_with_changed_age` (
  `computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;

BEGIN STATEMENT SET;

-- Sync to holo.tpcds.user_with_changed_id.
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Sync to holo.tpcds.user_with_changed_age.
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

Add CTAS statements to replicate new tables

After a multi-CTAS job starts, add a statement to replicate a newly added source table without restarting from scratch.

Limitations:

  • Requires VVR 8.0.1 or later.

  • Only jobs started in initial mode can detect new tables when synchronizing from a CDC source.

  • The new CTAS statement's source connector options must match those of the existing statements — this is required for source operator reuse.

  • Job configuration (such as startup mode) must stay the same before and after adding the statement.

Procedure:

  1. On the Deployments page, find the deployment and click Cancel in the Actions column.

  2. In the dialog, expand More Strategies, select Stop With Savepoint, and click OK.

  3. In the SQL draft, enable new table detection, add the new CTAS statement as shown in the following code, and then click Deploy.

    -- Enable new table detection.
    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Existing: sync web_sales.
    CREATE TABLE IF NOT EXISTS web_sales
    AS TABLE mysql.tpcds.web_sales
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Existing: sync user shards.
    CREATE TABLE IF NOT EXISTS `user`
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- New: sync the product table.
    CREATE TABLE IF NOT EXISTS product
    AS TABLE mysql.tpcds.product
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

  4. Restart the job from the savepoint.

    1. On the Deployments page, click the deployment name.

    2. On the deployment details page, click the State tab, then click History.

    3. In the Savepoints list, find the savepoint created when the job was stopped.

    4. Click More > Start job from this savepoint in the Actions column. For more information, see Start a job.

Synchronize to a Hologres partitioned table

Replicate data from MySQL to a Hologres partitioned table.

If a primary key is defined on the Hologres table, the partition columns must be included in that primary key.

Create the source MySQL table:

CREATE TABLE orders (
    order_id   INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city       VARCHAR(100) NOT NULL,
    order_date DATE,
    purchaser  INTEGER,
    PRIMARY KEY(order_id, product_id)
);

If the source primary key already includes the partition columns, use the CTAS statement directly. Hologres automatically verifies that the partition column is part of the primary key.

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
PARTITIONED BY (product_id)
AS TABLE `mysql`.`tpcds`.`orders`;

If the source primary key excludes the partition columns, declare the sink table's primary key explicitly in the CTAS statement and include the partition column.

-- Declare order_id, product_id, and city as the composite primary key.
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
    CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`, `product_id`, `city`) NOT ENFORCED
)
PARTITIONED BY (city)
AS TABLE `mysql`.`tpcds`.`orders`;

Widen data types during replication

Change a column's precision or data type during synchronization — for example, from VARCHAR(10) to VARCHAR(20), or from SMALLINT to INT.

Enable type normalization mode when you first launch the job. For existing jobs, drop the Hologres sink table and restart without states to apply type normalization.

Type normalization rules:

If the original and new data types both normalize to the same target type, the change succeeds. Otherwise, an exception is raised.

| Original types | Normalized to |
| --- | --- |
| TINYINT, SMALLINT, INT, BIGINT | BIGINT |
| CHAR, VARCHAR, STRING | STRING |
| FLOAT, DOUBLE | DOUBLE |
| Other types | Per Hologres–Flink data type mappings |

Enable type normalization mode in the sink connector options:

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
  'connector' = 'hologres',
  'enableTypeNormalization' = 'true'    -- Enable type normalization mode.
)
AS TABLE `mysql`.`tpcds`.`orders`;
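
To make the normalization rules concrete, the following sketch applies them to the MySQL `orders` table defined in the partitioned-table example on this page. The statements run on the MySQL source. The outcomes in the comments are derived from the rules table above, not from a test run.

```sql
-- Run on the MySQL source table `orders`.

-- VARCHAR(100) -> VARCHAR(200): both normalize to STRING, so the change succeeds.
ALTER TABLE `orders` MODIFY COLUMN `city` VARCHAR(200) NOT NULL;

-- INTEGER -> BIGINT: both normalize to BIGINT, so the change succeeds.
ALTER TABLE `orders` MODIFY COLUMN `purchaser` BIGINT;

-- BIGINT -> VARCHAR(20): BIGINT normalizes to BIGINT but VARCHAR normalizes to STRING,
-- so this change would raise an exception in the job. Left commented out on purpose.
-- ALTER TABLE `orders` MODIFY COLUMN `purchaser` VARCHAR(20);
```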

Synchronize from MongoDB to Hologres

Limitations:

  • Requires VVR 8.0.6 or later and MongoDB 6.0 or later.

  • scan.incremental.snapshot.enabled and scan.full-changelog must both be set to true in the source connector options.

  • The preimage and postimage features must be enabled on the MongoDB database. See Document preimages.

  • For multi-collection jobs, the following connector options must be identical across all source tables: hosts, scheme, username, password, connectionOptions, and scan.startup.mode.

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;

FAQ

What do I do if a job cannot be started? See Job startup failures.

What do I do if a job restarts unexpectedly? See Unexpected job restarts.

What do I do if the "akka.pattern.AskTimeoutException" error appears? See AskTimeoutException.

How do I troubleshoot backpressure issues? See Troubleshoot backpressure.

What do I do if data reading efficiency is low during full data reads? See Low read efficiency during full data sync.

How do I troubleshoot high latency? See Troubleshoot high latency.

What do I do if upstream data consumption is unstable? See Unstable upstream consumption.

Why does a MySQL BIGINT UNSIGNED primary key change to DECIMAL, or to TEXT in the sink table, after CTAS synchronization? See Data type changes in MySQL-to-Hologres sync.

How do I fix the "Currently does not support merge StreamExecMiniBatchAssigner" error? See Fix the MiniBatch error.

Why is the sink table empty? See No output in the sink table.

How do I troubleshoot Flink source reading issues? See Troubleshoot source reading.

How do I troubleshoot write failures to the sink table? See Troubleshoot sink write failures.

How do I troubleshoot data inaccuracy issues? See Troubleshoot data inaccuracy.

What's next