すべてのプロダクト
Search
ドキュメントセンター

DataWorks:バッチ同期を使用した MongoDB からのデータ読み取り

最終更新日:Jun 22, 2026

DataWorks のデータ統合は、MongoDB からデータを読み取り、他のデータソースに同期するための MongoDB Reader プラグインを提供します。このトピックでは、データ統合を使用して MongoDB から MaxCompute にデータをバッチ同期する方法を説明します。

背景情報

このチュートリアルでは、ソースは MongoDB、ターゲットは MaxCompute です。開始する前に、MongoDB データを準備し、MaxCompute にターゲットテーブルを作成してください。

前提条件

開始する前に、次の要件を満たしていることを確認してください:

  • DataWorks を有効化し、MaxCompute データソースを作成済みであること。

  • このチュートリアルでは、データ統合専用リソースグループを使用してバッチタスクを実行します。データ統合専用リソースグループを購入して設定する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。

    説明

    サーバーレスリソースグループを使用することもできます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

サンプルデータとテーブルの準備

このチュートリアルでは、バッチ同期のために MongoDB コレクションと MaxCompute テーブルが必要です。

  1. MongoDB コレクションを準備します。

    このチュートリアルでは、例として ApsaraDB for MongoDB を使用します。次のコードは、MongoDB コレクションを準備する方法を説明します。

    1. di_mongodb_conf_test という名前のコレクションを作成します。

      db.createCollection('di_mongodb_conf_test')
    2. このチュートリアル用のサンプルデータをコレクションに挿入します。

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. MongoDB に挿入したデータをクエリします。

      db.getCollection("di_mongodb_conf_test").find({})

      クエリ結果には、コレクション内の 1 つのテストドキュメントが表示されます。_id63dca714b8548a78e1dc3238 です。このドキュメントには、さまざまなデータ型が含まれています:文字列 (col_string)、整数 (col_int32/col_int64)、浮動小数点 (col_double/col_decimal)、ブール値 (col_boolean)、日付/時刻 (col_date/col_timestamp)、配列 (col_array_to_join/col_array_to_json)。また、ネストされたドキュメントである col_doc と、2 つの追加フィールド col_extra_1col_extra_2 も含まれます。

  2. MaxCompute テーブルを準備します。

    1. パーティションフィールドとして pt を使用し、di_mongodb_conf_test という名前のパーティションテーブルを作成します。

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. 値が 20230202 のパーティションを追加します。

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. パーティションテーブルが正しく作成されていることを確認します。

      SELECT * FROM di_mongodb_conf_test
      WHERE pt='20230202';

バッチ同期タスクの設定

ステップ 1:MongoDB データソースの追加

MongoDB データソースを追加し、データソースとデータ統合専用リソースグループの間のネットワーク接続を確保してください。詳細については、「MongoDB データソースの追加」をご参照ください。

ステップ 2:バッチ同期ノードの作成と設定

DataWorks の DataStudio でバッチ同期ノードを作成し、ソースとターゲットを設定します。このセクションでは主要なパラメータを説明します。その他のパラメータは、デフォルト値のままでかまいません。詳細な手順については、「コードレス UI を使用したバッチ同期ノードの設定」をご参照ください。

  1. ネットワーク接続を設定します。

    MongoDB と MaxCompute のデータソース、および対応するデータ統合専用リソースグループを選択し、接続テストを実行します。

  2. タスクの設定:データソースを選択します。

    ソースとターゲットとして、MongoDB コレクションと MaxCompute のパーティションテーブルを選択します。

  3. タスクの設定:フィールドをマッピングします。

    データソースが MongoDB の場合、デフォルトで [Peer mapping] が使用されます。 icon アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下に手動編集の例を示します。

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document.string"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    フィールドを編集すると、UI にソースフィールドとターゲットフィールドのマッピングが表示されます。

ステップ 3:ノードのコミットとデプロイ

標準モードのワークスペースを使用していて、このタスクをスケジュール実行する場合は、ノードをコミットし、本番環境にデプロイしてください。詳細については、「タスクのデプロイ」をご参照ください。

ステップ 4:ノードの実行と結果の確認

ノードを設定した後に実行します。タスクが完了したら、MaxCompute テーブルで同期されたデータを確認します。同期データには、次のフィールドと値が含まれます: col_string (mock string value)、 col_int32 (1)、 col_int32_min (-2147483648)、 col_int32_max (2147483647)、 col_int64 (1234567890123456)、 col_decimal (9999999.4999999999)、 col_double (9999999.99)、 col_boolean (true)、 col_timestamp (2023-02-03 14:17:56.554)、 col_date (2023-02-03)、 col_array_to_json ([a, b])、 col_array_to_join (a,b)、 pt (20230202)。一部の数値フィールドはテキストとして保存されます。複合フィールドの値は次のとおりです:

  • col_doc フィールドの内容は次のとおりです。

    {
      "key_array_to_join": [
        "a",
        "b"
      ],
      "key_array_to_json": [
        "a",
        "b"
      ],
      "key_boolean": true,
      "key_date": "2023-02-03 14:17:56",
      "key_decimal": {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
      },
      "key_double": 9999999.99,
      "key_int32": 1,
      "key_int32_max": 2147483647,
      "key_int32_min": -2147483648,
      "key_int64": 1234567890123456,
      "key_int64_max": 9223372036854775807,
      "key_int64_min": -9223372036854775807,
      "key_string": "mock string value",
      "key_timestamp": "2023-02-03 14:17:56"
    }
  • col_combine フィールドの内容は次のとおりです。

    {
      "col_extra_1": "this is extra 1",
      "col_extra_2": "this is extra 2"
    }
説明

Decimal 型データの出力に関する問題については、「付録 2:ドキュメント内の Decimal 型の出力」をご参照ください。

付録 1:データ形式の変換

配列データを JSON 形式に変換:col_array_to_json

ソースの MongoDB データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}

フィールドマッピングの設定で typestring の場合、同期タスクは、実行時に元のデータを JSON 形式にシリアル化して出力します。

[a, b]

配列を連結文字列に変換:col_array_to_join

ソースの MongoDB データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}

フィールドマッピングを設定する際、typearray の場合は splitter パラメータが必要です。同期タスクの実行時に、ソースデータ配列の内容が区切り文字で連結され、最終的な出力は連結済みの文字列になります。

a,b

ネストされたドキュメントフィールドの同期

ソースの MongoDB データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}

name は、document 内で同期するフィールドの path を指定します。同期タスクの実行時に、path に基づいて document を読み取り、データを出力します。

mock string value

ドキュメントを JSON 文字列としてシリアル化

ソースの MongoDB データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}

フィールドマッピングを設定する際、typestring の場合、同期タスクは実行時に col_doc 全体を JSON 文字列にシリアル化して出力します。

{"key_string":"mock string value","key_int32":1}

マッピングされていないフィールドを JSON としてシリアル化

ソースの MongoDB データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}

このドキュメントには 4 つのフィールドがあります。col_1col_2 は明示的にマッピングされています。同期中に、マッピングされていない残りのフィールド (col_3col_4) は JSON オブジェクトにシリアル化されて出力されます。

{"col_3":"value3","col_4":"value4"}

付録 2:Decimal 型の出力

ドキュメントが JSON 形式にシリアル化されると、Decimal128 データはデフォルトで次のように出力されます。

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}

数値型として出力するには、次の手順に従ってください:

  1. バッチ同期タスクを設定する際に、スクリプトモードに切り替えます。

  2. Reader タスク設定を変更します:パラメータセクションに decimal128OutputType パラメータを追加し、値を bigDecimal に設定します。

    "parameter": {
        "collectionName": "di_mongodb_conf_test",
        "decimal128OutputType":"bigDecimal"
    },
    "name": "Reader",
    "category": "reader"
  3. バッチ同期タスクを再実行し、結果を確認します。

    {
        "key_decimal": "9999999.4999999999"
    }