すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:CREATE DATABASE AS (CDAS)

最終更新日:Jun 11, 2025

CREATE DATABASE AS (CDAS) 文は、スキーマ変更の同期と共に、データベースレベルでのテーブルスキーマとデータのリアルタイム同期をサポートします。このトピックでは、CDAS 文の使用方法と適用可能なシナリオについて説明します。

説明

YAML 経由のデータインジェスト:

  • はじめに: YAML を使用してソースから宛先にデータを同期することにより、ジョブを開発できます。

  • 利点: データベース、テーブル、テーブルスキーマ、およびカスタム計算列の同期など、CTAS 文と CDAS 文の主要な機能がサポートされています。さらに、リアルタイムのスキーマ進化、生のバイナリログデータの同期、WHERE 句、および列プルーニングもサポートされています。

データインジェストには、YAML を使用してジョブを作成することをお勧めします。詳細については、「YAML デプロイメントを使用してデータをインジェストする」をご参照ください。

背景情報

CREATE TABLE AS (CTAS) の糖衣構文である CDAS 文は、データベース内の複数のテーブルまたはすべてのテーブルからデータを同期するために使用され、自動データ統合シナリオに最適です。多くの場合、テーブルの永続的なメタデータ管理機能を提供するソースカタログおよび宛先カタログと共に使用されます。 CDAS 文は、完全データと増分データのレプリケーションの実装、およびデータとスキーマの変更の同期に役立ち、ターゲットテーブルを事前に作成する必要はありません。

CDAS 文には、次の利点があります。

  • 簡略化された構文

    Realtime Compute for Apache Flink は、CDAS 文を CTAS 文に自動的に変換します(テーブルごとに 1 つの CTAS 文)。 CDAS 文は、データ同期とスキーマ進化のための CTAS 文の機能を継承します。

  • 最適化されたリソース

    Realtime Compute for Apache Flink は、単一のソース頂点を使用して複数のビジネステーブルから読み取ることにより、ソーステーブルを最適化します。これは MySQL CDC ソースにとって特に有益であり、データベース接続を削減し、冗長なバイナリログのプルを防ぎ、MySQL データベースへの全体的な読み取り負荷を軽減します。

コア機能

データ同期

機能

説明

データベースの同期

データベース内の複数のテーブル(またはすべてのテーブル)から関連する各シンクテーブルへの完全データと増分データの同期を実行します。

データベースシャードの統合と同期

正規表現を使用してデータベースシャード間でソーステーブル名を照合し、これらのテーブルを統合して、対応するシンクに同期します。

新しいテーブルの同期

セーブポイントからジョブを再起動することにより、新しく追加されたテーブルを同期します。

複数の CDAS 文と CTAS 文の実行

STATEMENT SET 文を使用して、複数の CDAS 文と CTAS 文を 1 つのジョブとしてコミットできます。また、ソーステーブルオペレーターのデータをマージして再利用し、データソースへの読み取り負荷を軽減することもできます。

スキーマ進化

データベースデータの同期中に、スキーマの変更(列の追加など)をシンクに伝達することもできます。ポリシーは、CTAS 文のポリシーと一致しています。詳細については、「スキーマ進化」をご参照ください。

起動プロセス

次の表は、CDAS 文を使用して MySQL から Hologres にデータを同期するプロセスを示しています。

フローチャート

説明

CDAS 文を実行すると、Realtime Compute for Apache Flink は次の処理を行います。

  1. 宛先データベースとシンクテーブルの存在を確認します。

    • 宛先データベースが見つからない場合、Realtime Compute for Apache Flink は宛先カタログを介してデータベースを作成します。

    • 宛先データベースが見つかった場合、Realtime Compute for Apache Flink はデータベースの作成をスキップし、データベース内のシンクテーブルをチェックします。

      • シンクテーブルが見つからない場合、Realtime Compute for Apache Flink は、ソーステーブル名とスキーマを反映したシンクテーブルを作成します。

      • シンクテーブルが見つかった場合、Realtime Compute for Apache Flink はテーブルの作成をスキップします。

  2. データ同期ジョブをコミットして実行します。

    データとスキーマの変更は、ソースデータベースから宛先データベースのテーブルにレプリケートされます。

