EMR Serverless Spark uses the official Spark Connector from Apache Doris to connect to Doris. This topic describes how to configure EMR Serverless Spark to read data from and write data to Doris.
Background information
Apache Doris is a high-performance, real-time AnalyticDB. It is well-suited for scenarios such as report analysis, ad hoc queries, and accelerated federated queries over data lakes. For more information, see Introduction to Apache Doris.
EMR Serverless Spark is a high-performance lakehouse product that is compatible with open source Spark and provides a fully managed, enterprise-grade data platform service. You can combine Apache Doris with EMR Serverless Spark to efficiently read, write, and analyze data, which creates an end-to-end data processing flow.
Prerequisites
A Serverless Spark workspace is created. For more information, see Create a workspace.
A Doris cluster is created.
If you create a data analytics (Online Analytical Processing (OLAP)) cluster that includes the Doris service on EMR on ECS, see Create a cluster. This topic uses a Doris cluster created on EMR on ECS as an example, which is referred to as an EMR Doris cluster.
Limitations
The Serverless Spark engine must be esr-2.6.0, esr-3.2.0, esr-4.2.0, or a later version.
Procedure
Step 1: Obtain the Doris Spark Connector JAR file and upload it to OSS
Refer to the official Spark Doris Connector documentation to check the compatibility between connector versions and Spark engine versions. You must ensure that your Spark version is compatible with the Doris Spark Connector version that you use.
Go to the Doris Spark Connector GitHub repository and download a compatible version.
The Doris Spark Connector JAR file is named in the format
spark-doris-connector-spark-${spark_version}-${connector_version}.jar. For example, if you use engine version esr-3.1.0 (Spark 3.4.3, Scala 2.12), downloadspark-doris-connector-spark-3.4-24.0.0.jar.Upload the downloaded Spark Connector JAR file to Alibaba Cloud OSS. For more information, see Simple upload.
Step 2: Create a network connection
Serverless Spark requires network connectivity to the EMR Doris cluster to access the Doris service. For more information about network connectivity, see Network connectivity between EMR Serverless Spark and other VPCs.
When you add security group rules, open only the necessary ports in the Port Range field based on your actual requirements. The valid port range is 1 to 65535. The example in this topic requires you to open the HTTP port (8031), Remote Procedure Call (RPC) port (9061), and Webserver port (8041).
Step 3: Create a database and a table in the EMR Doris cluster
Log on to the cluster using the Secure Shell (SSH) protocol. For more information, see Log on to a cluster.
Run the following command to connect to the EMR Doris cluster.
mysql -h127.0.0.1 -P 9031 -urootCreate a database and a table.
CREATE DATABASE IF NOT EXISTS testdb; USE testdb; CREATE TABLE test ( id INT, name STRING ) PROPERTIES("replication_num" = "1");Insert test data.
INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');Query the data.
SELECT * FROM test;The following figure shows the returned information.

Step 4: Read the Doris table from Serverless Spark
Read a Doris table using an SQL session
Create an SQL session. For more information, see Manage SQL sessions.
When you create the session, from the Engine Version drop-down list, select an engine version that is compatible with your Doris Spark Connector. For Network Connection, select the network connection that you created in Step 2. In the Spark Configuration section, add the following parameter to load the Doris Spark Connector.
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jarIn the code,
oss://<bucketname>/path/connector.jaris the path of the Doris Spark Connector that you uploaded to OSS in Step 1. For example:oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar.On the Data Development page, create a SparkSQL job, and then select the created SQL session in the upper-right corner.
For more information, see SparkSQL development.
Copy the following code to the new SparkSQL tab, modify the parameters as needed, and then click Run.
CREATE TEMPORARY VIEW test USING doris OPTIONS( "table.identifier" = "testdb.test", "fenodes" = "<doris_address>:<http_port>", "user" = "<user>", "password" = "<password>" ); SELECT * FROM test;The following table describes the parameters.
Parameter
Description
Example
testdb.testThe actual name of the database and table in the Doris service.
If you use a different Doris cluster, specify the configurations as needed.
If you use a cluster created on EMR on ECS, set the parameters to the following values:
testdb.test: This topic usestestdb.testas an example.<doris_address>: On the Node Management page of the Doris cluster in the EMR on ECS console, click the icon before emr-master to view the internal IP address.<http_port>: The default value is 8031.<user>: The default username isroot.<password>: The default password is empty.
<doris_address>The internal IP address of the node where the Doris service is located.
<http_port>The port that the Doris service uses to listen for HTTP requests.
<user>The username used to connect to the Doris service.
<password>The password for the username used to connect to the Doris service.
If data is returned as expected, the configuration is correct.

Read a Doris table using a Notebook session
Create a Notebook session. For more information, see Manage Notebook sessions.
When you create a session, select an engine version that is compatible with your Doris Spark Connector from the Engine Version drop-down list. For Network Connection, select the network connection that you created in Step 2. In the Spark Configuration section, add the following parameter to load the Doris Spark Connector.
spark.emr.serverless.user.defined.jars oss://<bucketname>/path/connector.jarIn the code,
oss://<bucketname>/path/connector.jaris the OSS path to the Doris Spark Connector that you uploaded in Step 1. For example:oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar.On the Data Development page, create an job, and then select the created Notebook session in the upper-right corner.
For more information, see Manage Notebook sessions.
Copy the following code to the new Notebook tab, modify the parameters as needed, and then click Run.
dorisSparkDF = spark.read.format("doris") \ .option("doris.table.identifier", "testdb.test") \ .option("doris.fenodes", "<doris_address>:<http_port>") \ .option("user", "<user>") \ .option("password", "<password>") \ .load() dorisSparkDF.show(3)The following table describes the parameters.
Parameter
Description
Example
testdb.testThe actual name of the database and table in the Doris service.
If you use a different Doris cluster, specify the configurations as needed.
If you use a cluster created on EMR on ECS, set the parameters to the following values:
testdb.test: This topic usestestdb.testas an example.<doris_address>: On the Node Management page of the Doris cluster in the EMR on ECS console, click the icon before emr-master to view the internal IP address.<http_port>: The default value is 8031.<user>: The default username isroot.<password>: The default password is empty.
<doris_address>The internal IP address of the node where the Doris service is located.
<http_port>The port that the Doris service uses to listen for HTTP requests.
<user>The username used to connect to the Doris service.
<password>The password for the username used to connect to the Doris service.
If data is returned as expected, the configuration is correct.

Step 5: Write data to the Doris table from Serverless Spark
Write to a Doris table using an SQL session
Copy the following code to the SparkSQL tab that you created in the previous step, and then click Run.
CREATE TEMPORARY VIEW test_write
USING doris
OPTIONS(
"table.identifier" = "testdb.test",
"fenodes" = "<doris_address>:<http_port>",
"user" = "<user>",
"password" = "<password>"
);
INSERT INTO test_write VALUES (4, 'd'), (5, 'e');
SELECT * FROM test_write;If the following data is returned, the data is written successfully.

Write to a Doris table using a Notebook session
Copy the following code to the Notebook tab that you created in the previous step, and then click Run.
data = [(7, 'f'), (8, 'g')]
mockDataDF = spark.createDataFrame(data, ["id", "name"])
mockDataDF.write.mode("append").format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.save()
dorisSparkDF = spark.read.format("doris") \
.option("doris.table.identifier", "testdb.test") \
.option("doris.fenodes", "<doris_address>:<http_port>") \
.option("user", "<user>") \
.option("password", "<password>") \
.load()
dorisSparkDF.show(10)If the following data is returned, the data is written successfully.
