本章节介绍如何在 MapReduce 中读取（消费）TableStore 中的数据，以及如何将计算结果写回 TableStore。
MR接入TableStore
- 准备一张数据表
创建一张名为 pet 的数据表，其中 name 为主键。
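| name | owner | species | sex | birth | death |
|------|-------|---------|-----|-------|-------|
| Fluffy | Harold | cat | f | 1993-02-04 | - |
| Claws | Gwen | cat | m | 1994-03-17 | - |
| Buffy | Harold | dog | f | 1989-05-13 | - |
| Fang | Benny | dog | m | 1990-08-27 | - |
| Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
| Chirpy | Gwen | bird | f | 1998-09-11 | - |
| Whistler | Gwen | bird | - | 1997-12-09 | - |
| Slim | Benny | snake | m | 1996-04-29 | - |
| Puffball | Diane | hamster | f | 1999-03-30 | - |

- 下面的示例演示了如何在 MapReduce 中读取 TableStore 中的数据（统计 pet 表的总行数）。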
public class RowCounter { public static class RowCounterMapper extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> { private final static Text agg = new Text("TOTAL"); private final static LongWritable one = new LongWritable(1); @Override public void map(PrimaryKeyWritable key, RowWritable value, Context context) throws IOException, InterruptedException { context.write(agg, one); } } public static class IntSumReducer extends Reducer<Text,LongWritable,Text,LongWritable> { @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } } private static RangeRowQueryCriteria fetchCriteria() { RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet"); res.setMaxVersions(1); List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>(); List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>(); lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN)); upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX)); res.setInclusiveStartPrimaryKey(new PrimaryKey(lower)); res.setExclusiveEndPrimaryKey(new PrimaryKey(upper)); return res; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); job.setJarByClass(RowCounter.class); job.setMapperClass(RowCounterMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TableStoreInputFormat.class); TableStore.setCredential(job, accessKeyId, accessKeySecret, securityToken); TableStore.setEndpoint(job, endpoint, instance); TableStoreInputFormat.addCriteria(job, fetchCriteria()); FileOutputFormat.setOutputPath(job, new Path(outputPath)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
- 下面的示例演示了如何在 MapReduce 中将数据写入 TableStore：按 owner 对 pet 表中的宠物分组，并为每个 owner 在输出表中写入一行（主键为 owner，每只宠物对应一个属性列，列名为宠物名，列值为物种）。
public static class OwnerMapper extends Mapper<PrimaryKeyWritable, RowWritable, Text, MapWritable> { @Override public void map(PrimaryKeyWritable key, RowWritable row, Context context) throws IOException, InterruptedException { PrimaryKeyColumn pet = key.getPrimaryKey().getPrimaryKeyColumn("name"); Column owner = row.getRow().getLatestColumn("owner"); Column species = row.getRow().getLatestColumn("species"); MapWritable m = new MapWritable(); m.put(new Text(pet.getValue().asString()), new Text(species.getValue().asString())); context.write(new Text(owner.getValue().asString()), m); } } public static class IntoTableReducer extends Reducer<Text,MapWritable,Text,BatchWriteWritable> { @Override public void reduce(Text owner, Iterable<MapWritable> pets, Context context) throws IOException, InterruptedException { List<PrimaryKeyColumn> pkeyCols = new ArrayList<PrimaryKeyColumn>(); pkeyCols.add(new PrimaryKeyColumn("owner", PrimaryKeyValue.fromString(owner.toString()))); PrimaryKey pkey = new PrimaryKey(pkeyCols); List<Column> attrs = new ArrayList<Column>(); for(MapWritable petMap: pets) { for(Map.Entry<Writable, Writable> pet: petMap.entrySet()) { Text name = (Text) pet.getKey(); Text species = (Text) pet.getValue(); attrs.add(new Column(name.toString(), ColumnValue.fromString(species.toString()))); } } RowPutChange putRow = new RowPutChange(outputTable, pkey) .addColumns(attrs); BatchWriteWritable batch = new BatchWriteWritable(); batch.addRowChange(putRow); context.write(owner, batch); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, TableStoreOutputFormatExample.class.getName()); job.setMapperClass(OwnerMapper.class); job.setReducerClass(IntoTableReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MapWritable.class); job.setInputFormatClass(TableStoreInputFormat.class); job.setOutputFormatClass(TableStoreOutputFormat.class); 
TableStore.setCredential(job, accessKeyId, accessKeySecret, securityToken); TableStore.setEndpoint(job, endpoint, instance); TableStoreInputFormat.addCriteria(job, ...); TableStoreOutputFormat.setOutputTable(job, outputTable); System.exit(job.waitForCompletion(true) ? 0 : 1); }
附录
完整示例代码请参考：MR读TableStore。