すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:CREATE TABLE AS (CTAS)

最終更新日:Jul 08, 2025

CREATE TABLE AS (CTAS) ステートメントを実行して、アップストリームからダウンストリームシステムへのデータとテーブルスキーマの変更をリアルタイムで同期できます。 これにより、デスティネーションシステムでテーブルを作成し、テーブルスキーマの変更を同期する効率が向上します。 このトピックでは、CTAS ステートメントの使用方法と、さまざまなシナリオでの例について説明します。

説明

データの取り込みには、YAML を使用したジョブの作成をお勧めします。 CTAS または CDAS ステートメントを含む既存の SQL ドラフトを YAML ドラフトに変換することができます。

  • はじめに: YAML を使用してジョブを開発し、ソースからデスティネーションにデータを同期できます。

  • 利点: データベース、テーブル、テーブルスキーマ、カスタム計算列の同期など、CTAS および CDAS ステートメントの主要な機能がサポートされています。 さらに、リアルタイム スキーマ進化、raw バイナリログデータの同期、WHERE 句、列プルーニングもサポートされています。

詳細については、「YAML デプロイメントを使用してデータを取り込む」をご参照ください。

特徴

データ同期

特徴

説明

テーブルの同期

ソーステーブルからシンクテーブルへの完全データと増分データをリアルタイムで同期します。 (例: テーブルの同期)

テーブルシャードの統合と同期

正規表現を使用して、データベースとテーブルのシャード名と一致します。 次に、これらのテーブルシャードを統合し、データをシンクテーブルに同期できます。 (例: テーブルシャードのマージと同期)

説明

キャレット (^) を使用して、テーブル名の先頭と一致させることはできません。

カスタム計算列の同期

特定の列を変換および処理するための計算列を追加します。 計算列にシステム関数またはユーザー定義関数 (UDF) を使用し、追加する計算列の位置を指定できます。 新しく追加された計算列は、シンクテーブルで物理列として使用され、結果はリアルタイムでシンクテーブルに同期されます。 (例: カスタム計算列の同期)

複数の CTAS ステートメントの実行

スキーマ進化

データ同期の際に、CTAS ステートメントは、テーブルの作成やスキーマの変更など、ソーステーブルからシンクテーブルへのスキーマ変更の複製をサポートします。

  • サポートされているスキーマ変更

    スキーマ変更

    説明

    NULL 許容列の追加

    関連する列をシンクテーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。 新しい列はデフォルトで NULL 許容列として設定され、変更前のこの列のデータは自動的に NULL に設定されます。

    NULL 非許容列の追加

    対応する列をシンクテーブルのスキーマの末尾に自動的に追加し、データを同期します。

    NULL 許容列の削除

    テーブルから列を削除する代わりに、シンクテーブルの NULL 許容列に NULL 値を自動的に設定します。

    列名の変更

    列名の変更操作には、列の追加と列の削除が含まれます。 ソーステーブルで列の名前が変更されると、新しい名前を使用する列がシンクテーブルの末尾に追加され、元の名前を使用する列には NULL 値が設定されます。

    説明

    たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列がシンクテーブルの末尾に追加され、col_a 列には自動的に NULL 値が設定されます。

    列のデータ型の変更

    • 列型の変更をサポートするダウンストリームシステムの場合: 現在、Paimon は型変更をサポートする唯一のダウンストリームシステムです。 CTAS ステートメントは、INT から BIGINT など、通常の列の型の変更をサポートしています。

      互換性は、システム固有のルールによって異なります (コネクタ ドキュメントを参照)。

    • 列型の変更をサポートしていないダウンストリームシステムの場合: 現在、Hologres のみが型の拡大を使用して列型の変更を処理することをサポートしています。 メカニズムは次のとおりです。ジョブの起動時に、より広いデータ型を持つ Hologres テーブルが作成され、シンクの互換性に基づいて列型の変更がサポートされます。 詳細については、例: 型正規化モードでのデータの同期 を参照してください。

      重要

      Hologres で型の拡大をサポートするには、最初のジョブ起動時に型正規化モードを有効にします。 既存のジョブの場合は、Hologres テーブルを削除し、状態なしで再起動して、型正規化設定を適用します。

    重要

    CTAS ステートメントを同期に使用する場合、システムはスキーマの違いのみを比較し、特定の DDL タイプは識別しません。 たとえば、

    • 列が削除され、その後再び追加された場合、この期間中にデータが変更されていないと、システムはスキーマの変更を検出しません。

    • 列が削除され、その後再び追加された場合、この期間中にデータが変更されると、システムはスキーマの変更を検出して同期します。

  • サポートされていないスキーマ変更

    • プライマリキーやインデックスなどの制約を変更します。

    • NULL 非許容列を削除します。

    • NOT NULL から NULLABLE に変更します。

    重要

    サポートされていないスキーマ変更を同期するには、シンクテーブルを手動で削除し、ジョブを再起動して履歴データを再同期します。

