This topic describes how to build a real-time data lake by using Data Lake Analytics (DLA) and Data Transmission Service (DTS) to synchronize data from ApsaraDB RDS.

Background information

The real-time data lake solution is a next-generation data warehousing solution that features low costs and low latency. It supports create, read, update, and delete (CRUD) operations on a large number of analytics datasets.

Prerequisites

Procedure

  1. Create a Spark cluster.
    1. Log on to the DLA console and create a Spark cluster. For more information, see Create a virtual cluster.
    2. Configure the VPC where the Spark cluster, ApsaraDB RDS instance, and DTS are deployed. This step ensures that you can use the serverless Spark engine of DLA to consume data in the VPC. For more information, see Configure the network of data sources.
  2. Create a full synchronization job.
    1. In the left-side navigation pane, choose Serverless Spark > Submit job.
    2. On the Parameter Configuration page, click Create Job to create a Spark job. This job is used to synchronize all data of the ApsaraDB RDS instance. For more information, see Create and run Spark jobs.
    3. After the full synchronization job is created, select the job from the Job List navigation tree and compile code in the code editor to configure the job. Sample code:
      {
        "args": [
          "oss://<OSS directory>/dla-lakehouse-batch-jdbc-config.properties"
        ],
        "file": "oss://<OSS directory>/datalake-batch-1.0.0-shaded.jar",
        "name": "dla-lakehouse-batch", // The name of the job, which can be customized.
        "className": "com.aliyun.dla.batch.jdbc.SparkHudiJdbc",
        "conf": {
          "spark.driver.resourceSpec": "large", // The resource specifications of the Spark driver. Valid values: small (1 CU), medium (2 CUs), large (4 CUs), xlarge (8 CUs), and 2xlarge (16 CUs). One CU equals 1 CPU core and 4 GB of memory.
          "spark.executor.resourceSpec": "medium", // The resource specifications of each Spark executor. Valid values: small (1 CU), medium (2 CUs), large (4 CUs), xlarge (8 CUs), and 2xlarge (16 CUs). One CU equals 1 CPU core and 4 GB of memory.
          "spark.executor.instances": 5, // The number of executors. The serverless Spark engine provides only one driver.
          "spark.dla.eni.enable": "true", // Specifies whether to use an elastic network interface (ENI) to connect to the VPC.
          "spark.dla.eni.vswitch.id": "vsw-xxx", // The ID of the vSwitch that is deployed in the VPC.
          "spark.dla.eni.security.group.id": "sg-xxx", // The ID of the security group that is associated with the VPC.
          "spark.dla.job.log.oss.uri": "oss://<OSS directory>/spark-logs/", // The Uniform Resource Identifier (URI) of the OSS directory where Spark logs are saved.
          "spark.sql.hive.metastore.version": "dla", // The Hive metastore version. Set the value to dla so that table metadata is automatically generated in DLA for subsequent data analysis.
          "spark.dla.connectors": "oss" // The service from which data is read and written. In this example, the service is OSS.
        }
      }
      Note To download the datalake-batch-1.0.0-shaded.jar file, you can click this URL.
      Sample data in the dla-lakehouse-batch-jdbc-config.properties template file (a filled-in sketch with hypothetical values is provided after the procedure):
      ### JDBC configurations for the source database
      ## The username that is used to log on to the source database.
      dla.datalake.batch.jdbc.username=<JDBC_USERNAME>
      ## The password that is used to log on to the source database.
      dla.datalake.batch.jdbc.password=<JDBC_PASSWORD>
      ## The JDBC URL of the source database.
      dla.datalake.batch.jdbc.url=<JDBC_URL>
      ## The names of source databases. Multiple database names are separated by commas (,).
      dla.datalake.batch.jdbc.db.name=DBNAME
      
      ## The OSS root directory to which data is synchronized. This directory is used to build a data lakehouse.
      dla.datalake.hoodie.target.base.path=oss://lakehouse/
      
      ## The JDBC synchronization mode. Set this parameter to database.
      dla.datalake.batch.jdbc.sync.mode=database
    4. After the Spark job is configured, click Execute to run the full synchronization job.
  3. Create an incremental synchronization job.
    After the full synchronization job succeeds, repeat Step 2 to create a Spark incremental synchronization job. Sample code for configuring the incremental synchronization job:
    {
      "args": [
        "oss://<OSS directory>/dla-lakehouse-streaming-dts-config.properties"
      ],
      "file": "oss://<OSS directory>/datalake-streaming-1.0.0-shaded.jar",
      "name": "dla-lakehouse-streaming", // The name of the job, which can be customized.
      "className": "com.aliyun.dla.streaming.dts.SparkHudiDts",
      "conf": {
        "spark.driver.resourceSpec": "large", // The resource specifications of the Spark driver. Valid values: small (1 CU), medium (2 CUs), large (4 CUs), xlarge (8 CUs), and 2xlarge (16 CUs). One CU equals 1 CPU core and 4 GB of memory.
        "spark.executor.resourceSpec": "medium", // The resource specifications of each Spark executor. Valid values: small (1 CU), medium (2 CUs), large (4 CUs), xlarge (8 CUs), and 2xlarge (16 CUs). One CU equals 1 CPU core and 4 GB of memory.
        "spark.executor.instances": 20, // The number of executors. The serverless Spark engine provides only one driver.
        "spark.dla.eni.enable": "true", // Specifies whether to use an ENI to connect to the VPC.
        "spark.dla.eni.vswitch.id": "vsw-xxx", // The ID of the vSwitch that is deployed in the VPC.
        "spark.dla.eni.security.group.id": "sg-xxx", // The ID of the security group that is associated with the VPC.
        "spark.dla.job.log.oss.uri": "oss://<OSS directory>/spark-logs/", // The URI of the OSS directory where Spark logs are saved.
        "spark.sql.hive.metastore.version": "dla", // The Hive metastore version. Set the value to dla so that table metadata is automatically generated in DLA for subsequent data analysis.
        "spark.dla.connectors": "oss" // The service from which data is read and written. In this example, the service is OSS.
      }
    }
    Note To download the datalake-streaming-1.0.0-shaded.jar file, you can click this URL.
    Sample data in the dla-lakehouse-streaming-dts-config.properties template file (a filled-in sketch with hypothetical values is provided after the procedure):
    ### dts config
    ## The username that is used to create a data subscription task for DTS.
    dla.datalake.streaming.dts.username=DTS_USERNAME
    ## The password that is used to create a data subscription task for DTS.
    dla.datalake.streaming.dts.password=DTS_PASSWORD
    ## The DTS consumer offset. You can set this parameter to latest, earliest, or a custom value. The default value is latest.
    dla.datalake.streaming.dts.offset=latest
    ## The start time of data consumption for the incremental synchronization job, specified as a UNIX timestamp in seconds. For example, 1608523200 indicates 2020-12-21 12:00:00 (UTC+8).
    dla.datalake.streaming.dts.offset.by.timestamp=1608523200
    ## The ID of the consumer group. You can view this parameter setting in the DTS console.
    dla.datalake.streaming.dts.group.id=dtsxxxxx
    ## The endpoint of the DTS server. You can view this parameter setting in the DTS console.
    dla.datalake.streaming.dts.bootstrap.server=dts-xxx-vpc.aliyuncs.com:18003
    ## The subscription topic of DTS. You can view this topic in the DTS console.
    dla.datalake.streaming.dts.subscribe.topic=cn_hangzhou_xxx
    
    ## The storage location of checkpoints, which is used to ensure that no data is missing.
    dla.datalake.streaming.dts.checkpoint.location=oss://<OSS directory>/checkpoint/
    
    ## The OSS root directory to which data is synchronized. This directory must be the same as the directory to which full data is synchronized.
    dla.datalake.hoodie.target.base.path=oss://lakehouse/
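
The following filled-in sketches show what the two configuration files might look like after the placeholders are replaced. All endpoints, credentials, bucket names, database names, and table names in these sketches are hypothetical values that you must replace with your own settings.

A sketch of dla-lakehouse-batch-jdbc-config.properties for an assumed ApsaraDB RDS for MySQL instance:

### JDBC configurations for the source database
dla.datalake.batch.jdbc.username=rds_user
dla.datalake.batch.jdbc.password=rds_password
dla.datalake.batch.jdbc.url=jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306
## Synchronize two source databases (hypothetical names).
dla.datalake.batch.jdbc.db.name=orders_db,users_db

## The OSS root directory to which data is synchronized.
dla.datalake.hoodie.target.base.path=oss://my-lakehouse-bucket/lakehouse/

## The JDBC synchronization mode.
dla.datalake.batch.jdbc.sync.mode=database

A sketch of dla-lakehouse-streaming-dts-config.properties for an assumed DTS data subscription task:

### dts config
dla.datalake.streaming.dts.username=dts_user
dla.datalake.streaming.dts.password=dts_password
dla.datalake.streaming.dts.offset=latest
dla.datalake.streaming.dts.group.id=dtsxxxxx
dla.datalake.streaming.dts.bootstrap.server=dts-xxx-vpc.aliyuncs.com:18003
dla.datalake.streaming.dts.subscribe.topic=cn_hangzhou_xxx

## The storage location of checkpoints.
dla.datalake.streaming.dts.checkpoint.location=oss://my-lakehouse-bucket/checkpoint/

## The OSS root directory to which data is synchronized. This directory must be the same as the directory used by the full synchronization job.
dla.datalake.hoodie.target.base.path=oss://my-lakehouse-bucket/lakehouse/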

Parameters in the dla-lakehouse-streaming-dts-config.properties file

The required parameters must be specified in the dla-lakehouse-streaming-dts-config.properties file. You can specify the optional parameters based on your business requirements.

DTS parameters
Parameter Required Description
dla.datalake.streaming.dts.username Yes The username that is used to create a data subscription task for DTS. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears.
dla.datalake.streaming.dts.password Yes The password that is used to create a data subscription task for DTS. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears.
dla.datalake.streaming.dts.offset No The consumer offset. The default value is latest.
dla.datalake.streaming.dts.group.id Yes The ID of the consumer group. In the DTS console, you can click Consume Data in the left-side navigation pane and view the parameter value on the page that appears.
dla.datalake.streaming.dts.bootstrap.server Yes The endpoint of the DTS server. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears.
dla.datalake.streaming.dts.max.offsets.per.trigger No The maximum number of data records that are processed in each trigger interval. The default value is 10000.
dla.datalake.streaming.dts.subscribe.topic Yes The subscription topic. In the DTS console, you can click Consume Data in the left-side navigation pane and view the parameter value on the page that appears.
dla.datalake.streaming.dts.processing.time.interval No The interval at which data is synchronized. The default value is 3. Unit: seconds.
dla.datalake.streaming.dts.checkpoint.location No The storage location of checkpoints. The default location is /tmp/sparkstreaming/checkpoint/.
dla.datalake.streaming.dts.db.tables No The database tables from which you want to obtain data. The parameter value is in the format of db1:table1;db2:table2.
dla.datalake.streaming.dts.concurrent.table.write.enable No Specifies whether to enable concurrent data write operations on multiple tables. The default value is true.
dla.datalake.streaming.dts.concurrent.table.write.thread.pool.size No The size of the thread pool for concurrent data write operations on multiple tables. The default value is 10.
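For example, to throttle each batch and restrict the synchronization to specific tables, the optional DTS parameters above might be combined as follows; the database and table names are hypothetical.
## Process at most 5000 records per trigger and synchronize data every 10 seconds.
dla.datalake.streaming.dts.max.offsets.per.trigger=5000
dla.datalake.streaming.dts.processing.time.interval=10
## Synchronize only the orders table in db1 and the users table in db2.
dla.datalake.streaming.dts.db.tables=db1:orders;db2:users
## Write to multiple tables concurrently with a thread pool of 10 threads.
dla.datalake.streaming.dts.concurrent.table.write.enable=true
dla.datalake.streaming.dts.concurrent.table.write.thread.pool.size=10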
DLA parameters
Parameter Required Description
dla.datalake.meta.sync.enable No Specifies whether to enable automatic synchronization for DLA. The default value is true, which indicates that automatic synchronization is enabled.
dla.datalake.meta.username Required when dla.datalake.meta.sync.enable is set to true. The JDBC username that is used to synchronize data to DLA.
dla.datalake.meta.password Required when dla.datalake.meta.sync.enable is set to true. The password that is used to synchronize data to DLA.
dla.datalake.meta.jdbc.url Required when dla.datalake.meta.sync.enable is set to true. The JDBC URL that is used to synchronize data to DLA.
dla.datalake.meta.db.name No The name of the database to which data is synchronized. If this parameter is specified, all tables are synchronized to the database. If this parameter is not specified, the database name is automatically resolved from the DTS, JDBC, or DFS source.
dla.datalake.meta.table.name No The name of the table that you want to synchronize to DLA. If dla.datalake.batch.jdbc.sync.mode is set to table and this parameter is specified, all data is synchronized to this table. Otherwise, the table name is automatically resolved.
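A minimal sketch that enables automatic metadata synchronization to DLA, assuming hypothetical DLA credentials and a placeholder endpoint:
## Enable automatic metadata synchronization to DLA.
dla.datalake.meta.sync.enable=true
dla.datalake.meta.username=dla_user
dla.datalake.meta.password=dla_password
dla.datalake.meta.jdbc.url=jdbc:mysql://<DLA JDBC endpoint>/
## Optional: synchronize all tables to a single DLA database (hypothetical name).
dla.datalake.meta.db.name=lakehouse_db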
Hudi parameters
Parameter Required Description
dla.datalake.hoodie.target.base.path No The OSS root directory to which DTS data is synchronized. The default directory is /tmp/dla-streaming-datalake/.
dla.datalake.hoodie.compact.inline No Specifies whether to enable inline compaction during data write operations. This parameter does not take effect for copy-on-write (COW) tables. The default value is true.
dla.datalake.hoodie.compact.inline.max.delta.commits No The maximum number of delta commits that are used to trigger compaction. The default value is 10.
dla.datalake.hoodie.table.type No The type of the Hudi table. The default value is MERGE_ON_READ.
dla.datalake.hoodie.insert.shuffle.parallelism No The concurrency of inserts. The default value is 3.
dla.datalake.hoodie.upsert.shuffle.parallelism No The concurrency of upserts. The default value is 3.
dla.datalake.hoodie.enable.timeline.server No Specifies whether to enable the timeline server. The default value is false, which indicates that the timeline server is disabled.
dla.datalake.hoodie.save.mode No Specifies how data is saved. The default value is Overwrite when full data synchronization is performed.
dla.datalake.hoodie.table.name No The name of the Hudi table.
dla.datalake.hoodie.datasource.write.operation No The write type. The default value is bulk_insert when full data synchronization is performed.
dla.datalake.hoodie.bulkinsert.shuffle.parallelism No The concurrency when dla.datalake.hoodie.datasource.write.operation is set to bulk_insert during full data synchronization.
dla.datalake.hoodie.partition.field No The partition field. The default value is an empty string, which indicates that partitioning is not performed.
dla.datalake.hoodie.precombine.field No The precombine field. This parameter is required if dla.datalake.batch.jdbc.sync.mode is set to table during DFS data synchronization. In other cases, this parameter is optional.
dla.datalake.hoodie.datasource.write.recordkey.field No The primary key field. This parameter is required if dla.datalake.batch.jdbc.sync.mode is set to table during DFS data synchronization. In other cases, this parameter is optional.
dla.datalake.hoodie.key.generator.class No The key generation class. The default value is org.apache.hudi.keygen.ComplexKeyGenerator.
dla.datalake.hoodie.dla.sync.partition.fields No The partition fields that you want to synchronize to DLA. The default value is an empty string.
dla.datalake.hoodie.dla.sync.partition.extractor.class No The class that is used to extract partition field values that you want to synchronize to DLA. The default value is org.apache.hudi.hive.NonPartitionedExtractor.
Note The Hudi table is saved in the {dla.datalake.hoodie.target.base.path}/{Source database name}/{Source table name} directory.
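The following sketch shows how some of these Hudi parameters might be tuned when dla.datalake.batch.jdbc.sync.mode is set to table; the column names are hypothetical.
## Use a MERGE_ON_READ table and compact after every 10 delta commits.
dla.datalake.hoodie.table.type=MERGE_ON_READ
dla.datalake.hoodie.compact.inline=true
dla.datalake.hoodie.compact.inline.max.delta.commits=10
## Primary key and precombine fields of the source table.
dla.datalake.hoodie.datasource.write.recordkey.field=id
dla.datalake.hoodie.precombine.field=gmt_modified
## Increase write parallelism.
dla.datalake.hoodie.insert.shuffle.parallelism=6
dla.datalake.hoodie.upsert.shuffle.parallelism=6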
Other system parameters
Parameter Required Description
dla.datalake.system.convert.all.types.to.string No Specifies whether to convert all data types to the STRING type. The default value is false.
dla.datalake.system.convert.decimal.to.string No Specifies whether to convert the DECIMAL type to the STRING type. The default value is true.
dla.datalake.system.convert.decimal.to.double No Specifies whether to convert the DECIMAL type to the DOUBLE type. The default value is false. If this parameter is set to true, set dla.datalake.system.convert.decimal.to.string to false.
dla.datalake.system.decimal.columns.definition No The definition of the DECIMAL type. The parameter value is in the format of Table name:Column name 1,precision,scale;Column name 2,precision,scale. Multiple tables are separated by number signs (#). Example: tableName1:c1,10,2;c2,5,2#tableName2:c,4,2.
dla.datalake.system.convert.int.to.long No Specifies whether to convert the INT type to the LONG type. The default value is true.
Note Make sure that the full synchronization job and the incremental synchronization job use the same data type conversion settings. Otherwise, an error is returned due to a type mismatch.
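For example, to keep DECIMAL columns as DOUBLE values and declare their precision explicitly, these parameters might be set as follows; the table and column names are hypothetical. If you use these settings, add them to both the full and incremental synchronization configuration files so that the data types match.
## Convert DECIMAL columns to DOUBLE instead of STRING.
dla.datalake.system.convert.decimal.to.string=false
dla.datalake.system.convert.decimal.to.double=true
## Declare the precision and scale of the DECIMAL columns.
dla.datalake.system.decimal.columns.definition=orders:total_amount,10,2;discount,5,2
## Keep INT columns as LONG values.
dla.datalake.system.convert.int.to.long=true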