MongoDB カタログを使用すると、コレクションスキーマを手動で定義することなく、Realtime Compute for Apache Flink から MongoDB コレクションにアクセスできます。このカタログは、バイナリ JSON (BSON) 形式のドキュメントを自動的に解析して各コレクションのスキーマを推論するため、DDL 文を記述することなく Flink SQL で特定のフィールドを直接クエリできます。このトピックでは、MongoDB カタログの作成、表示、使用、削除の方法について説明します。
背景情報
MongoDB カタログは、BSON 形式のドキュメントを解析することで、コレクションのスキーマを自動的に推論します。MongoDB カタログのテーブル名は対応する MongoDB のコレクション名と一致するため、DDL 文で MongoDB テーブルを登録する必要はありません。これにより、設定のオーバーヘッドが削減され、データの精度が向上します。
MongoDB カタログ内のテーブルは、Realtime Compute for Apache Flink SQL デプロイメントにおいて、ソーステーブル、ディメンションテーブル、および結果テーブルとして機能します。
Ververica Runtime (VVR) 8.0.6 以降を搭載した Realtime Compute for Apache Flink では、MongoDB カタログを CREATE TABLE AS 文または CREATE DATABASE AS 文と組み合わせて使用し、テーブルスキーマの変更を同期させることができます。
このトピックでは、次の操作について説明します。
事前準備
MongoDB カタログを使用する前に、次のことを確認してください。
-
ご利用の Realtime Compute for Apache Flink ワークスペースが VVR 8.0.5 以降を使用していること。`CREATE TABLE AS` 文または `CREATE DATABASE AS` 文を使用する場合は、VVR 8.0.6 以降が必要です。
制限事項
-
VVR 8.0.5 以降を搭載した Realtime Compute for Apache Flink のみ、MongoDB カタログをサポートします。
-
DDL 文を使用して既存の MongoDB カタログを変更することはできません。接続設定を更新するには、カタログを削除してから再作成してください。
-
MongoDB カタログは読み取りアクセスのみをサポートします。MongoDB カタログを介してデータベースやテーブルを作成、変更、削除することはできません。
MongoDB カタログの作成
MongoDB データベースで認証が必要な場合、password パラメーターはデフォルトでプレーンテキストの値を受け付けます。認証情報の漏洩を防ぐために、キー管理機能を使用してパスワードを変数として保存してください。詳細については、「変数とキーの管理」をご参照ください。
次の SQL 文では、山括弧 (< >) で囲まれたすべてのプレースホルダー値を置き換えてください。実際の値を代入する際には、山括弧を削除する必要があります。山括弧を残したままにすると、構文チェックエラーが発生します。
-
SQL エディタページの
[スクリプト] タブのコードエディタに、MongoDB カタログを作成するための文を入力します。CREATE CATALOG <yourcatalogname> WITH( 'type'='mongodb', 'default-database'='<dbName>', 'hosts'='<hosts>', 'scheme'='<scheme>', 'username'='<username>', 'password'='<password>', 'connection.options'='<connectionOptions>', 'max.fetch.records'='100', 'scan.flatten-nested-columns.enabled'='<flattenNestedColumns>', 'scan.primitive-as-string'='<primitiveAsString>' );次の表にパラメーターを示します。
パラメーター
データ型
説明
必須
デフォルト値
備考
yourcatalogname
STRING
MongoDB カタログの名前。
はい
N/A
カスタム名を入力します。
type
STRING
カタログのタイプ。
はい
N/A
値を
mongodbに設定します。hosts
STRING
MongoDB インスタンスのホスト名。
はい
N/A
複数のホスト名はカンマ (
,) で区切ります。default-database
STRING
デフォルトの MongoDB データベースの名前。
はい
N/A
N/A
scheme
STRING
MongoDB データベースの接続プロトコル。
いいえ
mongodb有効な値:
-
mongodb(標準の MongoDB プロトコルを使用) -
mongodb+srv(DNS SRV レコードプロトコルを使用)
username
STRING
MongoDB データベースに接続するためのユーザー名。
いいえ
N/A
MongoDB データベースで認証が有効になっている場合に必要です。
password
STRING
MongoDB データベースに接続するためのパスワード。
いいえ
N/A
MongoDB データベースで認証が有効になっている場合に必要です。認証情報の漏洩を防ぐため、プレーンテキストのパスワードを指定する代わりにキー管理機能を使用してください。「変数とキーの管理」をご参照ください。
connection.options
STRING
MongoDB データベースの追加の接続パラメーター。
いいえ
N/A
key=value形式のキーと値のペアとして指定し、アンパサンド (&) で区切ります。例:connectTimeoutMS=12000&socketTimeoutMS=13000。max.fetch.records
INT
コレクションスキーマを推論する際に MongoDB カタログが読み取ろうとするドキュメントの最大数。
いいえ
100N/A
scan.flatten-nested-columns.enabled
BOOLEAN
BSON 形式のドキュメントを解析する際に、ネストされた列を再帰的に展開するかどうかを指定します。
いいえ
false有効な値:
-
true:ネストされた列が展開されます。列のパスが列名になります。例:{"nested": {"col": true}}はnested.colになります。 -
false:ネストされたドキュメントは STRING として解析されます。
重要このパラメーターは、Flink SQL デプロイメントでソーステーブルとして使用される MongoDB カタログテーブルにのみ適用されます。
scan.primitive-as-string
BOOLEAN
BSON 形式のドキュメントを解析する際に、すべての基本データ型を STRING として推論するかどうかを指定します。
いいえ
false有効な値:
-
true:すべての基本データ型が STRING として推論されます。 -
false:型は BSON から Flink への型マッピングから推論されます。型マッピングについては、「スキーマ推論の説明」をご参照ください。
-
-
カタログを作成するためのコードを選択し、コードエディタの左側にある
[実行] をクリックします。
-
カタログリストページの左側にある
[カタログ] ペインで、新しいカタログが表示されていることを確認します。
MongoDB カタログの表示
-
SQL エディタページの
[スクリプト] タブのコードエディタに、次の文を入力します。DESCRIBE `${catalog_name}`.`${db_name}`.`${collection_name}`;パラメーター
説明
${catalog_name}
MongoDB カタログの名前。
${db_name}
ApsaraDB for MongoDB データベースの名前。
${collection_name}
ApsaraDB for MongoDB コレクションの名前。
-
コードを選択し、コードエディタの左側にある
[実行] をクリックします。文が実行されると、結果パネルにテーブルの詳細が表示されます。

MongoDB カタログの使用
ソーステーブルとしての使用
カタログテーブルを介して MongoDB コレクションからデータを読み取るには、次のパターンを使用します。
INSERT INTO ${other_sink_table}
SELECT...
FROM `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
/*+OPTIONS('scan.incremental.snapshot.enabled'='true')*/;
MongoDB カタログを使用する際に WITH 句のパラメーターを追加で指定するには、カタログ定義を変更するのではなく、SQL ヒントを使用します。上記の例では、SQL ヒントを使用して、初期スナップショットフェーズでの並列読み取りを有効にしています。利用可能なすべてのパラメーターについては、「MongoDB コネクタ」をご参照ください。
CTAS/CDAS を使用したデータ同期
CREATE TABLE AS 文または CREATE DATABASE AS 文を使用して、MongoDB コレクションから宛先テーブルにデータを同期します。
これらの文を使用する前に、次のすべての条件が満たされていることを確認してください。
-
VVR のバージョンが 8.0.6 以降であり、MongoDB データベースのバージョンが 6.0 以降であること。
-
scan.incremental.snapshot.enabled と scan.full-changelog パラメーターが SQL ヒントで
trueに設定されていること。 -
MongoDB データベースでプリイメージ機能が有効になっていること。手順については、「Document Preimages」をご参照ください。
単一のコレクションからリアルタイムでデータを同期する:
CREATE TABLE IF NOT EXISTS `${target_table_name}`
WITH(...)
AS TABLE `${mongodb_catalog}`.`${db_name}`.`${collection_name}`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
1 つのデプロイメントで複数のコレクションからデータを同期する:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
AS TABLE `mongodb-catalog`.`database`.`collection0`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
AS TABLE `mongodb-catalog`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
AS TABLE `mongodb-catalog`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
END;
CREATE TABLE AS 文を使用して単一のデプロイメントで複数の MongoDB コレクションを同期する場合、すべてのテーブルで次のパラメーターを同じように設定する必要があります。
-
MongoDB データベース接続パラメーター:hosts、scheme、username、password、および connection.options
-
scan.startup.mode
MongoDB データベース全体からデータを同期する:
CREATE DATABASE IF NOT EXISTS `some_catalog`.`some_database`
AS DATABASE `mongodb-catalog`.`database`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true', 'scan.full-changelog'='true') */;
ディメンションテーブルとしての使用
INSERT INTO ${other_sink_table}
SELECT ...
FROM ${other_source_table} AS e
JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
結果テーブルとしての使用
INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
SELECT ...
FROM ${other_source_table}
MongoDB カタログの削除
MongoDB カタログを削除しても、実行中のデプロイメントには影響しません。ただし、削除されたカタログ内のテーブルを参照しているデプロイメントは、次に公開または再起動されるときにそのテーブルを見つけられずに失敗します。注意して進めてください。
-
SQL エディタページの
[スクリプト] タブのコードエディタに、次の文を入力します。DROP CATALOG ${catalog_name};${catalog_name} は、削除したい MongoDB カタログの名前を指定します。
-
文を右クリックし、ショートカットメニューから
[実行] を選択してカタログを削除します。 -
カタログリストページの左側にある
[カタログ] ペインで、カタログが表示されなくなったことを確認します。
スキーマ推論の説明
MongoDB カタログがテーブルのスキーマを推論する際、デフォルトのパラメーターとプライマリキー情報が自動的に追加されます。カタログが BSON 形式のドキュメントを解析してコレクションのスキーマを取得する際、max.fetch.records パラメーターで指定された数のデータレコードまで読み取りを試みます。カタログは各レコードのスキーマを解析し、解析されたすべてのスキーマを単一のコレクションスキーマにマージします。
コレクションスキーマは、次の部分で構成されます。
-
物理列:MongoDB カタログは、BSON 形式のドキュメントから物理列を推論します。
-
デフォルトのプライマリキー:MongoDB カタログ内のどのテーブルでも、データの重複を防ぐために _id 列がプライマリキーとして機能します。
MongoDB カタログは、一連の BSON 形式のドキュメントを読み取った後、それらを順次解析し、次のルールを使用して物理列をコレクションスキーマにマージします。MongoDB カタログは、次のルールに基づいて BSON 形式のドキュメントをマージします。
-
解析された物理列のフィールドが現在のコレクションスキーマに存在しない場合、MongoDB カタログはそのフィールドをスキーマに追加します。
-
解析された物理列がコレクションスキーマ内の既存の列と同じ名前を持つ場合、データ型に基づいて次のマージロジックを適用します。
-
列が同じデータ型を共有しているが精度が異なる場合、MongoDB カタログはより大きな精度の列を保持します。
-
列のデータ型が異なる場合、MongoDB カタログは次の図に示す型階層から最小の共通親型を使用します。DECIMAL 型と FLOAT 型の列がマージされる場合、精度を維持するために結果は DOUBLE になります。
-
次の表は、コレクションスキーマが推論される際の、BSON と Realtime Compute for Apache Flink SQL 間のデータ型マッピングを示しています。
|
BSON データ型 |
Realtime Compute for Apache Flink SQL のデータ型 |
|
Boolean |
BOOLEAN |
|
Int32 |
INT |
|
Int64 |
BIGINT |
|
Binary |
BYTES |
|
Double |
DOUBLE |
|
Decimal128 |
DECIMAL |
|
String |
STRING |
|
ObjectId |
STRING |
|
DateTime |
TIMESTAMP_LTZ(3) |
|
Timestamp |
TIMESTAMP_LTZ(0) |
|
Array |
STRING |
|
Document |
STRING |
関連ドキュメント
-
MongoDB コネクタの使用方法の詳細については、「MongoDB コネクタ」をご参照ください。
-
Realtime Compute for Apache Flink の組み込みカタログが要件を満たさない場合は、カスタムカタログを使用できます。詳細については、「カスタムカタログの管理」をご参照ください。