このトピックでは、MongoDBコネクタの使用方法について説明します。
背景情報
MongoDB は、非構造化データ向けのドキュメント指向データベースです。アプリケーション開発とスケーリングを簡素化します。MongoDB コネクタは、次の機能をサポートしています:
カテゴリ | 詳細 |
サポートされるタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト |
ランタイムモード | ストリームモードのみがサポートされています。 |
独自の監視メトリック | |
API タイプ | DataStream、SQL、およびデータインジェスト YAML |
結果テーブルでの更新または削除のサポート | はい |
特徴
MongoDB 変更データキャプチャ (CDC) ソーステーブルは、Change Stream API を使用して、完全データと増分データの両方をキャプチャします。最初に既存データの完全なスナップショットを読み取り、その後シームレスに増分 oplog の読み取りに切り替わります。このプロセスにより、データが重複も欠落もないことが保証されます。また、このコネクタは Exactly-Once セマンティクスもサポートしており、障害回復時のデータ整合性を保証します。
Change Stream API に基づく
MongoDB 3.6 の Change Stream API を使用して、データベースまたはコレクションからの挿入、更新、置換、削除などの変更イベントを効率的にキャプチャします。これらのイベントを Flink が処理できるチェンジログストリームに変換します。
完全な統合と増分的な統合
手動での介入なしに、初期スナップショットを自動的に読み取り、スムーズに増分モードに移行します。
並列スナップショット読み取り
パフォーマンスを向上させるために、既存データの並列読み取りをサポートします。これには MongoDB 4.0 以降が必要です。
複数の起動モード
initial:最初の起動時に完全なスナップショットを実行し、その後継続的に oplog を読み取ります。latest-offset:既存データを読み取らずに、現在の oplog の末尾から開始します。timestamp:指定されたタイムスタンプから oplog の読み取りを開始し、スナップショットをスキップします。これには MongoDB 4.0 以降が必要です。
完全な changelog のサポート
変更前イメージと変更後イメージの両方を含む完全なチェンジログの出力をサポートします (MongoDB 6.0 以降で変更前/変更後イメージの記録機能が有効になっている場合)。
Flink 統合の強化
VVR 8.0.6 以降
CREATE TABLE AS (CTAS) 文または CREATE DATABASE AS (CDAS) 文を使用して、MongoDB からダウンストリームシステムへのデータとスキーマの変更を同期し、変更前/変更後イメージの記録機能を有効にすることをサポートします。
VVR 8.0.9 以降
ディメンションテーブルの結合機能を拡張し、ObjectId 型の組み込み
_idフィールドの読み取りをサポートします。
前提条件
MongoDB インスタンスの要件
Alibaba Cloud MongoDB 3.6 以降 (ReplicaSet またはシャードクラスター) および自己管理型 MongoDB 3.6 以降のみがサポートされています。
監視対象の MongoDB データベースで ReplicaSet 機能を有効にする必要があります。詳細については、「レプリケーション」をご参照ください。
MongoDB 機能の依存関係
完全なチェンジログイベントストリーム機能を使用するには、変更前/変更後イメージの記録機能を有効にする必要があります。
MongoDB で認証が有効になっている場合は、次のデータベース権限が必要です。
MongoDB ネットワークおよびその他の準備
Flink が MongoDB にアクセスできるように、IP アドレスホワイトリストが設定されていること。
ターゲットの MongoDB コレクションとデータが作成されていること。
制限事項
CDC ソーステーブル
MongoDB 4.0 以降では、初期スナップショットフェーズでの並列読み取りがサポートされています。初期スナップショットの並列モードを有効にするには、
scan.incremental.snapshot.enabledパラメーターを true に設定します。MongoDB Change Stream のサブスクリプション制限により、admin、local、config データベース、またはシステムコレクションからデータを読み取ることはできません。詳細については、MongoDB ドキュメントをご参照ください。
シンクテーブル
Ververica Runtime (VVR) 8.0.5 より前のバージョンでは、データの挿入のみがサポートされています。
VVR 8.0.5 以降では、結果テーブルでプライマリキーが宣言されている場合、データの挿入、更新、削除が可能です。プライマリキーが宣言されていない場合は、データの挿入のみが可能です。
ディメンションテーブル
VVR 8.0.5 以降では、MongoDB ディメンションテーブルの使用がサポートされています。
SQL
構文
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)CDC ソーステーブルを作成する場合、_id STRING 列を宣言し、それを一意のプライマリキーとして設定する必要があります。
WITH パラメーター
一般的なパラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | コネクタの名前。 | String | はい | なし |
|
uri | MongoDB 接続 URI。 | String | いいえ | なし | 説明
|
hosts | MongoDB インスタンスのホスト名。 | String | いいえ | なし | カンマ ( |
scheme | MongoDB で使用される接続プロトコル。 | String | いいえ | mongodb | 有効な値:
|
username | MongoDB に接続するためのユーザー名。 | String | いいえ | なし | ID 検証が有効な場合は、このパラメーターは必須です。 |
password | MongoDB に接続するためのパスワード。 | String | いいえ | なし | ID 検証が有効な場合は、このパラメーターは必須です。 重要 パスワードの漏洩を防ぐため、変数を使用してパスワード値を設定することを推奨します。詳細については、「プロジェクト変数」をご参照ください。 |
database | MongoDBデータベースの名前。 | String | いいえ | なし |
重要 admin、local、または config データベースのデータの監視はサポートされていません。 |
collection | MongoDBコレクションの名前。 | String | いいえ | なし |
重要 システムコレクションのデータの監視はサポートされていません。 |
connection.options | MongoDB 側の接続パラメーター。 | String | いいえ | なし | アンパサンド ( 重要 デフォルトでは、MongoDB CDC はソケット接続タイムアウトを自動的に設定しません。これにより、ネットワークジッター中に長時間の中断が発生する可能性があります。 この問題を回避するには、socketTimeoutMS を適切な値に設定してください。 |
ソーステーブルのみ
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
scan.startup.mode | MongoDB CDC の起動モード。 | String | いいえ | initial | 有効な値:
詳細については、「起動プロパティ」をご参照ください。 |
scan.startup.timestamp-millis | 特定のオフセットで消費を開始するタイムスタンプ。 | Long | scan.startup.mode の値によって異なります:
| なし | 形式は、Linux エポックからのミリ秒数です。 このパラメーターは、 |
initial.snapshotting.queue.size | 初期スナップショットのキューサイズ制限。 | Integer | いいえ | 10240 | このパラメーターは、 |
batch.size | カーソルのバッチサイズ。 | Integer | いいえ | 1024 | なし。 |
poll.max.batch.size | 1 つのバッチに含まれる変更ドキュメントの最大数。 | Integer | いいえ | 1024 | このパラメーターは、ストリーム処理中に一度に取得される変更ドキュメントの最大数を制御します。値が大きいほど、コネクタ内で割り当てられるバッファーが大きくなります。 |
poll.await.time.ms | 2 つのデータプル間の時間間隔。 | Integer | いいえ | 1000 | 単位はミリ秒です。 |
heartbeat.interval.ms | ハートビートを送信する時間間隔。 | Integer | いいえ | 0 | 単位はミリ秒です。 MongoDB CDC コネクタは、再開トークンが最新であることを保証するために、データベースに積極的にハートビートを送信します。値が 0 の場合、ハートビートは送信されません。 重要 頻繁に更新されないコレクションについては、このオプションを設定することを強く推奨します。 |
scan.incremental.snapshot.enabled | 初期スナップショットの並列モードを有効にするかどうかを指定します。 | Boolean | いいえ | false | これは実験的な機能です。 |
scan.incremental.snapshot.chunk.size.mb | 並列モードでスナップショットを読み取るためのチャンクサイズ。 | Integer | いいえ | 64 | これは実験的な機能です。 単位は MB です。 このパラメーターは、並列スナップショットが有効な場合にのみ有効です。 |
scan.full-changelog | 完全なチェンジログイベントストリームを生成します。 | Boolean | いいえ | false | これは実験的な機能です。 説明 pre-image および post-image 機能が有効になっている MongoDB 6.0 以降が必要です。この機能を有効にする方法の詳細については、「Document Preimages」をご参照ください。 |
scan.flatten-nested-columns.enabled | ドット ( | Boolean | いいえ | false | 有効にすると、次の BSON ドキュメントの例では、 説明 このパラメーターは VVR 8.0.5 以降でのみサポートされています。 |
scan.primitive-as-string | BSON ドキュメント内のすべてのプリミティブ型を文字列として解析するかどうかを指定します。 | Boolean | いいえ | false | 説明 このパラメーターは VVR 8.0.5 以降でのみサポートされています。 |
scan.ignore-delete.enabled | 削除 (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | MongoDB ソースからデータをアーカイブする際、OpLog 内で多くの DELETE イベントが生成されることがあります。これらのイベントをダウンストリームに同期したくない場合は、このパラメーターを有効にして削除イベントを無視できます。 説明
|
scan.incremental.snapshot.backfill.skip | 増分スナップショットアルゴリズムのウォーターマークバックフィルプロセスをスキップするかどうかを指定します。 | Boolean | いいえ | false | このスイッチを有効にすると、at-least-once セマンティクスのみが提供されます。 説明 このパラメーターは VVR 11.1 以降でのみサポートされています。 |
initial.snapshotting.pipeline | MongoDB のパイプライン操作。スナップショット読み取りフェーズ中に、この操作は MongoDB にプッシュダウンされ、必要なデータのみをフィルター処理して読み取り効率を向上させます。 | String | いいえ | なし。 |
|
initial.snapshotting.max.threads | データレプリケーションに使用されるスレッド数。 | Integer | いいえ | なし。 | このパラメーターは、scan.startup.mode オプションが initial に設定されている場合にのみ有効です。 説明 このパラメーターは VVR 11.1 以降でのみサポートされています。 |
initial.snapshotting.queue.size | 初期スナップショットのキューサイズ。 | Integer | いいえ | 16000 | このパラメーターは、scan.startup.mode オプションが initial に設定されている場合にのみ有効です。 説明 このパラメーターは VVR 11.1 以降でのみサポートされています。 |
scan.change-stream.reading.parallelism | Change Stream をサブスクライブするための並列度。 | Integer | いいえ | 1 | このパラメーターは、scan.incremental.snapshot.enabled パラメーターが有効な場合にのみ有効です。 重要 複数の同時スレッドで Change Stream をサブスクライブするには、heartbeat.interval.ms パラメーターも設定する必要があります。 説明 このパラメーターは VVR 11.2 以降でのみサポートされています。 |
scan.change-stream.reading.queue-size | 同時 Change Stream サブスクリプションのメッセージキューサイズ。 | Integer | いいえ | 16384 | このパラメーターは、scan.change-stream.reading.parallelism パラメーターが有効な場合にのみ有効です。 説明 このパラメーターは VVR 11.2 以降でのみサポートされています。 |
ディメンションテーブルのみ
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
lookup.cache | キャッシュポリシー。 | String | いいえ | NONE | 以下のキャッシュポリシーがサポートされています:
|
lookup.max-retries | データベースクエリが失敗した場合の最大再試行回数。 | Integer | いいえ | 3 | なし。 |
lookup.retry.interval | データベースクエリが失敗した場合のリトライ間隔。 | Duration | いいえ | 1s | なし。 |
lookup.partial-cache.expire-after-access | キャッシュ内のレコードの最大保持期間。 | Duration | いいえ | なし | サポートされている時間単位は ms、s、min、h、d です。 この設定を使用する場合、 |
lookup.partial-cache.expire-after-write | キャッシュに書き込まれた後のレコードの最大保持期間。 | Duration | いいえ | なし | この設定を使用する場合、 |
lookup.partial-cache.max-rows | キャッシュ内の最大行数。この値を超えると、最も古い行が期限切れになります。 | Long | いいえ | なし | この設定を使用する場合、 |
lookup.partial-cache.cache-missing-key | 物理テーブルにデータが見つからない場合に空のレコードをキャッシュするかどうかを指定します。 | Boolean | いいえ | True | この設定を使用する場合、 |
結果テーブルのみ
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
sink.buffer-flush.max-rows | 各バッチ書き込みの最大レコード数。 | Integer | いいえ | 1000 | なし。 |
sink.buffer-flush.interval | データ書き込みの更新間隔。 | Duration | いいえ | 1s | なし。 |
sink.delivery-guarantee | データ書き込みのセマンティクス保証。 | String | いいえ | at-least-once | 有効な値:
説明 Exactly-once は現在サポートされていません。 |
sink.max-retries | データベースへの書き込みが失敗した場合の最大リトライ回数。 | Integer | いいえ | 3 | なし。 |
sink.retry.interval | データベースへの書き込みが失敗した場合のリトライ間隔。 | Duration | いいえ | 1s | なし。 |
sink.parallelism | カスタムのシンク並列度。 | Integer | いいえ | Empty | なし。 |
sink.delete-strategy | -D または -U データ型の処理方法を設定します。 | String | いいえ | CHANGELOG_STANDARD | 有効な値:
|
型マッピング
CDC ソーステーブル
BSON 型 | Flink SQL 型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point: ROW<type STRING, coordinates ARRAY<DOUBLE>> Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
ディメンションテーブルと結果テーブル
BSON 型 | Flink SQL 型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
例
CDC ソーステーブル
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --必ず宣言する必要があります
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;シンクテーブル
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;データインジェスト
MongoDB コネクタは、データインジェスト YAML タスクのデータソースとして使用できます。
制限事項
この機能は VVR 11.1 以降でのみサポートされています。
構文
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...設定項目
パラメーター | 説明 | 必須 | データの型 | デフォルト値 | 注意 |
type | データソースのタイプ。 | はい | STRING | なし | 値は mongodb である必要があります。 |
scheme | MongoDB サーバーへの接続プロトコル。 | いいえ | STRING | mongodb | 有効な値:
|
hosts | MongoDB に接続するためのサーバーアドレス。 | はい | STRING | なし | 複数のアドレスをカンマ (,) で区切って指定できます。 |
username | MongoDB への接続に使用するユーザー名。 | いいえ | STRING | なし | なし。 |
password | MongoDB への接続に使用するパスワード。 | いいえ | STRING | なし | なし。 |
database | キャプチャする MongoDB データベースの名前。 | はい | STRING | なし | 正規表現をサポートします。 |
collection | キャプチャする MongoDB コレクションの名前。 | はい | STRING | なし | 正規表現をサポートしています。完全な |
connection.options | MongoDB サーバーに接続する際に追加する接続オプション。 | いいえ | STRING | なし | アンパサンド ( |
schema.inference.strategy | ドキュメントの型推論の戦略。 有効な値は | いいえ | STRING |
|
|
scan.max.pre.fetch.records | 初期推論中にキャプチャされた各コレクションからサンプリングするレコードの最大数。 | いいえ | INT | 50 | なし。 |
scan.startup.mode | MongoDB データソースの起動モードを指定します。 有効な値は | いいえ | STRING | initial | 有効な値:
|
scan.startup.timestamp-millis | 起動モードが | いいえ | LONG | なし | なし。 |
chunk-meta.group.size | メタデータチャンクサイズの制限を設定します。 | いいえ | INT | 1000 | なし。 |
scan.incremental.close-idle-reader.enabled | 増分モードに切り替えた後、アイドル状態の Source Reader を閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | なし。 |
scan.incremental.snapshot.backfill.skip | 増分スナップショットアルゴリズムのウォーターマークバックフィルプロセスをスキップするかどうかを指定します。 | いいえ | BOOLEAN | false | 使用しているシンクコネクタがプライマリキーに基づいて自動的に重複を削除できる場合、このスイッチを有効にすると、完全から増分への移行にかかる時間を短縮できます。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 増分スナップショットアルゴリズムを実行する際に、無制限のチャンクを最初に読み取るかどうかを指定します。 | いいえ | BOOLEAN | false | スナップショットを作成しているコレクションが頻繁に更新される場合、この機能を有効にすると、無制限チャンクの読み取り時にメモリ不足エラーが発生する可能性を減らすことができます。 |
batch.size | MongoDB データを読み取るためのカーソルバッチサイズ。 | いいえ | INT | 1024 | なし。 |
poll.max.batch.size | Change Stream をプルする際のリクエストあたりの最大エントリ数。 | いいえ | INT | 1024 | なし。 |
poll.await.time.ms | Change Stream を取得する際のリクエスト間の最小待機時間。 | いいえ | INT | 1000 | 単位はミリ秒です。 |
heartbeat.interval.ms | ハートビートを送信する時間間隔。 | いいえ | INT | 0 | 単位はミリ秒です。 MongoDB CDC コネクタは、再開トークンが最新であることを保証するために、データベースに積極的にハートビートを送信します。値が 0 の場合、ハートビートは送信されません。 説明 頻繁に更新されないコレクションについては、このオプションを設定することを強く推奨します。 |
scan.incremental.snapshot.chunk.size.mb | スナップショットフェーズ中のチャンクサイズ。 | いいえ | INT | 64 | 単位は MB です。 |
scan.incremental.snapshot.chunk.samples | スナップショットフェーズ中にコレクションサイズを決定する際に使用するサンプル数。 | いいえ | INT | 20 | なし。 |
scan.full-changelog | Mongo の Pre- and Post-Image レコードに基づいて、完全なチェンジログイベントストリームを生成するかどうかを指定します。 | いいえ | BOOLEAN | false | pre-image および post-image 機能が有効になっている MongoDB 6.0 以降が必要です。この機能を有効にする方法の詳細については、「Document Preimages」をご参照ください。 |
scan.cursor.no-timeout | データ読み取り用のカーソルが期限切れにならないように設定するかどうかを指定します。 | いいえ | BOOLEAN | false | MongoDB サーバーは通常、高いメモリ使用量を防ぐために、一定期間 (10 分) アイドル状態になったカーソルを閉じます。このオプションを true に設定すると、これを防ぐことができます。 |
scan.ignore-delete.enabled | MongoDB ソースからの削除イベントレコードを無視するかどうかを指定します。 | いいえ | BOOLEAN | false | なし。 |
scan.flatten.nested-documents.enabled | BSON ドキュメント内のネストされた構造をフラット化するかどうかを指定します。 | いいえ | BOOLEAN | false | このオプションを有効にすると、 |
scan.all.primitives.as-string.enabled | すべてのプリミティブ型を STRING として推論するかどうかを指定します。 | いいえ | BOOLEAN | false | このオプションを有効にすると、混合された入力データ型を扱う際に多くのスキーマ進化イベントが生成されるのを避けることができます。 |
型マッピング
BSON 型 | CDC 型 | 注 |
STRING | VARCHAR | なし。 |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | キー/値の型パラメーターを推論する必要があります。 |
ARRAY | ARRAY | 要素型のパラメーターを推論する必要があります。 |
OBJECTID | VARCHAR | HexString として表現されます。 |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | 文字列として表現されます。 |
メタデータ
SQL コネクタ
MongoDB CDC SQL ソーステーブルは、メタデータ列の構文をサポートしています。メタデータ列を介して、次のメタデータにアクセスできます。
メタデータキー | メタデータタイプ | 説明 |
database_name | STRING NOT NULL | ドキュメントを含むデータベースの名前。 |
collection_name | STRING NOT NULL | ドキュメントを含むコレクションの名前。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | データベース内でドキュメントが変更された時刻。ドキュメントが ChangeStream からではなく、テーブルの既存のデータからのものである場合、この値は常に 0 です。 |
row_kind | STRING NOT NULL | データ変更のタイプを示します。有効な値:
説明 この機能は VVR 11.1 以降でのみサポートされています。 |
データインジェスト YAML
MongoDB CDC データインジェスト YAML コネクタは、次のメタデータ列の読み取りをサポートしています:
メタデータキー | メタデータ型 | 説明 |
ts_ms | BIGINT NOT NULL | データベース内でドキュメントが変更された時刻。ドキュメントが ChangeStream からではなく、テーブルの既存のデータからのものである場合、この値は常に 0 です。 |
また、Transform モジュールが提供する共通のメタデータ列を使用して、データベース名、コレクション名、および row_kind の情報にアクセスすることもできます。
MongoDB の変更前/変更後イメージの記録機能について
デフォルトでは、6.0 より前のバージョンの MongoDB は、変更前のドキュメントや削除されたドキュメントのデータを提供しません。この機能が有効になっていない場合、利用可能な情報は Upsert セマンティクスしか実現できず、これは Update Before データエントリが欠落していることを意味します。しかし、Flink の多くの便利なオペレーターは、Insert、Update Before、Update After、Delete イベントの完全な変更ストリームに依存しています。
欠落している変更前イベントを補うために、Flink SQL Planner は Upsert タイプのデータソースに対して自動的に ChangelogNormalize ノードを生成します。このノードは、Flink ステート内のすべてのドキュメントの現在のバージョンのスナップショットをキャッシュします。更新または削除されたドキュメントに遭遇すると、キャッシュされたスナップショットから変更前の状態を検索できます。ただし、このオペレーターノードは大量のステートデータを必要とします。

MongoDB 6.0 は、データベースの変更前および変更後イメージ機能をサポートしています。詳細については、「MongoDB Change Streams を使用してデータ変更をリアルタイムでキャプチャする」をご参照ください。この機能を有効にすると、MongoDB は各変更の前後のドキュメントの完全な状態を特別なコレクションに記録します。その後、タスクで scan.full-changelog 設定項目を有効にすると、MongoDB CDC は変更ドキュメントレコードから Update Before レコードを生成します。このプロセスにより、完全なイベントストリームが作成され、ChangelogNormalize ノードへの依存がなくなります。
Mongo CDC DataStream API
DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続する必要があります。設定手順については、「DataStream コネクタの使用」をご参照ください。
DataStream API プログラムを作成し、MongoDBSource を使用できます。次のコードは例を示しています:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
VVR MongoDB コネクタは Maven Central Repository で入手できます。タスク開発で直接使用できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>DataStream API を使用する場合、増分スナップショット機能を有効にするには、MongoDBSource を構築する際に com.ververica.cdc.connectors.mongodb.source パッケージの MongoDBSource#builder() を使用します。それ以外の場合は、com.ververica.cdc.connectors.mongodb パッケージの MongoDBSource#builder() を使用します。
MongoDBSource を構築する際に、次のパラメーターを設定できます:
パラメータ | 説明 |
hosts | 接続する MongoDB データベースのホスト名。 |
username | MongoDB データベースサービスのユーザー名。 説明 MongoDB サーバーで認証が有効になっていない場合、このパラメーターを設定する必要はありません。 |
password | MongoDB データベースサービスのパスワード。 説明 MongoDB サーバーで認証が有効になっていない場合、このパラメーターを設定する必要はありません。 |
databaseList | 監視する MongoDB データベースの名前。 説明 データベース名は、複数のデータベースからデータを読み取るための正規表現をサポートしています。 |
collectionList | 監視する MongoDB コレクションの名前。 説明 コレクション名は、複数のコレクションからデータを読み取るための正規表現をサポートしています。 |
startupOptions | MongoDB CDC の起動モードを選択します。 有効な値:
詳細については、「起動プロパティ」をご参照ください。 |
deserializer | SourceRecord 型のレコードを指定された型に逆シリアル化するデシリアライザ。有効な値:
|