DataWorks は、MongoDB との双方向のデータ同期をサポートしています。このトピックでは、DataWorks が MongoDB に対して提供するデータ同期機能について説明します。
サポートしているバージョン
DataWorks は、MongoDB バージョン 4.x、5.x、6.x、7.x、8.0 をサポートしています。
注意事項
MongoDB データベースには、そのデータベース用に作成されたアカウントを使用して接続します。ApsaraDB for MongoDB データソースを使用する場合、デフォルトで root アカウントが作成されます。セキュリティ上の理由から、MongoDB データソースを追加する際には root アカウントを使用しないことを推奨します。
MongoDB シャードクラスターを使用する場合、データソースには mongos ノードのアドレスを構成する必要があります。mongod/shard ノードのアドレスは構成しないでください。構成した場合、同期タスクがデータセット全体ではなく、指定された shard からのみデータをクエリする可能性があります。mongos と mongod の詳細については、mongos および mongod のドキュメントをご参照ください。
MongoDB のプライマリ/セカンダリクラスターはサポートされていません。
同時実行数が 1 より大きい場合、同期タスクに構成されたコレクション内のすべての
_idフィールドは、同じデータ型である必要があります。たとえば、すべての_idフィールドは、string 型または ObjectId 型である必要があります。そうでない場合、一部のデータが同期されない可能性があります。説明同時実行数が 1 より大きい場合、タスクは
_idフィールドに基づいて分割されます。そのため、このシナリオでは_idフィールドで混合データ型はサポートされません。_idフィールドに複数のデータ型が含まれている場合は、データ同期の同時実行数を 1 に設定してください。これを行うには、splitFactor パラメーターを構成しないか、splitFactor パラメーターを 1 に設定します。
データ統合は配列型をサポートしていません。ただし、MongoDB は配列型をサポートしており、強力なインデックス機能を提供しています。特定のパラメーターを構成して、文字列を MongoDB の配列に変換できます。変換後、データを MongoDB に並列で書き込むことができます。
自己管理型 MongoDB データベースは、パブリックネットワークアクセスをサポートしていません。Alibaba Cloud 内部ネットワーク経由でのみアクセスできます。
Docker を使用してデプロイされた MongoDB クラスターはサポートされていません。
データ統合は、query パラメーターを使用して指定された列からデータを読み取ることをサポートしていません。
バッチ同期タスクで、データ統合が MongoDB からフィールド構造を取得できない場合、データ統合はデフォルトで 6 つのフィールドのフィールドマッピングを生成します。フィールド名は
col1、col2、col3、col4、col5、col6です。タスク実行中、デフォルトでは
splitVectorコマンドがタスクのシャーディングに使用されます。一部の MongoDB バージョンはsplitVectorコマンドをサポートしていないため、no such cmd splitVectorエラーが発生する可能性があります。このエラーを防ぐには、タスク構成で
アイコンをクリックしてコードエディタに切り替え、MongoDB パラメーター構成に次のパラメーターを追加して splitVectorの使用を防止します。"useSplitVector" : false
サポートしているフィールドタイプ
MongoDB Reader がサポートする MongoDB のデータ型
データ統合は、すべての MongoDB データ型をサポートしているわけではありませんが、ほとんどのデータ型をサポートしています。ご使用のデータ型がサポートされていることを確認してください。
データ統合がサポートされているデータ型を読み取る際、次の操作を実行します:
基本データ型の場合、データ統合は column パラメーターで構成されたフィールドの名前に基づいて、対応するパスからデータを自動的に読み取ります。詳細については、「付録:MongoDB のサンプルスクリプトとパラメーターの説明」をご参照ください。データ統合はデータ型も自動的に変換します。列に type プロパティを指定する必要はありません。
タイプ
バッチ読み取り (MongoDB Reader)
説明
ObjectId
サポート
オブジェクト ID 型です。
Double
サポート
64 ビット浮動小数点数型です。
32-bit integer
サポート
32 ビット整数です。
64-bit integer
サポート
64 ビット整数です。
Decimal128
サポート
Decimal128 型です。
説明フィールドがネストされた型または combine 型として構成されている場合、JSON シリアル化中にオブジェクトとして処理されます。データを 10 進数として出力するには、
decimal128OutputTypeパラメーターを追加し、bigDecimalに設定する必要があります。String
サポート
文字列型です。
Boolean
サポート
ブール値型です。
Timestamp
サポート
タイムスタンプ型です。
説明BsonTimestamp はタイムスタンプを格納します。タイムゾーンの影響を考慮する必要はありません。詳細については、「MongoDB のタイムゾーン問題」をご参照ください。
Date
サポート
日付型です。
一部の複雑なデータ型については、列の type プロパティを構成してカスタム処理を実行できます。
タイプ
バッチ読み取り (MongoDB Reader)
説明
Document
サポート
埋め込みドキュメント型です。
type プロパティが構成されていない場合、Document は JSON シリアル化を使用して直接変換されます。
type プロパティが
documentに設定されている場合、フィールドはネストされた型になります。MongoDB Reader はパスに基づいて Document のプロパティを読み取ります。詳細な例については、後述の「例 2:多階層のネストされた Document の再帰的な解析」をご参照ください。
Array
サポート
配列型です。
type が
array.jsonまたはarraysに設定されている場合、データは JSON シリアル化を使用して直接処理されます。type が
arrayまたはdocument.arrayに設定されている場合、要素は文字列に連結されます。区切り文字は、列の splitter プロパティで指定され、デフォルトではカンマ (,) です。
重要データ統合は配列型をサポートしていません。ただし、MongoDB は配列型をサポートしており、強力なインデックス機能を提供しています。特定のパラメーターを構成して、文字列を MongoDB の配列に変換できます。変換後、データを MongoDB に並列で書き込むことができます。
データ統合の特殊なデータ型:combine
タイプ | バッチ読み取り (MongoDB Reader) | 説明 |
Combine | サポート | データ統合のカスタムデータ型です。 type が |
MongoDB Reader のデータ型マッピング
次の表に、MongoDB Reader の MongoDB データ型とデータ統合データ型のマッピングを示します。
変換後の型カテゴリ | MongoDB データ型 |
LONG | INT、LONG、document.INT、および document.LONG |
DOUBLE | DOUBLE および document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY、および COMBINE |
DATE | DATE および document.DATE |
BOOLEAN | BOOL および document.BOOL |
BYTES | BYTES および document.BYTES |
MongoDB Writer のデータ型マッピング
型カテゴリ | MongoDB データ型 |
Integer | INT および LONG |
Floating-point | DOUBLE |
String | STRING および ARRAY |
Date and time | DATE |
Boolean | BOOL |
Binary | BYTES |
例 1:combine 型の使用
MongoDB Reader プラグインの combine データ型を使用すると、MongoDB ドキュメント内の複数のフィールドを単一の JSON 文字列にマージできます。たとえば、3 つの MongoDB ドキュメントから MaxCompute にフィールドをインポートするとします。次の例では、フィールドはキーと値のペアではなくキーで表されます。フィールド a と b は 3 つのドキュメントすべてに共通しており、x_n は可変フィールドです。
doc1: a b x_1 x_2doc2: a b x_2 x_3 x_4doc3: a b x_5
構成ファイルでは、1 対 1 のマッピングが必要なフィールドを明示的に指定する必要があります。マージしたいフィールドには、ドキュメント内の既存のフィールド名とは異なる新しい名前を割り当て、type を COMBINE に設定します。次のコードに例を示します。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]次の表に、MaxCompute での最終的な出力を示します。
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
combine 型を使用して MongoDB ドキュメント内の複数のフィールドをマージした後、出力が MaxCompute にマッピングされると、共通フィールドは自動的に削除されます。ドキュメントのユニークなフィールドのみが保持されます。
たとえば、a と b はすべてのドキュメントに共通のフィールドです。ドキュメント doc1: a b x_1 x_2 のフィールドを combine 型を使用してマージすると、出力は {a,b,x_1,x_2} になります。この結果が MaxCompute にマッピングされると、共通フィールド a と b は削除されます。最終的な出力は {x_1,x_2} です。
例 2:多階層のネストされた Document の再帰的な解析
MongoDB のドキュメントに多階層のネストがある場合、document 型を構成して再帰的に解析できます。次のコードに例を示します。
MongoDB のソースデータ:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }MongoDB の列構成:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
上記の設定により、ネストされたソースフィールド a.b.c の値が宛先フィールド c に書き込まれます。同期タスクの実行後、宛先に書き込まれるデータは this is value です。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際、DataWorks コンソールでパラメーターのヒントを表示して、パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクの構成のエントリポイントと手順については、次の構成ガイドをご参照ください。
単一テーブルのバッチ同期タスクの構成
手順の詳細については、「コードレス UI でのタスクの構成」および「コードエディタでのタスクの構成」をご参照ください。
コードエディタのすべてのパラメーターとサンプルスクリプトの詳細については、「付録:MongoDB のサンプルスクリプトとパラメーターの説明」をご参照ください。
単一テーブルのリアルタイム同期タスクの構成
手順の詳細については、「データ統合でのリアルタイム同期タスクの構成」および「DataStudio でのリアルタイム同期タスクの構成」をご参照ください。
データベース全体の同期タスクの構成
データベース全体のバッチ同期、フルおよび増分リアルタイム同期、またはシャード化されたデータベースからのリアルタイム同期のタスクを構成できます。詳細については、「データベース全体のバッチ同期タスク」および「データベース全体のリアルタイム同期タスクの構成」をご参照ください。
ベストプラクティス
よくある質問
付録:MongoDB のサンプルスクリプトとパラメーターの説明
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタでのタスクの構成」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。
Reader スクリプトのサンプル
次のスクリプトは、MongoDB からローカル環境にデータを抽出するように構成されたジョブの例です。パラメーターの詳細については、後続のパラメーターの説明をご参照ください。
コードを実行する前に、コメントを削除してください。
array から指定された要素を抽出することはできません。
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", // データソース名。
"collectionName": "tag_data", // コレクション名。
"query": "", // データフィルタリングクエリ。
"column": [
{
"name": "unique_id", // フィールド名。
"type": "string" // フィールドタイプ。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" // タイムゾーン。
}
},
"errorLimit":{
"record":"0"// エラーレコード数。
},
"speed":{
"throttle":true,// スロットルを有効にするかどうかを指定します。このパラメーターを false に設定すると、スロットルは無効になり、mbps パラメーターは効果がありません。このパラメーターを true に設定すると、スロットルが有効になります。
"concurrent":1, // 同時実行ジョブ数。
"mbps":"12"// スロットルレート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}パラメーター | 説明 |
datasource | データソースの名前。コードエディタでは、このパラメーターの値は追加されたデータソースの名前と同じである必要があります。 |
collectionName | MongoDB コレクションの名前。 |
hint | MongoDB は hint パラメーターをサポートしており、これによりクエリオプティマイザーは特定のインデックスを使用してクエリを完了するように強制されます。場合によっては、これによりクエリのパフォーマンスが向上することがあります。詳細については、「hint パラメーター」をご参照ください。次のコードに例を示します: |
column | MongoDB のドキュメントフィールド名。複数のフィールドを表すために配列として構成します。
|
batchSize | バッチで取得するレコード数。このパラメーターはオプションです。デフォルト値: |
cursorTimeoutInMs | カーソルのタイムアウト期間。このパラメーターはオプションです。デフォルト値: 説明
|
query | このパラメーターを使用して、返される MongoDB データをフィルタリングできます。次の時間形式のみがサポートされています。UNIX タイムスタンプ形式は直接サポートされていません。 説明
query パラメーターの一般的な例を次に示します:
説明 MongoDB のクエリ構文の詳細については、MongoDB の公式ドキュメントをご参照ください。 |
splitFactor | 深刻なデータスキューが存在する場合、同時実行数を増やさずに、より細かい粒度のシャーディングを実現するために splitFactor を増やすことを検討してください。 |
Writer スクリプトのサンプル
次のスクリプトは、MongoDB にデータを書き込むように構成されたデータ同期ジョブの例です。パラメーターの詳細については、後続のパラメーターの説明をご参照ください。
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",// プラグイン名。
"parameter": {
"datasource": "",// データソース名。
"column": [
{
"name": "_id",// 列名。
"type": "ObjectId"// データ型。replaceKey が _id の場合、type を ObjectId に設定する必要があります。type を string に設定すると、置き換えは失敗します。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {// 書き込みモード。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"// コレクション名。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {// エラーレコード数。
"record": "0"
},
"speed": {
"throttle": true,// スロットルを有効にするかどうかを指定します。このパラメーターを false に設定すると、スロットルは無効になり、mbps パラメーターは効果がありません。このパラメーターを true に設定すると、スロットルが有効になります。
"concurrent": 1,// 同時実行ジョブ数。
"mbps": "1"// スロットルレート。1 mbps = 1 MB/s。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでは、このパラメーターの値は追加されたデータソースの名前と同じである必要があります。 | はい | なし |
collectionName | MongoDB コレクションの名前。 | はい | なし |
column | MongoDB のドキュメントフィールド名。複数のフィールドを表すために配列として構成します。
| はい | なし |
writeMode | 転送中にデータを上書きするかどうかを指定します。これには isReplace と replaceKey が含まれます:
説明 isReplace が true に設定され、 これは、書き込まれるデータに | いいえ | なし |
preSql | MongoDB にデータを書き込む前に実行する事前操作。たとえば、既存データのクリアなどです。preSql が空の場合、事前操作は構成されません。preSql を構成する際は、その値が JSON 構文に準拠していることを確認してください。 | いいえ | なし |
データ統合ジョブを実行すると、構成された preSql が最初に実行されます。実際のデータ書き込みフェーズは、preSql の実行が完了した後にのみ開始されます。preSql パラメーターは、書き込まれるデータの内容には影響しません。preSql パラメーターは、データ統合にべき等実行を提供します。たとえば、ビジネスルールに基づいて各タスク実行前に既存データをクリアするために preSql を使用できます。この場合、タスクが失敗しても、データ統合ジョブを再実行するだけで済みます。
preSql の形式要件は次のとおりです:
type フィールドを構成して、事前操作のタイプを指定する必要があります。サポートされている値は drop と remove です。例:
"preSql":{"type":"remove"}。drop:コレクションとその中のデータを削除します。削除するコレクションは collectionName パラメーターで指定されます。
remove:条件に基づいてデータを削除します。
json:JSON オブジェクトを使用して、データ削除の条件を指定できます。例:
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}。この例では、${last_day}は$[yyyy-mm-dd]形式の DataWorks スケジューリングパラメーターです。必要に応じて、他の MongoDB がサポートする条件付きオペレーター (例:$gt、$lt、$gte、$lte)、論理演算子 (例:and、or)、または関数 (例:max、min、sum、avg、ISODate) も使用できます。データ統合は、次の標準 MongoDB API を使用してデータ削除クエリを実行します。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);説明条件に基づいてデータを削除するには、JSON 構成を使用することを推奨します。
item:item にデータフィルタリングのための列名 (name)、条件 (condition)、および列の値 (value) を構成できます。例:
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}。データ統合は、構成された item 条件に基づいてクエリ条件を構築し、標準の MongoDB API を使用して削除を実行します。例:
col.deleteMany(query);。
preSql が認識されない場合、事前削除操作は実行されません。