You can execute the CREATE TABLE AS statement to synchronize both the data and the schema changes of one table to another table in real time. This improves the efficiency of creating a table in a destination store and of synchronizing schema changes of a source table to a destination table. This topic describes the background information, prerequisites, limits, and syntax of the CREATE TABLE AS statement, and provides examples.

Background information

  • Features
    • Single-table synchronization: Synchronizes full and incremental data from a source table to a destination table in real time.
    • Synchronization of table schema changes: Synchronizes schema changes of a source table, such as an added column, to a destination table in real time.
    • Merging and synchronization of multiple tables in a sharded database: Allows you to use regular expressions to define database shard names and table names that match multiple database shards and tables. The data of the matched tables is then merged and synchronized to a single destination table.
    • Addition of custom computed columns: Allows you to add computed columns to the source table so that data in specific columns of the source table can be converted and computed. You can specify the position of a computed column, which is used as a physical column of the destination table, and the data of the computed column is synchronized to the destination table in real time.
    • Execution of multiple CREATE TABLE AS statements: Allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS statements as one job, and to merge and reuse source nodes to reduce the read load on the data source.
  • Startup process
    When you execute the CREATE TABLE AS statement, fully managed Flink performs the following operations:
    1. Checks whether the destination table exists in the destination store.

      If the destination table does not exist, fully managed Flink uses the catalog of the destination store to create the destination table in the destination store. The destination table has the same schema as the data source. If the destination table exists, fully managed Flink does not create a table. If the schema of the destination table is different from the schema of the source table, an error is returned.

    2. Commits and runs the data synchronization job.

      Fully managed Flink synchronizes data and changes in the schema from the data source to the destination table.

    The following figure shows the process of using the CREATE TABLE AS statement to synchronize data from MySQL to Hologres. (Figure: Synchronization process)
  • Synchronization policies of table schema changes
    You can use the CREATE TABLE AS statement to synchronize data in real time and also synchronize schema changes from the source table to the destination table. Schema changes include table creation and schema changes after a table is created. The following schema change policies are supported:
    • Add a nullable column: The statement automatically adds the related column to the end of the destination table schema and synchronizes data to the added column.
    • Delete a nullable column: The statement automatically fills null values in the nullable column of the destination table instead of deleting the column from the table.
    • Rename a column: The operation of renaming a column involves adding a column and deleting a column. After a column is renamed in the source table, the column that uses the new name is added to the end of the destination table and the column that uses the original name is filled with null values. For example, if the name of the col_a column in the source table is changed to col_b, column col_b is added to the end of the destination table and column col_a is automatically filled with null values.
    The following schema changes are not supported:
    • Change of data types

      For example, the data in a column is changed from the VARCHAR type to the BIGINT type, or the column property is changed from NOT NULL to NULLABLE.

    • Change of constraints, such as the primary key or index
    • Addition or deletion of a non-nullable column
    Note
    • If the schema of the source table has one of the preceding changes, you must delete the destination table and restart the job that executes the CREATE TABLE AS statement. This way, the destination table is created again and the historical data is resynchronized to the destination table.
    • The CREATE TABLE AS statement does not parse the DDL statements of the source table. Instead, it compares the schemas of the data records before and after a change. Therefore, if you delete a column and then add it back, and no data is written between the two DDL operations, the statement considers that no schema change occurred. Similarly, adding a column to the source table does not by itself trigger schema change synchronization. The statement identifies the change only after data that uses the new schema arrives from the source table, and then synchronizes the schema change to the destination table.
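As a concrete illustration of the rename policy described above, assume a hypothetical MySQL source table `orders` with a nullable column `col_a` (the table and column names here are placeholders, not from the original document):

```sql
-- Hypothetical source table: orders(id BIGINT PRIMARY KEY, col_a VARCHAR(255))
-- Rename a column in the MySQL source:
ALTER TABLE orders CHANGE COLUMN col_a col_b VARCHAR(255);

-- After new data arrives, the destination table schema becomes:
--   orders(id BIGINT, col_a VARCHAR, col_b VARCHAR)
-- col_b is appended to the end of the schema. col_a is not deleted; it is
-- filled with NULL values for rows that are written after the rename.
```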

Prerequisites

A catalog of the destination store is created in your workspace. For more information, see Manage a Hive metastore, Manage Hologres catalogs, or Manage MySQL catalogs.

Limits

  • Only the Flink compute engine of vvr-4.0.11-flink-1.13 or later supports the CREATE TABLE AS statement.
  • Only the Flink compute engine of vvr-4.0.12-flink-1.13 or later supports the addition of custom computed columns.
  • Only Hologres catalogs are supported for the destination store.
  • The CREATE TABLE AS statement does not support job debugging. For more information, see Debug a job.
  • Only the schema changes of a MySQL Change Data Capture (CDC) source table and a Message Queue for Apache Kafka source table can be synchronized.

Syntax

CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  computed_column_definition [FIRST | AFTER column_name]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]
The CREATE TABLE AS statement uses the basic syntax of the CREATE TABLE statement. The following list describes the parameters.
• sink_table: The name of the table to which data is synchronized. You can use a catalog name and a database name to qualify the name of the destination table.
• COMMENT: The description of the destination table. By default, the description of source_table is used.
• PARTITIONED BY: The partition key of the destination table.
• WITH: The parameters of the destination table. For more information, see the parameters in the WITH clause in the documentation of the related result table connector. For more information about the connector documentation, see Create a result table.
  Note: Both the key and value must be of the STRING type, such as 'jdbcWriteBatchSize' = '1024'.
• source_table: The name of the table from which data is synchronized. You can use a catalog name and a database name to qualify the name of the source table.
• OPTIONS: The parameters of the source table. For more information, see the parameters in the WITH clause in the documentation of the related source table connector. For more information about the connector documentation, see Create a source table.
  Note: Both the key and value must be of the STRING type, such as 'server-id' = '65500'.
• ADD COLUMN: Adds columns to the source table. Only computed columns can be added.
• column_component: The definition of the new column.
• computed_column_definition: The expression of the computed column.
• FIRST: Places the new column as the first field of the source table. If you specify neither FIRST nor AFTER, the new column is appended as the last field of the source table.
• AFTER: Places the new column after the specified field of the source table.
Note The IF NOT EXISTS keyword is required. If the destination table does not exist in the destination store, it is created first; if it already exists, the creation step is skipped. A created destination table uses the schema of the source table, including the primary key and the names and types of the physical columns. Computed columns, metadata columns, and watermarks are not included. The field types of the source table are mapped to the field types of the destination table. For more information, see the data type mapping table of each connector.
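The following sketch puts the syntax elements together in one statement. The catalog, database, table, and column names are hypothetical, and the WITH and OPTIONS keys are only illustrative; use the keys documented for your actual connectors:

```sql
CREATE TABLE IF NOT EXISTS holo.tpcds.orders_sink
COMMENT 'orders synchronized from MySQL'
WITH ('jdbcWriteBatchSize' = '1024')          -- destination table parameters
AS TABLE mysql.tpcds.orders
/*+ OPTIONS('server-id' = '8001-8004') */     -- source table parameters
ADD COLUMN (
  -- Hypothetical computed column derived from an assumed order_time field,
  -- placed directly after that field in the destination schema.
  `order_date` AS DATE_FORMAT(`order_time`, 'yyyy-MM-dd') AFTER `order_time`
);
```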

Examples

  • Example 1: single-table synchronization

    In most cases, the CREATE TABLE AS statement is used together with the catalog of the data source and the catalog of the destination store. For example, you can use a MySQL catalog and a Hologres catalog with the CREATE TABLE AS statement to synchronize full and incremental data from a MySQL database to Hologres. The MySQL catalog resolves the schema and related parameters of the source table, so you do not need to write DDL statements manually.

    For example, a Hologres catalog named holo and a MySQL catalog named mysql are created in your workspace. You can use the following sample code to synchronize data from the web_sales table of the MySQL database to Hologres:
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS web_sales  
    WITH ('jdbcWriteBatchSize' = '1024')   -- Configure the parameters of the destination table. This setting is optional. 
    AS TABLE mysql.tpcds.web_sales   
    /*+ OPTIONS('server-id'='8001-8004') */; -- Configure additional parameters for the MySQL CDC source table. 
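For comparison, the following is a rough sketch of what the same pipeline looks like without CREATE TABLE AS. The column list and connection parameters below are placeholders, the destination table must already exist in Hologres, and later schema changes of the source table are not synchronized:

```sql
-- Hand-written MySQL CDC source table (columns abbreviated for illustration).
CREATE TEMPORARY TABLE web_sales_source (
  ws_order_number BIGINT,
  ws_item_sk      BIGINT,
  PRIMARY KEY (ws_order_number, ws_item_sk) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<yourHostname>',   -- connection parameters are placeholders
  'database-name' = 'tpcds',
  'table-name' = 'web_sales'
);

-- The destination table must be created in Hologres in advance.
INSERT INTO holo.tpcds.web_sales SELECT * FROM web_sales_source;
```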
  • Example 2: merging and synchronization of multiple tables in a sharded database
    If you want to merge and synchronize data of multiple tables in a sharded database, you can use a MySQL catalog and regular expressions that define database shard names and table names to match the tables whose data you want to synchronize. The CREATE TABLE AS statement merges the matched tables into a single Hologres table and writes the database shard name and table name to that table as two additional fields. To ensure that the primary key is unique, the database shard name field, the table name field, and the original primary key together form the new joint primary key of the Hologres table.
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS user
    WITH ('jdbcWriteBatchSize' = '1024')
    AS TABLE mysql.`wp.*`.`user[0-9]+`  
    /*+ OPTIONS('server-id'='8001-8004') */;
    The following figure shows the effect of the merging operation. (Figure: Effect 1)
    If you add a column named age to table user02 and insert a data record into the table, the data and schema changes of table user02 are automatically synchronized to the destination table in real time, even if the schemas of the source tables differ.
    ALTER TABLE `user02` ADD COLUMN `age` INT;
    INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);
    The following figure shows the effect. (Figure: Effect 2)
  • Example 3: addition of custom computed columns
    This example describes how to add computed columns to the source tables for data conversion and computation during the merging and synchronization of multiple tables in a sharded database. In this example, the user table is used.
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS user
    WITH ('jdbcWriteBatchSize' = '1024')
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */
    ADD COLUMN (
      `t_idx` AS COALESCE(SPLIT_INDEX(`tbl`, 'r', 1), 'default') FIRST,
      `c_id` AS `id` + 10 AFTER `id`
    );
    The following figure shows the synchronization effect. (Figure: Add computed columns)
  • Example 4: execution of multiple CREATE TABLE AS statements

    Fully managed Flink allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS statements as one job. Fully managed Flink can also merge the source nodes so that a single source node reads data from multiple business tables. This reduces the number of server-id values that are used, the number of database connections, and the database read load, which makes this approach especially suitable for MySQL CDC data sources.

    For example, data is synchronized from the web_sales table in the first code segment, and data is synchronized from multiple tables whose names start with user in a sharded database in the second code segment. In this case, you can use the STATEMENT SET statement to commit these code segments as one job. This way, you can synchronize data from multiple business tables in one job, and one source node can read data from multiple business tables. The following code shows an example.
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize data from the web_sales table. 
    CREATE TABLE IF NOT EXISTS web_sales
    AS TABLE mysql.tpcds.web_sales
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Synchronize data from multiple tables in the specified database shards whose names start with user. 
    CREATE TABLE IF NOT EXISTS user
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;