すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MySQL カタログの管理

最終更新日:Nov 09, 2025

MySQL カタログを作成すると、対応する MySQL インスタンス内のテーブルに Flink コンソールから直接アクセスし、これらのテーブルを Flink SQL ジョブで使用できます。このトピックでは、MySQL カタログの作成方法と使用方法について説明します。

背景情報

MySQL カタログには次の特徴があります:

  • データ定義言語 (DDL) 文を使用して手動で登録することなく、MySQL インスタンス内のテーブルにアクセスできます。この特徴により、開発効率と精度が向上します。

  • MySQL カタログ内のテーブルは、Flink SQL ジョブで MySQL Change Data Capture (CDC) ソーステーブル、MySQL 結果テーブル、および MySQL ディメンションテーブルとして使用できます。

  • ApsaraDB RDS for MySQL、PolarDB for MySQL、および自己管理 MySQL データベースをサポートします。

  • シャード化されたデータベースおよびテーブル内の論理テーブルへの直接アクセスをサポートします。

  • CREATE DATABASE AS (CDAS) および CREATE TABLE AS (CTAS) 構文を使用して、MySQL データソースに基づく完全なデータベース同期、シャード化されたデータベースとテーブルのマージ同期、およびスキーマ進化の同期を実行できます。

制限事項

  • MySQL インスタンスと Flink のデプロイメントは、同じ Virtual Private Cloud (VPC) 内にある必要があります。VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。

  • カタログは作成後に構成を変更することはできません。変更するには、カタログを削除して新しいカタログを作成する必要があります。

  • 既存のデータベースとテーブルのクエリのみが可能です。Flink を使用してデータベースやテーブルを作成することはできません。

  • ソーステーブルとして使用する場合、ストリーム読み取りのみがサポートされます。バッチ読み取りはサポートされていません。

    説明

    MySQL カタログのテーブルを MySQL CDC ソーステーブルとして使用する場合、ApsaraDB RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースでバイナリログなどの機能を有効にする必要があります。詳細については、「MySQL データベースの設定」をご参照ください。

  • CREATE TABLE 文で PolarDB 固有の構文を使用して作成されたテーブルは識別できません。

    例: PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`).

  • Ververica Runtime (VVR) 8.0.7 以降では、ビューを Flink テーブルとして使用することはできません。

  • MySQL バージョン 5.7 および 8.0.x のみがサポートされています。

MySQL カタログの作成

UI または SQL コマンドを使用して MySQL カタログを作成できます。

UI (推奨)

  1. Data Management ページに移動します。

    1. Realtime Compute for Apache Flink コンソールにログインします。対象のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

    2. 左側のナビゲーションウィンドウで、[Data Management] をクリックします。

  2. [カタログの作成] をクリックします。[MySQL] を選択し、[次へ] をクリックします。

  3. パラメーターを設定します。

    重要

    カタログは作成後に以下の構成を変更することはできません。変更するには、カタログを削除して新しいカタログを作成する必要があります。

    配置信息

    パラメータ

    説明

    必須

    catalogname

    MySQL カタログのカスタム名。

    はい

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    説明

    VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。

    はい

    port

    MySQL データベースサービスのポート番号。デフォルト値は 3306 です。

    いいえ

    default-database

    デフォルトの MySQL データベースの名前。

    はい

    username

    MySQL データベースサービスのユーザー名。

    はい

    password

    MySQL データベースサービスのパスワード。

    プレーンテキストの AccessKey などのリスクを回避するには、パスワードを変数として指定します。画像内の例では、mysqlpw という名前の変数を使用しています。詳細については、「変数を作成する」をご参照ください。

    はい

  4. [確認] をクリックします。

    カタログリストページの左側にある [カタログ] ペインで、作成したカタログを表示できます。

SQL コマンド

  1. [データクエリ] ページに移動します。

    1. Realtime Compute for Apache Flink コンソールにログインします。目的のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。

    2. 左側のナビゲーションウィンドウで、[開発] > [スクリプト] をクリックします。

  2. image をクリックし、次に [クエリ スクリプトの作成] をクリックします。[ファイル名][ストレージの場所] を入力し、[保存] をクリックします。

  3. 次のコードを入力します。

    CREATE CATALOG YourCatalogName WITH(
      'type' = 'mysql',
      'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'usertest',
      'password' = '${secret_values.mysqlpw}',
      'default-database' = 'flinktest',
      'catalog.table.metadata-columns'='table_name'
    );

    パラメーター

    説明

    必須

    YourCatalogName

    MySQL カタログのカスタム名。

    はい

    type

    タイプ。値を mysql に設定します。

    はい

    hostname

    MySQL データベースの IP アドレスまたはホスト名。

    説明

    VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。

    はい

    port

    MySQL データベースサービスのポート番号。デフォルト値は 3306 です。

    いいえ

    default-database

    デフォルトの MySQL データベースの名前。

    はい

    username

    MySQL データベースサービスのユーザー名。

    はい

    password

    MySQL データベースサービスのパスワード。

    プレーンテキストの AccessKey などのリスクを回避するには、パスワードを変数として指定します。この例では、mysqlpw という名前の変数を使用しています。詳細については、「変数を作成する」をご参照ください。

    はい

    property-version

    カタログパラメーターのバージョン。有効な値は 0 (デフォルト) と 1 (推奨) です。

    使用可能なパラメーターとそのデフォルト値は、バージョンによって異なる場合があります。違いについては、特定のパラメーターの説明で説明されています。

    説明
    • VVR 8.0.6 以降のみがこのパラメーターをサポートします。

    • VVR 11.1 以降では、デフォルト値は 1 です。

    いいえ

    catalog.table.metadata-columns

    データテーブルを取得する際に、MySQL CDC ソーステーブルからテーブルスキーマに追加するメタデータ列を指定します。デフォルトでは、メタデータ列は追加されません。

    複数のメタデータ列はセミコロン (;) で区切ります。例: op_ts;table_name;database_name

    説明
    • Ververica Runtime (VVR) 6.0.5 以降のみがこのパラメーターをサポートします。

    • このパラメーターが構成されている場合、返されるテーブルスキーマには指定されたメタデータ列が含まれます。これらの列は MySQL CDC ソーステーブルにのみ適用されます。したがって、このカタログによって返されるテーブルは、ソーステーブルとしてのみ使用でき、結果テーブルやディメンションテーブルとしては使用できません。

    いいえ

    catalog.table.treat-tinyint1-as-boolean

    テーブルスキーマを取得する際に、MySQL の TinyInt(1) 型と Boolean 型を Flink の Boolean 型にマッピングするかどうかを指定します。有効な値:

    • true: Boolean 型にマッピングします。

    • false: TINYINT 型にマッピングします。

    デフォルト値:

    • property-version が 0 の場合、デフォルト値は true です。

    • property-version が 1 の場合、デフォルト値は false です。

    説明
    • Ververica Runtime (VVR) 8.0.4 以降のみがこのパラメーターをサポートします。

    • MySQL の TinyInt(1) フィールドに 0 と 1 以外の値を格納することはお勧めしません。マッピングには適切なデータの型を選択してください。詳細については、「タイプのマッピング」をご参照ください。

    いいえ

  4. カタログ作成のコードを選択し、左側の行番号の横にある [実行] をクリックします。

    The following statement has been executed successfully! というメッセージは、カタログが正常に作成されたことを示します。

    image

MySQL カタログの表示と削除

UI (推奨)

[Data Management] ページでは、[カタログリスト] に作成したカタログの [カタログ名][タイプ] が表示されます。

  • カタログの表示: 目的のカタログの [操作] 列にある [表示] をクリックして、そのデータベースとテーブルを表示します。

    フィールドのコメント情報は、テーブルスキーマの詳細には表示されません。

  • 対象カタログの [操作] 列で、[削除] をクリックします。

    この操作はカタログのみを削除します。対応するサービス内のテーブルは削除されません。削除されたカタログのテーブルを使用している実行中のジョブは影響を受けません。ただし、テーブルが見つからないため、ジョブを再デプロイまたは再起動するとエラーが発生します。この操作は注意して実行してください。

SQL コマンド

  1. [データクエリ] ページのテキストエディターに、次のコマンドを入力します。

    -- Flink に対応するカタログ内のテーブルのスキーマ情報を表示します。フィールドのコメント情報は表示されません。
    DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`;
    
    -- カタログを削除します。
    DROP CATALOG `<catalogname>`;
    説明

    この操作はカタログのみを削除します。対応するサービス内のテーブルは削除されません。削除されたカタログのテーブルを使用している実行中のジョブは影響を受けません。ただし、テーブルが見つからないため、ジョブを再デプロイまたは再起動するとエラーが発生します。この操作は注意して実行してください。

  2. コマンドを右クリックし、ショートカットメニューから [実行] を選択します。

    image

