すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MongoDB

最終更新日:Feb 10, 2026

このトピックでは、MongoDB コネクタの使用方法について説明します。

背景情報

MongoDB は、非構造化データを格納するドキュメント指向型データベースであり、アプリケーション開発およびスケーリングを簡素化します。MongoDB コネクタは、以下の機能をサポートしています:

カテゴリ

説明

サポートされているタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、およびデータインジェストテーブル

実行モード

ストリーミングモードのみ

監視メトリック

メトリック

  • ソーステーブル

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • ディメンションテーブルおよび結果テーブル:なし

説明

各メトリックの詳細については、「監視メトリック」をご参照ください。

API 種別

DataStream、SQL、およびデータインジェスト YAML

結果テーブルデータの更新および削除のサポート

はい

機能

MongoDB Change Data Capture (CDC) ソーステーブルは、Change Stream API を使用して完全および増分データをキャプチャします。まず初期スナップショットとして履歴データを読み取り、その後、oplog からの増分データ読み取りへシームレスに切り替えます。このプロセスにより1 回限りのセマンティクスが提供され、重複または欠落のないレコードを保証し、障害回復時のデータ整合性を担保します。

  • Change Stream API を基盤とする

    コネクタは、MongoDB 3.6 で導入された Change Stream API を使用して、データベースおよびコレクションから挿入、更新、置き換え、削除イベントを効率的にキャプチャします。これらのイベントは、Flink が処理可能なチェンジログストリームに変換されます。

  • 完全および増分データのシームレスなキャプチャ

    コネクタは、初期スナップショットを自動的に読み取った後、手動介入なしで増分モードへと移行します。

  • 並列スナップショット読み取り

    コネクタは、パフォーマンス向上のため履歴データを並列で読み取ります。この機能には MongoDB 4.0 以降が必要です。

  • 複数の起動モード

    • initial:ジョブ初回起動時に完全スナップショットを実行し、その後 oplog から増分変更を読み取ります。

    • latest-offset:oplog の最新位置から読み取りを開始し、履歴データは読み取りません。

    • timestamp:指定したタイムスタンプから oplog イベントを読み取り、スナップショットはスキップします。この機能には MongoDB 4.0 以降が必要です。

  • 完全なチェンジログのサポート

    データの変更前および変更後の状態を含む完全なチェンジログイベントを出力します。この機能には MongoDB 6.0 以降およびプレイメージおよびポストイメージ記録機能が必要です。

Flink 統合の強化

前提条件

  • MongoDB インスタンスの要件

    • コネクタは、Alibaba Cloud MongoDB(レプリカセットまたはシャードクラスター)または自己管理型 MongoDB 3.6 以降のみをサポートします。

    • 監視対象の MongoDB データベースに対して、レプリカセット機能を有効化する必要があります。詳細については、「レプリケーション」をご参照ください。

  • MongoDB 機能の依存関係

    • 完全チェンジログイベントストリーム機能を使用するには、プレイメージおよびポストイメージ記録機能を有効化する必要があります。

    • MongoDB 認証が有効化されている場合、MongoDB ユーザーには以下のデータベース権限が必要です:

      必要な権限

      • splitVector 権限

      • listDatabases 権限

      • listCollections 権限

      • collStats 権限

      • Find 権限

      • changeStream 権限

      • config.collections および config.chunks コレクションへのアクセス権限

  • MongoDB のネットワークおよびその他の準備

    • Flink による MongoDB アクセスを許可する IP アドレスホワイトリストが設定済みである必要があります。

    • 対象となる MongoDB データベースおよびテーブルが作成済みである必要があります。

制限事項

  • CDC ソーステーブル

    • 初期スナップショット段階における並列読み取りは、MongoDB 4.0 以降でのみサポートされています。この機能を有効化するには、scan.incremental.snapshot.enabled 構成オプションを true に設定します。

    • admin、local、config データベースおよびシステムコレクションからのデータ読み取りはできません。これは MongoDB Change Streams のサブスクリプション制限によるものです。詳細については、「MongoDB ドキュメント」をご参照ください。

  • シンクテーブル

    • VVR 8.0.5 より前のバージョンの Realtime Compute for Apache Flink では、結果テーブルへの挿入のみが可能です。

    • VVR 8.0.5 以降の Realtime Compute for Apache Flink では、DDL 文で結果テーブルを作成する際にプライマリキーを宣言した場合に限り、挿入、更新、および削除が可能です。プライマリキーが宣言されていない場合は、挿入のみが可能です。

  • ディメンションテーブル

    • VVR 8.0.5 以降の Realtime Compute for Apache Flink で MongoDB ディメンションテーブルがサポートされます。

