すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:YAML デプロイメントでの Hologres コネクタを使用したデータインジェスト (パブリックプレビュー)

最終更新日:Jan 08, 2025

このトピックでは、Hologres コネクタを使用して、データインジェスト用の YAML デプロイメントでデータを同期する方法について説明します。

背景情報

Hologres は、大量のデータのリアルタイムでの書き込み、更新、分析を可能にするエンドツーエンドのリアルタイムデータウェアハウスサービスです。Hologres は PostgreSQL プロトコルと互換性があり、標準 SQL 構文をサポートしています。 Hologres は、ペタバイト規模のデータに対するオンライン分析処理 (OLAP) およびアドホッククエリをサポートし、高並列性、低レイテンシのオンラインデータサービスを提供します。 Hologres は、MaxCompute、Realtime Compute for Apache Flink、および DataWorks とシームレスに統合されており、フルスタックのオンラインおよびオフラインデータウェアハウスソリューションを提供します。次の表は、YAML デプロイメントで使用される Hologres コネクタの機能について説明しています。

項目

説明

テーブルタイプ

データインジェストシンク

実行モード

ストリーミングモードとバッチモード

データ形式

該当なし

メトリック

  • numRecordsOut

  • numRecordsOutPerSecond

説明

メトリックの詳細については、「メトリック」をご参照ください。

API タイプ

YAML API

シンクテーブルでのデータの更新または削除

サポートされています

機能

機能

説明

データベース内のすべてのテーブルのデータの同期

データベース内のすべてのテーブルまたはデータベース内の複数のテーブルから、関連する各シンクテーブルに、フルデータと増分データを同期します。

テーブルスキーマ変更の同期

データベースの同期中に、各ソーステーブルのスキーマ変更 (列の追加、削除、名前変更など) を関連するシンクテーブルにリアルタイムで同期します。

シャーディングされたデータベース内の複数のテーブルのマージと同期

正規表現を使用してデータベース名を指定し、データソースの複数のデータベースシャード内のソーステーブルと照合できます。 ソーステーブルのデータがマージされた後、データは各ソーステーブルに対応する名前を持つダウンストリームシンクテーブルに同期されます。

パーティションテーブルへのデータの書き込み

アップストリームテーブルから Hologres パーティションテーブルにデータを書き込むことができます。

データ型のマッピング

複数のマッピングポリシーを使用して、アップストリームデータ型を Hologres データ型にマッピングします。 Hologres データ型は、Flink Change Data Capture (CDC) データ型よりも広範です。

構文

sink:
  type: hologres
  name: Hologres Sink  // Hologres シンク
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

パラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

type

シンクのタイプ。

String

はい

デフォルト値なし

パラメーターの値は hologres です。

name

シンク名。

String

いいえ

デフォルト値なし

該当なし。

dbname

データベース名。

String

はい

デフォルト値なし

該当なし。

username

データベースへのアクセスに使用するユーザー名。Alibaba Cloud アカウントの AccessKey ID を入力します。

String

はい

デフォルト値なし

Alibaba Cloud アカウントの AccessKey シークレットを取得する方法については、「コンソール操作」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。

重要

AccessKey ペアを保護するために、キー管理方式を使用して AccessKey ID を設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。

password

データベースへのアクセスに使用するパスワード。Alibaba Cloud アカウントの AccessKey シークレットを入力します。

String

はい

デフォルト値なし

詳細については、「コンソール操作」をご参照ください。

重要

AccessKey ペアを保護するために、キー管理方式を使用して AccessKey シークレットを設定することをお勧めします。 詳細については、「変数とキーの管理」をご参照ください。

endpoint

Hologres のエンドポイント。

String

はい

デフォルト値なし

詳細については、「Hologres に接続するためのエンドポイント」をご参照ください。

jdbcRetryCount

接続障害が発生した場合に、データの読み取りと書き込みに許可される最大再試行回数。

Integer

いいえ

10

該当なし。

jdbcRetrySleepInitMs

再試行ごとの固定待機時間。

Long

いいえ

1000

単位:ミリ秒。 再試行の実際の待機時間は、次の式を使用して計算されます。jdbcRetrySleepInitMs パラメーターの値 + 現在の再試行までの再試行回数 × jdbcRetrySleepInitMs パラメーターの値

jdbcRetrySleepStepMs

再試行ごとの累積待機時間。

Long

いいえ

5000

単位:ミリ秒。 再試行の実際の待機時間は、次の式を使用して計算されます。jdbcRetrySleepInitMs パラメーターの値 + 現在の再試行までの再試行回数 × jdbcRetrySleepInitMs パラメーターの値

jdbcConnectionMaxIdleMs

Java Database Connectivity (JDBC) 接続がアイドル状態になっている最大時間。

Long

いいえ

60000

単位:ミリ秒。 JDBC 接続がこのパラメーターの値を超える期間アイドル状態のままである場合、接続は閉じられて解放されます。

jdbcMetaCacheTTL

キャッシュに TableSchema 情報を保存する最大時間。

Long

いいえ

60000

単位:ミリ秒。

jdbcMetaAutoRefreshFactor

キャッシュの自動更新をトリガーするための係数。 キャッシュにデータを保存するための残り時間が、キャッシュの自動更新をトリガーするための時間よりも短い場合、Realtime Compute for Apache Flink はキャッシュを自動的に更新します。

Integer

いいえ

4

キャッシュにデータを保存するための残り時間は、次の式を使用して計算されます。キャッシュにデータを保存するための残り時間 = キャッシュにデータを保存できる最大時間 - データがキャッシュされている期間。 キャッシュが自動的に更新された後、データがキャッシュされている期間は 0 から再計算されます。

キャッシュの自動更新をトリガーする時間は、次の式を使用して計算されます。キャッシュの自動更新をトリガーする時間 = jdbcMetaCacheTTL パラメーターの値 / jdbcMetaAutoRefreshFactor パラメーターの値

mutatetype

データ書き込みモード。

String

いいえ

INSERT_OR_UPDATE

Hologres 物理テーブルでプライマリキーが設定されている場合、Hologres ストリーミングシンクはプライマリキーに基づいて厳密に 1 回のセマンティクスを使用します。 同じプライマリキーを持つ複数のレコードがテーブルに書き込まれる場合、mutatetype パラメーターを指定して、シンクテーブルの更新方法を決定する必要があります。 mutatetype パラメーターの有効な値:

  • INSERT_OR_IGNORE: 個別のデータレコードを保持し、重複するデータレコードを無視します。

  • INSERT_OR_REPLACE: 行全体の既存のレコードを、後で到着したレコードに置き換えます。

  • INSERT_OR_UPDATE: 既存のデータの特定の列を更新します。 たとえば、テーブルにフィールド a、b、c、d が含まれているとします。 a フィールドはプライマリキーであり、a フィールドと b フィールドのデータのみが Hologres に書き込まれます。 重複するプライマリキーが存在する場合、システムは b フィールドのデータのみを更新します。 c フィールドと d フィールドのデータは変更されません。

createparttable

パーティション値に基づいてデータを書き込むパーティションテーブルを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

該当なし。

sink.delete-strategy

後戻りメッセージを受信したときに実行される操作。

String

いいえ

デフォルト値なし

incrValue パラメーターの値は、incrMode パラメーターの値によって異なります。

  • IGNORE_DELETE: UPDATE BEFORE メッセージと DELETE メッセージを無視します。 この操作は、データが挿入または更新され、データの削除が不要なシナリオに適用されます。

  • CHANGELOG_STANDARD: Realtime Compute for Apache Flink のフレームワークは、Flink SQL 変更ログの原則に基づいて実行されます。 削除操作は無視されません。 更新操作が実行されると、新しいデータが挿入される前に元のデータが削除されます。 これにより、データの精度が保証されます。 この操作は、部分更新が関係しないシナリオに適用されます

jdbcWriteBatchSize

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータ行の最大数。

Integer

いいえ

256

単位:行。

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

jdbcWriteBatchByteSize

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータの最大バイト数。

Long

いいえ

2097152 (2 × 1024 × 1024 バイト = 2 MB)

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

jdbcWriteFlushInterval

JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが複数の行のデータを Hologres に同時に書き込むのを待機するために必要な最大時間。

Long

いいえ

10000

単位:ミリ秒。

説明

次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate' が指定されている場合、データに書き込まれた null 値を無視するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false: null 値は Hologres シンクテーブルに書き込まれます。 これはデフォルト値です。

  • true: データに書き込まれた null 値は無視されます。

jdbcEnableDefaultForNotNullColumn

Hologres テーブルでデフォルト値が設定されていない null 非許容列に null 値が書き込まれた場合に、Hologres コネクタがデフォルト値を入力することを許可するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true: Hologres コネクタはデフォルト値を入力します。 これはデフォルト値です。 このパラメーターを true に設定すると、システムは次のルールに基づいて null 値をデフォルト値に変換します。

    • 列の型が STRING の場合、列は空のままになります。

    • 列のデータ型が NUMBER の場合、null 値は 0 に変換されます。

    • 列の型が DATE、TIMESTAMP、または TIMESTAMPTZ の場合、null 値は 1970-01-01 00:00:00 に変換されます。

  • false: Hologres コネクタはデフォルト値を入力しません。 Hologres テーブルでデフォルト値が設定されていない null 非許容列に null 値が書き込まれた場合、エラーが返されます。

remove-u0000-in-text.enabled

シンクテーブルに書き込まれる STRING 型のデータに無効な文字 \u0000 が含まれている場合、Hologres コネクタが無効な文字 \u0000 をデータから削除することを許可するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false: Hologres コネクタはデータを処理しません。 シンクテーブルに書き込まれるデータにダーティデータが含まれている場合、"ERROR: invalid byte sequence for encoding "UTF8": 0x00" というエラーメッセージが報告される場合があります。 これはデフォルト値です。

    この場合、事前にソーステーブルからダーティデータを削除するか、SQL ステートメントでダーティデータ処理ロジックを定義する必要があります。

  • true: Hologres コネクタは、STRING 型のデータから無効な文字 \u0000 を削除して、エラーメッセージが報告されないようにします。

deduplication.enabled

jdbc モードまたは jdbc_fixed モードでデータをバッチ書き込みする場合に重複除外を実行するかどうかを指定します。

Boolean

いいえ

true

有効な値:

  • true: バッチ書き込みされるデータに同じプライマリキーを持つデータが含まれている場合、重複除外が実行されます。 最後のデータレコードのみが保持されます。 これはデフォルト値です。 この例では、2 つのフィールドが使用され、最初のフィールドはプライマリキーです。

    • INSERT (1,'a') レコードと INSERT (1,'b') レコードが時系列でシンクテーブルに書き込まれる場合、重複除外が実行された後、後で到着した (1,'b') レコードのみが保持され、Hologres シンクテーブルに書き込まれます。

    • (1,'a') レコードは既に Hologres シンクテーブルに存在します。 この場合、DELETE (1,'a') レコードと INSERT (1,'b') レコードが時系列でシンクテーブルに書き込まれます。 後で到着した (1,'b') レコードのみが保持され、Hologres シンクテーブルに書き込まれます。 この場合、データはテーブルから削除されてから挿入されるのではなく、直接更新されます。

  • false: データをバッチ書き込みする場合、重複除外は実行されません。 挿入されたデータのプライマリキーの値が、バッチ書き込みされるデータのプライマリキーの値と同じである場合、バッチデータが書き込まれ、次に挿入する必要があるデータが書き込まれます。

sink.type-normalize-strategy

データマッピングポリシー。

String

いいえ

STANDARD

Hologres シンクテーブルでアップストリームデータ型を Hologres データ型に変換するために使用されるマッピングポリシー。 有効な値:

  • STANDARD: 変換標準に基づいて、Flink CDC 型のデータを AnalyticDB for PostgreSQL 型のデータに変換します。

  • BROADEN: Flink CDC 型のデータを Hologres 型のデータに変換します。 Hologres データ型は、Flink CDC データ型よりも広範です。

  • ONLY_BIGINT_OR_TEXT: Flink CDC データ型のすべてのデータを、Hologres の BIGINT 型または STRING 型のデータに変換します。

table_property.*

Hologres 物理テーブルのプロパティ。

String

いいえ

デフォルト値なし

Hologres テーブルを作成するときに、WITH 句で物理テーブルプロパティを設定できます。 ビジネス要件に基づいてテーブルプロパティ設定を設定して、データを効率的にソートおよびクエリできます。

警告

デフォルトでは、table_property.distribution_key の値はプライマリキー値です。 このパラメーターを別の値に設定すると、書き込まれたデータが無効になる可能性があります。

データ型のマッピング

sink.type-normalize-strategy パラメーターを使用して、アップストリームデータ型を Hologres データ型に変換するためのポリシーを指定できます。

説明
  • YAML デプロイメントを初めて開始するときは、sink.type-normalize-strategy を指定することをお勧めします。 デプロイメントの開始後に sink.type-normalize-strategy を指定する場合は、ダウンストリームテーブルを削除し、状態なしでデプロイメントを再起動して、ポリシーを有効にする必要があります。

  • サポートされている配列データ型は、INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR、VARCHAR のみです。

  • NUMERIC データ型は、Hologres でプライマリキーのデータ型として使用できません。 プライマリキーのデータ型が NUMERIC データ型にマッピングされている場合、プライマリキーのデータ型は VARCHAR データ型に変換されます。

STANDARD

次の表は、sink.type-normalize-strategy が STANDARD に設定されている場合のデータ型のマッピングについて説明しています。

VARBINARY

Flink CDC データ型

Hologres データ型

CHAR

bpchar

STRING

text

VARCHAR

VARCHAR

varchar(データ長が 10,485,760 バイト以下の場合)

BOOLEAN

bool

BINARY

bytea

bytea

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

日付

日付

TIME_WITHOUT_TIME_ZONE

時間

TIMESTAMP_WITHOUT_TIME_ZONE

タイムスタンプ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

配列

サポートされている配列データ型

MAP

サポートされていません

サポートされていません

BROADEN

sink.type-normalize-strategy を BROADEN に設定すると、Flink CDC 型のデータは Hologres データ型に変換されます。 Hologres データ型は、Flink CDC データ型よりも広範です。 次の表は、データ型のマッピングについて説明しています。

BINARY

Flink CDC データ型

Hologres データ型

CHAR

text

STRING

VARCHAR

VARCHAR

text

BOOLEAN

bool

bytea

VARBINARY

bytea

DECIMAL

int8

SMALLINT

整数

BIGINT

FLOAT

float8

DOUBLE

日付

日付

TIME_WITHOUT_TIME_ZONE

時間

TIMESTAMP_WITHOUT_TIME_ZONE

タイムスタンプ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

配列

サポートされている配列データ型

MAP

サポートされていません

サポートされていません

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy を ONLY_BIGINT_OR_TEXT に設定すると、Flink CDC データ型のすべてのデータは、Hologres の BIGINT 型または STRING 型のデータに変換されます。 次の表は、データ型のマッピングについて説明しています。

Flink CDC データ型

Hologres データ型

TINYINT

int8

SMALLINT

INTEGER(整数)

BIGINT

ブール値

テキスト

バイナリ

VARBINARY

DECIMAL

FLOAT(浮動小数点数)

DOUBLE

日付

TIME_WITHOUT_TIME_ZONE

TIMESTAMP WITHOUT TIME ZONE

TIMESTAMP WITH LOCAL TIME ZONE

配列

サポートされている配列データ型

MAP

サポートされていません

サポートされていません

パーティションテーブルへのデータの書き込み

Hologres シンクを使用すると、Hologres パーティションテーブルにデータを書き込むことができます。変換モジュールでシンクを使用して、アップストリームデータを Hologres パーティションテーブルに書き込むことができます。次の点に注意してください。

  • パーティションキーはプライマリキーに含まれている必要があります。アップストリームの非プライマリキーの1つがパーティションキーとして使用されている場合、アップストリームとダウンストリームのプライマリキーが一致しない可能性があります。これにより、データ同期中にアップストリームテーブルとダウンストリームテーブル間でデータの不整合が発生する可能性があります。

  • TEXTVARCHAR、および INT データ型の列をパーティションキーとして使用できます。Hologres V1.3.22 以降では、DATE 型の列もパーティションキーとして使用できます。

  • システムが自動的に子パーティションテーブルを作成できるようにするには、createparttable を true に設定する必要があります。そうでない場合は、子パーティションテーブルを手動で作成する必要があります。

パーティションテーブルにデータを書き込む方法の例については、パーティションテーブルへのデータの書き込み をご参照ください。

テーブルスキーマ変更の同期

YAML 形式の CDC パイプラインデプロイで schema.change.behavior パラメーターを使用して、テーブルスキーマ変更を同期するためのポリシーを指定できます。schema.change.behavior の有効な値:IGNORE、LENIENT、TRY_EVOLVE、EVOLVE、および EXCEPTION。Hologres シンクは TRY_EVOLVE ポリシーをサポートしていません。LENIENT または EVOLVE ポリシーを使用してテーブルスキーマの変更を同期する場合、シンクテーブルのスキーマが変更される可能性があります。次のセクションでは、LENIENT または EVOLVE ポリシーを使用してさまざまなスキーマ変更イベントを処理する方法について説明します。

