すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:MongoDB

最終更新日:Dec 19, 2025

このトピックでは、MongoDBコネクタの使用方法について説明します。

背景情報

MongoDB は、非構造化データ向けのドキュメント指向データベースです。アプリケーション開発とスケーリングを簡素化します。MongoDB コネクタは、次の機能をサポートしています:

カテゴリ

詳細

サポートされるタイプ

ソーステーブル、ディメンションテーブル、結果テーブル、データインジェスト

ランタイムモード

ストリームモードのみがサポートされています。

独自の監視メトリック

監視メトリック

  • ソーステーブル

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • ディメンションテーブルと結果テーブル:なし。

説明

詳細については、「メトリックの説明」をご参照ください。

API タイプ

DataStream、SQL、およびデータインジェスト YAML

結果テーブルでの更新または削除のサポート

はい

特徴

MongoDB 変更データキャプチャ (CDC) ソーステーブルは、Change Stream API を使用して、完全データと増分データの両方をキャプチャします。最初に既存データの完全なスナップショットを読み取り、その後シームレスに増分 oplog の読み取りに切り替わります。このプロセスにより、データが重複も欠落もないことが保証されます。また、このコネクタは Exactly-Once セマンティクスもサポートしており、障害回復時のデータ整合性を保証します。

  • Change Stream API に基づく

    MongoDB 3.6 の Change Stream API を使用して、データベースまたはコレクションからの挿入、更新、置換、削除などの変更イベントを効率的にキャプチャします。これらのイベントを Flink が処理できるチェンジログストリームに変換します。

  • 完全な統合と増分的な統合

    手動での介入なしに、初期スナップショットを自動的に読み取り、スムーズに増分モードに移行します。

  • 並列スナップショット読み取り

    パフォーマンスを向上させるために、既存データの並列読み取りをサポートします。これには MongoDB 4.0 以降が必要です。

  • 複数の起動モード

    • initial:最初の起動時に完全なスナップショットを実行し、その後継続的に oplog を読み取ります。

    • latest-offset:既存データを読み取らずに、現在の oplog の末尾から開始します。

    • timestamp:指定されたタイムスタンプから oplog の読み取りを開始し、スナップショットをスキップします。これには MongoDB 4.0 以降が必要です。

  • 完全な changelog のサポート

    変更前イメージと変更後イメージの両方を含む完全なチェンジログの出力をサポートします (MongoDB 6.0 以降で変更前/変更後イメージの記録機能が有効になっている場合)。

Flink 統合の強化

前提条件

  • MongoDB インスタンスの要件

    • Alibaba Cloud MongoDB 3.6 以降 (ReplicaSet またはシャードクラスター) および自己管理型 MongoDB 3.6 以降のみがサポートされています。

    • 監視対象の MongoDB データベースで ReplicaSet 機能を有効にする必要があります。詳細については、「レプリケーション」をご参照ください。

  • MongoDB 機能の依存関係

    • 完全なチェンジログイベントストリーム機能を使用するには、変更前/変更後イメージの記録機能を有効にする必要があります。

    • MongoDB で認証が有効になっている場合は、次のデータベース権限が必要です。

      権限リスト

      • splitVector 権限

      • listDatabases 権限

      • listCollections 権限

      • collStats 権限

      • 権限の検索

      • changeStream 権限

      • config.collections および config.chunks コレクションへのアクセス権限

  • MongoDB ネットワークおよびその他の準備

    • Flink が MongoDB にアクセスできるように、IP アドレスホワイトリストが設定されていること。

    • ターゲットの MongoDB コレクションとデータが作成されていること。

制限事項

  • CDC ソーステーブル

    • MongoDB 4.0 以降では、初期スナップショットフェーズでの並列読み取りがサポートされています。初期スナップショットの並列モードを有効にするには、scan.incremental.snapshot.enabled パラメーターを true に設定します。

    • MongoDB Change Stream のサブスクリプション制限により、admin、local、config データベース、またはシステムコレクションからデータを読み取ることはできません。詳細については、MongoDB ドキュメントをご参照ください。

  • シンクテーブル

    • Ververica Runtime (VVR) 8.0.5 より前のバージョンでは、データの挿入のみがサポートされています。

    • VVR 8.0.5 以降では、結果テーブルでプライマリキーが宣言されている場合、データの挿入、更新、削除が可能です。プライマリキーが宣言されていない場合は、データの挿入のみが可能です。

  • ディメンションテーブル

    • VVR 8.0.5 以降では、MongoDB ディメンションテーブルの使用がサポートされています。

SQL

構文

CREATE TABLE tableName(
 _id STRING,
 [columnName dataType,]*
 PRIMARY KEY(_id) NOT ENFORCED
) WITH (
 'connector' = 'mongodb',
 'hosts' = 'localhost:27017',
 'username' = 'mongouser',
 'password' = '${secret_values.password}',
 'database' = 'testdb',
 'collection' = 'testcoll'
)
説明

CDC ソーステーブルを作成する場合、_id STRING 列を宣言し、それを一意のプライマリキーとして設定する必要があります。

WITH パラメーター

一般的なパラメーター

パラメーター

説明

データ型

必須

デフォルト値

注意

connector

コネクタの名前。

String

はい

なし

  • ソーステーブルとして:

    • VVR 8.0.4 以前の場合は、mongodb-cdc に設定します。

    • VVR 8.0.5 以降の場合は、mongodb または mongodb-cdc に設定します。

  • ディメンションテーブルまたは結果テーブルとして使用する場合、値は mongodb である必要があります。

uri

MongoDB 接続 URI。

String

いいえ

なし

説明

uri または hosts パラメーターのいずれかを指定する必要があります。uri を指定する場合、schemehostsusernamepassword、または connector.options を指定する必要はありません。両方が指定された場合、接続には uri が使用されます。

hosts

MongoDB インスタンスのホスト名。

String

いいえ

なし

カンマ (,) で区切って複数のホスト名を指定できます。

scheme

MongoDB で使用される接続プロトコル。

String

いいえ

mongodb

有効な値:

  • mongodb: デフォルトの MongoDB プロトコルを使用して接続します。

  • mongodb+srv: DNS SRV レコードプロトコルを使用して接続します。

username

MongoDB に接続するためのユーザー名。

String

いいえ

なし

ID 検証が有効な場合は、このパラメーターは必須です。

password

MongoDB に接続するためのパスワード。

String

いいえ

なし

ID 検証が有効な場合は、このパラメーターは必須です。

重要

パスワードの漏洩を防ぐため、変数を使用してパスワード値を設定することを推奨します。詳細については、「プロジェクト変数」をご参照ください。

database

MongoDBデータベースの名前。

String

いいえ

なし

  • ソーステーブルの場合、データベース名は正規表現をサポートします。

  • このパラメーターが設定されていない場合、すべてのデータベースが監視されます。

重要

admin、local、または config データベースのデータの監視はサポートされていません。

collection

MongoDBコレクションの名前。

String

いいえ

なし

  • ソーステーブルの場合、コレクション名は正規表現をサポートします。

    重要

    監視したいコレクション名に正規表現の特殊文字が含まれている場合は、完全修飾名前空間 (database_name.collection_name) を指定する必要があります。そうしないと、対応するコレクションへの変更をキャプチャできません。

  • このパラメーターが設定されていない場合、すべてのコレクションが監視されます。

重要

システムコレクションのデータの監視はサポートされていません。

connection.options

MongoDB 側の接続パラメーター。

String

いいえ

なし

アンパサンド (&) で区切られた key=value 形式の追加の接続パラメーター。例:connectTimeoutMS=12000&socketTimeoutMS=13000。

重要

デフォルトでは、MongoDB CDC はソケット接続タイムアウトを自動的に設定しません。これにより、ネットワークジッター中に長時間の中断が発生する可能性があります。

