すべてのプロダクト
Search
ドキュメントセンター

DataWorks:バッチ同期による MongoDB データソースからのデータ読み取り

最終更新日:Feb 27, 2026

DataWorks のデータ統合は、MongoDB からデータを読み取り、他のデータソースと同期するための MongoDB Reader プラグインを提供します。この Topic では、データ統合を使用して MongoDB から MaxCompute にデータをオフライン同期する例を説明します。

背景情報

このチュートリアルでは、ソースデータソースは MongoDB、宛先データソースは MaxCompute です。開始する前に、同期用の MongoDB データを準備し、同期されたデータを格納するための MaxCompute テーブルを作成してください。

前提条件

以下の前提条件が満たされていることを確認してください。

  • DataWorks を有効化し、MaxCompute データソースを作成済みであること。

  • オフラインタスクを実行するには、データ統合専用リソースグループが必要です。続行する前に、このリソースグループを購入して設定する必要があります。詳細については、「データ統合専用リソースグループの使用」をご参照ください。

    説明

    汎用リソースグループも使用できます。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

サンプルデータテーブルの準備

オフラインデータ同期のために、MongoDB のデータコレクションと MaxCompute のテーブルを準備します。

  1. MongoDB データコレクションを準備します。

    このチュートリアルでは、ApsaraDB for MongoDB を例として使用します。以下のコードは、MongoDB データコレクションの準備方法を示しています。

    1. di_mongodb_conf_test という名前のデータコレクションを作成します。

      db.createCollection('di_mongodb_conf_test')
    2. このチュートリアルのサンプルデータをデータコレクションに挿入します。

      db.di_mongodb_conf_test.insertOne({
          'col_string':'mock string value',
          'col_int32':NumberInt("1"),
          'col_int32_min':NumberInt("-2147483648"),
          'col_int32_max':NumberInt("2147483647"),
          'col_int64':NumberLong("1234567890123456"),
          'col_int64_min':NumberLong("-9223372036854775807"),
          'col_int64_max':NumberLong("9223372036854775807"),
          'col_decimal':NumberDecimal("9999999.4999999999"),
          'col_double':9999999.99,
          'col_boolean':true,
          'col_timestamp':ISODate(),
          'col_date':new Date(),
          'col_array_to_json':['a','b'],
          'col_array_to_join':['a','b'],
          'col_doc':{
              'key_string':'mock string value',
              'key_int32':NumberInt("1"),
              'key_int32_min':NumberInt("-2147483648"),
              'key_int32_max':NumberInt("2147483647"),
              'key_int64':NumberLong("1234567890123456"),
              'key_int64_min':NumberLong("-9223372036854775807"),
              'key_int64_max':NumberLong("9223372036854775807"),
              'key_decimal':NumberDecimal("9999999.4999999999"),
              'key_double':9999999.99,
              'key_boolean':true,
              'key_timestamp':ISODate(),
              'key_date':new Date(),
              'key_array_to_json':['a','b'],
              'key_array_to_join':['a','b'],
          },
          'col_extra_1':'this is extra 1',
          'col_extra_2':'this is extra 2',
      })
    3. MongoDB に挿入されたデータをクエリします。

      db.getCollection("di_mongodb_conf_test").find({})

      クエリ結果は次の図のようになります。mongodb

  2. MaxCompute テーブルを準備します。

    1. di_mongodb_conf_test という名前のパーティションテーブルを作成します。パーティションフィールドは pt です。

      CREATE TABLE IF NOT EXISTS di_mongodb_conf_test
      (
        `id`                 STRING
        ,`col_string`        STRING
        ,`col_int32`         INT
        ,`col_int32_min`     INT
        ,`col_int32_max`     INT
        ,`col_int64`         BIGINT
        ,`col_int64_min`     BIGINT
        ,`col_int64_max`     BIGINT
        ,`col_decimal`       DECIMAL(38,18)
        ,`col_double`        DOUBLE
        ,`col_boolean`       BOOLEAN
        ,`col_timestamp`     TIMESTAMP
        ,`col_date`          DATE
        ,`col_array_to_json` STRING
        ,`col_array_to_join` STRING
        ,`key_string`        STRING
        ,`key_int32`         INT
        ,`key_int32_min`     INT
        ,`key_int32_max`     INT
        ,`key_int64`         BIGINT
        ,`key_int64_min`     BIGINT
        ,`key_int64_max`     BIGINT
        ,`key_decimal`       DECIMAL(38,18)
        ,`key_double`        DOUBLE
        ,`key_boolean`       BOOLEAN
        ,`key_timestamp`     TIMESTAMP
        ,`key_date`          DATE
        ,`key_array_to_json` STRING
        ,`key_array_to_join` STRING
        ,`col_doc`           STRING
        ,`col_combine`       STRING
      )
      PARTITIONED BY
      (
        pt                   STRING
      )
      LIFECYCLE 36500
      ;
    2. 値が 20230202 のパーティションを追加します。

      alter table di_mongodb_conf_test add if not exists partition (pt='20230202');
    3. パーティションテーブルが作成されたことを確認します。

      SELECT*FROM di_mongodb_conf_test
      WHEREpt='20230202';

