This topic describes how to build a real-time data lake by using Data Lake Analytics (DLA) and Data Transmission Service (DTS) to synchronize data from ApsaraDB RDS.
Background information
The real-time data lake solution is a next-generation data warehousing solution that features low costs and low latency. This solution supports a large number of analytics datasets on which create, read, update, and delete (CRUD) operations are performed. The following figure shows the architecture of the real-time data lake solution.
Prerequisites
- The ApsaraDB RDS instance from which you want to synchronize data is deployed in the same region as DTS and DLA.
- A data subscription task is created for DTS. This task is used to subscribe to data flows in binary log files on the ApsaraDB RDS instance that is mapped to DLA. For more information, see Overview of change tracking scenarios.
- Data flows in binary log files on the ApsaraDB RDS instance are configured for the DTS data subscription task that you created. For more information, see Track data changes from ApsaraDB RDS for MySQL (new) and Track data changes from an ApsaraDB RDS for MySQL instance (previous).
Note To ensure the security and performance of your business, we recommend that you select Virtual Private Cloud (VPC) as the network type when you configure the data subscription task.
Procedure
Parameters in the dla-lakehouse-streaming-dts-config.properties file
The following parameters are specified in the dla-lakehouse-streaming-dts-config.properties file. Required parameters must always be set; optional parameters can be adjusted based on your business requirements.
Parameter | Required | Description |
---|---|---|
dla.datalake.streaming.dts.username | Yes | The username that is used to create a data subscription task for DTS. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears. |
dla.datalake.streaming.dts.password | Yes | The password that is used to create a data subscription task for DTS. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears. |
dla.datalake.streaming.dts.offset | No | The consumer offset. The default value is latest. |
dla.datalake.streaming.dts.group.id | Yes | The ID of the consumer group. In the DTS console, you can click Consume Data in the left-side navigation pane and view the parameter value on the page that appears. |
dla.datalake.streaming.dts.bootstrap.server | Yes | The endpoint of the DTS server. In the DTS console, you can click View Task Settings in the left-side navigation pane and view the parameter value on the page that appears. |
dla.datalake.streaming.dts.max.offsets.per.trigger | No | The number of data records that are processed during a synchronization job. By default, 10,000 records can be processed. |
dla.datalake.streaming.dts.subscribe.topic | Yes | The subscription topic. In the DTS console, you can click Consume Data in the left-side navigation pane and view the parameter value on the page that appears. |
dla.datalake.streaming.dts.processing.time.interval | No | The interval at which data is synchronized. The default value is 3. Unit: seconds. |
dla.datalake.streaming.dts.checkpoint.location | No | The storage location of checkpoints. The default location is /tmp/sparkstreaming/checkpoint/. |
dla.datalake.streaming.dts.db.tables | No | The database tables from which you want to obtain data. The parameter value is in the format of db1:table1;db2:table2. |
dla.datalake.streaming.dts.concurrent.table.write.enable | No | Specifies whether to enable concurrent data write operations on multiple tables. The default value is true. |
dla.datalake.streaming.dts.concurrent.table.write.thread.pool.size | No | The size of the thread pool for concurrent data write operations on multiple tables. The default value is 10. |
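The DTS parameters above can be combined in the properties file as shown in the following sketch. All values are placeholders; replace the server endpoint, topic, group ID, and credentials with the actual values shown in the DTS console.

```properties
# DTS subscription connection (placeholder values; copy the real ones from the DTS console)
dla.datalake.streaming.dts.username=dts_user
dla.datalake.streaming.dts.password=dts_password
dla.datalake.streaming.dts.bootstrap.server=dts-example.aliyuncs.com:18001
dla.datalake.streaming.dts.subscribe.topic=cn_hangzhou_rm_example_topic
dla.datalake.streaming.dts.group.id=dtsexample

# Optional tuning: start from the latest offset and process up to 10000 records per job,
# synchronizing every 3 seconds
dla.datalake.streaming.dts.offset=latest
dla.datalake.streaming.dts.max.offsets.per.trigger=10000
dla.datalake.streaming.dts.processing.time.interval=3

# Optional: restrict synchronization to specific tables (hypothetical names)
dla.datalake.streaming.dts.db.tables=db1:table1;db2:table2
```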
Parameter | Required | Description |
---|---|---|
dla.datalake.meta.sync.enable | No | Specifies whether to enable automatic synchronization for DLA. The default value is true, which indicates that automatic synchronization is enabled. |
dla.datalake.meta.username | Required when dla.datalake.meta.sync.enable is set to true. | The JDBC username that is used to synchronize data to DLA. |
dla.datalake.meta.password | Required when dla.datalake.meta.sync.enable is set to true. | The password that is used to synchronize data to DLA. |
dla.datalake.meta.jdbc.url | Required when dla.datalake.meta.sync.enable is set to true. | The JDBC URL that is used to synchronize data to DLA. |
dla.datalake.meta.db.name | No | The name of the database to which data is synchronized. If this parameter is specified, all tables are synchronized to this database. If this parameter is not specified, the database name is resolved from the data source (DTS, JDBC, or DFS). |
dla.datalake.meta.table.name | No | The name of the table that you want to synchronize to DLA. If dla.datalake.batch.jdbc.sync.mode is set to table and this parameter is specified, all data is synchronized to this table. Otherwise, the table name is automatically resolved. |
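For example, the metadata synchronization parameters might be set as follows. The credentials, JDBC URL, and database name are placeholder values.

```properties
# Automatic metadata synchronization to DLA (placeholder credentials and URL)
dla.datalake.meta.sync.enable=true
dla.datalake.meta.username=dla_user
dla.datalake.meta.password=dla_password
dla.datalake.meta.jdbc.url=jdbc:mysql://example.ats.aliyuncs.com:10000/default

# Optional: force all tables into one target database (hypothetical name)
dla.datalake.meta.db.name=lakehouse_db
```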
Parameter | Required | Description |
---|---|---|
dla.datalake.hoodie.target.base.path | No | The OSS root directory to which DTS data is synchronized. The default directory is /tmp/dla-streaming-datalake/. |
dla.datalake.hoodie.compact.inline | No | Specifies whether to enable inline compaction during data write operations. Inline compaction takes effect only for MERGE_ON_READ tables and has no effect on copy-on-write (COW) tables. The default value is true. |
dla.datalake.hoodie.compact.inline.max.delta.commits | No | The maximum number of delta commits that are used to trigger compaction. The default value is 10. |
dla.datalake.hoodie.table.type | No | The type of the Hudi table. The default value is MERGE_ON_READ. |
dla.datalake.hoodie.insert.shuffle.parallelism | No | The concurrency of inserts. The default value is 3. |
dla.datalake.hoodie.upsert.shuffle.parallelism | No | The concurrency of upserts. The default value is 3. |
dla.datalake.hoodie.enable.timeline.server | No | Specifies whether to enable the timeline server. The default value is false, which indicates that the timeline server is disabled. |
dla.datalake.hoodie.save.mode | No | Specifies how data is saved. The default value is Overwrite when full data synchronization is performed. |
dla.datalake.hoodie.table.name | No | The name of the Hudi table. |
dla.datalake.hoodie.datasource.write.operation | No | The write type. The default value is bulk_insert when full data synchronization is performed. |
dla.datalake.hoodie.bulkinsert.shuffle.parallelism | No | The concurrency when dla.datalake.hoodie.datasource.write.operation is set to bulk_insert during full data synchronization. |
dla.datalake.hoodie.partition.field | No | The field that is used for partitioning. The default value is an empty string, which indicates that data is not partitioned. |
dla.datalake.hoodie.precombine.field | No | The precombine field. This parameter is required if dla.datalake.batch.jdbc.sync.mode is set to table during DFS data synchronization. In other cases, this parameter is optional. |
dla.datalake.hoodie.datasource.write.recordkey.field | No | The primary key field. This parameter is required if dla.datalake.batch.jdbc.sync.mode is set to table during DFS data synchronization. In other cases, this parameter is optional. |
dla.datalake.hoodie.key.generator.class | No | The key generation class. The default value is org.apache.hudi.keygen.ComplexKeyGenerator. |
dla.datalake.hoodie.dla.sync.partition.fields | No | The partition fields that you want to synchronize to DLA. The default value is an empty string. |
dla.datalake.hoodie.dla.sync.partition.extractor.class | No | The class that is used to extract partition field values that you want to synchronize to DLA. The default value is org.apache.hudi.hive.NonPartitionedExtractor. |
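A typical Hudi section of the properties file might look like the following sketch. The OSS path, table name, and field names are placeholder values that depend on your data.

```properties
# Hudi write settings (placeholder OSS path and table name)
dla.datalake.hoodie.target.base.path=oss://my-bucket/dla-streaming-datalake/
dla.datalake.hoodie.table.type=MERGE_ON_READ
dla.datalake.hoodie.table.name=orders_hudi

# Compact inline after at most 10 delta commits
dla.datalake.hoodie.compact.inline=true
dla.datalake.hoodie.compact.inline.max.delta.commits=10

# Record key and precombine fields (hypothetical column names);
# leave the partition field empty for a non-partitioned table
dla.datalake.hoodie.datasource.write.recordkey.field=id
dla.datalake.hoodie.precombine.field=gmt_modified
dla.datalake.hoodie.partition.field=
```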
Parameter | Required | Description |
---|---|---|
dla.datalake.system.convert.all.types.to.string | No | Specifies whether to convert all data types to the STRING type. The default value is false. |
dla.datalake.system.convert.decimal.to.string | No | Specifies whether to convert the DECIMAL type to the STRING type. The default value is true. |
dla.datalake.system.convert.decimal.to.double | No | Specifies whether to convert the DECIMAL type to the DOUBLE type. The default value is false. If this parameter is set to true, set dla.datalake.system.convert.decimal.to.string to false. |
dla.datalake.system.decimal.columns.definition | No | The definition of DECIMAL columns. The parameter value is in the format of Table name:Column name 1,precision,scale;Column name 2,precision,scale, and definitions for multiple tables are separated by number signs (#). Example: tableName1:c1,10,2;c2,5,2#tableName2:c,4,2. |
dla.datalake.system.convert.int.to.long | No | Specifies whether to convert the INT type to the LONG type. The default value is true. |
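Putting the type-conversion parameters together, a configuration that keeps the default behavior (DECIMAL to STRING, INT to LONG) and declares explicit precision and scale for a few columns might look like this. The table and column names are hypothetical.

```properties
# Type conversion: keep the defaults (DECIMAL -> STRING, INT -> LONG)
dla.datalake.system.convert.all.types.to.string=false
dla.datalake.system.convert.decimal.to.string=true
dla.datalake.system.convert.decimal.to.double=false
dla.datalake.system.convert.int.to.long=true

# Explicit precision/scale for DECIMAL columns in two hypothetical tables,
# separated by a number sign (#)
dla.datalake.system.decimal.columns.definition=tableName1:c1,10,2;c2,5,2#tableName2:c,4,2
```

Note that dla.datalake.system.convert.decimal.to.string and dla.datalake.system.convert.decimal.to.double must not both be set to true.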