このトピックでは、DataWorks のデータ同期機能を使用して、Hadoop Distributed File System (HDFS) から Alibaba Cloud MaxCompute へデータを移行する方法を説明します。 MaxCompute と Hadoop または Spark 間のデータ同期がサポートされています。

始める前に

  1. Hadoop クラスターを作成します。

    データの移行の前に、Hadoop クラスターが正しく動作していることを確認します。 Alibaba Cloud E-MapReduce (EMR) を使用して Hadoop クラスターを自動的に構築します。 詳細については、「クラスターの作成」をご参照ください。

    EMR Hadoop のバージョン情報は次のとおりです。
    • EMR バージョン: EMR-3.11.0
    • クラスタータイプ : Hadoop
    • ソフトウェア:HDFS 2.7.2、YARN 2.7.2、Hive 2.3.3、Ganglia 3.7.2、Spark 2.2.1、Hue 4.1.0、Zeppelin 0.7.3、Tez 0.9.1、Sqoop 1.4.6、Pig 0.14.0、ApacheDS 2.0.0、Knox 0.13.0
    Hadoop クラスターは、中国 (杭州) リージョンのクラシックネットワークにデプロイされます。 プライマリインスタンスグループのElastic Compute Service (ECS) インスタンスには、パブリック IP アドレスとプライベート IP アドレスが構成されています。 クラスターの高可用性をなしに設定します。Hadoop クラスター
  2. MaxCompute が有効になり、プロジェクトが作成されます。

    このトピックでは、中国 (杭州) リージョンのプロジェクト bigdata_DOC を作成し、DataWorks サービスを起動します。 詳細については、「MaxCompute の有効化」をご参照ください。

手順

  1. データを準備します。
    1. Hadoop クラスターでテストデータを作成します。
      1. E-MapReduce コンソールにログインし、[データプラットフォーム] タブをクリックします。 表示されるページで、作成された Hadoop クラスターを見つけて、[アクション] 列の [ジョブの編集] をクリックします。 doc という名前のジョブを作成します。 次の例では、Hive での CREATE TABLE 文を示します。 E-MapReduce コンソールでジョブを作成する方法については、「ジョブ操作」をご参照ください。
        CREATE TABLE IF NOT
        EXISTS hive_doc_good_sale(
           create_time timestamp,
           category STRING,
           brand STRING,
           buyer_id STRING,
           trans_num BIGINT,
           trans_amount DOUBLE,
           click_cnt BIGINT
           )
           PARTITIONED BY (pt string) ROW FORMAT
        DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n';
      2. [実行] をクリックします。 Query executed successfully と表示された場合、hive_doc_good_sale テーブルが EMR Hadoop クラスターに作成されたことを示します。
      3. テーブルにテストデータを挿入します。 OSS や他のデータソースからデータをインポートしたり、手動でデータを挿入したりできます。 この例では、次のようにデータを挿入します。
        insert into
        hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','Coat','Brand A','lilei',3,500.6,7),('2018-08-22','Fresh food','Brand B','lilei',1,303,8),('2018-08-22','Coat','Brand C','hanmeimei',2,510,2),(2018-08-22,'Bathroom product','Brand A','hanmeimei',1,442.5,1),('2018-08-22','Fresh food','Brand D','hanmeimei',2,234,3),('2018-08-23','Coat','Brand B','jimmy',9,2000,7),('2018-08-23','Fresh food','Brand A','jimmy',5,45.1,5),('2018-08-23','Coat','Brand E','jimmy',5,100.2,4),('2018-08-24','Fresh food','Brand G','peiqi',10,5560,7),('2018-08-24','Bathroom product','Brand F','peiqi',1,445.6,2),('2018-08-24','Coat','Brand A','ray',3,777,3),('2018-08-24','Bathroom product','Brand G','ray',3,122,3),('2018-08-24','Coat','Brand C','ray',1,62,7) ;
      4. データを挿入した後、select * from hive_doc_good_sale where pt =1; 文を実行して、移行のために Hadoop クラスターテーブルにデータが存在するかどうかを確認します。
    2. DataWorks コンソールで MaxCompute テーブルを作成します。
      1. DataWorks コンソールにログインします。 左側のナビゲーションウィンドウで、[ワークスペース] をクリックし、ターゲットプロジェクトを見つけて、[アクション] 列で [データ分析] をクリックします。
      2. 表示される [DataStudio] ページで、左側のナビゲーションウィンドウにある [作成] アイコンにポインターを合わせて、 [MaxCompute] > [テーブル] を選択します。
      3. 表示される [ノードの作成] ダイアログボックスで、[テーブル名] を設定し、[コミット] をクリックします。
      4. 作成したテーブルのタブで、[DDL 文] をクリックします。
      5. 表示される [DDL 文] ダイアログボックスで、CREATE TABLEス 文を入力し、 [テーブルスキーマの生成] をクリックします。 表示される [確認] メッセージで [OK] をクリックします。 この例では、CREATE TABLE 文は次のとおりです。
        CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
           create_time string,
           category STRING,
           brand STRING,
           buyer_id STRING,
           trans_num BIGINT,
           trans_amount DOUBLE,
           click_cnt BIGINT
           )
           PARTITIONED BY (pt string) ;

        テーブルを作成する場合、Hive データ型と MaxCompute データ型の間のマッピングを考慮する必要があります。 データマッピングの詳細については、「Hive データ型の対応表」をご参照ください。

        コマンドラインツール odpscmd を使用して、MaxCompute テーブルを作成することもできます。 odpscmd ツールのインストール方法と設定方法の詳細については、「odpscmd クライアントのインストールと設定」をご参照ください。

        Hive データ型と MaxCompute データ型の間の互換性の問題を解決するには、odpscmd ツールを使用して次のコマンドを実行することを推奨します。
        set odps.sql.type.system.odps2=true;
        set odps.sql.hive.compatible=true;
      6. [本番環境に送信] をクリックして、テーブルを作成します。
      7. テーブルが作成されたら、左側のナビゲーションウィンドウのツールバーで、[ワークスペーステーブル] をクリックして、MaxCompute テーブルを表示します。
  2. データを同期します。
    1. カスタムリソースグループを作成します。

      ほとんどの場合、MaxCompute プロジェクト間のネットワークは、Hadoop クラスターのデータノードにアクセスできません。 Hadoop クラスターのマスターノードとデータノードは通常接続されているため、リソースグループをカスタマイズして、Hadoop クラスターのマスターノードで DataWorks 同期タスクを実行できます。

      1. Hadoop クラスターのデータノードを表示します。
        Alibaba Cloud E-MapReduce コンソールにログインし、[クラスター管理] タブをクリックします。 表示される [クラスター管理] ページで、[クラスターID /名前] 列で、データノードを表示する Hadoop クラスターをクリックします。 表示されるクラスターページでの左側のナビゲーションウィンドウで [インスタンス] をクリックします。
        前の図に示すように、マスターノードの ECS ID をクリックして、ECS インスタンスの詳細ページに移動することもできます。 [接続] をクリックして、hadoop dfsadmin –report コマンドを実行して、データノードに関する情報を表示します。

        この例では、データノードにはプライベート IP アドレスしかないため、DataWorks のデフォルトのリソースグループと通信できません。 したがって、カスタムリソースグループを作成し、マスターノードをDataWorks のデータ同期タスクを実行するノードとして構成する必要があります。

      2. カスタムリソースグループを作成します。
        1. [データ統合] ページで、ページの右上隅にある [カスタムリソースグループ] をクリックし、[リソースグループの追加] をクリックします。
          現在、このナビゲーションパスは、Professional Edition 以降のエディションでのみ使用できます。
        2. サーバーを追加するときは、ECS UUID とサーバー IP アドレスなどの情報を入力する必要があります。 クラシックネットワークの場合、サーバー名を入力します。 VPC の場合、サーバーの UUID を入力します。 現在、DataWorks V2.0 では、クラシックネットワークのスケジュールリソースは中国 (上海) リージョンのみで追加できます。 他のリージョンでは、クラシックネットワークを使用するか VPC を使用するかに関係なく、スケジューリングリソースグループを追加するときに VPC を選択する必要があります。
          サーバーの IP アドレスには、プライベート IP アドレスに到達できない可能性があるため、マスターノードのパブリック IP アドレスを入力します。 ECS UUID を取得するには、マスターノードの管理ターミナルにログインして、dmidecode | grep UUID コマンドを実行します。 また、このコマンドを使用して、Hadoop クラスターが EMR 環境にデプロイされていない場合でも、ECS UUID を取得できます。
        3. サーバーを追加した後、マスターノードと DataWorks が接続されていることを確認します。 ECS インスタンスを使用している場合は、セキュリティグループを構成する必要があります。
          • 通信にイントラネット IP アドレスを使用している場合は、「セキュリティグループの設定」をご参照ください。
          • パブリック IP アドレスを使用している場合は、セキュリティグループにインターネットの受信ルールと送信ルールを設定できます。 このトピックでは、インターネットからのトラフィックを許可するためにすべてのポートが開かれています。 実際のアプリケーションシナリオでは、セキュリティ上の理由から詳細なセキュリティグループルールを設定することを推奨します。
        4. 前の手順が完了したら、表示に従ってリソースグループエージェントをインストールします。 ECS インスタンスのステータスが [利用可能] の場合、カスタムリソースグループが追加されます。

          ECS インスタンスのステータスが [利用不可] の場合、マスターノードにログインし、tail –f/home/admin/alisatasknode/logs/heartbeat.log コマンドを実行して、DataWorks とマスターノードの間のハートビートメッセージがタイムアウトしているかどうかを確認します。

    2. データソースを作成します。

      DataWorks でワークスペースを作成した後、データソースはデフォルトで odps_first に設定されます。 したがって、Hadoop クラスターデータソースを追加するのみです。 詳細については、「HDFS 接続の設定」をご参照ください。

      1. DataWorks のデータ統合ページで、左側のナビゲーションウィンドウで [データソース] をクリックします。
      2. 表示される [データソース] タブで、右上隅に表示される [接続の追加] をクリックします。
      3. 表示される [接続の追加] ダイアログボックスで、[HDFS] を選択します。
      4. [HDFS 接続の追加] ダイアログボックスで、HDFS データソースのパラメーターを設定します。
        パラメーター 説明
        Connection Name 接続名です。 名前には、英数字、アンダースコア (_) を使用できます。 先頭は文字である必要があります。
        Description 接続の説明です。 説明の長さは最大 80 文字である必要があります。
        Applicable 接続が使用される環境です。 有効な値: DevelopmentProduction
        このパラメーターは、ワークスペースが標準モードの場合にのみ使用できます。
        DefaultFS EMR Hadoop クラスターが HA モードの場合、アドレスは hdfs://IP address of emr-header-1:8020 になります。 Hadoop クラスターが非 HA モードの場合、アドレスは hdfs://IP address of emr-header-1:9000 になります。

        このトピックでは、 emr-header-1 はインターネットを介して DataWorks へ接続します。 したがって、パブリック IP アドレスを入力し、インターネットからのトラフィックを許可します。

      5. [テスト接続] をクリックします。
      6. 接続テストが成功したら、[完了] をクリックします。
        EMR Hadoop クラスターのネットワークタイプが VPC の場合、接続テストはサポートされていません。
    3. データ同期タスクを設定します。
      1. [DataStudio] ページの左側のナビゲーションウィンドウでポインターを [作成] アイコンに合わせて、 [データ統合] > [バッチ同期] を選択します。
      2. 表示される [ノードの作成] ダイアログボックスで [ノード名] を指定し、[コミット] をクリックします。
      3. データ同期ノードを作成した後、上部のツールバーの [コードエディターに切り替え] アイコンをクリックします。
      4. 表示される [確認] メッセージで、 [OK] をクリックしてコードエディターに切り替えます。
      5. 上部のツールバーの [テンプレートの適用] アイコンをクリックします。
      6. 表示される [テンプレートの適用] ダイアログボックスで、[ソース接続タイプ][接続][ターゲット接続タイプ][接続] を指定して、 [OK] をクリックします。
      7. テンプレートを適用すると、HDFS リーダーの基本構成が完了します。 HDFS リーダーのデータストアまたはテーブルを変更できます。 この例のコードは次のとおりです。 詳細については、「HDFS Reader」をご参照ください。
        {
          "configuration": {
            "reader": {
              "plugin": "hdfs",
              "parameter": {
                "path": "/user/hive/warehouse/hive_doc_good_sale/", 
                "datasource": "HDFS1",
                "column": [
                  {
                    "index": 0,
                    "type": "string"
                  },
                  {
                    "index": 1,
                    "type": "string"
                  },
                  {
                    "index": 2,
                    "type": "string"
                  },
                  {
                    "index": 3,
                    "type": "string"
                  },
                  {
                    "index": 4,
                    "type": "long"
                  },
                  {
                    "index": 5,
                    "type": "double"
                  },
                  {
                    "index": 6,
                    "type": "long"
                  }
                ],
                "defaultFS": "hdfs://121.199.11.138:9000",
                "fieldDelimiter": ",",
                "encoding": "UTF-8",
                "fileType": "text"
              }
            },
            "writer": {
              "plugin": "odps",
              "parameter": {
                "partition": "pt=1",
                "truncate": false,
                "datasource": "odps_first",
                "column": [
                  "create_time",
                  "category",
                  "brand",
                  "buyer_id",
                  "trans_num",
                  "trans_amount",
                  "click_cnt"
                ],
                "table": "hive_doc_good_sale"
              }
            },
            "setting": {
              "errorLimit": {
                "record": "1000"
              },
              "speed": {
                "throttle": false,
                "concurrent": 1,
                "mbps": "1",
              }
            }
          },
          "type": "job",
          "version": "1.0"
        }
        コードの path パラメーターは、Hadoop クラスターでデータが保存されているディレクトリを示します。 マスターノードにログインし、hdfs dfs –ls /user/hive/warehouse/hive_doc_good_sale コマンドを実行してディレクトリを確認します。 パーティションテーブルの場合、DataWorks のデータ同期機能は、データが格納されるパーティションに自動的に再帰できます。
      8. 設定が完了した後、[実行] をクリックします。 タスクが正常に実行されたことを示すメッセージが表示されたら、データ同期タスクは完了です。 タスクの実行に失敗したことを示すメッセージが表示された場合、エラーログに基づいて障害を修正します。

タスクの結果

  1. 左側のナビゲーションウィンドウで、[アドホッククエリ] をクリックします。
  2. [作成] アイコンにポインターを合わせて、 [作成] > [ODPS SQL] を選択します。
  3. 表示される [ノードの作成] ダイアログボックスでパラメーターを設定し、[コミット] をクリックしてアドホッククエリノードを作成します。 アドホッククエリノードのタブで、次の SQL 文を実行して、hive_doc_good_sale に同期されたデータを表示します。 SQL 文は次のとおりです。
    --Check whether the data is written to MaxCompute.
    select * from hive_doc_good_sale where pt=1;

    コマンドラインツール odpscmd に select * FROM hive_doc_good_sale where pt =1; コマンドを入力し、テーブルの結果をクエリできます。

MaxCompute から Hadoop にデータを移行する場合も、同様の手順です。 ただし、同期スクリプトでリーダーオブジェクトとライターオブジェクトを交換する必要があります。 同期スクリプトは次のとおりです。

{
  "configuration": {
    "reader": {
      "plugin": "odps",
      "parameter": {
      "partition": "pt=1",
      "isCompress": false,
      "datasource": "odps_first",
      "column": [
        "create_time",
        "category",
        "brand",
      "buyer_id",
      "trans_num",
      "trans_amount",
      "click_cnt"
    ],
    "table": "hive_doc_good_sale"
    }
  },
  "writer": {
    "plugin": "hdfs",
    "parameter": {
    "path": "/user/hive/warehouse/hive_doc_good_sale",
    "fileName": "pt=1",
    "datasource": "HDFS_data_source",
    "column": [
      {
        "name": "create_time",
        "type": "string"
      },
      {
        "name": "category",
        "type": "string"
      },
      {
        "name": "brand",
        "type": "string"
      },
      {
        "name": "buyer_id",
        "type": "string"
      },
      {
        "name": "trans_num",
        "type": "BIGINT"
      },
      {
        "name": "trans_amount",
        "type": "DOUBLE"
      },
      {
        "name": "click_cnt",
        "type": "BIGINT"
      }
    ],
    "defaultFS": "hdfs://47.99.162.100:9000",
    "writeMode": "append",
    "fieldDelimiter": ",",
    "encoding": "UTF-8",
    "fileType": "text"
    }
  },
  "setting": {
    "errorLimit": {
      "record": "1000"
  },
  "speed": {
    "throttle": false,
    "concurrent": 1,
    "mbps": "1",
  }
  }
},
"type": "job",
"version": "1.0"
}

前述の同期タスクを実行する前に、Hadoop クラスターを設定する必要があります。 同期タスクを実行した後、同期されたファイルを手動でコピーできます。