すべてのプロダクト
Search
ドキュメントセンター

DataWorks:バッチ同期による MongoDB データソースへのデータ書き込み

最終更新日:Feb 27, 2026

DataWorks の Data Integration は、MongoDB Writer プラグインを提供しており、さまざまなデータソースからデータを読み取り、MongoDB に書き込むことができます。このトピックでは、Data Integration を使用して MaxCompute から MongoDB へデータをオフライン同期する方法について説明します。

前提条件

開始する前に、以下の前提条件が満たされていることを確認してください。

  • DataWorks がアクティブ化され、MaxCompute データソースが作成されていること。

  • この例でオフラインタスクを実行するには、Data Integration 専用リソースグループが必要です。Data Integration 専用リソースグループの購入と構成方法の詳細については、「Data Integration 専用リソースグループの使用」をご参照ください。

    説明

    汎用リソースグループを使用することもできます。詳細については、「Serverless リソースグループの使用」をご参照ください。

サンプルデータテーブルの準備

この例では、オフラインデータ同期のために MongoDB コレクションと MaxCompute テーブルを準備します。

  1. MaxCompute テーブルを準備し、データを追加します。

    1. test_write_mongo という名前のパーティションテーブルを作成し、パーティションフィールドを pt に設定します。

      CREATE TABLE IF NOT EXISTS test_write_mongo(
          id STRING ,
          col_string STRING,
          col_int int,
          col_bigint bigint,
          col_decimal decimal,
          col_date DATETIME,
          col_boolean boolean,
          col_array string
      ) PARTITIONED BY (pt STRING) LIFECYCLE 10;
    2. 20230215 を持つパーティションを追加します。

      insert into test_write_mongo partition (pt='20230215')
      values ('11','name11',1,111,1.22,cast('2023-02-15 15:01:01' as datetime),true,'1,2,3');
    3. パーティションテーブルが正しく作成されていることを確認します。

      SELECT * FROM test_write_mongo
      WHERE pt='20230215';
  2. MaxCompute データが同期される MongoDB コレクションを準備します。

    この例では、Alibaba Cloud ApsaraDB for MongoDB を使用します。名前が test_write_mongo のコレクションを作成します。

    db.createCollection('test_write_mongo')

オフラインタスクの構成

ステップ 1: MongoDB データソースの追加

MongoDB データソースを追加します。データソースと Data Integration 専用リソースグループ間のネットワーク接続が確立されていることを確認してください。詳細については、「MongoDB データソースの構成」をご参照ください。

ステップ 2: オフライン同期ノードの作成とタスクの構成

DataWorks の DataStudio で、オフライン同期ノードを作成します。次に、ソースと宛先のパラメーターを構成します。以下の手順で説明されていないパラメーターには、デフォルト値を使用できます。詳細については、「コードレス UI での同期タスクの構成」をご参照ください。

  1. 同期タスクのネットワーク接続を構成します。

    作成した MongoDB および MaxCompute データソースを選択します。構成した Data Integration 専用リソースグループを選択します。次に、ネットワーク接続をテストします。

  2. タスクの構成: データソースの選択

    準備した MaxCompute パーティションテーブルと MongoDB コレクションを選択します。

    パラメーター

    説明

    [書き込みモード (上書き)]

    転送中にデータを上書きするかどうかを指定します。これには、書き込みモード および ビジネス用プライマリキー が含まれます:

    • 書き込みモード

      • [いいえ] に設定すると、各レコードに対して挿入操作が実行されます。これはデフォルトオプションです。

      • はい」に設定した場合、ビジネス用プライマリキーを指定する必要があります。同じビジネス用プライマリキーを持つレコードに対して上書き操作が実行されます。

    • Business primary key: 各行のビジネス用プライマリキーを指定します。このキーは上書き操作に使用されます。複数のビジネス用プライマリキーを指定することはできません。通常、これは MongoDB のプライマリキーを指します。

    説明

    Write mode[はい] に設定され、`_id` 以外のフィールドがビジネスプライマリキーとして設定されている場合、ランタイムに次のようなエラーが発生する可能性があります: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"。 これは、書き込まれるデータに `_id` が `replaceKey` と一致しないレコードが含まれているためです。 詳細については、「エラー: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"」をご参照ください。

    インポート前の文

    事前実行ステートメント (PreSQL) の構成です。この構成は JSON フォーマットで、type および json のプロパティを含みます。

    • type: **必須**。有効な値は remove および drop です。なお、値は小文字である必要があります。

    • json

      • typeremove の場合、このプロパティは必須です。構成構文は標準の MongoDB クエリです。詳細については、「ドキュメントのクエリ」をご参照ください。

      • typedrop の場合、このプロパティを設定する必要はありません。

  3. タスクの構成: フィールドマッピング

    MongoDB データソースの場合、デフォルトで [同一行マッピング] が使用されます。また、icon をクリックしてソーステーブルフィールドを手動で編集することもできます。以下は手動編集の例です。

    {"name":"id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int","type":"long"}
    {"name":"col_bigint","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_date","type":"date"}
    {"name":"col_boolean","type":"bool"}
    {"name":"col_array","type":"array","splitter":","}

    フィールドを手動で編集すると、ソースフィールドと宛先フィールド間のマッピングがインターフェイスに表示されます。

ステップ 3: オフライン同期ノードの提出と公開

DataWorks で標準モードワークスペースを使用し、このオフライン同期タスクを定期的にスケジュールする場合は、オフライン同期ノードを本番環境に提出して公開する必要があります。詳細については、「タスクの公開」をご参照ください。

ステップ 4: オフライン同期ノードの実行と結果の表示

構成が完了したら、同期ノードを実行します。ノードが正常に実行された後、MongoDB コレクションに同期されたデータを表示できます。结果数据2

付録: 同期中のデータ形式変換

タイプ値について

サポートされているタイプには、INTLONGDOUBLESTRINGBOOLDATE、およびARRAYが含まれます。

配列タイプについて

タイプを ARRAY に設定する場合、splitter プロパティを構成する必要があります。これにより、データは MongoDB の配列タイプに変換されます。例:

  • ソースデータは文字列です:a,b,c

  • 同期タスクの構成で、typeARRAY に、`splitter` を `,` に設定します。

  • 同期タスクが実行されると、MongoDB に書き込まれるデータは ["a","b","c"] になります。