All Products
Search
Document Center

Realtime Compute for Apache Flink:Manage MySQL catalogs

Last Updated:Feb 05, 2024

After you create a MySQL catalog, you can access the tables of a MySQL instance in the console of fully managed Flink. This topic describes how to create, view, use, and drop a MySQL catalog in the console of fully managed Flink.

Background information

When you use a MySQL catalog, take note of the following points:

  • You can directly access a table of a MySQL instance without the need to execute DDL statements to register the MySQL table. This improves the efficiency and accuracy of data development.

  • Tables of MySQL catalogs can be used as MySQL Change Data Capture (CDC) source tables, MySQL result tables, and MySQL dimension tables in Flink SQL deployments.

  • Catalogs of ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases are supported.

  • Sharding-based logical tables can be directly accessed.

  • You can use the CREATE DATABASE AS and CREATE TABLE AS statements to synchronize full data of a database, the merged data of sharded tables in a sharded database, and changes in table schemas based on MySQL data sources.

This topic describes the following operations that you can perform to manage MySQL catalogs:

Limits

  • Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.11-flink-1.13 or later supports MySQL catalogs.

  • You cannot modify the DDL statements that are related to catalogs.

  • You can use MySQL catalogs to only query data in databases and tables. You cannot use MySQL catalogs to create databases or tables.

  • If a table of a MySQL catalog is used as a MySQL CDC source table, you can read data from the source table only in streaming mode. You cannot read data from the source table in batch mode. If a table of a MySQL catalog is used as a dimension table or result table, both streaming processing and batch processing are supported.

  • MySQL catalogs cannot identify tables that are created by using the syntax specific to PolarDB in the CREATE TABLE statement. For example, if a table is created by using the PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`) syntax in the CREATE TABLE statement, MySQL catalogs cannot identify the table.

  • Only MySQL 5.7 and MySQL 8.0.X are supported.

Note

If a table of a MySQL catalog is used as a MySQL CDC source table, you must enable binary logging on the ApsaraDB RDS for MySQL, PolarDB for MySQL, or self-managed MySQL database. For more information, see Configure a MySQL database.

Create a MySQL catalog

You can create a MySQL catalog on the UI or by executing an SQL statement. We recommend that you configure a MySQL catalog on the UI.

Create a MySQL catalog on the UI

  1. Go to the Catalogs page.

    1. Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    2. In the left-side navigation pane, click Catalogs.

  2. On the Catalog List page, click Create Catalog. On the Built-in Catalog tab of the Create Catalog dialog box, click MySQL and click Next.

  3. Configure the parameters in the Configure Catalog step.

    Important

    After you create a MySQL catalog, the parameter configuration cannot be modified. If you want to modify the parameter configuration, you must drop the MySQL catalog that you created and create a MySQL catalog again.

    配置信息

    Parameter

    Description

    Required

    catalogname

    The name of the MySQL catalog.

    Yes

    hostname

    The IP address or hostname that is used to access the MySQL database.

    Yes

    port

    The port number of the MySQL database. Default value: 3306.

    No

    default-database

    The name of the default MySQL database.

    Yes

    username

    The username that is used to access the MySQL database.

    Yes

    password

    The password that is used to access the MySQL database.

    Yes

  4. Click Confirm.

  5. In the Catalogs pane on the left side of the Catalog List page, view the catalog that you create.

Create a MySQL catalog by executing an SQL statement

  1. In the code editor of the Scripts tab on the SQL Editor page, enter the following statement:

    CREATE CATALOG <yourcatalogname> WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = '<dbname>',
      'catalog.table.metadata-columns' = '<metadata>'
    );

    Parameter

    Description

    Required

    yourcatalogname

    The name of the MySQL catalog.

    Important

    You must remove the angle brackets (<>) when you replace the value of the parameter with the name of your catalog. Otherwise, an error is returned during the syntax check.

    Yes

    type

    The type of the catalog. Set the value to mysql.

    Yes

    hostname

    The IP address or hostname that is used to access the MySQL database.

    Yes

    port

    The port number of the MySQL database. Default value: 3306.

    No

    default-database

    The name of the default MySQL database.

    Yes

    username

    The username that is used to access the ApsaraDB RDS for MySQL database.

    Yes

    password

    The password that is used to access the MySQL database.

    Yes

    catalog.table.metadata-columns

    Specifies the metadata columns in a MySQL CDC source table that you want to add to the schema of a table when you query the table. Separate multiple metadata columns with semicolons (;). For example, you can set this parameter to op_ts;table_name;database_name. By default, no metadata column is added.

    Note
    • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.5 or later supports this parameter.

    • If you specify this parameter, the specified metadata columns are added to the schema of the returned table. The metadata columns are available only in MySQL CDC source tables. Therefore, the tables that are created in the MySQL catalog can be used as only source tables and cannot be used as result tables or dimension tables.

    No

    catalog.table.treat-tinyint1-as-boolean

    Specifies whether the TINYINT(1) and BOOLEAN data types of MySQL correspond to the BOOLEAN data type of Flink when Realtime Compute for Apache Flink obtains the schema of the table. Valid values:

    • true: The TINYINT(1) and BOOLEAN data types of MySQL correspond to the BOOLEAN data type of Flink. This is the default value.

    • false: The TINYINT(1) and BOOLEAN data types of MySQL correspond to the TINYINT data type of Flink.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter.

    No

  2. Select the code that is used to create a catalog and click Run that appears on the left side of the code.

    image..png

View a MySQL catalog

  1. Go to the Catalogs page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Catalogs.

  2. On the Catalog List page, find the desired catalog and view the Name and Type columns of the catalog.

    Note

    If you want to view the databases and tables in the catalog, click View in the Actions column.

Use a MySQL catalog

  • Read data from a MySQL CDC source table.

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
    Note
    • When you use a table in a MySQL catalog, you can use SQL hints to specify the time zone of the MySQL database server for the table. For example, you can use mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */.

    • If you want to use a table of a MySQL catalog as the MySQL CDC source table, we recommend that you use SQL hints to set the server-id parameter to a unique value for each deployment. If you want to run multiple deployments to read data from the source table at the same time, you must set the server-id parameter to a value range. The number of server-id values in the value range must be greater than or equal to the parallelism of deployments. For example, you can use mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai', 'server-id' = '6000-6008') */.

  • Read data from the sharding-based MySQL logic tables.

    MySQL catalogs allow you to configure the databases and tables in a sharded database as a logical table in a query statement by using regular expressions and execute the query statement to read data from the logical table. For example, if a sharded MySQL database has tables, such as user01, user02, and user99, in database shards from db01 to db10 and the schemas of all the tables are compatible with each other, you can access all the tables in the database shards by using the following regular expression:

    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    The query result contains two additional system fields _db_name (STRING) and _table_name (STRING). The two fields and the primary key of the original tables are used as the new joint primary key of the logical table to ensure that the joint primary key is unique. If the primary keys of the tables from user01 to user99 are id, the joint primary key of the logical table named user is (_db_name, _table_name, id). MySQL catalogs support data reading from sharded tables in a sharded database by using regular expressions. For more information about the related examples and limits, see CREATE TABLE AS statement.

  • Execute the CREATE TABLE AS and CREATE DATABASE AS statements to synchronize MySQL data changes and schema changes in real time.

    USE CATALOG `${target_catalog}`;
    
    -- Single-table synchronization: Synchronize schema changes and data changes of tables in real time. 
    CREATE TABLE IF NOT EXISTS `${target_table_name}`
    WITH (...)
    AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- Database synchronization: Synchronize schema changes and data changes of the database in real time. 
    CREATE DATABASE `${target_db_name}` WITH (...)
    AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;
                        

    For more information about related examples and limits, see CREATE TABLE AS statement or CREATE DATABASE AS statement.

  • Read data from a MySQL dimension table.

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Write result data to a MySQL table.

    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

Drop a MySQL catalog

You can drop a MySQL catalog on the UI or by using an SQL statement. We recommend that you drop a MySQL catalog on the UI.

Drop a MySQL catalog on the UI

  1. Go to the Catalogs page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Catalogs.

  2. On the Catalog List page, find the desired catalog and click Delete in the Actions column.

  3. In the message that appears, click Delete.

  4. View the Catalogs pane on the left side of the Catalog List page to check whether the catalog is dropped.

Drop a MySQL catalog by using an SQL statement

  1. In the code editor of the Scripts tab on the SQL Editor page, enter the following statement:

    DROP CATALOG ${catalog_name}

    catalog_name is the name of the MySQL catalog that you want to drop in the console of fully managed Flink.

    Note

    After you drop a MySQL catalog, the running deployments are not affected. However, the deployments that use a table of the catalog can no longer find the table if the deployments are published or restarted. Proceed with caution when you drop a MySQL catalog.

  2. Right-click the statement that is used to drop the catalog and select Run from the shortcut menu.

  3. View the Catalogs pane on the left side of the Catalog List page to check whether the catalog is dropped.