This topic describes how to use MaxCompute Spark to access Phoenix data and write data from ApsaraDB for HBase to MaxCompute.
Background information
Phoenix is the SQL layer provided by ApsaraDB for HBase and is suitable for scenarios that require high concurrency, low latency, and simple queries. To access Phoenix data by using MaxCompute Spark, you must create a Phoenix table, write data to the table, write Spark code on IntelliJ IDEA, and then perform smoke testing on the code in the DataWorks console. This topic describes the entire process of using MaxCompute Spark to access Phoenix data.
Prerequisites
Make sure that the following prerequisites are met:
MaxCompute is activated and a MaxCompute project is created. For more information, see Activate MaxCompute and DataWorks and Create a MaxCompute project.
DataWorks is activated. For more information, see Purchase guide.
ApsaraDB for HBase is activated. For more information, see Purchase a cluster.
Note: In this topic, ApsaraDB for HBase V1.1 is used. You can use another version of ApsaraDB for HBase based on your business requirements.
The Phoenix 4.12.0 package is downloaded and installed. For more information, see Use HBase SQL (Phoenix) 4.x.
Note: ApsaraDB for HBase V1.1 corresponds to Phoenix 4.12.0. Check the version mapping during development.
Virtual Private Cloud (VPC) is activated. The security group and a whitelist of the ApsaraDB for HBase cluster are configured. For more information, see Network connection process.
Note: In this topic, the ApsaraDB for HBase cluster resides in a VPC. Therefore, ports 2181, 16000, and 16020 are enabled for the security group, and the 100.104.0.0/16 CIDR block, which is the vSwitch CIDR block that MaxCompute uses to access the VPC in which the ApsaraDB for HBase cluster resides, is added to the IP address whitelist of the ApsaraDB for HBase cluster.
Procedure
Go to the bin directory of Phoenix and run the following command to start the Phoenix client:
./sqlline.py hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181
Note: hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181 is the ZooKeeper endpoint. You can log on to the ApsaraDB for HBase console and obtain the ZooKeeper endpoint on the Database Connection page of the cluster details page.
On the Phoenix client, execute the following statements to create a table named users and insert data into the table:
CREATE TABLE IF NOT EXISTS users(
    id UNSIGNED_INT,
    username char(50),
    password char(50)
    CONSTRAINT my_ph PRIMARY KEY (id));
UPSERT INTO users(id,username,password) VALUES (1,'kongxx','Letmein');
Note: For more information about the Phoenix syntax, see Get started with HBase SQL (Phoenix).
On the Phoenix client, execute the following statement to query data in the users table:
select * from users;
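If the table and data were created correctly, the query returns the row that was inserted in the previous step: id 1, username kongxx, and password Letmein.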
Write and package the Spark code in IntelliJ IDEA.
Use Scala to write the Spark code for testing.
Configure the on-premises development environment in IntelliJ IDEA based on the POM file. You can use the public endpoint of the ApsaraDB for HBase cluster to test the code logic. After the code logic is verified, change the value of the spark.hadoop.odps.end.point parameter in the sample code. To obtain the public endpoint, log on to the ApsaraDB for HBase console and go to the Database Connection page of the cluster details page. Sample code:
package com.phoenix

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._

/**
 * This example applies to Phoenix 4.x.
 */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    // The ZooKeeper endpoint of the ApsaraDB for HBase cluster.
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181,hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com:2181"
    // The name of the Phoenix table.
    val phoenixTableName = "users"
    // The name of the MaxCompute table.
    val ODPSTableName = "users_phoenix"
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      // To run the code on an on-premises machine, set spark.master to local[N]. N indicates the parallelism.
      //.config("spark.master", "local[4]")
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()
    // Read the Phoenix table into a DataFrame.
    var df = sparkSession.read.format("org.apache.phoenix.spark")
      .option("table", phoenixTableName)
      .option("zkUrl", zkAddress)
      .load()
    df.show()
    // Write the data to the MaxCompute table.
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}
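To confirm that the data was written, you can append a read-back step to the end of the main method. The following lines are a minimal sketch that assumes the users_phoenix table already exists in the MaxCompute project (it is created in the smoke-testing step later in this topic):
    // Optional check: read the MaxCompute table back and print its contents.
    // This assumes that the users_phoenix table has already been created.
    val written = sparkSession.sql("SELECT * FROM users_phoenix")
    written.show()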
The POM file contains the following content:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>Spark-Phonix</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-model</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-mapred</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-spark</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.phoenix</groupId>
                    <artifactId>ali-phoenix-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
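If you build the project with Maven, running mvn clean package in the project directory typically produces the JAR file in the target directory. Because shadedArtifactAttached is set to true in the maven-shade-plugin configuration, the shaded JAR file that contains the Phoenix dependencies is, by default, named with a -shaded.jar suffix.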
Package the code and its dependencies into a JAR file in IntelliJ IDEA and upload the JAR file to the MaxCompute project by using the MaxCompute client. For more information, see Add resources.
Note: The size of a JAR file that is uploaded in the DataWorks console cannot exceed 50 MB. Therefore, the MaxCompute client is used to upload the JAR file.
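For example, assuming the shaded JAR file built from the preceding POM file is named Spark-Phonix-1.0.0-SNAPSHOT-shaded.jar, you can add it as a resource on the MaxCompute client by running a command similar to the following:
add jar Spark-Phonix-1.0.0-SNAPSHOT-shaded.jar -f;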
Perform smoke testing in the DataWorks console.
In the DataWorks console, execute the following statement to create a MaxCompute table. For more information, see Create and manage MaxCompute tables.
CREATE TABLE IF NOT EXISTS users_phoenix
(
    id       INT,
    username STRING,
    password STRING
);
In the DataWorks console, select the desired MaxCompute project and upload the JAR file to the data development environment. For more information, see Create and use MaxCompute resources.
Create an ODPS Spark node and configure parameters for the node. For more information, see Create an ODPS Spark node.
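In this example, the main JAR file of the node is the resource that you uploaded in the previous step, and the main class is com.phoenix.SparkOnPhoenix4xSparkSession, the object defined in the sample code. These names are based on the sample project in this topic; adjust them if your project uses different names.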
The following sample code shows the configuration of the spark.hadoop.odps.cupid.vpc.domain.list parameter. Configure the parameter based on your ApsaraDB for HBase cluster.
{ "regionId":"cn-beijing", "vpcs":[ { "vpcId":"vpc-2zeaeq21********0exox", "zones":[ { "urls":[ { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":2181 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":2181 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":2181 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16000 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16000 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16000 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16020 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16020 }, { "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com", "port":16020 }, ] } ] } ] }
Click the Run icon in the toolbar to start smoke testing.
After smoke testing is successful, execute the following statement on an ad hoc query node:
select * from users_phoenix;
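If the job succeeds, the query returns the data that the Spark job read from the Phoenix users table and wrote to MaxCompute, such as the row with id 1, username kongxx, and password Letmein.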