すべてのプロダクト
Search
ドキュメントセンター

DataWorks:JSON 形式のデータを MongoDB から MaxCompute に移行する

最終更新日:Mar 28, 2025

このトピックでは、DataWorks のデータ統合サービスを使用して、JSON 形式のフィールドを MongoDB から MaxCompute に移行する方法について説明します。

前提条件

  • MaxCompute および DataWorks サービスがアクティブ化されていること。詳細については、「MaxCompute と DataWorks をアクティブ化する」をご参照ください。

  • MaxCompute データソースが追加されていること。詳細については、「MaxCompute データソースを追加する」をご参照ください。

  • DataWorks コンソールのワークスペースにワークフローが作成されていること。この例では、ベーシックモードのワークスペースにワークフローが作成されています。詳細については、「ワークフローを作成する」をご参照ください。

MongoDB でテストデータを準備する

  1. アカウントを準備します。

    DataWorks で接続を作成するための情報を準備するために、データベースにユーザーを作成します。この例では、次のコマンドを実行します。

    db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})

    この例では、bookuser という名前のユーザーが作成され、ユーザーパスワードは 123456 です。ユーザーには、データアクセス権限を持つロールが割り当てられます。

  2. データを準備します。

    データを MongoDB データベースにアップロードします。この例では、仮想プライベートクラウド (VPC) 内の ApsaraDB for MongoDB インスタンスが使用されます。DataWorks の共有リソースグループと通信するには、ApsaraDB for MongoDB インスタンスのパブリックエンドポイントを申請する必要があります。次のテストデータがアップロードされます。

    {
        "store": {
            "book": [
                {
                    "category": "reference",
                    "author": "Nigel Rees",
                    "title": "Sayings of the Century",
                    "price": 8.95
                    },
                {
                    "category": "fiction",
                    "author": "Evelyn Waugh",
                    "title": "Sword of Honour",
                    "price": 12.99
                    },
                {
                    "category": "fiction",
                    "author": "J. R. R. Tolkien",
                    "title": "The Lord of the Rings",
                    "isbn": "0-395-19395-8",
                    "price": 22.99
                    }
                        ],
            "bicycle": {
                "color": "red",
                "price": 19.95
                    }
                        },
            "expensive": 10
                }
  3. データ管理 (DMS) コンソールで MongoDB データベースにログオンします。この例では、データベースの名前は admin で、コレクションの名前は userlog です。次のコマンドを実行して、アップロードされたデータを表示できます。

    db.userlog.find().limit(10)

DataWorks を使用して JSON 形式のデータを MongoDB から MaxCompute に移行する

  1. DataWorks コンソール にログインします。

  2. DataWorks でテーブルを作成します。このテーブルは、MongoDB から移行されたデータを格納するために使用されます。

    1. 作成済みの [ワークフロー] を右クリックし、ワークフロー[テーブルの作成] > を選択します。

    2. [テーブルの作成] ページで、エンジンの種類を選択し、[テーブル名] を入力します。

    3. テーブル編集ページで、[DDL ステートメント] をクリックします。

    4. [DDL ステートメント] ダイアログボックスで、次のステートメントを入力し、[テーブルスキーマの生成] をクリックします。

      重要

      テーブル作成ステートメントで指定するテーブル名は、[テーブルの作成] ダイアログボックスで構成した [名前] パラメーターの値と同じである必要があります。

      create table mqdata (mqdata string);
    5. [本番環境にコミット] をクリックします。

  3. MongoDB データソースを追加します。詳細については、「MongoDB データソースAdd a MongoDB data source」をご参照ください。

  4. バッチ同期ノードを作成します。

    1. データ分析ページに移動します。指定されたワークフローを右クリックし、[新規] > [ データ統合] > [ オフライン同期] を選択します。

    2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

    3. 上部のナビゲーションバーで、Conversion script アイコンを選択します。

    4. スクリプトモードで、** アイコンをクリックします。

    5. [テンプレートのインポート] ダイアログボックスで [ソースの種類][データソース][ターゲットの種類][データソース] を選択し、[確認] をクリックします。

    6. 次のスクリプトを入力します。

      {
          "type": "job",
          "steps": [
          {
              "stepType": "mongodb",
              "parameter": {
                  "datasource": "mongodb_userlog", // データソースの名前。
                  "column": [
                      {
                      "name": "store.bicycle.color", // JSON 形式のフィールドのパス。この例では、color フィールドが抽出されます。
                      "type": "document.String" // トップレベル以外のフィールドの場合、このようなフィールドのデータ型は最終的に取得される型です。指定された JSON 形式のフィールドがトップレベルのフィールド(この例の expensive フィールドなど)の場合は、string と入力します。
                      }
                    ],
                  "collectionName": "userlog" // コレクションの名前。
                  },
              "name": "Reader",
              "category": "reader"
              },
              {
                  "stepType": "odps",
                  "parameter": {
                  "partition": "",
                  "isCompress": false,
                  "truncate": true,
                  "datasource": "odps_source",// MaxCompute データソースの名前。
                  "column": [
                  "mqdata" // MaxCompute テーブルの列の名前。
                  ],
                  "emptyAsNull": false,
                  "table": "mqdata"
                  },
                  "name": "Writer",
                  "category": "writer"
                  }
                  ],
                  "version": "2.0",
                  "order": {
                  "hops": [
                  {
                  "from": "Reader",
                  "to": "Writer"
                  }
                  ]
                  },
                  "setting": {
                  "errorLimit": {
                  "record": ""
                  },
                  "speed": {
                  "concurrent": 2,
                  "throttle": false,
                  }
                  }
              }
    7. ** アイコンをクリックしてコードを実行します。

    8. [操作ログ] で結果を確認できます。

移行結果を確認する

  1. ワークフローを右クリックし、[新規] > [ Maxcompute] > [ ODPS SQL] を選択します。

  2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

  3. ODPS SQL ノードの構成タブで、次のステートメントを入力します。

    SELECT * from mqdata;
  4. ** アイコンをクリックしてコードを実行します。

  5. [操作ログ] で結果を確認できます。