The CREATE DATABASE AS statement can synchronize table schemas and data of an entire database in real time. This statement can also synchronize changes of table schemas. This topic describes the background information, prerequisites, limits, and syntax of the CREATE DATABASE AS statement. This topic also provides the sample code of the CREATE DATABASE AS statement.

Background information

The CREATE DATABASE AS statement is a syntactic sugar of the CREATE TABLE AS statement to synchronize data of an entire database or multiple tables. Fully managed Flink translates the CREATE DATABASE AS statement into a CREATE TABLE AS statement for each table from which data needs to be synchronized. Therefore, the CREATE DATABASE AS statement also provides the data synchronization and schema change synchronization capabilities of the CREATE TABLE AS statement. In most cases, the CREATE DATABASE AS statement is suitable for fully automated data integration scenarios. Fully managed Flink can also optimize data of source nodes and use one source node to read data from multiple business tables. This is suitable for MySQL Change Data Capture (CDC) data sources. The CREATE DATABASE AS statement can help reduce the number of database connections and prevent the system from repeatedly pulling binary log data to reduce the database reading load. The following section shows information about the CREATE DATABASE AS statement.
  • Features
    Feature Description
    Synchronization of an entire database Synchronizes full and incremental data from a database or multiple tables to each related destination table.
    Synchronization of table schema changes Synchronizes schema changes of each source table, such as an added column, to the related destination table in real time during database synchronization.
    Execution of multiple CREATE TABLE AS and CREATE DATABASE AS statements Allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS and CREATE DATABASE AS statements as one job. You can also merge and reuse the data of source nodes to reduce the reading load on the data source.
  • Startup process
    When you execute the CREATE DATABASE AS statement, fully managed Flink performs the following operations:
    1. Checks whether the destination database and tables exist in the destination store.

      If the destination database does not exist, fully managed Flink uses the catalog of the destination store to create a destination database in the destination store. If the destination database exists, fully managed Flink skips the database creation step and checks whether the destination tables exist in the destination database. If the destination tables do not exist, fully managed Flink creates the destination tables in the destination database. The destination tables have the same names and schemas as the tables in the source database. If the destination tables exist, fully managed Flink skips the table creation step.

    2. Commits and runs the data synchronization job.

      Synchronizes data and schema changes from the source database to the tables in the destination database.

    The following figure shows the process of using the CREATE DATABASE AS statement to synchronize data from MySQL to Hologres. Process of data synchronization by using the CREATE DATABASE AS statement
  • Synchronization policies of table schema changes

    The CREATE DATABASE AS statement is a syntactic sugar of the CREATE TABLE AS statement. Therefore, the CREATE DATABASE AS statement provides the same synchronization capability of schema changes as the CREATE TABLE AS statement. For more information about the synchronization policies of table schema changes, see CREATE TABLE AS statement.

Prerequisites

A catalog of the destination store is created in your workspace. For more information, see Manage a Hive metastore, Manage Hologres catalogs or Manage MySQL catalogs.

Limits

  • Only the Flink compute engine of vvr-4.0.11-flink-1.13 or later supports the CREATE DATABASE AS statement.
  • The destination store supports only Hologres catalogs.
  • The CREATE DATABASE AS statement does not support job debugging. For more information, see Debug a job.