オフラインタスクの設定

ステップ 1: MongoDB データソースの追加

MongoDB データソースを追加し、データソースとデータ統合専用リソースグループの間にネットワーク接続があることを確認します。詳細については、「MongoDB データソースの設定」をご参照ください。

ステップ 2: オフライン同期ノードの作成と同期タスクの設定

DataWorks の DataStudio で、オフライン同期ノードを作成します。タスクのソース、宛先、およびその他のパラメーターを設定します。主要な設定ステップを以下に説明します。その他のパラメーターにはデフォルトの値を使用できます。詳細については、「コードレス UI でのタスクの設定」をご参照ください。

  1. ネットワーク接続を設定します。

    前のステップで作成した MongoDB および MaxCompute のデータソースと、対応するデータ統合専用リソースグループを選択します。その後、接続性をテストします。

  2. タスクの設定:データソースの選択

    準備した MongoDB データコレクションと MaxCompute パーティションテーブルを選択します。

  3. タスクの設定:フィールドマッピング

    MongoDB データソースの場合、デフォルトでは [同名マッピング] が使用されます。图标 アイコンをクリックして、ソーステーブルのフィールドを手動で編集することもできます。以下のコードは、手動編集の例です。

    {"name":"_id","type":"string"}
    {"name":"col_string","type":"string"}
    {"name":"col_int32","type":"long"}
    {"name":"col_int32_min","type":"long"}
    {"name":"col_int32_max","type":"long"}
    {"name":"col_int64","type":"long"}
    {"name":"col_int64_min","type":"long"}
    {"name":"col_int64_max","type":"long"}
    {"name":"col_decimal","type":"double"}
    {"name":"col_double","type":"double"}
    {"name":"col_boolean","type":"boolean"}
    {"name":"col_timestamp","type":"date"}
    {"name":"col_date","type":"date"}
    {"name":"col_array_to_json","type":"string"}
    {"name":"col_array_to_join","type":"array","splitter":","}
    {"name":"col_doc.key_string","type":"document.string"}
    {"name":"col_doc.key_int32","type":"document.long"}
    {"name":"col_doc.key_int32_min","type":"document.long"}
    {"name":"col_doc.key_int32_max","type":"document.long"}
    {"name":"col_doc.key_int64","type":"document.long"}
    {"name":"col_doc.key_int64_min","type":"document.long"}
    {"name":"col_doc.key_int64_max","type":"document.long"}
    {"name":"col_doc.key_decimal","type":"document.double"}
    {"name":"col_doc.key_double","type":"document.double"}
    {"name":"col_doc.key_boolean","type":"document.boolean"}
    {"name":"col_doc.key_timestamp","type":"document.date"}
    {"name":"col_doc.key_date","type":"document.date"}
    {"name":"col_doc.key_array_to_json","type":"document"}
    {"name":"col_doc.key_array_to_join","type":"document.array","splitter":","}
    {"name":"col_doc","type":"string"}
    {"name":"col_combine","type":"combine"}

    フィールドを手動で編集すると、インターフェイスにソースと宛先のフィールド間のマッピングが表示されます。

