すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して HBase にアクセスする

最終更新日:Jan 11, 2025

このトピックでは、Spark を使用して HBase にデータを書き込む方法について説明します。

Spark を使用して HBase にアクセスする例

重要 コンピューティングクラスターは、HBase クラスターと同じセキュリティグループに属している必要があります。そうでない場合、クラスターを接続できません。 EMR コンソールでコンピューティングクラスターを作成する場合は、HBase クラスターが配置されているセキュリティグループを選択する必要があります。
  • Java コード
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    
    try {
      List<byte[]> list = new ArrayList<>();
      list.add(Bytes.toBytes("1"));
      ...
      list.add(Bytes.toBytes("5"));
    
      JavaRDD<byte[]> rdd = jsc.parallelize(list);
      Configuration conf = HBaseConfiguration.create();
    
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
    
      hbaseContext.foreachPartition(rdd,
          new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
       public void call(Tuple2<Iterator<byte[]>, Connection> t)
            throws Exception {
        Table table = t._2().getTable(TableName.valueOf(tableName));
        BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
        while (t._1().hasNext()) {
          byte[] b = t._1().next();
          Result r = table.get(new Get(b));
          if (r.getExists()) {
           mutator.mutate(new Put(b));
          }
        }
    
        mutator.flush();
        mutator.close();
        table.close();
       }
      });
    } finally {
      jsc.stop();
    }
  • Scala コード
    val sc = new SparkContext("local", "test")
    val config = new HBaseConfiguration()
    
    ...
    
    val hbaseContext = new HBaseContext(sc, config)
    
    rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
      val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
      it.foreach((putRecord) => {
       . val put = new Put(putRecord._1)
       . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
       . bufferedMutator.mutate(put)
     })
      bufferedMutator.flush()
      bufferedMutator.close()
    })

参照

  • Spark を使用して EMR HBase クラスターにアクセスする方法の詳細については、hbase-connectors をご参照ください。
  • HBase の詳細については、HBase and Spark をご参照ください。