この問題を回避するには、socketTimeoutMS を適切な値に設定してください。

ソーステーブルのみ

パラメーター

説明

データ型

必須

デフォルト値

注意

scan.startup.mode

MongoDB CDC の起動モード。

String

いいえ

initial

有効な値:

  • initial:初期オフセットからすべてのデータをプルします。

  • latest-offset:現在のオフセットから変更データをプルします。

  • timestamp:指定されたタイムスタンプから変更データをプルします。

詳細については、「起動プロパティ」をご参照ください。

scan.startup.timestamp-millis

特定のオフセットで消費を開始するタイムスタンプ。

Long

scan.startup.mode の値によって異なります:

  • initial: いいえ

  • latest-offset: いいえ

  • timestamp: はい

なし

形式は、Linux エポックからのミリ秒数です。

このパラメーターは、timestamp 起動モードにのみ適用されます。

initial.snapshotting.queue.size

初期スナップショットのキューサイズ制限。

Integer

いいえ

10240

このパラメーターは、scan.startup.mode オプションが initial に設定されている場合にのみ有効になります。

batch.size

カーソルのバッチサイズ。

Integer

いいえ

1024

なし。

poll.max.batch.size

1 つのバッチに含まれる変更ドキュメントの最大数。

Integer

いいえ

1024

このパラメーターは、ストリーム処理中に一度に取得される変更ドキュメントの最大数を制御します。値が大きいほど、コネクタ内で割り当てられるバッファーが大きくなります。

poll.await.time.ms

2 つのデータプル間の時間間隔。

Integer

いいえ

1000

単位はミリ秒です。

heartbeat.interval.ms

ハートビートを送信する時間間隔。

Integer

いいえ

0

単位はミリ秒です。

MongoDB CDC コネクタは、再開トークンが最新であることを保証するために、データベースに積極的にハートビートを送信します。値が 0 の場合、ハートビートは送信されません。

重要

頻繁に更新されないコレクションについては、このオプションを設定することを強く推奨します

scan.incremental.snapshot.enabled

初期スナップショットの並列モードを有効にするかどうかを指定します。

Boolean

いいえ

false

これは実験的な機能です。

scan.incremental.snapshot.chunk.size.mb

並列モードでスナップショットを読み取るためのチャンクサイズ。

Integer

いいえ

64

これは実験的な機能です。

単位は MB です。

このパラメーターは、並列スナップショットが有効な場合にのみ有効です。

scan.full-changelog

完全なチェンジログイベントストリームを生成します。

Boolean

いいえ

false

これは実験的な機能です。

説明

pre-image および post-image 機能が有効になっている MongoDB 6.0 以降が必要です。この機能を有効にする方法の詳細については、「Document Preimages」をご参照ください。

scan.flatten-nested-columns.enabled

ドット (.) で区切られたフィールド名をネストされた BSON ドキュメントの読み取りとして解析するかどうかを指定します。

Boolean

いいえ

false

有効にすると、次の BSON ドキュメントの例では、col フィールドはスキーマ内で nested.col という名前になります。

{"nested":{"col":true}}
説明

このパラメーターは VVR 8.0.5 以降でのみサポートされています。

scan.primitive-as-string

BSON ドキュメント内のすべてのプリミティブ型を文字列として解析するかどうかを指定します。

Boolean

いいえ

false

説明

このパラメーターは VVR 8.0.5 以降でのみサポートされています。

scan.ignore-delete.enabled

削除 (-D) メッセージを無視するかどうかを指定します。

Boolean

いいえ

false

MongoDB ソースからデータをアーカイブする際、OpLog 内で多くの DELETE イベントが生成されることがあります。これらのイベントをダウンストリームに同期したくない場合は、このパラメーターを有効にして削除イベントを無視できます。

説明
  • このパラメーターは VVR 11.1 以降でのみサポートされています。

  • アーカイブ操作に起因しない他の DELETE イベントも無視されます。

scan.incremental.snapshot.backfill.skip