ステップ 3: オフライン同期ノードの送信と公開

標準モードの DataWorks ワークスペースを使用し、オフライン同期タスクを本番環境で定期的に実行するようにスケジュールしたい場合は、ノードを送信して公開する必要があります。詳細については、「タスクの公開」をご参照ください。

ステップ 4: オフライン同期ノードの実行と結果の表示

設定が完了したら、同期ノードを実行します。ノードが正常に実行された後、MaxCompute テーブルに同期されたデータを表示できます。结果数据

  • col_doc フィールドの内容は次の図のようになります。字段信息

  • col_combine フィールドの内容は次の図のようになります。字段2

説明

10 進数型の出力の詳細については、「付録 2: ドキュメント内の Decimal 型の出力」をご参照ください。

付録 1: 同期中のデータフォーマット変換

配列データを JSON フォーマットに変換:col_array_to_json

MongoDB の生データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_array_to_json":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_json","type":"string"}

フィールドマッピングを設定する際に、typestring に設定すると、同期タスクは生データを JSON フォーマットにシリアル化します。

[a, b]

配列データを連結文字列に変換:col_array_to_join

MongoDB の生データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_array_to_join":
    [
        "a",
        "b"
    ]
}
{"name":"col_array_to_join","type":"array","splitter":","}

フィールドマッピングを設定する際に、typearray に設定すると、splitter パラメーターが必須になります。同期タスクは、区切り文字を使用して生データの配列の要素を連結し、結果を文字列として出力します。

a,b

ネストされたドキュメントから指定されたフィールドを読み取る

MongoDB の生データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_doc":
    {
        "key_string": "mock string value"
    }
}
{"name":"col_doc.key_string","type":"document.string"}

name は、document 内で同期するフィールドの path です。同期タスクは、path に基づいて document を読み取り、データを出力します。

mock string value

JSON シリアル化出力としてのドキュメントデータ

MongoDB の生データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_doc":
    {
        "key_string": "mock string value",
        "key_int32": 1
    }
}
{"name":"col_doc","type":"string"}

フィールドマッピングを設定する際に、typestring に設定すると、同期タスクは col_doc ドキュメント全体を JSON フォーマットにシリアル化して結果を出力します。

{"key_string":"mockstringvalue","key_int32":1}

残りのドキュメントフィールドを JSON フォーマットにシリアル化する

MongoDB の生データ

フィールドマッピングの設定

MaxCompute への出力

{
    "col_1": "value1",
    "col_2": "value2",
    "col_3": "value3",
    "col_4": "value4"
}
{"name":"col_1","type":"string"}
{"name":"col_2","type":"string"}
{"name":"col_combine","type":"combine"}

ドキュメントには 4 つのフィールドがあります。タスクには 2 つの非 combine 型フィールド (`col_1` と `col_2`) が設定されています。同期タスクは、`col_1` と `col_2` 以外のすべてのフィールドを JSON フォーマットにシリアル化して結果を出力します。

{"col_3":"value3","col_4":"value4"}

付録 2: ドキュメント内の Decimal 型の出力

ドキュメントが JSON フォーマットにシリアル化されると、Decimal128 型のフィールドはデフォルトで次のように出力されます。

{
    "key_decimal":
    {
        "finite": true,
        "high": 3471149412795809792,
        "infinite": false,
        "low": 99999994999999999,
        "naN": false,
        "negative": false
    }
}

値を数値型として出力するには、次のステップを実行します。

  1. オフライン同期タスクを設定する際、[スクリプトモードに変換] ボタンをクリックしてコードエディタに切り替えることができます。

  2. Reader タスクの設定を変更します。パラメーターセクションで、decimal128OutputType パラメーターを追加し、その値を bigDecimal に設定します。decimal

  3. オフライン同期タスクを再実行し、結果を表示します。

    {
        "key_decimal": "9999999.4999999999"
    }