すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Hologres コネクタによるデータインジェスト (YAML)

最終更新日:Feb 07, 2026

このトピックでは、Hologres コネクタを用いて YAML 形式のデータインジェストジョブでデータを同期する方法について説明します。

背景情報

Hologres は、エンドツーエンドのリアルタイムデータウェアハウスエンジンです。大規模なリアルタイムデータインジェスト、更新、および分析をサポートします。標準 SQL を使用し、PostgreSQL プロトコルと互換があります。ペタバイト規模のデータに対する OLAP およびアドホック分析を実行可能です。高同時実行性・低レイテンシのオンラインデータサービスを提供します。MaxCompute、Flink、DataWorks と緊密に統合されており、オンラインおよびオフラインのフルスタックデータウェアハウスソリューションを提供します。以下の表に、Hologres YAML コネクタの機能を示します。

カテゴリ

説明

対応タイプ

データインジェストシンク

実行モード

ストリーミングおよびバッチモード

データフォーマット

非対応

監視メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

説明

メトリックの詳細については、「監視メトリック」をご参照ください。

API タイプ

YAML

結果テーブルへのデータの更新または削除の対応

はい

機能

機能

説明

データベース内のすべてのテーブルを同期

データベース内のすべてのテーブル(または複数のテーブル)から、完全および増分データをそれぞれ対応する結果テーブルに同期します。

テーブルスキーマの変更を同期

データベース内のすべてのテーブルを同期する際に、各ソーステーブルのスキーマ変更(列の追加・削除・名前の変更など)をリアルタイムで対応する結果テーブルに同期します。

シャード化されたテーブルのマージ

正規表現を用いて、複数のシャード化されたデータベースにまたがるソーステーブルをマッチさせます。データをマージした後、対応する名前の下流の結果テーブルに同期します。

パーティションテーブルへの書き込み

上流のテーブルから Hologres のパーティションテーブルにデータを書き込みます。

データ型のマッピング

複数のマッピング戦略を用いて、上流のデータ型をより広範な Hologres のデータ型にマップします。

構文

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

パラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

type

シンクのタイプです。

String

はい

なし

hologres を指定します。

name

シンクの名前です。

String

いいえ

なし

特になし。

dbname

データベース名です。

String

はい

なし

特になし。

username

ユーザー名です。Alibaba Cloud アカウントの AccessKey ID を入力します。

String

はい

なし

詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。

重要

AccessKey ペアの保護のため、AccessKey ID は変数を使用して設定してください。詳細については、「プロジェクト変数」をご参照ください。

password

パスワードです。Alibaba Cloud アカウントの AccessKey Secret を入力します。

String

はい

なし

endpoint

Hologres のエンドポイントです。

String

はい

なし

詳細については、「エンドポイント」をご参照ください。

jdbcRetryCount

接続失敗時に、書き込みおよびクエリ操作を再試行する最大回数です。

Integer

いいえ

10

特になし。

jdbcRetrySleepInitMs

各再試行の前に待機する固定時間です。

Long

いいえ

1000

単位:ミリ秒。実際の待機時間は jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs で計算されます。

jdbcRetrySleepStepMs

各再試行の前に追加される増分待機時間です。

Long

いいえ

5000

単位:ミリ秒。実際の待機時間は jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs で計算されます。

jdbcConnectionMaxIdleMs

JDBC 接続の最大アイドル時間です。

Long

いいえ

60000

単位:ミリ秒。接続がこの値を超えてアイドル状態になると、接続は閉じられ、解放されます。

jdbcMetaCacheTTL

キャッシュされた TableSchema 情報の生存時間 (TTL) です。

Long

いいえ

60000

単位:ミリ秒。

jdbcMetaAutoRefreshFactor

キャッシュの残り時間がトリガータイム未満の場合、システムが自動的にキャッシュをリフレッシュします。

Integer

いいえ

4

キャッシュの残り時間は次の式で計算されます:残り時間 = 生存時間 (TTL) - 経過時間。キャッシュが自動的にリフレッシュされると、経過時間は 0 にリセットされます。

トリガータイム = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor

mutatetype

データ書き込みモードです。

String

いいえ

INSERT_OR_UPDATE

Hologres の物理テーブルにプライマリキーが設定されている場合、Hologres ストリーミングシンクは、プライマリキーに基づく 1 回限りのセマンティクスを使用します。同じプライマリキーを持つ複数のレコードが出現した場合、mutatetype パラメーターを指定して、結果テーブルの更新方法を決定する必要があります。mutatetype パラメーターの有効な値:

  • INSERT_OR_IGNORE:最初のレコードを保持し、後の重複レコードを無視します。

  • INSERT_OR_REPLACE:既存の行全体を新しい行で置き換えます。

  • INSERT_OR_UPDATE:指定された列のみを更新します。たとえば、テーブルにフィールド a、b、c、d があり、フィールド a がプライマリキーである場合、Hologres に書き込まれるのは a と b のみです。プライマリキーが重複した場合、b のみが更新され、c および d は変更されません。

