CREATE DATABASE AS (CDAS) 文は、スキーマ変更の同期と共に、データベースレベルでのテーブルスキーマとデータのリアルタイム同期をサポートします。このトピックでは、CDAS 文の使用方法と適用可能なシナリオについて説明します。
はじめに: YAML を使用してソースから宛先にデータを同期することにより、ジョブを開発できます。
利点: データベース、テーブル、テーブルスキーマ、およびカスタム計算列の同期など、CTAS 文と CDAS 文の主要な機能がサポートされています。さらに、リアルタイムのスキーマ進化、生のバイナリログデータの同期、WHERE 句、および列プルーニングもサポートされています。
データインジェストには、YAML を使用してジョブを作成することをお勧めします。詳細については、「YAML デプロイメントを使用してデータをインジェストする」をご参照ください。
背景情報
CREATE TABLE AS (CTAS) の糖衣構文である CDAS 文は、データベース内の複数のテーブルまたはすべてのテーブルからデータを同期するために使用され、自動データ統合シナリオに最適です。多くの場合、テーブルの永続的なメタデータ管理機能を提供するソースカタログおよび宛先カタログと共に使用されます。 CDAS 文は、完全データと増分データのレプリケーションの実装、およびデータとスキーマの変更の同期に役立ち、ターゲットテーブルを事前に作成する必要はありません。
CDAS 文には、次の利点があります。
簡略化された構文
Realtime Compute for Apache Flink は、CDAS 文を CTAS 文に自動的に変換します(テーブルごとに 1 つの CTAS 文)。 CDAS 文は、データ同期とスキーマ進化のための CTAS 文の機能を継承します。
最適化されたリソース
Realtime Compute for Apache Flink は、単一のソース頂点を使用して複数のビジネステーブルから読み取ることにより、ソーステーブルを最適化します。これは MySQL CDC ソースにとって特に有益であり、データベース接続を削減し、冗長なバイナリログのプルを防ぎ、MySQL データベースへの全体的な読み取り負荷を軽減します。
コア機能
データ同期
機能 | 説明 |
データベース内の複数のテーブル(またはすべてのテーブル)から関連する各シンクテーブルへの完全データと増分データの同期を実行します。 | |
正規表現を使用してデータベースシャード間でソーステーブル名を照合し、これらのテーブルを統合して、対応するシンクに同期します。 | |
セーブポイントからジョブを再起動することにより、新しく追加されたテーブルを同期します。 | |
STATEMENT SET 文を使用して、複数の CDAS 文と CTAS 文を 1 つのジョブとしてコミットできます。また、ソーステーブルオペレーターのデータをマージして再利用し、データソースへの読み取り負荷を軽減することもできます。 |
スキーマ進化
データベースデータの同期中に、スキーマの変更(列の追加など)をシンクに伝達することもできます。ポリシーは、CTAS 文のポリシーと一致しています。詳細については、「スキーマ進化」をご参照ください。
起動プロセス
次の表は、CDAS 文を使用して MySQL から Hologres にデータを同期するプロセスを示しています。
フローチャート | 説明 |
CDAS 文を実行すると、Realtime Compute for Apache Flink は次の処理を行います。
|
前提条件
宛先カタログがワークスペースに登録されています。詳細については、「カタログの管理」をご参照ください。
制限
構文の制限
CDAS 文を含む SQL ドラフトのデバッグはサポートされていません。
MiniBatch はサポートされていません。
重要CTAS 文または CDAS 文を含む SQL ドラフトを作成する前に、MiniBatch 構成が削除されていることを確認してください。次の手順を実行します。
に移動します。
[デプロイメントのデフォルト] タブを選択します。
[その他の構成] セクションで、MiniBatch 構成が削除されていることを確認します。
SQL ドラフトからデプロイメントを作成したり、デプロイメントを開始したりするときにエラーが報告された場合は、「「現在、CTAS/CDAS 構文で StreamExecMiniBatchAssigner タイプの ExecNode のマージはサポートされていません」エラーを修正する方法」をご参照ください。
サポートされているアップストリームシステムとダウンストリームシステム
次の表に、CDAS 文でサポートされているアップストリームシステムとダウンストリームシステムを示します。
コネクタ | ソーステーブル | シンクテーブル | 注記 |
√ | × | ビューは同期できません。 | |
√ | × | ||
√ | × |
| |
× | √ | ||
× | √ | Hologres がデータ同期の宛先システムとして機能する場合、システムは 説明
| |
× | √ | Alibaba Cloud EMR 上の StarRocks に限定されたサポートです。 | |
× | √ | 該当なし。 |
使用上の注意
新しいテーブルの同期
VVR 8.0.6 以降: テーブルが追加された後、セーブポイントを作成し、セーブポイントからジョブを再起動して、新しいテーブルをキャプチャして同期します。詳細については、「新しいテーブルの同期」をご参照ください。
VVR 8.0.5 以前: 新しいテーブルは、ジョブの再起動によってキャプチャまたは同期されません。代わりに、次のいずれかの方法を使用してください。
方法
説明
新しいテーブルを同期するための新しいジョブを作成する
既存のジョブはそのままにします。新しいテーブルを同期するための新しいジョブを作成します。サンプルコード:
-- new_table という名前の新しいテーブルからデータを同期するジョブを作成します CREATE TABLE IF NOT EXISTS new_table AS TABLE mysql.tpcds.new_table /*+ OPTIONS('server-id'='8008-8010') */;同期されたデータをクリーンアップし、ジョブを再起動する
次の手順を実行します。
既存のジョブをキャンセルします。
シンク内の同期されたデータをクリーンアップします。
状態なしでジョブを再起動して、データを再度同期します。
外部システムへの読み取り/書き込みアクセス
操作が成功するように、次の場合にアカウントに必要な読み取り/書き込み権限を付与します。
アカウント間で外部リソースにアクセスする場合。
RAM ユーザーまたは RAM ロールとして外部リソースにアクセスする場合。
構文
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
<target_database>:
[catalog_name.]db_name
<source_database>:
[catalog_name.]db_nameCDAS 文は、CREATE DATABASE 文の基本構文を使用します。次の表に、パラメーターを示します。
パラメーター | 説明 |
target_database | 宛先データベース名。必要に応じて、宛先カタログ名を含めます。 |
COMMENT | 宛先データベースの説明。 source_database の説明が自動的に使用されます。 |
WITH | 宛先データベースのオプション。詳細については、「カタログの管理」の下のそれぞれのドキュメントをご参照ください。 説明 キーと値の両方が文字列型である必要があります(例: 'sink.parallelism' = '4')。 |
source_database | ソースデータベース名。必要に応じて、ソースカタログ名を含めます。 |
INCLUDING ALL TABLES | ソースデータベース内のすべてのテーブルを同期することを指定します。 |
INCLUDING TABLE | 同期するテーブルを指定します。複数のテーブルは縦棒(|)で区切ります。正規表現を使用して、特定の命名パターンに基づいてすべてのテーブルを含めることができます。たとえば、INCLUDING TABLE 'web.*' は、ソースデータベース内で |
EXCLUDING TABLE | 同期から除外するテーブルを指定します。複数のテーブルは縦棒(|)で区切ります。正規表現を使用して、特定の命名パターンに基づいてすべてのテーブルを含めることができます。たとえば、INCLUDING TABLE 'web.*' は、ソースデータベース内で |
OPTIONS | ソーステーブルのコネクタオプション。詳細については、「サポートされているコネクタ」の下のそれぞれのドキュメントをご参照ください。 説明 キーと値の両方が文字列型である必要があります(例: 'server-id' = '65500')。 |
IF NOT EXISTSキーワードは必須ですIF NOT EXISTS注:。システムに宛先ストア内のシンクテーブルの存在を確認するように促します。存在しない場合は、システムがシンクテーブルを作成します。存在する場合は、テーブルの作成はスキップされます。作成されたシンクテーブルは、プライマリキーと物理フィールド名と型を含め、ソーステーブルのスキーマを共有しますが、計算列、メタデータフィールド、およびウォーターマーク構成は除外されます。
Realtime Compute for Apache Flink は、データ同期の際に、ソーステーブルからシンクテーブルへのデータ型マッピングを実行します。データ型マッピングの詳細については、特定の コネクタのドキュメントをご参照ください。
例
データベースの同期
説明: tpcds MySQL データベースのすべてのテーブルを Hologres に同期します。
前提条件: ワークスペースに次のカタログが作成されています。
holoという名前の Hologres カタログ。mysqlという名前の MySQL カタログ。
サンプルコード:
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds -- Hologres に holo_tpcds という名前のデータベースを作成します。
WITH ('sink.parallelism' = '4') -- 必要に応じて、宛先データベースのオプションを構成します。デフォルトでは、Hologres のシンク並列度は 4 に設定されています。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- すべてのテーブルを同期します。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 必要に応じて、MySQL CDC ソーステーブルのオプションを構成します。WITH 句で宛先データベースに構成されたオプションは、書き込み動作を制御するために現在のジョブにのみ適用されます。 Hologres カタログには永続化されません。サポートされているコネクタオプションの詳細については、「Hologres コネクタ」をご参照ください。
データベースシャード間でのデータの同期
説明: MySQL インスタンスには、order_db01 から order_db99 という名前の複数のデータベースシャードがあります。各データベースシャードには、order や order_detail などの複数のテーブルが含まれています。 CDAS 文を使用すると、これらのデータベースシャード内のすべてのテーブルを、データとスキーマの変更と共に Hologres に同期できます。
ソリューション:
データベース名(`order_db[0-9]+`)に正規表現を使用して、同期するすべてのデータベースシャード(order_db01 から order_db99)を照合します。データベース名とテーブル名は、2 つの追加フィールドとして各シンクテーブルに追加されます。
Hologres テーブルのプライマリキーには、一意性を確保するために、データベース名、テーブル名、およびソーステーブルのプライマリキー列が含まれています。
ターゲットテーブルを事前に作成する必要はありません。
サンプルコードと結果:
データベースシャード間で同じ名前のテーブルは、単一の Hologres テーブルに同期される前にマージされます。
サンプルコード | 結果 |
|
|
新しいテーブルの同期
説明: CDAS 文を介してデータを同期するジョブが開始された後、新しいテーブルが追加され、同期する必要があります。
ソリューション: ジョブの新しいテーブル検出を有効にし、セーブポイントからジョブを再起動して、新しく追加されたテーブルをキャプチャして同期します。
制限: 新しいテーブル検出は、VVR 8.0.6 以降でサポートされています。この機能を有効にするには、ソーステーブルの起動モードが initial に設定されていることを確認してください。
手順:
[デプロイメント] ページで、ターゲットデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。
ダイアログで、[その他の戦略] セクションを展開し、[セーブポイントで停止] を選択して、[OK] をクリックします。
ジョブの SQL ドラフトで、次の文を追加して新しいテーブル検出を有効にします。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';[デプロイ] をクリックします。
セーブポイントからジョブを復旧します。
[デプロイメント] ページで、デプロイメントの名前をクリックします。
デプロイメントの詳細ページで、[状態] タブをクリックします。次に、[履歴] サブタブをクリックします。
[セーブポイント] リストで、ジョブのキャンセル時に作成されたセーブポイントを見つけます。
[アクション] 列の を選択します。詳細については、「デプロイメントの開始」をご参照ください。
複数の CDAS 文と CTAS 文の実行
説明: tpcds データベース、tpch データベース、および user_db01 から user_db99 までのデータベースシャードのデータを、単一のジョブで Hologres に同期します。
ソリューション: STATEMENT SET 文を使用して、複数の CDAS 文と CTAS 文をグループ化します。このソリューションでは、ソース頂点を再利用して、必要なテーブルからデータを読み取ります。これは MySQL CDC データソースで特に有益であり、サーバー ID、データベース接続の数、および MySQL データベースへの全体的な読み取り負荷を削減します。
ソースを再利用してパフォーマンスを最適化するには、各ソーステーブルのコネクタオプションが同一であることを確認してください。
サーバー ID の構成の詳細については、「クライアントごとに異なるサーバー ID を設定する」をご参照ください。
サンプルコード:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- データベースシャード全体のユーザテーブルからデータを同期します。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
-- tpcds データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- tpch データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;複数の CDAS 文を介してデータベース内のデータを Kafka に同期する
説明: 複数の MySQL データベース(tpcds や tpch など)のテーブル内のデータを Kafka に同期します。
ソリューション: 複数の CDAS 文を使用してデータベース内のデータを Kafka に同期する場合、同じ名前のテーブルが異なるデータベースに存在する可能性があります。トピックの競合を防ぐために、cdas.topic.pattern オプションを構成して、トピック名のパターンを定義します。 {table-name} プレースホルダーを使用できます。たとえば、'cdas.topic.pattern'='dbname-{table-name}' を指定すると、db1 データベースの table1 テーブルからレプリケートされたデータが dbname-table1 Kafka トピックに送信されます。
サンプルコード:
USE CATALOG kafkaCatalog;
BEGIN STATEMENT SET;
-- tpcds データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- tpch データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;MySQL と Flink の間に Kafka を中間レイヤーとして導入することで、MySQL への負荷を軽減できます。詳細については、「MySQL データベースのすべてのテーブルから Kafka にデータを同期する」をご参照ください。
FAQ
ランタイムエラー
ジョブパフォーマンス
データ同期
参考資料
CDAS 文および CTAS 文で使用される一般的なカタログ:
ベストプラクティス:
YAML 経由のデータインジェスト:
