EMR Serverless Spark は、公式の HBase Spark コネクタを使用して HBase に接続します。接続を確立するには、開発中に必要な構成を追加する必要があります。この Topic では、EMR Serverless Spark 環境で HBase からデータを読み取り、HBase にデータを書き込む方法について説明します。
前提条件
Serverless Spark ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。
HBase クラスターが作成されていること。
この Topic では、EMR on ECS で作成されたカスタムクラスターを例として使用します。このクラスターには HBase サービスが含まれており、EMR HBase クラスターと呼ばれます。クラスターの作成方法の詳細については、「クラスターの作成」をご参照ください。
制限事項
この Topic で説明する操作は、次の Serverless Spark DPI エンジンバージョンでのみサポートされます:
esr-4.x: esr-4.1.0 以降
esr-3.x: esr-3.1.0 以降
esr-2.x: esr-2.5.0 以降
手順
ステップ 1: HBase Spark Connector の JAR パッケージを取得して OSS にアップロードする
Spark、Scala、Hadoop、および HBase のバージョン互換性要件に基づいて、次のステップを完了し、必要な依存パッケージを取得します。詳細については、「HBase Spark Connector 公式ドキュメント」をご参照ください。
コネクタをコンパイルしてパッケージ化します。
ターゲット環境の Spark、Scala、Hadoop、および HBase のバージョンに基づいて HBase Spark Connector をコンパイルします。このプロセスにより、次の 2 つのコア JAR パッケージが生成されます:
hbase-spark-1.1.0-SNAPSHOT.jarhbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jarたとえば、次の Maven コマンドを使用して、指定されたバージョンに基づいてコネクタをコンパイルし、パッケージ化できます。
mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTestsお使いの環境が上記のバージョン (Spark 3.4.2、Scala 2.12.10、Hadoop 3.2.0、HBase 2.4.9) と同じである場合は、プリコンパイル済みの JAR パッケージを直接使用できます:
HBase の依存関係を取得します。HBase のインストールディレクトリから、
lib/shaded-clientsフォルダとlib/client-facing-thirdpartyフォルダから次の依存パッケージを取得します。この例では、2.4.9 は HBase のバージョン番号です。hbase-shaded-client-2.4.9.jarhbase-shaded-mapreduce-2.4.9.jarslf4j-log4j12-1.7.30.jar
5 つの JAR パッケージを Alibaba Cloud OSS にアップロードします。この操作の詳細については、「簡易アップロード」をご参照ください。
ステップ 2: ネットワーク接続を作成する
Serverless Spark が HBase サービスにアクセスするには、HBase クラスターへのネットワーク接続が必要です。ネットワーク接続の詳細については、「EMR Serverless Spark と他の VPC 間のネットワーク接続」をご参照ください。
セキュリティグループルールを追加する際は、[ポート範囲] を設定して必要なポートを開きます。ポート範囲は 1 から 65535 です。この例では、ZooKeeper サービスポート (2181)、HBase Master ポート (16000)、および HBase RegionServer ポート (16020) を開く必要があります。
ステップ 3: EMR HBase クラスターにテーブルを作成する
Secure Shell (SSH) を使用してクラスターに接続します。詳細については、「クラスターへのログオン」をご参照ください。
次のコマンドを実行して HBase に接続します。
hbase shell次のコマンドを実行してテストテーブルを作成します。
create 'hbase_table', 'c1', 'c2'テストデータを書き込むには、以下のコマンドを実行します。
put 'hbase_table', 'r1', 'c1:name', 'Alice' put 'hbase_table', 'r1', 'c1:age', '25' put 'hbase_table', 'r1', 'c2:city', 'New York' put 'hbase_table', 'r2', 'c1:name', 'Bob' put 'hbase_table', 'r2', 'c1:age', '30' put 'hbase_table', 'r2', 'c2:city', 'San Francisco'
ステップ 4: Serverless Spark を使用して HBase テーブルからデータを読み取る
Notebook セッションを作成します。詳細については、「Notebook セッションの管理」をご参照ください。
セッションを作成する際、[エンジンバージョン] ドロップダウンリストから HBase Spark Connector に一致するエンジンバージョンを選択します。[ネットワーク接続] には、ステップ 2 で作成したネットワーク接続を選択します。[Spark 設定] セクションで、次のパラメーターを追加して HBase Spark Connector をロードします。
spark.jars oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar spark.hadoop.hbase.zookeeper.quorum The private IP address of ZooKeeper spark.hadoop.hbase.zookeeper.property.clientPort The service port of ZooKeeper次の表にパラメーターを示します。
パラメータ
説明
例
spark.jars外部依存 JAR パッケージのパス。
OSS にアップロードされた 5 つのファイル。例:
oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar。spark.hadoop.hbase.zookeeper.quorumZooKeeper のプライベート IP アドレス。
別の HBase クラスターを使用する場合は、必要に応じて構成を指定します。
Alibaba Cloud EMR HBase クラスターを使用する場合、EMR HBase クラスターの [ノード管理] ページでマスターノードの [プライベート IP] を確認できます。
spark.hadoop.hbase.zookeeper.property.clientPortZooKeeper のサービスポート。
別の HBase クラスターを使用する場合は、必要に応じて構成を指定します。
Alibaba Cloud EMR HBase クラスターを使用する場合、ポートは
2181です。
[データ開発] ページで、 タイプのタスクを作成します。次に、右上隅で作成した Notebook セッションを選択します。
詳細については、「Notebook セッションの管理」をご参照ください。
新しい Notebook タブに次のコードをコピーし、必要に応じてパラメーターを変更してから、[実行] をクリックします。
# HBase テーブルを読み取ります。 df = spark.read.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \ .option("hbase.table", "hbase_table") \ .option("hbase.spark.pushdown.columnfilter", False) \ .load() # 一時ビューを登録します。 df.createOrReplaceTempView("hbase_table_view") # SQL を使用してデータをクエリします。 results = spark.sql("SELECT * FROM hbase_table_view") results.show()データが正常に返された場合、構成は正しいです。

ステップ 5: Serverless Spark を使用して HBase テーブルにデータを書き込む
同じ Notebook タブで、次のコードをコピーし、必要に応じてパラメーターを変更してから、[実行] をクリックします。
from pyspark.sql.types import StructType, StructField, StringType
data = [
("r3", "sam", "26", "New York")
]
schema = StructType([
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("city", StringType(), True)
])
testDS = spark.createDataFrame(data=data,schema=schema)
testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()
データが書き込まれた後、テーブルをクエリしてデータが正常に書き込まれたことを確認できます。
