全部产品
Search
文档中心

通过Spark访问HBase增强版

更新时间: 2021-03-29

本文主要介绍如何通过Spark访问HBase增强版。

访问准备

HBase增强版支持从Spark访问,用户需要添加alihbase-connector的依赖,具体步骤如下:

  1. 确认Spark环境中依赖的开源hbase客户端版本以及安装目录,可以yarn logs -applicationId xxx查看具体执行日志中加载的版本和路径,hbase-client-xxx.jar。

  2. 确认需要添加alihbase-connector插件的版本,查考历史版本(Maven方式)中的版本对应表

  3. 通过历史版本(Jar包替换)文档中插件下载地址,下载对应版本的alihbase-connector,将alihbase-connector放入开源hbase客户端相同目录中。

获取访问地址

参见使用 Java API访问增强版集群,使用地址中Java API访问地址,默认端口为30020,如果是公网访问,请使用公网域名。

获取用户名和密码

默认的用户名为root,密码为root。或者在集群管理页面中关闭ACL功能后,无需再提供用户名密码。

添加HBase增强版访问配置

方式一:配置文件

hbase-site.xml 中增加下列配置项:

<configuration>
      <!--
    集群的连接地址,在控制台页面的数据库连接界面获得(注意公网地址和VPC内网地址)
    -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020</value>
    </property>
    <!--
    设置用户名密码,默认root:root,可根据实际情况调整
    -->
    <property>
        <name>hbase.client.username</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.password</name>
        <value>root</value>
    </property>
    <property>
        <name>hbase.client.connection.impl</name>
       <value>org.apache.hadoop.hbase.client.AliHBaseUEClusterConnection</value>
    </property-->
</configuration>

方式二:代码

通过代码在Configuration中添加参数:

// 新建一个Configuration
Configuration conf = HBaseConfiguration.create();
// 集群的连接地址,在控制台页面的数据库连接界面获得(注意公网地址和VPC内网地址)
conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020");
// 设置用户名密码,默认root:root,可根据实际情况调整
conf.set("hbase.client.username", "root")
conf.set("hbase.client.password", "root")
conf.set("hbase.client.connection.impl", AliHBaseUEClusterConnection.class.getName());

Spark访问示例

test(" test the spark sql count result") {
  //1. 添加hbase ue访问配置
  var conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ld-xxxx-proxy-hbaseue.hbaseue.xxx.rds.aliyuncs.com:30020")
  conf.set("hbase.client.username", "test_user")
  conf.set("hbase.client.password", "password")
  //2. 创建表
  val hbaseTableName = "testTable"
  val cf = "f"
  val column1 = cf + ":a"
  val column2 = cf + ":b"
  var rowsCount: Int = -1
  var namespace = "spark_test"
  val admin = ConnectionFactory.createConnection(conf).getAdmin()
  val tableName = TableName.valueOf(namespace, hbaseTableName)   
  val htd = new HTableDescriptor(tableName)
  htd.addFamily(new HColumnDescriptor(cf))
  admin.createTable(htd)
  //3. 插入测试数据
  val rng = new Random()
  val k: Array[Byte] = new Array[Byte](3)
  val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
  val puts = new util.ArrayList[Put]()
  var i = 0
  for (b1 <- ('a' to 'z')) {
      for (b2 <- ('a' to 'z')) {
        for (b3 <- ('a' to 'z')) {
          if(i < 10) {
            k(0) = b1.toByte
            k(1) = b2.toByte
            k(2) = b3.toByte
            val put = new Put(k)
            put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
            puts.add(put)
            i = i + 1
          }
        }
      }
  }
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(tableName)
  table.put(puts)
  //4. 创建spark表
  val sparkTableName = "spark_hbase"
  val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
                         |    OPTIONS ('catalog'=
                         |    '{"table":{"namespace":"$${hbaseTableName}",                   "name":"${hbaseTableName}"},"rowkey":"rowkey",
                         |    "columns":{
                         |    "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
                         |    "col1":{"cf":"cf1", "col":"a", "type":"string"},
                         |    "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
                         |    )""".stripMargin
  println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
  sparkSession.sql(createCmd)
  //5. 执行count sql
  val result = sparkSession.sql("select count(*) from " + sparkTableName)
  val sparkCounts = result.collect().apply(0).getLong(0)
  println(" sparkCounts : " + sparkCounts)