このトピックでは、Hologres コネクタを用いて YAML 形式のデータインジェストジョブでデータを同期する方法について説明します。
背景情報
Hologres は、エンドツーエンドのリアルタイムデータウェアハウスエンジンです。大規模なリアルタイムデータインジェスト、更新、および分析をサポートします。標準 SQL を使用し、PostgreSQL プロトコルと互換があります。ペタバイト規模のデータに対する OLAP およびアドホック分析を実行可能です。高同時実行性・低レイテンシのオンラインデータサービスを提供します。MaxCompute、Flink、DataWorks と緊密に統合されており、オンラインおよびオフラインのフルスタックデータウェアハウスソリューションを提供します。以下の表に、Hologres YAML コネクタの機能を示します。
カテゴリ | 説明 |
対応タイプ | データインジェストシンク |
実行モード | ストリーミングおよびバッチモード |
データフォーマット | 非対応 |
監視メトリック |
説明 メトリックの詳細については、「監視メトリック」をご参照ください。 |
API タイプ | YAML |
結果テーブルへのデータの更新または削除の対応 | はい |
機能
機能 | 説明 |
データベース内のすべてのテーブル(または複数のテーブル)から、完全および増分データをそれぞれ対応する結果テーブルに同期します。 | |
データベース内のすべてのテーブルを同期する際に、各ソーステーブルのスキーマ変更(列の追加・削除・名前の変更など)をリアルタイムで対応する結果テーブルに同期します。 | |
正規表現を用いて、複数のシャード化されたデータベースにまたがるソーステーブルをマッチさせます。データをマージした後、対応する名前の下流の結果テーブルに同期します。 | |
上流のテーブルから Hologres のパーティションテーブルにデータを書き込みます。 | |
複数のマッピング戦略を用いて、上流のデータ型をより広範な Hologres のデータ型にマップします。 |
構文
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}パラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
type | シンクのタイプです。 | String | はい | なし |
|
name | シンクの名前です。 | String | いいえ | なし | 特になし。 |
dbname | データベース名です。 | String | はい | なし | 特になし。 |
username | ユーザー名です。Alibaba Cloud アカウントの AccessKey ID を入力します。 | String | はい | なし | 詳細については、「AccessKey ID および AccessKey Secret の確認方法」をご参照ください。 重要 AccessKey ペアの保護のため、AccessKey ID は変数を使用して設定してください。詳細については、「プロジェクト変数」をご参照ください。 |
password | パスワードです。Alibaba Cloud アカウントの AccessKey Secret を入力します。 | String | はい | なし | |
endpoint | Hologres のエンドポイントです。 | String | はい | なし | 詳細については、「エンドポイント」をご参照ください。 |
jdbcRetryCount | 接続失敗時に、書き込みおよびクエリ操作を再試行する最大回数です。 | Integer | いいえ | 10 | 特になし。 |
jdbcRetrySleepInitMs | 各再試行の前に待機する固定時間です。 | Long | いいえ | 1000 | 単位:ミリ秒。実際の待機時間は |
jdbcRetrySleepStepMs | 各再試行の前に追加される増分待機時間です。 | Long | いいえ | 5000 | 単位:ミリ秒。実際の待機時間は |
jdbcConnectionMaxIdleMs | JDBC 接続の最大アイドル時間です。 | Long | いいえ | 60000 | 単位:ミリ秒。接続がこの値を超えてアイドル状態になると、接続は閉じられ、解放されます。 |
jdbcMetaCacheTTL | キャッシュされた TableSchema 情報の生存時間 (TTL) です。 | Long | いいえ | 60000 | 単位:ミリ秒。 |
jdbcMetaAutoRefreshFactor | キャッシュの残り時間がトリガータイム未満の場合、システムが自動的にキャッシュをリフレッシュします。 | Integer | いいえ | 4 | キャッシュの残り時間は次の式で計算されます:残り時間 = 生存時間 (TTL) - 経過時間。キャッシュが自動的にリフレッシュされると、経過時間は 0 にリセットされます。 トリガータイム = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor。 |
mutatetype | データ書き込みモードです。 | String | いいえ | INSERT_OR_UPDATE | Hologres の物理テーブルにプライマリキーが設定されている場合、Hologres ストリーミングシンクは、プライマリキーに基づく 1 回限りのセマンティクスを使用します。同じプライマリキーを持つ複数のレコードが出現した場合、mutatetype パラメーターを指定して、結果テーブルの更新方法を決定する必要があります。mutatetype パラメーターの有効な値:
|
createparttable | パーティション値に基づいて、不足しているパーティションテーブルを自動的に作成するかどうかを指定します。 | Boolean | いいえ | false | 特になし。 |
sink.delete-strategy | 撤回メッセージの処理方法です。 | String | いいえ | なし | 有効な値:
|
jdbcWriteBatchSize | JDBC モードでの書き込み時に、1 バッチあたりの最大行数です。 | Integer | いいえ | 256 | 単位:行。 説明 jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。 |
jdbcWriteBatchByteSize | JDBC モードでの書き込み時に、1 バッチあたりの最大バイト数です。 | Long | いいえ | 2097152 (2 × 1024 × 1024 バイト)、または 2 MB | 説明 jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。 |
jdbcWriteFlushInterval | JDBC モードで、バッチを Hologres にフラッシュするまでの最大待機時間です。 | Long | いいえ | 10000 | 単位:ミリ秒。 説明 jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval のいずれか 1 つが条件を満たすと、書き込みがトリガーされます。 |
ignoreNullWhenUpdate | mutatetype='insertOrUpdate' の場合、更新書き込み時の null 値を無視するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
jdbcEnableDefaultForNotNullColumn | Hologres テーブルにおいて、NOT NULL 制約のあるカラムにデフォルト値が定義されていない場合、コネクタがデフォルト値を埋め込むかどうかを指定します。 | Boolean | いいえ | true | 有効な値:
|
remove-u0000-in-text.enabled | STRING データを書き込む前に、\u0000 の無効な文字を削除するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
deduplication.enabled | JDBC または jdbc_fixed モードでのバッチ書き込み時に重複排除を実行するかどうかを指定します。 | Boolean | いいえ | true | 有効な値:
|
sink.type-normalize-strategy | データマッピング戦略です。 | String | いいえ | STANDARD | Hologres シンクが上流のデータ型を Hologres の型に変換する際に使用する戦略です。
|
table_property.* | Hologres の物理テーブルプロパティです。 | String | いいえ | なし | Hologres テーブルを作成する際、WITH 句で物理テーブルプロパティを設定できます。適切な設定により、データの整理およびクエリの効率化が図れます。 警告 table_property.distribution_key のデフォルト値はプライマリキーです。書き込みの正確性に影響するため、必要でない限り変更しないでください。 |
connection.ssl.mode | 転送中暗号化を有効にするかどうか、および使用するモードを指定します。 | String | いいえ | disable |
説明
|
connection.ssl.root-cert.location | 転送中暗号化で証明書が必要な場合の証明書ファイルのパスです。 | String | いいえ | なし | connection.ssl.mode を verify-ca または verify-full に設定する場合に必須です。Realtime Compute コンソールの「ファイル管理」機能を用いて CA 証明書をアップロードします。アップロードされたファイルは /flink/usrlib に配置されます。たとえば、CA 証明書のファイル名が certificate.crt の場合、このパラメーターには 説明 CA 証明書の取得手順については、「転送中暗号化 — CA 証明書のダウンロード」をご参照ください。 |
データ型のマッピング
sink.type-normalize-strategy パラメーターを用いて、上流のデータ型を Hologres のデータ型にマップする方法を定義します。
YAML ジョブを初めて起動する際に sink.type-normalize-strategy を有効化してください。起動後に有効化する場合は、下流のテーブルを削除し、ジョブをステートレスで再起動することで設定を有効化できます。
サポートされる配列型には、INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR、VARCHAR があります。
Hologres は NUMERIC をプライマリキーとしてサポートしていません。プライマリキーが NUMERIC にマップされる場合、Hologres はこれを VARCHAR に変換します。
STANDARD
sink.type-normalize-strategy が STANDARD の場合、マッピングは以下のとおりです:
Flink CDC 型 | Hologres 型 |
CHAR | bpchar |
STRING | text |
VARCHAR | text(長さ > 10,485,760 バイトの場合) |
varchar(長さ ≤ 10,485,760 バイトの場合) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | さまざまな型の配列 |
MAP | 非対応 |
ROW | 非対応 |
BROADEN
sink.type-normalize-strategy が BROADEN の場合、Flink CDC の型はより広範な Hologres の型にマップされます。マッピングは以下のとおりです:
Flink CDC 型 | Hologres 型 |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | さまざまな型の配列 |
MAP | 非対応 |
ROW | 非対応 |
ONLY_BIGINT_OR_TEXT
sink.type-normalize-strategy が ONLY_BIGINT_OR_TEXT の場合、すべての Flink CDC 型が Hologres の BIGINT または STRING にマップされます。マッピングは以下のとおりです:
Flink CDC 型 | Hologres 型 |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | さまざまな型の配列 |
MAP | 非対応 |
ROW | 非対応 |
パーティションテーブルへの書き込み
Hologres シンクは、パーティションテーブルへの書き込みをサポートしています。変換モジュールと組み合わせることで、上流のデータを Hologres のパーティションテーブルに書き込むことができます。以下の点にご注意ください:
パーティションキーは、プライマリキーの一部である必要があります。上流の非プライマリキーをパーティションキーとして使用すると、上流と下流の間でプライマリキーが不整合になる可能性があります。その結果、同期時にデータの不整合が発生します。
TEXT、VARCHAR、および INT データの型のカラムをパーティションキーとして使用できます。 Hologres V1.3.22 以降では、DATE データの型のカラムもパーティションキーとして使用できます。
子パーティションテーブルを自動的に作成するには、createparttable を true に設定します。それ以外の場合は、手動で作成してください。
例については、「パーティションテーブルへの書き込み」をご参照ください。
テーブルスキーマの変更を同期
CDC YAML パイプラインジョブでは、スキーマ進化を処理するための異なるポリシーが使用されます。これらのポリシーは、パイプラインレベルの設定項目 schema.change.behavior で指定されます。schema.change.behavior の有効な値は、IGNORE、LENIENT、TRY_EVOLVE、EVOLVE、EXCEPTION です。Hologres シンクは TRY_EVOLVE ポリシーをサポートしていません。LENIENT および EVOLVE ポリシーは、スキーマ進化を含むものです。以下では、さまざまなスキーマ変更イベントの処理方法について説明します。
LENIENT(デフォルト)
LENIENT モードでは、スキーマ変更は以下のとおりに処理されます:
NULL 許容カラムの追加:結果テーブルのスキーマ末尾にカラムを追加し、データを同期します。
NULL 許容カラムの削除:カラムを結果テーブルから削除せず、代わりに null で埋めます。
NOT NULL カラムの追加:結果テーブルのスキーマ末尾にカラムを追加し、データを同期します。新しいカラムはデフォルトで NULL 許容となります。カラムが追加される前のデータは、デフォルトで null になります。
カラム名の変更:追加+削除として扱われます。結果テーブルのスキーマ末尾に名前変更されたカラムを追加し、元のカラムを null で埋めます。たとえば、col_a が col_b に変更された場合、col_b が追加され、col_a は null で埋められます。
カラム型の変更:非対応です。Hologres ではカラム型の変更は許可されていません。代わりに sink.type-normalize-strategy を使用してください。
以下のスキーマ変更は非対応です:
プライマリキーまたはインデックスなどの制約の変更。
NOT NULL カラムの削除。
NOT NULL から NULL 許容への変更。
EVOLVE
EVOLVE モードでは、スキーマ変更は以下のとおりに処理されます:
NULL 許容カラムの追加:対応。
NULL 許容カラムの削除:非対応。
NOT NULL カラムは、結果テーブルに NULL 許容カラムとして追加されます。
カラム名の変更:対応。結果テーブルのカラム名を変更します。
カラム型の変更:非対応です。Hologres ではカラム型の変更は許可されていません。代わりに sink.type-normalize-strategy を使用してください。
以下のスキーマ変更は非対応です:
プライマリキーまたはインデックスなどの制約の変更。
NOT NULL カラムの削除。
NOT NULL から NULL 許容への変更。
EVOLVE モードでは、結果テーブルを削除せずにジョブをステートレスで再起動すると、上流と結果テーブルの間でスキーマが不整合になり、ジョブが失敗する可能性があります。結果テーブルのスキーマを手動で調整してください。
EVOLVE モードを有効化する例については、「EVOLVE モードの有効化」をご参照ください。
コード例
広範な型マッピング
sink.type-normalize-strategy パラメーターを用いて、データ型のマッピングを広範なものにします。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプラインパーティションテーブルへの書き込み
上流のタイムスタンプフィールド create_time を日付型に変換し、Hologres テーブルのパーティションキーとして使用します。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# 不足しているパーティションテーブルを自動的に作成します。
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: パーティションキーの追加
pipeline:
name: MySQL から Hologres へのパイプラインEVOLVE モードの有効化
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# 不足しているパーティションテーブルを自動的に作成します。
createparttable: true
pipeline:
name: MySQL から Hologres へのパイプライン
schema.change.behavior: evolve単一テーブルの同期
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプラインデータベース内のすべてのテーブルを同期
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプラインシャード化されたテーブルのマージ
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
route:
# MySQL test_db 内のすべてのシャード化されたテーブルを、1 つの Hologres テーブル test_db.user にマージします。
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL から Hologres へのパイプライン特定のスキーマへの同期
Hologres のスキーマは、MySQL のデータベースに対応します。結果テーブルのスキーマを指定できます。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
route:
# MySQL test_db のすべてのテーブルを、テーブル名を変更せずに Hologres test_db2 スキーマに同期します。
- source-table: test_db.\.*
sink-table: test_db2.<>
replace-symbol: <>
pipeline:
name: MySQL から Hologres へのパイプライン実行中のジョブで新規テーブルを同期
ジョブ実行中に新しく作成されたテーブルをリアルタイムで同期するには、scan.binlog.newly-added-table.enabled を true に設定します。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
# ジョブ実行中に作成された新規テーブルからのデータをキャプチャします。
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプライン新しく追加された既存テーブルの再起動
既存のテーブルの同期を追加する場合は、scan.newly-added-table.enabled = true を設定してジョブを再起動します。
すでに scan.binlog.newly-added-table.enabled = true を使用して新規テーブルのキャプチャを行っている場合、再起動後に scan.newly-added-table.enabled = true を再度使用して既存テーブルをキャプチャしないでください。これにより、重複データが発生する可能性があります。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.startup.mode: initial
# 再起動時に、tables パラメーターで新規テーブルをチェックし、スナップショットを実行します。
# scan.startup.mode: initial が必要です。
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプライン全データベース同期時のテーブル除外
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
# この正規表現に一致するテーブルは同期されません。
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# CDC データ型をより広範な Hologres の型にマップします。
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL から Hologres へのパイプライン参考情報
ソース、シンク、変換、ルートに関する開発リファレンスについては、「Flink CDC データインジェスト開発リファレンス」をご参照ください。
YAML データインジェストジョブの開発手順については、「YAML データインジェストジョブの開発(パブリックプレビュー)」をご参照ください。