このページでは、Spark で Table Store データを消費する方法について説明します。
Spark への Table Store アクセス許可
- テーブルを準備します。
pet という名前のテーブルを作成します。 名前列をプライマリキーフィールドに設定します。
名前 | 飼い主 | 種類 | 性別 | 誕生日 | 命日 |
---|---|---|---|---|---|
ふわふわ | ハロルド | ネコ | 雌 | 1993-02-04 | |
クローズ | グウェン | ネコ | 雄 | 1994-03-17 | |
バフィ | ハロルド | 犬 | 雌 | 1989-05-13 | |
ファング | ベニー | 犬 | 雄 | 1990-08-27 | |
クッパ | ダイアン | 犬 | 雄 | 1979-08-31 | 1995-07-29 |
チャーピー | グウェン | 鳥 | 雌 | 1998-09-11 | |
ウィスラー | グウェン | 鳥 | 1997-12-09 | ||
スリム | ベニー | ヘビ | 雄 | 1996-04-29 | |
パフボール | ダイアン | ハムスター | 雌 | 1999-03-30 |
- 以下の例に、Spark が Table Store データを消費する方法を示します。
private static RangeRowQueryCriteria fetchCriteria() { RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet"); res.setMaxVersions(1); List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>(); List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>(); lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN)); upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX)); res.setInclusiveStartPrimaryKey(new PrimaryKey(lower)); res.setExclusiveEndPrimaryKey(new PrimaryKey(upper)); return res; } public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("RowCounter"); JavaSparkContext sc = new JavaSparkContext(sparkConf); Configuration hadoopConf = new Configuration(); JavaSparkContext sc = null; try { sc = new JavaSparkContext(sparkConf); Configuration hadoopConf = new Configuration(); TableStore.setCredential( hadoopConf, new Credential(accessKeyId, accessKeySecret, securityToken)); Endpoint ep = new Endpoint(endpoint, instance); TableStore.setEndpoint(hadoopConf, ep); TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria()); JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD( hadoopConf, TableStoreInputFormat.class, PrimaryKeyWritable.class, RowWritable.class); System.out.println( new Formatter().format("TOTAL: %d", rdd.count()).toString()); } finally { if (sc ! = null) { sc.close(); } } }
付録
完全なサンプルコードについては、以下をご参照ください。Spark への Table Store アクセス許可