すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:HBase からのデータの読み取りと HBase へのデータの書き込み

最終更新日:Nov 09, 2025

EMR Serverless Spark は、公式の HBase Spark コネクタを使用して HBase に接続します。接続を確立するには、開発中に必要な構成を追加する必要があります。この Topic では、EMR Serverless Spark 環境で HBase からデータを読み取り、HBase にデータを書き込む方法について説明します。

前提条件

  • Serverless Spark ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。

  • HBase クラスターが作成されていること。

    この Topic では、EMR on ECS で作成されたカスタムクラスターを例として使用します。このクラスターには HBase サービスが含まれており、EMR HBase クラスターと呼ばれます。クラスターの作成方法の詳細については、「クラスターの作成」をご参照ください。

制限事項

この Topic で説明する操作は、次の Serverless Spark DPI エンジンバージョンでのみサポートされます:

  • esr-4.x: esr-4.1.0 以降

  • esr-3.x: esr-3.1.0 以降

  • esr-2.x: esr-2.5.0 以降

手順

ステップ 1: HBase Spark Connector の JAR パッケージを取得して OSS にアップロードする

Spark、Scala、Hadoop、および HBase のバージョン互換性要件に基づいて、次のステップを完了し、必要な依存パッケージを取得します。詳細については、「HBase Spark Connector 公式ドキュメント」をご参照ください。

  1. コネクタをコンパイルしてパッケージ化します。

    ターゲット環境の Spark、Scala、Hadoop、および HBase のバージョンに基づいて HBase Spark Connector をコンパイルします。このプロセスにより、次の 2 つのコア JAR パッケージが生成されます:

    • hbase-spark-1.1.0-SNAPSHOT.jar

    • hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar

      たとえば、次の Maven コマンドを使用して、指定されたバージョンに基づいてコネクタをコンパイルし、パッケージ化できます。

      mvn -Dspark.version=3.4.2 -Dscala.version=2.12.10 -Dhadoop-three.version=3.2.0 -Dscala.binary.version=2.12 -Dhbase.version=2.4.9 clean package -DskipTests

      お使いの環境が上記のバージョン (Spark 3.4.2、Scala 2.12.10、Hadoop 3.2.0、HBase 2.4.9) と同じである場合は、プリコンパイル済みの JAR パッケージを直接使用できます:

  2. HBase の依存関係を取得します。HBase のインストールディレクトリから、lib/shaded-clients フォルダと lib/client-facing-thirdparty フォルダから次の依存パッケージを取得します。この例では、2.4.9 は HBase のバージョン番号です。

    • hbase-shaded-client-2.4.9.jar

    • hbase-shaded-mapreduce-2.4.9.jar

    • slf4j-log4j12-1.7.30.jar

  3. 5 つの JAR パッケージを Alibaba Cloud OSS にアップロードします。この操作の詳細については、「簡易アップロード」をご参照ください。

ステップ 2: ネットワーク接続を作成する

Serverless Spark が HBase サービスにアクセスするには、HBase クラスターへのネットワーク接続が必要です。ネットワーク接続の詳細については、「EMR Serverless Spark と他の VPC 間のネットワーク接続」をご参照ください。

重要

セキュリティグループルールを追加する際は、[ポート範囲] を設定して必要なポートを開きます。ポート範囲は 1 から 65535 です。この例では、ZooKeeper サービスポート (2181)、HBase Master ポート (16000)、および HBase RegionServer ポート (16020) を開く必要があります。

ステップ 3: EMR HBase クラスターにテーブルを作成する

  1. Secure Shell (SSH) を使用してクラスターに接続します。詳細については、「クラスターへのログオン」をご参照ください。

  2. 次のコマンドを実行して HBase に接続します。

    hbase shell
  3. 次のコマンドを実行してテストテーブルを作成します。

    create 'hbase_table', 'c1', 'c2'
  4. テストデータを書き込むには、以下のコマンドを実行します。

    put 'hbase_table', 'r1', 'c1:name', 'Alice'
    put 'hbase_table', 'r1', 'c1:age', '25'
    put 'hbase_table', 'r1', 'c2:city', 'New York'
    
    put 'hbase_table', 'r2', 'c1:name', 'Bob'
    put 'hbase_table', 'r2', 'c1:age', '30'
    put 'hbase_table', 'r2', 'c2:city', 'San Francisco'