同期プロセス

次のフローチャートは、CTAS ステートメントを使用して MySQL から Hologres にデータを同期するプロセスを示しています。

フローチャート

説明

image

CTAS ステートメントを実行すると、Realtime Compute for Apache Flink は次の処理を行います。

  1. デスティネーションシステムにシンクテーブルが存在するかどうかを確認します。

    • 存在しない場合、Realtime Compute for Apache Flink は、デスティネーションストアのカタログを使用して、ソーステーブルのスキーマをミラーリングするシンクテーブルを作成します。

    • 存在する場合、Realtime Compute for Apache Flink はテーブルの作成をスキップし、シンクテーブルのスキーマとソーステーブルのスキーマの一貫性を検証します。 スキーマが異なる場合は、エラーが報告されます。

  2. データ同期ジョブをコミットして実行します。

    Realtime Compute for Apache Flink は、ソースからシンクにデータとスキーマの変更を同期します。

前提条件

ワークスペースにデスティネーションストアのカタログが作成されています。 詳細については、「カタログの管理」をご参照ください。

制限事項

構文の制限

  • CTAS ステートメントを含む SQL ドラフトのデバッグはサポートされていません。

  • CTAS ステートメントは、同じ SQL ドラフトで INSERT INTO ステートメントと一緒に使用することはできません。

  • StarRocks パーティションテーブルにデータを同期することはできません。

  • MiniBatch はサポートされていません。

    重要

サポートされているアップストリームシステムとダウンストリームシステム

次の表に、CTAS ステートメントを使用できるアップストリーム データストアとダウンストリーム データストアを示します。

コネクタ

ソーステーブル

シンクテーブル

注記

MySQL

サポートされています

サポートされていません

  • データベース名とテーブル名は、シャードテーブルとデータベースの統合と同期の際に自動的に同期されます。

  • 単一テーブルの同期の際に、データベース名とテーブル名を同期するには、Flink SQL を介して MySQL カタログを作成し、catalog.table.metadata-columns オプションを含めます。 詳細については、「MySQL カタログの管理」をご参照ください。

  • ビューは同期できません。

Kafka コネクタ

サポートされています

サポートされていません

該当なし

MongoDB

サポートされています

サポートされていません

  • シャードテーブルとデータベースの統合と同期はサポートされていません。

  • MongoDB データベースのメタデータは同期できません。

  • ソースデータベースの新しいテーブルはキャプチャまたは同期できません。

  • データとテーブルスキーマの変更は、MongoDB から他のシステムに同期できます。 詳細については、MongoDB ソーステーブルから Hologres テーブルへのデータの同期 を参照してください。

Upsert Kafka

サポートされていません

サポートされています

該当なし

StarRocks コネクタ

サポートされていません

サポートされています

Alibaba Cloud EMR 上の StarRocks に限定されたサポートです。

Hologres コネクタ

サポートされていません

サポートされています

Hologres がデータ同期のデスティネーションシステムとして機能する場合、システムは connectionSize オプションの値に基づいて、テーブルごとに接続を自動的に作成します。 connectionPoolName オプションを使用して、複数のテーブルに同じ接続プールを構成できます。

説明

ソーステーブルのデータ型が Hologres の 固定プラン機能でサポートされていない場合は、データ同期に INSERT INTO ステートメントを使用します。 固定プランを使用できないため書き込みパフォーマンスが低下する CTAS ステートメントは使用しないでください。

