すべてのプロダクト
Search
ドキュメントセンター

Tablestore:バッチコンピューティング

最終更新日:May 16, 2025

このトピックでは、Spark コンピュートエンジンで Tablestore にアクセスする場合に、DataFrame を使用して Tablestore データのバッチコンピューティングを実行する方法について説明します。 また、ローカル環境とクラスタ環境の両方でコードを実行およびデバッグする方法についても説明します。

前提条件

  • Tablestore にデータテーブルが作成され、データテーブルにデータが書き込まれます。 詳細については、「Wide Column モデルの概要」をご参照ください。

    説明

    search_view テーブルのスキーマとサンプルデータについては、「付録: サンプルデータテーブル」をご参照ください。

  • Tablestore にアクセスする権限を持つ Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの AccessKey ペアが作成されます。 詳細については、「AccessKey ペアを作成する」をご参照ください。

  • Java 開発環境がデプロイされています。

    このトピックでは、Windows 環境、JDK 1.8IntelliJ IDEA 2024.1.2 (Community Edition)、および Apache Maven を例として使用します。

手順

ステップ 1: プロジェクトのソースコードをダウンロードする

Git を使用してサンプルプロジェクトをダウンロードします。

git clone https://github.com/aliyun/tablestore-examples.git

ネットワークの問題でプロジェクトをダウンロードできない場合は、tablestore-examples-master.zip を直接ダウンロードできます。

ステップ 2: Maven の依存関係を更新する

  1. tablestore-spark-demo ルートディレクトリに移動します。

    説明

    tablestore-spark-demo ルートディレクトリにある README.md ドキュメントを読んで、プロジェクト情報を十分に理解することをお勧めします。

  2. 次のコマンドを実行して、emr-tablestore-2.2.0-SNAPSHOT.jar をローカルの Maven リポジトリにインストールします。

    mvn install:install-file -Dfile="libs/emr-tablestore-2.2.0-SNAPSHOT.jar" -DartifactId=emr-tablestore -DgroupId="com.aliyun.emr" -Dversion="2.2.0-SNAPSHOT" -Dpackaging=jar -DgeneratePom=true

ステップ 3: (オプション) サンプルコードを変更する

コアコードの変更の説明

このセクションでは、TableStoreBatchSample を例として使用して、サンプルコードの主要部分について説明します。

コードブロック

説明

val df = sparkSession.read
  .format("tablestore")
  .option("instance.name", instanceName)
  .option("table.name", tableName)
  .option("endpoint", endpoint)
  .option("access.key.id", accessKeyId)
  .option("access.key.secret", accessKeySecret)
  .option("split.size.mbs", 100)
  // .option("catalog", dataCatalog)
  // The latest version allows you to use the .schema() method to replace the catalog configurations.
  .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG")
  .load()

Spark の DataFrameReader インターフェースを使用して Tablestore からデータを読み取り、DataFrame オブジェクトとしてロードします。

  • format("tablestore") は、ServiceLoader メソッドを使用して Spark Tablestore コネクタがロードされることを指定します。 特定の構成については、META-INF.services ディレクトリを参照してください。

  • instanceNametableNameendpointaccessKeyIdaccessKeySecret は、それぞれ Tablestore インスタンス名、データテーブル名、インスタンスエンドポイント、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID、Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレットを指定します。

  • split.size.mbs は、各分割のサイズを MB 単位で指定します。 デフォルト値: 100。このパラメータの値が小さいほど、より多くの分割が生成され、より多くの Spark タスクに対応します。

  • .schema("salt LONG, UserId STRING, OrderId STRING, price DOUBLE, timestamp LONG") は、データテーブルのスキーマを指定します。 スキーマには、フィールド名とデータ型が指定されています。 サンプルデータテーブルには、salt (Long 型)、UserId (String 型)、OrderId (String 型)、price (Double 型)、timestamp (Long 型) の 5 つのフィールドがあります。

    説明

    最新バージョンでは、.schema() メソッドを使用してカタログ構成を置き換えることができます。 ビジネス要件に基づいてバージョンを選択してください。

val dataCatalog: String =
  s"""
     |{"columns": {
     |    "salt": {"type":"long"},
     |    "UserId": {"type":"string"},
     |    "OrderId": {"type":"string"},
     |    "price": {"type":"double"},
     |    "timestamp": {"type":"long"}
     | }
     |}""".stripMargin

JSON 文字列 dataCatalog を定義して、Tablestore のスキーマ情報を記述します。 各フィールドの名前とデータ型は、キーと値のペアを使用して指定されます。

df.filter("salt = 1 AND UserId = 'user_A'").show(20, truncate = false)

DataFrame に対してフィルター操作を実行して、salt = 1 AND UserId = 'user_A' という条件を満たすデータを選択し、最初の 20 レコードを表示します。

df.createTempView("search_view")
val searchDF = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'")
searchDF.show()
val searchDF2 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId = 'user_A'" +
  " AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142'")
searchDF2.show()
val searchDF3 = sparkSession.sql("SELECT COUNT(*) FROM search_view WHERE salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B'")
searchDF3.show()

DataFrame を一時ビュー search_view として登録し、Spark SQL を介して複数の集計クエリを実行して、さまざまな条件を満たすレコードをカウントします。

  • SQL クエリ 1: salt = 1 AND UserId = 'user_A' という条件を満たすレコードの総数をカウントします。

  • SQL クエリ 2: salt = 1 AND UserId = 'user_A' AND OrderId = '00002664-9d8b-441b-bad7-845202f3b142' という条件を満たすレコードの総数をカウントします。

  • SQL クエリ 3: salt = 1 AND UserId >= 'user_A' AND UserId < 'user_B'' という条件を満たすレコードの総数をカウントします。

ステップ 4: コードを実行およびデバッグする

コードは、ローカルまたは Spark クラスタで実行およびデバッグできます。 このセクションでは、TableStoreBatchSample を例として使用して、デバッグプロセスについて説明します。

ローカル開発環境

このセクションでは、IntelliJ IDEA を使用した Windows オペレーティングシステムを例として使用して、コードをデバッグする方法について説明します。

  1. Scala プラグインをインストールします。

    デフォルトでは、IntelliJ IDEA は Scala をサポートしていません。 Scala プラグインを手動でインストールする必要があります。

  2. winutils.exe をインストールします (このトピックでは winutils 3.3.6 を使用します)。

    Windows 環境で Spark を実行する場合、互換性の問題を解決するために winutils.exe もインストールする必要があります。 winutils.exe は、GitHub プロジェクトのホームページからダウンロードできます。

  3. Scala プログラム TableStoreBatchSample を右クリックし、[実行構成の変更] を選択して、[実行] [構成] ダイアログボックスを開きます。

    説明

    実際の操作は、オペレーティングシステムと IntelliJ IDEA のバージョンによって多少異なります。

    1. [プログラム引数] フィールドに、インスタンス名、データテーブル名、AccessKey ID、AccessKey シークレット、インスタンスエンドポイントを順番に指定します。

      myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com
    2. [オプションの変更] をクリックし、["provided" スコープの依存関係をクラスパスに追加する] を選択して、[OK] をクリックします。

      2025-05-13_145645

  4. Scala プログラムを実行します。

    Scala プログラムを実行すると、結果は Tablestore コンソールに出力されます。

    With DataFrame
    +----+------+------------------------------------+-----+-------------+
    |salt|UserId|OrderId                             |price|timestamp    |
    +----+------+------------------------------------+-----+-------------+
    |1   |user_A|00002664-9d8b-441b-bad7-845202f3b142|29.6 |1744773183629|
    |1   |user_A|9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f|785.3|1744773190240|
    +----+------+------------------------------------+-----+-------------+
    
    With Spark SQL
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       1|
    +--------+
    
    +--------+
    |count(1)|
    +--------+
    |       2|
    +--------+

Spark クラスタ環境

重要

デバッグを実行する前に、Spark クラスタをデプロイし、クラスタ環境の Spark バージョンがサンプルプロジェクトの Spark バージョンと一致していることを確認してください。 そうしないと、バージョンの非互換性によりランタイムエラーが発生する可能性があります。

このセクションでは、spark-submit メソッドを例として使用します。 サンプルコードのマスターは、デフォルトで local[*] に設定されています。 Spark クラスタでコードを実行する場合は、この設定を削除し、spark-submit パラメータを使用してマスターを指定できます。

  1. mvn -U clean package コマンドを実行して、プロジェクトをパッケージ化します。 JAR パッケージのパスは、target/tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar です。

  2. JAR パッケージを Spark クラスタの Driver ノードにアップロードし、spark-submit を使用してタスクを送信します。

    spark-submit --class com.aliyun.tablestore.spark.demo.batch.TableStoreBatchSample --master yarn tablestore-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar myinstance search_view LTAI********************** DByT************************** https://myinstance.cn-hangzhou.ots.aliyuncs.com 

    fig_batch_dataframe001

付録: サンプルデータテーブル

次の表に、search_view テーブルのスキーマとサンプルデータを示します。

サンプルテーブルスキーマ

フィールド名

説明

pk

long

プライマリキー列。

salt

long

ランダムな salt 値。

UserId

string

ユーザー ID。

OrderId

string

注文 ID。

price

double

注文金額。

timestamp

long

タイムスタンプ。

サンプルデータ

pk (プライマリキー列)

salt

UserId

OrderId

price

timestamp

1

1

user_A

00002664-9d8b-441b-bad7-845202f3b142

29.6

1744773183629

2

1

user_A

9d8b7a6c-5e4f-4321-8765-0a9b8c7d6e5f

785.3

1744773190240

3

2

user_A

c3d4e5f6-7a8b-4901-8c9d-0a1b2c3d4e5f

187

1744773195579

4

3

user_B

f1e2d3c4-b5a6-4789-90ab-123cdef45678

11.9

1744773203345

5

4

user_B

e2f3a4b5-c6d7-4890-9abc-def012345678

2547

1744773207789