createparttable

パーティション値に基づいて、不足しているパーティションテーブルを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

特になし。

sink.delete-strategy

撤回メッセージの処理方法です。

String

いいえ

なし

有効な値:

  • IGNORE_DELETE:UPDATE BEFORE および DELETE メッセージを無視します。データの挿入または更新のみを行うが、削除は行わないシナリオで使用します。

  • DELETE_ROW_ON_PK:プライマリキーに基づいて削除を適用します。更新は「削除→挿入」の順序で実行され、正確性が保証されます。

jdbcWriteBatchSize

JDBC モードでの書き込み時に、1 バッチあたりの最大行数です。

Integer

いいえ

256

単位:行。

説明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。

jdbcWriteBatchByteSize

JDBC モードでの書き込み時に、1 バッチあたりの最大バイト数です。

Long

いいえ

2097152 (2 × 1024 × 1024 バイト)、または 2 MB

説明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。

jdbcWriteFlushInterval

JDBC モードで、バッチを Hologres にフラッシュするまでの最大待機時間です。

Long

いいえ

10000

単位:ミリ秒。

説明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate' の場合、更新書き込み時の null 値を無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false(デフォルト):null 値を Hologres シンクテーブルに書き込みます。

  • true:更新書き込み時の null 値を無視します。

jdbcEnableDefaultForNotNullColumn

Hologres テーブルにおいて、NOT NULL 制約のあるカラムにデフォルト値が定義されていない場合、コネクタがデフォルト値を埋め込むかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true(デフォルト):コネクタがデフォルト値を埋めて書き込みます。ルールは以下のとおりです:

    • カラムが STRING の場合、空文字列 ("") を書き込みます。

    • カラムが NUMBER の場合、0 を書き込みます。

    • カラムが DATE、TIMESTAMP、または TIMESTAMPTZ の場合、1970-01-01 00:00:00 を書き込みます。

  • false:デフォルト値を埋めません。NOT NULL カラムに null を書き込むとエラーが発生します。

remove-u0000-in-text.enabled

STRING データを書き込む前に、\u0000 の無効な文字を削除するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false(デフォルト):コネクタはデータに対して何も処理を行いません。ただし、不正なデータに遭遇した場合、書き込み操作で次の例外が発生する可能性があります:ERROR: invalid byte sequence for encoding "UTF8": 0x00

    この場合、事前にソーステーブルから不正なデータを削除するか、SQL 文で不正なデータの処理ロジックを定義する必要があります。

  • true:コネクタが STRING データから \u0000 を削除してエラーを防止します。

deduplication.enabled

JDBC または jdbc_fixed モードでのバッチ書き込み時に重複排除を実行するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true(デフォルト):バッチ内で同一のプライマリキーを持つ行を重複排除します。最後に到着した行のみを保持します。例:2 つのフィールドがあり、最初のフィールドが PK です。

    • INSERT (1,'a')INSERT (1,'b') が順に到着した場合、重複排除後は (1,'b') のみが Hologres シンクテーブルに書き込まれます。

    • (1,'a') がシンクテーブルに存在しており、その後 DELETE (1,'a')INSERT (1,'b') が順に到着した場合、(1,'b') のみが書き込まれます。これは直接更新(削除→挿入ではなく)として動作します。

  • false:バッチ処理中に重複排除は行いません。新規データが現在のバッチデータと同じ PK を持つ場合、まずバッチを書き込み、その後新規データを書き込みます。

sink.type-normalize-strategy

データマッピング戦略です。

String

いいえ

STANDARD

Hologres シンクが上流のデータ型を Hologres の型に変換する際に使用する戦略です。

  • STANDARD:Flink CDC の型を標準に従って PostgreSQL の型にマップします。

  • BROADEN:Flink CDC の型をより広範な Hologres の型にマップします。

  • ONLY_BIGINT_OR_TEXT:Flink CDC のすべての型を Hologres の BIGINT または STRING にマップします。

table_property.*

Hologres の物理テーブルプロパティです。

String

いいえ

なし

Hologres テーブルを作成する際、WITH 句で物理テーブルプロパティを設定できます。適切な設定により、データの整理およびクエリの効率化が図れます。

警告

table_property.distribution_key のデフォルト値はプライマリキーです。書き込みの正確性に影響するため、必要でない限り変更しないでください。

connection.ssl.mode

転送中暗号化を有効にするかどうか、および使用するモードを指定します。

String

いいえ