前提条件

宛先カタログがワークスペースに登録されています。詳細については、「カタログの管理」をご参照ください。

制限

構文の制限

  • CDAS 文を含む SQL ドラフトのデバッグはサポートされていません。

  • MiniBatch はサポートされていません。

    重要

サポートされているアップストリームシステムとダウンストリームシステム

次の表に、CDAS 文でサポートされているアップストリームシステムとダウンストリームシステムを示します。

コネクタ

ソーステーブル

シンクテーブル

注記

MySQL コネクタ

×

ビューは同期できません。

Kafka コネクタ

×

MongoDB コネクタ

×

  • シャーディングされたテーブルとデータベースの統合と同期はサポートされていません。

  • MongoDB メタデータは同期できません。

  • CDAS 文を介して MongoDB から宛先ストアにデータとスキーマの変更を同期するには、「MongoDB カタログの管理」をご参照ください。

Upsert Kafka コネクタ

×

Hologres コネクタ

×

Hologres がデータ同期の宛先システムとして機能する場合、システムは connectionSize オプションの値に基づいて、テーブルごとに自動的に接続を作成します。 connectionPoolName オプションを使用して、複数のテーブルに同じ接続プールを構成できます。

説明
  • ソーステーブルのデータ型が Hologres の 固定プラン機能でサポートされていない場合は、代わりに INSERT INTO 文を使用してデータ同期を行ってください。固定プランを使用できないため書き込みパフォーマンスが低下する CTAS 文は使用しないでください。

  • Realtime Compute for Apache Flink は、専用の Hologres インスタンスとの読み取りと書き込みができます。 Hologres 共有クラスタインスタンスはサポートされていません。

StarRocks コネクタ

×

Alibaba Cloud EMR 上の StarRocks に限定されたサポートです。

Paimon コネクタ

×

該当なし。

使用上の注意

  • 新しいテーブルの同期

    • VVR 8.0.6 以降: テーブルが追加された後、セーブポイントを作成し、セーブポイントからジョブを再起動して、新しいテーブルをキャプチャして同期します。詳細については、「新しいテーブルの同期」をご参照ください。

    • VVR 8.0.5 以前: 新しいテーブルは、ジョブの再起動によってキャプチャまたは同期されません。代わりに、次のいずれかの方法を使用してください。

      方法

      説明

      新しいテーブルを同期するための新しいジョブを作成する

      既存のジョブはそのままにします。新しいテーブルを同期するための新しいジョブを作成します。サンプルコード:

      -- new_table という名前の新しいテーブルからデータを同期するジョブを作成します
      CREATE TABLE IF NOT EXISTS new_table
      AS TABLE mysql.tpcds.new_table 
      /*+ OPTIONS('server-id'='8008-8010') */;

      同期されたデータをクリーンアップし、ジョブを再起動する

      次の手順を実行します。

      1. 既存のジョブをキャンセルします。

      2. シンク内の同期されたデータをクリーンアップします。

      3. 状態なしでジョブを再起動して、データを再度同期します。

  • 外部システムへの読み取り/書き込みアクセス

    操作が成功するように、次の場合にアカウントに必要な読み取り/書き込み権限を付与します。

    • アカウント間で外部リソースにアクセスする場合。

    • RAM ユーザーまたは RAM ロールとして外部リソースにアクセスする場合。

構文

CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]

<target_database>:
  [catalog_name.]db_name

<source_database>:
  [catalog_name.]db_name

CDAS 文は、CREATE DATABASE 文の基本構文を使用します。次の表に、パラメーターを示します。

パラメーター

説明

target_database

宛先データベース名。必要に応じて、宛先カタログ名を含めます。

COMMENT

宛先データベースの説明。 source_database の説明が自動的に使用されます。

WITH

宛先データベースのオプション。詳細については、「カタログの管理」の下のそれぞれのドキュメントをご参照ください。

説明

キーと値の両方が文字列型である必要があります(例: 'sink.parallelism' = '4')。

