このトピックでは、DataWorks のデータ同期機能を使用して、Hadoop から Alibaba Cloud MaxCompute へデータを移行する方法を説明します。
環境の準備
- Hadoop クラスターの構築
データの移行の前に、Hadoop クラスターが正しく動作していることを確認する必要があります。 Alibaba Cloud E-MapReduce を使用して Hadoop クラスターを自動的に構築します。
E-MapReduce Hadoop のバージョン情報は次のとおりです。
E-MapReduce バージョン : EMR-3.10.1 または 3.11.0
クラスタータイプ : Hadoop
ソフトウェア (EMR-3.11.0 の場合) : HDFS2.7.2 / YARN2.7.2 / Hive2.3.3 / Ganglia3.7.2 / Spark2.2.1 / HUE4.1.0 / Zeppelin0.7.3 / Tez0.9.1 / Sqoop1.4.6 / Pig0.14.0 / ApacheDS2.0.0 / Knox0.13.0
Hadoop クラスターのネットワークタイプはクラシックです。 リージョンは中国 (杭州) です。 マスターインスタンスグループの ECS コンピューティングリソースは、インターネット IP アドレスとイントラネット IP アドレスで構成されています。 高可用性モードは No (非 HA モード) に設定されています。 次の図に、EMR-3.10.1 の設定を示します。
- MaxCompute
詳細については、「MaxCompute の有効化」をご参照ください。
MaxCompute を有効化して、プロジェクトを作成します。 このトピックでは、中国 (杭州) に bigdata_DOC という名前のプロジェクトを作成し、このプロジェクトに関連する DataWorks サービスを有効にします。
データの準備
- Hadoop クラスターでテストデータを作成します。
E-MapReduce コンソールで、Hadoop クラスターページに移動し、[ノートブック] を使用してノートブックタスクを作成します。 この例のテーブル作成 Hive 文は次のとおりです。
CREATE TABLE IF NOT EXISTS hive_doc_good_sale( create_time timestamp, category STRING, brand STRING, buyer_id STRING, trans_num BIGINT, trans_amount DOUBLE, click_cnt BIGINT ) PARTITIONED BY (pt string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n'
[実行] をクリックします。 テストテーブル
hive_doc_good_sale
は E-MapReduce Hadoop クラスターに正常に作成されます。テストデータの挿入 OSS または他のデータソースからデータを選択するか、少量のテストデータを手動で挿入します。 次のデータを手動で挿入します。
insert into hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','Coat','Brand A','lilei',3,500.6,7),('2018-08-22','Fresh food','Brand B','lilei',1,303,8),('2018-08-22','Coat','Brand C','hanmeimei',2,510,2),(2018-08-22,'Toiletries','Brand A','hanmeimei',1,442.5,1),('2018-08-22','Fresh food','Brand D','hanmeimei',2,234,3),('2018-08-23','Coat','Brand B','jimmy',9,2000,7),('2018-08-23','Fresh food','Brand A','jimmy',5,45.1,5),('2018-08-23','Coat','Brand E','jimmy',5,100.2,4),('2018-08-24','Fresh food','Brand G','peiqi',10,5560,7),('2018-08-24','Sanitary ware','Brand F','peiqi',1,445.6,2),('2018-08-24','Coat','Brand A','ray',3,777,3),('2018-08-24','Sanitary ware','Brand G','ray',3,122,3),('2018-08-24','Coat','Brand C','ray',1,62,7) ;
データを挿入した後、
select * from hive_doc_good_sale where pt =1;
文を使用して、移行のために Hadoop クラスターテーブル にデータが存在するかどうかを確認します。 - DataWorks を使用して変換先テーブルを作成します。
DataWorks コンソールで、MaxCompute プロジェクトをクリックし、
を選択します。表示されたウインドウで、次のテーブル作成 SQL 文を入力します。
CREATE TABLE IF NOT EXISTS hive_doc_good_sale( create_time string, category STRING, brand STRING, buyer_id STRING, trans_num BIGINT, trans_amount DOUBLE, click_cnt BIGINT ) PARTITIONED BY (pt string) ;
テーブルを作成するときは、Hive データ型と MaxCompute データ型の間のマッピングに注意します。
DataWorks のデータ同期機能は timestamp データではサポートされません。 したがって、DataWorks のテーブル作成文では、create_time は文字列値が設定されます。 テーブルを作成するために、odpscmd CLI (コマンドライン) ツールを使用します。 ツールのインストール方法と設定方法の詳細については、「クライアントのインストールと設定」をご参照ください。 テーブル作成処理は次のとおりです。
注 Hive と MaxCompute のデータ型の互換性を考慮して、 [odpscmd] クライアント上で次のコマンドを実行することを推奨します。set odps.sql.type.system.odps2=true;set odps.sql.hive.compatible=true;
テーブルが作成された後、次の図に示すように、DataWorks コンソールの
を選択して MaxCompute で作成されたデータを表示します。
データの同期
- カスタムリソースグループの作成
たいていの場合、MaxCompute のプロジェクトデータノードと Hadoop クラスターのデータノードの間のネットワークは接続されていません。 Hadoop クラスターのマスターノードで DataWorks の同期タスクを実行するために、リソースグループをカスタマイズします。 (通常、Hadoop クラスターのマスターノードとデータノードの間のネットワークは接続されています。)
- Hadoop クラスターのデータノードを 表示します。
E-MapReduce コンソールのホームページで、[クラスター管理] > [クラスター] > [ホスト] を選択します。 Hadoop クラスターのデータノードを表示します。 次の図に示すように、E-MapReduce Hadoop クラスター (非 HA モード) のマスターノードのホスト名は emr-header-1 で、データノードのホスト名は emr-worker-X です。
次の図に示すように、マスターノードの ECS ID をクリックし、表示された ECS 詳細ページで [接続] をクリックし、hadoop dfsadmin –report コマンドを実行してデータノードを表示します。
前の図に示すように、データノードはイントラネットアドレスのみを持ち、DataWorks のデフォルトリソースグループと通信することはできません。 したがって、リソースグループをカスタマイズし、マスターノードを DataWorks の同期タスクを実行するノードに設定する必要があります。
- カスタムリソースグループの作成
次の図に示すように、DataWorks コンソールで、Data Integration ページに移動し、[Resource Group] を選択し、[New Resource Groups] をクリックします。 リソースグループをカスタマイズする方法の詳細については、「スケジューリングリソースの追加」をご参照ください。
サーバーを追加するときは、ECS UUID とマシン IP アドレスなどの情報を入力する必要があります。 (クラシックネットワークの場合、サーバー名を入力します。 VPC ネットワークの場合、サーバーの UUID を入力します。 DataWorks V2.0 の中国 (上海) のクラシックネットワークにのみスケジューリングリソースを追加します。 他のリージョンでは、ネットワークタイプがクラシックか VPC かにかかわらず、スケジューリングリソースグループを追加するときに、VPC ネットワークタイプを選択します。) イントラネット IP アドレスにアクセスできない可能性があるため、マシン IP アドレスをマスターノードのインターネット IP アドレスに設定します。 次の図に示すように、ECS UUID はマスターノード管理端末に接続する必要があり、dmidecode | grep UUID コマンドを実行して UUID を取得します。 (Hadoop クラスターが E-MapReduce 上に構築されていない場合は、同じ方法を使用します。)
サーバーを追加した後、マスターノードと DataWorks の間のネットワークが接続されていることを確認します。 ECS サーバーを使用している場合は、サーバーセキュリティグループを設定する必要があります。 通信にイントラネット IP アドレスを使用している場合は、サーバーセキュリティグループを設定します。 詳細については、「セキュリティグループの追加」をご参照ください。
インターネット IP アドレスを使用している場合は、[セキュリティグループルール] に従って、インターネットの入口と出口を直接設定します。(実際のアプリケーションシナリオでは、データセキュリティのために 詳細なバイパスルールを設定することを推奨します。)
前の手順が完了したら、表示に従ってリソースグループエージェントをインストールします。 ステータスが [使用可能] の場合は、カスタムリソースグループが正常に追加されています。
ステータスが [使用不可] の場合は、次の図に示すように、マスターノードにログインし、
tail –f/home/admin/alisatasknode/logs/heartbeat.log
コマンドを実行して、DataWorks とマスターノードの間のハートビートメッセージがタイムアウトしているかどうかを確認します。
- Hadoop クラスターのデータノードを 表示します。
- データソースの作成
DataWorks でデータソースを作成する方法の詳細については、「データソースの設定」をご参照ください。
DataWorks でプロジェクトを作成した後、データソースはデフォルトで odps_first に設定されます。 したがって、Hadoop クラスターデータソースを追加するだけです。 これを行うには、次の手順を実行します。DataWorks の Data Integration ページで、 を選択し、[HDFS] を選択します。
表示されたウインドウで、データソース名と defaultFS を入力します。 E-MapReduce Hadoop クラスターが HA クラスターの場合、アドレスは hdfs://emr-header-1 の IP:8020 です。 E-MapReduce Hadoop クラスターが非 HA クラスターの場合、アドレスは hdfs://emr-header-1 の IP:9000 です。 このトピックでは、 emr-header-1 はインターネットを介して DataWorks へ接続します。 したがって、インターネット IP アドレスを入力してセキュリティグループを開きます。
設定が完了した後、[Test Connectivity] をクリックします。 Test connectivity successfully が表示された場合は、データソースは正常に追加されています。
注 E-MapReduce Hadoop クラスターのネットワークタイプが VPC の場合、接続テストはサポートされていません。 - 同期タスクの設定
DataWorks の Data Integration ページで、 [Sync Tasks] をクリックし、スクリプトモードを作成します。 表示されたウインドウで、次の図に示すように、データソースを選択します。
テンプレートがインポートされた後、同期タスクはスクリプトモードに変換されます。 詳細については、「スクリプトモード」をご参照ください。
データ同期タスクスクリプトを設定するとき、DataWorks 同期タスクと Hive テーブルのデータ型は次のとおりです。
Hive テーブルのデータ型 DataX と DataWorks のデータ型 TINYINT,SMALLINT,INT,BIGINT Long FLOAT,DOUBLE,DECIMAL Double String,CHAR,VARCHAR String BOOLEAN Boolean Date,TIMESTAMP Date Binary Binary コードの詳細は次のとおりです。
{ "configuration": { "reader": { "plugin": "hdfs", "parameter": { "path": "/user/hive/warehouse/hive_doc_good_sale/", "datasource": "HDFS1", "column": [ { "index": 0, "type": "string" }, { "index": 1, "type": "string" }, { "index": 2, "type": "string" }, { "index": 3, "type": "string" }, { "index": 4, "type": "long" }, { "index": 5, "type": "double" }, { "index": 6, "type": "long" } ], "defaultFS": "hdfs://121.199.11.138:9000", "fieldDelimiter": ",", "encoding": "UTF-8", "fileType": "text" } }, "writer": { "plugin": "odps", "parameter": { "partition": "pt=1", "truncate": false, "datasource": "odps_first", "column": [ "create_time", "category", "brand", "buyer_id", "trans_num", "trans_amount", "click_cnt" ], "table": "hive_doc_good_sale" } }, "setting": { "errorLimit": { "record": "1000" }, "speed": { "throttle": false, "concurrent": 1, "mbps": "1", "dmu": 1 } } }, "type": "job", "version": "1.0" }
path パラメータは、Hadoop クラスターでデータが格納されている場所を示します。 マスターノードにログインして、
hdfs dfs –ls /user/hive/warehouse/hive_doc_good_sale
コマンドを実行し、場所を確認します。 パーティションテーブルの場合、パーティションを指定する必要はありません。 次の図に示すように、DataWorks のデータ同期機能では、 パーティションパスは自動的に再帰的に処理されます。設定が完了した後、[Run] をクリックします。 タスクが正常に実行されたことを示すメッセージが表示された場合は、同期タスクは完了です。 タスクの実行に失敗したことを示すメッセージが表示された場合は、トラブルシューティングを進めるためにログをコピーします。
結果の検証
DataWorks コンソールでは、hive_doc_good_sale テーブルを選択します。 Hive データが MaxCompute へ同期されたかどうかを確認します。 テーブルクエリタスクを作成することもでき、select * FROM hive_doc_good_sale where pt =1;
スクリプトをタスクに入力して [Run] をクリックして結果を問い合わせます。
select * FROM hive_doc_good_sale where pt =1;
を [odpscmd] CLI ツールに入力しても、テーブルの結果を問い合わせられます。
MaxCompute から Hadoop へのデータの移行
{
"configuration": {
"reader": {
"plugin": "odps",
"parameter": {
"partition": "pt=1",
"isCompress": false,
"datasource": "odps_first",
"column": [
"create_time",
"category",
"brand",
"buyer_id",
"trans_num",
"trans_amount",
"click_cnt"
],
"table": "hive_doc_good_sale"
}
},
"writer": {
"plugin": "hdfs",
"parameter": {
"path": "/user/hive/warehouse/hive_doc_good_sale",
"fileName": "pt=1",
"datasource": "HDFS_data_source",
"column": [
{
"name": "create_time",
"type": "string"
},
{
"name": "category",
"type": "string"
},
{
"name": "brand",
"type": "string"
},
{
"name": "buyer_id",
"type": "string"
},
{
"name": "trans_num",
"type": "BIGINT"
},
{
"name": "trans_amount",
"type": "DOUBLE"
},
{
"name": "click_cnt",
"type": "BIGINT"
}
],
"defaultFS": "hdfs://47.99.162.100:9000",
"writeMode": "append",
"fieldDelimiter": ",",
"encoding": "UTF-8",
"fileType": "text"
}
},
"setting": {
"errorLimit": {
"record": "1000"
},
"speed": {
"throttle": false,
"concurrent": 1,
"mbps": "1",
"dmu": 1
}
}
},
"type": "job",
"version": "1.0"
}
前述の同期タスクを実行する前に、Hadoop クラスターを設定する必要があります。 詳細については、「HDFS Writer の設定」をご参照ください。 同期タスクを実行した後、同期されたファイルを手動でコピーする必要があります。