
Spark + TableStore

Last Updated: Jun 06, 2017

Spark access to TableStore

  • Prepare a data table

As an example, prepare a table named pet in which name is the only primary key column.

name      owner   species  sex  birth       death
Fluffy    Harold  cat      f    1993-02-04
Claws     Gwen    cat      m    1994-03-17
Buffy     Harold  dog      f    1989-05-13
Fang      Benny   dog      m    1990-08-27
Bowser    Diane   dog      m    1979-08-31  1995-07-29
Chirpy    Gwen    bird     f    1998-09-11
Whistler  Gwen    bird          1997-12-09
Slim      Benny   snake    m    1996-04-29
Puffball  Diane   hamster  f    1999-03-30
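
The table above has some empty cells: most pets have no death date, and Whistler has no sex recorded. A minimal local sketch of that shape, using plain Java maps rather than any TableStore API, might look like the following. The class and method names (PetRowsSketch, row, sampleRows) are hypothetical, and storing a blank cell as an absent column instead of an empty string is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PetRowsSketch {
    // Builds one row as a map from attribute column name to value.
    // Arguments are given as name1, value1, name2, value2, ...
    static Map<String, String> row(String... pairs) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (int i = 0; i + 1 < pairs.length; i += 2) {
            columns.put(pairs[i], pairs[i + 1]);
        }
        return columns;
    }

    // Three rows from the sample table, keyed by the primary key "name".
    // Assumption: a blank cell is modeled as a missing column.
    static Map<String, Map<String, String>> sampleRows() {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        table.put("Fluffy", row("owner", "Harold", "species", "cat",
                "sex", "f", "birth", "1993-02-04"));
        table.put("Bowser", row("owner", "Diane", "species", "dog",
                "sex", "m", "birth", "1979-08-31", "death", "1995-07-29"));
        table.put("Whistler", row("owner", "Gwen", "species", "bird",
                "birth", "1997-12-09"));
        return table;
    }

    public static void main(String[] args) {
        // Print each primary key followed by the columns that row carries.
        for (Map.Entry<String, Map<String, String>> e : sampleRows().entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Because the map is keyed by name, inserting a second row with the same name would replace the first, which mirrors name being the only primary key.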
  • The following example shows how Spark reads the pet table from TableStore and counts its rows.
    import java.util.ArrayList;
    import java.util.Formatter;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    // Also import the TableStore SDK and TableStore Hadoop connector classes used below.

    // accessKeyId, accessKeySecret, securityToken, endpoint, and instance
    // must be defined elsewhere, for example as static fields.

    // Build a query that scans the whole pet table, reading at most one version per column.
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        // INF_MIN and INF_MAX bound the range so that it covers every value of name.
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            // Pass the credentials and the endpoint to the connector through the Hadoop configuration.
            TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            // Expose the table as an RDD of (primary key, row) pairs.
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(
                new Formatter().format("TOTAL: %d", rdd.count()).toString());
        } finally {
            // Always release the Spark context, even if the job fails.
            if (sc != null) {
                sc.close();
            }
        }
    }
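
The query in fetchCriteria() covers a half-open range: the start key is inclusive and the end key is exclusive, and INF_MIN and INF_MAX widen it to the whole table. The sketch below is only a local analogy of that behavior using a java.util.TreeSet over the nine sample names; it does not call TableStore or Spark. The class and method names (RangeScanSketch, countInRange) are hypothetical, and using null to stand in for INF_MIN or INF_MAX is a convention of this sketch only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class RangeScanSketch {
    // Primary key values (name) of the nine rows in the sample pet table.
    static final List<String> PET_NAMES = Arrays.asList(
            "Fluffy", "Claws", "Buffy", "Fang", "Bowser",
            "Chirpy", "Whistler", "Slim", "Puffball");

    // Counts the keys in the half-open range [start, end).
    // A null start stands in for INF_MIN and a null end for INF_MAX.
    static long countInRange(String start, String end) {
        TreeSet<String> keys = new TreeSet<>(PET_NAMES);
        if (start == null && end == null) {
            return keys.size();
        }
        if (start == null) {
            return keys.headSet(end, false).size();
        }
        if (end == null) {
            return keys.tailSet(start, true).size();
        }
        return keys.subSet(start, true, end, false).size();
    }

    public static void main(String[] args) {
        // Full scan, analogous to the INF_MIN .. INF_MAX range.
        System.out.println(String.format("TOTAL: %d", countInRange(null, null))); // prints "TOTAL: 9"
        // Bounded scan: Buffy, Chirpy, and Claws match; Fang is excluded.
        System.out.println(countInRange("Buffy", "Fang")); // prints "3"
    }
}
```

With the nine sample rows loaded, a full scan counts 9 keys, so the Spark job above would be expected to report the same total for this table.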

Appendix

For complete sample code, see:
