HBase は、E-MapReduce(EMR)Serverless Spark を HBase に接続できるようにする Spark コネクタを提供しています。 EMR Serverless Spark を HBase に接続するには、ジョブを開発するときに特定の構成を追加するだけで済みます。 このトピックでは、EMR Serverless Spark で HBase からデータを読み取り、HBase にデータを書き込む方法について説明します。
前提条件
EMR Serverless Spark ワークスペースが作成されていること。 詳細については、「ワークスペースを作成する」をご参照ください。
HBase サービスを含む EMR クラスタが作成されていること。
このトピックでは、HBase サービスを含むカスタムクラスタが作成されています。 このクラスタは、以下、HBase クラスタと呼ばれます。 クラスタの作成方法については、「クラスタを作成する」をご参照ください。
制限事項
Serverless Spark のエンジンバージョンは、次の要件を満たしている必要があります。
esr-4.x: esr-4.1.0 以降
esr-3.x: esr-3.1.0 以降
esr-2.x: esr-2.5.0 以降
手順
ステップ 1: HBase と Spark コネクタの JAR パッケージを取得し、OSS にアップロードする
このステップでは、Spark、Scale、Hadoop、および HBase のバージョンの互換性に基づいて、必要な JAR パッケージを取得するために、次の操作を実行する必要があります。 詳細については、「Apache HBase™ Spark Connector」をご参照ください。
Spark コネクタをコンパイルしてパッケージ化する。
Spark コネクタをコンパイルして、目的のバージョンの Spark、Scala、Hadoop、および HBase に基づいてコア JAR ファイルを生成できます。 コア JAR ファイルの例:
hbase-spark-1.1.0-SNAPSHOT.jar
hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar
Spark コネクタのコンパイルとパッケージ化に使用されるコマンドの例:
mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests
使用している Spark、Scala、Hadoop、および HBase のバージョンが上記のコマンドと同じ場合は、次のコンパイル済み JAR パッケージを直接使用できます。
HBase の JAR パッケージを取得する。
lib/shaded-clients
フォルダとlib/client-facing-thirdparty
フォルダから HBase の JAR パッケージを抽出できます。 次の JAR パッケージでは、2.4.9 は HBase のバージョンです。hbase-shaded-client-2.4.9.jar
hbase-shaded-mapreduce-2.4.9.jar
slf4j-log4j12-1.7.30.jar
上記の JAR ファイルを Alibaba Cloud Object Storage Service(OSS)にアップロードします。 詳細については、「シンプルアップロード」をご参照ください。
ステップ 2: ネットワーク接続を作成する
EMR Serverless Spark は、EMR Serverless Spark と HBase の間の接続が確立されている場合にのみ HBase にアクセスできます。 詳細については、「VPC 間の EMR Serverless Spark とデータソース間のネットワーク接続を構成する」をご参照ください。
セキュリティグループルールを構成する場合は、ビジネス要件に基づいて [ポート範囲] パラメータを構成する必要があります。 パラメータ値の範囲は 1 ~ 65535 です。 この例では、ZooKeeper のサービスポート 2181、HBase Master のポート 16000、および HBase RegionServer のポート 16020 を有効にする必要があります。
ステップ 3: HBase クラスタにテーブルを作成するクラスター
SSH モードで HBase クラスタにログオンします。 詳細については、「クラスタにログオンする」をご参照ください。
次のコマンドを実行して HBase に接続します。
hbase shell
次のコマンドを実行してテストテーブルを作成します。
create 'hbase_table', 'c1', 'c2'
次のコマンドを実行して、テストデータをテーブルに書き込みます。
put 'hbase_table', 'r1', 'c1:name', 'Alice' put 'hbase_table', 'r1', 'c1:age', '25' put 'hbase_table', 'r1', 'c2:city', 'New York' put 'hbase_table', 'r2', 'c1:name', 'Bob' put 'hbase_table', 'r2', 'c1:age', '30' put 'hbase_table', 'r2', 'c2:city', 'San Francisco'
ステップ 4: EMR Serverless Spark で HBase テーブルからデータを読み取るEMR Serverless Spark のテーブル
Notebook セッションを作成します。 詳細については、「Notebook セッションの管理」をご参照ください。
[Notebook セッションの作成] ページで、[エンジンバージョン] パラメータを Spark コネクタのバージョンに対応するバージョンに設定し、作成したネットワーク接続を [ネットワーク接続] ドロップダウンリストから選択し、[Spark 構成] セクションに次のコードを追加して Spark コネクタを読み込みます。
spark.jars oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar spark.hadoop.hbase.zookeeper.quorum ZooKeeper 内部 IP アドレス spark.hadoop.hbase.zookeeper.property.clientPort ZooKeeper サービスポート
次の表は、上記のコードのパラメータについて説明しています。
パラメータ
説明
例
spark.jars
外部 JAR パッケージがアップロードされるパス。
このトピックでは、JAR パッケージは
oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar
にアップロードされます。spark.hadoop.hbase.zookeeper.quorum
ZooKeeper の内部 IP アドレス。
EMR コンソールで作成されていない HBase クラスタを使用する場合は、実際の状況に基づいてこのパラメータを構成します。
EMR コンソールで作成された HBase クラスタを使用する場合は、HBase クラスタの [ノード] タブでクラスタのマスターノードの内部 IP アドレスを表示できます。
spark.hadoop.hbase.zookeeper.property.clientPort
ZooKeeper のサービスポート。
EMR コンソールで作成されていない HBase クラスタを使用する場合は、実際の状況に基づいてこのパラメータを構成します。
EMR コンソールで作成された HBase クラスタを使用する場合は、サービスポートを
2181
に設定します。
[データ開発] ページで、[notebook] ジョブを作成します。 ジョブの構成タブの右上隅で、作成した notebook セッションを選択します。
詳細については、「Notebook セッションの管理」をご参照ください。
次のコードを作成した notebook の Python セルにコピーし、ビジネス要件に基づいてパラメータを変更し、[実行] をクリックします。
# HBase テーブルを読み取ります。 df = spark.read.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \ .option("hbase.table", "hbase_table") \ .option("hbase.spark.pushdown.columnfilter", False) \ .load() # 一時ビューを登録します。 df.createOrReplaceTempView("hbase_table_view") # SQL を使用してデータをクエリします。 results = spark.sql("SELECT * FROM hbase_table_view") results.show()
データが期待どおりに返された場合、構成は正しいです。
ステップ 5: EMR Serverless Spark で HBase テーブルにデータを書き込むEMR Serverless Spark のテーブル
次のコードを作成した notebook の Python セルにコピーし、ビジネス要件に基づいてパラメータを変更し、[実行] をクリックします。
from pyspark.sql.types import StructType, StructField, StringType
data = [
("r3", "sam", "26", "New York")
]
schema = StructType([
StructField("id", StringType(), True),
StructField("name", StringType(), True),
StructField("age", StringType(), True),
StructField("city", StringType(), True)
])
testDS = spark.createDataFrame(data=data,schema=schema)
testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()
書き込み操作が完了したら、データが HBase テーブルに書き込まれたかどうかを確認できます。