CREATE TABLE AS (CTAS) ステートメントを実行して、アップストリームからダウンストリームシステムへのデータとテーブルスキーマの変更をリアルタイムで同期できます。 これにより、デスティネーションシステムでテーブルを作成し、テーブルスキーマの変更を同期する効率が向上します。 このトピックでは、CTAS ステートメントの使用方法と、さまざまなシナリオでの例について説明します。
データの取り込みには、YAML を使用したジョブの作成をお勧めします。 CTAS または CDAS ステートメントを含む既存の SQL ドラフトを YAML ドラフトに変換することができます。
はじめに: YAML を使用してジョブを開発し、ソースからデスティネーションにデータを同期できます。
利点: データベース、テーブル、テーブルスキーマ、カスタム計算列の同期など、CTAS および CDAS ステートメントの主要な機能がサポートされています。 さらに、リアルタイム スキーマ進化、raw バイナリログデータの同期、WHERE 句、列プルーニングもサポートされています。
詳細については、「YAML デプロイメントを使用してデータを取り込む」をご参照ください。
特徴
データ同期
特徴 | 説明 |
テーブルの同期 | ソーステーブルからシンクテーブルへの完全データと増分データをリアルタイムで同期します。 (例: テーブルの同期) |
テーブルシャードの統合と同期 | 正規表現を使用して、データベースとテーブルのシャード名と一致します。 次に、これらのテーブルシャードを統合し、データをシンクテーブルに同期できます。 (例: テーブルシャードのマージと同期) 説明 キャレット (^) を使用して、テーブル名の先頭と一致させることはできません。 |
カスタム計算列の同期 | 特定の列を変換および処理するための計算列を追加します。 計算列にシステム関数またはユーザー定義関数 (UDF) を使用し、追加する計算列の位置を指定できます。 新しく追加された計算列は、シンクテーブルで物理列として使用され、結果はリアルタイムでシンクテーブルに同期されます。 (例: カスタム計算列の同期) |
複数の CTAS ステートメントの実行 |
|
スキーマ進化
データ同期の際に、CTAS ステートメントは、テーブルの作成やスキーマの変更など、ソーステーブルからシンクテーブルへのスキーマ変更の複製をサポートします。
サポートされているスキーマ変更
スキーマ変更
説明
NULL 許容列の追加
関連する列をシンクテーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。 新しい列はデフォルトで NULL 許容列として設定され、変更前のこの列のデータは自動的に NULL に設定されます。
NULL 非許容列の追加
対応する列をシンクテーブルのスキーマの末尾に自動的に追加し、データを同期します。
NULL 許容列の削除
テーブルから列を削除する代わりに、シンクテーブルの NULL 許容列に NULL 値を自動的に設定します。
列名の変更
列名の変更操作には、列の追加と列の削除が含まれます。 ソーステーブルで列の名前が変更されると、新しい名前を使用する列がシンクテーブルの末尾に追加され、元の名前を使用する列には NULL 値が設定されます。
説明たとえば、ソーステーブルの
col_a列の名前がcol_bに変更された場合、col_b列がシンクテーブルの末尾に追加され、col_a列には自動的に NULL 値が設定されます。列のデータ型の変更
列型の変更をサポートするダウンストリームシステムの場合: 現在、Paimon は型変更をサポートする唯一のダウンストリームシステムです。 CTAS ステートメントは、INT から BIGINT など、通常の列の型の変更をサポートしています。
互換性は、システム固有のルールによって異なります (コネクタ ドキュメントを参照)。
列型の変更をサポートしていないダウンストリームシステムの場合: 現在、Hologres のみが型の拡大を使用して列型の変更を処理することをサポートしています。 メカニズムは次のとおりです。ジョブの起動時に、より広いデータ型を持つ Hologres テーブルが作成され、シンクの互換性に基づいて列型の変更がサポートされます。 詳細については、例: 型正規化モードでのデータの同期 を参照してください。
重要Hologres で型の拡大をサポートするには、最初のジョブ起動時に型正規化モードを有効にします。 既存のジョブの場合は、Hologres テーブルを削除し、状態なしで再起動して、型正規化設定を適用します。
重要CTAS ステートメントを同期に使用する場合、システムはスキーマの違いのみを比較し、特定の DDL タイプは識別しません。 たとえば、
列が削除され、その後再び追加された場合、この期間中にデータが変更されていないと、システムはスキーマの変更を検出しません。
列が削除され、その後再び追加された場合、この期間中にデータが変更されると、システムはスキーマの変更を検出して同期します。
サポートされていないスキーマ変更
プライマリキーやインデックスなどの制約を変更します。
NULL 非許容列を削除します。
NOT NULL から NULLABLE に変更します。
重要サポートされていないスキーマ変更を同期するには、シンクテーブルを手動で削除し、ジョブを再起動して履歴データを再同期します。
同期プロセス
次のフローチャートは、CTAS ステートメントを使用して MySQL から Hologres にデータを同期するプロセスを示しています。
フローチャート | 説明 |
CTAS ステートメントを実行すると、Realtime Compute for Apache Flink は次の処理を行います。
|
前提条件
ワークスペースにデスティネーションストアのカタログが作成されています。 詳細については、「カタログの管理」をご参照ください。
制限事項
構文の制限
CTAS ステートメントを含む SQL ドラフトのデバッグはサポートされていません。
CTAS ステートメントは、同じ SQL ドラフトで INSERT INTO ステートメントと一緒に使用することはできません。
StarRocks パーティションテーブルにデータを同期することはできません。
MiniBatch はサポートされていません。
重要CTAS または CDAS ステートメントを含む SQL ドラフトを作成する前に、MiniBatch 構成が削除されていることを確認してください。 次の手順を実行します。
に移動します。
[デプロイメントのデフォルト] タブを選択します。
[その他の構成] セクションで、MiniBatch 構成が削除されていることを確認します。
SQL ドラフトからデプロイメントを作成したり、デプロイメントを開始したりするときにエラーが報告された場合は、「「現在、CTAS/CDAS 構文で StreamExecMiniBatchAssigner タイプの ExecNode のマージはサポートされていません」エラーを修正する方法」をご参照ください。
サポートされているアップストリームシステムとダウンストリームシステム
次の表に、CTAS ステートメントを使用できるアップストリーム データストアとダウンストリーム データストアを示します。
コネクタ | ソーステーブル | シンクテーブル | 注記 |
サポートされています | サポートされていません |
| |
サポートされています | サポートされていません | 該当なし | |
サポートされています | サポートされていません |
| |
サポートされていません | サポートされています | 該当なし | |
サポートされていません | サポートされています | Alibaba Cloud EMR 上の StarRocks に限定されたサポートです。 | |
サポートされていません | サポートされています | Hologres がデータ同期のデスティネーションシステムとして機能する場合、システムは 説明 ソーステーブルのデータ型が Hologres の 固定プラン機能でサポートされていない場合は、データ同期に INSERT INTO ステートメントを使用します。 固定プランを使用できないため書き込みパフォーマンスが低下する CTAS ステートメントは使用しないでください。 | |
サポートされていません | サポートされています |
構文
CREATE TABLE IF NOT EXISTS <sink_table>
(
[ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
key1=val1,
key2=val2,
...
)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]CTAS ステートメントは、CREATE TABLE ステートメントの基本構文を使用します。 次の表に、いくつかの引数を示します。
引数 | 説明 |
| データ同期のターゲットテーブル名。 オプションで、カタログとデータベースを含めて、テーブルの完全修飾名を使用します。 |
| シンクテーブルの説明。 デフォルトでは、source_table の説明が使用されます。 |
| パーティション列を指定します。 重要 StarRocks パーティションテーブルにデータを同期することはできません。 |
| プライマリキー。テーブル内の各レコードの一意の識別子です。 |
| シンクテーブルのコネクタオプション。 詳細については、Upsert Kafka コネクタ、Hologres コネクタ、StarRocks コネクタ、または Paimon コネクタの「WITH 句のコネクタオプション」セクションをご参照ください。 説明 キーと値の両方が STRING 型である必要があります (例: |
| ソーステーブル名。 オプションで、テーブルのカタログとデータベースを含む完全修飾名を使用します。 |
| ソーステーブルのコネクタオプション。 詳細については、MySQL コネクタ および Kafka コネクタの「WITH 句のコネクタオプション」をご参照ください。 説明 キーと値の両方が STRING 型である必要があります (例: 'server-id' = '65500')。 |
| データ同期の際にシンクテーブルに列を追加します。 計算列のみがサポートされています。 |
| 新しい列の説明。 |
| 計算列の式の説明。 |
| 新しい列をシンクテーブルの最初のフィールドとして使用することを指定します。 デフォルトでは、新しい列はシンクテーブルの末尾に追加されます。 |
| 新しい列を特定のフィールドの後に追加することを指定します。 |
IF NOT EXISTSキーワードは必須ですIF NOT EXISTS注意:。 システムに、デスティネーションストアにシンクテーブルが存在するかどうかを確認するように促します。 存在しない場合、システムはシンクテーブルを作成します。 存在する場合は、テーブルの作成はスキップされます。作成されたシンクテーブルは、プライマリキーと物理フィールド名と型を含め、ソーステーブルのスキーマを共有しますが、計算列、メタデータフィールド、およびウォーターマーク構成は除外されます。
Realtime Compute for Apache Flink は、データ同期の際に、ソーステーブルからシンクテーブルへのデータ型のマッピングを実行します。 データ型のマッピングの詳細については、特定のコネクタドキュメントをご参照ください。
例
テーブルの同期
説明: MySQL から Hologres に web_sales テーブルを同期します。
前提条件:
holoという名前の Hologres カタログが作成されています。mysqlという名前の MySQL カタログが作成されています。
サンプルコード:
CTAS ステートメントは、完全データ同期と増分データ同期をサポートするために、多くの場合、ソースカタログとデスティネーションカタログと一緒に使用されます。 ソースカタログは、明示的な DDL なしで、ソーステーブルのスキーマとプロパティを自動的に解析します。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales -- デフォルトデータベースの web_sales テーブルにデータを同期します。
WITH ('jdbcWriteBatchSize' = '1024') -- オプションで、シンクテーブルのコネクタオプションを構成します。
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- オプションで、MySQL CDC ソーステーブルに追加オプションを構成します。テーブルとデータベースのシャードの統合と同期
説明: データを Hologres テーブルに同期する前に、シャード化された MySQL テーブルとデータベースを統合します。
方法: MySQL カタログと正規表現を使用して、同期するデータベースとテーブルと一致します。
データベース名とテーブル名は、2 つの追加フィールドの値としてシンクテーブルに書き込まれます。 シンクテーブルのプライマリキーは、データベース名、テーブル名、および元のプライマリキーで構成され、プライマリキーの一意性が保証されます。
コードと結果:
サンプルコード | 結果 |
テーブルシャードの統合と同期: |
|
ソーステーブルのスキーマの変更: |
|
カスタム計算列の同期
説明: 統合されたテーブルとデータベースのシャードを MySQL から Hologres に同期する際に、カスタム計算列を追加します。
コードと結果:
サンプルコード | 結果 |
|
|
単一ジョブでの複数の CTAS ステートメントの実行
説明: web_sales テーブルと user テーブルのシャードを MySQL から Hologres に単一ジョブで同期します。
方法: STATEMENT SET を使用して、複数の CTAS ステートメントをグループとして実行します。 このアプローチでは、ソース頂点を再利用して複数のテーブルからデータを読み取るため、サーバー ID、データベース接続の数、および全体的な読み取り負荷が軽減されます。
ソースを再利用してパフォーマンスを最適化するには、各ソーステーブルのコネクタオプションが同一であることを確認してください。
サーバー ID の構成については、「クライアントごとに異なるサーバー ID を設定する」をご参照ください。
サンプルコード:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- web_sales テーブルからデータを同期します。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- user テーブルのシャードからデータを同期します。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;複数の CTAS ステートメントを使用して、ソースから複数のシンクにデータを同期する
シンクテーブルに計算列は追加されません
USE CATALOG `holo`; BEGIN STATEMENT SET; -- MySQL テーブルの user テーブルから Hologres の database1 の user テーブルにデータを同期します。 CREATE TABLE IF NOT EXISTS `database1`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; -- MySQL データベースの user テーブルから Hologres の database2 の user テーブルにデータを同期します。 CREATE TABLE IF NOT EXISTS `database2`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; END;計算列がシンクテーブルに追加されます
-- ソーステーブル user に基づいて user_with_changed_id という名前の一時テーブルを作成します。 ソーステーブルの id 列に基づいて computed_id 列を定義します。 CREATE TEMPORARY TABLE `user_with_changed_id` ( `computed_id` AS `id` + 1000 ) LIKE `mysql`.`tpcds`.`user`; -- ソーステーブル user に基づいて user_with_changed_age という名前の一時テーブルを作成します。 ソーステーブルの age 列に基づいて computed_age 列を定義します。 CREATE TEMPORARY TABLE `user_with_changed_age` ( `computed_age` AS `age` + 1 ) LIKE `mysql`.`tpcds`.`user`; BEGIN STATEMENT SET; -- MySQL データベースの user テーブルから Hologres の user_with_changed_id テーブルにデータを同期します。 user_with_changed_id テーブルには、ソーステーブルの id 列に基づく計算から取得された ID が含まれています。 取得された ID は computed_id 列にあります。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id` AS TABLE `user_with_changed_id` /*+ OPTIONS('server-id'='8001-8004') */; -- MySQL データベースの user テーブルから Hologres の user_with_changed_age テーブルにデータを同期します。 user_with_changed_age テーブルには、ソーステーブルの age 列に基づく計算から取得された年齢値が含まれています。 取得された年齢値は computed_age 列にあります。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age` AS TABLE `user_with_changed_age` /*+ OPTIONS('server-id'='8001-8004') */; END;
複数の CTAS ステートメントを使用して新しいテーブルを同期する
シナリオの説明: 同期に複数の CTAS ステートメントを使用するジョブが開始された後、新しいテーブルを複製するために CTAS ステートメントを追加します。
方法: ジョブの新しいテーブル検出を有効にし、ジョブの SQL コードに CTAS ステートメントを追加し、セーブポイントから再起動します。 新しいテーブルがキャプチャされると、データが複製されます。
制限事項:
新しいテーブル検出は、VVR 8.0.1 以降でサポートされています。
CDC ソーステーブルからデータを同期する場合、初期モードで開始されたジョブのみが新しいテーブルを検出できます。
新しい CTAS ステートメントを使用して追加されたソーステーブルの構成は、元のソーステーブルの構成と同じである必要があります。 これにより、ソース頂点を再利用できます。
CTAS ステートメントを追加する前後のジョブ構成パラメーターは同じである必要があります。 たとえば、起動モードは同じである必要があります。
手順:
[デプロイメント] ページで、ターゲットデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。
ダイアログで、[その他の戦略] セクションを展開し、[セーブポイントで停止] を選択して、[OK] をクリックします。
ジョブの SQL ドラフトで、新しいテーブル検出を有効にし、CTAS ステートメントを追加します。
次のステートメントを追加して、新しいテーブル検出を有効にします。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';CTAS ステートメントを追加します。 ジョブの完全なコードは次のようになります。
-- 新しいテーブル検出を有効にする SET 'table.cdas.scan.newly-added-table.enabled' = 'true'; USE CATALOG holo; BEGIN STATEMENT SET; -- web_sales テーブルからデータを同期します。 CREATE TABLE IF NOT EXISTS web_sales AS TABLE mysql.tpcds.web_sales /*+ OPTIONS('server-id'='8001-8004') */; -- user テーブルのシャードからデータを同期します。 CREATE TABLE IF NOT EXISTS user AS TABLE mysql.`wp.*`.`user[0-9]+` /*+ OPTIONS('server-id'='8001-8004') */; -- product テーブルからデータを同期します。 CREATE TABLE IF NOT EXISTS product AS TABLE mysql.tpcds.product /*+ OPTIONS('server-id'='8001-8004') */; END;[デプロイ] をクリックします。
セーブポイントからジョブをリカバリします。
[デプロイメント] ページで、デプロイメントの名前をクリックします。
デプロイメントの詳細ページで、[状態] タブをクリックします。 次に、[履歴] サブタブをクリックします。
[セーブポイント] リストで、ジョブのキャンセル時に作成されたセーブポイントを見つけます。
[アクション] 列で を選択します。 詳細については、「デプロイメントの開始」をご参照ください。
Hologres のパーティションテーブルへの同期
シナリオの説明: MySQL から Hologres パーティションテーブルにデータを複製します。
使用上の注意: Hologres テーブルにプライマリキーが定義されている場合、パーティション列はプライマリキーに含まれている必要があります。
サンプルコード:
MySQL テーブルを作成します。
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);パーティション列がプライマリキーの一部であるかどうかによって、適切な方法を選択します。
ソースプライマリキーにパーティション列が含まれている場合:
CTAS ステートメントを直接使用します。
Hologres は、パーティション列がプライマリキーに含まれているかどうかを自動的に検証します。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;
ソースプライマリキーにパーティション列が含まれていない場合:
CTAS ステートメントでシンクテーブルのプライマリキーを宣言し、プライマリキー定義にパーティション列を含めます。
この場合、プライマリキーを再定義しないか、パーティション列を含めないと、ジョブは失敗します。
-- order_id、product_id、および city フィールドを Hologres パーティションテーブルのプライマリキーとして宣言します。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
レプリケーション中のデータ型の拡大
シナリオの説明: データ同期の際に、列の精度を VARCHAR(10) から VARCHAR(20) に変更したり、列のデータ型を SMALLINT から INT に変更したりします。
方法:
新しいジョブ: 初回起動時に型正規化モードを有効にします。
既存のジョブ: Hologres シンクテーブルを削除し、状態なしで再起動して型正規化を適用します。
型正規化ルール:
新しいデータ型と元のデータ型が同じデータ型に正規化されている場合、データ型は正常に変更され、ジョブは正常に実行されます。 それ以外の場合、例外が報告されます。 詳細は次のとおりです。
TINYINT、SMALLINT、INT、および BIGINT は BIGINT に変換されます。
CHAR、VARCHAR、および STRING は STRING に変換されます。
FLOAT および DOUBLE は DOUBLE に変換されます。
その他のデータ型は、Hologres フィールドと Flink フィールド間のデータ型のマッピングに基づいて変換されます。 詳細については、「データ型マッピング」をご参照ください。
サンプルコード:
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
'connector' = 'hologres',
'enableTypeNormalization' = 'true' -- 型正規化モードを有効にします。
) AS TABLE `mysql`.`tpcds`.`orders`;MongoDB から Hologres へのデータの同期
制限事項:
サポートは、VVR 8.0.6 以降および MongoDB バージョン 6.0 以降に限定されています。
ソーステーブルのコネクタオプションでは、scan.incremental.snapshot.enabled と scan.full-changelog を
trueに設定する必要があります。MongoDB データベースで preimage 機能と postimage 機能を有効にする必要があります。 詳細については、「ドキュメントのプレイメージ」をご参照ください。
単一ジョブで複数の MongoDB コレクションからデータを同期するには、すべてのテーブルで次のコネクタオプションの構成が同一であることを確認してください。
hosts、scheme、username、password、connectionOptionsなどの MongoDB データベース関連のオプションscan.startup.mode
サンプルコード:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;FAQ
ランタイムエラー
ジョブパフォーマンス
データ同期
参考資料
CTAS ステートメントは、多くの場合、カタログと一緒に使用されます。カタログは、テーブルの永続的なメタデータ管理を提供し、ジョブ間のデータアクセスを可能にします。一般的なカタログ:
CTAS および CDAS ステートメントを使用するためのベストプラクティス:
データベース内のすべてのテーブルからのデータの同期、シャードデータベース内のテーブルのデータのマージと同期、またはソースデータベース内の新しいテーブルからのデータの同期の詳細については、「CREATE DATABASE AS (CDAS)」をご参照ください。
データベースの同期中に MySQL データベースのデータ読み取り負荷を軽減するには、データベース内のデータを Kafka に同期できます。 MySQL データベース内のすべてのテーブルから Kafka にデータを同期する を参照してください。
CTAS および CDAS ステートメントを使用してデータ同期を実行する方法については、「データウェアハウスにデータをリアルタイムで取り込む」、「Realtime Compute for Apache Flink と Hologres を使用してリアルタイム データウェアハウスを構築する」、または「Realtime Compute for Apache Flink、Apache Paimon、および StarRocks を使用してストリーミングデータ レイクハウスを構築する」をご参照ください。
YAML によるデータの取り込み:


