このトピックでは、DataWorks のデータ統合機能を使用して、MongoDB から MaxCompute に JSON フィールドを移行する方法について説明します。
前提条件
DataWorks でビジネスフローが作成されていること。この例では、DataWorks の基本モードを使用します。詳細については、「ビジネスフローの作成」をご参照ください。
MongoDB でのテストデータの準備
アカウントを準備します。
データベースにユーザーを作成します。このユーザーは、DataWorks でデータソースを追加するために使用されます。この例では、次のコマンドを実行します。
db.createUser({user:"bookuser",pwd:"123456",roles:["user1"]})bookuser という名前のユーザーが作成されます。ユーザーのパスワードは 123456 です。このユーザーには、データにアクセスするためのデータベース権限を持つロールが割り当てられます。
データを準備します。
MongoDB データベースにデータをアップロードします。この例では、ApsaraDB for MongoDB インスタンスを使用します。インスタンスのネットワークタイプは VPC です。インスタンスのインターネットエンドポイントをリクエストする必要があります。そうしないと、インスタンスは DataWorks のデフォルトのリソースグループと通信できません。次のコードはテストデータを示しています。
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }MongoDB の DMS コンソールで、この例で使用するデータベースは admin で、コレクションは userlog です。次のコマンドを実行して、アップロードされたデータを表示します。
db.userlog.find().limit(10)
DataWorks を使用した MongoDB から MaxCompute への JSON データの移行
DataWorks コンソールにログインします。
DataWorks で宛先テーブルを作成します。このテーブルは、MongoDB から移行されたデータを受信するために使用されます。
作成したビジネスフローを右クリックし、 を選択します。
[テーブルの作成] ページで、エンジンタイプを選択し、[名前] を入力します。
テーブル編集ページで、[DDL 文] をクリックします。
[DDL モード] ダイアログボックスで、テーブル作成ステートメントを入力し、[テーブルスキーマの生成] をクリックします。
重要DDL 文のテーブル名は、[テーブルの作成] ページで入力した [テーブル名] と同じである必要があります。
create table mqdata (mqdata string);[本番環境にコミット] をクリックします。
MongoDB データソースを追加します。詳細については、「MongoDB データソースの設定」をご参照ください。
バッチ同期ノードを作成します。
データ分析ページに移動します。指定したワークフローを右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。
上部のナビゲーションバーで、
アイコンを選択します。スクリプトモードで、
アイコンをクリックします。[テンプレートのインポート] ダイアログボックスで [ソースタイプ]、[データソース]、[ターゲットタイプ]、[データソース] を指定し、[確認] をクリックします。
次のスクリプトを入力します。
{ "type": "job", "steps": [ { "stepType": "mongodb", "parameter": { "datasource": "mongodb_userlog",//データソースの名前。 "column": [ { "name": "store.bicycle.color", // JSON フィールドのパス。この例では、color の値がフェッチされます。 "type": "document.String" // 最初のレイヤーにないサブプロパティの場合、タイプは最終的に取得するタイプです。この例の expensive フィールドのように、選択した JSON フィールドが第 1 レイヤーのフィールドである場合は、直接 string と入力できます。 } ], "collectionName": "userlog" // コレクションの名前。 }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "isCompress": false, "truncate": true, "datasource": "odps_first", "column": [ "mqdata" // MaxCompute テーブルの列の名前。 ], "emptyAsNull": false, "table": "mqdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "concurrent": 2, "throttle": false, } } }
アイコンをクリックしてコードを実行します。[操作ログ] で結果を表示できます。
結果の検証
ワークフローを右クリックし、 を選択します。
[ノードの作成] ダイアログボックスで、[ノード名] を入力し、[送信] をクリックします。
ODPS SQL ノードの編集ページで、次のステートメントを入力します。
SELECT * from mqdata;
アイコンをクリックしてコードを実行します。[操作ログ] で結果を表示できます。