LENIENT(デフォルト値)

schema.change.behavior を LENIENT に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。

  • NULL 値が許可された列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、追加された列にデータを同期します。

  • NULL 値が許可された列を削除する:ステートメントは、テーブルから列を削除する代わりに、シンクテーブルの NULL 値が許可された列に NULL 値を自動的に設定します。

  • NULL 値が許可されていない列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、新しい列のデータを同期します。新しい列は自動的に NULL 値が許可された列に設定され、列が追加される前のデータは自動的に NULL 値に設定されます。

  • 列の名前を変更する:列の名前を変更する操作には、列の追加と列の削除が含まれます。ソーステーブルで列の名前が変更されると、新しい名前を使用する列がシンクテーブルの最後に追加され、元の名前を使用する列には NULL 値が設定されます。たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列がシンクテーブルの最後に追加され、col_a 列には自動的に NULL 値が設定されます。

  • 列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy パラメーターを指定する必要があります。

  • 以下のスキーマ変更はサポートされていません。

    • 主キーやインデックスなどの制約の変更

    • NULL 値が許可されていない列の削除

    • NULL 値が許可されていない列から NULL 値が許可された列への変更

EVOLVE

schema.change.behavior を EVOLVE に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。

  • NULL 値が許可された列を追加する:サポートされています。

  • NULL 値が許可された列を削除する:サポートされていません。

  • NULL 値が許可されていない列を追加する:NULL 値が許可された列がシンクテーブルに追加されます。

  • 列の名前を変更する:サポートされています。シンクテーブルで列の名前が変更されます。

  • 列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy を指定して Hologres を使用する必要があります。

  • 以下のスキーマ変更はサポートされていません。

    • 主キーやインデックスなどの制約の変更

    • NULL 値が許可されていない列の削除

    • NULL 値が許可されていない列から NULL 値が許可された列への変更

警告

EVOLVE 値が使用されているデプロイのシンクテーブルが削除されず、状態なしでデプロイが再起動された場合、アップストリームテーブルとシンクテーブル間のスキーマの不整合により、デプロイが失敗する可能性があります。この場合、シンクテーブルのスキーマを手動で変更する必要があります。

EVOLVE モードを有効にする方法の詳細については、「EVOLVE モードの有効化」をご参照ください。

サンプルコード

Flink CDCデータ型のHologresデータ型への変換

sink.type-normalize-strategy パラメーターを使用して、Flink CDCデータ型をHologresデータ型に変換できます。 Hologresデータ型は、Flink CDCデータ型よりも広範です。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

パーティションテーブルへのデータ書き込み

timestampデータ型のアップストリームフィールド create_time を dateデータ型のフィールドに変換し、新しいフィールドをHologresテーブルのパーティションキーとして使用できます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

有効化モード

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

データベース内の単一テーブルからのデータの同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

データベース内のすべてのテーブルからのデータの同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

シャーディングデータベース内の複数のテーブルのマージと同期

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

特定のテーブルのスキーマの同期

Hologresテーブルのスキーマは、MySQLテーブルのデータベースに対応しています。 特定のHologresシンクテーブルのスキーマをMySQLテーブルに同期できます。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

デプロイメントを再起動せずに新しいテーブルを同期

デプロイメントを再起動せずにリアルタイムで新しいテーブルを同期する場合は、scan.binlog.newly-added-table.enable を true に設定します。サンプルコード:

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

デプロイメントの再起動後に既存のテーブルを同期

デプロイメント内の既存のテーブルを同期する場合は、scan.newly-added-table.enabled を true に設定し、デプロイメントを再起動します。

警告

デプロイメントで scan.binlog.newly-added-table.enabled = true 設定を使用して新しいテーブルを同期する場合、scan.newly-added-table.enabled = true 設定を使用して既存のテーブルを同期してからデプロイメントを再起動することはできません。 前述の方法を使用しようとすると、データが繰り返し送信される可能性があります。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

データベース内のすべてのテーブルからのデータの同期中に特定のテーブルを除外

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

参考資料