すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:YAML デプロイメントでの Hologres コネクタを使用したデータインジェスト (パブリックプレビュー)

最終更新日:Nov 09, 2025

このトピックでは、Hologres コネクタを使用して、データインジェスト用の YAML デプロイメントでデータを同期する方法について説明します。

背景情報

Hologres は、大量のデータをリアルタイムで書き込み、更新、分析できるエンドツーエンドのリアルタイムデータウェアハウスサービスです。Hologres は PostgreSQL プロトコルと互換性があり、標準の SQL 構文をサポートしています。Hologres は、ペタバイト規模のデータに対するオンライン分析処理 (OLAP) とアドホッククエリをサポートし、高同時実行性、低レイテンシーのオンラインデータサービスを提供します。Hologres は、MaxCompute、Realtime Compute for Apache Flink、DataWorks とシームレスに統合され、オンラインとオフラインのデータウェアハウスソリューションをフルスタックで提供します。次の表に、YAML デプロイメントで使用される Hologres コネクタの機能を示します。

項目

説明

サポートされるタイプ

データインジェストシンク

実行モード

ストリーミングモードとバッチモード

データ形式

該当なし

メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

YAML

シンクテーブルでのデータの更新または削除

サポートされています

機能

機能

説明

データベース内のすべてのテーブルのデータの同期

データベース内のすべてのテーブルまたはデータベース内の複数のテーブルから、関連する各シンクテーブルに、フルデータと増分データを同期します。

テーブルスキーマ変更の同期

データベースの同期中に、各ソーステーブルのスキーマ変更 (列の追加、削除、名前変更など) を関連するシンクテーブルにリアルタイムで同期します。

シャーディングされたデータベース内の複数のテーブルのマージと同期

正規表現を使用してデータベース名を指定し、データソースの複数のデータベースシャード内のソーステーブルと照合できます。 ソーステーブルのデータがマージされた後、データは各ソーステーブルに対応する名前を持つダウンストリームシンクテーブルに同期されます。

パーティションテーブルへのデータの書き込み

アップストリームテーブルから Hologres パーティションテーブルにデータを書き込むことができます。

データ型のマッピング

複数のマッピングポリシーを使用して、アップストリームデータ型を Hologres データ型にマッピングします。 Hologres データ型は、Flink Change Data Capture (CDC) データ型よりも広範です。

構文

sink:
  type: hologres
  name: Hologres Sink  // Hologres シンク
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

パラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

type

シンクのタイプ。

String

はい

デフォルト値なし

値を hologres に設定します。

name

シンク名。

String

いいえ

デフォルト値なし

該当なし。

dbname

データベース名。

String

はい

デフォルト値なし

該当なし。

username

データベースへのアクセスに使用するユーザー名。Alibaba Cloud アカウントの AccessKey ID を入力します。

String

はい

デフォルト値なし

Alibaba Cloud アカウントの AccessKey シークレットを取得する方法については、「コンソール操作」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。

password

データベースへのアクセスに使用するパスワード。Alibaba Cloud アカウントの AccessKey シークレットを入力します。

String

はい

デフォルト値なし

endpoint

Hologres のエンドポイント。

String

はい

デフォルト値なし

詳細については、「Hologres に接続するためのエンドポイント」をご参照ください。

jdbcRetryCount

接続障害が発生した場合に、データの読み取りと書き込みに許可される最大再試行回数。

Integer

いいえ

10

該当なし。

jdbcRetrySleepInitMs

再試行ごとの固定待機時間。

Long

いいえ

1000

単位はミリ秒です。リトライの実際の待機時間は、次の数式を使用して計算されます: jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs

jdbcRetrySleepStepMs

再試行ごとの累積待機時間。

Long

いいえ

5000

単位はミリ秒です。リトライの実際の待機時間は、次の数式を使用して計算されます: jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs

jdbcConnectionMaxIdleMs

Java Database Connectivity (JDBC) 接続がアイドル状態になっている最大時間。

Long

いいえ

60000

単位:ミリ秒。 JDBC 接続がこのパラメーターの値を超える期間アイドル状態のままである場合、接続は閉じられて解放されます。

jdbcMetaCacheTTL

キャッシュに TableSchema 情報を保存する最大時間。

Long

いいえ

60000

単位:ミリ秒。

jdbcMetaAutoRefreshFactor

キャッシュの自動更新をトリガーするための係数。 キャッシュにデータを保存するための残り時間が、キャッシュの自動更新をトリガーするための時間よりも短い場合、Realtime Compute for Apache Flink はキャッシュを自動的に更新します。

Integer

いいえ

4

キャッシュにデータを格納する残り時間は、次の数式を使用して計算されます: キャッシュにデータを格納する残り時間 = キャッシュにデータを格納できる最大時間 - データがキャッシュされた期間。キャッシュが自動的に更新された後、データがキャッシュされた期間は 0 から再計算されます。

キャッシュの自動更新をトリガーする時間は、次の数式を使用して計算されます: キャッシュの自動更新をトリガーする時間 = jdbcMetaCacheTTL パラメーターの値/jdbcMetaAutoRefreshFactor パラメーターの値

mutatetype

データ書き込みモード。

String

いいえ

INSERT_OR_UPDATE

Hologres 物理テーブルでプライマリキーが設定されている場合、Hologres ストリーミングシンクはプライマリキーに基づいて厳密に 1 回のセマンティクスを使用します。 同じプライマリキーを持つ複数のレコードがテーブルに書き込まれる場合、mutatetype パラメーターを指定して、シンクテーブルの更新方法を決定する必要があります。 mutatetype パラメーターの有効な値:

  • INSERT_OR_IGNORE: 個別のデータレコードを保持し、重複するデータレコードを無視します。

  • INSERT_OR_REPLACE: 行全体の既存のレコードを、後で到着したレコードに置き換えます。

  • INSERT_OR_UPDATE: 既存のデータの特定の列を更新します。 たとえば、テーブルにフィールド a、b、c、d が含まれているとします。 a フィールドはプライマリキーであり、a フィールドと b フィールドのデータのみが Hologres に書き込まれます。 重複するプライマリキーが存在する場合、システムは b フィールドのデータのみを更新します。 c フィールドと d フィールドのデータは変更されません。

createparttable

パーティション値に基づいてデータを書き込むパーティションテーブルを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

該当なし。

sink.delete-strategy

後戻りメッセージを受信したときに実行される操作。

String

いいえ

デフォルト値なし

パラメーターは次のとおりです:

  • IGNORE_DELETE: UPDATE BEFORE メッセージと DELETE メッセージを無視します。 この操作は、データが挿入または更新され、データの削除が不要なシナリオに適用されます。

  • DELETE_ROW_ON_PK: Flink フレームワークは、プライマリキーに基づいて削除操作を適用します。更新操作は、最初にデータを削除してから新しいデータを挿入することで実行され、データの正確性を確保します。

jdbcWriteBatchSize

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータ行の最大数。

Integer

いいえ

256

単位:行。

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

jdbcWriteBatchByteSize

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータの最大バイト数。

Long

いいえ

2097152 (2 × 1024 × 1024 バイト = 2 MB)

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

jdbcWriteFlushInterval

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが複数の行のデータを Hologres に同時に書き込むのを待機するために必要な最大時間。

Long

いいえ

10000

単位:ミリ秒。

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate' が指定されている場合、データに書き込まれた null 値を無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false: null 値は Hologres シンクテーブルに書き込まれます。 これはデフォルト値です。

  • true: データに書き込まれた null 値は無視されます。

jdbcEnableDefaultForNotNullColumn

Hologres テーブルでデフォルト値が設定されていない null 非許容列に null 値が書き込まれた場合に、Hologres コネクタがデフォルト値を入力することを許可するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true: Hologres コネクタはデフォルト値を入力します。 これはデフォルト値です。 このパラメーターを true に設定すると、システムは次のルールに基づいて null 値をデフォルト値に変換します。

    • 列の型が STRING の場合、列は空のままになります。

    • 列のデータ型が NUMBER の場合、null 値は 0 に変換されます。

    • 列の型が DATE、TIMESTAMP、または TIMESTAMPTZ の場合、null 値は 1970-01-01 00:00:00 に変換されます。

  • false: Hologres コネクタはデフォルト値を入力しません。 Hologres テーブルでデフォルト値が設定されていない null 非許容列に null 値が書き込まれた場合、エラーが返されます。

remove-u0000-in-text.enabled

シンクテーブルに書き込まれる STRING 型のデータに無効な文字 \u0000 が含まれている場合、Hologres コネクタが無効な文字 \u0000 をデータから削除することを許可するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false (デフォルト): コネクタはデータに対して操作を行いません。ただし、ダーティデータを書き込むと、次の例外がスローされる可能性があります: ERROR: invalid byte sequence for encoding "UTF8": 0x00

    この場合、事前にソーステーブルからダーティデータを削除するか、SQL ステートメントでダーティデータ処理ロジックを定義する必要があります。

  • true: Hologres コネクタは、STRING 型のデータから無効な文字 \u0000 を削除して、エラーメッセージが報告されないようにします。

deduplication.enabled

jdbc モードまたは jdbc_fixed モードでデータをバッチ書き込みする場合に重複除外を実行するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true: バッチ書き込みされるデータに同じプライマリキーを持つデータが含まれている場合、重複除外が実行されます。 最後のデータレコードのみが保持されます。 これはデフォルト値です。 この例では、2 つのフィールドが使用され、最初のフィールドはプライマリキーです。

    • レコード INSERT (1,'a')INSERT (1,'b') が順番に到着した場合、重複排除後に後のレコード (1,'b') のみが Hologres 結果テーブルに書き込まれます。

    • Hologres 結果テーブルにはすでにレコード (1,'a') が含まれています。レコード DELETE (1,'a')INSERT (1,'b') が順番に到着した場合、後のレコード (1,'b') のみが Hologres に書き込まれます。これは、削除してから挿入するのではなく、直接更新として表示されます。

  • false: データをバッチ書き込みする場合、重複除外は実行されません。 挿入されたデータのプライマリキーの値が、バッチ書き込みされるデータのプライマリキーの値と同じである場合、バッチデータが書き込まれ、次に挿入する必要があるデータが書き込まれます。

sink.type-normalize-strategy

データマッピングポリシー。

String

いいえ

STANDARD

Hologres シンクテーブルでアップストリームデータ型を Hologres データ型に変換するために使用されるマッピングポリシー。 有効な値:

  • STANDARD: 変換標準に基づいて、Flink CDC 型のデータを AnalyticDB for PostgreSQL 型のデータに変換します。

  • BROADEN: Flink CDC 型のデータを Hologres 型のデータに変換します。 Hologres データ型は、Flink CDC データ型よりも広範です。

  • ONLY_BIGINT_OR_TEXT: Flink CDC データ型のすべてのデータを、Hologres の BIGINT 型または STRING 型のデータに変換します。

table_property.*

Hologres 物理テーブルのプロパティ。

String

いいえ

デフォルト値なし

Hologres テーブルを作成するときに、WITH 句で物理テーブルのプロパティを設定できます。効率的な方法でデータをソートおよびクエリするために、必要に応じてテーブルプロパティ設定を構成できます。

警告

デフォルトでは、table_property.distribution_key の値はプライマリキー値です。 このパラメーターを別の値に設定すると、書き込まれたデータが無効になる可能性があります。

データ型のマッピング

sink.type-normalize-strategy 設定項目を使用して、アップストリームのデータ型を Hologres のデータ型に変換するポリシーを設定できます。

