すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MongoDB から MaxCompute への JSON 形式のデータの移行

最終更新日:Nov 09, 2025

このトピックでは、DataWorks のデータ統合機能を使用して、MongoDB から MaxCompute に JSON フィールドを移行する方法について説明します。

前提条件

MongoDB でのテストデータの準備

  1. アカウントを準備します。

    データベースにユーザーを作成します。このユーザーは、DataWorks でデータソースを追加するために使用されます。この例では、次のコマンドを実行します。

    db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})

    bookuser という名前のユーザーが作成されます。ユーザーのパスワードは 123456 です。このユーザーには、データにアクセスするためのデータベース権限を持つロールが割り当てられます。

  2. データを準備します。

    MongoDB データベースにデータをアップロードします。この例では、ApsaraDB for MongoDB インスタンスを使用します。インスタンスのネットワークタイプは VPC です。インスタンスのインターネットエンドポイントをリクエストする必要があります。そうしないと、インスタンスは DataWorks のデフォルトのリソースグループと通信できません。次のコードはテストデータを示しています。

    {
        "store": {
            "book": [
                {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                    },
                {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                    },
                {
                    "category": "fiction",
                    "author": "J. R. R. Tolkien",
                    "title": "The Lord of the Rings",
                    "isbn": "0-395-19395-8",
                    "price": 22.99
                    }
                        ],
            "bicycle": {
                "color": "red",
                "price": 19.95
                    }
                        },
            "expensive": 10
                }
  3. MongoDB の DMS コンソールで、この例で使用するデータベースは admin で、コレクションは userlog です。次のコマンドを実行して、アップロードされたデータを表示します。

    db.userlog.find().limit(10)

DataWorks を使用した MongoDB から MaxCompute への JSON データの移行

  1. DataWorks コンソールにログインします。

  2. DataWorks で宛先テーブルを作成します。このテーブルは、MongoDB から移行されたデータを受信するために使用されます。

    1. 作成したビジネスフローを右クリックし、[新規テーブル] > [MaxCompute] > [テーブル] を選択します。

    2. [テーブルの作成] ページで、エンジンタイプを選択し、[名前] を入力します。

    3. テーブル編集ページで、[DDL 文] をクリックします。

    4. [DDL モード] ダイアログボックスで、テーブル作成ステートメントを入力し、[テーブルスキーマの生成] をクリックします。

      重要

      DDL 文のテーブル名は、[テーブルの作成] ページで入力した [テーブル名] と同じである必要があります。

      create table mqdata (mqdata string);
    5. [本番環境にコミット] をクリックします。

  3. MongoDB データソースを追加します。詳細については、「MongoDB データソースの設定」をご参照ください。

  4. バッチ同期ノードを作成します。

    1. データ分析ページに移動します。指定したワークフローを右クリックし、[ノードの作成] > [データ統合] > [オフライン同期] を選択します。

    2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

    3. 上部のナビゲーションバーで、Conversion script アイコンを選択します。

    4. スクリプトモードで、** アイコンをクリックします。

    5. [テンプレートのインポート] ダイアログボックスで [ソースタイプ][データソース][ターゲットタイプ][データソース] を指定し、[確認] をクリックします。

    6. 次のスクリプトを入力します。

      {
          "type": "job",
          "steps": [
          {
              "stepType": "mongodb",
              "parameter": {
                  "datasource": "mongodb_userlog",//データソースの名前。
                  "column": [
                      {
                      "name": "store.bicycle.color", // JSON フィールドのパス。この例では、color の値がフェッチされます。
                      "type": "document.String" // 最初のレイヤーにないサブプロパティの場合、タイプは最終的に取得するタイプです。この例の expensive フィールドのように、選択した JSON フィールドが第 1 レイヤーのフィールドである場合は、直接 string と入力できます。
                      }
                    ],
                  "collectionName": "userlog"   // コレクションの名前。
                  },
              "name": "Reader",
              "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                  "partition": "",
                  "isCompress": false,
                  "truncate": true,
                  "datasource": "odps_first",
                  "column": [
                  "mqdata"  // MaxCompute テーブルの列の名前。
                  ],
                  "emptyAsNull": false,
                  "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
                  }
                  ],
                  "version": "2.0",
                  "order": {
                  "hops": [
                  {
                  "from": "Reader",
                  "to": "Writer"
                  }
                  ]
                  },
                  "setting": {
                  "errorLimit": {
                  "record": ""
                  },
                  "speed": {
                  "concurrent": 2,
                  "throttle": false,
                  }
                  }
              }
    7. ** アイコンをクリックしてコードを実行します。

    8. [操作ログ] で結果を表示できます。

結果の検証

  1. ワークフローを右クリックし、[新規] > [MaxCompute] > [ODPS SQL] を選択します。

  2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

  3. ODPS SQL ノードの編集ページで、次のステートメントを入力します。

    SELECT * from mqdata;
  4. ** アイコンをクリックしてコードを実行します。

  5. [操作ログ] で結果を表示できます。