増分スナップショットアルゴリズムのウォーターマークバックフィルプロセスをスキップするかどうかを指定します。

Boolean

いいえ

false

このスイッチを有効にすると、at-least-once セマンティクスのみが提供されます。

説明

このパラメーターは VVR 11.1 以降でのみサポートされています。

initial.snapshotting.pipeline

MongoDB のパイプライン操作。スナップショット読み取りフェーズ中に、この操作は MongoDB にプッシュダウンされ、必要なデータのみをフィルター処理して読み取り効率を向上させます。

String

いいえ

なし。

  • オブジェクトの JSON 配列として表現されます。例:[{"$match": {"closed": "false"}}] は、closed フィールドが "false" のドキュメントのみをコピーします。

  • このオプションは、scan.startup.mode オプションが initial に設定されている場合にのみ有効です。セマンティクスの不整合を引き起こす可能性があるため、増分スナップショットモードではなく Debezium モードでのみ使用できます。

    説明

    このパラメーターは VVR 11.1 以降でのみサポートされています。

initial.snapshotting.max.threads

データレプリケーションに使用されるスレッド数。

Integer

いいえ

なし。

このパラメーターは、scan.startup.mode オプションが initial に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 11.1 以降でのみサポートされています。

initial.snapshotting.queue.size

初期スナップショットのキューサイズ。

Integer

いいえ

16000

このパラメーターは、scan.startup.mode オプションが initial に設定されている場合にのみ有効です。

説明

このパラメーターは VVR 11.1 以降でのみサポートされています。

scan.change-stream.reading.parallelism

Change Stream をサブスクライブするための並列度。

Integer

いいえ

1

このパラメーターは、scan.incremental.snapshot.enabled パラメーターが有効な場合にのみ有効です。

重要

複数の同時スレッドで Change Stream をサブスクライブするには、heartbeat.interval.ms パラメーターも設定する必要があります。

説明

このパラメーターは VVR 11.2 以降でのみサポートされています。

scan.change-stream.reading.queue-size

同時 Change Stream サブスクリプションのメッセージキューサイズ。

Integer

いいえ

16384

このパラメーターは、scan.change-stream.reading.parallelism パラメーターが有効な場合にのみ有効です。

説明

このパラメーターは VVR 11.2 以降でのみサポートされています。

ディメンションテーブルのみ

パラメーター

説明

データ型

必須

デフォルト値

注意

lookup.cache

キャッシュポリシー。

String

いいえ

NONE

以下のキャッシュポリシーがサポートされています:

  • None:キャッシュなし。

  • Partial:外部データベースで検索する場合にのみデータをキャッシュします。

lookup.max-retries

データベースクエリが失敗した場合の最大再試行回数。

Integer

いいえ

3

なし。

lookup.retry.interval

データベースクエリが失敗した場合のリトライ間隔。

Duration

いいえ

1s

なし。

lookup.partial-cache.expire-after-access

キャッシュ内のレコードの最大保持期間。

Duration

いいえ

なし

サポートされている時間単位は ms、s、min、h、d です。

この設定を使用する場合、lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.expire-after-write

キャッシュに書き込まれた後のレコードの最大保持期間。

Duration

いいえ

なし

この設定を使用する場合、lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.max-rows

キャッシュ内の最大行数。この値を超えると、最も古い行が期限切れになります。

Long

いいえ

なし

この設定を使用する場合、lookup.cachePARTIAL に設定する必要があります。

lookup.partial-cache.cache-missing-key

物理テーブルにデータが見つからない場合に空のレコードをキャッシュするかどうかを指定します。

Boolean

いいえ

True

この設定を使用する場合、lookup.cachePARTIAL に設定する必要があります。

結果テーブルのみ

パラメーター

説明

データ型

必須

デフォルト値

注意

sink.buffer-flush.max-rows

各バッチ書き込みの最大レコード数。

Integer

いいえ

1000

なし。

sink.buffer-flush.interval

