ApsaraDB for SelectDB: Import data by using Spark

Last Updated: Aug 15, 2024

ApsaraDB for SelectDB is compatible with Apache Doris. You can use Spark Doris Connector to import a large amount of data by using the distributed computing capabilities of Spark. This topic describes how Spark Doris Connector works and how to use Spark Doris Connector to import data to ApsaraDB for SelectDB.

Overview

Spark Doris Connector is one way to import a large amount of data to ApsaraDB for SelectDB. Based on the distributed computing capabilities of Spark, you can read a large amount of data from upstream data sources, such as MySQL, PostgreSQL, Hadoop Distributed File System (HDFS), and Amazon Simple Storage Service (Amazon S3), into DataFrames, and then import the data to tables in ApsaraDB for SelectDB by using Spark Doris Connector. You can also read data from tables in ApsaraDB for SelectDB by using Spark Java Database Connectivity (JDBC).
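For example, because you can connect to ApsaraDB for SelectDB over the MySQL protocol, the following snippet is a minimal sketch of reading a table from an ApsaraDB for SelectDB instance into a DataFrame by using Spark JDBC. The endpoint, port, database, table, and account are placeholders that you must replace with your own values.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-selectdb").getOrCreate()

// Placeholders: use the MySQL protocol endpoint and account of your ApsaraDB for SelectDB instance.
// The MySQL JDBC driver must be available on the Spark classpath.
val selectdbDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030/test_db")
  .option("dbtable", "test_order")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "admin")
  .option("password", "****")
  .load()

selectdbDF.show()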

How it works

The following figure shows how Spark Doris Connector is used to import data to ApsaraDB for SelectDB. In this architecture, Spark Doris Connector serves as a bridge that writes external data to ApsaraDB for SelectDB and preprocesses the data on a distributed compute cluster. This accelerates the entire data pipeline and avoids the performance bottlenecks of traditional row-by-row imports over JDBC.

[Figure: architecture of importing data to ApsaraDB for SelectDB by using Spark Doris Connector]

Prerequisites

The version of Spark Doris Connector is 1.3.1 or later.

Install Spark Doris Connector dependency

You can use one of the following methods to install the dependency of Spark Doris Connector:

  • Install the Maven dependency of Spark Doris Connector. The following sample code shows an example. To obtain dependencies for other versions, visit Maven Repository. For an sbt equivalent, see the sketch after this list.

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • Download the JAR package of Spark Doris Connector.

    The following table lists the three common packages for Spark Doris Connector. Download a JAR package based on your Spark version. To obtain dependencies for other versions, visit Maven Repository.

    Note
    • The following JAR packages are compiled by using Java 8. If you want to use other Java versions, contact technical support for ApsaraDB for SelectDB.

    • The version string of a JAR package indicates, from left to right, the supported Spark version, the Scala version, and the Spark Doris Connector version.

    Version            JAR package
    2.4-2.12-1.3.2     spark-doris-connector-2.4_2.12-1.3.2
    3.1-2.12-1.3.2     spark-doris-connector-3.1_2.12-1.3.2
    3.2-2.12-1.3.2     spark-doris-connector-3.2_2.12-1.3.2

    After you download the JAR package, use one of the following methods to run Spark:

    • For a Spark cluster that is running in local cluster mode, place the JAR package of Spark Doris Connector in the jars directory of the Spark installation directory.

    • For a Spark cluster that is running in YARN cluster mode, upload the JAR package of Spark Doris Connector as a pre-deployment package. Perform the following steps:

      1. Upload the spark-doris-connector-3.2_2.12-1.3.2.jar package to the HDFS of the Spark cluster.

        hdfs dfs -mkdir /spark-jars/
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar /spark-jars/
      2. Add the dependency on the spark-doris-connector-3.2_2.12-1.3.2.jar package to the cluster.

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
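If your project uses sbt instead of Maven, the Maven dependency shown earlier in this list can be declared with equivalent coordinates. This is a sketch; pick the artifact that matches your Spark and Scala versions.

// build.sbt: sbt equivalent of the Maven dependency for Spark Doris Connector.
libraryDependencies += "org.apache.doris" % "spark-doris-connector-3.2_2.12" % "1.3.2"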

Procedure

After you make Spark Doris Connector available, either by placing the JAR package on the Spark client or by importing the dependency into your Spark development environment, you can synchronize data by using Spark SQL or the DataFrame API. The following examples show how to import upstream data to an ApsaraDB for SelectDB instance.

Use Spark SQL

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

Parameters

Parameter

Default value

Required

Description

fenodes

None

Yes

The endpoint that is used to connect to the ApsaraDB for SelectDB instance over HTTP.

To obtain the Virtual Private Cloud (VPC) endpoint or public endpoint and HTTP port of an ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance whose information you want to view. In the Network Information section of the Basic Information tab, view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter.

Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

None

Yes

The name of the table in the ApsaraDB for SelectDB instance. Format: Database name.Table name. Example: test_db.test_table.

request.retries

3

No

The maximum number of retries that are allowed to send requests to the ApsaraDB for SelectDB instance.

request.connect.timeout.ms

30000

No

The connection timeout period for requests to the ApsaraDB for SelectDB instance.

request.read.timeout.ms

30000

No

The read timeout period for requests to the ApsaraDB for SelectDB instance.

request.query.timeout.s

3600

No

The query timeout period for the ApsaraDB for SelectDB instance. The default value indicates an hour. A value of -1 indicates an unlimited timeout period.

request.tablet.size

Integer.MAX_VALUE

No

The number of ApsaraDB for SelectDB tablets corresponding to a Resilient Distributed Dataset (RDD) partition.

A smaller value generates more partitions, which increases the parallelism on the Spark side but also puts more pressure on ApsaraDB for SelectDB.

read.field

None

No

The columns in the ApsaraDB for SelectDB table whose data you want to read. Separate multiple columns with commas (,).

batch.size

1024

No

The maximum number of rows that can be read from the backend at a time. If more rows are read from the backend at a time, the number of connections established between Spark and ApsaraDB for SelectDB is reduced. This reduces the additional time overheads caused by network latency.

exec.mem.limit

2147483648

No

The memory threshold for a single query. The default value indicates 2 GB. Unit: bytes.

deserialize.arrow.async

false

No

Specifies whether to asynchronously deserialize Arrow data into the RowBatch format over which Spark Doris Connector iterates.

deserialize.queue.size

64

No

The size of an internal processing queue to asynchronously deserialize Arrow data. This parameter takes effect only when the deserialize.arrow.async parameter is set to true.

write.fields

None

No

The fields that you want to write to the ApsaraDB for SelectDB table or the order in which the fields are written. Separate multiple fields with commas (,). By default, all fields are written to the ApsaraDB for SelectDB table based on the field order in the ApsaraDB for SelectDB table.

sink.batch.size

100000

No

The maximum number of rows that can be written to the backend at a time.

sink.max-retries

0

No

The maximum number of retries after data fails to be written to the backend.

sink.properties.format

csv

No

The data format supported by Stream Load. Valid values: csv, json, and arrow.

sink.properties.*

--

No

The parameters used to submit a Stream Load job. For example, you can specify 'sink.properties.column_separator' = ',' to specify the column delimiter. For more information about the parameters, see Import data by using Stream Load.

sink.task.partition.size

None

No

The number of Spark partitions that are used to write data to the ApsaraDB for SelectDB instance. After operations such as filtering are performed on an RDD in Spark, the RDD may contain a large number of partitions that each hold only a few records. This increases the frequency of write operations and wastes computing resources.

A smaller value reduces the frequency at which data is written to the ApsaraDB for SelectDB instance and therefore reduces the data merging pressure on the instance. You must use this parameter together with the sink.task.use.repartition parameter.

sink.task.use.repartition

false

No

Specifies whether to repartition data to the number of partitions that is specified by the sink.task.partition.size parameter before the data is written to the ApsaraDB for SelectDB instance. If this parameter is set to false, coalesce is used. In this case, if no Spark action is performed before the write operation, the parallelism of the entire computation may be reduced.

If this parameter is set to true, repartition is used. This guarantees the specified number of partitions but introduces additional shuffle overhead.

sink.batch.interval.ms

50

No

The interval at which data is written to the sink. Unit: milliseconds.

sink.enable-2pc

false

No

Specifies whether a two-phase commit (2PC) protocol is used to write data. If you set this parameter to true, transactions are committed after the Spark job is complete. If some tasks fail, all pre-committed transactions are rolled back.

sink.auto-redirect

true

No

Specifies whether to redirect Stream Load requests. If you set this parameter to true, Stream Load requests are sent to the frontend, which redirects the writes to the backends. This way, you do not need to explicitly obtain the backend information.

user

None

Yes

The username that is used to connect to the ApsaraDB for SelectDB instance.

password

None

Yes

The password that is used to connect to the ApsaraDB for SelectDB instance.

filter.query.in.max.count

100

No

The maximum number of values that can be included in an IN clause when Spark performs predicate pushdown. If the number of values in an IN clause exceeds the specified threshold, the filter operation is handled by Spark.

ignore-type

None

No

The field types that you want to ignore when you read the schema for a temporary view.

Example: 'ignore-type'='bitmap,hll'

Use DataFrame

import org.apache.spark.sql.SparkSession

// Replace the following placeholder values with the information about your ApsaraDB for SelectDB instance.
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"

val spark = SparkSession.builder().master("local[1]").getOrCreate()
// Build a small sample DataFrame whose columns match the destination table.
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending Payment"),
  ("2", 200, null),
  ("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  // Optional: pass additional Stream Load properties by using the sink.properties.* prefix.
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()
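If the entire import must either fully succeed or fully roll back, you can add the sink.enable-2pc parameter that is described in the following table to the same write operation. The following snippet is a sketch of this variant; all other values are the same placeholders as above.

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  // Two-phase commit: transactions are committed only after the Spark job succeeds.
  // If any task fails, the pre-committed transactions are rolled back.
  .option("sink.enable-2pc", "true")
  .save()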

Parameters

Parameter

Default value

Required

Description

fenodes

None

Yes

The endpoint that is used to connect to the ApsaraDB for SelectDB instance over HTTP.

To obtain the VPC endpoint or public endpoint and HTTP port of an ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance whose information you want to view. In the Network Information section of the Basic Information tab, view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter.

Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

None

Yes

The name of the table in the ApsaraDB for SelectDB instance. Format: Database name.Table name. Example: test_db.test_table.

request.retries

3

No

The maximum number of retries that are allowed to send requests to the ApsaraDB for SelectDB instance.

request.connect.timeout.ms

30000

No

The connection timeout period for requests to the ApsaraDB for SelectDB instance.

request.read.timeout.ms

30000

No

The read timeout period for requests to the ApsaraDB for SelectDB instance.

request.query.timeout.s

3600

No

The query timeout period for the ApsaraDB for SelectDB instance. The default value indicates an hour. A value of -1 indicates an unlimited timeout period.

request.tablet.size

Integer.MAX_VALUE

No

The number of ApsaraDB for SelectDB tablets corresponding to an RDD partition.

A smaller value generates more partitions, which increases the parallelism on the Spark side but also puts more pressure on ApsaraDB for SelectDB.

read.field

None

No

The columns in the ApsaraDB for SelectDB table whose data you want to read. Separate multiple columns with commas (,).

batch.size

1024

No

The maximum number of rows that can be read from the backend at a time. If more rows are read from the backend at a time, the number of connections established between Spark and ApsaraDB for SelectDB is reduced. This reduces the additional time overheads caused by network latency.

exec.mem.limit

2147483648

No

The memory threshold for a single query. The default value indicates 2 GB. Unit: bytes.

deserialize.arrow.async

false

No

Specifies whether to asynchronously deserialize Arrow data into the RowBatch format over which Spark Doris Connector iterates.

deserialize.queue.size

64

No

The size of an internal processing queue to asynchronously deserialize Arrow data. This parameter takes effect only when the deserialize.arrow.async parameter is set to true.

write.fields

None

No

The fields that you want to write to the ApsaraDB for SelectDB table or the order in which the fields are written. Separate multiple fields with commas (,). By default, all fields are written to the ApsaraDB for SelectDB table based on the field order in the ApsaraDB for SelectDB table.

sink.batch.size

100000

No

The maximum number of rows that can be written to the backend at a time.

sink.max-retries

0

No

The maximum number of retries after data fails to be written to the backend.

sink.properties.format

csv

No

The data format supported by Stream Load. Valid values: csv, json, and arrow.

sink.properties.*

--

No

The parameters used to submit a Stream Load job. For example, you can specify 'sink.properties.column_separator' = ',' to specify the column delimiter. For more information about the parameters, see Import data by using Stream Load.

sink.task.partition.size

None

No

The number of Spark partitions that are used to write data to the ApsaraDB for SelectDB instance. After operations such as filtering are performed on an RDD in Spark, the RDD may contain a large number of partitions that each hold only a few records. This increases the frequency of write operations and wastes computing resources.

A smaller value reduces the frequency at which data is written to the ApsaraDB for SelectDB instance and therefore reduces the data merging pressure on the instance. You must use this parameter together with the sink.task.use.repartition parameter.

sink.task.use.repartition

false

No

Specifies whether to repartition data to the number of partitions that is specified by the sink.task.partition.size parameter before the data is written to the ApsaraDB for SelectDB instance. If this parameter is set to false, coalesce is used. In this case, if no Spark action is performed before the write operation, the parallelism of the entire computation may be reduced.

If this parameter is set to true, repartition is used. This guarantees the specified number of partitions but introduces additional shuffle overhead.

sink.batch.interval.ms

50

No

The interval at which data is written to the sink. Unit: milliseconds.

sink.enable-2pc

false

No

Specifies whether a 2PC protocol is used to write data. If you set this parameter to true, transactions are committed after the Spark job is complete. If some tasks fail, all pre-committed transactions are rolled back.

sink.auto-redirect

true

No

Specifies whether to redirect Stream Load requests. If you set this parameter to true, Stream Load requests are sent to the frontend, which redirects the writes to the backends. This way, you do not need to explicitly obtain the backend information.

user

None

Yes

The username that is used to connect to the ApsaraDB for SelectDB instance.

password

None

Yes

The password that is used to connect to the ApsaraDB for SelectDB instance.

filter.query.in.max.count

100

No

The maximum number of values that can be included in an IN clause when Spark performs predicate pushdown. If the number of values in an IN clause exceeds the specified threshold, the filter operation is handled by Spark.

ignore-type

None

No

The field types that you want to ignore when you read the schema for a temporary view.

Example: 'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

No

Specifies whether to write the value of the first column directly without processing. This parameter is typically used in streaming scenarios in which each row already contains a complete record in the target format, such as a JSON string.
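As a hedged illustration of the passthrough mode, the following snippet sketches a structured streaming job in which each row already contains a JSON record and is written to ApsaraDB for SelectDB as is. It assumes that your connector version supports structured streaming writes and that the spark-sql-kafka package is available; the Kafka broker, topic, endpoint, and account are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Assumption for illustration: each Kafka message value is a JSON record that matches
// the schema of the destination table.
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "test_topic")
  .load()
  .selectExpr("CAST(value AS STRING)")

kafkaDF.writeStream
  .format("doris")
  .option("checkpointLocation", "/tmp/selectdb-checkpoint")
  .option("fenodes", "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080")
  .option("table.identifier", "test_db.test_order")
  .option("user", "admin")
  .option("password", "****")
  .option("sink.properties.format", "json")
  // Write the value of the first (and only) column directly, without re-serializing it.
  .option("sink.streaming.passthrough", "true")
  .start()
  .awaitTermination()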

Example

The following table lists the versions of all software in the sample environment.

Software    Version
Java        1.8
Spark       3.1.2
Scala       2.12
SelectDB    3.0.4

Prepare the environment

  • Configure the Spark environment.

    1. Download and decompress the Spark installation package. In this example, the Spark installation package spark-3.1.2-bin-hadoop3.2.tgz is used.

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. Place the spark-doris-connector-3.2_2.12-1.3.2.jar package in the SPARK_HOME/jars directory.

  • Construct the data to be imported. In this example, a small amount of data is imported from a MySQL database.

    1. Create a test table in the MySQL database.

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
    2. Use Data Management (DMS) to generate test data. For more information, see Generate test data. Alternatively, you can insert a few sample rows from Spark, as sketched after this list.

  • Configure an ApsaraDB for SelectDB instance.

    1. Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.

    2. Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an ApsaraDB for SelectDB instance.

    3. Create a test database and a test table.

      1. Create a test database.

        CREATE DATABASE test_db;
      2. Create a test table.

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for or release a public endpoint.

    5. Add the public IP address of the Spark environment to the IP address whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.
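The following snippet is an optional sketch of inserting a few sample rows into the MySQL employees table from spark-shell, as an alternative to generating test data in DMS. The MySQL endpoint, account, and sample values are placeholders.

import java.sql.Date

// Build a few sample rows that match the schema of the employees table.
val sampleDF = spark.createDataFrame(Seq(
  (10001, Date.valueOf("1990-01-01"), "Georgi", "Facello", "M", Date.valueOf("2015-06-26")),
  (10002, Date.valueOf("1992-06-02"), "Bezalel", "Simmel", "F", Date.valueOf("2016-11-21"))
)).toDF("emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date")

// Append the sample rows to the MySQL table over JDBC.
sampleDF.write.format("jdbc")
  .option("url", "jdbc:mysql://host:port/test_db")
  .option("dbtable", "employees")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "admin")
  .option("password", "****")
  .mode("append")
  .save()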

Import data from a MySQL database to an ApsaraDB for SelectDB instance

Use Spark SQL

The following example shows how to use Spark SQL to import data from an upstream MySQL database to an ApsaraDB for SelectDB instance.

  1. Start the spark-sql service.

    bin/spark-sql
  2. Submit a task in spark-sql.

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. After the Spark task is complete, log on to the ApsaraDB for SelectDB console to view the data that is imported by using Spark.

Use DataFrame

The following example shows how to use DataFrame to import data from an upstream MySQL database to an ApsaraDB for SelectDB instance.

  1. Start the spark-shell service.

    bin/spark-shell
  2. Submit a task in spark-shell.

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. After the Spark task is complete, log on to the ApsaraDB for SelectDB console to view the data that is imported by using Spark.