このトピックでは、MongoDB コネクタの使用方法について説明します。
背景情報
MongoDB は、非構造化データを格納するドキュメント指向型データベースであり、アプリケーション開発およびスケーリングを簡素化します。MongoDB コネクタは、以下の機能をサポートしています:
カテゴリ | 説明 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストテーブル |
実行モード | ストリーミングモードのみ |
監視メトリック | |
API 種別 | DataStream、SQL、およびデータインジェスト YAML |
結果テーブルデータの更新および削除のサポート | はい |
機能
MongoDB Change Data Capture (CDC) ソーステーブルは、Change Stream API を使用して完全および増分データをキャプチャします。まず初期スナップショットとして履歴データを読み取り、その後、oplog からの増分データ読み取りへシームレスに切り替えます。このプロセスにより1 回限りのセマンティクスが提供され、重複または欠落のないレコードを保証し、障害回復時のデータ整合性を担保します。
Change Stream API を基盤とする
コネクタは、MongoDB 3.6 で導入された Change Stream API を使用して、データベースおよびコレクションから挿入、更新、置き換え、削除イベントを効率的にキャプチャします。これらのイベントは、Flink が処理可能なチェンジログストリームに変換されます。
完全および増分データのシームレスなキャプチャ
コネクタは、初期スナップショットを自動的に読み取った後、手動介入なしで増分モードへと移行します。
並列スナップショット読み取り
コネクタは、パフォーマンス向上のため履歴データを並列で読み取ります。この機能には MongoDB 4.0 以降が必要です。
複数の起動モード
initial:ジョブ初回起動時に完全スナップショットを実行し、その後 oplog から増分変更を読み取ります。latest-offset:oplog の最新位置から読み取りを開始し、履歴データは読み取りません。timestamp:指定したタイムスタンプから oplog イベントを読み取り、スナップショットはスキップします。この機能には MongoDB 4.0 以降が必要です。
完全なチェンジログのサポート
データの変更前および変更後の状態を含む完全なチェンジログイベントを出力します。この機能には MongoDB 6.0 以降およびプレイメージおよびポストイメージ記録機能が必要です。
Flink 統合の強化
VVR 8.0.6 以降
CREATE TABLE AS (CTAS) 文または CREATE DATABASE AS (CDAS) 文を使用して、MongoDB データおよびスキーマ変更をダウンストリームテーブルへ同期できます。この機能にはプレイメージおよびポストイメージ記録機能が必要です。
VVR 8.0.9 以降
ディメンションテーブル結合機能が拡張され、ObjectId 型の組み込み
_idフィールドの読み取りが可能になりました。
前提条件
MongoDB インスタンスの要件
コネクタは、Alibaba Cloud MongoDB(レプリカセットまたはシャードクラスター)または自己管理型 MongoDB 3.6 以降のみをサポートします。
監視対象の MongoDB データベースに対して、レプリカセット機能を有効化する必要があります。詳細については、「レプリケーション」をご参照ください。
MongoDB 機能の依存関係
完全チェンジログイベントストリーム機能を使用するには、プレイメージおよびポストイメージ記録機能を有効化する必要があります。
MongoDB 認証が有効化されている場合、MongoDB ユーザーには以下のデータベース権限が必要です:
MongoDB のネットワークおよびその他の準備
Flink による MongoDB アクセスを許可する IP アドレスホワイトリストが設定済みである必要があります。
対象となる MongoDB データベースおよびテーブルが作成済みである必要があります。
制限事項
CDC ソーステーブル
初期スナップショット段階における並列読み取りは、MongoDB 4.0 以降でのみサポートされています。この機能を有効化するには、
scan.incremental.snapshot.enabled構成オプションをtrueに設定します。admin、local、config データベースおよびシステムコレクションからのデータ読み取りはできません。これは MongoDB Change Streams のサブスクリプション制限によるものです。詳細については、「MongoDB ドキュメント」をご参照ください。
シンクテーブル
VVR 8.0.5 より前のバージョンの Realtime Compute for Apache Flink では、結果テーブルへの挿入のみが可能です。
VVR 8.0.5 以降の Realtime Compute for Apache Flink では、DDL 文で結果テーブルを作成する際にプライマリキーを宣言した場合に限り、挿入、更新、および削除が可能です。プライマリキーが宣言されていない場合は、挿入のみが可能です。
ディメンションテーブル
VVR 8.0.5 以降の Realtime Compute for Apache Flink で MongoDB ディメンションテーブルがサポートされます。
SQL
構文
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)CDC ソーステーブルを作成する際には、_id STRING 列を宣言し、これをプライマリキーとして指定する必要があります。
WITH パラメーター
一般
パラメーター | 説明 | 型 | 必須? | デフォルト値 | 備考 |
connector | コネクタの名前です。 | String | はい | なし |
|
uri | MongoDB データベースへの接続に使用される一意識別子 (URI) です。 | String | いいえ | デフォルト値なし | 説明
|
hosts | MongoDB データベースサーバーのホスト名です。 | String | いいえ | なし | 複数のホスト名はカンマ ( |
scheme | MongoDB データベースへのアクセスに使用される接続プロトコルです。 | String | いいえ | mongodb | 有効な値:
|
username | MongoDB への接続に使用するユーザー名です。 | String | いいえ | デフォルト値なし | 本人確認機能が有効化されている場合、このパラメーターは必須です。 |
password | MongoDB への接続に使用するパスワードです。 | String | いいえ | デフォルト値なし | 本人確認機能が有効化されている場合は、このパラメーターを設定する必要があります。 重要 パスワード漏洩を防ぐため、資格情報をハードコーディングするのではなく、変数 を使用してください。 |
database | MongoDB データベースの名前です。 | String | いいえ | デフォルト値なし |
重要 admin、local、config データベースのデータは監視できません。 |
collection | MongoDB コレクションの名前です。 | String | いいえ | デフォルト値なし |
重要 システムコレクションのデータは監視できません。 |
connection.options | MongoDB 用の追加接続オプションです。 | String | いいえ | デフォルト値なし | 追加オプションをキーと値のペア( 重要 デフォルトでは、MongoDB CDC はソケット接続タイムアウトを設定しません。これにより、ネットワークジッター時に長時間の中断が発生する可能性があります。 この問題を回避するために、socketTimeoutMS を適切な値に設定することを推奨します。 |
ソース固有
パラメーター | 説明 | 型 | 必須? | デフォルト値 | 備考 |
scan.startup.mode | MongoDB CDC コネクタの起動モードです。 | String | いいえ | initial | 有効な値:
詳細については、「起動プロパティ」をご参照ください。 |
scan.startup.timestamp-millis | 指定されたオフセットで消費を開始するタイムスタンプです。 | Long | scan.startup.mode の値に応じて異なります:
| なし | 値は、UNIX エポック(1970 年 1 月 1 日 00:00:00 UTC)からのミリ秒数です。
|
initial.snapshotting.queue.size | 初期スナップショット段階における最大キューサイズです。 | Integer | いいえ | 10240 |
|
batch.size | カーソルのバッチ処理サイズです。 | Integer | いいえ | 1024 | 該当なし。 |
poll.max.batch.size | 単一バッチで処理される変更ドキュメントの最大数です。 | Integer | いいえ | 1024 | このオプションは、ストリーム処理中に一度にプルされる変更ドキュメントの最大数を制御します。値が大きいほど、コネクタ内の内部バッファーも大きくなります。 |
poll.await.time.ms | データプル間の間隔です。 | Integer | いいえ | 1000 | 単位:ミリ秒。 |
heartbeat.interval.ms | ハートビートパケットの送信間隔です。 | Integer | いいえ | 0 | 単位:ミリ秒。 MongoDB CDC コネクタは、データベースへハートビートパケットを送信して、最新のバックトラッキングステータスを保証します。この値を 0 に設定すると、ハートビートパケットの送信が無効化されます。 重要 更新頻度が低いコレクションに対しては、このオプションを設定することを強く推奨します。 |
scan.incremental.snapshot.enabled | 初期スナップショット段階における並列読み取りを有効化します。 | Boolean | いいえ | false | これは実験的な機能です。 |
scan.incremental.snapshot.chunk.size.mb | 並列スナップショット読み取りが有効化された場合のシャードサイズです。 | Integer | いいえ | 64 | これは実験的な機能です。 単位:MB。 並列スナップショット読み取りが有効化された場合にのみ有効です。 |
scan.full-changelog | 完全なチェンジログイベントストリームを生成します。 | Boolean | いいえ | false | これは実験的な機能です。 説明 MongoDB 6.0 以降およびプレイメージおよびポストイメージ機能が有効化されている必要があります。設定手順については、「ドキュメントプレイメージ」をご参照ください。 |
scan.flatten-nested-columns.enabled |
| Boolean | いいえ | false | 有効化した場合、以下の BSON ドキュメントの 説明 VVR 8.0.5 以降でのみサポートされます。 |
scan.primitive-as-string | BSON ドキュメント内のすべてのプリミティブ型を STRING として解析します。 | Boolean | いいえ | false | 説明 VVR 8.0.5 以降でのみサポートされます。 |
scan.ignore-delete.enabled | DELETE (-D) メッセージを無視するかどうかを指定します。 | Boolean | いいえ | false | MongoDB ソースデータのアーカイブ処理中には、OpLog に多数の DELETE イベントが出現することがあります。このオプションを有効化することで、それらのイベントを無視し、ダウンストリームへの同期を防止できます。 説明
|
scan.incremental.snapshot.backfill.skip | 増分スナップショット読み取り中のウォーターマークバックフィルをスキップします。 | Boolean | いいえ | false | このオプションを有効化すると、最低 1 回以上のセマンティクスのみが保証されます。 説明 VVR 11.1 以降でのみサポートされます。 |
initial.snapshotting.pipeline | スナップショット読み取り時に MongoDB へプッシュダウンされる MongoDB パイプライン操作です。必要最小限のデータのみをフィルターし、効率を向上させます。 | String | いいえ | なし。 |
|
initial.snapshotting.max.threads | データ複製に使用されるスレッド数です。 | Integer | いいえ | なし。 | scan.startup.mode が initial に設定されている場合にのみ有効です。 説明 VVR 11.1 以降でのみサポートされます。 |
initial.snapshotting.queue.size | 初期スナップショットのキューサイズです。 | Integer | いいえ | 16000 | scan.startup.mode が initial に設定されている場合にのみ有効です。 説明 VVR 11.1 以降でのみサポートされます。 |
scan.change-stream.reading.parallelism | Change Stream へのサブスクライブの並列度です。 | Integer | いいえ | 1 | scan.incremental.snapshot.enabled が有効化されている場合にのみ有効です。 重要 複数の同時リーダーによる Change Stream サブスクリプションを実現するには、heartbeat.interval.ms も設定してください。 説明 VVR 11.2 以降でのみサポートされます。 |
scan.change-stream.reading.queue-size | 同時 Change Stream サブスクリプションのメッセージキューのサイズです。 | Integer | いいえ | 16384 | scan.change-stream.reading.parallelism が有効化されている場合にのみ有効です。 説明 VVR 11.2 以降でのみサポートされます。 |
ディメンションテーブル固有
パラメーター | 説明 | データ型 | 必須? | デフォルト値 | 備考 |
lookup.cache | キャッシュポリシーです。 | String | いいえ | NONE | サポートされるポリシー:
|
lookup.max-retries | データベース照会に失敗した場合の最大リトライ回数です。 | Integer | いいえ | 3 | 該当なし。 |
lookup.retry.interval | データベース照会に失敗した場合のリトライ間隔です。 | Duration | いいえ | 1s | 該当なし。 |
lookup.partial-cache.expire-after-access | アクセス後にキャッシュ内に保持される最大期間です。 | Duration | いいえ | なし | サポートされる単位:ms、s、min、h、d。
|
lookup.partial-cache.expire-after-write | 書き込み後にキャッシュ内に保持される最大期間です。 | Duration | いいえ | なし |
|
lookup.partial-cache.max-rows | キャッシュされる最大行数です。上限を超えた場合、最も古い行が期限切れになります。 | Long | いいえ | なし |
|
lookup.partial-cache.cache-missing-key | 物理テーブルに関連付けられたデータがない場合に、空のレコードをキャッシュします。 | Boolean | いいえ | True |
|
シンク固有の
パラメーター | 説明 | 型 | 必須? | デフォルト値 | 備考 |
sink.buffer-flush.max-rows | 1 バッチあたりの最大書き込みレコード数です。 | Integer | いいえ | 1000 | 該当なし。 |
sink.buffer-flush.interval | データをフラッシュする間隔です。 | Duration | いいえ | 1s | 該当なし。 |
sink.delivery-guarantee | データ書き込みのセマンティクス保証です。 | String | いいえ | at-least-once | 有効な値:
説明 Exactly-once はサポートされていません。 |
sink.max-retries | データベースへの書き込みに失敗した場合の最大リトライ回数です。 | Integer | いいえ | 3 | 該当なし。 |
sink.retry.interval | データベースへの書き込みに失敗した場合のリトライ間隔です。 | Duration | いいえ | 1s | 該当なし。 |
sink.parallelism | 結果テーブルのカスタム並列処理の次数です。 | Integer | いいえ | empty | 該当なし。 |
sink.delete-strategy | -D および -U データイベントの処理方法を指定します。 | String | いいえ | CHANGELOG_STANDARD | 有効な値:
|
データ型マッピング
CDC ソーステーブル
BSON 型 | Flink SQL 型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
Date Timestamp | DATE |
Date Timestamp | TIME |
DateTime | TIMESTAMP(3) TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) |
String ObjectId UUID Symbol MD5 JavaScript Regex | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
GeoJSON | Point:ROW<type STRING, coordinates ARRAY<DOUBLE>> Line:ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>> |
ディメンションテーブルおよび結果テーブル
BSON 型 | Flink SQL 型 |
Int32 | INT |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Boolean | BOOLEAN |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
String ObjectId | STRING |
Binary | BYTES |
Object | ROW |
Array | ARRAY |
使用例
CDC ソーステーブル
CREATE TEMPORARY TABLE mongo_source (
`_id` STRING, --必須
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'scan.incremental.snapshot.enabled' = 'true',
'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE productssink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING,
db_name STRING,
collection_name STRING,
op_ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO productssink
SELECT
name,
weight,
tags,
price.amount,
suppliers[1].name,
db_name,
collection_name,
op_ts
FROM
mongo_source;ディメンションテーブル
CREATE TEMPORARY TABLE datagen_source (
id STRING,
a int,
b BIGINT,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.expire-after-access' = '10min',
'lookup.partial-cache.expire-after-write' = '10min',
'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price_amount DECIMAL,
suppliers_name STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
);
INSERT INTO print_sink
SELECT
T.id,
T.a,
T.b,
H.name
FROM
datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;シンクテーブル
CREATE TEMPORARY TABLE datagen_source (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
`_id` STRING,
name STRING,
weight DECIMAL,
tags ARRAY<STRING>,
price ROW<amount DECIMAL, currency STRING>,
suppliers ARRAY<ROW<name STRING, address STRING>>,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
'username' = 'root',
'password' = '${secret_values.password}',
'database' = 'flinktest',
'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;データインジェスト(パブリックプレビュー)
MongoDB コネクタをデータインジェストソースとして使用できます。
制限事項
この機能は、VVR 11.1 以降でのみサポートされます。
構文
source:
type: mongodb
name: MongoDB Source
hosts: localhost:33076
username: ${mongo.username}
password: ${mongo.password}
database: foo_db
collection: foo_col_.*
sink:
type: ...構成オプション
パラメーター | 説明 | 必須? | データ型 | デフォルト値 | 備考 |
type | データソースの種別です。 | はい | STRING | なし | このオプションを mongodb に設定します。 |
scheme | MongoDB サーバーへの接続に使用されるプロトコルです。 | いいえ | STRING | mongodb | 有効な値:
|
hosts | MongoDB サーバーのホスト名です。 | はい | STRING | デフォルト値なし | 複数のホスト名はカンマ (,) で区切ります。 |
username | MongoDB への接続に使用するユーザー名です。 | いいえ | STRING | デフォルト値なし | 該当なし。 |
password | MongoDB への接続に使用するパスワードです。 | いいえ | STRING | なし | 該当なし。 |
database | キャプチャ対象の MongoDB データベースの名前です。 | はい | STRING | デフォルト値なし | 正規表現がサポートされます。 |
collection | キャプチャ対象の MongoDB コレクションの名前です。 | はい | STRING | デフォルト値なし | 正規表現がサポートされます。完全な |
connection.options | MongoDB サーバーへの接続時に使用する追加接続オプションです。 | いいえ | STRING | なし | キーと値のペア( |
schema.inference.strategy | ドキュメント型推論の戦略です。 有効な値: | いいえ | STRING |
|
|
scan.max.pre.fetch.records | 初期スキーマ推論時に各キャプチャコレクションでサンプリングする最大レコード数です。 | いいえ | INT | 50 | 該当なし。 |
scan.startup.mode | MongoDB データソースの起動モードです。 有効な値: | いいえ | STRING | initial | 有効な値:
|
scan.startup.timestamp-millis | 起動モードが | いいえ | LONG | なし | 該当なし。 |
chunk-meta.group.size | 最大メタデータチャンクサイズです。 | いいえ | INT | 1000 | 該当なし。 |
scan.incremental.close-idle-reader.enabled | 増分読み取りへ切り替えた後に、アイドル状態のソースリーダーを閉じるかどうかを指定します。 | いいえ | BOOLEAN | false | 該当なし。 |
scan.incremental.snapshot.backfill.skip | 増分スナップショットアルゴリズムにおけるバックフィルウォーターマーク処理をスキップするかどうかを指定します。 | いいえ | BOOLEAN | false | シンクコネクタが自動プライマリキー重複排除をサポートしている場合、このスイッチを有効化することで、スナップショット読み取りから増分読み取りへの切り替えにかかる時間を短縮できます。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | 増分スナップショットフレームワーク下で、無制限のチャンクを最初に読み取るかどうかを指定します。 | いいえ | BOOLEAN | false | スナップショット対象のコレクションが頻繁に更新される場合、この機能を有効化することで、無制限のチャンク読み取り時にメモリ不足エラーが発生するリスクを低減できます。 |
batch.size | MongoDB からデータを読み取る際のカーソルのバッチサイズです。 | いいえ | INT | 1024 | 該当なし。 |
poll.max.batch.size | Change Stream 変更をプルする際に要求する最大エントリ数です。 | いいえ | INT | 1024 | 該当なし。 |
poll.await.time.ms | Change Stream 変更をプルする際の 2 回のリクエスト間の最小待機時間です。 | いいえ | INT | 1000 | 単位:ミリ秒。 |
heartbeat.interval.ms | ハートビートパケットの送信間隔です。 | いいえ | INT | 0 | 単位:ミリ秒。 MongoDB CDC コネクタは、MongoDB データベースへハートビートパケットを送信して、最新のバックトラッキングステータスを保証します。この値を 0 に設定すると、ハートビートパケットの送信が無効化されます。 説明 更新頻度が低いコレクションに対しては、このオプションを設定してください。 |
scan.incremental.snapshot.chunk.size.mb | スナップショット段階におけるシャードサイズです。 | いいえ | INT | 64 | 単位:MB。 |
scan.incremental.snapshot.chunk.samples | スナップショット段階でコレクションサイズを判定するためのサンプル数です。 | いいえ | INT | 20 | 該当なし。 |
scan.full-changelog | MongoDB のプレイメージおよびポストイメージ記録に基づく完全なチェンジログイベントストリームを生成するかどうかを指定します。 | いいえ | BOOLEAN | false | MongoDB 6.0 以降およびプレイメージおよびポストイメージ機能が有効化されている必要があります。設定手順については、「ドキュメントプレイメージ」をご参照ください。 |
scan.cursor.no-timeout | カーソルタイムアウトを無効化するかどうかを指定します。 | いいえ | BOOLEAN | false | MongoDB サーバーは、通常、メモリ使用量の問題を防ぐため、10 分間アイドル状態のカーソルを閉じます。このオプションを true に設定すると、この動作が防止されます。 |
scan.ignore-delete.enabled | MongoDB からの削除イベントを無視するかどうかを指定します。 | いいえ | BOOLEAN | false | 該当なし。 |
scan.flatten.nested-documents.enabled | BSON ドキュメント内のネスト構造をフラット化するかどうかを指定します。 | いいえ | BOOLEAN | false | 有効化した場合、 |
scan.all.primitives.as-string.enabled | すべてのプリミティブ型を STRING として推論します。 | いいえ | BOOLEAN | false | このオプションを有効化すると、上流のデータ型が不一致な場合に、頻繁なスキーマ変更イベントが発生するのを回避できます。 |
metadata.list | ダウンストリームへ渡すメタデータのリストです。 | いいえ | STRING | なし。 | 複数のメタデータ項目はカンマで区切ります。 サポートされるメタデータ:
|
データ型マッピング
BSON 型 | CDC 型 | 備考 |
STRING | VARCHAR | 該当なし。 |
INT32 | INT | |
INT64 | BIGINT | |
DECIMAL128 | DECIMAL | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
TIMESTAMP | TIMESTAMP | |
DATETIME | LOCALZONEDTIMESTAMP | |
BINARY | VARBINARY | |
DOCUMENT | MAP | キーと値の型は推論されます。 |
ARRAY | ARRAY | 要素の型は推論されます。 |
OBJECTID | VARCHAR | HexString として表現されます。 |
SYMBOL REGULAREXPRESSION JAVASCRIPT JAVASCRIPTWITHSCOPE | VARCHAR | 文字列として表現されます。 |
メタデータ
SQL コネクタ
MongoDB CDC SQL ソーステーブルは、メタデータ列構文をサポートしています。以下のメタデータ列にアクセスできます:
メタデータキー | メタデータ型 | 説明 |
database_name | STRING NOT NULL | ドキュメントを含むデータベースの名前です。 |
collection_name | STRING NOT NULL | ドキュメントを含むコレクションの名前です。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | データベース内でドキュメントが変更された時刻です。ドキュメントが ChangeStream ではなく履歴テーブルデータから取得された場合、この値は常に 0 になります。 |
row_kind | STRING NOT NULL | データ変更イベントの種別です。有効な値:
説明 VVR 11.1 以降でのみサポートされます。 |
データインジェスト YAML
MongoDB CDC データインジェスト YAML コネクタは、以下のメタデータ列をサポートしています:
メタデータ列 | メタデータ型 | 説明 |
ts_ms | BIGINT NOT NULL | データベース内でドキュメントが変更された時刻です。ドキュメントが ChangeStream ではなく履歴テーブルデータから取得された場合、この値は常に 0 になります。 |
また、Transform モジュールが提供する汎用メタデータ列を使用して、データベース名、コレクション名、および row_kind 情報にアクセスすることもできます。
プレイメージおよびポストイメージ機能
デフォルトでは、MongoDB 6.0 より前のバージョンでは、変更前または削除されたドキュメントは保持されません。プレイメージおよびポストイメージ機能が有効化されていない場合、MongoDB は UPSERT セマンティクスのみをサポートします。これは、UPDATE_BEFORE イベントが欠落することを意味します。しかし、多くの有用な Flink オペレーターは、INSERT、UPDATE_BEFORE、UPDATE_AFTER、および DELETE イベントを含む完全なチェンジログストリームを必要とします。
欠落している UPDATE_BEFORE イベントを補完するため、Flink SQL プランナーは UPSERT 型のデータソースに対して ChangelogNormalize オペレーターを自動的に生成します。このオペレーターは、すべてのドキュメントの現在バージョンのスナップショットをデプロイメント状態データにキャッシュします。ドキュメントが更新または削除された場合、ChangelogNormalize の状態データを照会して、更新前の状態を取得できます。ただし、これには大量の状態データを格納する必要があります。

MongoDB 6.0 は、プレイメージおよびポストイメージ機能をサポートしています。詳細については、「MongoDB Change Streams を使用してデータ変更をリアルタイムでキャプチャする」をご参照ください。この機能を有効化すると、MongoDB は特別なコレクションに各変更前後のドキュメントの完全な状態を記録します。ジョブで scan.full-changelog オプションを有効化すると、MongoDB CDC コネクタはこれらの変更ドキュメントレコードを使用して UPDATE_BEFORE レコードを生成します。これにより、コネクタは完全なイベントストリームを生成でき、ChangelogNormalize オペレーターへの依存がなくなります。
MongoDB CDC DataStream API
DataStream を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続します。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。
DataStream API プログラムを作成し、MongoDBSource を使用できます。以下のコードは一例です:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();XML
Maven Central Repository には、ジョブ開発で直接使用できる VVR MongoDB コネクタ がホストされています。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>DataStream API を使用する場合、MongoDBSource データソースを構築するために使用するパッケージは、増分スナップショット機能を有効にするかどうかによって異なります。増分スナップショット機能を有効にするには、com.ververica.cdc.connectors.mongodb.source パッケージの MongoDBSource#builder() を使用します。そうでない場合は、com.ververica.cdc.connectors.mongodb の MongoDBSource#builder() を使用します。
MongoDBSource を構築する際に、以下のパラメーターを設定できます:
パラメーター | 説明 |
hosts | 接続先の MongoDB データベースのホスト名です。 |
username | MongoDB データベースサービスのユーザー名です。 説明 MongoDB サーバーで認証が有効化されていない場合、このパラメーターを設定する必要はありません。 |
password | MongoDB データベースサービスのパスワードです。 説明 MongoDB サーバーで認証が有効化されていない場合、このパラメーターを設定する必要はありません。 |
databaseList | 監視対象の MongoDB データベースの名前です。 説明 データベース名には正規表現がサポートされており、複数のデータベースからデータを読み取ることができます。すべてのデータベースに一致させるには |
collectionList | 監視対象の MongoDB コレクションの名前です。 説明 コレクション名には正規表現がサポートされており、複数のコレクションからデータを読み取ることができます。すべてのコレクションに一致させるには |
startupOptions | MongoDB CDC の起動モードを選択します。 有効な値:
詳細については、「起動プロパティ」をご参照ください。 |
deserializer | SourceRecord 型のレコードを特定の型に逆シリアル化するデシリアライザーです。有効な値:
|