DataWorks のデータ統合は、MongoDB からデータを読み取り、他のデータソースに同期するための MongoDB Reader プラグインを提供します。このトピックでは、データ統合を使用して MongoDB から MaxCompute にデータをバッチ同期する方法を説明します。
背景情報
このチュートリアルでは、ソースは MongoDB、ターゲットは MaxCompute です。開始する前に、MongoDB データを準備し、MaxCompute にターゲットテーブルを作成してください。
前提条件
開始する前に、次の要件を満たしていることを確認してください:
-
DataWorks を有効化し、MaxCompute データソースを作成済みであること。
-
このチュートリアルでは、データ統合専用リソースグループを使用してバッチタスクを実行します。データ統合専用リソースグループを購入して設定する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。
説明サーバーレスリソースグループを使用することもできます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。
サンプルデータとテーブルの準備
このチュートリアルでは、バッチ同期のために MongoDB コレクションと MaxCompute テーブルが必要です。
-
MongoDB コレクションを準備します。
このチュートリアルでは、例として ApsaraDB for MongoDB を使用します。次のコードは、MongoDB コレクションを準備する方法を説明します。
-
di_mongodb_conf_testという名前のコレクションを作成します。db.createCollection('di_mongodb_conf_test') -
このチュートリアル用のサンプルデータをコレクションに挿入します。
db.di_mongodb_conf_test.insertOne({ 'col_string':'mock string value', 'col_int32':NumberInt("1"), 'col_int32_min':NumberInt("-2147483648"), 'col_int32_max':NumberInt("2147483647"), 'col_int64':NumberLong("1234567890123456"), 'col_int64_min':NumberLong("-9223372036854775807"), 'col_int64_max':NumberLong("9223372036854775807"), 'col_decimal':NumberDecimal("9999999.4999999999"), 'col_double':9999999.99, 'col_boolean':true, 'col_timestamp':ISODate(), 'col_date':new Date(), 'col_array_to_json':['a','b'], 'col_array_to_join':['a','b'], 'col_doc':{ 'key_string':'mock string value', 'key_int32':NumberInt("1"), 'key_int32_min':NumberInt("-2147483648"), 'key_int32_max':NumberInt("2147483647"), 'key_int64':NumberLong("1234567890123456"), 'key_int64_min':NumberLong("-9223372036854775807"), 'key_int64_max':NumberLong("9223372036854775807"), 'key_decimal':NumberDecimal("9999999.4999999999"), 'key_double':9999999.99, 'key_boolean':true, 'key_timestamp':ISODate(), 'key_date':new Date(), 'key_array_to_json':['a','b'], 'key_array_to_join':['a','b'], }, 'col_extra_1':'this is extra 1', 'col_extra_2':'this is extra 2', }) -
MongoDB に挿入したデータをクエリします。
db.getCollection("di_mongodb_conf_test").find({})クエリ結果には、コレクション内の 1 つのテストドキュメントが表示されます。
_idは63dca714b8548a78e1dc3238です。このドキュメントには、さまざまなデータ型が含まれています:文字列 (col_string)、整数 (col_int32/col_int64)、浮動小数点 (col_double/col_decimal)、ブール値 (col_boolean)、日付/時刻 (col_date/col_timestamp)、配列 (col_array_to_join/col_array_to_json)。また、ネストされたドキュメントであるcol_docと、2 つの追加フィールドcol_extra_1とcol_extra_2も含まれます。
-
-
MaxCompute テーブルを準備します。
-
パーティションフィールドとして
ptを使用し、di_mongodb_conf_testという名前のパーティションテーブルを作成します。CREATE TABLE IF NOT EXISTS di_mongodb_conf_test ( `id` STRING ,`col_string` STRING ,`col_int32` INT ,`col_int32_min` INT ,`col_int32_max` INT ,`col_int64` BIGINT ,`col_int64_min` BIGINT ,`col_int64_max` BIGINT ,`col_decimal` DECIMAL(38,18) ,`col_double` DOUBLE ,`col_boolean` BOOLEAN ,`col_timestamp` TIMESTAMP ,`col_date` DATE ,`col_array_to_json` STRING ,`col_array_to_join` STRING ,`key_string` STRING ,`key_int32` INT ,`key_int32_min` INT ,`key_int32_max` INT ,`key_int64` BIGINT ,`key_int64_min` BIGINT ,`key_int64_max` BIGINT ,`key_decimal` DECIMAL(38,18) ,`key_double` DOUBLE ,`key_boolean` BOOLEAN ,`key_timestamp` TIMESTAMP ,`key_date` DATE ,`key_array_to_json` STRING ,`key_array_to_join` STRING ,`col_doc` STRING ,`col_combine` STRING ) PARTITIONED BY ( pt STRING ) LIFECYCLE 36500 ; -
値が
20230202のパーティションを追加します。alter table di_mongodb_conf_test add if not exists partition (pt='20230202'); -
パーティションテーブルが正しく作成されていることを確認します。
SELECT * FROM di_mongodb_conf_test WHERE pt='20230202';
-
バッチ同期タスクの設定
ステップ 1:MongoDB データソースの追加
MongoDB データソースを追加し、データソースとデータ統合専用リソースグループの間のネットワーク接続を確保してください。詳細については、「MongoDB データソースの追加」をご参照ください。
ステップ 2:バッチ同期ノードの作成と設定
DataWorks の DataStudio でバッチ同期ノードを作成し、ソースとターゲットを設定します。このセクションでは主要なパラメータを説明します。その他のパラメータは、デフォルト値のままでかまいません。詳細な手順については、「コードレス UI を使用したバッチ同期ノードの設定」をご参照ください。
-
ネットワーク接続を設定します。
MongoDB と MaxCompute のデータソース、および対応するデータ統合専用リソースグループを選択し、接続テストを実行します。
-
タスクの設定:データソースを選択します。
ソースとターゲットとして、MongoDB コレクションと MaxCompute のパーティションテーブルを選択します。
-
タスクの設定:フィールドをマッピングします。
データソースが MongoDB の場合、デフォルトで [Peer mapping] が使用されます。
アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下に手動編集の例を示します。{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document.string"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}フィールドを編集すると、UI にソースフィールドとターゲットフィールドのマッピングが表示されます。
ステップ 3:ノードのコミットとデプロイ
標準モードのワークスペースを使用していて、このタスクをスケジュール実行する場合は、ノードをコミットし、本番環境にデプロイしてください。詳細については、「タスクのデプロイ」をご参照ください。
ステップ 4:ノードの実行と結果の確認
ノードを設定した後に実行します。タスクが完了したら、MaxCompute テーブルで同期されたデータを確認します。同期データには、次のフィールドと値が含まれます: col_string (mock string value)、 col_int32 (1)、 col_int32_min (-2147483648)、 col_int32_max (2147483647)、 col_int64 (1234567890123456)、 col_decimal (9999999.4999999999)、 col_double (9999999.99)、 col_boolean (true)、 col_timestamp (2023-02-03 14:17:56.554)、 col_date (2023-02-03)、 col_array_to_json ([a, b])、 col_array_to_join (a,b)、 pt (20230202)。一部の数値フィールドはテキストとして保存されます。複合フィールドの値は次のとおりです:
-
col_docフィールドの内容は次のとおりです。{ "key_array_to_join": [ "a", "b" ], "key_array_to_json": [ "a", "b" ], "key_boolean": true, "key_date": "2023-02-03 14:17:56", "key_decimal": { "finite": true, "high": 3471149412795809792, "infinite": false, "low": 99999994999999999, "naN": false, "negative": false }, "key_double": 9999999.99, "key_int32": 1, "key_int32_max": 2147483647, "key_int32_min": -2147483648, "key_int64": 1234567890123456, "key_int64_max": 9223372036854775807, "key_int64_min": -9223372036854775807, "key_string": "mock string value", "key_timestamp": "2023-02-03 14:17:56" } -
col_combineフィールドの内容は次のとおりです。{ "col_extra_1": "this is extra 1", "col_extra_2": "this is extra 2" }
Decimal 型データの出力に関する問題については、「付録 2:ドキュメント内の Decimal 型の出力」をご参照ください。
付録 1:データ形式の変換
配列データを JSON 形式に変換:col_array_to_json
|
ソースの MongoDB データ |
フィールドマッピングの設定 |
MaxCompute への出力 |
|
フィールドマッピングの設定で |
|
配列を連結文字列に変換:col_array_to_join
|
ソースの MongoDB データ |
フィールドマッピングの設定 |
MaxCompute への出力 |
|
フィールドマッピングを設定する際、 |
|
ネストされたドキュメントフィールドの同期
|
ソースの MongoDB データ |
フィールドマッピングの設定 |
MaxCompute への出力 |
|
|
|
ドキュメントを JSON 文字列としてシリアル化
|
ソースの MongoDB データ |
フィールドマッピングの設定 |
MaxCompute への出力 |
|
フィールドマッピングを設定する際、 |
|
マッピングされていないフィールドを JSON としてシリアル化
|
ソースの MongoDB データ |
フィールドマッピングの設定 |
MaxCompute への出力 |
|
このドキュメントには 4 つのフィールドがあります。 |
|
付録 2:Decimal 型の出力
ドキュメントが JSON 形式にシリアル化されると、Decimal128 データはデフォルトで次のように出力されます。
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}
数値型として出力するには、次の手順に従ってください:
-
バッチ同期タスクを設定する際に、スクリプトモードに切り替えます。
-
Reader タスク設定を変更します:パラメータセクションに
decimal128OutputTypeパラメータを追加し、値をbigDecimalに設定します。"parameter": { "collectionName": "di_mongodb_conf_test", "decimal128OutputType":"bigDecimal" }, "name": "Reader", "category": "reader" -
バッチ同期タスクを再実行し、結果を確認します。
{ "key_decimal": "9999999.4999999999" }