DataWorks は MongoDB との双方向のデータ同期を提供します。このトピックでは、DataWorks が MongoDB に対して提供するデータ同期機能について説明します。
サポートバージョン
MongoDB バージョン 4.x、5.x、6.x、7.x、および 8.0 のみがサポートされます。
注意事項
-
MongoDB データベースに接続するには、そのデータベース用に作成されたアカウントを使用します。ApsaraDB for MongoDB データソースを使用する場合、デフォルトでルートアカウントが作成されます。セキュリティ上の理由から、MongoDB データソースを追加する際には、ルートアカウントを使用しないことを推奨します。
-
MongoDB シャードクラスターを使用する場合、データソースには mongos ノードのアドレスを設定する必要があります。mongod/shard ノードのアドレスは設定しないでください。設定した場合、同期タスクがデータセット全体ではなく、指定された shard からのみデータをクエリする可能性があります。mongos と mongod の詳細については、mongos および mongod のドキュメントをご参照ください。
-
同時実行数が 1 より大きい場合、同期タスクに設定されたコレクション内のすべての
_idフィールドは、同じデータ型である必要があります。たとえば、すべての_idフィールドは、string 型または ObjectId 型のいずれかである必要があります。そうでない場合、一部のデータが同期されない可能性があります。説明-
同時実行数が 1 より大きい場合、タスクは
_idフィールドに基づいて分割されます。そのため、このシナリオでは_idフィールドに複数のデータ型が混在することはサポートされていません。_idフィールドに複数のデータ型が含まれている場合は、データ同期の同時実行数を 1 に設定してください。そのためには、splitFactor パラメーターを設定しないか、splitFactor パラメーターを 1 に設定します。
-
-
Data Integration は配列型をサポートしていません。ただし、MongoDB は配列型をサポートし、強力なインデックス機能を提供します。特定のパラメーターを設定して、文字列を MongoDB の配列に変換できます。変換後、データを MongoDB に並列で書き込むことができます。
-
自己管理型 MongoDB データベースは、パブリックネットワークアクセスをサポートしていません。Alibaba Cloud 内部ネットワーク経由でのみアクセスできます。
-
Docker を使用してデプロイされた MongoDB クラスターはサポートされていません。
-
query パラメーターを使用して特定の列からデータを読み取ることはできません。
-
バッチ同期タスクで、Data Integration が MongoDB からフィールド構造を取得できない場合、Data Integration はデフォルトで 6 つのフィールドのフィールドマッピングを生成します。フィールド名は
col1、col2、col3、col4、col5、およびcol6です。 -
タスク実行中、デフォルトでは
splitVectorコマンドがタスクのシャーディングに使用されます。一部の MongoDB バージョンではsplitVectorコマンドがサポートされていないため、no such cmd splitVectorエラーが発生する可能性があります。このエラーを防ぐには、タスク設定で
アイコンをクリックしてコードエディタに切り替え、MongoDB パラメーター設定に次のパラメーターを追加して splitVectorの使用を無効にします。"useSplitVector" : false
サポートされるフィールドタイプ
MongoDB Reader でサポートされる MongoDB データ型
Data Integration は、すべての MongoDB データ型をサポートしているわけではありませんが、ほとんどのデータ型をサポートしています。ご使用のデータ型がサポートされていることを確認してください。
Data Integration がサポートされているデータ型を読み取る際、次の操作を実行します。
-
基本データ型の場合、Data Integration は column パラメーターで設定されたフィールドの名前に基づいて、対応するパスからデータを自動的に読み取ります。詳細については、「付録: MongoDB のサンプルスクリプトとパラメーターの説明」をご参照ください。Data Integration はデータ型も自動的に変換します。列の type プロパティを指定する必要はありません。
タイプ
バッチ読み取り (MongoDB Reader)
説明
ObjectId
サポート
オブジェクト ID 型です。
Double
サポート
64 ビット浮動小数点数型です。
32-bit integer
サポート
32 ビット整数です。
64-bit integer
サポート
64 ビット整数です。
Decimal128
サポート
Decimal128 型です。
説明フィールドがネストされた型または combine 型として設定されている場合、JSON シリアル化中にオブジェクトとして処理されます。データを 10 進数として出力するには、
decimal128OutputTypeパラメーターを追加し、bigDecimalに設定する必要があります。String
サポート
文字列型です。
Boolean
サポート
ブール型です。
Timestamp
サポート
タイムスタンプ型です。
説明BsonTimestamp はタイムスタンプを格納します。タイムゾーンの影響を考慮する必要はありません。詳細については、「MongoDB のタイムゾーンの問題」をご参照ください。
Date
サポート
日付型です。
-
一部の複雑なデータ型については、列の type プロパティを設定してカスタム処理を実行できます。
タイプ
バッチ読み取り (MongoDB Reader)
説明
Document
サポート
埋め込みドキュメント型です。
-
type プロパティが設定されていない場合、Document は JSON シリアル化を使用して直接変換されます。
-
type プロパティが
documentに設定されている場合、フィールドはネストされた型になります。MongoDB Reader はパスに基づいて Document のプロパティを読み取ります。詳細な例については、後述の「例 2: 多階層にネストされた Document の再帰的解析」をご参照ください。
Array
サポート
配列型です。
-
type が
array.jsonまたはarraysに設定されている場合、データは JSON シリアル化を使用して直接処理されます。 -
type が
arrayまたはdocument.arrayに設定されている場合、要素は文字列に連結されます。区切り文字は、column の splitter プロパティで指定され、デフォルトはカンマ (,) です。
重要Data Integration は配列型をサポートしていません。ただし、MongoDB は配列型をサポートし、強力なインデックス機能を提供します。特定のパラメーターを設定して、文字列を MongoDB の配列に変換できます。変換後、データを MongoDB に並列で書き込むことができます。
-
Data Integration の特殊なデータ型: combine
|
タイプ |
バッチ読み取り (MongoDB Reader) |
説明 |
|
Combine |
サポート |
Data Integration のカスタムデータ型です。 type が |
MongoDB Reader のデータ型マッピング
次の表に、MongoDB Reader の MongoDB データ型と Data Integration データ型のマッピングを示します。
|
変換後の型カテゴリ |
MongoDB データ型 |
|
LONG |
INT、LONG、document.INT、および document.LONG |
|
DOUBLE |
DOUBLE および document.DOUBLE |
|
STRING |
STRING、ARRAY、document.STRING、document.ARRAY、および COMBINE |
|
DATE |
DATE および document.DATE |
|
BOOLEAN |
BOOL および document.BOOL |
|
BYTES |
BYTES および document.BYTES |
MongoDB Writer のデータ型マッピング
|
型カテゴリ |
MongoDB データ型 |
|
Integer |
INT および LONG |
|
Floating-point |
DOUBLE |
|
String |
STRING および ARRAY |
|
Date and time |
DATE |
|
Boolean |
BOOL |
|
Binary |
BYTES |
例 1: combine 型の使用
MongoDB Reader プラグインの combine データ型を使用すると、MongoDB ドキュメント内の複数のフィールドを単一の JSON 文字列にマージできます。たとえば、3 つの MongoDB ドキュメントから MaxCompute にフィールドをインポートするとします。次の例では、フィールドはキーと値のペアではなくキーで表されます。フィールド a と b は 3 つのドキュメントすべてに共通であり、x_n は可変フィールドです。
-
doc1: a b x_1 x_2 -
doc2: a b x_2 x_3 x_4 -
doc3: a b x_5
設定ファイルでは、1 対 1 のマッピングが必要なフィールドを明示的に指定する必要があります。マージするフィールドには、ドキュメント内の既存のフィールド名とは異なる新しい名前を割り当て、type を COMBINE に設定します。次のコードに例を示します。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
次の表に、MaxCompute での最終的な出力を示します。
|
odps_column1 |
odps_column2 |
odps_column3 |
|
a |
b |
{x_1,x_2} |
|
a |
b |
{x_2,x_3,x_4} |
|
a |
b |
{x_5} |
combine 型を使用して MongoDB ドキュメント内の複数のフィールドをマージした後、出力が MaxCompute にマッピングされると、共通フィールドは自動的に削除されます。ドキュメントの固有フィールドのみが保持されます。
たとえば、a と b はすべてのドキュメントに共通のフィールドです。ドキュメント doc1: a b x_1 x_2 のフィールドを combine 型を使用してマージすると、出力は {a,b,x_1,x_2} になります。この結果が MaxCompute にマッピングされると、共通フィールド a と b は削除されます。最終的な出力は {x_1,x_2} になります。
例 2: 多階層にネストされた Document の再帰的解析
MongoDB のドキュメントに複数のネストレベルがある場合、document 型を設定して再帰的に解析できます。次のコードに例を示します。
-
MongoDB のソースデータ:
{ "name": "name1", "a": { "b": { "c": "this is value" } } } -
MongoDB の列設定:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}設定が完了すると、ソースフィールドと宛先フィールドは次のようにマッピングされます:
_idはidに、nameはnameに、a.b.cはcにマッピングされます。
上記の設定により、ネストされたソースフィールド a.b.c の値が宛先フィールド c に書き込まれます。同期タスクが実行されると、宛先に書き込まれるデータは this is value になります。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、次の設定ガイドをご参照ください。
単一テーブルのバッチ同期タスクの設定
-
手順の詳細については、「コードレス UI でのタスクの設定」および「コードエディタでのタスクの設定」をご参照ください。
-
コードエディタのすべてのパラメーターとサンプルスクリプトの詳細については、「付録: MongoDB のサンプルスクリプトとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの設定
「単一テーブルのリアルタイム同期タスクの設定」をご参照ください。
データベース全体の同期タスクの設定
データベース全体のバッチ同期、完全および増分リアルタイム同期、またはシャード化されたデータベースからのリアルタイム同期のタスクを設定できます。詳細については、「データベース全体のバッチ同期タスク」および「データベース全体のリアルタイム同期タスクの設定」をご参照ください。
ベストプラクティス
よくある質問
付録: MongoDB のサンプルスクリプトとパラメーターの説明
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプトに関連パラメーターを設定する必要があります。詳細については、「スクリプトモードでの設定」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトのサンプル
次のスクリプトは、MongoDB からローカル環境にデータを抽出するように設定されたタスクの例です。パラメーターの詳細については、後続のパラメーターの説明をご参照ください。
-
コードを実行する前に、コメントを削除してください。
-
array から特定の要素を抽出することはできません。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // データソース名。
"collectionName": "tag_data", // コレクション名。
"query": "", // データフィルタリングクエリ。
"column": [
{
"name": "unique_id", // フィールド名。
"type": "string" // フィールドタイプ。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // タイムゾーン。
}
},
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// 速度制限を有効にするかどうかを指定します。このパラメーターを false に設定すると、速度制限は無効になり、mbps パラメーターは有効になりません。このパラメーターを true に設定すると、速度制限が有効になります。
"concurrent":1, // 同時実行タスク数。
"mbps":"12"// 速度制限レート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
|
パラメーター |
説明 |
|
datasource |
データソースの名前。コードエディタでは、このパラメーターの値は追加されたデータソースの名前と同じである必要があります。 |
|
collectionName |
MongoDB コレクションの名前。 |
|
hint |
hint パラメーターは、クエリオプティマイザーにクエリに対して特定のインデックスを使用させ、パフォーマンスを向上させることができます。詳細については、「hint パラメーター」をご参照ください。例:
|
|
column |
MongoDB から読み取るドキュメントフィールドを指定する配列。
|
|
batchSize |
バッチで取得するレコード数。このパラメーターはオプションです。デフォルト値: |
|
cursorTimeoutInMs |
カーソルのタイムアウト期間。このパラメーターはオプションです。デフォルト値: 説明
|
|
query |
このパラメーターを使用して、返される MongoDB データをフィルタリングします。指定された時間形式のみがサポートされます。UNIX タイムスタンプ形式は直接サポートされていません。 説明
query パラメーターの一般的な例を次に示します。
説明
MongoDB のクエリ構文の詳細については、MongoDB の公式ドキュメントをご参照ください。 |
|
splitFactor |
深刻なデータスキューが存在する場合、同時実行数を増やさずに、より細かい粒度のシャーディングを実現するために splitFactor を増やすことを検討してください。 |
Writer スクリプトのサンプル
次のスクリプトは、MongoDB にデータを書き込むように設定されたデータ同期タスクの例です。パラメーターの詳細については、後続のパラメーターの説明をご参照ください。
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// プラグイン名。
"parameter": {
"datasource": "",// データソース名。
"column": [
{
"name": "_id",// 列名。
"type": "ObjectId"// データ型。replaceKey が _id の場合、type を ObjectId に設定する必要があります。type を string に設定すると、置き換えは失敗します。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// 書き込みモード。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// コレクション名。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// エラーレコード数。
"record": "0"
},
"speed": {
"throttle": true,// 速度制限を有効にするかどうかを指定します。このパラメーターを false に設定すると、速度制限は無効になり、mbps パラメーターは有効になりません。このパラメーターを true に設定すると、速度制限が有効になります。
"concurrent": 1,// 同時実行タスク数。
"mbps": "1"// 速度制限レート。1 mbps = 1 MB/s。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer スクリプトのパラメーター
|
パラメーター |
説明 |
必須 |
デフォルト値 |
|
datasource |
データソースの名前。コードエディタでは、このパラメーターの値は追加されたデータソースの名前と同じである必要があります。 |
はい |
なし |
|
collectionName |
MongoDB コレクションの名前。 |
はい |
なし |
|
column |
MongoDB に書き込むドキュメントフィールドを指定する配列。
|
はい |
なし |
|
writeMode |
転送中にデータを上書きするかどうかを指定します。これには isReplace と replaceKey が含まれます:
説明
isReplace が true に設定され、
これは、書き込まれるデータに |
いいえ |
なし |
|
preSql |
MongoDB にデータを書き込む前に実行する事前操作 (既存データのクリアなど)。preSql が空の場合、事前操作は設定されません。preSql を設定する場合、その値が JSON 構文に準拠していることを確認してください。 |
いいえ |
なし |
Data Integration タスクを実行すると、設定された preSql が最初に実行されます。実際のデータ書き込みフェーズは、preSql の実行が完了した後にのみ開始されます。preSql パラメーターは、書き込まれるデータの内容には影響しません。preSql パラメーターは、Data Integration にべき等実行を提供します。たとえば、preSql を使用して、ビジネスルールに基づいて各タスク実行前に既存データをクリアできます。この場合、タスクが失敗した場合は、Data Integration タスクを再実行するだけで済みます。
preSql の形式要件は次のとおりです。
-
type フィールドを設定して、事前操作のタイプを指定する必要があります。サポートされている値は drop と remove です。例:
"preSql":{"type":"remove"}。-
drop: コレクションとその中のデータを削除します。削除するコレクションは collectionName パラメーターで指定されます。
-
remove: 条件に基づいてデータを削除します。
-
json: JSON オブジェクトを使用して、データ削除の条件を指定できます。例:
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。この例では、${last_day}は$[yyyy-mm-dd]形式の DataWorks スケジューリングパラメーターです。必要に応じて、他の MongoDB がサポートする条件演算子 (例: $gt、$lt、$gte、$lte)、論理演算子 (例: and、or)、または関数 (例: max、min、sum、avg、ISODate) も使用できます。Data Integration は、次の標準 MongoDB API を使用してデータ削除を実行します。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);説明条件に基づいてデータを削除するには、JSON 設定を使用することを推奨します。
-
item: item にデータフィルタリングのための列名 (name)、条件 (condition)、および列の値 (value) を設定できます。例:
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}。Data Integration は、設定された item 条件に基づいてクエリ条件を構築し、標準の MongoDB API を使用して削除を実行します。例:
col.deleteMany(query);。
-
-
preSql が認識されない場合、事前削除操作は実行されません。