DataWorks のデータ統合における MongoDB Writer プラグインは、他のデータソースから MongoDB へのデータ同期を可能にします。このトピックでは、MaxCompute から MongoDB へのバッチ同期を実行する方法について説明します。
前提条件
開始する前に、次の要件を満たしていることを確認してください。
-
DataWorks が有効化されており、MaxCompute データソースが追加されていること。
-
このチュートリアルでは、データ統合専用リソースグループを使用してバッチタスクを実行します。続行する前に、このリソースグループを購入して構成する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。
説明サーバーレスリソースグループを使用することもできます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。
サンプルデータの準備
バッチ同期のために、MongoDB コレクションと MaxCompute テーブルを準備します。
-
MaxCompute テーブルを作成し、データを追加します。
-
test_write_mongoという名前のパーティションテーブルを作成します。パーティションフィールドをptに設定します。CREATE TABLE IF NOT EXISTS test_write_mongo( id STRING , col_string STRING, col_int int, col_bigint bigint, col_decimal decimal, col_date DATETIME, col_boolean boolean, col_array string ) PARTITIONED BY (pt STRING) LIFECYCLE 10; -
値が
20230215のパーティションを追加します。insert into test_write_mongo partition (pt='20230215') values ('11','name11',1,111,1.22,cast('2023-02-15 15:01:01' as datetime),true,'1,2,3'); -
パーティションテーブルが正しく作成されたことを確認します。
SELECT * FROM test_write_mongo WHERE pt='20230215';
-
-
送信先の MongoDB コレクションを準備します。
このチュートリアルでは、ApsaraDB for MongoDB を例として使用します。
test_write_mongoという名前のコレクションを作成します。db.createCollection('test_write_mongo')
バッチタスクの構成
ステップ 1: MongoDB データソースの追加
MongoDB データソースを追加し、それとデータ統合専用リソースグループとの間にネットワーク接続を確立します。詳細については、「MongoDB データソースの構成」をご参照ください。
ステップ 2: バッチ同期ノードの作成と構成
DataWorks の DataStudio で、バッチ同期ノードを作成し、そのソースと送信先を構成します。このセクションでは、主要なパラメーターのみを説明します。他のパラメーターはデフォルト値のままにしてください。詳細な手順については、「コードレス UI でのタスクの構成」をご参照ください。
-
同期のためのネットワーク接続を構成します。
MongoDB データソース、MaxCompute データソース、およびデータ統合専用リソースグループを選択します。その後、接続性をテストします。
-
ソースと送信先を構成します。
MaxCompute のパーティションテーブルと MongoDB のコレクションを選択します。次の表に、主要なパラメーターを示します。
パラメーター
説明
書き込みモード
既存のデータを上書きするかどうかを決定します。この機能には、
書き込みモードとビジネスプライマリキーの 2 つの関連設定があります。-
書き込みモード:-
No を選択した場合、プラグインは各レコードを新しいドキュメントとして挿入します。これがデフォルトのオプションです。
-
Yes を選択した場合、
ビジネスプライマリキーを指定する必要があります。プラグインは、同じビジネスプライマリキーを持つレコードを上書きします。
-
-
ビジネスプライマリキー:各レコードのビジネスプライマリキーを指定します。これは上書き操作に使用されます。指定できるフィールドは 1 つだけです。MongoDB では、これは通常 `_id` フィールドに対応します。
説明書き込みモードが Yes に設定されていて、ビジネスプライマリキーとして _id 以外のフィールドを指定した場合、After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"のようなエラーが発生することがあります。このエラーは、受信データに、ビジネスプライマリキーによって識別される既存のドキュメントの _id と一致しない _id を持つレコードが含まれているために発生します。詳細については、「エラー: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"」をご参照ください。Pre-Import Statement
インポートタスクの前に実行する事前条件 (PreSQL) を指定します。構成は JSON フォーマットである必要があり、
typeおよびjsonプロパティを含める必要があります。-
type:必須。有効な値はremoveとdropです (小文字)。 -
json:-
typeがremoveに設定されている場合、このパラメーターは必須です。その値は、標準の MongoDB クエリである必要があります。詳細については、「Query Documents」をご参照ください。 -
typeがdropに設定されている場合、このパラメーターは不要です。
-
-
-
フィールドマッピングを構成します。
データソースが MongoDB の場合、デフォルトで Peer mapping が使用されます。また、
アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下は手動編集の例です。{"name":"id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int","type":"long"} {"name":"col_bigint","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_date","type":"date"} {"name":"col_boolean","type":"bool"} {"name":"col_array","type":"array","splitter":","}フィールドを手動で編集すると、UI にソースフィールドと送信先フィールドのマッピングが表示されます。
ソースフィールド
送信先フィールド
id
{"name":"id","type":"string"}col_string
{"name":"col_string","type":"string"}col_int
{"name":"col_int","type":"long"}col_bigint
{"name":"col_bigint","type":"long"}col_decimal
{"name":"col_decimal","type":"double"}col_date
{"name":"col_date","type":"date"}col_boolean
{"name":"col_boolean","type":"bool"}col_array
{"name":"col_array","type":"array","splitter":","}
ステップ 3: バッチ同期ノードのコミットとデプロイ
標準モードのワークスペースでこのタスクを定期的に実行するには、バッチ同期ノードをコミットして本番環境にデプロイする必要があります。詳細については、「ノードのデプロイ」をご参照ください。
ステップ 4: バッチ同期ノードの実行と結果の表示
ノードを構成した後、実行します。タスクが完了したら、MongoDB コレクションで同期されたデータを表示します。このタスクは、ターゲットデータベースに次のフィールドと値を持つドキュメントレコードを作成します: _id は 63ecc513143e565c9b037623、col_array は [1, 2, 3]、col_bigint は 111、col_boolean は true、col_date は 2023-02-15T07:01:01.000Z、col_decimal は 1.22、col_int は 1、col_string は name11、id は 11 です。
付録: データ型の変換
type パラメーターでサポートされる値
type パラメーターでサポートされる値は、INT、LONG、DOUBLE、STRING、BOOL、DATE、および ARRAY です。
配列型
データを MongoDB 配列として書き込むには、type パラメーターを ARRAY に設定し、splitter プロパティを構成します。例:
-
ソースデータは文字列です:
a,b,c。 -
タスク構成では、`type` は
ARRAYに設定され、`splitter` は,に設定されます。 -
タスクの実行後、MongoDB の結果データは次のようになります:
["a","b","c"]。