SQL

構文

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
説明

CDC ソーステーブルを作成する際には、_id STRING 列を宣言し、これをプライマリキーとして指定する必要があります。

WITH パラメーター

一般

パラメーター

説明

必須?

デフォルト値

備考

connector

コネクタの名前です。

String

はい

なし

  • ソーステーブルの場合:

    • VVR 8.0.4 以前:このオプションを mongodb-cdc に設定します。

    • VVR 8.0.5 以降:このオプションを mongodb または mongodb-cdc に設定します。

  • ディメンションテーブルまたは結果テーブルの場合:このオプションを mongodb に設定します。

uri

MongoDB データベースへの接続に使用される一意識別子 (URI) です。

String

いいえ

デフォルト値なし

説明

uri または hosts のいずれかのオプションを指定する必要があります。uri を指定した場合、schemehostsusernamepasswordconnector.options を指定しないでください。両方のオプションを指定した場合は、uri で指定された URI が使用されます。

hosts

MongoDB データベースサーバーのホスト名です。

String

いいえ

なし

複数のホスト名はカンマ (,) で区切ります。

scheme

MongoDB データベースへのアクセスに使用される接続プロトコルです。

String

いいえ

mongodb

有効な値:

  • mongodb:デフォルトの MongoDB プロトコルを使用します。

  • mongodb+srv:DNS SRV レコードプロトコルを使用します。

username

MongoDB への接続に使用するユーザー名です。

String

いいえ

デフォルト値なし

本人確認機能が有効化されている場合、このパラメーターは必須です。

password

MongoDB への接続に使用するパスワードです。

String

いいえ

デフォルト値なし

本人確認機能が有効化されている場合は、このパラメーターを設定する必要があります。

重要

パスワード漏洩を防ぐため、資格情報をハードコーディングするのではなく、変数 を使用してください。

database

MongoDB データベースの名前です。

String

いいえ

デフォルト値なし

  • ソーステーブルの場合、データベース名には正規表現によるマッチングが可能です。

  • このオプションを指定しない場合、すべてのデータベースが監視対象となります。

重要

admin、local、config データベースのデータは監視できません。

collection

MongoDB コレクションの名前です。

String

いいえ

デフォルト値なし

  • ソーステーブルの場合、コレクション名には正規表現によるマッチングが可能です。

    重要

    コレクション名に正規表現の特殊文字が含まれる場合は、完全修飾名前空間(データベース名.コレクション名)を指定してください。そうでないと、コレクションの変更をキャプチャできません。

  • このオプションを指定しない場合、すべてのコレクションが監視対象となります。

重要

システムコレクションのデータは監視できません。

connection.options

MongoDB 用の追加接続オプションです。

String

いいえ

デフォルト値なし

追加オプションをキーと値のペア(& 区切りの key=value)として指定します。例:connectTimeoutMS=12000&socketTimeoutMS=13000。

重要

デフォルトでは、MongoDB CDC はソケット接続タイムアウトを設定しません。これにより、ネットワークジッター時に長時間の中断が発生する可能性があります。

この問題を回避するために、socketTimeoutMS を適切な値に設定することを推奨します。

ソース固有

パラメーター

説明

必須?

デフォルト値

備考

scan.startup.mode

MongoDB CDC コネクタの起動モードです。

String

いいえ

initial

有効な値:

  • initial:初期オフセットからすべてのデータを取得します。

  • latest-offset:現在のオフセットから変更データを取得します。

  • timestamp:特定のタイムスタンプから変更データを取得します。

詳細については、「起動プロパティ」をご参照ください。

scan.startup.timestamp-millis

指定されたオフセットで消費を開始するタイムスタンプです。

Long

scan.startup.mode の値に応じて異なります:

  • initial:いいえ

  • latest-offset:いいえ

  • timestamp:はい

なし

値は、UNIX エポック(1970 年 1 月 1 日 00:00:00 UTC)からのミリ秒数です。

timestamp 起動モードにのみ適用されます。

initial.snapshotting.queue.size

初期スナップショット段階における最大キューサイズです。

Integer

いいえ

10240

scan.startup.modeinitial に設定されている場合にのみ有効です。

batch.size

カーソルのバッチ処理サイズです。

Integer

いいえ

1024

該当なし。

poll.max.batch.size

単一バッチで処理される変更ドキュメントの最大数です。

Integer

いいえ

1024

このオプションは、ストリーム処理中に一度にプルされる変更ドキュメントの最大数を制御します。値が大きいほど、コネクタ内の内部バッファーも大きくなります。

poll.await.time.ms

データプル間の間隔です。

Integer

いいえ

1000

単位:ミリ秒。

heartbeat.interval.ms

ハートビートパケットの送信間隔です。

Integer

いいえ

0

単位:ミリ秒。

MongoDB CDC コネクタは、データベースへハートビートパケットを送信して、最新のバックトラッキングステータスを保証します。この値を 0 に設定すると、ハートビートパケットの送信が無効化されます。

重要

更新頻度が低いコレクションに対しては、このオプションを設定することを強く推奨します。

scan.incremental.snapshot.enabled

初期スナップショット段階における並列読み取りを有効化します。

Boolean

いいえ

false

これは実験的な機能です。

scan.incremental.snapshot.chunk.size.mb

並列スナップショット読み取りが有効化された場合のシャードサイズです。

Integer

いいえ

64

これは実験的な機能です。

単位:MB。

並列スナップショット読み取りが有効化された場合にのみ有効です。

scan.full-changelog

完全なチェンジログイベントストリームを生成します。

Boolean

いいえ

false

これは実験的な機能です。

説明

MongoDB 6.0 以降およびプレイメージおよびポストイメージ機能が有効化されている必要があります。設定手順については、「ドキュメントプレイメージ」をご参照ください。

scan.flatten-nested-columns.enabled

. で区切られたフィールドを、ネストされた BSON ドキュメントフィールドとして解析します。

Boolean

いいえ

false

有効化した場合、以下の BSON ドキュメントの col フィールドは、スキーマ内で nested.col という名前に変わります。

{"nested":{"col":true}}
説明

VVR 8.0.5 以降でのみサポートされます。

scan.primitive-as-string

BSON ドキュメント内のすべてのプリミティブ型を STRING として解析します。

Boolean

いいえ

false

説明

VVR 8.0.5 以降でのみサポートされます。

scan.ignore-delete.enabled