ステップ 4: Serverless Spark を使用して HBase テーブルからデータを読み取る

  1. Notebook セッションを作成します。詳細については、「Notebook セッションの管理」をご参照ください。

    セッションを作成する際、[エンジンバージョン] ドロップダウンリストから HBase Spark Connector に一致するエンジンバージョンを選択します。[ネットワーク接続] には、ステップ 2 で作成したネットワーク接続を選択します。[Spark 設定] セクションで、次のパラメーターを追加して HBase Spark Connector をロードします。

    spark.jars                                        oss://<bucketname>/path/to/hbase-shaded-client-2.4.9.jar,oss://<bucketname>/path/to/hbase-shaded-mapreduce-2.4.9.jar,oss://<bucketname>/path/to/hbase-spark-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/hbase-spark-protocol-shaded-1.1.0-SNAPSHOT.jar,oss://<bucketname>/path/to/slf4j-log4j12-1.7.30.jar
    spark.hadoop.hbase.zookeeper.quorum               The private IP address of ZooKeeper
    spark.hadoop.hbase.zookeeper.property.clientPort  The service port of ZooKeeper

    次の表にパラメーターを示します。

    パラメータ

    説明

    spark.jars

    外部依存 JAR パッケージのパス。

    OSS にアップロードされた 5 つのファイル。例: oss://<yourBucketname>/spark/hbase/hbase-shaded-client-2.4.9.jar

    spark.hadoop.hbase.zookeeper.quorum

    ZooKeeper のプライベート IP アドレス。

    • 別の HBase クラスターを使用する場合は、必要に応じて構成を指定します。

    • Alibaba Cloud EMR HBase クラスターを使用する場合、EMR HBase クラスターの [ノード管理] ページでマスターノードの [プライベート IP] を確認できます。

    spark.hadoop.hbase.zookeeper.property.clientPort

    ZooKeeper のサービスポート。

    • 別の HBase クラスターを使用する場合は、必要に応じて構成を指定します。

    • Alibaba Cloud EMR HBase クラスターを使用する場合、ポートは 2181 です。

  2. [データ開発] ページで、[インタラクティブ開発] > [Notebook] タイプのタスクを作成します。次に、右上隅で作成した Notebook セッションを選択します。

    詳細については、「Notebook セッションの管理」をご参照ください。

  3. 新しい Notebook タブに次のコードをコピーし、必要に応じてパラメーターを変更してから、[実行] をクリックします。

    # HBase テーブルを読み取ります。
    df = spark.read.format("org.apache.hadoop.hbase.spark") \
        .option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city") \
        .option("hbase.table", "hbase_table") \
        .option("hbase.spark.pushdown.columnfilter", False) \
        .load()
    
    # 一時ビューを登録します。
    df.createOrReplaceTempView("hbase_table_view")
    
    # SQL を使用してデータをクエリします。
    results = spark.sql("SELECT * FROM hbase_table_view")
    results.show()
    

    データが正常に返された場合、構成は正しいです。

    image

ステップ 5: Serverless Spark を使用して HBase テーブルにデータを書き込む

同じ Notebook タブで、次のコードをコピーし、必要に応じてパラメーターを変更してから、[実行] をクリックします。

from pyspark.sql.types import StructType, StructField, StringType

data = [
    ("r3", "sam", "26", "New York")
]

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("city", StringType(), True)
])
 
testDS = spark.createDataFrame(data=data,schema=schema)

testDS.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", "id STRING :key, name STRING c1:name, age STRING c1:age, city STRING c2:city").option("hbase.table", "hbase_table").save()

データが書き込まれた後、テーブルをクエリしてデータが正常に書き込まれたことを確認できます。

image