説明
  • YAML デプロイメントを初めて開始するときは、sink.type-normalize-strategy を指定することをお勧めします。 デプロイメントの開始後に sink.type-normalize-strategy を指定する場合は、ダウンストリームテーブルを削除し、状態なしでデプロイメントを再起動して、ポリシーを有効にする必要があります。

  • サポートされている配列データ型は、INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR、VARCHAR のみです。

  • NUMERIC データ型は、Hologres でプライマリキーのデータ型として使用できません。 プライマリキーのデータ型が NUMERIC データ型にマッピングされている場合、プライマリキーのデータ型は VARCHAR データ型に変換されます。

STANDARD

次の表は、sink.type-normalize-strategy が STANDARD に設定されている場合のデータ型のマッピングについて説明しています。

Flink CDC データの型

Hologres データの型

CHAR

bpchar

STRING

text

VARCHAR

text (データ長が 10,485,760 バイトを超える場合)

varchar (データ長が 10,485,760 バイト以下の場合)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

サポートされている配列データの型

MAP

サポートされていません

ROW

サポートされていません

BROADEN

sink.type-normalize-strategy を BROADEN に設定すると、Flink CDC 型のデータは Hologres データ型に変換されます。 Hologres データ型は、Flink CDC データ型よりも広範です。 次の表は、データ型のマッピングについて説明しています。

Flink CDC データ型

Hologres データ型

CHAR

text

STRING

VARCHAR

ブーリアン

bool

バイナリ

bytea

VARBINARY

デシマル

bytea

TINYINT

int8

SMALLINT

整数

BIGINT

FLOAT

float8

DOUBLE

日付

日付

TIME_WITHOUT_TIME_ZONE

時間

TIMESTAMP_WITHOUT_TIME_ZONE

タイムスタンプ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

配列

サポートされている配列データ型

MAP

サポートされていません

サポートされていません

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy を ONLY_BIGINT_OR_TEXT に設定すると、すべての Flink CDC データ型が Hologres の BIGINT または TEXT データ型のいずれかに変換されます。次の表に、型のマッピングを示します。

Flink CDC データ型

Hologres データ型

TINYINT

int8

SMALLINT

INTEGER(整数)

BIGINT

ブール値

テキスト

バイナリ

VARBINARY

DECIMAL

FLOAT(浮動小数点数)

DOUBLE

日付

TIME_WITHOUT_TIME_ZONE

TIMESTAMP WITHOUT TIME ZONE

TIMESTAMP WITH LOCAL TIME ZONE

配列

サポートされている配列データ型

MAP

サポートされていません

サポートされていません

パーティションテーブルへのデータの書き込み

Hologres シンクを使用すると、Hologres パーティションテーブルにデータを書き込むことができます。変換モジュールでシンクを使用して、アップストリームデータを Hologres パーティションテーブルに書き込むことができます。次の点に注意してください。

  • パーティションキーはプライマリキーに含まれている必要があります。アップストリームの非プライマリキーの1つがパーティションキーとして使用されている場合、アップストリームとダウンストリームのプライマリキーが一致しない可能性があります。これにより、データ同期中にアップストリームテーブルとダウンストリームテーブル間でデータの不整合が発生する可能性があります。

  • TEXTVARCHAR、および INT データ型の列をパーティションキーとして使用できます。Hologres V1.3.22 以降では、DATE 型の列もパーティションキーとして使用できます。

  • システムが自動的に子パーティションテーブルを作成できるようにするには、createparttable を true に設定する必要があります。そうでない場合は、子パーティションテーブルを手動で作成する必要があります。

パーティションテーブルにデータを書き込む方法の例については、パーティションテーブルへのデータの書き込み をご参照ください。

テーブルスキーマ変更の同期

CDC YAML パイプラインジョブは、スキーマ進化を処理するためにさまざまなポリシーを使用します。これらのポリシーは、パイプラインレベルの設定項目 schema.change.behavior を使用して設定できます。schema.change.behavior の有効な値は、IGNORE、LENIENT、TRY_EVOLVE、EVOLVE、および EXCEPTION です。Hologres シンクは TRY_EVOLVE ポリシーをサポートしていません。LENIENT および EVOLVE ポリシーにはスキーマ進化が含まれます。次のセクションでは、さまざまなスキーマ変更イベントの処理方法について説明します。