データ書き込みの更新間隔。

Duration

いいえ

1s

なし。

sink.delivery-guarantee

データ書き込みのセマンティクス保証。

String

いいえ

at-least-once

有効な値:

  • none

  • at-least-once

説明

Exactly-once は現在サポートされていません。

sink.max-retries

データベースへの書き込みが失敗した場合の最大リトライ回数。

Integer

いいえ

3

なし。

sink.retry.interval

データベースへの書き込みが失敗した場合のリトライ間隔。

Duration

いいえ

1s

なし。

sink.parallelism

カスタムのシンク並列度。

Integer

いいえ

Empty

なし。

sink.delete-strategy

-D または -U データ型の処理方法を設定します。

String

いいえ

CHANGELOG_STANDARD

有効な値:

  • CHANGELOG_STANDARD:標準モードでは、-U および -D イベントは通常どおりダウンストリームシステムに適用されます。

  • IGNORE_DELETE:-D イベントのみを無視しますが、更新時には行全体を上書きします。

  • PARTIAL_UPDATE:-U イベントを無視して部分的な列の更新を有効にします。ただし、-D イベントを受信した場合でも、行全体が削除されます。

  • IGNORE_ALL:-U と -D の両方のイベントを無視します。

型マッピング

CDC ソーステーブル

BSON 型

Flink SQL 型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

Date Timestamp

DATE

Date Timestamp

TIME

DateTime

TIMESTAMP(3)

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)

TIMESTAMP_LTZ(0)

String

ObjectId

UUID

Symbol

MD5

JavaScript

Regex

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

ディメンションテーブルと結果テーブル

BSON 型

Flink SQL 型

Int32

INT

Int64

BIGINT

Double

DOUBLE

Decimal128

DECIMAL

Boolean

BOOLEAN

DateTime

TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP_LTZ(0)

String

ObjectId

STRING

Binary

BYTES

Object

ROW

Array

ARRAY

CDC ソーステーブル

CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, --必ず宣言する必要があります
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  collection_name STRING METADATA VIRTUAL,
  op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.full-changelog' = 'true'
);
CREATE TEMPORARY TABLE  productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING,
  db_name STRING,
  collection_name STRING,
  op_ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO productssink  
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name,
  db_name,
  collection_name,
  op_ts
FROM
  mongo_source;

ディメンションテーブル

CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a int,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);
CREATE TEMPORARY TABLE print_sink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;

シンクテーブル

CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);
INSERT INTO mongo_sink
SELECT * FROM datagen_source;

データインジェスト

MongoDB コネクタは、データインジェスト YAML タスクのデータソースとして使用できます。

制限事項

この機能は VVR 11.1 以降でのみサポートされています。

構文

source:
   type: mongodb
   name: MongoDB Source
   hosts: localhost:33076
   username: ${mongo.username}
   password: ${mongo.password}
   database: foo_db
   collection: foo_col_.*

sink:
  type: ...

設定項目

パラメーター

説明

必須

データの型

デフォルト値

注意

type

データソースのタイプ。

はい

STRING

なし

値は mongodb である必要があります。

scheme

MongoDB サーバーへの接続プロトコル。

いいえ

STRING

mongodb

有効な値:

  • mongodb

  • mongodb+srv

hosts

MongoDB に接続するためのサーバーアドレス。

はい

STRING

なし

複数のアドレスをカンマ (,) で区切って指定できます。

username

MongoDB への接続に使用するユーザー名。

いいえ

STRING

なし

なし。

password

MongoDB への接続に使用するパスワード。

いいえ

STRING

なし

なし。

database

キャプチャする MongoDB データベースの名前。

はい

STRING

なし

正規表現をサポートします。

collection

キャプチャする MongoDB コレクションの名前。

はい

STRING

なし

正規表現をサポートしています。完全な database.collection 名前空間に一致させる必要があります。

connection.options

MongoDB サーバーに接続する際に追加する接続オプション。

いいえ

STRING

なし

