すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:HBase からデータを読み取り、HBase にデータを書き込む

最終更新日:Apr 18, 2025

HBase は、E-MapReduce(EMR)Serverless Spark を HBase に接続できるようにする Spark コネクタを提供しています。 EMR Serverless Spark を HBase に接続するには、ジョブを開発するときに特定の構成を追加するだけで済みます。 このトピックでは、EMR Serverless Spark で HBase からデータを読み取り、HBase にデータを書き込む方法について説明します。

前提条件

  • EMR Serverless Spark ワークスペースが作成されていること。 詳細については、「ワークスペースを作成する」をご参照ください。

  • HBase サービスを含む EMR クラスタが作成されていること。

    このトピックでは、HBase サービスを含むカスタムクラスタが作成されています。 このクラスタは、以下、HBase クラスタと呼ばれます。 クラスタの作成方法については、「クラスタを作成する」をご参照ください。

制限事項

Serverless Spark のエンジンバージョンは、次の要件を満たしている必要があります。

  • esr-4.x: esr-4.1.0 以降

  • esr-3.x: esr-3.1.0 以降

  • esr-2.x: esr-2.5.0 以降

手順

ステップ 1: HBase と Spark コネクタの JAR パッケージを取得し、OSS にアップロードする

このステップでは、Spark、Scale、Hadoop、および HBase のバージョンの互換性に基づいて、必要な JAR パッケージを取得するために、次の操作を実行する必要があります。 詳細については、「Apache HBase™ Spark Connector」をご参照ください。

  1. Spark コネクタをコンパイルしてパッケージ化する。

    Spark コネクタをコンパイルして、目的のバージョンの Spark、Scala、Hadoop、および HBase に基づいてコア JAR ファイルを生成できます。 コア JAR ファイルの例:

    • hbase-spark-1.1.0-SNAPSHOT.jar

    • hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar

      Spark コネクタのコンパイルとパッケージ化に使用されるコマンドの例:

      mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests

      使用している Spark、Scala、Hadoop、および HBase のバージョンが上記のコマンドと同じ場合は、次のコンパイル済み JAR パッケージを直接使用できます。

  2. HBase の JAR パッケージを取得する。lib/shaded-clients フォルダと lib/client-facing-thirdparty フォルダから HBase の JAR パッケージを抽出できます。 次の JAR パッケージでは、2.4.9 は HBase のバージョンです。

    • hbase-shaded-client-2.4.9.jar

    • hbase-shaded-mapreduce-2.4.9.jar

    • slf4j-log4j12-1.7.30.jar

  3. 上記の JAR ファイルを Alibaba Cloud Object Storage Service(OSS)にアップロードします。 詳細については、「シンプルアップロード」をご参照ください。

ステップ 2: ネットワーク接続を作成する

EMR Serverless Spark は、EMR Serverless Spark と HBase の間の接続が確立されている場合にのみ HBase にアクセスできます。 詳細については、「VPC 間の EMR Serverless Spark とデータソース間のネットワーク接続を構成する」をご参照ください。

重要

セキュリティグループルールを構成する場合は、ビジネス要件に基づいて [ポート範囲] パラメータを構成する必要があります。 パラメータ値の範囲は 1 ~ 65535 です。 この例では、ZooKeeper のサービスポート 2181、HBase Master のポート 16000、および HBase RegionServer のポート 16020 を有効にする必要があります。

ステップ 3: HBase クラスタにテーブルを作成するクラスター

  1. SSH モードで HBase クラスタにログオンします。 詳細については、「クラスタにログオンする」をご参照ください。

  2. 次のコマンドを実行して HBase に接続します。

    hbase shell
  3. 次のコマンドを実行してテストテーブルを作成します。

    create 'hbase_table', 'c1', 'c2'
  4. 次のコマンドを実行して、テストデータをテーブルに書き込みます。

    put 'hbase_table', 'r1', 'c1:name', 'Alice'
    put 'hbase_table', 'r1', 'c1:age', '25'
    put 'hbase_table', 'r1', 'c2:city', 'New York'
    
    put 'hbase_table', 'r2', 'c1:name', 'Bob'
    put 'hbase_table', 'r2', 'c1:age', '30'
    put 'hbase_table', 'r2', 'c2:city', 'San Francisco'

ステップ 4: EMR Serverless Spark で HBase テーブルからデータを読み取るEMR Serverless Spark のテーブル

  1. Notebook セッションを作成します。 詳細については、「Notebook セッションの管理」をご参照ください。

    [Notebook セッションの作成] ページで、[エンジンバージョン] パラメータを Spark コネクタのバージョンに対応するバージョンに設定し、作成したネットワーク接続を [ネットワーク接続] ドロップダウンリストから選択し、[Spark 構成] セクションに次のコードを追加して Spark コネクタを読み込みます。

    spark.jars                                        oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar
    spark.hadoop.hbase.zookeeper.quorum               ZooKeeper 内部 IP アドレス
    spark.hadoop.hbase.zookeeper.property.clientPort  ZooKeeper サービスポート

    次の表は、上記のコードのパラメータについて説明しています。

    パラメータ

    説明

    spark.jars

    外部 JAR パッケージがアップロードされるパス。

    このトピックでは、JAR パッケージは oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar にアップロードされます。

    spark.hadoop.hbase.zookeeper.quorum

    ZooKeeper の内部 IP アドレス。

    • EMR コンソールで作成されていない HBase クラスタを使用する場合は、実際の状況に基づいてこのパラメータを構成します。

    • EMR コンソールで作成された HBase クラスタを使用する場合は、HBase クラスタの [ノード] タブでクラスタのマスターノードの内部 IP アドレスを表示できます。

    spark.hadoop.hbase.zookeeper.property.clientPort

    ZooKeeper のサービスポート。

    • EMR コンソールで作成されていない HBase クラスタを使用する場合は、実際の状況に基づいてこのパラメータを構成します。

    • EMR コンソールで作成された HBase クラスタを使用する場合は、サービスポートを 2181 に設定します。

  2. [データ開発] ページで、[notebook] ジョブを作成します。 ジョブの構成タブの右上隅で、作成した notebook セッションを選択します。

    詳細については、「Notebook セッションの管理」をご参照ください。

  3. 次のコードを作成した notebook の Python セルにコピーし、ビジネス要件に基づいてパラメータを変更し、[実行] をクリックします。

    # HBase テーブルを読み取ります。
    df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
        .option("hbase.table", "hbase_table") \
        .option("hbase.spark.pushdown.columnfilter", False) \
        .load()
    
    # 一時ビューを登録します。
    df.createOrReplaceTempView("hbase_table_view")
    
    # SQL を使用してデータをクエリします。
    results = spark.sql("SELECT * FROM hbase_table_view")
    results.show()
    

    データが期待どおりに返された場合、構成は正しいです。

    image

ステップ 5: EMR Serverless Spark で HBase テーブルにデータを書き込むEMR Serverless Spark のテーブル

次のコードを作成した notebook の Python セルにコピーし、ビジネス要件に基づいてパラメータを変更し、[実行] をクリックします。

from pyspark.sql.types import StructType, StructField, StringType

data = [
    ("r3", "sam", "26", "New York")
]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("city", StringType(), True)
])
 
testDS = spark.createDataFrame(data=data,schema=schema)

testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()

書き込み操作が完了したら、データが HBase テーブルに書き込まれたかどうかを確認できます。

image