DELETE (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

MongoDB ソースデータのアーカイブ処理中には、OpLog に多数の DELETE イベントが出現することがあります。このオプションを有効化することで、それらのイベントを無視し、ダウンストリームへの同期を防止できます。

説明
  • VVR 11.1 以降でのみサポートされます。

  • アーカイブ由来のものだけでなく、すべての DELETE イベントが無視されます。

scan.incremental.snapshot.backfill.skip

増分スナップショット読み取り中のウォーターマークバックフィルをスキップします。

Boolean

いいえ

false

このオプションを有効化すると、最低 1 回以上のセマンティクスのみが保証されます。

説明

VVR 11.1 以降でのみサポートされます。

initial.snapshotting.pipeline

スナップショット読み取り時に MongoDB へプッシュダウンされる MongoDB パイプライン操作です。必要最小限のデータのみをフィルターし、効率を向上させます。

String

いいえ

なし。

  • JSON 配列形式のオブジェクトとして指定します。例:[{"$match": {"closed": "false"}}] は、closed フィールドが "false" に等しいドキュメントのみをコピーします。

  • このオプションは、scan.startup.mode が initial に設定されている場合にのみ有効であり、意味論的一貫性を損なわないよう Debezium モードでのみ動作します。

    説明

    VVR 11.1 以降でのみサポートされます。

initial.snapshotting.max.threads

データ複製に使用されるスレッド数です。

Integer

いいえ

なし。

scan.startup.mode が initial に設定されている場合にのみ有効です。

説明

VVR 11.1 以降でのみサポートされます。

initial.snapshotting.queue.size

初期スナップショットのキューサイズです。

Integer

いいえ

16000

scan.startup.mode が initial に設定されている場合にのみ有効です。

説明

VVR 11.1 以降でのみサポートされます。

scan.change-stream.reading.parallelism

Change Stream へのサブスクライブの並列度です。

Integer

いいえ

1

scan.incremental.snapshot.enabled が有効化されている場合にのみ有効です。

重要

複数の同時リーダーによる Change Stream サブスクリプションを実現するには、heartbeat.interval.ms も設定してください。

説明

VVR 11.2 以降でのみサポートされます。

scan.change-stream.reading.queue-size

同時 Change Stream サブスクリプションのメッセージキューのサイズです。

Integer

いいえ

16384

scan.change-stream.reading.parallelism が有効化されている場合にのみ有効です。

説明

VVR 11.2 以降でのみサポートされます。

ディメンションテーブル固有

パラメーター

説明

データ型

必須?

デフォルト値

備考

lookup.cache

キャッシュポリシーです。

String

いいえ

NONE

サポートされるポリシー:

  • None:キャッシュなし。

  • Partial:外部データベースから検索されたデータのみをキャッシュします。

lookup.max-retries

データベース照会に失敗した場合の最大リトライ回数です。

Integer

いいえ

3

該当なし。

lookup.retry.interval

データベース照会に失敗した場合のリトライ間隔です。

Duration

いいえ

1s

該当なし。

lookup.partial-cache.expire-after-access

アクセス後にキャッシュ内に保持される最大期間です。

Duration

いいえ

なし

サポートされる単位:ms、s、min、h、d。

lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.expire-after-write

書き込み後にキャッシュ内に保持される最大期間です。

Duration

いいえ

なし

lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.max-rows

キャッシュされる最大行数です。上限を超えた場合、最も古い行が期限切れになります。

Long

いいえ

なし

lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.cache-missing-key

物理テーブルに関連付けられたデータがない場合に、空のレコードをキャッシュします。

Boolean

いいえ

True

lookup.cachePARTIAL に設定する必要があります。

シンク固有の

パラメーター

説明

必須?

デフォルト値

備考

sink.buffer-flush.max-rows

1 バッチあたりの最大書き込みレコード数です。

Integer

いいえ

1000

該当なし。

sink.buffer-flush.interval

データをフラッシュする間隔です。

Duration

いいえ

1s

該当なし。

sink.delivery-guarantee

データ書き込みのセマンティクス保証です。

String

いいえ

at-least-once

有効な値:

  • none

  • at-least-once

説明

Exactly-once はサポートされていません。

sink.max-retries

データベースへの書き込みに失敗した場合の最大リトライ回数です。

Integer

いいえ

3

該当なし。

sink.retry.interval

データベースへの書き込みに失敗した場合のリトライ間隔です。

Duration

いいえ

1s

該当なし。

sink.parallelism

結果テーブルのカスタム並列処理の次数です。

Integer

いいえ

empty

該当なし。

sink.delete-strategy

-D および -U データイベントの処理方法を指定します。

String

いいえ

CHANGELOG_STANDARD

有効な値:

  • CHANGELOG_STANDARD:標準モード。-U および -D イベントを通常どおりダウンストリームへ適用します。

  • IGNORE_DELETE:-D イベントのみを無視し、更新時には行全体を上書きします。

  • PARTIAL_UPDATE:-U イベントを無視して部分カラム更新をサポートします。-D イベントでは行全体を削除します。

  • IGNORE_ALL:-U および -D イベントの両方を無視します。

データ型マッピング

CDC ソーステーブル

BSON 型

Flink SQL 型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point:ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line:ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

ディメンションテーブルおよび結果テーブル

BSON 型

Flink SQL 型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

使用例

CDC ソーステーブル

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --必須
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

ディメンションテーブル

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

シンクテーブル

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

データインジェスト(パブリックプレビュー)

MongoDB コネクタをデータインジェストソースとして使用できます。

制限事項

この機能は、VVR 11.1 以降でのみサポートされます。

構文

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

構成オプション

パラメーター

説明

必須?

データ型

デフォルト値

備考

type

データソースの種別です。

はい

STRING

なし

このオプションを mongodb に設定します。

scheme

MongoDB サーバーへの接続に使用されるプロトコルです。

いいえ

STRING

mongodb

有効な値:

  • mongodb

  • mongodb+srv

hosts

MongoDB サーバーのホスト名です。

はい

STRING

デフォルト値なし

複数のホスト名はカンマ (,) で区切ります。

username

MongoDB への接続に使用するユーザー名です。

いいえ

STRING

デフォルト値なし

該当なし。

password

MongoDB への接続に使用するパスワードです。

いいえ

STRING

なし

該当なし。

database

キャプチャ対象の MongoDB データベースの名前です。

はい

STRING

デフォルト値なし

正規表現がサポートされます。

collection

キャプチャ対象の MongoDB コレクションの名前です。

はい

STRING

デフォルト値なし

正規表現がサポートされます。完全な database.collection 名前空間と一致させる必要があります。

connection.options

MongoDB サーバーへの接続時に使用する追加接続オプションです。

いいえ

STRING

なし

キーと値のペア(& 区切りの k=v)を指定します。例:replicaSet=test&connectTimeoutMS=300000

schema.inference.strategy

ドキュメント型推論の戦略です。

有効な値:continuous および static

いいえ

STRING

continuous

continuous に設定した場合、MongoDB Source は継続的に型推論を実行します。受信データのスキーマが現在のスキーマと異なる場合、構造を拡張して新しいデータに対応するためのスキーマ変更イベントを発行します。

static に設定した場合、MongoDB Source は初期化時に 1 回だけスキーマ推論を実行します。

scan.max.pre.fetch.records

初期スキーマ推論時に各キャプチャコレクションでサンプリングする最大レコード数です。

いいえ

INT

50

該当なし。

scan.startup.mode

MongoDB データソースの起動モードです。

有効な値:initiallatest-offsettimestamp、および snapshot

いいえ

STRING

initial

有効な値:

  • initial:初期オフセットからすべてのデータを取得し、自動的に増分読み取りへ切り替えます。

  • latest-offset:最新の OpLog オフセットから変更データを取得します。

  • timestamp:特定のタイムスタンプから変更データを取得します。

  • snapshot:現在のデータベース状態を 1 回限りのスナップショットとして取得します。

scan.startup.timestamp-millis

起動モードが timestamp に設定されている場合、指定されたタイムスタンプから変更データをキャプチャします。

いいえ

LONG

なし

該当なし。

chunk-meta.group.size

最大メタデータチャンクサイズです。

いいえ

INT

1000

該当なし。

scan.incremental.close-idle-reader.enabled

増分読み取りへ切り替えた後に、アイドル状態のソースリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

該当なし。

scan.incremental.snapshot.backfill.skip

増分スナップショットアルゴリズムにおけるバックフィルウォーターマーク処理をスキップするかどうかを指定します。

いいえ

BOOLEAN

false

シンクコネクタが自動プライマリキー重複排除をサポートしている場合、このスイッチを有効化することで、スナップショット読み取りから増分読み取りへの切り替えにかかる時間を短縮できます。

scan.incremental.snapshot.unbounded-chunk-first.enabled

増分スナップショットフレームワーク下で、無制限のチャンクを最初に読み取るかどうかを指定します。

いいえ

BOOLEAN

false

スナップショット対象のコレクションが頻繁に更新される場合、この機能を有効化することで、無制限のチャンク読み取り時にメモリ不足エラーが発生するリスクを低減できます。

batch.size

MongoDB からデータを読み取る際のカーソルのバッチサイズです。

いいえ

INT

1024

該当なし。

poll.max.batch.size

Change Stream 変更をプルする際に要求する最大エントリ数です。

いいえ

INT

1024

該当なし。

poll.await.time.ms

Change Stream 変更をプルする際の 2 回のリクエスト間の最小待機時間です。

いいえ

INT

1000

単位:ミリ秒。

heartbeat.interval.ms

ハートビートパケットの送信間隔です。

いいえ

INT

0

単位:ミリ秒。

MongoDB CDC コネクタは、MongoDB データベースへハートビートパケットを送信して、最新のバックトラッキングステータスを保証します。この値を 0 に設定すると、ハートビートパケットの送信が無効化されます。

説明

更新頻度が低いコレクションに対しては、このオプションを設定してください。

scan.incremental.snapshot.chunk.size.mb

スナップショット段階におけるシャードサイズです。

いいえ

INT

64

単位:MB。

scan.incremental.snapshot.chunk.samples

スナップショット段階でコレクションサイズを判定するためのサンプル数です。

いいえ

INT

20

該当なし。

scan.full-changelog

MongoDB のプレイメージおよびポストイメージ記録に基づく完全なチェンジログイベントストリームを生成するかどうかを指定します。

いいえ

BOOLEAN

false

MongoDB 6.0 以降およびプレイメージおよびポストイメージ機能が有効化されている必要があります。設定手順については、「ドキュメントプレイメージ」をご参照ください。

scan.cursor.no-timeout

カーソルタイムアウトを無効化するかどうかを指定します。

いいえ

BOOLEAN

false

MongoDB サーバーは、通常、メモリ使用量の問題を防ぐため、10 分間アイドル状態のカーソルを閉じます。このオプションを true に設定すると、この動作が防止されます。

scan.ignore-delete.enabled

MongoDB からの削除イベントを無視するかどうかを指定します。

いいえ

BOOLEAN

false

該当なし。

scan.flatten.nested-documents.enabled

BSON ドキュメント内のネスト構造をフラット化するかどうかを指定します。

いいえ

BOOLEAN

false

有効化した場合、{"doc": {"foo": 1, "bar": "two"}} というスキーマは、doc.foo INT, doc.bar STRING に変換されます。

scan.all.primitives.as-string.enabled

すべてのプリミティブ型を STRING として推論します。

いいえ

BOOLEAN

false

このオプションを有効化すると、上流のデータ型が不一致な場合に、頻繁なスキーマ変更イベントが発生するのを回避できます。

metadata.list

ダウンストリームへ渡すメタデータのリストです。

いいえ

STRING

なし。

複数のメタデータ項目はカンマで区切ります。

サポートされるメタデータ:

  • ts_ms:MongoDB OpLog に記録されたイベントタイムスタンプ。

  • op_ts:ts_ms のエイリアスです。Kafka JSON へメタデータを書き込む場合に op_ts を使用します。

データ型マッピング

BSON 型

CDC 型

備考

STRING

VARCHAR

該当なし。

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

キーと値の型は推論されます。

ARRAY

ARRAY

要素の型は推論されます。

OBJECTID

VARCHAR

HexString として表現されます。

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

文字列として表現されます。

メタデータ

SQL コネクタ

MongoDB CDC SQL ソーステーブルは、メタデータ列構文をサポートしています。以下のメタデータ列にアクセスできます:

メタデータキー

メタデータ型

説明

database_name

STRING NOT NULL

ドキュメントを含むデータベースの名前です。

collection_name

STRING NOT NULL

ドキュメントを含むコレクションの名前です。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

データベース内でドキュメントが変更された時刻です。ドキュメントが ChangeStream ではなく履歴テーブルデータから取得された場合、この値は常に 0 になります。

row_kind

STRING NOT NULL

データ変更イベントの種別です。有効な値:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

説明

VVR 11.1 以降でのみサポートされます。

データインジェスト YAML

MongoDB CDC データインジェスト YAML コネクタは、以下のメタデータ列をサポートしています:

メタデータ列

メタデータ型

説明

ts_ms

BIGINT NOT NULL

データベース内でドキュメントが変更された時刻です。ドキュメントが ChangeStream ではなく履歴テーブルデータから取得された場合、この値は常に 0 になります。

また、Transform モジュールが提供する汎用メタデータ列を使用して、データベース名、コレクション名、および row_kind 情報にアクセスすることもできます。

プレイメージおよびポストイメージ機能

デフォルトでは、MongoDB 6.0 より前のバージョンでは、変更前または削除されたドキュメントは保持されません。プレイメージおよびポストイメージ機能が有効化されていない場合、MongoDB は UPSERT セマンティクスのみをサポートします。これは、UPDATE_BEFORE イベントが欠落することを意味します。しかし、多くの有用な Flink オペレーターは、INSERT、UPDATE_BEFORE、UPDATE_AFTER、および DELETE イベントを含む完全なチェンジログストリームを必要とします。

欠落している UPDATE_BEFORE イベントを補完するため、Flink SQL プランナーは UPSERT 型のデータソースに対して ChangelogNormalize オペレーターを自動的に生成します。このオペレーターは、すべてのドキュメントの現在バージョンのスナップショットをデプロイメント状態データにキャッシュします。ドキュメントが更新または削除された場合、ChangelogNormalize の状態データを照会して、更新前の状態を取得できます。ただし、これには大量の状態データを格納する必要があります。

image.png

MongoDB 6.0 は、プレイメージおよびポストイメージ機能をサポートしています。詳細については、「MongoDB Change Streams を使用してデータ変更をリアルタイムでキャプチャする」をご参照ください。この機能を有効化すると、MongoDB は特別なコレクションに各変更前後のドキュメントの完全な状態を記録します。ジョブで scan.full-changelog オプションを有効化すると、MongoDB CDC コネクタはこれらの変更ドキュメントレコードを使用して UPDATE_BEFORE レコードを生成します。これにより、コネクタは完全なイベントストリームを生成でき、ChangelogNormalize オペレーターへの依存がなくなります。

MongoDB CDC DataStream API

重要

DataStream を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続します。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。

DataStream API プログラムを作成し、MongoDBSource を使用できます。以下のコードは一例です:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven Central Repository には、ジョブ開発で直接使用できる VVR MongoDB コネクタ がホストされています。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
説明

DataStream API を使用する場合、MongoDBSource データソースを構築するために使用するパッケージは、増分スナップショット機能を有効にするかどうかによって異なります。増分スナップショット機能を有効にするには、com.ververica.cdc.connectors.mongodb.source パッケージの MongoDBSource#builder() を使用します。そうでない場合は、com.ververica.cdc.connectors.mongodbMongoDBSource#builder() を使用します。

MongoDBSource を構築する際に、以下のパラメーターを設定できます:

パラメーター

説明

hosts

接続先の MongoDB データベースのホスト名です。

username

MongoDB データベースサービスのユーザー名です。

説明

MongoDB サーバーで認証が有効化されていない場合、このパラメーターを設定する必要はありません。

password

MongoDB データベースサービスのパスワードです。

説明

MongoDB サーバーで認証が有効化されていない場合、このパラメーターを設定する必要はありません。

databaseList

監視対象の MongoDB データベースの名前です。

説明

データベース名には正規表現がサポートされており、複数のデータベースからデータを読み取ることができます。すべてのデータベースに一致させるには .* を使用します。

collectionList

監視対象の MongoDB コレクションの名前です。

説明

コレクション名には正規表現がサポートされており、複数のコレクションからデータを読み取ることができます。すべてのコレクションに一致させるには .* を使用します。

startupOptions

MongoDB CDC の起動モードを選択します。

有効な値:

  • StartupOptions.initial()

    • 初期オフセットからすべてのデータを取得します。

  • StartupOptions.latest-offset()

    • 現在のオフセットから変更データを取得します。

  • StartupOptions.timestamp()

    • 特定のタイムスタンプから変更データを取得します。

詳細については、「起動プロパティ」をご参照ください。

deserializer

SourceRecord 型のレコードを特定の型に逆シリアル化するデシリアライザーです。有効な値:

  • MongoDBConnectorDeserializationSchema:Upsert モードで生成された SourceRecords を、Flink Table API または SQL API の内部データ構造である RowData に変換します。

  • MongoDBConnectorFullChangelogDeserializationSchema:完全チェンジログモードで生成された SourceRecords を、Flink Table または SQL の内部データ構造である RowData に変換します。

  • JsonDebeziumDeserializationSchema:SourceRecords を JSON 形式の文字列に変換します。