このトピックでは、DataWorks のデータ同期機能を使用して、Hadoop から Alibaba Cloud MaxCompute へデータを移行する方法を説明します。

環境の準備

  1. Hadoop クラスターの構築

    データの移行の前に、Hadoop クラスターが正しく動作していることを確認する必要があります。 Alibaba Cloud E-MapReduce を使用して Hadoop クラスターを自動的に構築します。

    E-MapReduce Hadoop のバージョン情報は次のとおりです。

    E-MapReduce バージョン : EMR-3.10.1 または 3.11.0

    クラスタータイプ : Hadoop

    ソフトウェア (EMR-3.11.0 の場合) : HDFS2.7.2 / YARN2.7.2 / Hive2.3.3 / Ganglia3.7.2 / Spark2.2.1 / HUE4.1.0 / Zeppelin0.7.3 / Tez0.9.1 / Sqoop1.4.6 / Pig0.14.0 / ApacheDS2.0.0 / Knox0.13.0

    Hadoop クラスターのネットワークタイプはクラシックです。 リージョンは中国 (杭州) です。 マスターインスタンスグループの ECS コンピューティングリソースは、インターネット IP アドレスとイントラネット IP アドレスで構成されています。 高可用性モードは No (非 HA モード) に設定されています。 次の図に、EMR-3.10.1 の設定を示します。

  2. MaxCompute

    詳細については、「MaxCompute の有効化」をご参照ください。

    MaxCompute を有効化して、プロジェクトを作成します。 このトピックでは、中国 (杭州) に bigdata_DOC という名前のプロジェクトを作成し、このプロジェクトに関連する DataWorks サービスを有効にします。

データの準備

  1. Hadoop クラスターでテストデータを作成します。

    E-MapReduce コンソールで、Hadoop クラスターページに移動し、[ノートブック] を使用してノートブックタスクを作成します。 この例のテーブル作成 Hive 文は次のとおりです。

    CREATE TABLE IF NOT
    EXISTS hive_doc_good_sale(
    
    create_time timestamp,
    
    category STRING,
    
    brand STRING,
    
    buyer_id STRING,
    
    trans_num BIGINT,
    
    trans_amount DOUBLE,
    
    click_cnt BIGINT
    
    )
    
    PARTITIONED BY (pt string) ROW FORMAT
    DELIMITED FIELDS TERMINATED BY ',' lines terminated by '\n'

    [実行] をクリックします。 テストテーブル hive_doc_good_sale は E-MapReduce Hadoop クラスターに正常に作成されます。

    テストデータの挿入 OSS または他のデータソースからデータを選択するか、少量のテストデータを手動で挿入します。 次のデータを手動で挿入します。

    insert into
    hive_doc_good_sale PARTITION(pt =1 ) values('2018-08-21','Coat','Brand A','lilei',3,500.6,7),('2018-08-22','Fresh food','Brand B','lilei',1,303,8),('2018-08-22','Coat','Brand C','hanmeimei',2,510,2),(2018-08-22,'Toiletries','Brand A','hanmeimei',1,442.5,1),('2018-08-22','Fresh food','Brand D','hanmeimei',2,234,3),('2018-08-23','Coat','Brand B','jimmy',9,2000,7),('2018-08-23','Fresh food','Brand A','jimmy',5,45.1,5),('2018-08-23','Coat','Brand E','jimmy',5,100.2,4),('2018-08-24','Fresh food','Brand G','peiqi',10,5560,7),('2018-08-24','Sanitary ware','Brand F','peiqi',1,445.6,2),('2018-08-24','Coat','Brand A','ray',3,777,3),('2018-08-24','Sanitary ware','Brand G','ray',3,122,3),('2018-08-24','Coat','Brand C','ray',1,62,7) ;

    データを挿入した後、select * from hive_doc_good_sale where pt =1; 文を使用して、移行のために Hadoop クラスターテーブル にデータが存在するかどうかを確認します。

  2. DataWorks を使用して変換先テーブルを作成します。

    DataWorks コンソールで、MaxCompute プロジェクトをクリックし、[Data Development] > [New] > [Create Table] を選択します。

    表示されたウインドウで、次のテーブル作成 SQL 文を入力します。

    CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
       create_time string,
       category STRING,
       brand STRING,
       buyer_id STRING,
       trans_num BIGINT,
       trans_amount DOUBLE,
       click_cnt BIGINT
       )
       PARTITIONED BY (pt string) ;

    テーブルを作成するときは、Hive データ型と MaxCompute データ型の間のマッピングに注意します。

    DataWorks のデータ同期機能は timestamp データではサポートされません。 したがって、DataWorks のテーブル作成文では、create_time は文字列値が設定されます。 テーブルを作成するために、odpscmd CLI (コマンドライン) ツールを使用します。 ツールのインストール方法と設定方法の詳細については、「クライアントのインストールと設定」をご参照ください。 テーブル作成処理は次のとおりです。

    Hive と MaxCompute のデータ型の互換性を考慮して、 [odpscmd] クライアント上で次のコマンドを実行することを推奨します。
    set odps.sql.type.system.odps2=true;set
    odps.sql.hive.compatible=true;

    テーブルが作成された後、次の図に示すように、DataWorks コンソールの [Data Development] > [Table Query] を選択して MaxCompute で作成されたデータを表示します。

データの同期

  1. カスタムリソースグループの作成

    たいていの場合、MaxCompute のプロジェクトデータノードと Hadoop クラスターのデータノードの間のネットワークは接続されていません。 Hadoop クラスターのマスターノードで DataWorks の同期タスクを実行するために、リソースグループをカスタマイズします。 (通常、Hadoop クラスターのマスターノードとデータノードの間のネットワークは接続されています。)

    1. Hadoop クラスターのデータノードを 表示します。

      E-MapReduce コンソールのホームページで、[クラスター管理] > [クラスター] > [ホスト] を選択します。 Hadoop クラスターのデータノードを表示します。 次の図に示すように、E-MapReduce Hadoop クラスター (非 HA モード) のマスターノードのホスト名は emr-header-1 で、データノードのホスト名は emr-worker-X です。

      次の図に示すように、マスターノードの ECS ID をクリックし、表示された ECS 詳細ページで [接続] をクリックし、hadoop dfsadmin –report コマンドを実行してデータノードを表示します。

      前の図に示すように、データノードはイントラネットアドレスのみを持ち、DataWorks のデフォルトリソースグループと通信することはできません。 したがって、リソースグループをカスタマイズし、マスターノードを DataWorks の同期タスクを実行するノードに設定する必要があります。

    2. カスタムリソースグループの作成

      次の図に示すように、DataWorks コンソールで、Data Integration ページに移動し、[Resource Group] を選択し、[New Resource Groups] をクリックします。 リソースグループをカスタマイズする方法の詳細については、「スケジューリングリソースの追加」をご参照ください。

      サーバーを追加するときは、ECS UUID とマシン IP アドレスなどの情報を入力する必要があります。 (クラシックネットワークの場合、サーバー名を入力します。 VPC ネットワークの場合、サーバーの UUID を入力します。 DataWorks V2.0 の中国 (上海) のクラシックネットワークにのみスケジューリングリソースを追加します。 他のリージョンでは、ネットワークタイプがクラシックか VPC かにかかわらず、スケジューリングリソースグループを追加するときに、VPC ネットワークタイプを選択します。) イントラネット IP アドレスにアクセスできない可能性があるため、マシン IP アドレスをマスターノードのインターネット IP アドレスに設定します。 次の図に示すように、ECS UUID はマスターノード管理端末に接続する必要があり、dmidecode | grep UUID コマンドを実行して UUID を取得します。 (Hadoop クラスターが E-MapReduce 上に構築されていない場合は、同じ方法を使用します。)

      サーバーを追加した後、マスターノードと DataWorks の間のネットワークが接続されていることを確認します。 ECS サーバーを使用している場合は、サーバーセキュリティグループを設定する必要があります。 通信にイントラネット IP アドレスを使用している場合は、サーバーセキュリティグループを設定します。 詳細については、「セキュリティグループの追加」をご参照ください。

      インターネット IP アドレスを使用している場合は、[セキュリティグループルール] に従って、インターネットの入口と出口を直接設定します。(実際のアプリケーションシナリオでは、データセキュリティのために 詳細なバイパスルールを設定することを推奨します。)

      前の手順が完了したら、表示に従ってリソースグループエージェントをインストールします。 ステータスが [使用可能] の場合は、カスタムリソースグループが正常に追加されています。

      ステータスが [使用不可] の場合は、次の図に示すように、マスターノードにログインし、tail –f/home/admin/alisatasknode/logs/heartbeat.log コマンドを実行して、DataWorks とマスターノードの間のハートビートメッセージがタイムアウトしているかどうかを確認します。

  2. データソースの作成

    DataWorks でデータソースを作成する方法の詳細については、「データソースの設定」をご参照ください。

    DataWorks でプロジェクトを作成した後、データソースはデフォルトで odps_first に設定されます。 したがって、Hadoop クラスターデータソースを追加するだけです。 これを行うには、次の手順を実行します。DataWorks の Data Integration ページで、[Data Source] > [New Source] を選択し、[HDFS] を選択します。

    表示されたウインドウで、データソース名と defaultFS を入力します。 E-MapReduce Hadoop クラスターが HA クラスターの場合、アドレスは hdfs://emr-header-1 の IP:8020 です。 E-MapReduce Hadoop クラスターが非 HA クラスターの場合、アドレスは hdfs://emr-header-1 の IP:9000 です。 このトピックでは、 emr-header-1 はインターネットを介して DataWorks へ接続します。 したがって、インターネット IP アドレスを入力してセキュリティグループを開きます。

    設定が完了した後、[Test Connectivity] をクリックします。 Test connectivity successfully が表示された場合は、データソースは正常に追加されています。

    E-MapReduce Hadoop クラスターのネットワークタイプが VPC の場合、接続テストはサポートされていません。
  3. 同期タスクの設定

    DataWorks の Data Integration ページで、 [Sync Tasks] をクリックし、スクリプトモードを作成します。 表示されたウインドウで、次の図に示すように、データソースを選択します。

    テンプレートがインポートされた後、同期タスクはスクリプトモードに変換されます。 詳細については、「スクリプトモード」をご参照ください。

    データ同期タスクスクリプトを設定するとき、DataWorks 同期タスクと Hive テーブルのデータ型は次のとおりです。

    Hive テーブルのデータ型 DataX と DataWorks のデータ型
    TINYINT,SMALLINT,INT,BIGINT Long
    FLOAT,DOUBLE,DECIMAL Double
    String,CHAR,VARCHAR String
    BOOLEAN Boolean
    Date,TIMESTAMP Date
    Binary Binary

    コードの詳細は次のとおりです。

    {
      "configuration": {
        "reader": {
          "plugin": "hdfs",
          "parameter": {
            "path": "/user/hive/warehouse/hive_doc_good_sale/", 
            "datasource": "HDFS1",
            "column": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "long"
              },
              {
                "index": 5,
                "type": "double"
              },
              {
                "index": 6,
                "type": "long"
              }
            ],
            "defaultFS": "hdfs://121.199.11.138:9000",
            "fieldDelimiter": ",",
            "encoding": "UTF-8",
            "fileType": "text"
          }
        },
        "writer": {
          "plugin": "odps",
          "parameter": {
            "partition": "pt=1",
            "truncate": false,
            "datasource": "odps_first",
            "column": [
              "create_time",
              "category",
              "brand",
              "buyer_id",
              "trans_num",
              "trans_amount",
              "click_cnt"
            ],
            "table": "hive_doc_good_sale"
          }
        },
        "setting": {
          "errorLimit": {
            "record": "1000"
          },
          "speed": {
            "throttle": false,
            "concurrent": 1,
            "mbps": "1",
            "dmu": 1
          }
        }
      },
      "type": "job",
      "version": "1.0"
    }

    path パラメータは、Hadoop クラスターでデータが格納されている場所を示します。 マスターノードにログインして、hdfs dfs –ls /user/hive/warehouse/hive_doc_good_sale コマンドを実行し、場所を確認します。 パーティションテーブルの場合、パーティションを指定する必要はありません。 次の図に示すように、DataWorks のデータ同期機能では、 パーティションパスは自動的に再帰的に処理されます。

    設定が完了した後、[Run] をクリックします。 タスクが正常に実行されたことを示すメッセージが表示された場合は、同期タスクは完了です。 タスクの実行に失敗したことを示すメッセージが表示された場合は、トラブルシューティングを進めるためにログをコピーします。

結果の検証

DataWorks コンソールでは、[Data Development] > [Table Query] を選択し、hive_doc_good_sale テーブルを選択します。 Hive データが MaxCompute へ同期されたかどうかを確認します。 テーブルクエリタスクを作成することもでき、select * FROM hive_doc_good_sale where pt =1; スクリプトをタスクに入力して [Run] をクリックして結果を問い合わせます。

select * FROM hive_doc_good_sale where pt =1; を [odpscmd] CLI ツールに入力しても、テーブルの結果を問い合わせられます。

MaxCompute から Hadoop へのデータの移行

MaxCompute から Hadoop へデータを移行するには、前述の手順を実行しますが、同期スクリプト内の reader とwriter オブジェクトを入れ替える必要があります。 例は次のとおりです。

{
  "configuration": {
    "reader": {
      "plugin": "odps",
      "parameter": {
      "partition": "pt=1",
      "isCompress": false,
      "datasource": "odps_first",
      "column": [
        "create_time",
        "category",
        "brand",
      "buyer_id",
      "trans_num",
      "trans_amount",
      "click_cnt"
    ],
    "table": "hive_doc_good_sale"
    }
  },
  "writer": {
    "plugin": "hdfs",
    "parameter": {
    "path": "/user/hive/warehouse/hive_doc_good_sale",
    "fileName": "pt=1",
    "datasource": "HDFS_data_source",
    "column": [
      {
        "name": "create_time",
        "type": "string"
      },
      {
        "name": "category",
        "type": "string"
      },
      {
        "name": "brand",
        "type": "string"
      },
      {
        "name": "buyer_id",
        "type": "string"
      },
      {
        "name": "trans_num",
        "type": "BIGINT"
      },
      {
        "name": "trans_amount",
        "type": "DOUBLE"
      },
      {
        "name": "click_cnt",
        "type": "BIGINT"
      }
    ],
    "defaultFS": "hdfs://47.99.162.100:9000",
    "writeMode": "append",
    "fieldDelimiter": ",",
    "encoding": "UTF-8",
    "fileType": "text"
    }
  },
  "setting": {
    "errorLimit": {
      "record": "1000"
  },
  "speed": {
    "throttle": false,
    "concurrent": 1,
    "mbps": "1",
    "dmu": 1
  }
  }
},
"type": "job",
"version": "1.0"
}

前述の同期タスクを実行する前に、Hadoop クラスターを設定する必要があります。 詳細については、「HDFS Writer の設定」をご参照ください。 同期タスクを実行した後、同期されたファイルを手動でコピーする必要があります。