Paimon コネクタ

サポートされていません

サポートされています

構文

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2, 
  ...
 )
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

CTAS ステートメントは、CREATE TABLE ステートメントの基本構文を使用します。 次の表に、いくつかの引数を示します。

引数

説明

sink_table

データ同期のターゲットテーブル名。 オプションで、カタログとデータベースを含めて、テーブルの完全修飾名を使用します。

COMMENT

シンクテーブルの説明。 デフォルトでは、source_table の説明が使用されます。

PARTITIONED BY

パーティション列を指定します。

重要

StarRocks パーティションテーブルにデータを同期することはできません。

table_constraint

プライマリキー。テーブル内の各レコードの一意の識別子です。

WITH

シンクテーブルのコネクタオプション。 詳細については、Upsert Kafka コネクタHologres コネクタStarRocks コネクタ、または Paimon コネクタの「WITH 句のコネクタオプション」セクションをご参照ください。

説明

キーと値の両方が STRING 型である必要があります (例: 'jdbcWriteBatchSize' = '1024')。

source_table

ソーステーブル名。 オプションで、テーブルのカタログとデータベースを含む完全修飾名を使用します。

OPTIONS

ソーステーブルのコネクタオプション。 詳細については、MySQL コネクタ および Kafka コネクタの「WITH 句のコネクタオプション」をご参照ください。

説明

キーと値の両方が STRING 型である必要があります (例: 'server-id' = '65500')。

ADD COLUMN

データ同期の際にシンクテーブルに列を追加します。 計算列のみがサポートされています。

column_component

新しい列の説明。

computed_column_expression

計算列の式の説明。

FIRST

新しい列をシンクテーブルの最初のフィールドとして使用することを指定します。 デフォルトでは、新しい列はシンクテーブルの末尾に追加されます。

AFTER

新しい列を特定のフィールドの後に追加することを指定します。

説明
  • IF NOT EXISTS キーワードは必須ですIF NOT EXISTS注意:。 システムに、デスティネーションストアにシンクテーブルが存在するかどうかを確認するように促します。 存在しない場合、システムはシンクテーブルを作成します。 存在する場合は、テーブルの作成はスキップされます。

  • 作成されたシンクテーブルは、プライマリキーと物理フィールド名と型を含め、ソーステーブルのスキーマを共有しますが、計算列、メタデータフィールド、およびウォーターマーク構成は除外されます。

  • Realtime Compute for Apache Flink は、データ同期の際に、ソーステーブルからシンクテーブルへのデータ型のマッピングを実行します。 データ型のマッピングの詳細については、特定のコネクタドキュメントをご参照ください。

テーブルの同期

説明: MySQL から Hologres に web_sales テーブルを同期します。

前提条件:

  • holo という名前の Hologres カタログが作成されています。

  • mysql という名前の MySQL カタログが作成されています。

サンプルコード:

CTAS ステートメントは、完全データ同期と増分データ同期をサポートするために、多くの場合、ソースカタログとデスティネーションカタログと一緒に使用されます。 ソースカタログは、明示的な DDL なしで、ソーステーブルのスキーマとプロパティを自動的に解析します。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales   -- デフォルトデータベースの web_sales テーブルにデータを同期します。
WITH ('jdbcWriteBatchSize' = '1024')   -- オプションで、シンクテーブルのコネクタオプションを構成します。
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */; -- オプションで、MySQL CDC ソーステーブルに追加オプションを構成します。

テーブルとデータベースのシャードの統合と同期

説明: データを Hologres テーブルに同期する前に、シャード化された MySQL テーブルとデータベースを統合します。

方法: MySQL カタログと正規表現を使用して、同期するデータベースとテーブルと一致します。

データベース名とテーブル名は、2 つの追加フィールドの値としてシンクテーブルに書き込まれます。 シンクテーブルのプライマリキーは、データベース名、テーブル名、および元のプライマリキーで構成され、プライマリキーの一意性が保証されます。

コードと結果:

サンプルコード

結果

テーブルシャードの統合と同期:

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

效果

ソーステーブルのスキーマの変更: user02 テーブルに age という名前の新しい列と新しいレコードを追加します。 ソーステーブルのスキーマが異なる場合でも、user02 テーブルへのデータとスキーマの変更は、リアルタイムでシンクテーブルに同期されます。

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

