すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Tablestore への Spark アクセス

最終更新日:Jan 11, 2025

このトピックでは、Spark を使用して Tablestore のデータを使用する方法について説明します。

Tablestore への Spark アクセス

Tablestore コンソールで pet という名前のテーブルを作成し、name フィールドをプライマリキーとして設定します。

name

owner

species

sex

birth

death

Fluffy

Harold

cat

f

1993-02-04

-

Claws

Gwen

cat

m

1994-03-17

-

Buffy

Harold

dog

f

1989-05-13

-

Fang

Benny

dog

m

1990-08-27

-

Bowser

Diane

dog

m

1979-08-31

1995-07-29

Chirpy

Gwen

bird

f

1998-09-11

-

Whistler

Gwen

bird

-

1997-12-09

-

Slim

Benny

snake

m

1996-04-29

-

Puffball

Diane

hamster

f

1999-03-30

-

次の例は、pet テーブルのデータを使用する方法を示しています。

private static RangeRowQueryCriteria fetchCriteria() {
    // クエリ条件を設定します
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = new Configuration();
    JavaSparkContext sc = null;
    try {
        sc = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = new Configuration();
        // 認証情報を設定します
        TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
        // エンドポイントを設定します
        Endpoint ep = new Endpoint(endpoint, instance);
        TableStore.setEndpoint(hadoopConf, ep);
        // クエリ条件を追加します
        TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        if (sc != null) {
            sc.close();
        }
    }
}

Spark SQL ステートメントを使用して Tablestore にアクセスする

SQL ステートメントの例

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
説明
  • サンプルコードを実行する前に、環境変数を構成する必要があります。 環境変数の構成方法の詳細については、「環境変数の構成」をご参照ください。

  • Tablestore にアクセスするために使用されるデータソースのタイプは、/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* から取得できます。 E-MapReduce(EMR)クラスターで Spark 2 を使用している場合は、前のステートメントの spark3spark2 に変更する必要があります。

  • Spark 3 を使用して Tablestore にアクセスするときに java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64 エラーが報告された場合は、commons-net 依存関係を追加する必要があります。 --jars の値を /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar に変更できます。

  • ステートメントを実行するたびに --jars パラメーターを追加したくない場合は、Spark サービスページの /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jarspark.driver.extraClassPathspark.executor.extraClassPath[構成] タブで、 および 構成項目の値に を追加できます。

次の例は、テーブルを作成し、テーブルからデータを読み取る方法を示しています。

create table test_tableStore
using tablestore
  options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  access.key.id = '${hiveconf:accessKeyId}',
  access.key.secret = '${hiveconf:accessKeySecret}',
  table.name = 'test_table',
  instance.name = 'test_instance',
  catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);

select * from test_tableStore

関連情報