すべてのプロダクト
Search
ドキュメントセンター

DataWorks:MongoDB から MaxCompute への JSON 形式のデータの移行

最終更新日:Mar 26, 2026

MongoDB は、データをネストされた JSON ドキュメントとして保存します。このデータを MaxCompute に移行する場合、ドット表記パスを使用して特定のフィールドを抽出し、正しい列の型にマッピングする必要があります。このトピックでは、DataWorks の Data Integration サービスを使用して、MongoDB コレクションからネストされた JSON フィールドを読み取り、MaxCompute テーブルにロードするバッチ同期ジョブを構成する方法について説明します。

この例では、仮想プライベートクラウド (VPC) 内の ApsaraDB for MongoDB インスタンスを使用し、ネストされた JSON ドキュメントから store.bicycle.color フィールドを抽出します。

前提条件

開始する前に、以下をご確認ください。

ステップ 1:MongoDB でのテストデータの準備

このステップでは、データベースユーザーを作成し、サンプルの JSON データを MongoDB にアップロードし、同期ジョブを構成する前にデータにアクセスできることを確認します。

データベースユーザーの作成

DataWorks が接続できるように、ご利用の MongoDB データベースにユーザーを作成します。次のコマンドを実行します。

db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})

これにより、データアクセス権限を持つ user1 ロールに割り当てられた、ユーザー名 bookuser、パスワード 123456 のユーザーが作成されます。

テストデータのアップロード

VPC 内の ApsaraDB for MongoDB インスタンスを使用する場合、まずインスタンスのパブリックエンドポイントを申請してください。DataWorks の共有リソースグループは、パブリックネットワーク経由で MongoDB に接続します。

次の JSON ドキュメントを admin データベースの userlog コレクションにアップロードします。

{
    "store": {
        "book": [
            {
                "category": "reference",
                "author": "Nigel Rees",
                "title": "Sayings of the Century",
                "price": 8.95
            },
            {
                "category": "fiction",
                "author": "Evelyn Waugh",
                "title": "Sword of Honour",
                "price": 12.99
            },
            {
                "category": "fiction",
                "author": "J. R. R. Tolkien",
                "title": "The Lord of the Rings",
                "isbn": "0-395-19395-8",
                "price": 22.99
            }
        ],
        "bicycle": {
            "color": "red",
            "price": 19.95
        }
    },
    "expensive": 10
}

アップロードされたデータの検証

Data Management (DMS) コンソールで MongoDB データベースにログインし、次のコマンドを実行してデータがアップロードされたことを確認します。

db.userlog.find().limit(10)

ステップ 2:DataWorks での同期ジョブの構成

このステップでは、MaxCompute に移行先テーブルを作成し、MongoDB をデータソースとして追加し、JSON フィールドパスを MaxCompute の列にマッピングするバッチ同期ノードを構成します。

MaxCompute での移行先テーブルの作成

  1. DataWorks コンソールにログインします。

  2. ご利用のワークフローを右クリックし、[新規作成] > [MaxCompute] > [テーブル] を選択します。

  3. [テーブルの作成] ページで、エンジンタイプを選択し、[テーブル名] を入力します。

  4. テーブル編集ページで、[DDL ステートメント] をクリックします。

  5. [DDL ステートメント] ダイアログボックスで、次のステートメントを入力し、[テーブルスキーマの生成] をクリックします。

    重要

    DDL ステートメント内のテーブル名は、[テーブルの作成] ページで設定した [名前] の値と一致している必要があります。

    create table mqdata (mqdata string);
  6. [本番環境にコミット] をクリックします。

MongoDB データソースの追加

ご利用の MongoDB インスタンスを DataWorks のデータソースとして追加します。詳細については、「MongoDB データソースの追加」をご参照ください。

バッチ同期ノードの作成

  1. データ分析ページで、ご利用のワークフローを右クリックし、[新規作成] > [Data Integration] > [オフライン同期] を選択します。

  2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

  3. 上部のナビゲーションバーで、Conversion script アイコンをクリックしてスクリプトモードに切り替えます。

  4. スクリプトモードで、** アイコンをクリックします。

  5. [テンプレートのインポート] ダイアログボックスで、[ソースタイプ][データソース][ターゲットタイプ]、およびターゲットの [データソース] を選択し、[確認] をクリックします。

  6. テンプレートの内容を次のスクリプトに置き換えます。

    {
        "type": "job",
        "steps": [
            {
                "stepType": "mongodb",
                "parameter": {
                    "datasource": "mongodb_userlog",
                    "column": [
                        {
                            "name": "store.bicycle.color",
                            "type": "document.String"
                        }
                    ],
                    "collectionName": "userlog"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "isCompress": false,
                    "truncate": true,
                    "datasource": "odps_source",
                    "column": [
                        "mqdata"
                    ],
                    "emptyAsNull": false,
                    "table": "mqdata"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false
            }
        }
    }

    次の表に、主要なパラメーターを示します。

    パラメーター説明
    datasource (Reader)mongodb_userlogご利用の MongoDB データソースの名前
    name (column)store.bicycle.color抽出する JSON フィールドへのドット表記パス
    type (column)document.Stringフィールドのデータの型。ネストされたフィールドには document.String を使用します。トップレベルのフィールド (例:expensive) には string
    collectionNameuserlog読み取り元の MongoDB コレクション
    datasource (Writer)odps_sourceご利用の MaxCompute データソースの名前
    column (Writer)mqdata書き込み先の MaxCompute テーブルの列
    tablemqdataMaxCompute のテーブル名

    次の例は、移行後に JSON ドキュメントが MaxCompute テーブルにどのようにマッピングされるかを示しています。

    MongoDB ドキュメントフィールドMaxCompute の列格納される値
    store.bicycle.color"red"mqdatared
  7. ** アイコンをクリックしてジョブを実行します。

  8. [操作ログ] を確認し、ジョブがエラーなしで完了したことを確認します。

ステップ 3:移行結果の検証

  1. ご利用のワークフローを右クリックし、[新規作成] > [MaxCompute] > [ODPS SQL] を選択します。

  2. [ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。

  3. ノード設定タブで、次のクエリを入力します。

    SELECT * from mqdata;
  4. ** アイコンをクリックしてクエリを実行します。

  5. [操作ログ] を確認し、クエリが移行されたデータを返すことを確認します。結果には、ソースドキュメントの store.bicycle.color から抽出された値 red が含まれている必要があります。

JSON フィールドタイプリファレンス

Reader の列構成にある type パラメーターは、DataWorks が抽出された JSON の値をどのように解釈するかを制御します。

フィールドの場所フィールドの例type の値注意
ネスト (非トップレベル)store.bicycle.colordocument.String抽出された値の最終的な型を使用します
トップレベルexpensivestringトップレベルのフィールドは単純な型名を使用します