MySQL カタログの使用

MySQL ソーステーブルからデータを読み取る

INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;

    MySQL カタログが MySQL CDC ソーステーブルとして使用される場合、Table Hints を使用してジョブに異なる server-id を指定します。ソーステーブルが同時読み取りを必要とする場合は、server-id を範囲として構成する必要もあります。範囲内のサーバー ID の数は、並列処理の次数以上である必要があります。

    シャード化されたデータベースの論理テーブルからデータを読み取る

    MySQL カタログでは、正規表現を使用して、シャード化されたデータベースやテーブルからデータを読み取る論理テーブルを定義できます。

    たとえば、db01 から db10 などのデータベースに分散された user01、user02、user99 などのテーブルを含む、シャード化された MySQL データベースを考えてみましょう。すべてのテーブルに互換性のあるスキーマがある場合、データベース名とテーブル名に正規表現を使用して、すべてのシャード化されたユーザーテーブルにアクセスできます。

    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    シャード化されたデータベースとテーブルの論理テーブルは、2 つの追加のシステムフィールド _db_name (STRING) と _table_name (STRING) を返します。これら 2 つのフィールドと元のシャード化されたテーブルのプライマリキーは、一意性を確保するために論理テーブルの新しい複合プライマリキーを形成します。user01 から user99 までのプライマリキーが id の場合、ユーザー論理テーブルの複合プライマリキーは (_db_name, _table_name, id) です。

    MySQL カタログは、同期のために複数のテーブルを照合するための正規表現の使用をサポートしています。これにより、シャード化されたデータベースとテーブルをマージして同期できます。詳細な例については、「シャード化されたデータベースとテーブルのマージと同期」をご参照ください。

    CTAS と CDAS を使用して MySQL データとスキーマの変更をリアルタイムで同期する

    説明

    データを同期する際は、アップストリームとダウンストリームのストレージが CTAS と CDAS でサポートされているストレージのリスト に含まれていることを確認してください。たとえば、MongoDB は結果テーブルとしてサポートされていません。デプロイメント中にエラーが発生します: CREATE TABLE ... AS TABLE ... statement requires target catalog ... implements org.apache.flink.table.catalog.CatalogTableProvider interface.

    CTAS は、単一テーブルの同期、スキーマ進化の同期、シャード化されたデータベースとテーブルのマージ同期、およびカスタム計算列の同期をサポートします。データ同期ジョブに CTAS 文を追加することもできます。詳細な例と情報については、「CREATE TABLE AS (CTAS) 文」をご参照ください。CDAS は、スキーマ進化の同期とともに、データベースレベルでのスキーマとデータのリアルタイム同期をサポートします。詳細については、「CREATE DATABASE AS (CDAS) 文」をご参照ください。

    -- 単一テーブルの同期。テーブルレベルのスキーマとデータの変更をリアルタイムで同期します。
    CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
    WITH (...)
    AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- 完全なデータベース同期。データベースレベルのスキーマとデータの変更をリアルタイムで同期します。
    CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
    AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;   

    MySQL から Hologres へのデータ同期の例と詳細については、「Hologres カタログの使用」をご参照ください。

    USE CATALOG holocatalog; -- 使用するカタログを指定します。
    
    CREATE TABLE IF NOT EXISTS holotable  -- 結果テーブルの名前を指定します。データベースレベルを指定しない場合、データはカタログのデフォルトデータベースに自動的に同期されます。  
    WITH ('jdbcWriteBatchSize' = '1024')   -- オプション。結果テーブルのパラメーターを指定します。
    AS TABLE mysqlcatalog.dbmysql.mysqltable   
    /*+ OPTIONS('server-id'='8001-8004') */;  -- mysql-cdc ソーステーブルの追加パラメーターを指定します。

    MySQL ディメンションテーブルからデータを読み取る

    INSERT INTO `<othersinktable>`
    SELECT ...
    FROM `<othersourcetable>` AS e
    JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;

    MySQL テーブルにデータを書き込む

    INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
    SELECT ...
    FROM `<othersourcetable>`