MongoDB カタログを作成すると、コレクションのスキーマを定義しなくても、Realtime Compute for Apache Flink コンソールで MongoDB コレクションにアクセスできます。このトピックでは、Realtime Compute for Apache Flink コンソールで MongoDB カタログを作成、表示、使用、および削除する方法について説明します。
背景情報
MongoDB カタログは、Binary JSON(BSON)形式のドキュメントを自動的に解析して、コレクションのスキーマを推測します。そのため、MongoDB カタログを使用すると、Realtime Compute for Apache Flink SQL でコレクションのスキーマを宣言しなくても、コレクションの特定のフィールドを取得できます。 MongoDB カタログを使用する場合は、次の点に注意してください。
MongoDB カタログのテーブルの名前は、MongoDB コレクションの名前と一致します。このようにして、MongoDB コレクションにアクセスするために DDL ステートメントを実行して MongoDB テーブルを登録する必要はありません。これにより、データ開発の効率とデータの精度が向上します。
MongoDB カタログのテーブルは、Realtime Compute for Apache Flink SQL のデプロイで、ソーステーブル、ディメンションテーブル、および結果テーブルとして使用できます。
Ververica Runtime(VVR)8.0.6 以降を使用する Realtime Compute for Apache Flink では、CREATE TABLE AS ステートメントまたはCREATE DATABASE AS ステートメントと共に MongoDB カタログを使用して、テーブルスキーマの変更を同期できます。
このトピックでは、MongoDB カタログを管理するために行える操作について説明します。
制限事項
Ververica Runtime(VVR)8.0.5 以降を使用する Realtime Compute for Apache Flink のみ、MongoDB カタログをサポートしています。
DDL ステートメントを実行して既存の MongoDB カタログを変更することはできません。
MongoDB カタログを使用して、テーブルからデータのクエリのみを実行できます。 MongoDB カタログを使用してデータベースとテーブルを作成、変更、または削除することはできません。
MongoDB カタログの作成
スクリプトタブの SQL エディターページのコードエディターで、MongoDB カタログを作成するためのステートメントを入力します。
CREATE CATALOG <yourcatalogname> WITH( 'type'='mongodb', 'default-database'='<dbName>', 'hosts'='<hosts>', 'scheme'='<scheme>', 'username'='<username>', 'password'='<password>', 'connection.options'='<connectionOptions>', 'max.fetch.records'='100', 'scan.flatten-nested-columns.enable'='<flattenNestedColumns>', 'scan.primitive-as-string'='<primitiveAsString>' );パラメーター
データ型
説明
必須
備考
yourcatalogname
STRING
MongoDB カタログの名前。
はい
カスタム名を入力します。
重要パラメーターの値をカタログの名前に置き換えるときは、山かっこ(<>)を削除する必要があります。そうしないと、構文チェック中にエラーが返されます。
type
STRING
カタログのタイプ。
はい
値を mongodb に設定します。
hosts
STRING
MongoDB インスタンスが存在するホストの名前。
はい
複数のホスト名をカンマ(
,)で区切ります。default-database
STRING
デフォルトの MongoDB データベースの名前。
はい
該当なし。
scheme
STRING
MongoDB データベースで使用される接続プロトコル。
いいえ
有効な値:
mongodb:デフォルトの MongoDB プロトコルを使用して MongoDB データベースにアクセスします。これはデフォルト値です。mongodb+srv:DNS SRV レコードプロトコルを使用して MongoDB データベースにアクセスします。
username
STRING
MongoDB データベースへの接続に使用するユーザー名。
いいえ
MongoDB データベースで ID 検証機能が有効になっている場合、このパラメーターは必須です。
password
STRING
MongoDB データベースへの接続に使用するパスワード。
いいえ
MongoDB データベースで ID 検証機能が有効になっている場合、このパラメーターは必須です。
説明パスワードの漏洩を防ぐため、鍵管理方式を使用してパスワードを指定することをお勧めします。詳細については、「変数の管理」をご参照ください。
connection.options
STRING
MongoDB データベースへの接続用に構成されたパラメーター。
いいえ
パラメーターは、
key=value形式のキーと値のペアであり、アンパサンド(&)で区切られます。例: connectTimeoutMS=12000&socketTimeoutMS=13000。max.fetch.records
INT
MongoDB カタログが BSON 形式のドキュメントを解析するときに取得を試みるドキュメントの最大数。
いいえ
デフォルト値:100。
scan.flatten-nested-columns.enabled
BOOLEAN
BSON 形式のドキュメントの解析時に、ドキュメント内のネストされた列を再帰的に展開するかどうかを指定します。
いいえ
有効な値:
true:ネストされた列は再帰的に展開されます。 Realtime Compute for Apache Flink は、展開された列の値にインデックスを付けるパスを列の名前として使用します。たとえば、
{"nested": {"col": true}}の col 列は、展開後に nested.col という名前になります。false:ネストされた BSON 形式のドキュメントは STRING 型として解析されます。これはデフォルト値です。
重要Realtime Compute for Apache Flink SQL のデプロイでソーステーブルとして使用される MongoDB カタログのテーブルのみ、このパラメーターをサポートしています。
scan.primitive-as-string
BOOLEAN
BSON 形式のドキュメントの解析時に、すべての基本データ型を STRING 型として推測するかどうかを指定します。
いいえ
有効な値:
true:すべての基本データ型は STRING 型として推測されます。
false:データ型はデータ型マッピングに基づいて推測されます。これはデフォルト値です。データ型マッピングの詳細については、「MongoDB カタログのテーブルの詳細について」をご参照ください。
カタログを作成するために使用されるコードを選択し、コードの左側にある [実行] をクリックします。

