This topic describes how to use the data synchronization feature of DataWorks to migrate data from Hadoop Distributed File System (HDFS) to MaxCompute. Data synchronization between MaxCompute and Hadoop or Spark is supported.

Prerequisites

  1. A Hadoop cluster is created.

    Before data migration, make sure that your Hadoop cluster works properly. You can use Alibaba Cloud E-MapReduce (EMR) to automatically create a Hadoop cluster. For more information, see Create a cluster.

    The version information of EMR Hadoop is as follows:
    • EMR version: EMR-3.11.0
    • Cluster type: Hadoop
    • Software: HDFS 2.7.2, YARN 2.7.2, Hive 2.3.3, Ganglia 3.7.2, Spark 2.2.1, Hue 4.1.0, Zeppelin 0.7.3, Tez 0.9.1, Sqoop 1.4.6, Pig 0.14.0, ApacheDS 2.0.0, and Knox 0.13.0
    The Hadoop cluster is deployed in a classic network in the China (Hangzhou) region. A public IP address and a private IP address are configured for the Elastic Compute Service (ECS) instance in the primary instance group. High Availability is set to No for the cluster.
  2. MaxCompute is activated and a project is created.

    In this topic, project bigdata_DOC in the China (Hangzhou) region is created and DataWorks services are started. For more information, see Activate MaxCompute.

Procedure

  1. Prepare data.
    1. Create test data on the Hadoop cluster.
      1. Log on to the E-MapReduce console and click the Data Platform tab. On the page that appears, find the created Hadoop cluster and click Edit Job in the Actions column. Then, create a job named doc. The following example shows the CREATE TABLE statement in Hive. For more information about how to create jobs in the E-MapReduce console, see Edit jobs.
        CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
           create_time timestamp,
           category STRING,
           brand STRING,
           buyer_id STRING,
           trans_num BIGINT,
           trans_amount DOUBLE,
           click_cnt BIGINT
        )
        PARTITIONED BY (pt string)
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n';
      2. Click Run. If "Query executed successfully" is displayed, the hive_doc_good_sale table is created on the EMR Hadoop cluster.
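        Optionally, you can describe the new table in the same job to confirm its layout. This is a minimal check; the output includes the field delimiter and the table's HDFS location, which is the directory that the HDFS Reader path parameter points to later in this topic.
        -- Optional check: the Location field in the output shows the HDFS directory of the table.
        DESCRIBE FORMATTED hive_doc_good_sale;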
      3. Insert test data into the table. You can import data from OSS or other data sources, or insert data manually. In this example, insert data as follows:
        insert into hive_doc_good_sale PARTITION(pt=1) values
          ('2018-08-21','Coat','Brand A','lilei',3,500.6,7),
          ('2018-08-22','Fresh food','Brand B','lilei',1,303,8),
          ('2018-08-22','Coat','Brand C','hanmeimei',2,510,2),
          ('2018-08-22','Bathroom product','Brand A','hanmeimei',1,442.5,1),
          ('2018-08-22','Fresh food','Brand D','hanmeimei',2,234,3),
          ('2018-08-23','Coat','Brand B','jimmy',9,2000,7),
          ('2018-08-23','Fresh food','Brand A','jimmy',5,45.1,5),
          ('2018-08-23','Coat','Brand E','jimmy',5,100.2,4),
          ('2018-08-24','Fresh food','Brand G','peiqi',10,5560,7),
          ('2018-08-24','Bathroom product','Brand F','peiqi',1,445.6,2),
          ('2018-08-24','Coat','Brand A','ray',3,777,3),
          ('2018-08-24','Bathroom product','Brand G','ray',3,122,3),
          ('2018-08-24','Coat','Brand C','ray',1,62,7);
      4. After you insert the data, execute the select * from hive_doc_good_sale where pt=1; statement to check whether the table on the Hadoop cluster contains the data to be migrated.
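        It also helps to record the source row count now so that you can compare it with the MaxCompute table after the migration. A minimal check; the insert statement above writes 13 rows into partition pt=1:
        -- Optional: note this number for the post-migration comparison in the Result section.
        select count(*) from hive_doc_good_sale where pt=1;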
    2. Create a MaxCompute table in the DataWorks console.
      1. Log on to the DataWorks console. In the left-side navigation pane, click Workspaces, find the target project, and click Data Analytics in the Actions column.
      2. On the DataStudio page that appears, move your pointer over the Create icon in the left-side navigation pane and choose MaxCompute > Table.
      3. In the Create Table dialog box that appears, set Table Name and click Commit.
      4. On the tab of the created table, click DDL Statement.
      5. In the DDL Statement dialog box that appears, enter the CREATE TABLE statement and click Generate Table Schema. In the Confirm message that appears, click OK. In this example, the CREATE TABLE statement is as follows:
        CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
           create_time string,
           category STRING,
           brand STRING,
           buyer_id STRING,
           trans_num BIGINT,
           trans_amount DOUBLE,
           click_cnt BIGINT
        )
        PARTITIONED BY (pt string);

        When you create a table, you must consider the mapping between Hive data types and MaxCompute data types. For more information about the data mapping, see Mapping of data types.

        You can also use the command-line tool odpscmd to create a MaxCompute table. For more information about how to install and configure the odpscmd tool, see Install and configure the odpscmd client.

        Note To resolve compatibility issues between Hive data types and MaxCompute data types, we recommend that you run the following commands by using the odpscmd tool:
        set odps.sql.type.system.odps2=true;
        set odps.sql.hive.compatible=true;
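        For example, if you create the table in odpscmd instead of the DataWorks UI, the session could look like the following sketch. It reuses the DDL above, the set statements take effect for the current session, and the client is assumed to be configured for the bigdata_DOC project.
        -- Run in the odpscmd client.
        set odps.sql.type.system.odps2=true;
        set odps.sql.hive.compatible=true;
        CREATE TABLE IF NOT EXISTS hive_doc_good_sale(
           create_time string,
           category STRING,
           brand STRING,
           buyer_id STRING,
           trans_num BIGINT,
           trans_amount DOUBLE,
           click_cnt BIGINT
        )
        PARTITIONED BY (pt string);
        -- Confirm the table structure.
        desc hive_doc_good_sale;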
      6. Click Submit to Production Environment to create a table.
      7. After the table is created, click Workspace Tables on the toolbar in the left-side navigation pane to view the MaxCompute table.
  2. Synchronize data.
    1. Create a custom resource group.

      In most cases, the default resource group of DataWorks cannot directly access the data nodes of a Hadoop cluster, but the master node and the data nodes of the cluster are usually connected. Therefore, you can create a custom resource group and run the DataWorks synchronization task on the master node of the Hadoop cluster.

      1. View a data node of the Hadoop cluster.
        Log on to the E-MapReduce console and click the Cluster Management tab. On the Cluster Management page that appears, click the Hadoop cluster whose data node you want to view in the Cluster ID/Name column. On the cluster page that appears, click Instances in the left-side navigation pane.
        You can also click the ECS ID of the master node to go to the ECS instance details page. Then, click Connect and run the hadoop dfsadmin -report command to view information about the data nodes.

        In this example, the data nodes have only private IP addresses and cannot communicate with the default resource group of DataWorks. Therefore, you must create a custom resource group and configure the master node as the node that runs the data synchronization task of DataWorks.

      2. Create a task resource group.
        1. On the Data Integration page, click Custom Resource Groups and click Add Resource Group in the upper-right corner of the page.
          Note Currently, this navigation path is available only in the Professional Edition and higher editions.
        2. When you add a server, you must enter information such as the ECS UUID and server IP address. For a classic network, enter the server name. For a VPC, enter the server UUID. Currently, you can add scheduling resources for classic networks only in the China (Shanghai) region in DataWorks V2.0. In other regions, you must select a VPC when you add a scheduling resource group regardless of whether you use a classic network or a VPC.
          For the server IP address, enter the public IP address of the master node because the private IP address may be unreachable from DataWorks. To obtain the ECS UUID, log on to the master node and run the dmidecode | grep UUID command. This command also works if your Hadoop cluster is not deployed in EMR.
        3. After you add a server, make sure that the master node and DataWorks are connected. If you are using an ECS instance, you must configure a security group.
          • If you are using a private IP address for communication, see Configure a security group.
          • If you are using a public IP address, you can configure the Internet inbound and outbound rules of the security group. In this topic, all ports are opened to Internet traffic for simplicity. In production scenarios, we recommend that you configure fine-grained security group rules.
        4. After you complete the preceding steps, install the agent for custom resource groups as prompted. If the status of the ECS instance is Available, the custom resource group is added.

          If the status of the ECS instance is Unavailable, you can log on to the master node and run the tail -f /home/admin/alisatasknode/logs/heartbeat.log command to check whether the heartbeat packet between DataWorks and the master node has timed out.

    2. Create a data source.

      After you create a workspace in DataWorks, a MaxCompute connection named odps_first is created by default. Therefore, you only need to add an HDFS connection for the Hadoop cluster. For more information, see Configure an HDFS connection.

      1. On the data integration page of DataWorks, click Data Source in the left-side navigation pane.
      2. On the Data Source page that appears, click Add Connection in the upper-right corner.
      3. In the Add Connection dialog box that appears, select HDFS.
      4. In the Add HDFS Connection dialog box, set parameters for the HDFS data source.
        Set the following parameters:
        • Connection Name: The name of the connection. The name can contain letters, digits, and underscores (_) and must start with a letter.
        • Description: The description of the connection. The description can be up to 80 characters in length.
        • Applicable: The environment in which the connection is used. Valid values: Development and Production.
          Note This parameter is available only when the workspace is in standard mode.
        • DefaultFS: If the EMR Hadoop cluster is in HA mode, the address is hdfs://IP address of emr-header-1:8020. If the cluster is in non-HA mode, the address is hdfs://IP address of emr-header-1:9000.

        In this example, emr-header-1 is connected to DataWorks over the Internet. Therefore, enter the public IP address and allow traffic from the Internet.

      5. Click Test Connection.
      6. After the connectivity test is passed, click Complete.
        Note If the network type of the EMR Hadoop cluster is VPC, the connectivity test is not supported.
    3. Configure a data synchronization task.
      1. In the left-side navigation pane of the DataStudio page, move your pointer over the Create icon and choose Data Integration > Batch Synchronization.
      2. In the Create Node dialog box that appears, set Node Name and click Commit.
      3. After you create a data synchronization node, click the Switch to Code Editor icon in the toolbar at the top.
      4. In the Confirm message that appears, click OK to switch to the code editor.
      5. Click the Apply Template icon in the toolbar at the top.
      6. In the Apply Template dialog box that appears, specify Source Connection Type, Connection, Target Connection Type, and Connection, and click OK.
      7. After the template is applied, the basic configuration of the HDFS reader is complete. You can modify the data store or table for the HDFS reader. The code of this example is as follows. For more information, see Configure HDFS Reader.
        {
          "configuration": {
            "reader": {
              "plugin": "hdfs",
              "parameter": {
                "path": "/user/hive/warehouse/hive_doc_good_sale/", 
                "datasource": "HDFS1",
                "column": [
                  {
                    "index": 0,
                    "type": "string"
                  },
                  {
                    "index": 1,
                    "type": "string"
                  },
                  {
                    "index": 2,
                    "type": "string"
                  },
                  {
                    "index": 3,
                    "type": "string"
                  },
                  {
                    "index": 4,
                    "type": "long"
                  },
                  {
                    "index": 5,
                    "type": "double"
                  },
                  {
                    "index": 6,
                    "type": "long"
                  }
                ],
                "defaultFS": "hdfs://121.199.11.138:9000",
                "fieldDelimiter": ",",
                "encoding": "UTF-8",
                "fileType": "text"
              }
            },
            "writer": {
              "plugin": "odps",
              "parameter": {
                "partition": "pt=1",
                "truncate": false,
                "datasource": "odps_first",
                "column": [
                  "create_time",
                  "category",
                  "brand",
                  "buyer_id",
                  "trans_num",
                  "trans_amount",
                  "click_cnt"
                ],
                "table": "hive_doc_good_sale"
              }
            },
            "setting": {
              "errorLimit": {
                "record": "1000"
              },
              "speed": {
                "throttle": false,
                "concurrent": 1,
                "mbps": "1",
              }
            }
          },
          "type": "job",
          "version": "1.0"
        }
        In the code, the path parameter specifies the directory where the data is stored in the Hadoop cluster. You can log on to the master node and run the hdfs dfs -ls /user/hive/warehouse/hive_doc_good_sale command to check the directory. For a partitioned table, the data synchronization feature of DataWorks automatically recurses into the partition subdirectories, such as pt=1, so you do not need to specify the partition in the path.
      8. After the configuration is complete, click Run. If a message indicating successful task execution is displayed, the data synchronization task is complete. If a message indicating a task execution failure is displayed, you can rectify the fault based on error logs.

Result

  1. In the left-side navigation pane, click Ad-Hoc Query.
  2. Move the pointer over the Create icon and choose Create > ODPS SQL.
  3. In the Create Node dialog box that appears, set parameters and click Commit to create an ad hoc query node. On the ad hoc query node tab, run the following SQL statement to view the data that has been synchronized to hive_doc_good_sale:
    --Check whether the data is written to MaxCompute.
    select * from hive_doc_good_sale where pt=1;

    You can also run the select * from hive_doc_good_sale where pt=1; statement in the odpscmd command-line tool to query the table.
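    To confirm that no records were skipped (the synchronization script sets an errorLimit, so a task can succeed even if some dirty records are discarded), you can also compare row counts. A minimal sketch; the count should match the 13 test rows inserted on the Hive side:
    -- Run in an ad hoc query node or in odpscmd.
    select count(*) from hive_doc_good_sale where pt=1;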

You can perform similar steps to migrate data from MaxCompute to Hadoop. However, you must swap the reader and writer configurations in the synchronization script. The following synchronization script is an example:

{
  "configuration": {
    "reader": {
      "plugin": "odps",
      "parameter": {
        "partition": "pt=1",
        "isCompress": false,
        "datasource": "odps_first",
        "column": [
          "create_time",
          "category",
          "brand",
          "buyer_id",
          "trans_num",
          "trans_amount",
          "click_cnt"
        ],
        "table": "hive_doc_good_sale"
      }
    },
    "writer": {
      "plugin": "hdfs",
      "parameter": {
        "path": "/user/hive/warehouse/hive_doc_good_sale",
        "fileName": "pt=1",
        "datasource": "HDFS_data_source",
        "column": [
          {
            "name": "create_time",
            "type": "string"
          },
          {
            "name": "category",
            "type": "string"
          },
          {
            "name": "brand",
            "type": "string"
          },
          {
            "name": "buyer_id",
            "type": "string"
          },
          {
            "name": "trans_num",
            "type": "BIGINT"
          },
          {
            "name": "trans_amount",
            "type": "DOUBLE"
          },
          {
            "name": "click_cnt",
            "type": "BIGINT"
          }
        ],
        "defaultFS": "hdfs://47.99.162.100:9000",
        "writeMode": "append",
        "fieldDelimiter": ",",
        "encoding": "UTF-8",
        "fileType": "text"
      }
    },
    "setting": {
      "errorLimit": {
        "record": "1000"
      },
      "speed": {
        "throttle": false,
        "concurrent": 1,
        "mbps": "1"
      }
    }
  },
  "type": "job",
  "version": "1.0"
}

Before you run the preceding synchronization task, you must configure the Hadoop cluster. For more information, see Configure HDFS Writer. After you run the synchronization task, you can manually copy the files that have been synchronized.
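HDFS Writer writes only data files; it does not update the Hive metastore. If the copied files end up in a partition directory that the Hive table does not yet know about, register the partition before you query it on the Hive side. The following is a minimal sketch, assuming the files were placed under the table's warehouse directory (for example, in a pt=1 subdirectory):

-- Run on the Hive side of the EMR cluster.
-- Let Hive discover partition directories that exist on HDFS but are not yet registered.
MSCK REPAIR TABLE hive_doc_good_sale;
-- Verify the migrated data.
select * from hive_doc_good_sale where pt=1;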