Syntax

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES  TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name
The CREATE DATABASE AS statement uses the basic syntax of the CREATE DATABASE statement. The following table describes the parameters.
Parameter Description
target_database The name of the database to which data is synchronized. You can specify a catalog name.
COMMENT The description of the destination database. By default, the description of source_database is used.
WITH The parameters of the destination database. For more information, see the related catalog documentation. For example, if you use a Hologres catalog, see Manage Hologres catalogs.
Note Both the key and value must be of the STRING type, such as 'sink.parallelism' = '4'.
source_database The name of the database from which data is synchronized. You can specify a catalog name.
INCLUDING ALL TABLES Specifies that all tables in the source database are synchronized.
INCLUDING TABLE Specifies that the specified tables in the source database are synchronized. You can use vertical lines (|) to separate the names of multiple tables. You can also use a regular expression to specify tables that comply with a rule. For example, INCLUDING TABLE 'web.*' indicates that all tables whose names start with web in the source database are synchronized.
EXCLUDING TABLE Specifies the tables that do not need to be synchronized. You can use vertical lines (|) to separate the names of multiple tables. You can also use a regular expression to specify tables that comply with a rule. For example, INCLUDING ALL TABLES EXCLUDING TABLE 'web.*' indicates that all tables whose names do not start with web in the source database are synchronized.
OPTIONS The parameters of the source tables. For more information, see the parameters in the WITH clause in the documentation of the related source table connector. For more information about the connector documentation, see Create a source table.
Note Both the key and value must be of the STRING type, such as 'server-id' = '65500'.
Note The IF NOT EXISTS keyword is required. If the destination database or destination tables do not exist in the destination store, the destination database or destination tables are created. If the destination database or destination tables exist, the related creation step is skipped. The destination tables that are created use the schemas of the source tables, including the primary key and the names and types of the physical fields. The computed columns, meta field, and watermark are not included. The field types of the source tables are mapped to the field types of the destination tables. For more information, see the data type mapping table of each connector.

Examples

  • Example 1: synchronization of the entire database

    In most cases, the CREATE DATABASE AS statement is used with the catalog of the data source and the catalog of the destination store. For example, you can execute the CREATE DATABASE AS statement and use a MySQL catalog and a Hologres catalog to synchronize full and incremental data from the MySQL database to Hologres. You can use a MySQL catalog to parse the schema and related parameters of the source tables without the need to manually write DDL statements.

    For example, a Hologres catalog named holo and a MySQL catalog named mysql are created in your workspace. A MySQL database named tpcds exists, and the tpcds database has 24 tables. You can execute the following statement to synchronize all tables from the tpcds database to Hologres. The synchronized data includes future data changes and table schema changes. You do not need to create tables in Hologres in advance.
    USE CATALOG holo;
    
    CREATE IF NOT EXISTS DATABASE holo_tpcds -- Create the holo_tpcds database in Hologres. 
    WITH ('sink.parallelism' = '4') -- Configure the parameters of the destination database. By default, the parallelism for the Hologres sink is set to 4. This setting is optional. 
    AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- Synchronize all tables from the tpcds database in the MySQL database. 
    /*+ OPTIONS('server-id'='8001-8004') */ ; -- Configure additional parameters for MySQL CDC source tables. This setting is optional. 
    Note When you create a destination database, Hologres allows you to configure parameters in the WITH clause. These parameters take effect only for the current job. The parameters are used to control the behavior when data is written to the destination table and data is not persistently stored in Hologres. For more information about the supported parameter types and their features, see Create a Hologres result table.
  • Example 2: execution of multiple CREATE TABLE AS and CREATE DATABASE AS statements

    Fully managed Flink allows you to use the STATEMENT SET statement to commit multiple CREATE TABLE AS and CREATE DATABASE AS statements as one job. Fully managed Flink can also optimize data of source nodes and use one source node to read data from multiple business tables. This operation reduces the use of server-id, the number of database connections, and the database reading load. Therefore, this operation is suitable for MySQL CDC data sources.

    For example, if the MySQL instance has multiple databases named tpcds, tpch, and user_db01 to user_db99 (database shards), you can combine multiple CREATE TABLE AS and CREATE DATABASE AS statements to synchronize data from all databases and tables of the MySQL instance to Hologres. This way, you can run only one fully managed Flink job to synchronize data from all tables in the databases and use only one source node to read data from all tables. The following code shows an example.
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- Synchronize data from multiple tables in the specified database shards whose names start with user. 
    CREATE TABLE IF NOT EXISTS user
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- Synchronize data from the tpcds database. 
    CREATE IF NOT EXISTS DATABASE holo_tpcds
    AS DATABASE mysql.tpcds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- Synchronize data from the tpch database. 
    CREATE IF NOT EXISTS DATABASE holo_tpch
    AS DATABASE mysql.tpch INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    END;