このトピックでは、DataWorks のデータ移行機能を使用して、MongoDB から MaxCompute へ JSON フィールドを抽出する方法を設定します。
準備
- アカウントを作成します。
DataWorks にデータソースを追加するには、事前にデータベースにユーザーを作成してください。 この例では、
db.createUser({user:"bookuser",pwd:"123456",roles:["root"]})
コマンドを実行し、「bookuser」という名前のユーザーを作成します。 ユーザーのパスワードは 123456、権限は root です。 - データを準備します。
MongoDB へデータをアップロードします。 この例では、Alibaba Cloud ApsaraDB for MongoDB が使用されています。 ネットワークタイプは VPC です。 (MongoDB が DataWorks のデフォルトリソースグループと通信するためには、インターネット IP アドレスが必要です。) テストデータは次のとおりです。
MongoDB の DMS コンソールにログインします。 この例では、データベース名は admin、コレクションは userlog です。 アップロードされたデータを表示するには、クエリウィンドウでdb.userlog.find().limit(10) コマンドを実行します。{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }
DataWorks を使用した MaxCompute へのデータ抽出
- 1. MongoDB データソースの追加
DataWorks コンソールで、[Data Integration] ページへ移動し、[MongoDB] を追加します。次の図のようにパラメーターが表示されます。 接続性テストが成功したら、[Complete] をクリックします この例では、MongoDB のネットワークタイプは VPC です。 したがって、[Public IP Address Available] を [Public IP Address Available] に設定する必要があります。IP アドレスとポート番号を取得するには、 にログインし、目的のインスタンスをクリックします。 次の図のようにパラメーターが表示されます。
- 2. データ同期タスクの作成
DataWorks コンソールで、データ同期ノードを作成します。 詳細については、「OTSStream リーダーの設定」をご参照ください。同時に、 JSON データを格納する「mqdata」という名前のテーブルをDataWorks で作成します。 詳しくは、「バケットの作成」をご参照ください。グラフィカルインターフェイスでテーブルパラメータを設定できます。 mqdata テーブルには MQ データという名前の列が 1 つだけあります。 データ型は String 型です。
- 3. パラメーターを設定します。
テーブルを作成したら、グラフィカルインターフェイスでデータ同期タスクのパラメーターを設定します。 まず宛先データソースを odps_first に、宛先テーブルを mqdata に設定します。 元のデータソースを MongoDB に設定し、mongodb_userlog を選択します。 上記の設定が終わったら、Switch to script mode をクリックします。 スクリプトモードのコードの例は次のとおりです。
前述の設定が完了したら、[Run] をクリックします。 次の情報が表示されたら、コードは正常に実行されています。{ "type": "job", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "mongodb_userlog", //Data source name "column": [ { "name": "store.bicycle.color", //JSON field path. In this example, the value of color is extracted. "type": "document.document.string" //The number of fields in this line must be the same as that in the preceding line (the name line). If the JSON field is a level-1 field, for example, the expensive field in this topic, enter the string. } ], "collectionName //Collection name": "userlog" }, "name":"Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "isCompress": false, "truncate": true, "datasource": "odps_first", "column": [ "mqdata" //Table column name in MaxCompute ], "emptyAsNull": false, "table": "mqdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false, "dmu": 1 } } }
結果の検証
Business Flow で ODPS SQL ノードを作成します。
SELECT * from mqdata;
文を入力し、mqdata テーブルのデータを表示します。 SELECT * from mqdata; コマンドを MaxCompute クライアントで実行しても、データを表示できます。