カタログ一覧ページの左側の [カタログ] ペインで、作成したカタログを表示します。
MongoDB カタログの表示
スクリプトタブの SQL エディターページのコードエディターで、次のステートメントを入力します。
DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;パラメーター
説明
${catalog_name}
MongoDB カタログの名前。
${db_name}
ApsaraDB for MongoDB データベースの名前。
${collection_name}
ApsaraDB for MongoDB コレクションの名前。
カタログを表示するために使用されるコードを選択し、コードの左側にある [実行] をクリックします。
コードが実行されると、結果にテーブルの情報が表示されます。

MongoDB カタログの使用
MongoDB カタログのテーブルがソーステーブルとして使用される場合、テーブルと一致する MongoDB コレクションからデータを読み取ることができます。
INSERT INTO ${other_sink_table} SELECT... FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}` /*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;説明MongoDB カタログを使用するときに WITH 句で他のパラメーターを指定する場合は、SQL ヒントを使用してパラメーターを追加することをお勧めします。上記の SQL ステートメントでは、SQL ヒントを使用して初期スナップショットフェーズで並列読み取りモードを有効にしています。他のパラメーターの詳細については、「MongoDB コネクタ」をご参照ください。
MongoDB カタログのテーブルがソーステーブルとして使用される場合、CREATE TABLE AS ステートメントまたはCREATE DATABASE AS ステートメントを実行して、テーブルと一致する MongoDB コレクションから宛先テーブルにデータを同期できます。
重要CREATE TABLE AS ステートメントまたは CREATE DATABASE AS ステートメントを使用して、テーブルと一致する MongoDB コレクションから宛先テーブルにデータを同期する場合は、次のビジネス要件が満たされていることを確認してください。
VVR バージョンは 8.0.6 以降です。 MongoDB データベースのバージョンは 6.0 以降です。
SQL ヒントで scan.incremental.snapshot.enabled パラメーターと scan.full-changelog パラメーターが true に設定されています。
MongoDB データベースでプレイメージ機能が有効になっています。プレイメージ機能を有効にする方法の詳細については、「ドキュメントプレイメージ」をご参照ください。
単一トピックのデータをリアルタイムで同期します。
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH(...) AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;デプロイで複数トピックのデータを同期します。
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `mongodb-catalog`.`database`.`collection0` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `mongodb-catalog`.`database`.`collection1` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `mongodb-catalog`.`database`.`collection2` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */; END;CREATE TABLE AS ステートメントを MongoDB カタログと共に使用して、デプロイ内の複数の MongoDB コレクションからデータを同期できます。デプロイ内の複数の MongoDB コレクションからデータを同期するには、デプロイ内のすべてのテーブルの次のパラメーターの構成が同じであることを確認してください。
MongoDB データベースに関連するパラメーター(hosts、scheme、username、password、connectionOptions など)
scan.startup.mode
MongoDB データベース全体からデータを同期します。
CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database` AS DATABASE `mongodb-catalog`.`database` /*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
MongoDB ディメンションテーブルからデータを読み取ります。
INSERT INTO ${other_sink_table} SELECT ... FROM ${other_source_table} AS e JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;結果データを MongoDB テーブルに書き込みます。
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}` SELECT ... FROM ${other_source_table}
MongoDB カタログの削除
MongoDB カタログを削除した後も、実行中のデプロイは影響を受けません。ただし、カタログのテーブルを使用するデプロイは、デプロイが公開または再起動されたときにテーブルを見つけることができません。 MongoDB カタログを削除する場合は注意してください。
スクリプトタブの SQL エディターページのコードエディターで、次のステートメントを入力します。
DROP CATALOG ${catalog_name};${catalog_name} は、削除する MongoDB カタログの名前を指定します。
カタログの削除に使用するステートメントを右クリックし、ショートカットメニューから実行を選択します。
カタログペインのカタログリストページの左側で、カタログが削除されたかどうかを確認します。
スキーマ推論の説明
MongoDB カタログがテーブルのスキーマを推測すると、MongoDB カタログはテーブルにデフォルトパラメーターとプライマリキー情報を自動的に追加します。これにより、テーブルの詳細を簡単に理解できます。 MongoDB カタログが BSON 形式のドキュメントを解析してコレクションのスキーマを取得すると、MongoDB カタログはデータレコードの消費を試みます。 MongoDB カタログが消費を試みることができるデータレコードの最大数は、max.fetch.records パラメーターで指定されます。カタログは各データレコードのスキーマを解析し、データレコードのスキーマをコレクションのスキーマにマージします。コレクションのスキーマは、次の部分で構成されます。
物理列
MongoDB カタログは、BSON 形式のドキュメントに基づいて物理列を推測します。
追加されるデフォルトのプライマリキー
MongoDB カタログのテーブルの場合、_id 列はデータの重複を防ぐためにプライマリキーとして使用されます。
MongoDB カタログは、BSON 形式のドキュメントのグループを取得した後、ドキュメントを順番に解析し、取得した物理列をマージして、次のルールに基づいてコレクションのスキーマを取得します。この関数は、次のルールに基づいて JSON ドキュメントをマージします。
取得した物理列のフィールドがコレクションのスキーマに含まれていない場合、MongoDB カタログはフィールドをコレクションのスキーマに自動的に追加します。
解析後に取得された特定の物理列の名前がトピックスキーマの特定の列と同じ名前の場合、ビジネスシナリオに基づいて操作を実行します。
列のデータ型が同じであるが精度が異なる場合、Kafka JSON カタログは精度の高い列をマージします。
列のデータ型が異なる場合、Kafka JSON カタログは、次の図に示すツリー構造の最小の親ノードを、同じ名前の列の型として使用します。 DECIMAL 型と FLOAT 型の列がマージされる場合、精度は DOUBLE 型にマージされます。
次の表は、コレクションのスキーマが推測されるときの BSON と Realtime Compute for Apache Flink SQL 間のデータ型マッピングを示しています。
BSON データ型 | Realtime Compute for Apache Flink SQL のデータ型 |
Boolean | BOOLEAN |
Int32 | INT |
Int64 | BIGINT |
Binary | BYTES |
Double | DOUBLE |
Decimal128 | DECIMAL |
String | STRING |
ObjectId | STRING |
DateTime | TIMESTAMP_LTZ(3) |
Timestamp | TIMESTAMP_LTZ(0) |
Array | STRING |
Document | STRING |
関連情報
MongoDB コネクタの使用方法の詳細については、「MongoDB コネクタ」をご参照ください。
Realtime Compute for Apache Flink の組み込みカタログがビジネス要件を満たしていない場合は、カスタムカタログを使用できます。詳細については、「カスタムカタログの管理」をご参照ください。