DataWorks のデータ統合は、MongoDB からデータを読み取り、他のデータソースと同期するための MongoDB Reader プラグインを提供します。この Topic では、データ統合を使用して MongoDB から MaxCompute にデータをオフライン同期する例を説明します。
背景情報
このチュートリアルでは、ソースデータソースは MongoDB、宛先データソースは MaxCompute です。開始する前に、同期用の MongoDB データを準備し、同期されたデータを格納するための MaxCompute テーブルを作成してください。
前提条件
以下の前提条件が満たされていることを確認してください。
DataWorks を有効化し、MaxCompute データソースを作成済みであること。
オフラインタスクを実行するには、データ統合専用リソースグループが必要です。続行する前に、このリソースグループを購入して設定する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。
説明汎用リソースグループも使用できます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。
サンプルデータテーブルの準備
オフラインデータ同期のために、MongoDB のデータコレクションと MaxCompute のテーブルを準備します。
MongoDB データコレクションを準備します。
このチュートリアルでは、ApsaraDB for MongoDB を例として使用します。以下のコードは、MongoDB データコレクションの準備方法を示しています。
di_mongodb_conf_testという名前のデータコレクションを作成します。db.createCollection('di_mongodb_conf_test')このチュートリアルのサンプルデータをデータコレクションに挿入します。
db.di_mongodb_conf_test.insertOne({ 'col_string':'mock string value', 'col_int32':NumberInt("1"), 'col_int32_min':NumberInt("-2147483648"), 'col_int32_max':NumberInt("2147483647"), 'col_int64':NumberLong("1234567890123456"), 'col_int64_min':NumberLong("-9223372036854775807"), 'col_int64_max':NumberLong("9223372036854775807"), 'col_decimal':NumberDecimal("9999999.4999999999"), 'col_double':9999999.99, 'col_boolean':true, 'col_timestamp':ISODate(), 'col_date':new Date(), 'col_array_to_json':['a','b'], 'col_array_to_join':['a','b'], 'col_doc':{ 'key_string':'mock string value', 'key_int32':NumberInt("1"), 'key_int32_min':NumberInt("-2147483648"), 'key_int32_max':NumberInt("2147483647"), 'key_int64':NumberLong("1234567890123456"), 'key_int64_min':NumberLong("-9223372036854775807"), 'key_int64_max':NumberLong("9223372036854775807"), 'key_decimal':NumberDecimal("9999999.4999999999"), 'key_double':9999999.99, 'key_boolean':true, 'key_timestamp':ISODate(), 'key_date':new Date(), 'key_array_to_json':['a','b'], 'key_array_to_join':['a','b'], }, 'col_extra_1':'this is extra 1', 'col_extra_2':'this is extra 2', })MongoDB に挿入されたデータをクエリします。
db.getCollection("di_mongodb_conf_test").find({})クエリ結果は次の図のようになります。

MaxCompute テーブルを準備します。
di_mongodb_conf_testという名前のパーティションテーブルを作成します。パーティションフィールドはptです。CREATE TABLE IF NOT EXISTS di_mongodb_conf_test ( `id` STRING ,`col_string` STRING ,`col_int32` INT ,`col_int32_min` INT ,`col_int32_max` INT ,`col_int64` BIGINT ,`col_int64_min` BIGINT ,`col_int64_max` BIGINT ,`col_decimal` DECIMAL(38,18) ,`col_double` DOUBLE ,`col_boolean` BOOLEAN ,`col_timestamp` TIMESTAMP ,`col_date` DATE ,`col_array_to_json` STRING ,`col_array_to_join` STRING ,`key_string` STRING ,`key_int32` INT ,`key_int32_min` INT ,`key_int32_max` INT ,`key_int64` BIGINT ,`key_int64_min` BIGINT ,`key_int64_max` BIGINT ,`key_decimal` DECIMAL(38,18) ,`key_double` DOUBLE ,`key_boolean` BOOLEAN ,`key_timestamp` TIMESTAMP ,`key_date` DATE ,`key_array_to_json` STRING ,`key_array_to_join` STRING ,`col_doc` STRING ,`col_combine` STRING ) PARTITIONED BY ( pt STRING ) LIFECYCLE 36500 ;値が
20230202のパーティションを追加します。alter table di_mongodb_conf_test add if not exists partition (pt='20230202');パーティションテーブルが作成されたことを確認します。
SELECT*FROM di_mongodb_conf_test WHEREpt='20230202';
オフラインタスクの設定
ステップ 1: MongoDB データソースの追加
MongoDB データソースを追加し、データソースとデータ統合専用リソースグループの間にネットワーク接続があることを確認します。詳細については、「MongoDB データソースの設定」をご参照ください。
ステップ 2: オフライン同期ノードの作成と同期タスクの設定
DataWorks の DataStudio で、オフライン同期ノードを作成します。タスクのソース、宛先、およびその他のパラメーターを設定します。主要な設定ステップを以下に説明します。その他のパラメーターにはデフォルトの値を使用できます。詳細については、「コードレス UI でのタスクの設定」をご参照ください。
ネットワーク接続を設定します。
前のステップで作成した MongoDB および MaxCompute のデータソースと、対応するデータ統合専用リソースグループを選択します。その後、接続性をテストします。
タスクの設定:データソースの選択
準備した MongoDB データコレクションと MaxCompute パーティションテーブルを選択します。
タスクの設定:フィールドマッピング
MongoDB データソースの場合、デフォルトでは [同名マッピング] が使用されます。
アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下のコードは、手動編集の例です。{"name":"_id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int32","type":"long"} {"name":"col_int32_min","type":"long"} {"name":"col_int32_max","type":"long"} {"name":"col_int64","type":"long"} {"name":"col_int64_min","type":"long"} {"name":"col_int64_max","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_double","type":"double"} {"name":"col_boolean","type":"boolean"} {"name":"col_timestamp","type":"date"} {"name":"col_date","type":"date"} {"name":"col_array_to_json","type":"string"} {"name":"col_array_to_join","type":"array","splitter":","} {"name":"col_doc.key_string","type":"document.string"} {"name":"col_doc.key_int32","type":"document.long"} {"name":"col_doc.key_int32_min","type":"document.long"} {"name":"col_doc.key_int32_max","type":"document.long"} {"name":"col_doc.key_int64","type":"document.long"} {"name":"col_doc.key_int64_min","type":"document.long"} {"name":"col_doc.key_int64_max","type":"document.long"} {"name":"col_doc.key_decimal","type":"document.double"} {"name":"col_doc.key_double","type":"document.double"} {"name":"col_doc.key_boolean","type":"document.boolean"} {"name":"col_doc.key_timestamp","type":"document.date"} {"name":"col_doc.key_date","type":"document.date"} {"name":"col_doc.key_array_to_json","type":"document"} {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","} {"name":"col_doc","type":"string"} {"name":"col_combine","type":"combine"}フィールドを手動で編集すると、インターフェイスにソースと宛先のフィールド間のマッピングが表示されます。
ステップ 3: オフライン同期ノードの送信と公開
標準モードの DataWorks ワークスペースを使用し、オフライン同期タスクを本番環境で定期的に実行するようにスケジュールしたい場合は、ノードを送信して公開する必要があります。詳細については、「タスクの公開」をご参照ください。
ステップ 4: オフライン同期ノードの実行と結果の表示
設定が完了したら、同期ノードを実行します。ノードが正常に実行された後、MaxCompute テーブルに同期されたデータを表示できます。
col_docフィールドの内容は次の図のようになります。
col_combineフィールドの内容は次の図のようになります。
10 進数型の出力の詳細については、「付録 2: ドキュメント内の Decimal 型の出力」をご参照ください。
付録 1: 同期中のデータフォーマット変換
配列データを JSON フォーマットに変換:col_array_to_json
MongoDB の生データ | フィールドマッピングの設定 | MaxCompute への出力 |
| フィールドマッピングを設定する際に、 | |
配列データを連結文字列に変換:col_array_to_join
MongoDB の生データ | フィールドマッピングの設定 | MaxCompute への出力 |
| フィールドマッピングを設定する際に、 | |
ネストされたドキュメントから指定されたフィールドを読み取る
MongoDB の生データ | フィールドマッピングの設定 | MaxCompute への出力 |
|
| |
JSON シリアル化出力としてのドキュメントデータ
MongoDB の生データ | フィールドマッピングの設定 | MaxCompute への出力 |
| フィールドマッピングを設定する際に、 | |
残りのドキュメントフィールドを JSON フォーマットにシリアル化する
MongoDB の生データ | フィールドマッピングの設定 | MaxCompute への出力 |
| ドキュメントには 4 つのフィールドがあります。タスクには 2 つの非 combine 型フィールド (`col_1` と `col_2`) が設定されています。同期タスクは、`col_1` と `col_2` 以外のすべてのフィールドを JSON フォーマットにシリアル化して結果を出力します。 | |
付録 2: ドキュメント内の Decimal 型の出力
ドキュメントが JSON フォーマットにシリアル化されると、Decimal128 型のフィールドはデフォルトで次のように出力されます。
{
"key_decimal":
{
"finite": true,
"high": 3471149412795809792,
"infinite": false,
"low": 99999994999999999,
"naN": false,
"negative": false
}
}値を数値型として出力するには、次のステップを実行します。
オフライン同期タスクを設定する際、[スクリプトモードに変換] ボタンをクリックしてコードエディタに切り替えることができます。
Reader タスクの設定を変更します。パラメーターセクションで、
decimal128OutputTypeパラメーターを追加し、その値をbigDecimalに設定します。
オフライン同期タスクを再実行し、結果を表示します。
{ "key_decimal": "9999999.4999999999" }