アンパサンド (&) で区切られた k=v 形式のキーと値のペア。例:replicaSet=test&connectTimeoutMS=300000

schema.inference.strategy

ドキュメントの型推論の戦略。

有効な値は continuous および static です。

いいえ

STRING

連続

continuous に設定すると、MongoDB Source は継続的に型推論を実行します。後続のレコードが現在のスキーマと一致しない場合、新しいデータに対応するために構造を広げるスキーマ進化イベントを発行します。

static に設定すると、MongoDB は初期化フェーズで 1 回だけスキーマ推論を実行します。

scan.max.pre.fetch.records

初期推論中にキャプチャされた各コレクションからサンプリングするレコードの最大数。

いいえ

INT

50

なし。

scan.startup.mode

MongoDB データソースの起動モードを指定します。

有効な値は initiallatest-offsettimestamp、および snapshot です。

いいえ

STRING

initial

有効な値:

  • initial:初期オフセットからすべてのデータをプルし、自動的に増分モードに切り替えます。

  • latest-offset:最新の OpLog オフセットから変更データをプルします。

  • timestamp:指定されたタイムスタンプから変更データをプルします。

  • snapshot:現在のデータベース状態の 1 回限りのスナップショットを実行します。

scan.startup.timestamp-millis

起動モードが timestamp に設定されている場合、このパラメーターは変更データのキャプチャを開始するタイムスタンプを指定します。

いいえ

LONG

なし

なし。

chunk-meta.group.size

メタデータチャンクサイズの制限を設定します。

いいえ

INT

1000

なし。

scan.incremental.close-idle-reader.enabled

増分モードに切り替えた後、アイドル状態の Source Reader を閉じるかどうかを指定します。

いいえ

BOOLEAN

false

なし。

scan.incremental.snapshot.backfill.skip

増分スナップショットアルゴリズムのウォーターマークバックフィルプロセスをスキップするかどうかを指定します。

いいえ

BOOLEAN

false

使用しているシンクコネクタがプライマリキーに基づいて自動的に重複を削除できる場合、このスイッチを有効にすると、完全から増分への移行にかかる時間を短縮できます。

scan.incremental.snapshot.unbounded-chunk-first.enabled

増分スナップショットアルゴリズムを実行する際に、無制限のチャンクを最初に読み取るかどうかを指定します。

いいえ

BOOLEAN

false

スナップショットを作成しているコレクションが頻繁に更新される場合、この機能を有効にすると、無制限チャンクの読み取り時にメモリ不足エラーが発生する可能性を減らすことができます。

batch.size

MongoDB データを読み取るためのカーソルバッチサイズ。

いいえ

INT

1024

なし。

poll.max.batch.size

Change Stream をプルする際のリクエストあたりの最大エントリ数。

いいえ

INT

1024

なし。

poll.await.time.ms

Change Stream を取得する際のリクエスト間の最小待機時間。

いいえ

INT

1000

単位はミリ秒です。

heartbeat.interval.ms

ハートビートを送信する時間間隔。

いいえ

INT

0

単位はミリ秒です。

MongoDB CDC コネクタは、再開トークンが最新であることを保証するために、データベースに積極的にハートビートを送信します。値が 0 の場合、ハートビートは送信されません。

説明

頻繁に更新されないコレクションについては、このオプションを設定することを強く推奨します。

scan.incremental.snapshot.chunk.size.mb

スナップショットフェーズ中のチャンクサイズ。

いいえ

INT

64

単位は MB です。

scan.incremental.snapshot.chunk.samples

スナップショットフェーズ中にコレクションサイズを決定する際に使用するサンプル数。

いいえ

INT

20

なし。

scan.full-changelog

Mongo の Pre- and Post-Image レコードに基づいて、完全なチェンジログイベントストリームを生成するかどうかを指定します。

いいえ

BOOLEAN

false

pre-image および post-image 機能が有効になっている MongoDB 6.0 以降が必要です。この機能を有効にする方法の詳細については、「Document Preimages」をご参照ください。

