すべてのプロダクト
Search
ドキュメントセンター

DataWorks:バッチタスクを使用した MongoDB へのデータ書き込み

最終更新日:Jun 22, 2026

DataWorks のデータ統合における MongoDB Writer プラグインは、他のデータソースから MongoDB へのデータ同期を可能にします。このトピックでは、MaxCompute から MongoDB へのバッチ同期を実行する方法について説明します。

前提条件

開始する前に、次の要件を満たしていることを確認してください。

  • DataWorks が有効化されており、MaxCompute データソースが追加されていること。

  • このチュートリアルでは、データ統合専用リソースグループを使用してバッチタスクを実行します。続行する前に、このリソースグループを購入して構成する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。

    説明

    サーバーレスリソースグループを使用することもできます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

サンプルデータの準備

バッチ同期のために、MongoDB コレクションと MaxCompute テーブルを準備します。

  1. MaxCompute テーブルを作成し、データを追加します。

    1. test_write_mongo という名前のパーティションテーブルを作成します。パーティションフィールドを pt に設定します。

      CREATE TABLE IF NOT EXISTS test_write_mongo(
          id STRING ,
          col_string STRING,
          col_int int,
          col_bigint bigint,
          col_decimal decimal,
          col_date DATETIME,
          col_boolean boolean,
          col_array string
      ) PARTITIONED BY (pt STRING) LIFECYCLE 10;
    2. 値が 20230215 のパーティションを追加します。

      insert into test_write_mongo partition (pt='20230215')
      values ('11','name11',1,111,1.22,cast('2023-02-15 15:01:01' as datetime),true,'1,2,3');
    3. パーティションテーブルが正しく作成されたことを確認します。

      SELECT * FROM test_write_mongo
      WHERE pt='20230215';
  2. 送信先の MongoDB コレクションを準備します。

    このチュートリアルでは、ApsaraDB for MongoDB を例として使用します。test_write_mongo という名前のコレクションを作成します。

    db.createCollection('test_write_mongo')

バッチタスクの構成

ステップ 1: MongoDB データソースの追加

MongoDB データソースを追加し、それとデータ統合専用リソースグループとの間にネットワーク接続を確立します。詳細については、「MongoDB データソースの構成」をご参照ください。

ステップ 2: バッチ同期ノードの作成と構成

DataWorks の DataStudio で、バッチ同期ノードを作成し、そのソースと送信先を構成します。このセクションでは、主要なパラメーターのみを説明します。他のパラメーターはデフォルト値のままにしてください。詳細な手順については、「コードレス UI でのタスクの構成」をご参照ください。

  1. 同期のためのネットワーク接続を構成します。

    MongoDB データソース、MaxCompute データソース、およびデータ統合専用リソースグループを選択します。その後、接続性をテストします。

  2. ソースと送信先を構成します。

    MaxCompute のパーティションテーブルと MongoDB のコレクションを選択します。次の表に、主要なパラメーターを示します。

    パラメーター

    説明

    書き込みモード

    既存のデータを上書きするかどうかを決定します。この機能には、書き込みモードビジネスプライマリキー の 2 つの関連設定があります。

    • 書き込みモード

      • No を選択した場合、プラグインは各レコードを新しいドキュメントとして挿入します。これがデフォルトのオプションです。

      • Yes を選択した場合、ビジネスプライマリキー を指定する必要があります。プラグインは、同じビジネスプライマリキーを持つレコードを上書きします。

    • ビジネスプライマリキー:各レコードのビジネスプライマリキーを指定します。これは上書き操作に使用されます。指定できるフィールドは 1 つだけです。MongoDB では、これは通常 `_id` フィールドに対応します。

    説明

    書き込みモードYes に設定されていて、ビジネスプライマリキーとして _id 以外のフィールドを指定した場合、After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2" のようなエラーが発生することがあります。このエラーは、受信データに、ビジネスプライマリキーによって識別される既存のドキュメントの _id と一致しない _id を持つレコードが含まれているために発生します。詳細については、「エラー: After applying the update, the (immutable) field '_id' was found to have been altered to _id: "2"」をご参照ください。

    Pre-Import Statement

    インポートタスクの前に実行する事前条件 (PreSQL) を指定します。構成は JSON フォーマットである必要があり、type および json プロパティを含める必要があります。

    • type:必須。有効な値は removedrop です (小文字)。

    • json

      • typeremove に設定されている場合、このパラメーターは必須です。その値は、標準の MongoDB クエリである必要があります。詳細については、「Query Documents」をご参照ください。

      • typedrop に設定されている場合、このパラメーターは不要です。

  3. フィールドマッピングを構成します。

    データソースが MongoDB の場合、デフォルトで Peer mapping が使用されます。また、icon アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下は手動編集の例です。

    {"name":"id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int","type":"long"}
    {"name":"col_bigint","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_date","type":"date"}
    {"name":"col_boolean","type":"bool"}
    {"name":"col_array","type":"array","splitter":","}

    フィールドを手動で編集すると、UI にソースフィールドと送信先フィールドのマッピングが表示されます。

    ソースフィールド

    送信先フィールド

    id

    {"name":"id","type":"string"}

    col_string

    {"name":"col_string","type":"string"}

    col_int

    {"name":"col_int","type":"long"}

    col_bigint

    {"name":"col_bigint","type":"long"}

    col_decimal

    {"name":"col_decimal","type":"double"}

    col_date

    {"name":"col_date","type":"date"}

    col_boolean

    {"name":"col_boolean","type":"bool"}

    col_array

    {"name":"col_array","type":"array","splitter":","}

ステップ 3: バッチ同期ノードのコミットとデプロイ

標準モードのワークスペースでこのタスクを定期的に実行するには、バッチ同期ノードをコミットして本番環境にデプロイする必要があります。詳細については、「ノードのデプロイ」をご参照ください。

ステップ 4: バッチ同期ノードの実行と結果の表示

ノードを構成した後、実行します。タスクが完了したら、MongoDB コレクションで同期されたデータを表示します。このタスクは、ターゲットデータベースに次のフィールドと値を持つドキュメントレコードを作成します: _id63ecc513143e565c9b037623col_array[1, 2, 3]col_bigint111col_booleantruecol_date2023-02-15T07:01:01.000Zcol_decimal1.22col_int1col_stringname11id11 です。

付録: データ型の変換

type パラメーターでサポートされる値

type パラメーターでサポートされる値は、INTLONGDOUBLESTRINGBOOLDATE、および ARRAY です。

配列型

データを MongoDB 配列として書き込むには、type パラメーターを ARRAY に設定し、splitter プロパティを構成します。例:

  • ソースデータは文字列です: a,b,c

  • タスク構成では、`type` は ARRAY に設定され、`splitter` は , に設定されます。

  • タスクの実行後、MongoDB の結果データは次のようになります: ["a","b","c"]