CREATE TABLE AS (CTAS) 文を実行すると、上流システムから下流システムへデータとテーブルスキーマの変更をリアルタイムで同期できます。これにより、宛先システムでのテーブル作成とテーブルスキーマ変更の同期の効率が向上します。このトピックでは、CTAS 文の使用方法と、さまざまなシナリオでの例について説明します。
データインジェストには、YAML を使用してジョブを作成することを推奨します。CTAS 文または CDAS 文を含む既存の SQL ドラフトを YAML ドラフトに変換できます。
概要:YAML を使用してジョブを開発し、ソースから宛先へデータを同期できます。
利点:データベース、テーブル、テーブルスキーマ、カスタム計算列の同期など、CTAS 文と CDAS 文の主要な機能がサポートされています。さらに、リアルタイムのスキーマ進化、生のバイナリログデータの同期、WHERE 句、列のプルーニングもサポートされています。
詳細については、「Flink CDC を使用したデータインジェスト」をご参照ください。
特徴
データ同期
特徴 | 説明 |
テーブルの同期 | ソーステーブルから結果テーブルへ、完全データと増分データをリアルタイムで同期します。(例:テーブルの同期) |
テーブルシャードの統合と同期 | 正規表現を使用してデータベースとテーブルシャードの名前を照合します。その後、これらのテーブルシャードを統合し、データを結果テーブルに同期できます。(例:テーブルシャードのマージと同期) 説明 キャレット (^) を使用してテーブル名の先頭を照合することはできません。 |
カスタム計算列の同期 | 特定の列を変換および処理するための計算列を追加します。計算列にはシステム関数またはユーザー定義関数 (UDF) を使用でき、追加する計算列の位置を指定できます。新しく追加された計算列は、結果テーブルで物理列として使用され、その結果はリアルタイムで結果テーブルに同期されます。(例:カスタム計算列の同期) |
複数の CTAS 文の実行 |
|
スキーマ進化
データ同期中、CTAS 文は、テーブルの作成やスキーマの変更など、ソーステーブルから結果テーブルへのスキーマ変更のレプリケーションをサポートします。
サポートされているスキーマ変更
スキーマ変更
説明
NULL 値を許容する列の追加
関連する列を結果テーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。新しい列はデフォルトで NULL 値を許容する列として設定され、変更前のこの列のデータは自動的に NULL に設定されます。
NULL 値を許容しない列の追加
対応する列を結果テーブルのスキーマの末尾に自動的に追加し、データを同期します。
NULL 値を許容する列の削除
テーブルから列を削除する代わりに、結果テーブルの NULL 値を許容する列に NULL 値を自動的に入力します。
列名の変更
列名の変更操作には、列の追加と列の削除が含まれます。ソーステーブルで列の名前が変更されると、新しい名前を使用する列が結果テーブルの末尾に追加され、元の名前を使用する列には NULL 値が入力されます。
説明たとえば、ソーステーブルの
col_a列の名前がcol_bに変更された場合、col_b列が結果テーブルの末尾に追加され、col_a列には自動的に NULL 値が入力されます。列のデータ型の変更
列の型変更をサポートする下流システムの場合:現在、型の変更をサポートしている下流システムは Paimon のみです。CTAS 文は、INT から BIGINT への変更など、通常の列の型変更をサポートしています。
互換性はシステム固有のルールに依存します (コネクタのドキュメントをご参照ください)。
列の型変更をサポートしない下流システムの場合:現在、Hologres のみが型拡張を使用して列の型変更を処理できます。その仕組みは次のとおりです。ジョブの起動時に、より広いデータ型を持つ Hologres テーブルが作成され、結果テーブルの互換性に基づいて列の型の変更がサポートされます。詳細については、「例:型正規化モードでのデータ同期」をご参照ください。
重要Hologres で型拡張を有効にするには、最初のジョブ起動時に型正規化を有効にする必要があります。既存のジョブの場合は、Hologres テーブルを削除し、ステートなしで再起動して型正規化設定を適用してください。
重要CTAS 文を同期に使用する場合、システムはスキーマの差分のみを比較し、特定の DDL タイプを識別しません。例:
ある列が削除され、その後すぐにデータ変更なしで再度追加された場合、システムはスキーマの変更を検出しません。
ある列が削除され、その間にデータ変更があった後に再度追加された場合、システムはスキーマの変更を検出して同期します。
サポートされていないスキーマ変更
プライマリキーやインデックスなどの制約の変更。
NULL 値を許容しない列の削除。
NOT NULL から NULLABLE への変更。
重要サポートされていないスキーマ変更を同期するには、手動で結果テーブルを削除し、ジョブを再起動して既存データを再同期してください。
同期プロセス
次のフローチャートは、CTAS 文を使用して MySQL から Hologres にデータを同期するプロセスを示しています。
フローチャート | 説明 |
CTAS 文を実行すると、Realtime Compute for Apache Flink は次の処理を行います。
|
前提条件
ご利用のワークスペースに宛先ストアのカタログが作成されていること。詳細については、「カタログ」をご参照ください。
制限事項
構文の制限事項
CTAS 文を含む SQL ドラフトのデバッグはサポートされていません。
CTAS 文は、同じ SQL ドラフト内で INSERT INTO 文と併用できません。
StarRocks のパーティションテーブルにデータを同期することはできません。
MiniBatch はサポートされていません。
重要CTAS 文または CDAS 文を含む SQL ドラフトを作成する前に、MiniBatch の構成が削除されていることを確認してください。次の手順を実行します。
に移動します。
[Deployment Defaults] タブを選択します。
[Other Configuration] セクションで、MiniBatch の構成が削除されていることを確認します。
SQL ドラフトからデプロイメントを作成するとき、またはデプロイメントを開始するときにエラーが報告された場合は、「「Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax」というエラーを修正するにはどうすればよいですか?」をご参照ください。
サポートされている上流および下流システム
次の表に、CTAS 文を使用できる上流および下流のデータストアを示します。
コネクタ | ソーステーブル | シンクテーブル | 注記 |
サポートされています | サポートされていません |
| |
サポートされています | サポートされていません | N/A | |
サポートされています | サポートされていません |
| |
サポートされていません | サポートされています | N/A | |
サポートされていません | サポートされています | Alibaba Cloud EMR 上の StarRocks のみがサポートされています。 | |
サポートされていません | サポートされています | Hologres がデータ同期の宛先システムとして機能する場合、システムは 説明 ソーステーブルのデータ型が Hologres の固定プラン機能でサポートされていない場合は、データ同期に INSERT INTO 文を使用してください。固定プランが使用できないため、CTAS 文を使用すると書き込みパフォーマンスが低下します。 | |
サポートされていません | サポートされています |
構文
CREATE TABLE IF NOT EXISTS <sink_table>
(
[ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
key1=val1,
key2=val2,
...
)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]CTAS 文は CREATE TABLE 文の基本構文を使用します。次の表に、一部の引数について説明します。
引数 | 説明 |
| データ同期のターゲットテーブル名。オプションで、カタログとデータベースを含めてテーブルの完全修飾名を使用します。 |
| 結果テーブルの説明。デフォルトでは、source_table の説明が使用されます。 |
| パーティション列を指定します。 重要 StarRocks のパーティションテーブルにデータを同期することはできません。 |
| プライマリキー。テーブル内の各レコードの一意の識別子です。 |
| 結果テーブルのコネクタオプション。詳細については、「Upsert Kafka コネクタ」、「Hologres コネクタ」、「StarRocks」、または「Paimon コネクタ」の「WITH 句のコネクタオプション」セクションをご参照ください。 説明 キーと値の両方が STRING 型である必要があります (例: |
| ソーステーブル名。オプションで、テーブルのカタログとデータベースを含む完全修飾名を使用します。 |
| ソーステーブルのコネクタオプション。詳細については、「MySQL コネクタ」および「Kafka コネクタ」の「WITH 句のコネクタオプション」をご参照ください。 説明 キーと値の両方が STRING 型である必要があります (例:'server-id' = '65500')。 |
| 結果テーブルに計算列を追加するか、ソース列の名前を変更します。 重要 純粋な列マッピング (例: |
| 新しい列の説明。 |
| 計算列の式の説明。 |
| 新しい列を結果テーブルの最初のフィールドとして使用することを指定します。デフォルトでは、新しい列は結果テーブルの末尾に追加されます。 |
| 新しい列を特定のフィールドの後に追加することを指定します。 |
IF NOT EXISTSキーワードは必須です。これにより、システムは宛先ストアに結果テーブルが存在するかどうかを確認します。存在しない場合、システムは結果テーブルを作成します。存在する場合、テーブルの作成はスキップされます。作成された結果テーブルは、プライマリキー、物理フィールド名、型を含むソーステーブルのスキーマを共有しますが、計算列、メタデータフィールド、ウォーターマーク構成は除外されます。
Realtime Compute for Apache Flink は、データ同期中にソーステーブルから結果テーブルへのデータ型のマッピングを実行します。データ型のマッピングの詳細については、特定のコネクタのドキュメントをご参照ください。
例
テーブルの同期
説明:MySQL から Hologres へ web_sales テーブルを同期します。
前提条件:
holoという名前の Hologres カタログが作成されていること。mysqlという名前の MySQL カタログが作成されていること。
サンプルコード:
CTAS 文は、多くの場合、ソースカタログと宛先カタログと共に使用され、完全データ同期と増分データ同期をサポートします。ソースカタログは、明示的な DDL なしでソーステーブルのスキーマとプロパティを自動的に解析します。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales -- データをデフォルトデータベースの web_sales テーブルに同期します。
WITH ('jdbcWriteBatchSize' = '1024') -- オプションで、結果テーブルのコネクタオプションを構成します。
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- オプションで、MySQL CDC ソーステーブルの追加オプションを構成します。テーブルおよびデータベースシャードの統合と同期
説明:シャード化された MySQL のテーブルとデータベースを統合してから、データを Hologres テーブルに同期します。
方法:MySQL カタログと正規表現を使用して、同期したいデータベースとテーブルを照合します。
データベース名とテーブル名は、2 つの追加フィールドの値として結果テーブルに書き込まれます。結果テーブルのプライマリキーは、データベース名、テーブル名、および元のプライマリキーで構成され、プライマリキーが一意であることを保証します。
コードと結果:
サンプルコード | 結果 |
テーブルシャードの統合と同期: |
|
ソーステーブルのスキーマ変更: |
|
カスタム計算列の同期
説明:MySQL から Hologres への統合されたテーブルおよびデータベースシャードの同期中に、カスタム計算列を追加します。
コードと結果:
サンプルコード | 結果 |
|
|
単一ジョブでの複数の CTAS 文の実行
説明:単一のジョブで、MySQL から Hologres へ web_sales テーブルと user テーブルシャードを同期します。
方法:STATEMENT SET を使用して、複数の CTAS 文をグループとして実行します。このアプローチでは、ソース頂点を再利用して複数のテーブルからデータを読み取るため、サーバー ID の数、データベース接続、および全体的な読み取り負荷が削減されます。
ソースを再利用してパフォーマンスを最適化するには、各ソーステーブルのコネクタオプションが同一であることを確認してください。
サーバー ID の構成については、「各クライアントに異なるサーバー ID を設定する」をご参照ください。
サンプルコード:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- web_sales テーブルからデータを同期します。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- user テーブルシャードからデータを同期します。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;複数の CTAS 文を使用したソースから複数のシンクへのデータ同期
計算列は結果テーブルに追加されません。
USE CATALOG `holo`; BEGIN STATEMENT SET; -- user MySQL テーブルから Hologres の database1 の user テーブルにデータを同期します。 CREATE TABLE IF NOT EXISTS `database1`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; -- MySQL データベースの user テーブルから Hologres の database2 の user テーブルにデータを同期します。 CREATE TABLE IF NOT EXISTS `database2`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; END;計算列は結果テーブルに追加されます
-- ソーステーブル user に基づいて user_with_changed_id という名前の一時テーブルを作成します。ソーステーブルの id 列に基づいて computed_id 列を定義します。 CREATE TEMPORARY TABLE `user_with_changed_id` ( `computed_id` AS `id` + 1000 ) LIKE `mysql`.`tpcds`.`user`; -- ソーステーブル user に基づいて user_with_changed_age という名前の一時テーブルを作成します。ソーステーブルの age 列に基づいて computed_age 列を定義します。 CREATE TEMPORARY TABLE `user_with_changed_age` ( `computed_age` AS `age` + 1 ) LIKE `mysql`.`tpcds`.`user`; BEGIN STATEMENT SET; -- MySQL データベースの user テーブルから Hologres の user_with_changed_id テーブルにデータを同期します。user_with_changed_id テーブルには、ソーステーブルの id 列に基づく計算から得られた ID が含まれます。得られた ID は computed_id 列にあります。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id` AS TABLE `user_with_changed_id` /*+ OPTIONS('server-id'='8001-8004') */; -- MySQL データベースの user テーブルから Hologres の user_with_changed_age テーブルにデータを同期します。user_with_changed_age テーブルには、ソーステーブルの age 列に基づく計算から得られた年齢値が含まれます。得られた年齢値は computed_age 列にあります。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age` AS TABLE `user_with_changed_age` /*+ OPTIONS('server-id'='8001-8004') */; END;
複数の CTAS 文を使用した新しいテーブルの同期
シナリオの説明:同期に複数の CTAS 文を使用するジョブが開始された後、新しいテーブルをレプリケーションするために CTAS 文を追加します。
方法:ジョブの新規テーブル検出を有効にし、ジョブの SQL コードに CTAS 文を追加し、セーブポイントから再起動します。新しいテーブルがキャプチャされると、データがレプリケーションされます。
制限事項:
新規テーブル検出は VVR 8.0.1 以降でサポートされています。
CDC ソーステーブルからデータを同期する場合、初期モードで開始されたジョブのみが新しいテーブルを検出できます。
新しい CTAS 文を使用して追加されるソーステーブルの構成は、元のソーステーブルの構成と同じでなければなりません。これにより、ソース頂点を再利用できます。
CTAS 文を追加する前後のジョブ構成パラメーターは同じでなければなりません。たとえば、起動モードは同じである必要があります。
手順:
[Deployments] ページで、ターゲットのデプロイメントを見つけ、[Actions] 列の [Cancel] をクリックします。
ダイアログボックスで、[More Strategies] セクションを展開し、[Stop With Savepoint] を選択して、[OK] をクリックします。
ジョブの SQL ドラフトで、新規テーブル検出を有効にし、CTAS 文を追加します。
次の文を追加して、新規テーブル検出を有効にします。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';CTAS 文を追加します。ジョブの完全なコードは次のようになります。
-- 新規テーブル検出を有効にします SET 'table.cdas.scan.newly-added-table.enabled' = 'true'; USE CATALOG holo; BEGIN STATEMENT SET; -- web_sales テーブルからデータを同期します。 CREATE TABLE IF NOT EXISTS web_sales AS TABLE mysql.tpcds.web_sales /*+ OPTIONS('server-id'='8001-8004') */; -- user テーブルシャードからデータを同期します。 CREATE TABLE IF NOT EXISTS user AS TABLE mysql.`wp.*`.`user[0-9]+` /*+ OPTIONS('server-id'='8001-8004') */; -- product テーブルからデータを同期します。 CREATE TABLE IF NOT EXISTS product AS TABLE mysql.tpcds.product /*+ OPTIONS('server-id'='8001-8004') */; END;[Deploy] をクリックします。
セーブポイントからジョブを復旧します。
[Deployments] ページで、デプロイメントの名前をクリックします。
デプロイメントの詳細ページで、[State] タブをクリックします。次に、[History] サブタブをクリックします。
[Savepoints] リストで、ジョブがキャンセルされたときに作成されたセーブポイントを見つけます。
[Actions] 列で を選択します。詳細については、「ジョブデプロイメントの開始」をご参照ください。
Hologres のパーティションテーブルへの同期
シナリオの説明:MySQL から Hologres のパーティションテーブルにデータをレプリケーションします。
使用上の注意:Hologres テーブルにプライマリキーが定義されている場合、パーティション列をプライマリキーに含める必要があります。
サンプルコード:
MySQL テーブルの作成:
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);パーティション列がプライマリキーの一部であるかどうかに応じて、適切な方法を選択します。
ソースのプライマリキーにパーティション列が含まれている場合:
CTAS 文を直接使用します。
Hologres は、パーティション列がプライマリキーに含まれているかどうかを自動的に検証します。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;
ソースのプライマリキーにパーティション列が含まれていない場合:
CTAS 文で結果テーブルのプライマリキーを宣言し、プライマリキーの定義にパーティション列を含めます。
この場合、プライマリキーを再定義しないか、パーティション列を含めないと、ジョブは失敗します。
-- Hologres パーティションテーブルのプライマリキーとして order_id、product_id、city フィールドを宣言します。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
レプリケーション中のデータ型の拡張
シナリオの説明:データ同期中に、列の精度を変更する (例:VARCHAR(10) から VARCHAR(20))、または列のデータ型を変更する (例:SMALLINT から INT)。
方法:
新規ジョブ:最初の起動時に型正規化モードを有効にします。
既存のジョブ:Hologres の結果テーブルを削除し、ステートなしで再起動して型正規化を適用します。
型正規化ルール:
新しいデータ型と元のデータ型が同じデータ型に正規化される場合、データ型は正常に変更され、ジョブは正常に実行されます。それ以外の場合は、例外が報告されます。詳細は次のとおりです。
TINYINT、SMALLINT、INT、および BIGINT は BIGINT に変換されます。
CHAR、VARCHAR、および STRING は STRING に変換されます。
FLOAT と DOUBLE は DOUBLE に変換されます。
その他のデータ型は、Hologres と Flink フィールド間のデータ型のマッピングに基づいて変換されます。詳細については、「データ型のマッピング」をご参照ください。
サンプルコード:
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
'connector' = 'hologres',
'enableTypeNormalization' = 'true' -- 型正規化モードを有効にします。
) AS TABLE `mysql`.`tpcds`.`orders`;MongoDB から Hologres へのデータ同期
制限事項:
VVR 8.0.6 以降および MongoDB バージョン 6.0 以降でのみサポートされています。
ソーステーブルのコネクタオプションでは、scan.incremental.snapshot.enabled と scan.full-changelog を
trueに設定する必要があります。MongoDB データベースでプリイメージとポストイメージ機能を有効にする必要があります。詳細については、「Document Preimages」をご参照ください。
単一のジョブで複数の MongoDB コレクションからデータを同期するには、すべてのテーブルで次のコネクタオプションの構成が同一であることを確認してください。
MongoDB データベース関連のオプション (
hosts、scheme、username、password、connectionOptionsなど)scan.startup.mode
サンプルコード:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;よくある質問
ランタイムエラー
ジョブのパフォーマンス
データ同期
関連ドキュメント
CTAS 文は、テーブルの永続的なメタデータ管理を提供し、ジョブ間のデータアクセスを可能にするカタログと共によく使用されます。一般的なカタログ:
CTAS 文と CDAS 文を使用するためのベストプラクティス:
データベース内のすべてのテーブルからのデータ同期、シャード化されたデータベース内のテーブルデータのマージと同期、またはソースデータベース内の新しいテーブルからのデータ同期については、「CREATE DATABASE AS (CDAS)」をご参照ください。
データベース同期中の MySQL データベースへのデータ読み取り負荷を軽減するために、データベース内のデータを Kafka に同期できます。「Flink CDC を使用した MySQL から Kafka へのデータベース同期」をご参照ください。
CTAS 文と CDAS 文を使用してデータ同期を実行する方法については、「データウェアハウスへのリアルタイムデータインジェスト」、「Flink と Hologres を使用したリアルタイムデータウェアハウスの構築」、または「Flink、Paimon、StarRocks を使用したストリーミングデータレイクハウスの構築」をご参照ください。
YAML によるデータインジェスト:


