In a data lakehouse architecture, MaxCompute usually serves as the hub that reads data from and writes data to Hadoop clusters. If your clusters are deployed in a data center and you do not want to expose cluster information to the Internet, developers can instead access data on the cloud from the on-premises Hadoop clusters. This topic describes how to use an Alibaba Cloud E-MapReduce (EMR) cluster to simulate an on-premises Hadoop cluster that reads data from and writes data to MaxCompute.
Background information
The following figure shows the data center architecture that is used in this practice.
Prepare a development environment
Prepare an EMR cluster.
Purchase an EMR cluster.
For more information, see Getting started with EMR.
Log on to the EMR cluster.
Note: For more information about how to log on to an EMR cluster, see Log on to a cluster.
In this practice, you need to log on to an Elastic Compute Service (ECS) instance in the EMR cluster. For more information about how to connect to an ECS instance, see Connect to an ECS instance.
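For example, if the master node of the EMR cluster has a public IP address and password logon is enabled, you can connect to it with a standard SSH client. The IP address in the following command is a placeholder:
ssh root@<public IP address of the master node>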
Prepare an IntelliJ IDEA project in Scala.
Install IntelliJ IDEA.
IntelliJ IDEA is used in this practice. For more information about how to install IntelliJ IDEA, see Install IntelliJ IDEA.
Install Maven.
For more information, see Installing Apache Maven.
Create a Scala project.
Download the Scala plug-in.
Start IntelliJ IDEA and choose File > Settings. In the Settings dialog box, click Plugins in the left-side navigation pane and click Install in the Scala plug-in card.
Install Scala.
For more information, see Install Scala on your computer.
Create a Scala project.
In IntelliJ IDEA, create a project and select Scala > IDEA as the project type.
Prepare MaxCompute data.
Create a MaxCompute project.
For more information about how to create a MaxCompute project, see Create a MaxCompute project.
Obtain an AccessKey pair.
You can obtain the AccessKey ID and AccessKey secret from the AccessKey Pair page.
Obtain the endpoint of the MaxCompute project.
The endpoint of the MaxCompute project varies based on the region and the network connection method that you selected when you created the project. For more information about the endpoints that correspond to different regions and network connection methods, see Endpoints.
Create tables.
In this practice, you need to prepare a partitioned table and a non-partitioned table for testing. For more information about how to create a table, see Create a table.
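For example, the following MaxCompute SQL statements create one non-partitioned table and one partitioned table. The table names, columns, and partition key are only placeholders for this practice; adjust them to your own schema.
-- Non-partitioned test table (placeholder name and columns).
CREATE TABLE IF NOT EXISTS mc_test_table (id BIGINT, name STRING);
-- Partitioned test table (placeholder name, columns, and partition key).
CREATE TABLE IF NOT EXISTS mc_test_table_pt (id BIGINT, name STRING) PARTITIONED BY (pt STRING);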
Read data from and write data to MaxCompute
Write code.
The following sample code is used to read data from a non-partitioned table.
Note: For the sample code that is used to read data from a partitioned table, write data to a non-partitioned table, and write data to a partitioned table, see PartitionDataReaderTest.scala, DataWriterTest.scala, and PartitionDataWriterTest.scala. You can write code based on your business requirements.
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.spark.sql.SparkSession

/**
  * @author renxiang
  * @date 2021-12-20
  */
object DataReaderTest {
  val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource"
  val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"

  def main(args: Array[String]): Unit = {
    val odpsProject = args(0)
    val odpsAkId = args(1)
    val odpsAkKey = args(2)
    val odpsTable = args(3)

    val spark = SparkSession
      .builder()
      .appName("odps-datasource-reader")
      .getOrCreate()

    import spark._

    val df = spark.read.format(ODPS_DATA_SOURCE)
      .option("spark.hadoop.odps.project.name", odpsProject)
      .option("spark.hadoop.odps.access.id", odpsAkId)
      .option("spark.hadoop.odps.access.key", odpsAkKey)
      .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT)
      .option("spark.hadoop.odps.table.name", odpsTable)
      .load()

    df.createOrReplaceTempView("odps_table")

    println("select * from odps_table")
    val dfFullScan = sql("select * from odps_table")

    println(dfFullScan.count)
    dfFullScan.show(10)

    Thread.sleep(72*3600*1000)
  }
}
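The sample above covers only the read path for a non-partitioned table. As a rough orientation before you open DataWriterTest.scala, the following is a minimal sketch of what a write could look like with the standard Spark DataFrameWriter API, assuming the data source accepts the same spark.hadoop.odps.* options for writes and that the target table already exists. The DataWriterTest.scala file in the sample project remains the authoritative reference.
import org.apache.spark.sql.{SaveMode, SparkSession}

object DataWriterSketch {
  // Same data source class and endpoint as in DataReaderTest.
  val ODPS_DATA_SOURCE = "org.apache.spark.sql.odps.datasource.DefaultSource"
  val ODPS_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"

  def main(args: Array[String]): Unit = {
    val Array(odpsProject, odpsAkId, odpsAkKey, odpsTable) = args

    val spark = SparkSession.builder().appName("odps-datasource-writer").getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows; the column names and types must match the
    // schema of the target MaxCompute table.
    val df = Seq((1L, "a"), (2L, "b")).toDF("id", "name")

    // Write through the same options that the read sample uses (assumption:
    // the connector supports appending through DataFrameWriter.save()).
    df.write.format(ODPS_DATA_SOURCE)
      .option("spark.hadoop.odps.project.name", odpsProject)
      .option("spark.hadoop.odps.access.id", odpsAkId)
      .option("spark.hadoop.odps.access.key", odpsAkKey)
      .option("spark.hadoop.odps.end.point", ODPS_ENDPOINT)
      .option("spark.hadoop.odps.table.name", odpsTable)
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}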
Package and upload the code.
Package the code by using Maven.
In the right-side pane of the code development page in IntelliJ IDEA, click Maven.
In the Maven dialog box, double-click package in the Lifecycle directory.
Compile a JAR file on your on-premises machine.
Go to the project directory.
Run the following command in the command line window of the operating system that you use, such as Windows Command Prompt:
cd ${project.dir}/spark-datasource-v3.1
Run the following mvn command to compile the source code and package it into JAR files:
mvn clean package jar:test-jar
Check whether the dependencies.jar and tests.jar files are stored in the target directory.
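For example, assuming the default artifact name and version from the sample project, you can run the following commands to confirm that both JAR files exist. The file names match the ones used in the spark-submit commands later in this topic:
cd ${project.dir}/spark-datasource-v3.1
ls -l target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar target/spark-datasource-1.0-SNAPSHOT-tests.jar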
Upload the JAR files to the server.
Run the scp command to upload the two JAR files to the server. Command syntax:
scp <Directory of the JAR files on the on-premises machine> root@<Public IP address of the ECS instance>:<Directory of the JAR files on the server>
Sample command:
scp D:\Project\emr_mc_1\spark-datasource-v3.1\target\spark-datasource-1.0-SNAPSHOT-tests.jar root@8.xx.xx.xx:/root/emr_mc
View the JAR files.
Run the ll command in the emr_mc directory on the server to view the JAR files.
Run the following command to upload the JAR files to each ECS instance:
scp -r <Directory of the JAR files on the source server> root@<Private IP address of the ECS instance>:<Directory of the JAR files on the destination server>
Run the code.
Execution mode
Local mode
Command syntax for running code in local mode
./bin/spark-submit \
    --master local \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class DataReaderTest \
    ${jar-path} \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name}
Parameters
master: The execution mode. Valid values:
local: If you run code in this mode, only the computing resources on the current ECS instance are used.
yarn: If you run code in this mode, computing resources on all ECS instances in the EMR cluster are used. Code runs more efficiently in yarn mode than in local mode.
jars: The paths of the JAR files that contain the required dependencies.
class: The name of the main class in the JAR file that you want to execute.
jar-path: The path of the JAR file that you want to execute.
maxcompute-project-name: The name of the MaxCompute project.
aliyun-access-key-id: The AccessKey ID of your Alibaba Cloud account or of a RAM user within the account. You can obtain the AccessKey ID from the AccessKey Pair page.
aliyun-access-key-secret: The AccessKey secret that corresponds to the AccessKey ID. You can obtain the AccessKey secret from the AccessKey Pair page.
maxcompute-table-name: The name of the MaxCompute table that you want to read from or write to.
Yarn mode
Command syntax for running code in Yarn mode
In yarn mode, set ODPS_ENDPOINT in the code to the internal endpoint of the region in which the EMR cluster resides, for example:
val ODPS_ENDPOINT = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api"
./bin/spark-submit \
    --master yarn \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class DataReaderTest \
    ${jar-path} \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name}
Parameters
The parameters are the same as those described for local mode.
Example 1: Read data from a non-partitioned table in MaxCompute.
Command syntax
# Go to the Spark directory.
cd /usr/lib/spark-current
# Submit a task.
./bin/spark-submit \
    --master local \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class DataReaderTest \
    ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name}
The following sample code shows how to perform the operation.
The following figure shows the execution result.
Example 2: Read data from a partitioned table in MaxCompute.
Command syntax
# Go to the Spark directory.
cd /usr/lib/spark-current
# Submit a task.
./bin/spark-submit \
    --master local \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class PartitionDataReaderTest \
    ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name} \
    ${partition-description}
The following sample code shows how to perform the operation.
The following figure shows the execution result.
Example 3: Write data to a non-partitioned table in MaxCompute.
Command syntax
./bin/spark-submit \
    --master local \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class DataWriterTest \
    ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name}
The following sample code shows how to perform the operation.
The following figure shows the execution result.
Example 4: Write data to a partitioned table in MaxCompute.
Command syntax
./bin/spark-submit \
    --master local \
    --jars ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-jar-with-dependencies.jar,${project.dir}/spark-datasource-v2.3/libs/cupid-table-api-1.1.5-SNAPSHOT.jar,${project.dir}/spark-datasource-v2.3/libs/table-api-tunnel-impl-1.1.5-SNAPSHOT.jar \
    --class PartitionDataWriterTest \
    ${project.dir}/spark-datasource-v3.1/target/spark-datasource-1.0-SNAPSHOT-tests.jar \
    ${maxcompute-project-name} \
    ${aliyun-access-key-id} \
    ${aliyun-access-key-secret} \
    ${maxcompute-table-name} \
    ${partition-description}
The following sample code shows how to perform the operation.
The following figure shows the execution result.
Perform a performance test
In this practice, both the EMR cluster and MaxCompute are deployed on the cloud and communicate over the cloud network. If a data center is connected to MaxCompute instead, the read and write performance depends on the available Tunnel resources or on the bandwidth of the physical connection.
Instance specifications
EMR cluster:
Number of master nodes: 2.
ECS instance specifications: ecs.c6.2xlarge, with 8 vCPUs, 16 GiB of memory, and 2.5 Gbit/s of bandwidth.
System disk: one enhanced SSD (ESSD) with a capacity of 120 GiB.
Data disk: one ESSD with a capacity of 80 GiB.
Number of core nodes: 2.
ECS instance specifications: ecs.c6.2xlarge, with 8 vCPUs, 16 GiB of memory, and 2.5 Gbit/s of bandwidth.
System disk: one ESSD with a capacity of 120 GiB.
Data disks: four ESSDs, each with a capacity of 80 GiB.
MaxCompute:
Pay-as-you-go standard edition.
Perform a test on reading data from a large table.
The following information describes the large table that is used in the test.
Table name: dwd_product_movie_basic_info
Note: This table is a table in the public datasets of the MAXCOMPUTE_PUBLIC_DATA project. For more information about public datasets, see Public datasets.
Table size: 4829258484 bytes
Number of partitions: 593
Name of the partition from which data is read: 20170422
The following figure shows the test result.
The operation takes 0.850871 seconds.
Perform a test on writing data to a large table.
Write tens of thousands of data records to a partition.
The operation takes 2.533892 seconds.
Write hundreds of thousands of data records to a partition.
The operation takes 8.441193 seconds.
Write millions of data records to a partition.
The operation takes 73.28 seconds.