disable

  • disable(デフォルト):転送中の暗号化を無効にします。

  • require:SSL を有効にし、データリンクのみを暗号化します。

  • verify-ca:SSL を有効にし、データリンクを暗号化するとともに、CA 証明書を用いて Hologres サーバーの ID を検証します。

  • verify-full:SSL を有効にし、データリンクを暗号化するとともに、CA 証明書を用いて Hologres サーバーの ID を検証し、証明書内の CN または DNS が設定された Hologres エンドポイントと一致することを確認します。

説明
  • Hologres バージョン 2.1 以降で verify-ca および verify-full がサポートされています。「転送中暗号化」をご参照ください。

  • verify-ca または verify-full を使用する場合、connection.ssl.root-cert.location も設定する必要があります。

connection.ssl.root-cert.location

転送中暗号化で証明書が必要な場合の証明書ファイルのパスです。

String

いいえ

なし

connection.ssl.mode を verify-ca または verify-full に設定する場合に必須です。Realtime Compute コンソールの「ファイル管理」機能を用いて CA 証明書をアップロードします。アップロードされたファイルは /flink/usrlib に配置されます。たとえば、CA 証明書のファイル名が certificate.crt の場合、このパラメーターには '/flink/usrlib/certificate.crt' を指定します。

説明

CA 証明書の取得手順については、「転送中暗号化 — CA 証明書のダウンロード」をご参照ください。

データ型のマッピング

sink.type-normalize-strategy パラメーターを用いて、上流のデータ型を Hologres のデータ型にマップする方法を定義します。

説明
  • YAML ジョブを初めて起動する際に sink.type-normalize-strategy を有効化してください。起動後に有効化する場合は、下流のテーブルを削除し、ジョブをステートレスで再起動することで設定を有効化できます。

  • サポートされる配列型には、INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR、VARCHAR があります。

  • Hologres は NUMERIC をプライマリキーとしてサポートしていません。プライマリキーが NUMERIC にマップされる場合、Hologres はこれを VARCHAR に変換します。

STANDARD

sink.type-normalize-strategy が STANDARD の場合、マッピングは以下のとおりです:

Flink CDC 型

Hologres 型

CHAR

bpchar

STRING

text

VARCHAR

text(長さ > 10,485,760 バイトの場合)

varchar(長さ ≤ 10,485,760 バイトの場合)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

さまざまな型の配列

MAP

非対応

ROW

非対応

BROADEN

sink.type-normalize-strategy が BROADEN の場合、Flink CDC の型はより広範な Hologres の型にマップされます。マッピングは以下のとおりです:

Flink CDC 型

Hologres 型

CHAR

text

STRING

VARCHAR

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int8

SMALLINT

INTEGER

BIGINT

FLOAT

float8

DOUBLE

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

さまざまな型の配列

MAP

非対応

ROW

非対応

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy が ONLY_BIGINT_OR_TEXT の場合、すべての Flink CDC 型が Hologres の BIGINT または STRING にマップされます。マッピングは以下のとおりです:

Flink CDC 型

Hologres 型

TINYINT

int8

SMALLINT

INTEGER

BIGINT

BOOLEAN

text

BINARY

VARBINARY

DECIMAL

FLOAT

DOUBLE

DATE

TIME_WITHOUT_TIME_ZONE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_WITH_LOCAL_TIME_ZONE

ARRAY

さまざまな型の配列

MAP

非対応

ROW

非対応

パーティションテーブルへの書き込み

Hologres シンクは、パーティションテーブルへの書き込みをサポートしています。変換モジュールと組み合わせることで、上流のデータを Hologres のパーティションテーブルに書き込むことができます。以下の点にご注意ください:

  • パーティションキーは、プライマリキーの一部である必要があります。上流の非プライマリキーをパーティションキーとして使用すると、上流と下流の間でプライマリキーが不整合になる可能性があります。その結果、同期時にデータの不整合が発生します。

  • TEXTVARCHAR、および INT データの型のカラムをパーティションキーとして使用できます。 Hologres V1.3.22 以降では、DATE データの型のカラムもパーティションキーとして使用できます。

  • 子パーティションテーブルを自動的に作成するには、createparttable を true に設定します。それ以外の場合は、手動で作成してください。

例については、「パーティションテーブルへの書き込み」をご参照ください。

テーブルスキーマの変更を同期

CDC YAML パイプラインジョブでは、スキーマ進化を処理するための異なるポリシーが使用されます。これらのポリシーは、パイプラインレベルの設定項目 schema.change.behavior で指定されます。schema.change.behavior の有効な値は、IGNORE、LENIENT、TRY_EVOLVE、EVOLVE、EXCEPTION です。Hologres シンクは TRY_EVOLVE ポリシーをサポートしていません。LENIENT および EVOLVE ポリシーは、スキーマ進化を含むものです。以下では、さまざまなスキーマ変更イベントの処理方法について説明します。

