HBase provides Spark connectors that allow you to connect E-MapReduce (EMR) Serverless Spark to HBase. To connect EMR Serverless Spark to HBase, you only need to add specific configurations when you develop a job. This topic describes how to read data from and write data to HBase in EMR Serverless Spark.
Prerequisites
An EMR Serverless Spark workspace is created. For more information, see Create a workspace.
An EMR cluster that contains the HBase service is created.
In this topic, a custom cluster that contains the HBase service is created. The cluster is hereinafter referred to as the HBase cluster. For information about how to create a cluster, see Create a cluster.
Limits
The engine version of Serverless Spark must meet the following requirements:
esr-4.x: esr-4.1.0 or later
esr-3.x: esr-3.1.0 or later
esr-2.x: esr-2.5.0 or later
Procedure
Step 1: Obtain the JAR packages of HBase and the Spark connector and upload the packages to OSS
In this step, you need to perform the following operations to obtain the required JAR packages based on the version compatibility of Spark, Scala, Hadoop, and HBase. For more information, see Apache HBase™ Spark Connector.
Compile and package the Spark connector.
You can compile the Spark connector to generate the core JAR files based on the desired versions of Spark, Scala, Hadoop, and HBase. Sample core JAR files:
hbase-spark-1.1.0-SNAPSHOT.jar
hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar
Sample command used to compile and package the Spark connector:
mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests
If the versions of Spark, Scala, Hadoop, and HBase that you use are the same as those in the preceding command, you can directly use the compiled JAR packages without compiling the connector yourself.
Obtain the JAR packages of HBase. You can extract the JAR packages of HBase from the lib/shaded-clients and lib/client-facing-thirdparty folders of the HBase installation. In the following JAR packages, 2.4.9 is the version of HBase:
hbase-shaded-client-2.4.9.jar
hbase-shaded-mapreduce-2.4.9.jar
slf4j-log4j12-1.7.30.jar
Upload the preceding JAR files to Alibaba Cloud Object Storage Service (OSS). For more information, see Simple upload.
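If you prefer to upload the files programmatically instead of using the console, the following is a minimal sketch that uses the OSS Python SDK (oss2). The endpoint, bucket name, credentials, and local file paths are placeholder assumptions and must be replaced with your own values.

# Minimal sketch: upload the JAR files by using the OSS Python SDK (oss2).
# The endpoint, bucket name, credentials, and paths below are placeholders.
import oss2

auth = oss2.Auth("<yourAccessKeyId>", "<yourAccessKeySecret>")
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "<yourBucketname>")

jars = [
    "hbase-shaded-client-2.4.9.jar",
    "hbase-shaded-mapreduce-2.4.9.jar",
    "hbase-spark-1.1.0-SNAPSHOT.jar",
    "hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar",
    "slf4j-log4j12-1.7.30.jar",
]
for jar in jars:
    # Store the objects under spark/hbase/, matching the paths used later in this topic.
    bucket.put_object_from_file(f"spark/hbase/{jar}", f"/path/to/{jar}")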
Step 2: Create a network connection
EMR Serverless Spark can access HBase only if the connection between EMR Serverless Spark and HBase is established. For more information, see Configure network connectivity between EMR Serverless Spark and a data source across VPCs.
When you configure a security group rule, you must configure the Port Range parameter based on your business requirements. Valid port values range from 1 to 65535. In this example, you need to open port 2181 for the ZooKeeper service, port 16000 for the HBase master, and port 16020 for HBase RegionServers.
Step 3: Create a table in the HBase cluster
Log on to the HBase cluster in SSH mode. For more information, see Log on to a cluster.
Run the following command to connect to HBase:
hbase shell
Run the following command to create a test table:
create 'hbase_table', 'c1', 'c2'
Run the following commands to write test data to the table:
put 'hbase_table', 'r1', 'c1:name', 'Alice'
put 'hbase_table', 'r1', 'c1:age', '25'
put 'hbase_table', 'r1', 'c2:city', 'New York'
put 'hbase_table', 'r2', 'c1:name', 'Bob'
put 'hbase_table', 'r2', 'c1:age', '30'
put 'hbase_table', 'r2', 'c2:city', 'San Francisco'
Step 4: Read data from the HBase table in EMR Serverless Spark
Create a Notebook session. For more information, see Manage notebook sessions.
On the Create Notebook Session page, set the Engine Version parameter to the version that corresponds to the version of the Spark connector, select the created network connection from the Network Connection drop-down list, and then add the following code to the Spark Configuration section to load the Spark connector:
spark.jars oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar
spark.hadoop.hbase.zookeeper.quorum <ZooKeeper internal IP address>
spark.hadoop.hbase.zookeeper.property.clientPort <ZooKeeper service port>
The following table describes the parameters in the preceding configuration.

Parameter: spark.jars
Description: The OSS paths to which the external JAR packages are uploaded.
Example: In this topic, the JAR packages are uploaded to paths such as oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar.

Parameter: spark.hadoop.hbase.zookeeper.quorum
Description: The internal IP address of ZooKeeper. If you use an HBase cluster that is not created in the EMR console, configure this parameter based on your actual situation. If you use an HBase cluster created in the EMR console, you can view the internal IP address of the master node of the cluster on the Nodes tab of the HBase cluster.

Parameter: spark.hadoop.hbase.zookeeper.property.clientPort
Description: The service port of ZooKeeper. If you use an HBase cluster that is not created in the EMR console, configure this parameter based on your actual situation. If you use an HBase cluster created in the EMR console, set the service port to 2181.
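For example, if the internal IP address of the ZooKeeper node is 192.168.0.10 (a hypothetical value), the Spark Configuration section contains the following settings:

spark.hadoop.hbase.zookeeper.quorum 192.168.0.10
spark.hadoop.hbase.zookeeper.property.clientPort 2181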
On the Data Development page, create a notebook job. In the upper-right corner of the configuration tab of the job, select the created notebook session.
For more information, see Manage notebook sessions.
Copy the following code to the Python cell of the created notebook, modify the parameters based on your business requirements, and then click Run.
# Read the HBase table.
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
    .option("hbase.table", "hbase_table") \
    .option("hbase.spark.pushdown.columnfilter", False) \
    .load()

# Register a temporary view.
df.createOrReplaceTempView("hbase_table_view")

# Query data by using SQL.
results = spark.sql("SELECT * FROM hbase_table_view")
results.show()
If data is returned as expected, the configurations are correct.
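The preceding code sets hbase.spark.pushdown.columnfilter to False, so column filters are evaluated on the Spark side rather than pushed down to HBase. You can still filter the registered view with standard SQL; a minimal sketch (the filter value is illustrative):

# Filter the registered view with standard Spark SQL.
# With column filter pushdown disabled, the filter is applied by Spark after the scan.
spark.sql("SELECT name, age FROM hbase_table_view WHERE city = 'New York'").show()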
Step 5: Write data to the HBase table in EMR Serverless Spark
Copy the following code to the Python cell of the created notebook, modify the parameters based on your business requirements, and then click Run.
from pyspark.sql.types import StructType, StructField, StringType
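
# Test data: one row with row key r3, matching the schema defined below.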
data = [
("r3", "sam", "26", "New York")
]
schema = StructType([
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("city", StringType(), True)
])
testDS = spark.createDataFrame(data=data, schema=schema)

# Write the DataFrame to the HBase table.
testDS.write.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
    .option("hbase.table", "hbase_table") \
    .save()
After the write operation is complete, you can check whether the data was written to the HBase table, for example, by running the scan 'hbase_table' command in the HBase shell.
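To verify from the notebook instead, you can re-read the table in the same way as in Step 4 and filter for the new row key. A minimal sketch:

# Re-read the HBase table and check that row r3 is present.
df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
    .option("hbase.table", "hbase_table") \
    .option("hbase.spark.pushdown.columnfilter", False) \
    .load()
df.filter("id = 'r3'").show()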