scan.cursor.no-timeout

データ読み取り用のカーソルが期限切れにならないように設定するかどうかを指定します。

いいえ

BOOLEAN

false

MongoDB サーバーは通常、高いメモリ使用量を防ぐために、一定期間 (10 分) アイドル状態になったカーソルを閉じます。このオプションを true に設定すると、これを防ぐことができます。

scan.ignore-delete.enabled

MongoDB ソースからの削除イベントレコードを無視するかどうかを指定します。

いいえ

BOOLEAN

false

なし。

scan.flatten.nested-documents.enabled

BSON ドキュメント内のネストされた構造をフラット化するかどうかを指定します。

いいえ

BOOLEAN

false

このオプションを有効にすると、 {"doc": {"foo": 1, "bar": "two"}} のようなスキーマは doc.foo INT, doc.bar STRING に展開されます。

scan.all.primitives.as-string.enabled

すべてのプリミティブ型を STRING として推論するかどうかを指定します。

いいえ

BOOLEAN

false

このオプションを有効にすると、混合された入力データ型を扱う際に多くのスキーマ進化イベントが生成されるのを避けることができます。

型マッピング

BSON 型

CDC 型

STRING

VARCHAR

なし。

INT32

INT

INT64

BIGINT

DECIMAL128

DECIMAL

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

TIMESTAMP

TIMESTAMP

DATETIME

LOCALZONEDTIMESTAMP

BINARY

VARBINARY

DOCUMENT

MAP

キー/値の型パラメーターを推論する必要があります。

ARRAY

ARRAY

要素型のパラメーターを推論する必要があります。

OBJECTID

VARCHAR

HexString として表現されます。

SYMBOL

REGULAREXPRESSION

JAVASCRIPT

JAVASCRIPTWITHSCOPE

VARCHAR

文字列として表現されます。

メタデータ

SQL コネクタ

MongoDB CDC SQL ソーステーブルは、メタデータ列の構文をサポートしています。メタデータ列を介して、次のメタデータにアクセスできます。

メタデータキー

メタデータタイプ

説明

database_name

STRING NOT NULL

ドキュメントを含むデータベースの名前。

collection_name

STRING NOT NULL

ドキュメントを含むコレクションの名前。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

データベース内でドキュメントが変更された時刻。ドキュメントが ChangeStream からではなく、テーブルの既存のデータからのものである場合、この値は常に 0 です。

row_kind

STRING NOT NULL

データ変更のタイプを示します。有効な値:

  • +I: INSERT

  • -D: DELETE

  • -U: UPDATE_BEFORE

  • +U: UPDATE_AFTER

説明

この機能は VVR 11.1 以降でのみサポートされています。

データインジェスト YAML

MongoDB CDC データインジェスト YAML コネクタは、次のメタデータ列の読み取りをサポートしています:

メタデータキー

メタデータ型

説明

ts_ms

BIGINT NOT NULL

データベース内でドキュメントが変更された時刻。ドキュメントが ChangeStream からではなく、テーブルの既存のデータからのものである場合、この値は常に 0 です。

また、Transform モジュールが提供する共通のメタデータ列を使用して、データベース名、コレクション名、および row_kind の情報にアクセスすることもできます。

MongoDB の変更前/変更後イメージの記録機能について

デフォルトでは、6.0 より前のバージョンの MongoDB は、変更前のドキュメントや削除されたドキュメントのデータを提供しません。この機能が有効になっていない場合、利用可能な情報は Upsert セマンティクスしか実現できず、これは Update Before データエントリが欠落していることを意味します。しかし、Flink の多くの便利なオペレーターは、Insert、Update Before、Update After、Delete イベントの完全な変更ストリームに依存しています。

