このトピックでは、Spark を使用して Tablestore のデータを使用する方法について説明します。
Tablestore への Spark アクセス
Tablestore コンソールで pet という名前のテーブルを作成し、name フィールドをプライマリキーとして設定します。
name | owner | species | sex | birth | death |
Fluffy | Harold | cat | f | 1993-02-04 | - |
Claws | Gwen | cat | m | 1994-03-17 | - |
Buffy | Harold | dog | f | 1989-05-13 | - |
Fang | Benny | dog | m | 1990-08-27 | - |
Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | bird | f | 1998-09-11 | - |
Whistler | Gwen | bird | - | 1997-12-09 | - |
Slim | Benny | snake | m | 1996-04-29 | - |
Puffball | Diane | hamster | f | 1999-03-30 | - |
次の例は、pet テーブルのデータを使用する方法を示しています。
private static RangeRowQueryCriteria fetchCriteria() {
// クエリ条件を設定します
RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
res.setMaxVersions(1);
List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
return res;
}
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
JavaSparkContext sc = null;
try {
sc = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
// 認証情報を設定します
TableStore.setCredential(
hadoopConf,
new Credential(accessKeyId, accessKeySecret, securityToken));
// エンドポイントを設定します
Endpoint ep = new Endpoint(endpoint, instance);
TableStore.setEndpoint(hadoopConf, ep);
// クエリ条件を追加します
TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
hadoopConf, TableStoreInputFormat.class,
PrimaryKeyWritable.class, RowWritable.class);
System.out.println(
new Formatter().format("TOTAL: %d", rdd.count()).toString());
} finally {
if (sc != null) {
sc.close();
}
}
}
Spark SQL ステートメントを使用して Tablestore にアクセスする
SQL ステートメントの例
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
サンプルコードを実行する前に、環境変数を構成する必要があります。 環境変数の構成方法の詳細については、「環境変数の構成」をご参照ください。
Tablestore にアクセスするために使用されるデータソースのタイプは、
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
から取得できます。 E-MapReduce(EMR)クラスターで Spark 2 を使用している場合は、前のステートメントのspark3
をspark2
に変更する必要があります。Spark 3 を使用して Tablestore にアクセスするときに
java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64
エラーが報告された場合は、commons-net 依存関係を追加する必要があります。--jars
の値を/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar
に変更できます。ステートメントを実行するたびに
--jars
パラメーターを追加したくない場合は、Spark サービスページの/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar
spark.driver.extraClassPathspark.executor.extraClassPath[構成] タブで、 および 構成項目の値に を追加できます。
次の例は、テーブルを作成し、テーブルからデータを読み取る方法を示しています。
create table test_tableStore
using tablestore
options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
access.key.id = '${hiveconf:accessKeyId}',
access.key.secret = '${hiveconf:accessKeySecret}',
table.name = 'test_table',
instance.name = 'test_instance',
catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);
select * from test_tableStore
関連情報
完全なサンプルコードについては、GitHub をご覧ください。
詳細については、「バッチコンピューティング」をご参照ください。