source_database

ソースデータベース名。必要に応じて、ソースカタログ名を含めます。

INCLUDING ALL TABLES

ソースデータベース内のすべてのテーブルを同期することを指定します。

INCLUDING TABLE

同期するテーブルを指定します。複数のテーブルは縦棒(|)で区切ります。正規表現を使用して、特定の命名パターンに基づいてすべてのテーブルを含めることができます。たとえば、INCLUDING TABLE 'web.*' は、ソースデータベース内で web で始まる名前のすべてのテーブルを同期します。

EXCLUDING TABLE

同期から除外するテーブルを指定します。複数のテーブルは縦棒(|)で区切ります。正規表現を使用して、特定の命名パターンに基づいてすべてのテーブルを含めることができます。たとえば、INCLUDING TABLE 'web.*' は、ソースデータベース内で web で始まる名前のすべてのテーブルを同期します。

OPTIONS

ソーステーブルのコネクタオプション。詳細については、「サポートされているコネクタ」の下のそれぞれのドキュメントをご参照ください。

説明

キーと値の両方が文字列型である必要があります(例: 'server-id' = '65500')。

説明
  • IF NOT EXISTS キーワードは必須ですIF NOT EXISTS注:。システムに宛先ストア内のシンクテーブルの存在を確認するように促します。存在しない場合は、システムがシンクテーブルを作成します。存在する場合は、テーブルの作成はスキップされます。

  • 作成されたシンクテーブルは、プライマリキーと物理フィールド名と型を含め、ソーステーブルのスキーマを共有しますが、計算列、メタデータフィールド、およびウォーターマーク構成は除外されます。

  • Realtime Compute for Apache Flink は、データ同期の際に、ソーステーブルからシンクテーブルへのデータ型マッピングを実行します。データ型マッピングの詳細については、特定の コネクタのドキュメントをご参照ください。

データベースの同期

説明: tpcds MySQL データベースのすべてのテーブルを Hologres に同期します。

前提条件: ワークスペースに次のカタログが作成されています。

  • holo という名前の Hologres カタログ。

  • mysql という名前の MySQL カタログ。

サンプルコード:

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_tpcds  -- Hologres に holo_tpcds という名前のデータベースを作成します。
WITH ('sink.parallelism' = '4') -- 必要に応じて、宛先データベースのオプションを構成します。デフォルトでは、Hologres のシンク並列度は 4 に設定されています。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES  -- すべてのテーブルを同期します。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 必要に応じて、MySQL CDC ソーステーブルのオプションを構成します。
説明

WITH 句で宛先データベースに構成されたオプションは、書き込み動作を制御するために現在のジョブにのみ適用されます。 Hologres カタログには永続化されません。サポートされているコネクタオプションの詳細については、「Hologres コネクタ」をご参照ください。

データベースシャード間でのデータの同期

説明: MySQL インスタンスには、order_db01 から order_db99 という名前の複数のデータベースシャードがあります。各データベースシャードには、orderorder_detail などの複数のテーブルが含まれています。 CDAS 文を使用すると、これらのデータベースシャード内のすべてのテーブルを、データとスキーマの変更と共に Hologres に同期できます。

ソリューション:

データベース名(`order_db[0-9]+`)に正規表現を使用して、同期するすべてのデータベースシャード(order_db01 から order_db99)を照合します。データベース名とテーブル名は、2 つの追加フィールドとして各シンクテーブルに追加されます。

Hologres テーブルのプライマリキーには、一意性を確保するために、データベース名、テーブル名、およびソーステーブルのプライマリキー列が含まれています。

ターゲットテーブルを事前に作成する必要はありません。

サンプルコードと結果:

データベースシャード間で同じ名前のテーブルは、単一の Hologres テーブルに同期される前にマージされます。

サンプルコード

結果

USE CATALOG holo;

