MongoDB データソースを使用すると、MongoDB からのデータの読み取りと MongoDB へのデータの書き込みが可能になります。このトピックでは、DataWorks が MongoDB に対して提供するデータ同期機能について説明します。
サポートされるバージョン
MongoDB バージョン 4.x、5.x、6.x、7.x、および 8.0 がサポートされています。
注意事項
Data Integration は、対応するデータベースアカウントを使用して MongoDB データベースに接続します。ApsaraDB for MongoDB を使用する場合、デフォルトでルートアカウントが提供されます。セキュリティ上の理由から、ルートアカウントを使用して MongoDB データソースにアクセスしないでください。
ご利用の MongoDB データベースがシャードクラスターである場合は、データソースを設定する際に Mongos アドレスを設定してください。mongod/shard ノードアドレスは設定しないでください。設定した場合、同期タスクが完全なデータセットではなく、特定の shard からのみデータを抽出する可能性があります。Mongos と mongod の詳細については、「mongos」および「mongod」をご参照ください。
同時実行数が 1 より大きい場合、設定されたコレクション内のすべての
_idフィールドは同じ型である必要があります。たとえば、すべての_idフィールドは文字列または ObjectId のいずれかである必要があります。そうでない場合、一部のデータが同期に失敗する可能性があります。説明同時実行数が 1 より大きい場合、タスクは
_idフィールドに基づいて分割されます。したがって、このシナリオでは_idフィールドは混合データ型をサポートしません。_idフィールドに複数のデータ型がある場合は、同時実行数 1 でデータを同期できます。splitFactor パラメーターを設定しないか、splitFactor を 1 に設定してください。
Data Integration は配列型をサポートしていません。しかし、MongoDB は配列とその強力なインデックス機能をサポートしています。特定のパラメーターを設定して、MongoDB で文字列を配列に変換できます。変換後、データを MongoDB に並行して書き込むことができます。
セルフマネージドの MongoDB データベースは、パブリックネットワークアクセスをサポートしていません。Alibaba Cloud プライベートネットワーク経由のアクセスのみをサポートします。
Docker でデプロイされた MongoDB クラスターはサポートされていません。
Data Integration は、データクエリ設定 (query パラメーター) で特定のカラムからデータを読み取ることをサポートしていません。
オフライン同期タスクで、フィールド構造を MongoDB から取得できない場合、6 つのフィールドを持つデフォルトのフィールドマッピングが生成されます。フィールド名は
col1、col2、col3、col4、col5、およびcol6です。タスクの実行中、タスクをシャーディングするためにデフォルトで
splitVectorコマンドが使用されます。一部の MongoDB バージョンはsplitVectorコマンドをサポートしていないため、no such cmd splitVectorエラーが発生します。このエラーを防ぐには、同期タスク設定で
アイコンをクリックしてコードエディタに切り替えます。次に、MongoDB のパラメーター設定に次のパラメーターを追加して、splitVectorの使用を回避します。"useSplitVector" : false
サポートされるフィールドタイプ
MongoDB Reader でサポートされる MongoDB データ型
Data Integration はほとんどの MongoDB データ型をサポートしていますが、一部サポートされていないデータ型もあります。続行する前に、ご利用のデータ型を確認してください。
サポートされているデータ型の場合、Data Integration は次のようにデータを読み取ります:
基本データ型の場合、Data Integration は同期タスクに設定された読み取りフィールド (column) の名前に基づいて、対応するパスからデータを自動的に読み取ります。また、データ型も自動的に変換します。カラムに type プロパティを指定する必要はありません。詳細については、「付録:MongoDB スクリプトのデモとパラメーターの説明」をご参照ください。
タイプ
オフライン読み取り (MongoDB Reader)
説明
ObjectId
サポート
オブジェクト ID 型です。
Double
サポート
64 ビット浮動小数点数型です。
32 ビット整数
サポート
32 ビット整数です。
64 ビット整数
サポート
64 ビット整数です。
Decimal128
サポート
Decimal128 型です。
説明この型がネストされた型または Combine 型として設定されている場合、JSON シリアル化中にオブジェクトとして処理されます。データを 10 進数として出力するには、
decimal128OutputTypeパラメーターを追加し、bigDecimalに設定する必要があります。String
サポート
文字列型です。
Boolean
サポート
ブール値型です。
Timestamp
サポート
タイムスタンプ型です。
説明BsonTimestamp はタイムスタンプを格納します。タイムゾーンの影響を考慮する必要はありません。詳細については、「MongoDB のタイムゾーンの問題」をご参照ください。
Date
サポート
日付型です。
一部の複雑なデータ型については、カラムの type プロパティを設定することで、処理をカスタマイズできます。
タイプ
オフライン読み取り (MongoDB Reader)
説明
Document
サポート
埋め込みドキュメント型です。
type プロパティが設定されていない場合、ドキュメントは直接 JSON シリアル化によって処理されます。
type プロパティが
documentに設定されている場合、それはネストされた型です。MongoDB Reader はパスによってドキュメントのプロパティを読み取ります。詳細な例については、「データ型の例 2:複数レベルのネストされたドキュメントの再帰的解析」をご参照ください。
Array
サポート
配列型です。
type が
array.jsonまたはarraysに設定されている場合、データは直接 JSON シリアル化によって処理されます。type が
arrayまたはdocument.arrayに設定されている場合、データは文字列に連結されます。デフォルトの区切り文字 (カラム内の splitter) はカンマ (,) です。
重要Data Integration は配列型をサポートしていません。しかし、MongoDB は配列とその強力なインデックス機能をサポートしています。特定のパラメーターを設定して、MongoDB で文字列を配列に変換できます。変換後、データを MongoDB に並行して書き込むことができます。
特殊な Data Integration データ型:combine
タイプ | オフライン読み取り (MongoDB Reader) | 説明 |
Combine | サポート | カスタムの Data Integration 型です。 type が |
MongoDB Reader のデータ型変換
次の表に、MongoDB Reader が実行するデータ型変換を示します。
変換後の型カテゴリ | MongoDB データ型 |
LONG | INT、LONG、document.INT、および document.LONG |
DOUBLE | DOUBLE および document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY、および COMBINE |
DATE | DATE および document.DATE |
BOOLEAN | BOOL および document.BOOL |
BYTES | BYTES および document.BYTES |
MongoDB Writer のデータ型変換
型カテゴリ | MongoDB データ型 |
整数 | INT および LONG |
浮動小数点 | DOUBLE |
文字列 | STRING および ARRAY |
日付と時刻 | DATE |
ブール値 | BOOL |
バイナリ | BYTES |
データ型の例 1:combine 型の使用
MongoDB Reader プラグインの Combine データ型を使用すると、MongoDB ドキュメント内の複数のフィールドを単一の JSON 文字列にマージできます。たとえば、MongoDB から MaxCompute にフィールドをインポートしたいとします。次の 3 つのドキュメントには、キーがフィールド全体を表し、値が省略されているフィールドが含まれています。フィールド `a` と `b` はすべてのドキュメントに共通であり、`x_n` は固定されていないフィールドです。
doc1: a b x_1 x_2doc2: a b x_2 x_3 x_4doc3: a b x_5
設定ファイルで、1 対 1 のマッピングが必要なフィールドを指定します。マージするフィールドには、ドキュメント内の既存のフィールドとは異なる新しい名前を割り当て、型を COMBINE に設定します。以下に示します。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]MaxCompute でのエクスポート結果は次のようになります。
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
COMBINE 型を使用して MongoDB ドキュメント内の複数のフィールドをマージした後、出力が MaxCompute にマッピングされると、共通フィールドは自動的に削除されます。ドキュメントのユニークなフィールドのみが保持されます。
たとえば、`a` と `b` はすべてのドキュメントで共通のフィールドです。COMBINE 型を使用してドキュメント doc1: a b x_1 x_2 のフィールドをマージすると、出力は {a,b,x_1,x_2} になるはずです。この結果が MaxCompute にマッピングされると、共通フィールド `a` と `b` は削除されます。最終的な出力は {x_1,x_2} になります。
データ型の例 2:複数レベルのネストされたドキュメントの再帰的解析
MongoDB ドキュメントに複数レベルのネストがある場合、document 型を設定して再帰的に処理できます。以下に例を示します:
MongoDB のソースデータ:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }MongoDB のカラム設定:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
この設定により、ソースのネストされたフィールド `a.b.c` の値が、宛先フィールド `c` に書き込まれます。同期タスクの実行後、宛先に書き込まれるデータは this is value です。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。
単一テーブルのオフライン同期タスクの設定
手順については、「コードレス UI でのタスク設定」および「コードエディタでのタスク設定」をご参照ください。
コードエディタでタスクを設定するためのすべてのパラメーターとスクリプトのデモについては、「付録:MongoDB スクリプトのデモとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの設定
手順については、「Data Integration でのリアルタイム同期タスクの設定」および「DataStudio でのリアルタイム同期タスクの設定」をご参照ください。
データベース全体の同期タスクの設定
データベース全体のオフライン同期、データベース全体のフルおよび増分リアルタイム同期、シャーディングを伴うデータベース全体のリアルタイム同期など、データベース全体の同期タスクの設定方法については、「データベース全体のオフライン同期タスク」および「データベース全体のリアルタイム同期タスクの設定」をご参照ください。
ベストプラクティス
よくある質問
付録:MongoDB スクリプトのデモとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプトに関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスク設定」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明しています。
Reader スクリプトのデモ
次のコードは、MongoDB からローカルの宛先にデータを抽出するジョブを設定する方法を示しています。パラメーターの詳細については、次の表をご参照ください。
コードを実行する際は、コメントを削除してください。
array から特定の要素を抽出することはサポートされていません。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // データソースの名前。
"collectionName": "tag_data", // コレクションの名前。
"query": "", // データクエリフィルター。
"column": [
{
"name": "unique_id", // フィールドの名前。
"type": "string" // フィールドの型。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // タイムゾーン。
}
},
"errorLimit":{
"record":"0"// エラーレコードの数。
},
"speed":{
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent":1, // 同時実行ジョブの数。
"mbps":"12"// データレートの制限。1 mbps は 1 MB/s に相当します。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}パラメーター | 説明 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。 |
collectionName | MongoDB コレクションの名前。 |
hint | MongoDB は hint パラメーターをサポートしており、クエリオプティマイザーに特定のインデックスを使用してクエリを完了させるよう強制します。場合によっては、これによりクエリのパフォーマンスが向上することがあります。詳細については、「hint パラメーター」をご参照ください。以下に例を示します: |
column | MongoDB のドキュメントフィールドの名前。このパラメーターを配列として設定して、MongoDB の複数のフィールドを表します。
|
batchSize | 1 バッチで取得するレコード数。このパラメーターはオプションです。デフォルト値は |
cursorTimeoutInMs | カーソルのタイムアウト期間。このパラメーターはオプションです。デフォルト値は 説明
|
query | このパラメーターを使用して、返される MongoDB データの範囲を制限できます。次の日付形式のみがサポートされています。タイムスタンプ形式を直接使用することはサポートされていません。 説明
一般的な query の例を次に示します:
説明 MongoDB のクエリ構文の詳細については、MongoDB の公式ドキュメントをご参照ください。 |
splitFactor | データスキューが著しい場合は、splitFactor を増やして、同時実行数を増やさずにチャンクの粒度を小さくすることを検討してください。 |
Writer スクリプトのデモ
次のコードは、データを MongoDB に書き込むデータ同期ジョブを設定する方法を示しています。パラメーターの詳細については、次の表をご参照ください。
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// プラグインの名前。
"parameter": {
"datasource": "",// データソースの名前。
"column": [
{
"name": "_id",// カラムの名前。
"type": "ObjectId"// データ型。replaceKey が _id に設定されている場合、型は ObjectId に設定する必要があります。型を string に設定すると、置き換えは失敗します。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// 書き込みモード。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// 接続の名前。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// エラーレコードの数。
"record": "0"
},
"speed": {
"throttle": true,// throttle が false に設定されている場合、mbps パラメーターは効果がなく、データレートは制限されません。throttle が true に設定されている場合、データレートは制限されます。
"concurrent": 1,// 同時実行ジョブの数。
"mbps": "1"// データレートの制限。1 mbps は 1 MB/s に相当します。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加したデータソースの名前と同じである必要があります。 | はい | なし |
collectionName | MongoDB コレクションの名前。 | はい | なし |
column | MongoDB のドキュメントフィールドの名前。このパラメーターを配列として設定して、MongoDB の複数のフィールドを表します。
| はい | なし |
writeMode | 転送中にデータを上書きするかどうかを指定します。これには isReplace と replaceKey が含まれます:
説明 isReplace が true に設定され、 これは、書き込まれるデータに | いいえ | なし |
preSql | MongoDB にデータを書き込む前の事前操作 (既存データのクリアなど) を表します。preSql が空の場合、事前操作は設定されません。preSql を設定する際は、JSON 構文要件に準拠していることを確認してください。 | いいえ | なし |
Data Integration ジョブが実行されると、設定された preSql が最初に実行されます。実際のデータ書き込みフェーズは、preSql が完了した後にのみ開始されます。preSql 自体は、書き込まれるデータの内容には影響しません。preSql パラメーターにより、Data Integration はべき等実行をサポートできます。たとえば、preSql は、ビジネスルールに従って各タスク実行前に既存データをクリーンアップできます。この場合、タスクが失敗した場合は、Data Integration ジョブを再実行するだけで済みます。
preSql の形式要件は次のとおりです:
事前操作のカテゴリを示すために、type フィールドを設定する必要があります。サポートされる値は drop と remove です。例:
"preSql":{"type":"remove"}。drop:コレクションとそのデータを削除します。collectionName パラメーターで指定されたコレクションが削除対象です。
remove:条件に基づいてデータを削除します。
json:JSON を使用してデータ削除の条件を定義できます。例:
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。ここで、${last_day}は$[yyyy-mm-dd]形式の DataWorks スケジューリングパラメーターです。必要に応じて、他の MongoDB がサポートする条件付きオペレーター ($gt、$lt、$gte、$lte など)、論理演算子 (and、or など)、または関数 (max、min、sum、avg、ISODate など) を使用できます。Data Integration は、次の標準 MongoDB API を使用して、クエリを削除するためのデータ操作を実行します。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);説明条件付きでデータを削除するには、JSON 設定形式を使用することを推奨します。
item:item でデータフィルタリングのためのカラム名 (name)、条件 (condition)、およびカラム値 (value) を設定できます。例:
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}。Data Integration は、設定した item 条件に基づいてクエリ条件を構築し、標準の MongoDB API を使用して削除を実行します。例:
col.deleteMany(query);。
preSql が認識されない場合、事前削除操作は実行されません。