このトピックでは、Spark を使用して HBase にデータを書き込む方法について説明します。
Spark を使用して HBase にアクセスする例
重要 コンピューティングクラスターは、HBase クラスターと同じセキュリティグループに属している必要があります。そうでない場合、クラスターを接続できません。 EMR コンソールでコンピューティングクラスターを作成する場合は、HBase クラスターが配置されているセキュリティグループを選択する必要があります。
- Java コード
JavaSparkContext jsc = new JavaSparkContext(sparkConf); try { List<byte[]> list = new ArrayList<>(); list.add(Bytes.toBytes("1")); ... list.add(Bytes.toBytes("5")); JavaRDD<byte[]> rdd = jsc.parallelize(list); Configuration conf = HBaseConfiguration.create(); JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); hbaseContext.foreachPartition(rdd, new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() { public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception { Table table = t._2().getTable(TableName.valueOf(tableName)); BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); while (t._1().hasNext()) { byte[] b = t._1().next(); Result r = table.get(new Get(b)); if (r.getExists()) { mutator.mutate(new Put(b)); } } mutator.flush(); mutator.close(); table.close(); } }); } finally { jsc.stop(); } - Scala コード
val sc = new SparkContext("local", "test") val config = new HBaseConfiguration() ... val hbaseContext = new HBaseContext(sc, config) rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) it.foreach((putRecord) => { . val put = new Put(putRecord._1) . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) . bufferedMutator.mutate(put) }) bufferedMutator.flush() bufferedMutator.close() })
参照
- Spark を使用して EMR HBase クラスターにアクセスする方法の詳細については、hbase-connectors をご参照ください。
- HBase の詳細については、HBase and Spark をご参照ください。