CREATE DATABASE IF NOT EXISTS holo_order  -- Hologres に holo_order という名前のデータベースを作成します。データベースには、MySQL インスタンスの order_db データベースシャード内のすべてのテーブルが含まれています。
WITH('sink.parallelism'='4')        -- 宛先データベースのパラメーターを指定します。デフォルトでは、各 Hologres シンクの並列度は 4 です。この設定はオプションです。
AS DATABASE mysql.`order_db[0-9]+` INCLUDING ALL TABLES -- MySQL インスタンスの order_db データベースシャード内のすべてのテーブルからデータを同期します。
/*+OPTIONS('server-id'='8001-8004')*/;  -- 必要に応じて、MySQL CDC ソーステーブルの追加パラメーターを指定します。この設定はオプションです。

order1

新しいテーブルの同期

説明: CDAS 文を介してデータを同期するジョブが開始された後、新しいテーブルが追加され、同期する必要があります。

ソリューション: ジョブの新しいテーブル検出を有効にし、セーブポイントからジョブを再起動して、新しく追加されたテーブルをキャプチャして同期します。

制限: 新しいテーブル検出は、VVR 8.0.6 以降でサポートされています。この機能を有効にするには、ソーステーブルの起動モードが initial に設定されていることを確認してください。

手順:

  1. [デプロイメント] ページで、ターゲットデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。

  2. ダイアログで、[その他の戦略] セクションを展開し、[セーブポイントで停止] を選択して、[OK] をクリックします。

  3. ジョブの SQL ドラフトで、次の文を追加して新しいテーブル検出を有効にします。

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  4. [デプロイ] をクリックします。

  5. セーブポイントからジョブを復旧します。

    1. [デプロイメント] ページで、デプロイメントの名前をクリックします。

    2. デプロイメントの詳細ページで、[状態] タブをクリックします。次に、[履歴] サブタブをクリックします。

    3. [セーブポイント] リストで、ジョブのキャンセル時に作成されたセーブポイントを見つけます。

    4. [アクション] 列の [詳細] > [このセーブポイントからジョブを開始] を選択します。詳細については、「デプロイメントの開始」をご参照ください。

複数の CDAS 文と CTAS 文の実行

説明: tpcds データベース、tpch データベース、および user_db01 から user_db99 までのデータベースシャードのデータを、単一のジョブで Hologres に同期します。

ソリューション: STATEMENT SET 文を使用して、複数の CDAS 文と CTAS 文をグループ化します。このソリューションでは、ソース頂点を再利用して、必要なテーブルからデータを読み取ります。これは MySQL CDC データソースで特に有益であり、サーバー ID、データベース接続の数、および MySQL データベースへの全体的な読み取り負荷を削減します。

重要
  • ソースを再利用してパフォーマンスを最適化するには、各ソーステーブルのコネクタオプションが同一であることを確認してください。

  • サーバー ID の構成の詳細については、「クライアントごとに異なるサーバー ID を設定する」をご参照ください。

サンプルコード:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- データベースシャード全体のユーザテーブルからデータを同期します。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

-- tpcds データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- tpch データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

複数の CDAS 文を介してデータベース内のデータを Kafka に同期する

説明: 複数の MySQL データベース(tpcdstpch など)のテーブル内のデータを Kafka に同期します。

ソリューション: 複数の CDAS 文を使用してデータベース内のデータを Kafka に同期する場合、同じ名前のテーブルが異なるデータベースに存在する可能性があります。トピックの競合を防ぐために、cdas.topic.pattern オプションを構成して、トピック名のパターンを定義します。 {table-name} プレースホルダーを使用できます。たとえば、'cdas.topic.pattern'='dbname-{table-name}' を指定すると、db1 データベースの table1 テーブルからレプリケートされたデータが dbname-table1 Kafka トピックに送信されます。

サンプルコード:

USE CATALOG kafkaCatalog;

BEGIN STATEMENT SET;

-- tpcds データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

-- tpch データベースからデータを同期します。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;

END;

MySQL と Flink の間に Kafka を中間レイヤーとして導入することで、MySQL への負荷を軽減できます。詳細については、「MySQL データベースのすべてのテーブルから Kafka にデータを同期する」をご参照ください。

FAQ

ランタイムエラー

ジョブパフォーマンス

データ同期

参考資料