After you configure a MySQL catalog, you can access the tables of a MySQL instance in the console of fully managed Flink. This topic describes how to configure, view, and delete a MySQL catalog in the console of fully managed Flink.

Background information

When you use a MySQL catalog, take note of the following points:
  • You can directly access a table of a MySQL instance without the need to execute DDL statements to register the MySQL table. This improves the efficiency and accuracy of data development.
  • Tables of MySQL catalogs can be used as MySQL Change Data Capture (CDC) source tables, MySQL result tables, and MySQL dimension tables in Flink SQL jobs.
  • Catalogs of ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases are supported.
  • Sharding-based logical tables can be directly accessed.
  • You can use the CREATE DATABASE AS and CREATE TABLE AS statements to synchronize full data of a database, the merged data of sharded tables in a sharded database, and changes in table schemas based on MySQL data sources.
This topic describes the following operations that you can perform to manage MySQL catalogs:

Limits

  • Only the Flink compute engine of vvr-4.0.11-flink-1.13 or later supports MySQL catalogs.
  • You cannot modify the DDL statements that are related to catalogs.
  • You can only query data in databases and tables. You cannot create databases or tables.
  • If a table of a MySQL catalog is used as a MySQL CDC source table, you can read data from the source table only in streaming mode. You cannot read data from the source table in batch mode. If a table of a MySQL catalog is used as a dimension table or result table, both streaming processing and batch processing are supported.
  • Only MySQL 5.7 and MySQL 8.0.X are supported.
Note If a table of a MySQL catalog is used as a MySQL CDC source table, you must enable binary logging on the ApsaraDB RDS for MySQL, PolarDB for MySQL, or self-managed MySQL database. For more information, see Configure a MySQL database.

Configure a MySQL catalog

You can configure a MySQL catalog on the UI or by executing an SQL statement. We recommend that you configure a MySQL catalog on the UI.

Configure a MySQL catalog on the UI

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. On the left side of the Draft Editor page, click the Schemas tab.
  5. Click the Plus sign icon and select Create Catalog from the drop-down list.
  6. In the Create Catalog dialog box, click MySQL.
  7. Configure the parameters.
    Notice After you create a MySQL catalog, the parameter configuration cannot be modified. If you want to modify the parameter configuration, you must delete the MySQL catalog that you created and create a MySQL catalog again.
    Parameters
    Parameter Description Required
    catalogname The name of the MySQL catalog. Yes
    type The type of the database. Set the value to mysql. Yes
    hostname The IP address or hostname that is used to access the MySQL database. Yes
    port The port number of the MySQL database. Default value: 3306. No
    default-database The name of the default MySQL database. Yes
    username The username that is used to access the MySQL database. Yes
    password The password that is used to access the MySQL database. Yes
  8. Click Confirm.
  9. Click the Refresh icon to refresh the page and view the MySQL catalog that you created.
    Refresh icon

Configure a MySQL catalog by executing an SQL statement

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. In the upper-left corner of the Draft Editor page, click New. In the New Draft dialog box, select STREAM / SQL from the Type drop-down list.
  5. In the script editor, enter the following statement to create a MySQL catalog:
    CREATE CATALOG <catalogname> WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = '<dbname>'
    );
    Parameter Description Required
    catalogname The name of the MySQL catalog. Yes
    type The type of the database. Set the value to mysql. Yes
    hostname The IP address or hostname that is used to access the MySQL database. Yes
    port The port number of the MySQL database. Default value: 3306. No
    default-database The name of the default MySQL database. Yes
    username The username that is used to access the MySQL database. Yes
    password The password that is used to access the MySQL database. Yes
  6. Click Execute.
    After the statement is executed, the message "Query has been executed" appears.
  7. On the left side of the Draft Editor page, click the Schemas tab.
  8. Click the Refresh icon to refresh the page and view the MySQL catalog that you created.
    Refresh icon

View the metadata of a MySQL catalog

After you configure a MySQL catalog, you can perform the following steps to view the metadata of the MySQL catalog.

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. On the left side of the Draft Editor page, click the Schemas tab.
  5. Select the MySQL catalog whose metadata you want to view from the drop-down list in the menu bar. In this example, the MySQL catalog named mysql is used.
    mysql catalog
  6. View information about databases, tables, and functions in the MySQL catalog.
    Tables

Use a MySQL catalog

  • Read data from a MySQL CDC source table.
    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
    Note If you want to use a table of a MySQL catalog as the MySQL CDC source table, we recommend that you use SQL hints to set the server-id parameter to a unique value for each job. If you want to run multiple jobs to read data from the source table at the same time, you must set the server-id parameter to a value range. The number of server-id values in the value range must be greater than or equal to the parallelism of jobs.
  • Read data from the sharding-based MySQL logic tables.
    MySQL catalogs allow you to configure the databases and tables in a sharded database as a logical table in a query statement by using regular expressions and execute the query statement to read data from the logical table. For example, if a sharded MySQL database has multiple tables, such as user01, user02, and user99, in database shards from db01 to db10 and the schemas of all the tables are compatible with each other, you can access all the tables in the database shards by using the following regular expression:
    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    The query result contains two additional system fields _db_name (STRING) and _table_name (STRING). The two fields and the primary key of the original tables are used as the new joint primary key of the logical table to ensure that the joint primary key is unique. If the primary keys of the tables from user01 to user99 are id, the joint primary key of the logical table named user is (_db_name, _table_name, id). For more information about how to read data from sharded tables in a sharded database by using regular expressions, see CREATE TABLE AS statement.

  • Execute the CREATE TABLE AS and CREATE DATABASE AS statements to synchronize MySQL data changes and schema changes in real time.
    USE CATALOG `${target_catalog}`;
    
    -- Single-table synchronization: Synchronize schema changes and data changes of tables in real time. 
    CREATE TABLE IF NOT EXISTS `${target_table_name}`
    WITH (...)
    AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- Database synchronization: Synchronize schema changes and data changes of the database in real time. 
    CREATE DATABASE `${target_db_name}` WITH (...)
    AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;
                        

    For more examples, see CREATE TABLE AS statement or CREATE DATABASE AS statement.

  • Read data from a MySQL dimension table.
    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Write result data to a MySQL table.
    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

Delete a MySQL catalog

You can delete a MySQL catalog on the UI or by using an SQL statement. We recommend that you delete a MySQL catalog on the UI.

Delete a MySQL catalog on the UI

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. On the left side of the Draft Editor page, click the Schemas tab.
  5. Select the MySQL catalog that you want to delete from the drop-down list in the menu bar, and click the Delete icon.
    Delete button
  6. In the dialog box that appears, click Delete.
  7. Click the Refresh icon to refresh the page and check whether the MySQL catalog is deleted.

Delete a MySQL catalog by using an SQL statement

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Draft Editor.
  4. In the upper-left corner of the Draft Editor page, click New. In the New Draft dialog box, select STREAM / SQL from the Type drop-down list.
  5. In the script editor, enter the following statement:
    DROP CATALOG ${catalog_name}
    catalog_name is the name of the MySQL catalog that you want to delete in the console of fully managed Flink.
    Note After you delete a MySQL catalog, the running jobs are not affected. However, the jobs that use a table of the catalog can no longer find the table if the jobs are published or restarted. Proceed with caution when you delete a MySQL catalog.
  6. Click Execute.
  7. On the left side of the Draft Editor page, click the Schemas tab.
  8. Click the Refresh icon to refresh the page and check whether the MySQL catalog is deleted.