欠落している変更前イベントを補うために、Flink SQL Planner は Upsert タイプのデータソースに対して自動的に ChangelogNormalize ノードを生成します。このノードは、Flink ステート内のすべてのドキュメントの現在のバージョンのスナップショットをキャッシュします。更新または削除されたドキュメントに遭遇すると、キャッシュされたスナップショットから変更前の状態を検索できます。ただし、このオペレーターノードは大量のステートデータを必要とします。

image.png

MongoDB 6.0 は、データベースの変更前および変更後イメージ機能をサポートしています。詳細については、「MongoDB Change Streams を使用してデータ変更をリアルタイムでキャプチャする」をご参照ください。この機能を有効にすると、MongoDB は各変更の前後のドキュメントの完全な状態を特別なコレクションに記録します。その後、タスクで scan.full-changelog 設定項目を有効にすると、MongoDB CDC は変更ドキュメントレコードから Update Before レコードを生成します。このプロセスにより、完全なイベントストリームが作成され、ChangelogNormalize ノードへの依存がなくなります。

Mongo CDC DataStream API

重要

DataStream API を使用してデータを読み書きする場合、対応する DataStream コネクタを使用して Flink に接続する必要があります。設定手順については、「DataStream コネクタの使用」をご参照ください。

DataStream API プログラムを作成し、MongoDBSource を使用できます。次のコードは例を示しています:

Java

MongoDBSource.builder()
 .hosts("mongo.example.com:27017")
 .username("mongouser")
 .password("mongopasswd")
 .databaseList("testdb")
 .collectionList("testcoll")
 .startupOptions(StartupOptions.initial())
 .deserializer(new JsonDebeziumDeserializationSchema())
 .build();

XML

VVR MongoDB コネクタは Maven Central Repository で入手できます。タスク開発で直接使用できます。

<dependency>
 <groupId>com.alibaba.ververica</groupId>
 <artifactId>flink-connector-mongodb</artifactId>
 <version>${vvr.version}</version>
</dependency>
説明

DataStream API を使用する場合、増分スナップショット機能を有効にするには、MongoDBSource を構築する際に com.ververica.cdc.connectors.mongodb.source パッケージの MongoDBSource#builder() を使用します。それ以外の場合は、com.ververica.cdc.connectors.mongodb パッケージの MongoDBSource#builder() を使用します。

MongoDBSource を構築する際に、次のパラメーターを設定できます:

パラメータ

説明

hosts

接続する MongoDB データベースのホスト名。

username

MongoDB データベースサービスのユーザー名。

説明

MongoDB サーバーで認証が有効になっていない場合、このパラメーターを設定する必要はありません。

password

MongoDB データベースサービスのパスワード。

説明

MongoDB サーバーで認証が有効になっていない場合、このパラメーターを設定する必要はありません。

databaseList

監視する MongoDB データベースの名前。

説明

データベース名は、複数のデータベースからデータを読み取るための正規表現をサポートしています。.* を使用すると、すべてのデータベースと一致します。

collectionList

監視する MongoDB コレクションの名前。

説明

コレクション名は、複数のコレクションからデータを読み取るための正規表現をサポートしています。.* を使用すると、すべてのコレクションと一致します。

startupOptions

MongoDB CDC の起動モードを選択します。

有効な値:

  • StartupOptions.initial()

    • 初期オフセットからすべてのデータをプルします。

  • StartupOptions.latest-offset()

    • 現在のオフセットから変更データをプルします。

  • StartupOptions.timestamp()

    • 指定されたタイムスタンプから変更データをプルします。

詳細については、「起動プロパティ」をご参照ください。

deserializer

SourceRecord 型のレコードを指定された型に逆シリアル化するデシリアライザ。有効な値:

  • MongoDBConnectorDeserializationSchema:Upsert モードで生成された SourceRecord を Flink Table API または SQL API の内部データ構造 RowData に変換します。

  • MongoDBConnectorFullChangelogDeserializationSchema:Full Changelog モードで生成された SourceRecord を Flink Table または SQL の内部データ構造 RowData に変換します。

  • JsonDebeziumDeserializationSchema:SourceRecord を JSON 形式の文字列に変換します。