カスタム計算列の同期

説明: 統合されたテーブルとデータベースのシャードを MySQL から Hologres に同期する際に、カスタム計算列を追加します。

コードと結果:

サンプルコード

結果

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

image

単一ジョブでの複数の CTAS ステートメントの実行

説明: web_sales テーブルと user テーブルのシャードを MySQL から Hologres に単一ジョブで同期します。

方法: STATEMENT SET を使用して、複数の CTAS ステートメントをグループとして実行します。 このアプローチでは、ソース頂点を再利用して複数のテーブルからデータを読み取るため、サーバー ID、データベース接続の数、および全体的な読み取り負荷が軽減されます。

重要

サンプルコード:

USE CATALOG holo;

BEGIN STATEMENT SET;

-- web_sales テーブルからデータを同期します。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- user テーブルのシャードからデータを同期します。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

複数の CTAS ステートメントを使用して、ソースから複数のシンクにデータを同期する

  • シンクテーブルに計算列は追加されません

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- MySQL テーブルの user テーブルから Hologres の database1 の user テーブルにデータを同期します。
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- MySQL データベースの user テーブルから Hologres の database2 の user テーブルにデータを同期します。
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • 計算列がシンクテーブルに追加されます

    -- ソーステーブル user に基づいて user_with_changed_id という名前の一時テーブルを作成します。 ソーステーブルの id 列に基づいて computed_id 列を定義します。
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- ソーステーブル user に基づいて user_with_changed_age という名前の一時テーブルを作成します。 ソーステーブルの age 列に基づいて computed_age 列を定義します。
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- MySQL データベースの user テーブルから Hologres の user_with_changed_id テーブルにデータを同期します。 user_with_changed_id テーブルには、ソーステーブルの id 列に基づく計算から取得された ID が含まれています。 取得された ID は computed_id 列にあります。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- MySQL データベースの user テーブルから Hologres の user_with_changed_age テーブルにデータを同期します。 user_with_changed_age テーブルには、ソーステーブルの age 列に基づく計算から取得された年齢値が含まれています。 取得された年齢値は computed_age 列にあります。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

複数の CTAS ステートメントを使用して新しいテーブルを同期する

シナリオの説明: 同期に複数の CTAS ステートメントを使用するジョブが開始された後、新しいテーブルを複製するために CTAS ステートメントを追加します。

方法: ジョブの新しいテーブル検出を有効にし、ジョブの SQL コードに CTAS ステートメントを追加し、セーブポイントから再起動します。 新しいテーブルがキャプチャされると、データが複製されます。

制限事項:

  • 新しいテーブル検出は、VVR 8.0.1 以降でサポートされています。

  • CDC ソーステーブルからデータを同期する場合、初期モードで開始されたジョブのみが新しいテーブルを検出できます。

  • 新しい CTAS ステートメントを使用して追加されたソーステーブルの構成は、元のソーステーブルの構成と同じである必要があります。 これにより、ソース頂点を再利用できます。

  • CTAS ステートメントを追加する前後のジョブ構成パラメーターは同じである必要があります。 たとえば、起動モードは同じである必要があります。

手順:

  1. [デプロイメント] ページで、ターゲットデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。

  2. ダイアログで、[その他の戦略] セクションを展開し、[セーブポイントで停止] を選択して、[OK] をクリックします。

  3. ジョブの SQL ドラフトで、新しいテーブル検出を有効にし、CTAS ステートメントを追加します。

    1. 次のステートメントを追加して、新しいテーブル検出を有効にします。

      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    2. CTAS ステートメントを追加します。 ジョブの完全なコードは次のようになります。

      -- 新しいテーブル検出を有効にする
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
      USE CATALOG holo;
      
      BEGIN STATEMENT SET;
      
      -- web_sales テーブルからデータを同期します。
      CREATE TABLE IF NOT EXISTS web_sales
      AS TABLE mysql.tpcds.web_sales
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- user テーブルのシャードからデータを同期します。
      CREATE TABLE IF NOT EXISTS user
      AS TABLE mysql.`wp.*`.`user[0-9]+`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- product テーブルからデータを同期します。
      CREATE TABLE IF NOT EXISTS product
      AS TABLE mysql.tpcds.product
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    3. [デプロイ] をクリックします。

  4. セーブポイントからジョブをリカバリします。

    1. [デプロイメント] ページで、デプロイメントの名前をクリックします。

    2. デプロイメントの詳細ページで、[状態] タブをクリックします。 次に、[履歴] サブタブをクリックします。

    3. [セーブポイント] リストで、ジョブのキャンセル時に作成されたセーブポイントを見つけます。

    4. [アクション] 列で [詳細] > [このセーブポイントからジョブを開始] を選択します。 詳細については、「デプロイメントの開始」をご参照ください。