LENIENT(デフォルト値)

schema.change.behavior を LENIENT に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。

  • NULL 値が許可された列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、追加された列にデータを同期します。

  • NULL 値が許可された列を削除する:ステートメントは、テーブルから列を削除する代わりに、シンクテーブルの NULL 値が許可された列に NULL 値を自動的に設定します。

  • NULL 値が許可されていない列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、新しい列のデータを同期します。新しい列は自動的に NULL 値が許可された列に設定され、列が追加される前のデータは自動的に NULL 値に設定されます。

  • 列の名前を変更する:列の名前を変更する操作には、列の追加と列の削除が含まれます。ソーステーブルで列の名前が変更されると、新しい名前を使用する列がシンクテーブルの最後に追加され、元の名前を使用する列には NULL 値が設定されます。たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列がシンクテーブルの最後に追加され、col_a 列には自動的に NULL 値が設定されます。

  • 列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy パラメーターを指定する必要があります。

  • 以下のスキーマ変更はサポートされていません。

    • 主キーやインデックスなどの制約の変更

    • NULL 値が許可されていない列の削除

    • NULL 値が許可されていない列から NULL 値が許可された列への変更

EVOLVE

schema.change.behavior を EVOLVE に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。

  • NULL 値が許可された列を追加する:サポートされています。

  • NULL 値が許可された列を削除する:サポートされていません。

  • NULL 値が許可されていない列を追加する:NULL 値が許可された列がシンクテーブルに追加されます。

  • 列の名前を変更する:サポートされています。シンクテーブルで列の名前が変更されます。

  • 列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy を指定して Hologres を使用する必要があります。

  • 以下のスキーマ変更はサポートされていません。

    • 主キーやインデックスなどの制約の変更

    • NULL 値が許可されていない列の削除

    • NULL 値が許可されていない列から NULL 値が許可された列への変更

警告

EVOLVE 値が使用されているデプロイのシンクテーブルが削除されず、状態なしでデプロイが再起動された場合、アップストリームテーブルとシンクテーブル間のスキーマの不整合により、デプロイが失敗する可能性があります。この場合、シンクテーブルのスキーマを手動で変更する必要があります。

EVOLVE モードを有効にする方法の詳細については、「EVOLVE モードの有効化」をご参照ください。

サンプルコード

ワイド型マッピング

sink.type-normalize-strategy 設定項目を使用して、広範な型マッピングを設定できます。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

パーティションテーブルへのデータ書き込み

timestampデータ型のアップストリームフィールド create_time を dateデータ型のフィールドに変換し、新しいフィールドをHologresテーブルのパーティションキーとして使用できます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

有効化モード

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

データベース内の単一テーブルからのデータの同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

データベース内のすべてのテーブルからのデータの同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

シャーディングデータベース内の複数のテーブルのマージと同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

特定のテーブルのスキーマの同期

Hologresテーブルのスキーマは、MySQLテーブルのデータベースに対応しています。 特定のHologresシンクテーブルのスキーマをMySQLテーブルに同期できます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

デプロイメントを再起動せずに新しいテーブルを同期

デプロイメントを再起動せずにリアルタイムで新しいテーブルを同期する場合は、scan.binlog.newly-added-table.enable を true に設定します。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

新規および既存のテーブルを再起動する

デプロイメント内の既存のテーブルを同期する場合は、scan.newly-added-table.enabled を true に設定し、デプロイメントを再起動します。

警告

デプロイメントで scan.binlog.newly-added-table.enabled = true 設定を使用して新しいテーブルを同期する場合、scan.newly-added-table.enabled = true 設定を使用して既存のテーブルを同期してからデプロイメントを再起動することはできません。上記の方法を試みると、データが繰り返し送信される可能性があります。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

データベース内のすべてのテーブルからのデータの同期中に特定のテーブルを除外

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

参考資料