The CREATE DATABASE AS (CDAS) statement synchronizes table schemas and data across an entire database in real time, including schema changes. Use it when you need to replicate multiple—or all—tables from a source database to a destination without manually creating sink tables in advance.
For new jobs, use YAML for data ingestion instead. YAML supports all core CDAS capabilities—database and table synchronization, schema evolution, custom computed columns, raw binary log sync, WHERE clause filtering, and column pruning—and lets you convert existing SQL drafts containing CTAS or CDAS statements. For details, see Data ingestion with Flink CDC.
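As an illustrative sketch, a YAML ingestion draft covering the same scenario has this general shape. The host names, credentials, and table pattern below are placeholders, and the exact keys depend on the connectors you use; see Data ingestion with Flink CDC for the authoritative format:

```yaml
# Hypothetical ingestion draft: sync all tables in the tpcds MySQL
# database to Hologres, including schema changes.
source:
  type: mysql
  hostname: <mysql-host>
  port: 3306
  username: <user>
  password: <password>
  tables: tpcds.\.*

sink:
  type: hologres
  endpoint: <hologres-endpoint>
  username: <user>
  password: <password>

pipeline:
  name: Sync tpcds to Hologres
```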
How it works
CDAS is syntactic sugar for CREATE TABLE AS (CTAS). When you run a CDAS statement, Realtime Compute for Apache Flink:
1. Checks whether the destination database and sink tables exist.
   - If the destination database does not exist, Flink creates it via the destination catalog.
   - If it exists, Flink checks for sink tables:
     - If sink tables are absent, Flink creates them with names and schemas mirroring the source tables.
     - If sink tables are already present, Flink skips table creation.
2. Starts the data synchronization job. Data and schema changes are replicated continuously from the source database to the destination.
Why CDAS instead of multiple CTAS statements:
- Simplified syntax: Flink expands one CDAS statement into one CTAS statement per table automatically.
- Optimized resource usage: Flink uses a single source vertex to read from all matched tables. For MySQL Change Data Capture (CDC) sources, this reduces database connections, prevents redundant binary log reads, and lowers the overall read load on the MySQL database.
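For intuition, a single CDAS statement behaves as if Flink rewrote it into one CTAS statement per matched table. The sketch below is illustrative only; the catalog, database, and table names (`orders`, `customer`) are hypothetical:

```sql
-- One CDAS statement over a database...
CREATE DATABASE IF NOT EXISTS holo.holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES;

-- ...is logically expanded into one CTAS statement per matched table,
-- all reading from a single shared source vertex:
CREATE TABLE IF NOT EXISTS holo.holo_tpcds.orders
AS TABLE mysql.tpcds.orders;
CREATE TABLE IF NOT EXISTS holo.holo_tpcds.customer
AS TABLE mysql.tpcds.customer;
```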
Core capabilities
Data synchronization
| Feature | Description |
|---|---|
| Synchronize a database | Performs full and incremental data synchronization from multiple tables (or all tables) in a database to each related sink table. |
| Consolidate and synchronize database shards | Matches source table names across database shards by using regular expressions, consolidates these tables, and synchronizes them to corresponding sinks. |
| Synchronize new tables | Synchronizes newly added tables by restarting your job from a savepoint. |
| Run multiple CDAS and CTAS statements as one job | Allows you to use the STATEMENT SET statement to commit multiple CDAS and CTAS statements as one job. You can also merge and reuse the data of source table operators to reduce the reading load on the data source. |
Schema evolution
When CDAS synchronizes a database, schema changes—such as adding columns—are propagated to the sink automatically. The behavior and policy are the same as for CTAS. See Schema evolution.
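For example, if a column is added on a MySQL source while the job is running, the new column appears in the sink table without a restart. The DDL below is illustrative; the table and column names are hypothetical:

```sql
-- Executed on the source MySQL database, not in Flink.
-- CDAS detects the change and adds the column to the sink table.
ALTER TABLE tpcds.orders ADD COLUMN discount DECIMAL(10, 2);
```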
Prerequisites
Before you begin, ensure that you have:
- A destination catalog registered in the workspace. See Catalogs.
Limitations
Syntax limitations
- Debugging an SQL draft that contains a CDAS statement is not supported.
- MiniBatch is not supported.

  Important: Remove all MiniBatch configurations before creating an SQL draft with a CTAS or CDAS statement:
  1. Go to O&M > Configurations.
  2. Select the Deployment Defaults tab.
  3. In the Other Configuration section, verify that MiniBatch configurations are removed.

  If you encounter a "Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax" error when creating or starting a deployment, see How do I fix this error?
Supported connectors
| Connector | Source | Sink | Notes |
|---|---|---|---|
| MySQL | Yes | No | Views cannot be synchronized. |
| Kafka | Yes | No | — |
| MongoDB | Yes | No | Sharded table and database consolidation are not supported. MongoDB metadata cannot be synchronized. For setup, see Manage MongoDB catalogs. |
| Upsert Kafka | No | Yes | — |
| Hologres | No | Yes | Flink creates a connection for each table based on the connectionSize option. Configure a shared pool with connectionPoolName. If source data types are unsupported by Hologres fixed plans, use INSERT INTO instead—do not use the CTAS statement, which delivers lower writing performance because fixed plans cannot be used. Exclusive Hologres instances only—shared cluster instances are not supported. |
| StarRocks | No | Yes | Limited to StarRocks on Alibaba Cloud EMR. |
| Paimon | No | Yes | — |
Usage notes
New table synchronization
- VVR 8.0.6 or later: Restart the job from a savepoint after a new table is added. See Synchronize new tables.
- VVR 8.0.5 or earlier: Job restarts do not capture new tables. Use one of these approaches instead:

| Approach | Steps |
|---|---|
| Create a new job for the new table | Leave the existing job running. Create a separate job targeting only the new table. Example: `CREATE TABLE IF NOT EXISTS new_table AS TABLE mysql.tpcds.new_table /*+ OPTIONS('server-id'='8008-8010') */;` |
| Clean up and restart from scratch | 1. Cancel the existing job. 2. Clean up synchronized data in the sink. 3. Restart the job without states. |
Cross-account and RAM access
Grant the necessary read/write permissions to your account when accessing external resources across accounts or as a RAM user or RAM role.
Syntax
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
<target_database>:
[catalog_name.]db_name
<source_database>:
[catalog_name.]db_name
`IF NOT EXISTS` is required. It tells Flink to check whether the destination database and each sink table exist. If absent, Flink creates them; if present, Flink skips creation.
The sink table shares the source table's schema—primary key and physical field names and types—but excludes computed columns, metadata fields, and watermark configurations.
Flink performs data type mappings from source to sink. For details, see the relevant connector documentation.
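As an illustrative sketch combining these clauses (the catalog, database, and table name patterns are hypothetical):

```sql
-- Sync every table whose name starts with "web", except
-- temporary tables whose names start with "web_tmp".
CREATE DATABASE IF NOT EXISTS holo.holo_web
AS DATABASE mysql.webshop
INCLUDING TABLE 'web.*'
EXCLUDING TABLE 'web_tmp.*'
/*+ OPTIONS('server-id'='8001-8004') */;
```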
Parameters
| Parameter | Description |
|---|---|
| target_database | The destination database name, optionally prefixed with the catalog name: `[catalog_name.]db_name`. |
| COMMENT | A description for the destination database. Defaults to the source database's description. |
| WITH | Options for the destination database. Keys and values must be strings, for example, `'sink.parallelism' = '4'`. See Catalogs for connector-specific options. |
| source_database | The source database name, optionally prefixed with the catalog name: `[catalog_name.]db_name`. |
| INCLUDING ALL TABLES | Synchronizes all tables in the source database. |
| INCLUDING TABLE | Specifies tables to synchronize. Separate multiple tables with `\|`. Supports regular expressions. For example, `INCLUDING TABLE 'web.*'` matches all tables with names starting with web. |
| EXCLUDING TABLE | Specifies tables to exclude from synchronization. Separate multiple tables with `\|`. Supports regular expressions. For example, `EXCLUDING TABLE 'tmp.*'` excludes all tables with names starting with tmp. |
| OPTIONS | Connector options for the source table. Keys and values must be strings, for example, `'server-id' = '65500'`. See Supported connectors. |
Examples
Synchronize a database
Synchronize all tables from the tpcds MySQL database to Hologres.
Prerequisites:
- A Hologres catalog named `holo` is created in the workspace.
- A MySQL catalog named `mysql` is created in the workspace.
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds -- Create holo_tpcds in Hologres.
WITH ('sink.parallelism' = '4') -- Set sink parallelism (default: 4 for Hologres).
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- Sync all tables from mysql.tpcds.
/*+ OPTIONS('server-id'='8001-8004') */; -- Configure the MySQL CDC source options.
Options set in the WITH clause apply only to the current job and control write behavior. They are not persisted in the Hologres catalog. For supported options, see Hologres.
Consolidate and synchronize database shards
Merge all tables with identical names across multiple MySQL database shards into single Hologres tables.
Scenario: A MySQL instance has shards named order_db01 through order_db99. Each shard contains tables such as order and order_detail. All shard data—including schema changes—needs to land in Hologres.
Solution: Use a regular expression on the database name to match all shards. The database and table names are added to each sink table as two additional fields. The Hologres primary key includes the database name, table name, and source table's primary key columns to ensure uniqueness. There is no need to create target tables in advance.
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_order -- Create holo_order in Hologres.
WITH('sink.parallelism'='4') -- Set sink parallelism (optional).
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES -- Match all order_db shards.
/*+ OPTIONS('server-id'='8001-8004') */; -- Configure MySQL CDC source options (optional).
Tables with identical names across shards are merged into a single Hologres table.
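Conceptually, each merged sink table carries the source database and table names alongside the original physical columns. The sketch below is illustrative only; Flink derives the actual schema and the names of the two added fields automatically, so this is not a statement you run:

```sql
-- Effective shape of the merged Hologres table for "order":
CREATE TABLE holo_order.`order` (
    db_name    STRING,   -- source shard name, e.g. order_db01
    table_name STRING,   -- source table name
    order_id   BIGINT,   -- original primary key column
    -- ...remaining physical columns from the source table...
    PRIMARY KEY (db_name, table_name, order_id) NOT ENFORCED
);
```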
Synchronize new tables
To capture tables added after a CDAS job has started, enable new table detection and restart the job from a savepoint.
New table detection requires VVR 8.0.6 or later. The source table's startup mode must be initial.
1. On the Deployments page, find the target deployment and click Cancel in the Actions column.
2. In the dialog, expand More Strategies, select Stop With Savepoint, and click OK.
3. In the SQL draft, add the following statement:

   SET 'table.cdas.scan.newly-added-table.enabled' = 'true';

4. Click Deploy.
5. Recover the job from the savepoint:
   1. On the Deployments page, click the deployment name.
   2. On the deployment details page, click the State tab, then the History subtab.
   3. In the Savepoints list, find the savepoint created when you stopped the job.
   4. Choose More > Start job from this savepoint in the Actions column. See Start a job.
Run multiple CDAS and CTAS statements as one job
Synchronize multiple databases in a single job to maximize source reuse.
Scenario: Sync tpcds, tpch, and user_db01–user_db99 shards to Hologres in one job.
Solution: Wrap all statements in a STATEMENT SET block. Flink reuses a single source vertex across all statements, reducing the number of server IDs, database connections, and overall read load—especially for MySQL CDC sources.
- Connector options must be identical across all source tables to enable source reuse.
- For server ID configuration, see Set the server ID to avoid binlog consumption conflicts.
USE CATALOG holo;
BEGIN STATEMENT SET;
-- Sync user tables from all user_db shards.
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
-- Sync tpcds database.
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */;
-- Sync tpch database.
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */;
END;
Synchronize multiple MySQL databases to Kafka
Sync tables from multiple MySQL databases to Kafka while avoiding topic name conflicts.
Scenario: Tables with the same name exist in both tpcds and tpch. Sending both to the same Kafka topic would overwrite data.
Solution: Use the cdas.topic.pattern option to generate unique topic names per database. The {table-name} placeholder is replaced at runtime by the actual table name. For example, 'cdas.topic.pattern'='tpcds-{table-name}' routes table1 from tpcds to the tpcds-table1 topic.
USE CATALOG kafkaCatalog;
BEGIN STATEMENT SET;
-- Sync tpcds database. Topic names follow the pattern "tpcds-<table_name>".
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */;
-- Sync tpch database. Topic names follow the pattern "tpch-<table_name>".
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */;
END;
Introducing Kafka as an intermediate layer between MySQL and Flink also reduces the load on MySQL. See Synchronize MySQL database to Kafka with Flink CDC.
FAQ
Runtime errors
Job performance
- What do I do if the data reading efficiency is low and backpressure exists when full data is read?
- What do I do if the consumption of upstream data is unstable?
Data synchronization
What's next
- Catalogs commonly used with CDAS
- Related capabilities
- Data ingestion via YAML