E-MapReduce: Read data from and write data to Doris

Last Updated: Mar 26, 2026

EMR Serverless Spark uses the official Spark Doris Connector from Apache Doris to read from and write to Doris. This topic walks you through the full setup: downloading the connector JAR, establishing network connectivity, and running read and write operations from both an SQL session and a Notebook session.

Background information

Apache Doris is a high-performance, real-time analytical database. It is well suited to report analysis, ad hoc queries, and accelerated federated queries over data lakes. For more information, see Introduction to Apache Doris.

EMR Serverless Spark is a high-performance lakehouse product that is compatible with open-source Spark and provides a fully managed, enterprise-grade data platform service. Combining Apache Doris with EMR Serverless Spark lets you read, write, and analyze data in a single end-to-end processing flow.

Prerequisites

Before you begin, ensure that you have:

  • A Serverless Spark workspace. See Create a workspace.

  • A Doris cluster. This topic uses a Doris cluster created on EMR on ECS as an example. To create one, see Create a cluster.

Limitations

The Serverless Spark engine must be esr-2.6.0, esr-3.2.0, esr-4.2.0, or later.

Step 1: Download the Spark Doris Connector JAR and upload it to OSS

The Spark Doris Connector version must be compatible with your Spark engine version. Check the compatibility matrix in the official Spark Doris Connector documentation before downloading.

  1. Go to the doris-spark-connector releases page on GitHub and download a compatible version. JAR files follow the naming format spark-doris-connector-spark-${spark_version}-${connector_version}.jar. For example, for engine version esr-3.1.0 (Spark 3.4.3, Scala 2.12), download spark-doris-connector-spark-3.4-24.0.0.jar.

  2. Upload the JAR file to an Object Storage Service (OSS) bucket. See Simple upload.
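The naming format in step 1 can be expressed as a small helper. This is a minimal sketch: the function name `connector_jar_name` is hypothetical, and it assumes that the `${spark_version}` segment is the major.minor part of the Spark version, as in the esr-3.1.0 example above. Confirm the resulting name against the files actually listed on the releases page.

```python
def connector_jar_name(spark_version: str, connector_version: str) -> str:
    """Build the connector JAR file name from the documented naming format.

    Assumption: only the major.minor part of the Spark version is used,
    for example Spark 3.4.3 maps to "3.4".
    """
    parts = spark_version.split(".")
    if len(parts) < 2 or not all(p.isdigit() for p in parts[:2]):
        raise ValueError(f"unexpected Spark version: {spark_version!r}")
    major_minor = ".".join(parts[:2])
    return f"spark-doris-connector-spark-{major_minor}-{connector_version}.jar"


# Example from this topic: Spark 3.4.3 with connector version 24.0.0.
print(connector_jar_name("3.4.3", "24.0.0"))
```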

Step 2: Set up network connectivity

Serverless Spark must be able to reach your Doris cluster over the network. Follow the instructions in Network connectivity between EMR Serverless Spark and other VPCs to configure connectivity.

Important

When adding security group rules, open only the ports your setup requires. The valid port range is 1 to 65535. This topic requires the following ports to be open:

Port | Type
8031 | HTTP port
9061 | Remote Procedure Call (RPC) port
8041 | Webserver port
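Once the security group rules are in place, a quick TCP probe can confirm that the ports accept connections. The following is a minimal sketch using only the Python standard library. The helper names `port_is_open` and `check_doris_ports` are hypothetical, and the probe is only meaningful when run from a host that shares the network path to the Doris cluster (for example, a machine in the same VPC). A successful TCP connection does not prove that the Doris service behind the port is healthy.

```python
import socket

# Ports required by this topic (defaults for the Doris cluster used here).
DORIS_PORTS = {8031: "HTTP", 9061: "RPC", 8041: "Webserver"}


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_doris_ports(host: str) -> dict:
    """Map each required port to whether it accepted a TCP connection."""
    return {port: port_is_open(host, port) for port in DORIS_PORTS}
```

To use it, call `check_doris_ports("<doris_address>")` and look for any port mapped to `False`.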

Step 3: Create a database and table in your Doris cluster

  1. Log on to the cluster using the Secure Shell (SSH) protocol. See Log on to a cluster.

  2. Connect to the Doris cluster.

    mysql -h127.0.0.1 -P 9031 -uroot
  3. Create a database and a table.

    CREATE DATABASE IF NOT EXISTS testdb;
    
    USE testdb;
    
    CREATE TABLE test (
        id INT,
        name STRING
    ) PROPERTIES("replication_num" = "1");
  4. Insert test data.

    INSERT INTO test VALUES (1, 'a'), (2, 'b'), (3, 'c');
  5. Verify the data was inserted.

    SELECT * FROM test;


Step 4: Read data from Doris

Both read methods in this step, the SQL session and the Notebook session, require the same Spark configuration to load the connector. Add the following parameter when you create the session:

spark.emr.serverless.user.defined.jars  oss://<bucketname>/path/connector.jar

Replace oss://<bucketname>/path/connector.jar with the actual OSS path from Step 1, for example: oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar.
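If you generate session settings from a script, the configuration line can be assembled with a basic sanity check on the path. This is a minimal sketch: the function name `user_jars_conf` is hypothetical, and the checks only catch obvious mistakes such as a missing `oss://` scheme. They do not verify that the object exists in OSS.

```python
def user_jars_conf(oss_path: str) -> str:
    """Return the Spark configuration line that loads the connector JAR."""
    if not oss_path.startswith("oss://"):
        raise ValueError("expected an OSS path starting with oss://")
    if not oss_path.endswith(".jar"):
        raise ValueError("expected a path to a .jar file")
    # Key and value are separated by whitespace, as in the parameter above.
    return f"spark.emr.serverless.user.defined.jars  {oss_path}"


# Example path from this topic.
print(user_jars_conf("oss://emr-oss/spark/spark-doris-connector-spark-3.4-24.0.0.jar"))
```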

Read with an SQL session

  1. Create an SQL session. See Manage SQL sessions. When creating the session, select an engine version compatible with your Spark Doris Connector from the Engine Version drop-down list. For Network Connection, select the network connection from Step 2. In the Spark Configuration section, add the connector parameter above.

  2. On the Data Development page, create a SparkSQL job and select the session you created in the upper-right corner. See SparkSQL development.

  3. Run the following SQL to create a temporary view backed by your Doris table and query it.

    Parameter | Description | Default (EMR on ECS cluster)
    testdb.test | The database and table name in Doris. | testdb.test (used in this topic)
    <doris_address> | The internal IP address of the node where the Doris service is located. | On the Node Management page of the Doris cluster in the EMR on ECS console, click the icon before emr-master to view the internal IP address.
    <http_port> | The HTTP port of the Doris FE node. | 8031
    <user> | The username for connecting to Doris. | root
    <password> | The password for the username. | (blank)

    CREATE TEMPORARY VIEW test
    USING doris
    OPTIONS(
      "table.identifier" = "testdb.test",
      "fenodes" = "<doris_address>:<http_port>",
      "user" = "<user>",
      "password" = "<password>"
    );
    
    SELECT * FROM test;

    If data is returned as expected, the configuration is correct.
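When the same view definition is needed for several tables, the statement above can be rendered from its parameters. The following is a minimal sketch: `doris_view_sql` is a hypothetical helper, not part of the connector. It performs no escaping, so it assumes the values contain no double quotes, and it omits the trailing semicolon so that the string can be passed as a single statement.

```python
def doris_view_sql(view, table, fe_host, http_port, user, password=""):
    """Render a CREATE TEMPORARY VIEW statement backed by a Doris table.

    Assumption: no value contains a double quote (no escaping is done).
    """
    fenodes = f"{fe_host}:{http_port}"
    return (
        f"CREATE TEMPORARY VIEW {view}\n"
        "USING doris\n"
        "OPTIONS(\n"
        f'  "table.identifier" = "{table}",\n'
        f'  "fenodes" = "{fenodes}",\n'
        f'  "user" = "{user}",\n'
        f'  "password" = "{password}"\n'
        ")"
    )


# 10.0.0.1 is a placeholder address, not a real Doris node.
print(doris_view_sql("test", "testdb.test", "10.0.0.1", 8031, "root"))
```

In a Notebook session, the rendered string could be passed to `spark.sql(...)`. In a SparkSQL job, paste the output and terminate it with a semicolon.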


Read with a Notebook session

  1. Create a Notebook session. See Manage Notebook sessions. When creating the session, select a compatible engine version from Engine Version, select the network connection from Step 2 for Network Connection, and add the connector parameter in the Spark Configuration section.

  2. On the Data Development page, create an Interactive Development > Notebook job and select the Notebook session in the upper-right corner.

  3. Run the following Python code to read the Doris table into a DataFrame.

    dorisSparkDF = spark.read.format("doris") \
      .option("doris.table.identifier", "testdb.test") \
      .option("doris.fenodes", "<doris_address>:<http_port>") \
      .option("user", "<user>") \
      .option("password", "<password>") \
      .load()
    
    dorisSparkDF.show(3)

    For parameter descriptions, see the table in the SQL session section above. If data is returned as expected, the configuration is correct.
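Because the read and write examples in this topic repeat the same four options, it can help to build them once and reuse them. This is a minimal sketch: `doris_options` and `read_doris_table` are hypothetical helpers. The option keys are the ones used in the code above, and `spark` is assumed to be the SparkSession that the Notebook session provides.

```python
def doris_options(table, fe_host, http_port, user, password=""):
    """Return the connector options used throughout this topic as a dict."""
    return {
        "doris.table.identifier": table,
        "doris.fenodes": f"{fe_host}:{http_port}",
        "user": user,
        "password": password,
    }


def read_doris_table(spark, options):
    """Load a Doris table into a DataFrame using the given options."""
    # DataFrameReader.options accepts keyword arguments, so the dict is unpacked.
    return spark.read.format("doris").options(**options).load()
```

In the Notebook, this would be used as `read_doris_table(spark, doris_options("testdb.test", "<doris_address>", "<http_port>", "<user>", "<password>")).show(3)`.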


Step 5: Write data to Doris

The write operations use the same connection parameters as the read operations: testdb.test, <doris_address>, <http_port>, <user>, and <password>. See the parameter table in Step 4 for descriptions and default values.

Write with an SQL session

In the SparkSQL tab from Step 4, run the following code to create a writable temporary view and insert rows.

CREATE TEMPORARY VIEW test_write
USING doris
OPTIONS(
  "table.identifier" = "testdb.test",
  "fenodes" = "<doris_address>:<http_port>",
  "user" = "<user>",
  "password" = "<password>"
);

INSERT INTO test_write VALUES (4, 'd'), (5, 'e');
SELECT * FROM test_write;

If the query result includes the newly inserted rows (4, 'd') and (5, 'e') in addition to the three original rows, the write succeeded.

Write with a Notebook session

In the Notebook tab from Step 4, run the following code to append rows to the Doris table, then read the table back to verify.

data = [(7, 'f'), (8, 'g')]
mockDataDF = spark.createDataFrame(data, ["id", "name"])
mockDataDF.write.mode("append").format("doris") \
  .option("doris.table.identifier", "testdb.test") \
  .option("doris.fenodes", "<doris_address>:<http_port>") \
  .option("user", "<user>") \
  .option("password", "<password>") \
  .save()

dorisSparkDF = spark.read.format("doris") \
  .option("doris.table.identifier", "testdb.test") \
  .option("doris.fenodes", "<doris_address>:<http_port>") \
  .option("user", "<user>") \
  .option("password", "<password>") \
  .load()

dorisSparkDF.show(10)

If the output includes the appended rows (7, 'f') and (8, 'g'), the write succeeded.
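Instead of checking the output by eye, the verification can be done in code by comparing the rows that were written with the rows that were read back. The following is a minimal sketch: `missing_rows` is a hypothetical helper, and the `read_back` list is a stand-in for `[tuple(r) for r in dorisSparkDF.collect()]` in the Notebook, filled here with sample rows from this topic so that the snippet runs on its own.

```python
def missing_rows(expected, actual):
    """Return the expected rows that do not appear in actual, in order."""
    actual_set = {tuple(row) for row in actual}
    return [row for row in expected if tuple(row) not in actual_set]


written = [(7, "f"), (8, "g")]

# Stand-in for the rows collected from the DataFrame in the Notebook.
read_back = [(1, "a"), (2, "b"), (3, "c"), (7, "f"), (8, "g")]

# An empty list means every written row was found in the table.
print(missing_rows(written, read_back))
```

Collecting the whole table is reasonable for the small test table used here. For large tables, filtering on the written keys before collecting would be a better fit.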

What's next