DataWorks の Data Integration は、MongoDB Writer プラグインを提供しており、さまざまなデータソースからデータを読み取り、MongoDB に書き込むことができます。このトピックでは、Data Integration を使用して MaxCompute から MongoDB へデータをオフライン同期する方法について説明します。
前提条件
開始する前に、以下の前提条件が満たされていることを確認してください。
DataWorks がアクティブ化され、MaxCompute データソースが作成されていること。
この例でオフラインタスクを実行するには、Data Integration 専用リソースグループが必要です。Data Integration 専用リソースグループの購入と構成方法の詳細については、「Data Integration 専用リソースグループの使用」をご参照ください。
説明汎用リソースグループを使用することもできます。詳細については、「Serverless リソースグループの使用」をご参照ください。
サンプルデータテーブルの準備
この例では、オフラインデータ同期のために MongoDB コレクションと MaxCompute テーブルを準備します。
MaxCompute テーブルを準備し、データを追加します。
test_write_mongoという名前のパーティションテーブルを作成し、パーティションフィールドをptに設定します。CREATE TABLE IF NOT EXISTS test_write_mongo( id STRING , col_string STRING, col_int int, col_bigint bigint, col_decimal decimal, col_date DATETIME, col_boolean boolean, col_array string ) PARTITIONED BY (pt STRING) LIFECYCLE 10;値
20230215を持つパーティションを追加します。insert into test_write_mongo partition (pt='20230215') values ('11','name11',1,111,1.22,cast('2023-02-15 15:01:01' as datetime),true,'1,2,3');パーティションテーブルが正しく作成されていることを確認します。
SELECT * FROM test_write_mongo WHERE pt='20230215';
MaxCompute データが同期される MongoDB コレクションを準備します。
この例では、Alibaba Cloud ApsaraDB for MongoDB を使用します。名前が
test_write_mongoのコレクションを作成します。db.createCollection('test_write_mongo')
オフラインタスクの構成
ステップ 1: MongoDB データソースの追加
MongoDB データソースを追加します。データソースと Data Integration 専用リソースグループ間のネットワーク接続が確立されていることを確認してください。詳細については、「MongoDB データソースの構成」をご参照ください。
ステップ 2: オフライン同期ノードの作成とタスクの構成
DataWorks の DataStudio で、オフライン同期ノードを作成します。次に、ソースと宛先のパラメーターを構成します。以下の手順で説明されていないパラメーターには、デフォルト値を使用できます。詳細については、「コードレス UI での同期タスクの構成」をご参照ください。
同期タスクのネットワーク接続を構成します。
作成した MongoDB および MaxCompute データソースを選択します。構成した Data Integration 専用リソースグループを選択します。次に、ネットワーク接続をテストします。
タスクの構成: データソースの選択
準備した MaxCompute パーティションテーブルと MongoDB コレクションを選択します。
パラメーター
説明
[書き込みモード (上書き)]
転送中にデータを上書きするかどうかを指定します。これには、
書き込みモードおよびビジネス用プライマリキーが含まれます:書き込みモード:[いいえ] に設定すると、各レコードに対して挿入操作が実行されます。これはデフォルトオプションです。
「はい」に設定した場合、
ビジネス用プライマリキーを指定する必要があります。同じビジネス用プライマリキーを持つレコードに対して上書き操作が実行されます。
Business primary key: 各行のビジネス用プライマリキーを指定します。このキーは上書き操作に使用されます。複数のビジネス用プライマリキーを指定することはできません。通常、これは MongoDB のプライマリキーを指します。
説明Write modeが [はい] に設定され、`_id` 以外のフィールドがビジネスプライマリキーとして設定されている場合、ランタイムに次のようなエラーが発生する可能性があります:After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"。 これは、書き込まれるデータに `_id` が `replaceKey` と一致しないレコードが含まれているためです。 詳細については、「エラー: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"」をご参照ください。インポート前の文
事前実行ステートメント (PreSQL) の構成です。この構成は JSON フォーマットで、
typeおよびjsonのプロパティを含みます。type: **必須**。有効な値はremoveおよびdropです。なお、値は小文字である必要があります。json:typeがremoveの場合、このプロパティは必須です。構成構文は標準の MongoDB クエリです。詳細については、「ドキュメントのクエリ」をご参照ください。typeがdropの場合、このプロパティを設定する必要はありません。
タスクの構成: フィールドマッピング
MongoDB データソースの場合、デフォルトで [同一行マッピング] が使用されます。また、
をクリックしてソーステーブルフィールドを手動で編集することもできます。以下は手動編集の例です。{"name":"id","type":"string"} {"name":"col_string","type":"string"} {"name":"col_int","type":"long"} {"name":"col_bigint","type":"long"} {"name":"col_decimal","type":"double"} {"name":"col_date","type":"date"} {"name":"col_boolean","type":"bool"} {"name":"col_array","type":"array","splitter":","}フィールドを手動で編集すると、ソースフィールドと宛先フィールド間のマッピングがインターフェイスに表示されます。
ステップ 3: オフライン同期ノードの提出と公開
DataWorks で標準モードワークスペースを使用し、このオフライン同期タスクを定期的にスケジュールする場合は、オフライン同期ノードを本番環境に提出して公開する必要があります。詳細については、「タスクの公開」をご参照ください。
ステップ 4: オフライン同期ノードの実行と結果の表示
構成が完了したら、同期ノードを実行します。ノードが正常に実行された後、MongoDB コレクションに同期されたデータを表示できます。
付録: 同期中のデータ形式変換
タイプ値について
サポートされているタイプには、INT、LONG、DOUBLE、STRING、BOOL、DATE、およびARRAYが含まれます。
配列タイプについて
タイプを ARRAY に設定する場合、splitter プロパティを構成する必要があります。これにより、データは MongoDB の配列タイプに変換されます。例:
ソースデータは文字列です:
a,b,c。同期タスクの構成で、
typeをARRAYに、`splitter` を `,` に設定します。同期タスクが実行されると、MongoDB に書き込まれるデータは
["a","b","c"]になります。