LENIENT(デフォルト)

LENIENT モードでは、スキーマ変更は以下のとおりに処理されます:

  • NULL 許容カラムの追加:結果テーブルのスキーマ末尾にカラムを追加し、データを同期します。

  • NULL 許容カラムの削除:カラムを結果テーブルから削除せず、代わりに null で埋めます。

  • NOT NULL カラムの追加:結果テーブルのスキーマ末尾にカラムを追加し、データを同期します。新しいカラムはデフォルトで NULL 許容となります。カラムが追加される前のデータは、デフォルトで null になります。

  • カラム名の変更:追加+削除として扱われます。結果テーブルのスキーマ末尾に名前変更されたカラムを追加し、元のカラムを null で埋めます。たとえば、col_a が col_b に変更された場合、col_b が追加され、col_a は null で埋められます。

  • カラム型の変更:非対応です。Hologres ではカラム型の変更は許可されていません。代わりに sink.type-normalize-strategy を使用してください。

  • 以下のスキーマ変更は非対応です:

    • プライマリキーまたはインデックスなどの制約の変更。

    • NOT NULL カラムの削除。

    • NOT NULL から NULL 許容への変更。

EVOLVE

EVOLVE モードでは、スキーマ変更は以下のとおりに処理されます:

  • NULL 許容カラムの追加:対応。

  • NULL 許容カラムの削除:非対応。

  • NOT NULL カラムは、結果テーブルに NULL 許容カラムとして追加されます。

  • カラム名の変更:対応。結果テーブルのカラム名を変更します。

  • カラム型の変更:非対応です。Hologres ではカラム型の変更は許可されていません。代わりに sink.type-normalize-strategy を使用してください。

  • 以下のスキーマ変更は非対応です:

    • プライマリキーまたはインデックスなどの制約の変更。

    • NOT NULL カラムの削除。

    • NOT NULL から NULL 許容への変更。

警告

EVOLVE モードでは、結果テーブルを削除せずにジョブをステートレスで再起動すると、上流と結果テーブルの間でスキーマが不整合になり、ジョブが失敗する可能性があります。結果テーブルのスキーマを手動で調整してください。

EVOLVE モードを有効化する例については、「EVOLVE モードの有効化」をご参照ください。

コード例

広範な型マッピング

sink.type-normalize-strategy パラメーターを用いて、データ型のマッピングを広範なものにします。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

パーティションテーブルへの書き込み

上流のタイムスタンプフィールド create_time を日付型に変換し、Hologres テーブルのパーティションキーとして使用します。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # 不足しているパーティションテーブルを自動的に作成します。
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: パーティションキーの追加 

pipeline:
  name: MySQL から Hologres へのパイプライン

EVOLVE モードの有効化

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # 不足しているパーティションテーブルを自動的に作成します。
  createparttable: true

pipeline:
  name: MySQL から Hologres へのパイプライン
  schema.change.behavior: evolve

単一テーブルの同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

データベース内のすべてのテーブルを同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

シャード化されたテーブルのマージ

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。  
  sink.type-normalize-strategy: BROADEN
  
route:
  # MySQL test_db 内のすべてのシャード化されたテーブルを、1 つの Hologres テーブル test_db.user にマージします。
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL から Hologres へのパイプライン

特定のスキーマへの同期

Hologres のスキーマは、MySQL のデータベースに対応します。結果テーブルのスキーマを指定できます。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN
  
route:
  # MySQL test_db のすべてのテーブルを、テーブル名を変更せずに Hologres test_db2 スキーマに同期します。
  - source-table: test_db.\.*
    sink-table: test_db2.<>
    replace-symbol: <>

pipeline:
  name: MySQL から Hologres へのパイプライン

実行中のジョブで新規テーブルを同期

ジョブ実行中に新しく作成されたテーブルをリアルタイムで同期するには、scan.binlog.newly-added-table.enabled を true に設定します。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  # ジョブ実行中に作成された新規テーブルからのデータをキャプチャします。
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

新しく追加された既存テーブルの再起動

既存のテーブルの同期を追加する場合は、scan.newly-added-table.enabled = true を設定してジョブを再起動します。

警告

すでに scan.binlog.newly-added-table.enabled = true を使用して新規テーブルのキャプチャを行っている場合、再起動後に scan.newly-added-table.enabled = true を再度使用して既存テーブルをキャプチャしないでください。これにより、重複データが発生する可能性があります。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.startup.mode: initial
  # 再起動時に、tables パラメーターで新規テーブルをチェックし、スナップショットを実行します。
  # scan.startup.mode: initial が必要です。
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

全データベース同期時のテーブル除外

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  # この正規表現に一致するテーブルは同期されません。
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # CDC データ型をより広範な Hologres の型にマップします。
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL から Hologres へのパイプライン

参考情報