Hologres のパーティションテーブルへの同期

シナリオの説明: MySQL から Hologres パーティションテーブルにデータを複製します。

使用上の注意: Hologres テーブルにプライマリキーが定義されている場合、パーティション列はプライマリキーに含まれている必要があります。

サンプルコード:

MySQL テーブルを作成します。

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

パーティション列がプライマリキーの一部であるかどうかによって、適切な方法を選択します。

  • ソースプライマリキーにパーティション列が含まれている場合:

    • CTAS ステートメントを直接使用します。

      Hologres は、パーティション列がプライマリキーに含まれているかどうかを自動的に検証します。

      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
      PARTITIONED BY (product_id)
      AS TABLE `mysql`.`tpcds`.`orders`;
  • ソースプライマリキーにパーティション列が含まれていない場合:

    • CTAS ステートメントでシンクテーブルのプライマリキーを宣言し、プライマリキー定義にパーティション列を含めます。

      この場合、プライマリキーを再定義しないか、パーティション列を含めないと、ジョブは失敗します。

      -- order_id、product_id、および city フィールドを Hologres パーティションテーブルのプライマリキーとして宣言します。
      CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
          CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
      )
      PARTITIONED BY (city)
      AS TABLE `mysql`.`tpcds`.`orders`;

レプリケーション中のデータ型の拡大

シナリオの説明: データ同期の際に、列の精度を VARCHAR(10) から VARCHAR(20) に変更したり、列のデータ型を SMALLINT から INT に変更したりします。

方法:

  • 新しいジョブ: 初回起動時に型正規化モードを有効にします。

  • 既存のジョブ: Hologres シンクテーブルを削除し、状態なしで再起動して型正規化を適用します。

型正規化ルール:

新しいデータ型と元のデータ型が同じデータ型に正規化されている場合、データ型は正常に変更され、ジョブは正常に実行されます。 それ以外の場合、例外が報告されます。 詳細は次のとおりです。

  • TINYINT、SMALLINT、INT、および BIGINT は BIGINT に変換されます。

  • CHAR、VARCHAR、および STRING は STRING に変換されます。

  • FLOAT および DOUBLE は DOUBLE に変換されます。

  • その他のデータ型は、Hologres フィールドと Flink フィールド間のデータ型のマッピングに基づいて変換されます。 詳細については、「データ型マッピング」をご参照ください。

サンプルコード:

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
WITH (
'connector' = 'hologres', 
'enableTypeNormalization' = 'true' -- 型正規化モードを有効にします。
) AS TABLE `mysql`.`tpcds`.`orders`;

MongoDB から Hologres へのデータの同期

制限事項:

  • サポートは、VVR 8.0.6 以降および MongoDB バージョン 6.0 以降に限定されています。

  • ソーステーブルのコネクタオプションでは、scan.incremental.snapshot.enabledscan.full-changelogtrue に設定する必要があります。

  • MongoDB データベースで preimage 機能と postimage 機能を有効にする必要があります。 詳細については、「ドキュメントのプレイメージ」をご参照ください。

  • 単一ジョブで複数の MongoDB コレクションからデータを同期するには、すべてのテーブルで次のコネクタオプションの構成が同一であることを確認してください。

    • hostsschemeusernamepasswordconnectionOptions などの MongoDB データベース関連のオプション

    • scan.startup.mode

サンプルコード:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;

FAQ

ランタイムエラー

ジョブパフォーマンス

データ同期

参考資料