本文为您介绍如何通过Spark读取或写入数据至Hologres的操作方法。
背景信息
Spark是用于大规模数据处理的统一分析引擎,Hologres已经与Spark(社区版以及EMR Spark版)高效打通,快速助力企业搭建数据仓库。Hologres提供的Spark Connector,支持Spark以批处理的方式将数据写入Hologres,同时Spark支持读取多种数据源(例如文件、Hive、MySQL、PostgreSQL等)。
Hologres兼容PostgreSQL,因此Spark也可以用读取PostgreSQL的方式直接读取Hologres数据,进行ETL处理,再写入Hologres及其他数据源,完成大数据开发抽取、处理、加载的完整闭环。
前提条件
- 实例版本需为V0.9及以上版本。请在Hologres管控台的实例详情页查看当前实例版本,如实例是V0.9以下版本,请您使用自助升级或通过搜索(钉钉群号:32314975)加入实时数仓Hologres交流群申请升级实例。
- 需要安装对应版本的Spark环境,能够运行
spark-shell
命令。
通过Spark Connector写入(推荐使用)
Hologres当前支持使用内置的Spark Connector将Spark数据写入Hologres,相比其他写入方式,调用基于Holo Client实现Connector写入的方式性能更优。具体操作步骤如下,阿里云也为您提供了相关的使用示例,详情请参见通过Spark Connector写入使用示例。
- 获取JAR包。Spark2和Spark3上均已支持Connector写入,Spark写入Hologres时需要引用connector的JAR包,当前已经发布到Maven中央仓库,在项目中参照如下pom文件进行配置。说明 相关Connector也已开源,详情请参见hologres-connectors。
当前Hologres已自动生成JAR文件,下载链接如下。<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-2.x</artifactId> <version>1.0.1</version> <classifier>jar-with-dependencies</classifier> </dependency>
- 使用JAR包。执行如下命令,启动Spark。
spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
- 配置Spark。在Spark中执行如下命令,以连接Hologres将数据加载至目标表中。
- 代码:
df.write .format("hologres") .option(SourceProvider.USERNAME, "AccessKey ID") //阿里云账号的AccessKey ID。 .option(SourceProvider.PASSWORD, "Accesskey SECRET") //阿里云账号的Accesskey SECRET。 .option(SourceProvider.ENDPOINT, "Ip:Port") //Hologres的Ip和Port。 .option(SourceProvider.DATABASE, "Database") .option(SourceProvider.TABLE, "Table") .save()
说明 其中如下代码可以进行替换。- 原代码:
.option(SourceProvider.endpoint, "Ip:Port") .option(SourceProvider.database, "Database")
- 替换代码:
.option(SourceProvider.jdbcUrl, "jdbc:postgresql://Ip:Port/Database")
- 原代码:
- 参数释义:
参数名 默认值 是否必填 参数描述 USERNAME 无 是 登录Hologres账号的AccessKey ID。您可以单击AccessKey 管理 PASSWORD 无 是 登录Hologres账号的AccessKey Secret。您可以单击AccessKey 管理 TABLE 无 是 Hologres用于接收数据的表名称。 ENDPOINT 无 与JDBCURL二选一 Hologres实例的网络域名。 您可以进入Hologres管理控制台实例详情页,从实例配置获取主机和端口号。
DATABASE 无 与JDBCURL二选一 Hologres接收数据的表所在数据库名称。 JDBCURL 无 与ENDPOINT+DATABASE组合设置二选一 Hologres的JDBCURL。 INPUT_DATA_SCHEMA_DDL 无 spark3.x必填,值为 <your_DataFrame>.schema.toDDL
。Spark中DataFrame的DDL。 WRITE_MODE INSERT_OR_REPLACE 否 当INSERT目标表为有主键的表时采用不同策略。 - INSERT_OR_IGNORE:当主键冲突时,不写入。
- INSERT_OR_UPDATE:当主键冲突时,更新相应列。
- INSERT_OR_REPLACE:当主键冲突时,更新所有列。
WRITE_BATCH_SIZE 512 否 每个写入线程的最大批次大小,在经过WRITE_MODE合并后的Put数量达到WRITE_BATCH_SIZE时进行一次批量提交。 WRITE_BATCH_BYTE_SIZE 2 MB 否 每个写入线程的最大批次Byte大小,在经过WRITE_MODE合并后的Put数据字节数达到WRITE_BATCH_BYTE_SIZE时进行一次批量提交。 WRITE_MAX_INTERVAL_MS 10000 ms 否 距离上次提交超过WRITE_MAX_INTERVAL_MS会触发一次批量提交。 WRITE_FAIL_STRATEGY TYR_ONE_BY_ONE 否 当某一批次提交失败时,会将批次内的记录逐条提交(保序),单条提交失败的记录将会跟随异常被抛出。 WRITE_THREAD_SIZE 1 否 写入并发线程数(每个并发占用1个数据库连接)。 在一个Spark作业中,占用的总连接数与Spark并发相关,关系为
总连接数= spark.default.parallelism * WRITE_THREAD_SIZE
。DYNAMIC_PARTITION false 否 若为true,写入分区表父表时,当分区不存在时自动创建分区。 RETRY_COUNT 3 否 当连接故障时,写入和查询的重试次数。 RETRY_SLEEP_INIT_MS 1000 ms 否 每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY_COUNT*RETRY_SLEEP_STEP_MS。 RETRY_SLEEP_STEP_MS 10*1000 ms 否 每次重试的等待时间=RETRY_SLEEP_INIT_MS+RETRY_COUNT*RETRY_SLEEP_STEP_MS。 CONNECTION_MAX_IDLE_MS 60000 ms 否 写入线程和点查线程数据库连接的最大IDLE时间,超过此时间的连接将被释放。
- 代码:
通过Spark Connector写入使用示例
根据如下示例步骤为您介绍,如何通过Spark Connector将数据写入Hologres。
- 创建Hologres表。在Hologres中执行如下SQL命令创建目标表,用来接受数据。
CREATE TABLE tb008 ( id BIGINT primary key, counts INT, name TEXT, price NUMERIC(38, 18), out_of_stock BOOL, weight DOUBLE PRECISION, thick FLOAT, time TIMESTAMPTZ, dt DATE, by bytea, inta int4[], longa int8[], floata float4[], doublea float8[], boola boolean[], stringa text[] );
- Spark准备数据并写入Hologres。
- 查询写入的数据。在Hologres侧查询目标表,即可确认写入的数据,示例如下图所示。
通过Spark读取数据源数据并写入Hologres
- Spark从数据源读取数据。Spark支持从不同数据源读取数据,具体数据源分类如下。
- Spark支持以Hologres为数据源。Hologres兼容PostgreSQL,因为Spark可以用读取PostgreSQL的方式读取Hologres中的数据。读取代码如下。说明 在使用JDBC方式进行读取前,请前往官网下载Postgresql JDBC Jar,本文以
postgresql-42.2.18
版本为例,在spark-shell启动时执行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
加载该jar,可以与hologres-connector的jar包一同加载。// Read from some table, for example: tb008 val readDf = spark.read .format("jdbc") //使用postgresql jdbc driver读取holo .option("driver","org.postgresql.Driver") .option("url", "jdbc:postgresql://Ip:Por/test_database") .option("dbtable", "tb008") .option("user", "your_username") .option("password", "your_password") .load()
- Spark支持其他数据源(如Parquet格式的文件)。Spark支持从其他数据源中读取数据写入Hologres中,例如使用Spark从Hive中读取数据,代码如下。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) // Read from some table, for example: phone val readDf = hiveContext.sql("select * from hive_database.phone")
- Spark支持以Hologres为数据源。
- Spark将读到的数据写入Hologres。
import com.alibaba.hologres.spark2.sink.SourceProvider -- Write to hologres table df.write .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.ENDPOINT, "Ip:Port") .option(SourceProvider.DATABASE, "test_database") .option(SourceProvider.TABLE, table) .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 写入攒批大小 .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 仅spark3.x需要 .mode(SaveMode.Append) // 仅spark3.x需要 .save()
通过Spark实时写入数据至Hologres
- 在Hologres创建一张表,用于接收数据,创建代码如下。
CREATE TABLE test_table_stream ( value text, count bigint );
- 读取本地端口输入行,进行词频统计并实时写入Hologres中,相关示例代码如下。
val spark = SparkSession .builder .appName("StreamToHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() -- Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) -- Generate running word count val wordCounts = words.groupBy("value").count() wordCounts.writeStream .outputMode(OutputMode.Complete()) .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname") .option(SourceProvider.TABLE, "test_table_stream") .option("batchsize", 1) .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() .awaitTermination()
数据类型映射
Spark与Hologres的数据类型映射如下表所示。
Spark类型 | Hologres类型 |
---|---|
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |