This topic describes how to use the Spark connector provided by Hologres to read data from Apache Spark and write the data to Hologres.
Background information
Apache Spark is a unified analytics engine that processes a large amount of data. Hologres is integrated with both the Apache Spark community and Apache Spark on Amazon EMR to help you build data warehouses. Hologres provides the Spark connector that allows you to write data from Apache Spark to Hologres in batch mode. You can use the connector to read data from multiple types of sources, such as files, Hive tables, MySQL tables, and PostgreSQL tables.
Hologres is compatible with PostgreSQL. You can use the Spark connector to read Hologres data based on the PostgreSQL Java Database Connectivity (JDBC) driver. Then, you can extract, transform, and load (ETL) the data and write it to Hologres or other destinations.
Prerequisites
- The version of your Hologres instance is V0.9 or later. You can view the version of your Hologres instance on the instance details page in the Hologres console. If the version of your Hologres instance is earlier than V0.9, contact technical support in the DingTalk group (ID 32314975) to apply for an instance update.
- Apache Spark of a specific version that supports the
spark-shell
command is installed.
(Recommended) Use the Spark connector to write data to Hologres
We recommend that you use the built-in Spark connector of Hologres to write data to Hologres. The Spark connector works together with Holo Client. Compared with other methods of writing data, the Spark connector provides better write performance. To use the Spark connector to write data, perform the following steps. For more information about the sample code, see the Example of using the Spark connector to write data to Hologres section in this topic.
- Obtain a JAR package. The Spark connector is supported by both Apache Spark 2 and Apache Spark 3. When you use the Spark connector to write data, you must reference a JAR package of the Spark connector. The JAR package is already published in the Maven Central Repository. You can refer to the following pom.xml file for configurations.Note The Spark connector is open sourced on the alibabacloud-hologres-connectors page on GitHub.
You can click the following links to download the JAR packages provided by Hologres:<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-2.x</artifactId> <version>1.0.1</version> <classifier>jar-with-dependencies</classifier> </dependency>
- Use the JAR package. Run the following command to start the Spark connector:
spark-shell --jars hologres-connector-spark-2.x-1.0-SNAPSHOT-jar-with-dependencies.jar
- Configure the Apache Spark service. Run the following commands in Apache Spark to connect to Hologres and write data to the destination table:
-
df.write .format("hologres") .option(SourceProvider.USERNAME, "AccessKey ID") // The AccessKey ID of your Alibaba Cloud account. .option(SourceProvider.PASSWORD, "Accesskey SECRET") // The AccessKey secret of your Alibaba Cloud account. .option(SourceProvider.ENDPOINT, "Ip:Port") // The IP address and the port number of the Hologres instance. .option(SourceProvider.DATABASE, "Database") .option(SourceProvider.TABLE, "Table") .save()
Note The following settings are alternative:- Setting 1:
.option(SourceProvider.endpoint, "Ip:Port") .option(SourceProvider.database, "Database")
- Setting 2:
.option(SourceProvider.jdbcUrl, "jdbc:postgresql://Ip:Port/Database")
- Setting 1:
- Parameters
Parameter Default value Required Description USERNAME N/A Yes The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page. PASSWORD N/A Yes The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page. TABLE N/A Yes The name of the Hologres table to which you want to write data. ENDPOINT N/A Set the JDBCURL parameter or this parameter. The endpoint of the Hologres instance. You can obtain the endpoint of the Hologres instance from the Network Information section of the instance details page in the Hologres console.
DATABASE N/A Set the JDBCURL parameter or this parameter. The name of the Hologres database in which the destination table resides. JDBCURL N/A Set the ENDPOINT and DATABASE parameters or only this parameter. The JDBC URL of the Hologres instance. INPUT_DATA_SCHEMA_DDL N/A This parameter is required for Spark 3.x. Set the value in the format of <your_DataFrame>.schema.toDDL
.The DDL statement for the DataFrame in Spark. WRITE_MODE INSERT_OR_REPLACE No The policy that is used to handle primary key conflicts. This parameter is available if the destination table contains a primary key. Valid values: - INSERT_OR_IGNORE: discards the data that you want to write if a primary key conflict occurs.
- INSERT_OR_UPDATE: updates the corresponding columns of the destination table if a primary key conflict occurs.
- INSERT_OR_REPLACE: updates all the columns of the destination table if a primary key conflict occurs.
WRITE_BATCH_SIZE 512 No The maximum number of requests allowed in a batch in a thread to write data. If the total number of the PUT requests reaches the WRITE_BATCH_SIZE value after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch. WRITE_BATCH_BYTE_SIZE 2 MB No The maximum number of bytes allowed in a batch in a thread to write data. If the total number of bytes of the PUT requests reaches the WRITE_BATCH_BYTE_SIZE value after conflicts are handled based on the WRITE_MODE parameter, the data is submitted in a batch. WRITE_MAX_INTERVAL_MS 10000 ms No The intervals at which data is submitted in a batch. WRITE_FAIL_STRATEGY TYR_ONE_BY_ONE No The policy that is used to handle submission failures. If a batch cannot be submitted, Holo Client submits the data entries in the batch in the specified sequence. If a single data entry cannot be submitted, Holo Client returns an error message that contains the information about the data entry. WRITE_THREAD_SIZE 1 No The number of concurrent threads to write data. Each thread occupies one connection. The total number of connections occupied by a Spark job depends on the Spark parallelism. The total number of connections can be calculated by using the following formula:
Total number of connections = spark.default.parallelism × Value of the WRITE_THREAD_SIZE parameter
.DYNAMIC_PARTITION false No Specifies whether to automatically create a partition if the data is written to a parent table that has no partitions. Valid values: true and false. true: automatically creates a partition if the data is written to a parent table that has no partitions. RETRY_COUNT 3 No The maximum number of retries allowed to write and query data if a connection failure occurs. RETRY_SLEEP_INIT_MS 1000 ms No The intervals at which retries are performed. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the RETRY_SLEEP_INIT_MS parameter + Value of the RETRY_COUNT parameter × Value of the RETRY_SLEEP_STEP_MS parameter. RETRY_SLEEP_STEP_MS 10*1000 ms No The amount of time required for a retry. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the RETRY_SLEEP_INIT_MS parameter + Value of the RETRY_COUNT parameter × Value of the RETRY_SLEEP_STEP_MS parameter. CONNECTION_MAX_IDLE_MS 60000 ms No The idle timeout period that applies to the connections used to read and write data. If a connection does not send or receive data by the time the idle timeout period ends, Holo Client automatically releases the connection.
-
Example of using the Spark connector to write data to Hologres
The following example shows how to use the Spark connector to write data to Hologres.
- Create a table in Hologres. Execute the following SQL statement in Hologres to create a Hologres table to which you want to write data:
CREATE TABLE tb008 ( id BIGINT primary key, counts INT, name TEXT, price NUMERIC(38, 18), out_of_stock BOOL, weight DOUBLE PRECISION, thick FLOAT, time TIMESTAMPTZ, dt DATE, by bytea, inta int4[], longa int8[], floata float4[], doublea float8[], boola boolean[], stringa text[] );
- Use the Spark connector to prepare data and write the data to Hologres.
- Query the written data. You can query the destination Hologres table in the Hologres console to check the written data. The following figure shows an example.
Use the Spark connector to read data from and write the data to Hologres
- Read data from the data source. You can use the Spark connector to read data from different types of sources. The following examples show how to read data from Hologres or another type of source:
- Read data from Hologres Hologres is compatible with PostgreSQL. You can use the Spark connector to read Hologres data based on the PostgreSQL JDBC driver. The following example provides the sample code.Note Before you read Hologres data, you must download the Postgresql JDBC Jar package and run the
./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar
command in the Spark shell to load the PostgreSQL JDBC JAR package. You can also load the PostgreSQL JDBC JAR package together with the JAR package of the Spark connector.// Read from some table, for example: tb008 val readDf = spark.read .format("jdbc") // Read Hologres data based on the PostgreSQL JDBC driver. .option("driver","org.postgresql.Driver") .option("url", "jdbc:postgresql://Ip:Por/test_database") .option("dbtable", "tb008") .option("user", "your_username") .option("password", "your_password") .load()
- Read data from another type of source such as a Parquet file You can use the Spark connector to read data from other types of sources, such as a Parquet file or a Hive table. The following example provides the sample code:
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) // Read from some table, for example: phone val readDf = hiveContext.sql("select * from hive_database.phone")
- Read data from Hologres
- Write the data to Hologres.
import com.alibaba.hologres.spark2.sink.SourceProvider -- Write to hologres table df.write .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.ENDPOINT, "Ip:Port") .option(SourceProvider.DATABASE, "test_database") .option(SourceProvider.TABLE, table) .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- The maximum number of requests allowed in a batch. .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- The DDL statement of the DataFrame. This parameter is required only for Spark 3.x. .mode(SaveMode.Append) // The save mode. This parameter is required only for Spark 3.x. .save()
Use the Spark connector to write data to Hologres in real time
- Execute the following statement to create a Hologres table to which you want to write data:
CREATE TABLE test_table_stream ( value text, count bigint );
- Read data from your on-premises machine. Collect word frequency statistics and write them to Hologres. The following sample code provides an example:
val spark = SparkSession .builder .appName("StreamToHologres") .master("local[*]") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() -- Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) -- Generate running word count val wordCounts = words.groupBy("value").count() wordCounts.writeStream .outputMode(OutputMode.Complete()) .format("hologres") .option(SourceProvider.USERNAME, "your_username") .option(SourceProvider.PASSWORD, "your_password") .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname") .option(SourceProvider.TABLE, "test_table_stream") .option("batchsize", 1) .option("isolationLevel", "NONE") .option("checkpointLocation", checkpointLocation) .start() .awaitTermination()
Data type mappings
The following table describes data type mappings between Spark and Hologres.
Spark data type | Hologres data type |
---|---|
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
ArrayType(IntegerType) | int4[] |
ArrayType(LongType) | int8[] |
ArrayType(FloatType | float4[] |
ArrayType(DoubleType) | float8[] |
ArrayType(BooleanType) | boolean[] |
ArrayType(StringType) | text[] |