MySQL カタログを作成すると、対応する MySQL インスタンス内のテーブルに Flink コンソールから直接アクセスし、これらのテーブルを Flink SQL ジョブで使用できます。このトピックでは、MySQL カタログの作成方法と使用方法について説明します。
背景情報
MySQL カタログには次の特徴があります:
データ定義言語 (DDL) 文を使用して手動で登録することなく、MySQL インスタンス内のテーブルにアクセスできます。この特徴により、開発効率と精度が向上します。
MySQL カタログ内のテーブルは、Flink SQL ジョブで MySQL Change Data Capture (CDC) ソーステーブル、MySQL 結果テーブル、および MySQL ディメンションテーブルとして使用できます。
ApsaraDB RDS for MySQL、PolarDB for MySQL、および自己管理 MySQL データベースをサポートします。
シャード化されたデータベースおよびテーブル内の論理テーブルへの直接アクセスをサポートします。
CREATE DATABASE AS (CDAS) および CREATE TABLE AS (CTAS) 構文を使用して、MySQL データソースに基づく完全なデータベース同期、シャード化されたデータベースとテーブルのマージ同期、およびスキーマ進化の同期を実行できます。
制限事項
MySQL インスタンスと Flink のデプロイメントは、同じ Virtual Private Cloud (VPC) 内にある必要があります。VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。
カタログは作成後に構成を変更することはできません。変更するには、カタログを削除して新しいカタログを作成する必要があります。
既存のデータベースとテーブルのクエリのみが可能です。Flink を使用してデータベースやテーブルを作成することはできません。
ソーステーブルとして使用する場合、ストリーム読み取りのみがサポートされます。バッチ読み取りはサポートされていません。
説明MySQL カタログのテーブルを MySQL CDC ソーステーブルとして使用する場合、ApsaraDB RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースでバイナリログなどの機能を有効にする必要があります。詳細については、「MySQL データベースの設定」をご参照ください。
CREATE TABLE 文で PolarDB 固有の構文を使用して作成されたテーブルは識別できません。
例:
PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`).Ververica Runtime (VVR) 8.0.7 以降では、ビューを Flink テーブルとして使用することはできません。
MySQL バージョン 5.7 および 8.0.x のみがサポートされています。
MySQL カタログの作成
UI または SQL コマンドを使用して MySQL カタログを作成できます。
UI (推奨)
Data Management ページに移動します。
Realtime Compute for Apache Flink コンソールにログインします。対象のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[Data Management] をクリックします。
[カタログの作成] をクリックします。[MySQL] を選択し、[次へ] をクリックします。
パラメーターを設定します。
重要カタログは作成後に以下の構成を変更することはできません。変更するには、カタログを削除して新しいカタログを作成する必要があります。

パラメータ
説明
必須
catalogname
MySQL カタログのカスタム名。
はい
hostname
MySQL データベースの IP アドレスまたはホスト名。
説明VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。
はい
port
MySQL データベースサービスのポート番号。デフォルト値は 3306 です。
いいえ
default-database
デフォルトの MySQL データベースの名前。
はい
username
MySQL データベースサービスのユーザー名。
はい
password
MySQL データベースサービスのパスワード。
プレーンテキストの AccessKey などのリスクを回避するには、パスワードを変数として指定します。画像内の例では、mysqlpw という名前の変数を使用しています。詳細については、「変数を作成する」をご参照ください。
はい
[確認] をクリックします。
カタログリストページの左側にある [カタログ] ペインで、作成したカタログを表示できます。
SQL コマンド
[データクエリ] ページに移動します。
Realtime Compute for Apache Flink コンソールにログインします。目的のワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 をクリックします。
をクリックし、次に [クエリ スクリプトの作成] をクリックします。[ファイル名] と [ストレージの場所] を入力し、[保存] をクリックします。次のコードを入力します。
CREATE CATALOG YourCatalogName WITH( 'type' = 'mysql', 'hostname' = 'rm-bp1gcn0q0j0******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'usertest', 'password' = '${secret_values.mysqlpw}', 'default-database' = 'flinktest', 'catalog.table.metadata-columns'='table_name' );パラメーター
説明
必須
YourCatalogName
MySQL カタログのカスタム名。
はい
type
タイプ。値を mysql に設定します。
はい
hostname
MySQL データベースの IP アドレスまたはホスト名。
説明VPC 間またはインターネット経由でインスタンスにアクセスするには、ネットワーク接続を確立する必要があります。詳細については、「ネットワーク接続性」をご参照ください。
はい
port
MySQL データベースサービスのポート番号。デフォルト値は 3306 です。
いいえ
default-database
デフォルトの MySQL データベースの名前。
はい
username
MySQL データベースサービスのユーザー名。
はい
password
MySQL データベースサービスのパスワード。
プレーンテキストの AccessKey などのリスクを回避するには、パスワードを変数として指定します。この例では、mysqlpw という名前の変数を使用しています。詳細については、「変数を作成する」をご参照ください。
はい
property-version
カタログパラメーターのバージョン。有効な値は 0 (デフォルト) と 1 (推奨) です。
使用可能なパラメーターとそのデフォルト値は、バージョンによって異なる場合があります。違いについては、特定のパラメーターの説明で説明されています。
説明VVR 8.0.6 以降のみがこのパラメーターをサポートします。
VVR 11.1 以降では、デフォルト値は 1 です。
いいえ
catalog.table.metadata-columns
データテーブルを取得する際に、MySQL CDC ソーステーブルからテーブルスキーマに追加するメタデータ列を指定します。デフォルトでは、メタデータ列は追加されません。
複数のメタデータ列はセミコロン (;) で区切ります。例:
op_ts;table_name;database_name。説明Ververica Runtime (VVR) 6.0.5 以降のみがこのパラメーターをサポートします。
このパラメーターが構成されている場合、返されるテーブルスキーマには指定されたメタデータ列が含まれます。これらの列は MySQL CDC ソーステーブルにのみ適用されます。したがって、このカタログによって返されるテーブルは、ソーステーブルとしてのみ使用でき、結果テーブルやディメンションテーブルとしては使用できません。
いいえ
catalog.table.treat-tinyint1-as-boolean
テーブルスキーマを取得する際に、MySQL の TinyInt(1) 型と Boolean 型を Flink の Boolean 型にマッピングするかどうかを指定します。有効な値:
true: Boolean 型にマッピングします。
false: TINYINT 型にマッピングします。
デフォルト値:
property-version が 0 の場合、デフォルト値は true です。
property-version が 1 の場合、デフォルト値は false です。
説明Ververica Runtime (VVR) 8.0.4 以降のみがこのパラメーターをサポートします。
MySQL の TinyInt(1) フィールドに 0 と 1 以外の値を格納することはお勧めしません。マッピングには適切なデータの型を選択してください。詳細については、「タイプのマッピング」をご参照ください。
いいえ
カタログ作成のコードを選択し、左側の行番号の横にある [実行] をクリックします。
The following statement has been executed successfully!というメッセージは、カタログが正常に作成されたことを示します。
MySQL カタログの表示と削除
UI (推奨)
[Data Management] ページでは、[カタログリスト] に作成したカタログの [カタログ名] と [タイプ] が表示されます。
カタログの表示: 目的のカタログの [操作] 列にある [表示] をクリックして、そのデータベースとテーブルを表示します。
フィールドのコメント情報は、テーブルスキーマの詳細には表示されません。
対象カタログの [操作] 列で、[削除] をクリックします。
この操作はカタログのみを削除します。対応するサービス内のテーブルは削除されません。削除されたカタログのテーブルを使用している実行中のジョブは影響を受けません。ただし、テーブルが見つからないため、ジョブを再デプロイまたは再起動するとエラーが発生します。この操作は注意して実行してください。
SQL コマンド
[データクエリ] ページのテキストエディターに、次のコマンドを入力します。
-- Flink に対応するカタログ内のテーブルのスキーマ情報を表示します。フィールドのコメント情報は表示されません。 DESCRIBE `<catalogname>`.`<dbname>`.`<tablename>`; -- カタログを削除します。 DROP CATALOG `<catalogname>`;説明この操作はカタログのみを削除します。対応するサービス内のテーブルは削除されません。削除されたカタログのテーブルを使用している実行中のジョブは影響を受けません。ただし、テーブルが見つからないため、ジョブを再デプロイまたは再起動するとエラーが発生します。この操作は注意して実行してください。
コマンドを右クリックし、ショートカットメニューから [実行] を選択します。

MySQL カタログの使用
MySQL ソーステーブルからデータを読み取る
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<mysqlcatalog>`.`<dbname>`.`<tablename>` /*+ OPTIONS('server-id' = '6000-6008') */;MySQL カタログが MySQL CDC ソーステーブルとして使用される場合、Table Hints を使用してジョブに異なる server-id を指定します。ソーステーブルが同時読み取りを必要とする場合は、server-id を範囲として構成する必要もあります。範囲内のサーバー ID の数は、並列処理の次数以上である必要があります。
シャード化されたデータベースの論理テーブルからデータを読み取る
MySQL カタログでは、正規表現を使用して、シャード化されたデータベースやテーブルからデータを読み取る論理テーブルを定義できます。
たとえば、db01 から db10 などのデータベースに分散された user01、user02、user99 などのテーブルを含む、シャード化された MySQL データベースを考えてみましょう。すべてのテーブルに互換性のあるスキーマがある場合、データベース名とテーブル名に正規表現を使用して、すべてのシャード化されたユーザーテーブルにアクセスできます。
SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;シャード化されたデータベースとテーブルの論理テーブルは、2 つの追加のシステムフィールド _db_name (STRING) と _table_name (STRING) を返します。これら 2 つのフィールドと元のシャード化されたテーブルのプライマリキーは、一意性を確保するために論理テーブルの新しい複合プライマリキーを形成します。user01 から user99 までのプライマリキーが id の場合、ユーザー論理テーブルの複合プライマリキーは (_db_name, _table_name, id) です。
MySQL カタログは、同期のために複数のテーブルを照合するための正規表現の使用をサポートしています。これにより、シャード化されたデータベースとテーブルをマージして同期できます。詳細な例については、「シャード化されたデータベースとテーブルのマージと同期」をご参照ください。
CTAS と CDAS を使用して MySQL データとスキーマの変更をリアルタイムで同期する
データを同期する際は、アップストリームとダウンストリームのストレージが CTAS と CDAS でサポートされているストレージのリスト に含まれていることを確認してください。たとえば、MongoDB は結果テーブルとしてサポートされていません。デプロイメント中にエラーが発生します: CREATE TABLE ... AS TABLE ... statement requires target catalog ... implements org.apache.flink.table.catalog.CatalogTableProvider interface.
CTAS は、単一テーブルの同期、スキーマ進化の同期、シャード化されたデータベースとテーブルのマージ同期、およびカスタム計算列の同期をサポートします。データ同期ジョブに CTAS 文を追加することもできます。詳細な例と情報については、「CREATE TABLE AS (CTAS) 文」をご参照ください。CDAS は、スキーマ進化の同期とともに、データベースレベルでのスキーマとデータのリアルタイム同期をサポートします。詳細については、「CREATE DATABASE AS (CDAS) 文」をご参照ください。
-- 単一テーブルの同期。テーブルレベルのスキーマとデータの変更をリアルタイムで同期します。
CREATE TABLE IF NOT EXISTS `<targetcatalog>`.`<targetdbname>`.`<targettablename>`
WITH (...)
AS TABLE `<mysqlcatalog>`.`<dbname>`.`<tablename>`
/*+ OPTIONS('server-id'='6000-6018') */;
-- 完全なデータベース同期。データベースレベルのスキーマとデータの変更をリアルタイムで同期します。
CREATE DATABASE `<targetcatalog>`.`<targetdbname>` WITH (...)
AS DATABASE `<mysqlcatalog>`.`<dbname>` INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='6000-6018') */; MySQL から Hologres へのデータ同期の例と詳細については、「Hologres カタログの使用」をご参照ください。
USE CATALOG holocatalog; -- 使用するカタログを指定します。
CREATE TABLE IF NOT EXISTS holotable -- 結果テーブルの名前を指定します。データベースレベルを指定しない場合、データはカタログのデフォルトデータベースに自動的に同期されます。
WITH ('jdbcWriteBatchSize' = '1024') -- オプション。結果テーブルのパラメーターを指定します。
AS TABLE mysqlcatalog.dbmysql.mysqltable
/*+ OPTIONS('server-id'='8001-8004') */; -- mysql-cdc ソーステーブルの追加パラメーターを指定します。MySQL ディメンションテーブルからデータを読み取る
INSERT INTO `<othersinktable>`
SELECT ...
FROM `<othersourcetable>` AS e
JOIN `<mysqlcatalog>`.`<dbname>`.`<tablename>` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;MySQL テーブルにデータを書き込む
INSERT INTO `<mysqlcatalog>`.`<dbname>`.`<tablename>`
SELECT ...
FROM `<othersourcetable>`