このトピックでは、Hologres コネクタを使用して、データインジェスト用の YAML デプロイメントでデータを同期する方法について説明します。
背景情報
Hologres は、大量のデータをリアルタイムで書き込み、更新、分析できるエンドツーエンドのリアルタイムデータウェアハウスサービスです。Hologres は PostgreSQL プロトコルと互換性があり、標準の SQL 構文をサポートしています。Hologres は、ペタバイト規模のデータに対するオンライン分析処理 (OLAP) とアドホッククエリをサポートし、高同時実行性、低レイテンシーのオンラインデータサービスを提供します。Hologres は、MaxCompute、Realtime Compute for Apache Flink、DataWorks とシームレスに統合され、オンラインとオフラインのデータウェアハウスソリューションをフルスタックで提供します。次の表に、YAML デプロイメントで使用される Hologres コネクタの機能を示します。
項目 | 説明 |
サポートされるタイプ | データインジェストシンク |
実行モード | ストリーミングモードとバッチモード |
データ形式 | 該当なし |
メトリック |
説明 メトリックの詳細については、「メトリック」をご参照ください。 |
API タイプ | YAML |
シンクテーブルでのデータの更新または削除 | サポートされています |
機能
機能 | 説明 |
データベース内のすべてのテーブルまたはデータベース内の複数のテーブルから、関連する各シンクテーブルに、フルデータと増分データを同期します。 | |
データベースの同期中に、各ソーステーブルのスキーマ変更 (列の追加、削除、名前変更など) を関連するシンクテーブルにリアルタイムで同期します。 | |
正規表現を使用してデータベース名を指定し、データソースの複数のデータベースシャード内のソーステーブルと照合できます。 ソーステーブルのデータがマージされた後、データは各ソーステーブルに対応する名前を持つダウンストリームシンクテーブルに同期されます。 | |
アップストリームテーブルから Hologres パーティションテーブルにデータを書き込むことができます。 | |
複数のマッピングポリシーを使用して、アップストリームデータ型を Hologres データ型にマッピングします。 Hologres データ型は、Flink Change Data Capture (CDC) データ型よりも広範です。 |
構文
sink:
type: hologres
name: Hologres Sink // Hologres シンク
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}パラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
type | シンクのタイプ。 | String | はい | デフォルト値なし | 値を |
name | シンク名。 | String | いいえ | デフォルト値なし | 該当なし。 |
dbname | データベース名。 | String | はい | デフォルト値なし | 該当なし。 |
username | データベースへのアクセスに使用するユーザー名。Alibaba Cloud アカウントの AccessKey ID を入力します。 | String | はい | デフォルト値なし | Alibaba Cloud アカウントの AccessKey シークレットを取得する方法については、「コンソール操作」トピックの「アカウントの AccessKey ID と AccessKey シークレットに関する情報を表示するにはどうすればよいですか?」セクションをご参照ください。 重要 AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定してください。詳細については、「プロジェクト変数」をご参照ください。 |
password | データベースへのアクセスに使用するパスワード。Alibaba Cloud アカウントの AccessKey シークレットを入力します。 | String | はい | デフォルト値なし | |
endpoint | Hologres のエンドポイント。 | String | はい | デフォルト値なし | 詳細については、「Hologres に接続するためのエンドポイント」をご参照ください。 |
jdbcRetryCount | 接続障害が発生した場合に、データの読み取りと書き込みに許可される最大再試行回数。 | Integer | いいえ | 10 | 該当なし。 |
jdbcRetrySleepInitMs | 再試行ごとの固定待機時間。 | Long | いいえ | 1000 | 単位はミリ秒です。リトライの実際の待機時間は、次の数式を使用して計算されます: |
jdbcRetrySleepStepMs | 再試行ごとの累積待機時間。 | Long | いいえ | 5000 | 単位はミリ秒です。リトライの実際の待機時間は、次の数式を使用して計算されます: |
jdbcConnectionMaxIdleMs | Java Database Connectivity (JDBC) 接続がアイドル状態になっている最大時間。 | Long | いいえ | 60000 | 単位:ミリ秒。 JDBC 接続がこのパラメーターの値を超える期間アイドル状態のままである場合、接続は閉じられて解放されます。 |
jdbcMetaCacheTTL | キャッシュに TableSchema 情報を保存する最大時間。 | Long | いいえ | 60000 | 単位:ミリ秒。 |
jdbcMetaAutoRefreshFactor | キャッシュの自動更新をトリガーするための係数。 キャッシュにデータを保存するための残り時間が、キャッシュの自動更新をトリガーするための時間よりも短い場合、Realtime Compute for Apache Flink はキャッシュを自動的に更新します。 | Integer | いいえ | 4 | キャッシュにデータを格納する残り時間は、次の数式を使用して計算されます: キャッシュにデータを格納する残り時間 = キャッシュにデータを格納できる最大時間 - データがキャッシュされた期間。キャッシュが自動的に更新された後、データがキャッシュされた期間は 0 から再計算されます。 キャッシュの自動更新をトリガーする時間は、次の数式を使用して計算されます: キャッシュの自動更新をトリガーする時間 = jdbcMetaCacheTTL パラメーターの値/jdbcMetaAutoRefreshFactor パラメーターの値。 |
mutatetype | データ書き込みモード。 | String | いいえ | INSERT_OR_UPDATE | Hologres 物理テーブルでプライマリキーが設定されている場合、Hologres ストリーミングシンクはプライマリキーに基づいて厳密に 1 回のセマンティクスを使用します。 同じプライマリキーを持つ複数のレコードがテーブルに書き込まれる場合、mutatetype パラメーターを指定して、シンクテーブルの更新方法を決定する必要があります。 mutatetype パラメーターの有効な値:
|
createparttable | パーティション値に基づいてデータを書き込むパーティションテーブルを自動的に作成するかどうかを指定します。 | Boolean | いいえ | false | 該当なし。 |
sink.delete-strategy | 後戻りメッセージを受信したときに実行される操作。 | String | いいえ | デフォルト値なし | パラメーターは次のとおりです:
|
jdbcWriteBatchSize | JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータ行の最大数。 | Integer | いいえ | 256 | 単位:行。 説明 次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。 |
jdbcWriteBatchByteSize | JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが同時に処理できるデータの最大バイト数。 | Long | いいえ | 2097152 (2 × 1024 × 1024 バイト = 2 MB) | 説明 次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。 |
jdbcWriteFlushInterval | JDBC ドライバーを使用する場合に、Hologres ストリーミングシンクノードが複数の行のデータを Hologres に同時に書き込むのを待機するために必要な最大時間。 | Long | いいえ | 10000 | 単位:ミリ秒。 説明 次のパラメーターのうち 1 つだけを指定できます。jdbcWriteBatchSize、jdbcWriteBatchByteSize、jdbcWriteFlushInterval。 上記のパラメーターをすべて指定した場合、関連する条件のいずれかが満たされると、システムはデータを Hologres シンクテーブルに書き込みます。 |
ignoreNullWhenUpdate | mutatetype='insertOrUpdate' が指定されている場合、データに書き込まれた null 値を無視するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
jdbcEnableDefaultForNotNullColumn | Hologres テーブルでデフォルト値が設定されていない null 非許容列に null 値が書き込まれた場合に、Hologres コネクタがデフォルト値を入力することを許可するかどうかを指定します。 | Boolean | いいえ | true | 有効な値:
|
remove-u0000-in-text.enabled | シンクテーブルに書き込まれる STRING 型のデータに無効な文字 \u0000 が含まれている場合、Hologres コネクタが無効な文字 \u0000 をデータから削除することを許可するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
deduplication.enabled | jdbc モードまたは jdbc_fixed モードでデータをバッチ書き込みする場合に重複除外を実行するかどうかを指定します。 | Boolean | いいえ | true | 有効な値:
|
sink.type-normalize-strategy | データマッピングポリシー。 | String | いいえ | STANDARD | Hologres シンクテーブルでアップストリームデータ型を Hologres データ型に変換するために使用されるマッピングポリシー。 有効な値:
|
table_property.* | Hologres 物理テーブルのプロパティ。 | String | いいえ | デフォルト値なし | Hologres テーブルを作成するときに、WITH 句で物理テーブルのプロパティを設定できます。効率的な方法でデータをソートおよびクエリするために、必要に応じてテーブルプロパティ設定を構成できます。 警告 デフォルトでは、table_property.distribution_key の値はプライマリキー値です。 このパラメーターを別の値に設定すると、書き込まれたデータが無効になる可能性があります。 |
データ型のマッピング
sink.type-normalize-strategy 設定項目を使用して、アップストリームのデータ型を Hologres のデータ型に変換するポリシーを設定できます。
YAML デプロイメントを初めて開始するときは、sink.type-normalize-strategy を指定することをお勧めします。 デプロイメントの開始後に sink.type-normalize-strategy を指定する場合は、ダウンストリームテーブルを削除し、状態なしでデプロイメントを再起動して、ポリシーを有効にする必要があります。
サポートされている配列データ型は、INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR、VARCHAR のみです。
NUMERIC データ型は、Hologres でプライマリキーのデータ型として使用できません。 プライマリキーのデータ型が NUMERIC データ型にマッピングされている場合、プライマリキーのデータ型は VARCHAR データ型に変換されます。
STANDARD
次の表は、sink.type-normalize-strategy が STANDARD に設定されている場合のデータ型のマッピングについて説明しています。
Flink CDC データの型 | Hologres データの型 |
CHAR | bpchar |
STRING | text |
VARCHAR | text (データ長が 10,485,760 バイトを超える場合) |
varchar (データ長が 10,485,760 バイト以下の場合) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | サポートされている配列データの型 |
MAP | サポートされていません |
ROW | サポートされていません |
BROADEN
sink.type-normalize-strategy を BROADEN に設定すると、Flink CDC 型のデータは Hologres データ型に変換されます。 Hologres データ型は、Flink CDC データ型よりも広範です。 次の表は、データ型のマッピングについて説明しています。
Flink CDC データ型 | Hologres データ型 |
CHAR | text |
STRING | |
VARCHAR | |
ブーリアン | bool |
バイナリ | bytea |
VARBINARY | |
デシマル | bytea |
TINYINT | int8 |
SMALLINT | |
整数 | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
日付 | 日付 |
TIME_WITHOUT_TIME_ZONE | 時間 |
TIMESTAMP_WITHOUT_TIME_ZONE | タイムスタンプ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
配列 | サポートされている配列データ型 |
MAP | サポートされていません |
行 | サポートされていません |
ONLY_BIGINT_OR_TEXT
sink.type-normalize-strategy を ONLY_BIGINT_OR_TEXT に設定すると、すべての Flink CDC データ型が Hologres の BIGINT または TEXT データ型のいずれかに変換されます。次の表に、型のマッピングを示します。
Flink CDC データ型 | Hologres データ型 |
TINYINT | int8 |
SMALLINT | |
INTEGER(整数) | |
BIGINT | |
ブール値 | テキスト |
バイナリ | |
VARBINARY | |
DECIMAL | |
FLOAT(浮動小数点数) | |
DOUBLE | |
日付 | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP WITHOUT TIME ZONE | |
TIMESTAMP WITH LOCAL TIME ZONE | |
配列 | サポートされている配列データ型 |
MAP | サポートされていません |
行 | サポートされていません |
パーティションテーブルへのデータの書き込み
Hologres シンクを使用すると、Hologres パーティションテーブルにデータを書き込むことができます。変換モジュールでシンクを使用して、アップストリームデータを Hologres パーティションテーブルに書き込むことができます。次の点に注意してください。
パーティションキーはプライマリキーに含まれている必要があります。アップストリームの非プライマリキーの1つがパーティションキーとして使用されている場合、アップストリームとダウンストリームのプライマリキーが一致しない可能性があります。これにより、データ同期中にアップストリームテーブルとダウンストリームテーブル間でデータの不整合が発生する可能性があります。
TEXT、VARCHAR、および INT データ型の列をパーティションキーとして使用できます。Hologres V1.3.22 以降では、DATE 型の列もパーティションキーとして使用できます。
システムが自動的に子パーティションテーブルを作成できるようにするには、createparttable を true に設定する必要があります。そうでない場合は、子パーティションテーブルを手動で作成する必要があります。
パーティションテーブルにデータを書き込む方法の例については、パーティションテーブルへのデータの書き込み をご参照ください。
テーブルスキーマ変更の同期
CDC YAML パイプラインジョブは、スキーマ進化を処理するためにさまざまなポリシーを使用します。これらのポリシーは、パイプラインレベルの設定項目 schema.change.behavior を使用して設定できます。schema.change.behavior の有効な値は、IGNORE、LENIENT、TRY_EVOLVE、EVOLVE、および EXCEPTION です。Hologres シンクは TRY_EVOLVE ポリシーをサポートしていません。LENIENT および EVOLVE ポリシーにはスキーマ進化が含まれます。次のセクションでは、さまざまなスキーマ変更イベントの処理方法について説明します。
LENIENT(デフォルト値)
schema.change.behavior を LENIENT に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。
NULL 値が許可された列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、追加された列にデータを同期します。
NULL 値が許可された列を削除する:ステートメントは、テーブルから列を削除する代わりに、シンクテーブルの NULL 値が許可された列に NULL 値を自動的に設定します。
NULL 値が許可されていない列を追加する:ステートメントは、関連する列をシンクテーブルのスキーマの最後に自動的に追加し、新しい列のデータを同期します。新しい列は自動的に NULL 値が許可された列に設定され、列が追加される前のデータは自動的に NULL 値に設定されます。
列の名前を変更する:列の名前を変更する操作には、列の追加と列の削除が含まれます。ソーステーブルで列の名前が変更されると、新しい名前を使用する列がシンクテーブルの最後に追加され、元の名前を使用する列には NULL 値が設定されます。たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列がシンクテーブルの最後に追加され、col_a 列には自動的に NULL 値が設定されます。
列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy パラメーターを指定する必要があります。
以下のスキーマ変更はサポートされていません。
主キーやインデックスなどの制約の変更
NULL 値が許可されていない列の削除
NULL 値が許可されていない列から NULL 値が許可された列への変更
EVOLVE
schema.change.behavior を EVOLVE に設定すると、テーブルスキーマの変更は、次の操作の説明に基づいて同期されます。
NULL 値が許可された列を追加する:サポートされています。
NULL 値が許可された列を削除する:サポートされていません。
NULL 値が許可されていない列を追加する:NULL 値が許可された列がシンクテーブルに追加されます。
列の名前を変更する:サポートされています。シンクテーブルで列の名前が変更されます。
列のデータ型を変更する:サポートされていません。Hologres は列のデータ型の変更をサポートしていません。sink.type-normalize-strategy を指定して Hologres を使用する必要があります。
以下のスキーマ変更はサポートされていません。
主キーやインデックスなどの制約の変更
NULL 値が許可されていない列の削除
NULL 値が許可されていない列から NULL 値が許可された列への変更
EVOLVE 値が使用されているデプロイのシンクテーブルが削除されず、状態なしでデプロイが再起動された場合、アップストリームテーブルとシンクテーブル間のスキーマの不整合により、デプロイが失敗する可能性があります。この場合、シンクテーブルのスキーマを手動で変更する必要があります。
EVOLVE モードを有効にする方法の詳細については、「EVOLVE モードの有効化」をご参照ください。
サンプルコード
ワイド型マッピング
sink.type-normalize-strategy 設定項目を使用して、広範な型マッピングを設定できます。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipelineパーティションテーブルへのデータ書き込み
timestampデータ型のアップストリームフィールド create_time を dateデータ型のフィールドに変換し、新しいフィールドをHologresテーブルのパーティションキーとして使用できます。サンプルコード:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres Pipeline有効化モード
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolveデータベース内の単一テーブルからのデータの同期
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipelineデータベース内のすべてのテーブルからのデータの同期
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipelineシャーディングデータベース内の複数のテーブルのマージと同期
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres Pipeline特定のテーブルのスキーマの同期
Hologresテーブルのスキーマは、MySQLテーブルのデータベースに対応しています。 特定のHologresシンクテーブルのスキーマをMySQLテーブルに同期できます。サンプルコード:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db2.user\.*r
pipeline:
name: MySQL to Hologres Pipelineデプロイメントを再起動せずに新しいテーブルを同期
デプロイメントを再起動せずにリアルタイムで新しいテーブルを同期する場合は、scan.binlog.newly-added-table.enable を true に設定します。サンプルコード:
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline新規および既存のテーブルを再起動する
デプロイメント内の既存のテーブルを同期する場合は、scan.newly-added-table.enabled を true に設定し、デプロイメントを再起動します。
デプロイメントで scan.binlog.newly-added-table.enabled = true 設定を使用して新しいテーブルを同期する場合、scan.newly-added-table.enabled = true 設定を使用して既存のテーブルを同期してからデプロイメントを再起動することはできません。上記の方法を試みると、データが繰り返し送信される可能性があります。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipelineデータベース内のすべてのテーブルからのデータの同期中に特定のテーブルを除外
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline参考資料
ソース、シンク、変換、およびデータ取り込みのルートの詳細については、「データ取り込み開発リファレンス」をご参照ください。
データ取り込み用の YAML ドラフトを開発する方法の詳細については、「データ取り込み用の YAML ドラフトを開発する (パブリックプレビュー)」をご参照ください。